Issues with MQ upgrading from ServiceStack 6 to 8

I’m coming back to a project after years of being away, trying to get all the dependencies updated (including ServiceStack).

So far everything has worked fine, except our MQ system. Locally we run RabbitMQ with Docker, in the cloud we run AWS SQS. This post is specifically for local (RabbitMQ). I haven’t gotten to AWS SQS yet.

Our project setup runs something like this:

SS API → We mostly queue messages from here to go to SS Console App, and receive one type of message update from the SS Console App.
SS Console App (MQ Host) → This is the service that processes the bulk of commands queued from the SS API project.

What seems to be happening is that as soon as I queue a message from the API, the RabbitMqServer.RunLoop enters into the WorkerOperation.Stop. The only errors I’m getting in my seq are:

System.NullReferenceException: Object reference not set to an instance of an object.
   at ServiceStack.ServiceStackHost.ApplyRequestConvertersAsync(IRequest req, Object requestDto) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServiceStackHost.Runtime.cs:line 45
   at ServiceStack.Host.ServiceController.ExecuteMessage(IMessage dto, IRequest req) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/Host/ServiceController.cs:line 604
   at ServiceStack.ServiceStackHost.ExecuteMessage(IMessage mqMessage) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServiceStackHost.cs:line 1787
   at ServiceStack.Messaging.MessageHandler`1.ProcessMessage(IMessageQueueClient mqClient, IMessage`1 message) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/Messaging/MessageHandler.cs:line 151

And additionally:

System.NullReferenceException: Object reference not set to an instance of an object.
   at RabbitMQ.Client.Impl.AutorecoveringModel.Abort()
   at RabbitMQ.Client.Impl.AutorecoveringModel.Dispose(Boolean disposing)
   at RabbitMQ.Client.Impl.AutorecoveringModel.System.IDisposable.Dispose()
   at ServiceStack.RabbitMq.RabbitMqProducer.Dispose() in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack.RabbitMq/RabbitMqProducer.cs:line 202

Relevant code:
From SS API Init:

container.Register<IMessageService>(c =>
                {
                    var rabbitMqServer = new RabbitMqServer(mqConfig.HostAndPort, mqConfig.Username, mqConfig.Password);
                    container.Register<ITempQueueCreator>(new RabbitMqTempQueueCreator(rabbitMqServer.MessageFactory));
                    return rabbitMqServer;
                });
var messageService = container.Resolve<IMessageService>();
messageService.RegisterHandler<InvestigationUpdate>(ExecuteMessage);
logger.Debug("Starting messaging service");
messageService.Start();

From SS Console App:

container.Register<IMessageService>(c =>
                    new RabbitMqServer(mqConfig.HostAndPort, mqConfig.Username, mqConfig.Password)
                    {
                        PublishMessageFilter = (queue, props, message) =>
                        {
                            logger.Info($"Publishing message {message.Body} to {queue}.");
                        },
                        CreateQueueFilter = (queue, dict) =>
                        {
                            logger.Info($"Creating queue {queue} with properties {dict}");
                        }
                    });
// register handlers ....
var mqServer = Container.Resolve<IMessageService>();
mqServer.Start();

It’s likely trying to process the messages before the AppHost has finished initializing. If you’re not configuring RabbitMQ with a Startup configuration like Configure.Mq.cs:

[assembly: HostingStartup(typeof(MyApp.ConfigureMq))]

public class ConfigureMq : IHostingStartup
{
    public void Configure(IWebHostBuilder builder) => builder
        .ConfigureServices((context, services) => {
            services.AddSingleton<IMessageService>(c => 
                new RabbitMqServer(context.Configuration.GetConnectionString("RabbitMq") ?? "localhost:5672"));
        })
        .ConfigureAppHost(afterAppHostInit: appHost => {
            appHost.Resolve<IMessageService>().Start();
        });
}

Then can you start the AppHost with an After AppHost Init callback, e.g:

AfterInitCallbacks.Add(host => mqServer.Start());

Yeah that was going to be my next task, moving to the new IHostingStartup / modular startup bootstrapping code.

One thing I’m having a hard time understanding is WHEN certain things like:

  • Configuration (appsettings, environment variables)
  • Logging (Serilog)
  • Other things like Database, MQ, etc…

Should be configured.

I guess the more concrete things like databases, MQ and even Logging can be done with an IHostingStartup Configure.X.cs. But where do I tell my new AppHost : AppHostBase, IHostingStartup class to use the MultiAppSettings configuration?

You wont be able to use MultiAppSettings in your App Configuration since your dependencies and plugins need to be configured in ConfigureServices() which happens before the AppHost is even constructed.

What we’re doing instead is populating a typed configuration class singleton using context.Configuration within ConfigureServices() which will let the rest of the application access it with either AppHost.Instance when an IOC is not available or as a regular dependency with AppConfig appConfig when using IOC:

This is populated using the JSON configuration in your appsettings.json, e.g:

You can also source the configuration from multiple sources using Configuration Providers.

Ahh ok, thank you for that. So I might need to use the ConfigureAppConfiguration extension to add custom config setup.

I see in the rest of that AiServer project you use

services.AddServiceStack(typeof(MyServices).Assembly);

And

app.UseServiceStack(new AppHost(), options => { options.MapEndpoints(); });

Where can I find more documentation on these? I’ve been reading the modular startup migration docs but haven’t seen this before.

This is part of our full integration with ASP .NET Core 8 which now uses Endpoint Routing by default.

