RabbitMq error when responding on Temp Queue

Based on this previous discussion:

We are using a temp queue. We publish the original message like so:

var tempQueue = mqClient.GetTempQueueName();
mqClient.Publish(queueName, new Message<StateLoginRequest>(new StateLoginRequest()
    
                    {
                        UserId = user.UserId,
                        Username = req.UserName,
                        Password = req.Password,
                        ReplyToQueue = tempQueue
                    }));
    var response = mqClient.Get<StateLoginResponse>(tempQueue);
    mqClient.Ack(response);

Passing along the temp queue. This message is received on the normal queues. and eventually a response is queued up like so:

using (var mqClient = ResolveService<IMessageService>().CreateMessageQueueClient())
            {
                var tempQueue = request.ReplyToQ;

                var loginResponse = new StateLoginResponse()
                {
                    Transaction = "Here is your response data"
                };

                var message = new Message<StateLoginResponse>(loginResponse);

                mqClient.Publish(tempQueue, message);
            }

I can see the temp queue being created after the GetTempQueueName() call. But when attempting to send the response over the queue, I get this error:

ServiceStack.HttpError: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'mq:tmp:63f7a0ec340842b1a4aa74b3e536efae' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause= ---> RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'mq:tmp:63f7a0ec340842b1a4aa74b3e536efae' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.AutorecoveringModel.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueue(IModel channel, String queueName)
   at ServiceStack.RabbitMq.RabbitMqProducer.PublishMessage(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)
   at ServiceStack.RabbitMq.RabbitMqProducer.Publish(String queueName, IMessage message, String exchange)
   at LENSSX.StateHost.Services.StateAuthService.Any(InternalStateLoginResponse res) in C:\Projects\Work\LENSSX\src\LENSSX.StateHost\Services\StateAuthService.cs:line 74
   at LENSSX.StateHost.Services.StateResponseService.Any(StateHostResponse hostResponse) in C:\Projects\Work\LENSSX\src\LENSSX.StateHost\Services\StateResponseService.cs:line 43
   at LENSSX.StateHost.Services.StateAuthService.Any(StateLoginRequest request) in C:\Projects\Work\LENSSX\src\LENSSX.StateHost\Services\StateAuthService.cs:line 51
   at ServiceStack.Host.ServiceExec`1.<>c__DisplayClass3_0.<CreateExecFn>b__0(Object service, Object request)
   at ServiceStack.Host.ServiceRunner`1.<ExecuteAsync>d__13.MoveNext()
   --- End of inner exception stack trace ---

It seems the original creation of the Queue creates it with durable: false, and when sending out it is using durable: true.

The only difference between the startup initialization of these systems is this line in the startup of the receiver of the StateLoginRequest system:

QueueNames.SetQueuePrefix(mqConfig.StateQueuePrefix); // This lets us have different MQ Hosts sitting in different states, listening on their own Queues.

Other than that I’m not sure why a Publish call, which is given a specific queue name (a temp queue name in this case), would change any parameters.

My hunch is that in RabbitMqExtensions.cs, this conditional is returning true, thus causing QueueDeclare to be called:

if (!QueueNames.IsTempQueue(queueName)) //Already declared in GetTempQueueName()
            {
                channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: args);
            }

I think it’s returning true because I use the QueueNames.SetQueuePrefix(mqConfig.StateQueuePrefix);, so instead of mq:tmp, it’s checking for LA.mq:tmp, which it does not have.

Would it be feasible to allow temp queue creation to pass in a prefix?

EDIT
I confirmed this was the issue by quickly removing then re-adding my QueuePrefix:

QueueNames.SetQueuePrefix("");
mqClient.Publish(tempQueue, message);
QueueNames.SetQueuePrefix(Config.StateQueuePrefix);

After this the error goes away. I don’t want to leave this as is because I don’t know what ramifications this could have in other areas. If there might be another workaround, let me know.

Never set static configuration at runtime, it’s not ThreadSafe.

Are you saying the issue is due to having a custom prefix? If it is I’ll take a look to make sure it works when a custom prefix is used.

It is due to the custom prefix. The problem arises when one end has one set and the other end doesn’t (my producer doesn’t have a queue prefix, but my consumer of the StateLoginRequest does). My setup is as follows.

