Improving “RPC” support for RabbitMQ in ServiceStack
After quite a bit of struggle, I’ve finally found a temporary solution for the “RPC” mode of using MQ. Most if not all of the observations below apply only to RabbitMQ, but the changes I’d like to propose should not ‘ruin’ the purity of ServiceStack’s interfaces, which are agnostic of the MQ implementation.
The way “RPC” is implemented today does not work great, nor will it ever reliably work, but I think it can be fixed with not a lot of effort.
By default, ServiceStack will publish messages with responses to the .inq of the response DTO: https://github.com/ServiceStack/ServiceStack/wiki/Rabbit-MQ#messages-with-responses-are-published-to-the-response-inq
This sort-of works, except when it doesn’t ;), here’s what’s problematic:
1. When multiple requests are “in flight”, multiple consumers will call ‘Get’ on the same .inq, which means that sometimes they will get a response not meant for them. The field “ReplyId” could and should be used to tell whether the response is meant for the consumer in question, but the problem then becomes what to do with the message. You must Nak it, otherwise it stays in limbo (not consumed but not visible to other consumers), but then the same message gets returned to you when you call “Get” again. In fact, I’ve seen (in roughly 50% of tries, so often) a degenerate case where two consumers each pull the other’s response, Nak it, then pull it out again, in an endless loop.
2. Also, when multiple requests are in flight, I’ve occasionally seen the exception “Pipelining of requests forbidden”. This is a purely technical issue in the RabbitMQ.Client (or its usage by ServiceStack) but is related to the fact that we’re reusing the same queue as reply queue, so if we can avoid that we can avoid this exception too.
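The first problem can be illustrated with a small in-memory model. This is only a sketch, not RabbitMQ or ServiceStack code: the class `SharedReplyQueue` and the function `wait_for_reply` are hypothetical, and the model assumes that a Nak'd message goes back to the head of the queue, which matches the behaviour described above (the same message is returned on the next Get).

```python
from collections import deque


class SharedReplyQueue:
    """Hypothetical in-memory stand-in for a reply queue shared by all callers."""

    def __init__(self):
        self._messages = deque()

    def publish(self, reply_id, body):
        self._messages.append({"reply_id": reply_id, "body": body})

    def get(self):
        # Returns the next message, or None when the queue is empty.
        return self._messages.popleft() if self._messages else None

    def nak(self, message):
        # Assumption: a rejected message is requeued at the head of the queue.
        self._messages.appendleft(message)


def wait_for_reply(queue, my_id, max_attempts):
    """Poll the shared queue, Nak'ing every message that is not ours."""
    for attempt in range(1, max_attempts + 1):
        message = queue.get()
        if message is None:
            continue
        if message["reply_id"] == my_id:
            return message["body"], attempt
        queue.nak(message)
    return None, max_attempts


queue = SharedReplyQueue()
queue.publish("B", "response for B")  # B's response happens to arrive first
queue.publish("A", "response for A")

# Client A keeps pulling and rejecting B's response and never reaches its own.
result_a = wait_for_reply(queue, "A", max_attempts=5)
# Client B, polling afterwards, gets its response on the first attempt.
result_b = wait_for_reply(queue, "B", max_attempts=5)
```

In this model client A only makes progress once client B has removed its own message; with two clients Nak'ing each other's responses at the same time, neither is guaranteed to get there.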
Here’s what can be done to work around the problem:
1. You could use ReplyTo to publish the response to a given queue, which is supported by ServiceStack and documented here: https://github.com/ServiceStack/ServiceStack/wiki/Rabbit-MQ#responses-from-messages-with-replyto-are-published-to-that-address In fact, this is the recommended way to implement “RPC”, both by the RabbitMQ tutorial and by the author of the book “RabbitMQ in Action”. However, you cannot reuse the queue or you face the issues above again, so you must create a unique queue for each “RPC call”. But then the number of queues with silly names (to ensure uniqueness you’ll probably use a GUID as part of the name) will grow rapidly and become a burden. The workaround I’ve found is to configure the RabbitMQ server to expire all queues that have had no activity for N milliseconds, details here: http://www.rabbitmq.com/ttl.html (see section Queue TTL). This works and is what I’m using at the moment, but there is a better solution.
2. It’s a bit difficult to see, but both the RabbitMQ tutorial and the book mentioned above use a ‘special’ kind of queue to publish the call response to: a queue with no name (so it gets auto-named by RabbitMQ) which is also auto-deleted. It’s very easy to create this queue: calling (RabbitMQ IModel’s) QueueDeclare with no parameters will give you exactly the kind of queue you need. Its return value can be converted to a string to find out the name of the created queue. Unfortunately, this method isn’t exposed in an MQ-agnostic way in ServiceStack, so we have no way of calling it.
So finally, what I propose is that ServiceStack adds a method somewhere (not sure where it would be most appropriate: MqProducer, MqQueueClient or even MqServer) to simply create a queue with no parameters, which in the case of RabbitMQ would do the right thing (delegate to QueueDeclare with no parameters); for Redis, please see below. This method would return a string which we can then use to set “ReplyTo”, knowing that the queue will be deleted as soon as we drain it (basically by consuming the only message posted to it). For Redis, since the auto-delete mechanism doesn’t exist, I’m not sure whether this method should simply do nothing and return null, or even throw. To aid API discoverability, the ServiceStack method name might even include words like “AutoDelete” and/or “AutoName” so that its purpose is clear.
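To make the proposal concrete, here is a minimal in-memory sketch of the call flow it would enable. Nothing here is ServiceStack or RabbitMQ API: `InMemoryBroker`, `declare_temp_queue`, `get_and_delete` and `rpc_call` are hypothetical names, and deleting the queue once it is drained simply mirrors the behaviour described in the proposal (a real broker may use a different trigger for deletion).

```python
import uuid
from collections import deque


class InMemoryBroker:
    """Hypothetical broker model with auto-named, auto-deleted reply queues."""

    def __init__(self):
        self.queues = {}

    def declare_temp_queue(self):
        # The broker, not the caller, picks a unique name and returns it.
        name = "tmp-" + uuid.uuid4().hex
        self.queues[name] = deque()
        return name

    def publish(self, queue_name, body):
        self.queues[queue_name].append(body)

    def get_and_delete(self, queue_name):
        # Consume one message; drop the queue as soon as it is empty.
        body = self.queues[queue_name].popleft()
        if not self.queues[queue_name]:
            del self.queues[queue_name]
        return body


def rpc_call(broker, handler, request):
    """One RPC round trip using a reply queue that exists only for this call."""
    reply_to = broker.declare_temp_queue()
    # Server side: handle the request and publish the response to ReplyTo.
    broker.publish(reply_to, handler(request))
    # Client side: the only message on this queue is our response.
    return broker.get_and_delete(reply_to)


broker = InMemoryBroker()
response = rpc_call(broker, str.upper, "hello")
```

Because each call owns its reply queue, no other client can pull the response, and no queues are left behind after the call completes.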
I hope this is not a big change to ask for, yet it will help many others trying to use the “RPC” pattern with ServiceStack and RabbitMQ.
I’m going to have to do a bit more R&D on this, I’ll look into your proposals first.
If you have a small RPC test example of what you mean that just uses RabbitMQ.Client that would help.
Drazen Dotlic:
It’s actually directly demonstrated in their own tutorial for RPC here: http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html (13th line or so in the RPCClient.cs: replyQueueName = channel.QueueDeclare(); note conversion from QueueDeclareOk into string replyQueueName)
Their example is actually incorrect but works by accident.
For example, they check messages in a loop and just ignore the ones that do not have the same correlationId. Like I said, this doesn’t work (messages that are not Nak’d remain in limbo). Besides, who’s going to post to this queue but us? Nobody else knows the reply queue name, since it’s generated by RabbitMQ as illustrated in ‘line 13’ above.
Alternatively, take their example, then fix the name of the reply queue to a fixed value like “ResponseQueue.inq” (so create a named queue) or whatever, then launch two clients and one server and watch in horror how messages get lost (one of the clients will eventually take message meant for the other, then just ignore it).
Then add “Nak” for messages with a non-matching correlation id, and watch in horror how the clients ‘steal’ each other’s messages, Nak them back into the queue, then get them again, and so on.
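The lost-message case (a fixed, shared reply queue where mismatched replies are ignored rather than Nak'd) can be modelled in a few lines. This is an in-memory sketch only; `take_reply` and the client names are made up for illustration, and a plain `deque` stands in for the named reply queue.

```python
from collections import deque


def take_reply(queue, my_correlation_id):
    """Pull messages until ours shows up; mismatched ones are silently dropped."""
    while queue:
        correlation_id, body = queue.popleft()
        if correlation_id == my_correlation_id:
            return body
        # No Nak here: the other client's reply is gone for good.
    return None


# Both replies land on the same named queue, client 2's reply first.
reply_queue = deque([
    ("client-2", "reply for client 2"),
    ("client-1", "reply for client 1"),
])

first = take_reply(reply_queue, "client-1")   # consumes and discards client 2's reply
second = take_reply(reply_queue, "client-2")  # finds nothing left to read
```

Client 1 gets its reply, but only by throwing away the one meant for client 2, which then waits for a message that no longer exists.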
The key is line 13 or so - by calling QueueDeclare() we get back the queue suitable for our purposes, I simply want a way to be able to call ServiceStack to call this for me, I want to keep the MQ abstraction as much as I can (you too probably).
If you have the chance, take a look at RPC chapter in “RabbitMQ in Action”, it says the same thing.
The main reason for the problem appears to be that the order of messages (when pulled or Nak’d back into the queue) isn’t defined, which intuitively makes sense to me.
Note also that once one uses this mechanism to create a reply queue per call, there is no more need for correlation id, since the reply queue is never shared with anyone else and only used for the response message (and is deleted automatically because it’s created as auto-delete).
We have been using the workaround listed in the previous message and haven’t had a single issue since, what I’m proposing is an improvement and guidance for others who might find themselves in a similar situation.
I’ve added GetTempQueueName() on all MQ providers in this commit:
https://github.com/ServiceStack/ServiceStack/commit/2d5f707ab436a1371e0186117de1e668a2ea8831
As well as RPC tests with multiple concurrent clients using the temp queue names for all MQ providers. Because the queues are exclusive and non-durable, they don’t show up as queues in the RabbitMQ server, so it should do what you need; if not, let me know what needs changing.
The Queue Names for all providers are just being created with a configurable prefix and a Guid seen in: https://github.com/ServiceStack/ServiceStack/commit/d479a14f3649df0a111b519ac98521e14acd397f
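As a rough illustration of that naming scheme (a configurable prefix followed by a Guid), here is a short sketch. It is not the code from the commit: the function name `temp_queue_name` and the default prefix `"tmp:"` are placeholders chosen for this example.

```python
import uuid


def temp_queue_name(prefix="tmp:"):
    """Build a unique queue name from a configurable prefix and a random GUID."""
    # uuid4().hex is 32 hexadecimal characters with no dashes.
    return prefix + uuid.uuid4().hex


name_one = temp_queue_name()
name_two = temp_queue_name(prefix="myapp:rpc:")
```

Since the GUID part is random, two calls practically never return the same name, which is what lets every RPC call have a reply queue of its own.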
These changes are now available on MyGet:
https://github.com/ServiceStack/ServiceStack/wiki/MyGet
Drazen Dotlic:
Great! One small question though: why not pass empty string to RabbitMQ, it will generate the queue name for you automatically? Did you want to unify the name strategy for other purposes in ServiceStack?
I think this time I’ll definitely use the MyGet feed to test the change, because I didn’t test one of the previous (unrelated) fixes, and now that it’s available through NuGet it looks like it’s not working (I’ll report this in the original issue).
Yeah wanted all queue names to share the same strategy.