Which is what all our new Identity Auth templates now use so you can get a new working configuration by downloading any of the new .NET 8 templates.

Ok, back to this:

I’ve re-written all of the bootstrapping code to fall more in line with .NET 8 / SS 8.5 (IHostingStartup), following your guidance. But still the same problem (msgLock received, disposing all MQ threads).

Here is my Configure.Mq.cs code:

using Amazon.SQS;
using LENSSX.Logic.Helpers;
using LENSSX.Logic.Mq;
using LENSSX.ServiceModel.InternalMessages;
using Serilog;
using ServiceStack.Aws.Sqs;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;

[assembly: HostingStartup(typeof(ConfigureMq))]

namespace LENSSX.WebHost;
public class ConfigureMq : IHostingStartup
{
    public void Configure(IWebHostBuilder builder) => builder
        .ConfigureServices((context, services) =>
        {
            var brokerType = context.Configuration.GetValue<string>("MessageBroker");
            
            Log.Logger.Information($"Initializing MQ: {brokerType}");
            
            if (brokerType == "RabbitMQ")
            {
                services.AddSingleton<IMessageService>(c => 
                    new RabbitMqServer(context.Configuration.GetConnectionString("RabbitMq") ?? "localhost:5670"));
                
                services.AddSingleton<ITempQueueCreator>(c =>
                {
                    var mqServer = c.GetRequiredService<IMessageService>();
                    return new RabbitMqTempQueueCreator(mqServer.MessageFactory);
                });
            }
            else
            {
                var sqsConnectionFactory = new SqsConnectionFactory(() =>
                    context.Configuration.GetAWSOptions().CreateServiceClient<IAmazonSQS>());

                var sqsMqServer = new SqsMqServer(sqsConnectionFactory)
                {
                    DisableBuffering = true,
                    DisablePriorityQueues = true,
                };

                var factory = sqsMqServer.MessageFactory as ISqsMqMessageFactory;
                var tempQueueCreator = new SqsTempQueueCreator(factory.QueueManager);
                services.AddSingleton<IMessageService>(sqsMqServer);
                services.AddSingleton<ITempQueueCreator>(tempQueueCreator);
            }

            var environmentName = context.Configuration.GetValue<string>("Environment:Name");
            QueueNames.ResolveQueueNameFn = (typeName, queueSuffix) =>
            {
                // If this is an investigation update, then we need to append the environment.
                var queueName = typeName == nameof(InvestigationUpdate) ?
                    $"{(environmentName.IsNullOrWhitespace() ? string.Empty : $"{environmentName}.")}{QueueNames.ResolveQueueName(typeName, queueSuffix)}" :
                    QueueNames.ResolveQueueName(typeName, queueSuffix);
                return queueName;
            };

            Log.Logger.Information("Initializing MQ Done");
        })
        .ConfigureAppHost(afterAppHostInit: appHost =>
        {
            Log.Logger.Information("Starting MQ");
            var messageService = appHost.Resolve<IMessageService>();
            messageService.RegisterHandler<InvestigationUpdate>(appHost.ExecuteMessage);
            messageService.Start();
            Log.Logger.Information("Starting MQ Done");
        });
}

But the same thing occurs, here is the log from the Web side that sends the request through MQ (first part is my logging):

[21:15:02 INF] Preparing to send State Request
[21:15:04 INF] Using queue name Local.LA.mq:StateHostRequest.inq
[21:15:04 INF] Using queue name Local.LA.mq:StateHostRequest.inq
[21:15:05 INF] Sent StateHostRequest from InvestigationQueryService for 267a433e-f36b-1410-8791-00e1157b7105 on queue Local.LA.mq:StateHostRequest.inq
[21:15:05 DBG] msgLock received...
[21:15:05 DBG] Disposing all Rabbit MQ Server worker threads...
[21:15:05 DBG] Stop Command Issued
[21:15:05 DBG] Stopping all Rabbit MQ Server worker threads...
[21:15:05 DBG] Stopping Rabbit MQ Handler Worker: Local.mq:InvestigationUpdate.priorityq...
[21:15:05 DBG] Stopping Rabbit MQ Handler Worker: Local.mq:InvestigationUpdate.inq...
[21:15:05 INF] State Host Request was sent = True

I can see my other service that is listening for StateHostRequests, receives it, processes it, and returns it to the InvestigationUpdate.inq:

[21:15:07 INF] Publishing message LENSSX.ServiceModel.InternalMessages.InvestigationUpdate to Local.mq:InvestigationUpdate.inq.
[21:15:07 INF] Creating queue Local.mq:InvestigationUpdate.inq with properties System.Collections.Generic.Dictionary`2[System.String,System.Object]
[21:15:07 INF] Received StateHostResponse 8938174b-1952-4da1-80cd-e39892bc49e5 for request 44893aca-27f0-4651-a383-82aa87897807 of type VehicleRegistration with Query ID 267a433e-f36b-1410-8791-00e1157b7105
[21:15:07 INF] Running 2 follow up queries for State Request Log 44893aca-27f0-4651-a383-82aa87897807 Query Id 267a433e-f36b-1410-8791-00e1157b7105
[21:15:07 INF] Sending Followup State Switch of type DriverLicenseQuery for query 267a433e-f36b-1410-8791-00e1157b7105

Can you better explain what the issue is now? and when it occurs, i.e. previously there was an issue on Startup, but is this happening when you shutdown the App?

For anyone else that runs into this, it turned out to be an old using statement that I believe was disposing the IMessageService everytime afterwards:

using var messageService = TryResolve<IMessageService>();
....
1 Like