MQ DLQ retries and delayed messages

I am using ServiceStack MQ with RabbitMQ for persistence. I am registering handlers like:

mqServer.RegisterHandler<SendEmail>(appHost.ExecuteMessage);

It is working, but I am a bit confused about how to handle DLQ retries and messages that I don’t want to fire instantly.

Retries

The retry setting in RabbitMqServer does an instant retry when a message fails, but in a real-world scenario an email server may go down for an hour. All our emails would end up in the DLQ, so we need a periodic retry that will succeed once the email server comes back online.

I looked at the example RabbitMQ project and saw it registers a BackgroundService object with:

services.AddHostedService<MqWorker>();

So I guess I need to create a background service that manually dequeues messages from the DLQ, but I don’t understand how to do this in multiple threads and lock each message until it’s processed.

I saw the example of adding a retry service in AfterInit, but it looks like that runs once after the AppHost starts, so it is not suitable for this use case. That example also reads failures from a database rather than the message queue, so I am not sure how it translates.

Does getting a message like below lock the message so no other thread can read it even before the Ack is sent?

IMessage<SendUserCreatedEmail> msgCopy = mqServer.CreateMessageQueueClient().Get<SendUserCreatedEmail>(QueueNames<SendUserCreatedEmail>.Dlq);

Let’s say I want to retry failures every 30 minutes and process them in multiple threads. What would be the recommended way to handle it?

Delayed messages

Sometimes a user will perform an action in the system but we don’t want to notify them instantly. For example, they may book an appointment and we want to alert them 10 minutes before it.

Is there any way to queue a message that can’t be processed until after a certain time?

Many thanks in advance

Consumer Acknowledgements are built into RabbitMQ. When you read a message from RabbitMQ you need to explicitly send an Ack() once you’ve successfully processed it, which removes it from the queue, or a Nack() when processing failed and you want it requeued so others are able to process it. Once a message is retrieved, no other threads have access to it until it’s explicitly nack’ed (or the channel holding it is closed).
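
As a rough sketch of the Get/Ack/Nack flow described above, assuming the ServiceStack.RabbitMq package: the SendEmail DTO shape and the TrySend helper are hypothetical placeholders, and as far as I know ServiceStack's client spells the negative acknowledgement Nak.

```csharp
using System;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;

// Hypothetical DTO shape, for illustration only.
public class SendEmail { public string To { get; set; } }

public static class DlqExample
{
    // Hypothetical stand-in for the real delivery logic.
    static bool TrySend(SendEmail email) => !string.IsNullOrEmpty(email.To);

    public static void RetryOne(RabbitMqServer mqServer)
    {
        using var mqClient = mqServer.CreateMessageQueueClient();

        // Returns null if nothing arrives within the timeout.
        IMessage<SendEmail> msg = mqClient.Get<SendEmail>(
            QueueNames<SendEmail>.Dlq, TimeSpan.FromSeconds(1));
        if (msg == null) return;

        // Until Ack/Nak is sent the broker holds the message as
        // unacknowledged, so it is not handed to any other reader.
        if (TrySend(msg.GetBody()))
            mqClient.Ack(msg);                 // processed: removed from the DLQ
        else
            mqClient.Nak(msg, requeue: true);  // failed: put back on the DLQ
    }
}
```

Several threads could each run RetryOne with their own client; each would receive a different message because unacknowledged messages are not redelivered while they are held.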

No, there’s no scheduler built into ServiceStack; Hangfire has features like this. Whenever we’ve needed a timer we just have a Linux cron job call an API endpoint periodically, or use an external service like uptimerobot.com or pingdom.com.
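
For the 30-minute DLQ retry specifically, an in-process alternative to an external cron job is a hosted BackgroundService driven by a timer. This is only a sketch under assumptions, not a built-in ServiceStack feature: DlqRetryWorker, MaxPerPass and the SendEmail shape are hypothetical, IMessageService is assumed to be registered in the DI container, and PeriodicTimer needs .NET 6 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using ServiceStack.Messaging;

// Hypothetical DTO shape, for illustration only.
public class SendEmail { public string To { get; set; } }

// Hypothetical worker: every 30 minutes, moves DLQ messages back to the In queue.
public class DlqRetryWorker : BackgroundService
{
    // Cap each pass so a message that fails again straight away
    // cannot keep the loop spinning forever.
    const int MaxPerPass = 500;
    readonly IMessageService mqServer;

    // Assumes IMessageService is registered in the DI container.
    public DlqRetryWorker(IMessageService mqServer) => this.mqServer = mqServer;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromMinutes(30));
        while (await timer.WaitForNextTickAsync(stoppingToken))
            RequeueFailed();
    }

    void RequeueFailed()
    {
        using var mqClient = mqServer.MessageFactory.CreateMessageQueueClient();
        for (var i = 0; i < MaxPerPass; i++)
        {
            var failed = mqClient.Get<SendEmail>(
                QueueNames<SendEmail>.Dlq, TimeSpan.FromSeconds(1));
            if (failed == null) break;           // nothing left in the DLQ

            mqClient.Publish(failed.GetBody());  // new message on the In queue
            mqClient.Ack(failed);                // remove the original from the DLQ
        }
    }
}
```

It would be registered the same way as the example worker, with services.AddHostedService<DlqRetryWorker>(). Republished messages go through the normal registered handler, so they are processed with whatever thread count that handler was registered with.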

Thanks for the quick reply, that makes sense. If reading a message locks it, then I should be OK with a background service.

Sorry to trouble you again, but I am fanning out events like this:

mqServer.RegisterHandler<SendUserCreatedEmail>(appHost.ExecuteMessage);
mqServer.RegisterHandler<SendUserCreatedPushNotification>(appHost.ExecuteMessage);
mqServer.RegisterHandler<SendUserCreatedWebhook>(appHost.ExecuteMessage);

mqServer.RegisterHandler<UserCreatedEvent>(m =>
{
    var producer = mqServer.MessageFactory.CreateMessageProducer();
    HostContext.AppHost.PublishMessage(producer, new SendUserCreatedEmail().PopulateWith(m));
    HostContext.AppHost.PublishMessage(producer, new SendUserCreatedPushNotification().PopulateWith(m));
    HostContext.AppHost.PublishMessage(producer, new SendUserCreatedWebhook().PopulateWith(m));

    return null;
});

Is that the right way to do it? I couldn’t find any examples, so I am not sure whether this is a good approach. The goal is to trigger multiple other events when a user is created. Is there a better way to achieve this, or is this fine?

There’s no need to call appHost.PublishMessage when you already have the producer; you can just publish the message yourself:

producer.Publish(new SendUserCreatedEmail(...));

Also, the handler passed to RegisterHandler receives an IMessage, so the Request DTO is in m.Body rather than in m itself.
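
Putting both points together, the fan-out handler might look something like the sketch below. The DTO shapes (a single UserId property) are hypothetical, and GetBody() is used here as the typed accessor for the message body, which is my understanding of the IMessage<T> interface rather than something stated above.

```csharp
using ServiceStack;
using ServiceStack.Messaging;

// Hypothetical DTO shapes, for illustration only.
public class UserCreatedEvent { public int UserId { get; set; } }
public class SendUserCreatedEmail { public int UserId { get; set; } }
public class SendUserCreatedPushNotification { public int UserId { get; set; } }
public class SendUserCreatedWebhook { public int UserId { get; set; } }

public static class FanOut
{
    public static void Register(IMessageService mqServer)
    {
        mqServer.RegisterHandler<UserCreatedEvent>(m =>
        {
            // The handler receives an IMessage<UserCreatedEvent>;
            // the Request DTO is its body.
            var evt = m.GetBody();

            // Dispose the producer once the three messages are published.
            using var producer = mqServer.MessageFactory.CreateMessageProducer();
            producer.Publish(new SendUserCreatedEmail().PopulateWith(evt));
            producer.Publish(new SendUserCreatedPushNotification().PopulateWith(evt));
            producer.Publish(new SendUserCreatedWebhook().PopulateWith(evt));

            return null; // no response message
        });
    }
}
```

Populating from evt rather than m matters because PopulateWith copies matching properties, and the message wrapper has none of the DTO's properties.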