Chris Gilligan - 353 - Feb 8, 2015

I am having some issues configuring ServiceStack Redis message queues so that messages don’t get lost if a request processor dies mid-way through processing a request.

I am looking at distributing work via a Redis queue to many processors in a cloud environment where one or more processors could be terminated without any notice, meaning anything they were processing at the time needs to be picked up by another processor. Doing some basic tests on my PC, I have found that while the queue itself persists and works correctly, the messages that were being processed at the time are lost.

Is there anything like AWS’s queue service, where if a message is not successfully processed within X time of being taken from the queue it is re-added to the queue to be attempted again?

Redis doesn’t have support for Acks/Naks, so there’s a potential that the current message could be lost if the AppDomain process gets killed in the middle of processing it. You can use RabbitMQ for durable messaging with Ack/Nak support. 
https://github.com/ServiceStack/ServiceStack/wiki/Rabbit-MQ
For RedisMQ we maintained a date-stamped DB column of when the record/message was processed and had an hourly task to just re-send any messages that weren’t flagged as completed, which took care of processing any lost messages (if any).
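A minimal sketch of that pattern, assuming a hypothetical BackOfficeTask table (OrmLite) and a hypothetical ProcessTask Request DTO:

// requires: using System; using System.Data; using ServiceStack.OrmLite; using ServiceStack.Messaging;
public class BackOfficeTask
{
    public int Id { get; set; }
    public DateTime CreatedDate { get; set; }
    public DateTime? CompletedDate { get; set; }  // set by the MQ handler on success
}

// Hourly sweep: re-send anything older than an hour that isn't flagged as completed
public void RetryIncompleteTasks(IDbConnection db, IMessageQueueClient mqClient)
{
    var cutoff = DateTime.UtcNow.AddHours(-1);
    var incomplete = db.Select<BackOfficeTask>(x =>
        x.CompletedDate == null && x.CreatedDate < cutoff);

    foreach (var task in incomplete)
        mqClient.Publish(new ProcessTask { TaskId = task.Id });  // hypothetical Request DTO
}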

Chris Gilligan:

Demis, any chance of making that configurable per message/class, so you can set the time it waits instead of an hour?

If I change to RabbitMQ do you still get to set the number of threads for processors etc.? I remember reading something that said that was Redis-only, but it could have been out of date.

Chris Gilligan:

Ignore the part about multiple threads on Rabbit, as I just read that you can do that.

This is not something that’s in the framework - it’s part of your System/App DB. i.e. Whenever I have a back-office task I’m processing via MQ, I’d also have a column to maintain the date when the task was successfully performed. The MQ Task itself would update the appropriate column for that Request. 

Then I’d have an HTTP Route (essentially a web cron job) that’s called at any desired interval (e.g. 5-60 mins) that would go through everything we have MQ tasks for and check for any that haven’t been processed within a given time interval; if they haven’t, just send a new message.

I’d generally do this for any offline tasks (i.e. even if using RabbitMQ) as it maintains state of when the task was successfully processed, which can come in handy at other times, e.g. if for any reason I wanted to re-run the task for a particular item I’d just clear the completed field and hit the HTTP Route again, which gets picked up and re-sent as normal.
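Roughly what that web-cron route could look like (a sketch only; the DTO names, route and 30-minute cutoff are placeholders, reusing the hypothetical BackOfficeTask/ProcessTask from above):

// requires: using System; using ServiceStack; using ServiceStack.OrmLite;
[Route("/tasks/retry-incomplete")]
public class RetryIncomplete : IReturn<RetryIncompleteResponse> { }

public class RetryIncompleteResponse
{
    public int Resent { get; set; }
}

public class TaskAdminService : Service
{
    public object Any(RetryIncomplete request)
    {
        var cutoff = DateTime.UtcNow.AddMinutes(-30);
        var incomplete = Db.Select<BackOfficeTask>(x =>
            x.CompletedDate == null && x.CreatedDate < cutoff);

        foreach (var task in incomplete)
            PublishMessage(new ProcessTask { TaskId = task.Id });  // re-sent and processed as normal

        return new RetryIncompleteResponse { Resent = incomplete.Count };
    }
}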

Chris Gilligan:

Thanks for the info. With the queues, is there a way to say process the normal and priority queue with 10 threads, where if there are items in the priority queue it uses the 10 threads and the normal queue stops, and then once some of the threads are free again they go back to the normal queue?

The reason is my process is resource-intensive, so there will be a max the server can handle, and I was hoping to put the more transactional work through one queue and the batch work through the other.

We did a lot of this in memory before, but I want to move to a proper queue that can save state instead.

The threads are tied to each MQ and aren’t sharable. But the Priority Queues can be disabled if you just want to have all threads working off the same InQ.
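For example, roughly, inside AppHost.Configure() (a sketch assuming the DisablePriorityQueues option on RedisMqServer; Hello is just the sample DTO):

// requires: the ServiceStack.Server package for RedisMqServer
var mqServer = new RedisMqServer(container.Resolve<IRedisClientsManager>())
{
    DisablePriorityQueues = true  // skip mq:Hello.priorityq; everything goes through mq:Hello.inq
};

mqServer.RegisterHandler<Hello>(this.ServiceController.ExecuteMessage, 10);  // 10 worker threads on the one InQ
mqServer.Start();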

Chris Gilligan:

In my case I need to limit my service to processing X concurrent requests, but I need multiple queues where it will process the priority items first. Ideally I would need more than 2 priority levels, so I assume this will be custom stuff I need to write, but would you have any suggestions on the best way to go about it while taking advantage of as much of ServiceStack as possible?

You could create multiple Request DTOs that all share the same interface and just send a different DTO, e.g: PublishMessage(request.Convert<PriorityTask2>()), and have their server impls delegate to a single impl that handles that interface/base class. This will let you have multiple PriorityQ/InQ’s.
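A sketch of that shape, using made-up names (ITaskRequest, PriorityTask1/2) for the shared interface and DTOs:

public interface ITaskRequest
{
    string Payload { get; set; }
}

// Each DTO gets its own set of queues (mq:PriorityTask1.inq, mq:PriorityTask2.inq, ...)
public class PriorityTask1 : ITaskRequest { public string Payload { get; set; } }
public class PriorityTask2 : ITaskRequest { public string Payload { get; set; } }

public class TaskServices : Service
{
    public object Any(PriorityTask1 request) { return ProcessTask(request); }
    public object Any(PriorityTask2 request) { return ProcessTask(request); }

    // Single shared implementation that works off the interface
    object ProcessTask(ITaskRequest request)
    {
        // ... do the actual work ...
        return null;
    }
}

Publishing then just means sending whichever DTO matches the priority you want, as in the PublishMessage(request.Convert<PriorityTask2>()) example above.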

Chris Gilligan:

I get how I can have multiple queues, but my issue is about the order of processing and limiting concurrency over all queues. In my case I would realistically have at least 4 priority levels, about 20 threads per worker per server, and hopefully 2 servers to start with. 

Expected queue sizes for the priority levels are:
Priority 1 - 0-10 items in MQ but mostly 0-2
Priority 2 - 0-100 items in MQ but mostly either 0 or 10-20
Priority 3 - 0-10,000 items in MQ but mostly either 0 or 1,000-10,000
Priority 4 - 0-1,000,000 items in MQ but mostly either 0 or 10,000-1,000,000

Priority 1 would be our more urgent realtime stuff.
Priority 2 would be non-urgent realtime.
Priority 3 would be small batch stuff.
Priority 4 would be large batch stuff.

It is not quite like that, but it gives a rough idea. Basically we want to tune our workers to put our servers under a ~85% CPU load, which will be a max of X concurrent requests being processed (assumed to be 20 above). We then want to process items available in the highest priority before any others and cascade down to processing lower priorities when there is no work in the higher ones.

I thought of using something similar to the answer on http://stackoverflow.com/questions/2510975/c-sharp-object-pooling-pattern-implementation for controlling the concurrency, but that would not handle the priority part. My issue is more with mqHost.RegisterHandler<Hello>(this.ServiceController.ExecuteMessage, 3); I think I need to investigate RegisterHandler and find a better way to get items from multiple queues using my own threads. 

Sounds like you might want to execute messages in your own threadpool.

You can extract the Request DTO in the handler like:

mqHost.RegisterHandler<Hello>(msg => {
  var dto = msg.GetBody();
  if (dto.Priority == 1) { … }
  return this.ServiceController.ExecuteMessage(msg);
});

The handler is synchronous so it needs to return the result from the handler, like:
  return this.ServiceController.ExecuteMessage(msg);

But I’d first try the default worker threadpool to see if this is even necessary; when threads are not processing messages they’re just sitting idle, waiting on a lock.
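If the total does need a hard cap across all queues, one rough option (just a sketch, assuming a 20-slot limit) is a shared SemaphoreSlim that every handler acquires before executing, so the built-in worker threads still do the dequeuing but at most 20 messages execute at once:

// requires: using System.Threading;
static readonly SemaphoreSlim WorkSlots = new SemaphoreSlim(20, 20);  // field on the AppHost

mqHost.RegisterHandler<Hello>(msg => {
  WorkSlots.Wait();  // block this queue's worker until a slot frees up
  try {
    return this.ServiceController.ExecuteMessage(msg);
  }
  finally {
    WorkSlots.Release();
  }
});

This caps overall concurrency but doesn’t by itself make higher priorities jump the line; the blocked workers are still per-queue.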

Chris Gilligan:

Yeah, I know about idle threads, hence why I would have preferred to just say X threads over a group of queues. Are there methods to get an item from a queue if there are any, or return null right away if there are no items, so I could run my own threads? Though I guess if I go that way I then need to manage putting results on queues as well, and I lose even more of what ServiceStack offers. I guess I will just need to keep playing to find a good working solution. I don’t really want a large number of idle threads if I can help it.
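For pulling messages yourself, a rough sketch assuming the IMessageQueueClient API (GetAsync<T>() returning null when a queue is empty, Ack/Nak, QueueNames<T>.In) and the hypothetical PriorityTask1-4 DTOs, run on each of your own worker threads:

// requires: using System; using System.Threading; using ServiceStack.Messaging;
using (var mqClient = mqServer.CreateMessageQueueClient())
{
    while (!stopRequested)  // your own shutdown flag
    {
        // Try the highest priority queue first, then cascade down
        IMessage msg = mqClient.GetAsync<PriorityTask1>(QueueNames<PriorityTask1>.In);
        if (msg == null) msg = mqClient.GetAsync<PriorityTask2>(QueueNames<PriorityTask2>.In);
        if (msg == null) msg = mqClient.GetAsync<PriorityTask3>(QueueNames<PriorityTask3>.In);
        if (msg == null) msg = mqClient.GetAsync<PriorityTask4>(QueueNames<PriorityTask4>.In);

        if (msg == null)
        {
            Thread.Sleep(250);  // nothing queued anywhere, back off briefly
            continue;
        }

        try
        {
            appHost.ServiceController.ExecuteMessage(msg);  // appHost: your AppHost instance; you now own any response publishing
            mqClient.Ack(msg);  // matters on brokers with Ack support (e.g. Rabbit)
        }
        catch (Exception ex)
        {
            mqClient.Nak(msg, true, ex);  // requeue on failure
        }
    }
}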

Chris Gilligan:

Finally worked this one out, Demis. There is a priority queue plugin for RabbitMQ that will be automatically integrated and no longer a plugin in version 3.5.0. I have enabled that plugin and done some manual tests and it works great. Now I just need to work out what classes I need to extend in ServiceStack to use this functionality instead of the multiple queues, and to enable the priorities and pass the argument in when creating the queue.
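For reference, the raw RabbitMQ.Client equivalent of those manual tests (a sketch only; the queue name and priority values are placeholders): the queue has to be declared with an x-max-priority argument, and each message carries a priority in its basic properties:

// requires: using RabbitMQ.Client; using System.Collections.Generic; using System.Text;
using (var connection = new ConnectionFactory { HostName = "localhost" }.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Declare the queue with a maximum priority of 4 (one per priority level)
    channel.QueueDeclare("mq:tasks.inq", durable: true, exclusive: false, autoDelete: false,
        arguments: new Dictionary<string, object> { { "x-max-priority", 4 } });

    // Publish with priority 3; higher-priority messages are delivered first
    var props = channel.CreateBasicProperties();
    props.Priority = 3;
    props.DeliveryMode = 2;  // persistent

    channel.BasicPublish("", "mq:tasks.inq", props, Encoding.UTF8.GetBytes("{\"TaskId\":1}"));
}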

Chris Gilligan:

This turned out to be harder than expected. I took a copy of the RabbitMQ implementation and manipulated it quite a bit, but hit 2 main issues I could not resolve: 1) I could not work out how to remove the built-in way priority works for queues in ServiceStack and get all the messages onto the one queue, and 2) unless I stopped the consumer and restarted it, it would not care about priority, but if stopped and started then priority worked. No idea why this is the case, but for now I am just doing my own RabbitMQ wrapper from scratch, based loosely on ServiceStack. I liked the way you did most things and wanted to still use a lot of the classes, so it would be possible to switch back later once it is implemented in ServiceStack.