SS Traditional Web API (no queue prefix set, I build Queue Names Manually per request based on the users state, ex: LA.mq:StateLoginRequest)

SS Mq Only Host (this sits on each states server, with a Queue Prefix set to the state, ex: LA, so it listens for LA.mq:StateLoginRequest messages).

This lets my Web API pass messages around to hosts in many states, based on whatever state the user is in. A simple fix would be to allow prefixes to be passed into the GetTempQueueName(string prefix).

Either way, thank you for taking a look. Maybe there is a more elegant way.

Wait, what? If you use a prefix, it needs to be specified on both producer and consumers, that’s the only logical way it could work.

Global Prefixes is a static config for effectively declaring a namespace for their Queues, it’s not designed to be changed at runtime and client/servers shouldn’t be sending messages configured with different global prefixes.

TBH I don’t want to pollute runtime APIs for unsupported use cases like this, it wasn’t designed for it so you may end up running into more issues like this. I’d start with using your own custom API (starting from a copy of existing impls) which let you configure prefixes at runtime.

1 Like

I don’t plan on changing the QueuePrefix at runtime permanently (I know those static methods weren’t meant to be used that way). I just did that to verify my hunch (that it had to do with a prefix name).

You have me questioning my use of MQ now though. Isn’t MQ messaging meant to be used in such as way that it enables communication amongst different platforms / systems / hosts / clients / servers?

I understand your position of not changing for my one use case. I will at least state my argument here for arguments sake, because I believe ServiceStack has built a truly amazing, multi-system, multi-platform set of communication APIs and SDKs that are DTO based and easy to use (even fun to use). And you’ve brought me to a 95% solution even if I do need to implement my own MQ API’s.

But is my use case so out of the ordinary for a project like ServiceStack? Right now as it stands, SS allows me to easily publish and consume MQ messages (albeit they were both intended to be on the same QueuePrefix), even to runtime / temp generated queue names via the queueName parameter in the Publish method. But what about taking it a step further?

ServiceStack has enabled me to build an API that can route MQ messages to our New York, Texas, Louisiana, and Arkansas partners with ease, utilizing the same DTO’s (which you guys are all about). With only a few hundreds line of code, I had a multi-state, multi-platform communication infrastructure set up. That’s pretty huge.

Either way, I’ll start re-thinking my implementation for now. Maybe I’ll submit a feature request (or pull request) to handle this case of multi-server routing with the same DTO name. I honestly thought that was part of what the SetQueuePrefix was for. I see I’ve used it in a way in which you didn’t intend. But is that such a bad thing? :smiley:

“Failure is only the opportunity to begin again more intelligently.”
– Henry Ford

It’s about creating APIs that lead to a “pit of success”, the MQ prefix can be thought of as akin to a “Database Name” which is typically only configured on Startup in the connection string, you wouldn’t pollute each DB API with an optional DB name as it goes against how DB connections are typically used and the additional APIs to support niche use-cases would make the extra API usages confusing.

The appropriate way this should be implemented is to be able to configure an existing MQ client with a prefix, something like:

var mqClientState = mqClient.WithPrefix(state);

This would prevent needing to add overloads to every runtime API but as prefixes weren’t supposed to be configurable in this way it would require a fairly significant refactor to implement properly (i.e. for all MQ clients).

As it’s hard to tell how many APIs you need to specify a configurable prefix for, what I recommend you do instead is add the additional API you need as an extension method, which would have the functionality you’re after, e.g:

public static class RabbitMqProducerExt
{
    public static string GetTempQueueName(this RabbitMqProducer producer, string prefix)
    {
        var anonMq = producer.Channel.QueueDeclare(
            queue: prefix + "tmp:" + Guid.NewGuid().ToString("n"),
            durable:false,
            exclusive:true,
            autoDelete:true,
            arguments:null);

        return anonMq.QueueName;
    }
}

var replyQ = mqClient.GetTempQueueName(state);

After you know your full requirements and how many additional APIs you need you could come up with a cleaner solution, but this will get the client API usage you’re looking for.

1 Like

Yeah this is similar to what I did, thank you sir.

1 Like