Performance of Service.PublishMessage - Suggestions

I need to process a message queue (at up to 200 messages per second).

After processing a message, it needs to be published to 2 or 3 other queues.

The Service.PublishMessage creates a new MessageProducer for each request. This opens a new connection to the MQ broker, which adds a large overhead and significantly slows down the rate I can process messages.

If the message handler returns a response DTO, this will be published to the ResponseDtoName.inq using the same MQ client that consumed the message from the queue, so there is no additional overhead.

Do you have any suggestions for increasing performance in this scenario?

Is it possible to give the message handler access to the underlying MQ client, e.g.:

rabbitMqServer.RegisterHandler<RecordActivityRequest>(ServiceController.ExecuteMessage);

public class ActivityService : Service
{
  public object Post(RecordActivityRequest request)
  {
    //process message ...
  
    //How can I get access to the MQ client here?
    var mqClient = Request.GetCurrentMqClient(); //Would return null if source of request is not MQ
	
    mqClient.Publish(new DoSomethingElse1());
    mqClient.Publish(new DoSomethingElse2());
    mqClient.Publish(new DoSomethingElse3());
	
    return null;
  }
}

You can get access to the MessagePublisher with:

var mqPublisher = base.MessageProducer;

Or a MQ Client with:

var mqClient = base.TryResolve<IMessageService>().MessageFactory
    .CreateMessageQueueClient()

To check if a Request is an MQ Request you can do:

var isMqRequest = base.Request is `BasicRequest`;

Looks like I wasn’t clear.

base.MessageProducer

will create a new MQ client for every request handled by the service (because Service is request scoped).

If I use ServiceController.ExecuteMessage to handle a message from a queue, a new instance of my service will be instantiated for each message (as per the usual request pipeline).

If I am handling hundreds of messages per second and calling base.PublishMessage in my service, this will result in hundreds of new MessageProducers being created per second.

Creating a new MessageProducer (or MessageQueueClient) opens a new TCP connection to the RabbitMQ server (using the underlying RabbitMQ client library).

If I try this, my message processing throughput drops by 50-75%.

Not exactly sure what you’re after then, the above is how you can create a MQ Client in your Service. It’s not ThreadSafe to share the same client instance across multiple Services so that’s not going to work. Ideally RabbitMQ would pool the underlying client connections (that’s how most network services maximize resources). Without connection pooling, (if it works in your use-case) you may be able to create “Batch” Services where clients publish multiple messages at once.

Alternatively you can try use a custom wrapper instance that uses [ThreadStatic] MQ Client instances or a singleton instance behind a global lock to make it Thread Safe. The cleanest/most transparent approach is to implement/register a custom IMessageFactory which implements client connection pooling behind the scenes so when the client is disposed it’s released back into a pool - but a pooled connection factory can be tricky to implement a robust implementation from scratch.

Let me try and simplify what I’m looking for.

Here’s what the RegisterHandler method looks like:

public virtual void RegisterHandler<T>(Func<IMessage<T>, object> processMessageFn, Action<IMessageHandler, IMessage<T>, Exception> processExceptionEx)

The IMessageHandler is passed to the processExceptionEx, but not to the processMessageFn.

If the processMessageFn signature was changed to Func<IMessageHandler, IMessage<T>, object>, this would allow me to write a message handler like this:

rabbitMqServer.RegisterHandler<RecordActivityRequest>((messageHandler, message) =>
{
    var req = new BasicRequest(message);
    
    req.Items["__SourceMqClient"] = messageHandler.MqClient;
	
    return ServiceController.ExecuteMessage(message, req);
}

public class ActivityService : Service
{
    public object Post(RecordActivityRequest request)
    {
        //process message ...
        
        //This is the same MqClient used by the IMessageHandler to consume the message from the queue.
        var mqClient = Request.Items["__SourceMqClient"];
        
        //If I use base.MessageProducer here, the IMessageFactory will create a new MqClient, which opens a new connection to the RabbitMQ server.
        //I want to avoid this overhead, so I use the existing MqClient instead.
		
        mqClient.Publish(new DoSomethingElse1());
        mqClient.Publish(new DoSomethingElse2());
        mqClient.Publish(new DoSomethingElse3());
        
        return null;
    }
}

This is a much less painful approach than implementing a customer IMessageFactory with connection pooling or fiddling with ThreadStatic wrappers.

What do you think?

The correct solution for this issue is to use connection pooling, any other workarounds like this are a side-effect of a temporal deficiency in one of the concrete MQ providers which doesn’t yet implement it. In this case it would either be a breaking change (which we can’t do) or pollute/complicate the API surface and disrupt the existing implementation to support multiple registration overloads - an ugly smell which we’d be stuck to live with forever once released.

I won’t be able to break the user-facing RegisterHandler API but I could extend the RequestFilter/ResponseFilter to include the IMessageHandler, it’s also a breaking change but it would be rarely used and cause a lot less disruption.

This could let you achieve a similar solution by populating a ThreadStatic instance before/after the message is processed, e.g:

public class MqRequestContext
{
    [ThreadStatic]
    public IMessageQueueClient Instance;
}

var mqServer =  new RabbitMqServer(connectionString: Config.RabbitMQConnString)
{
    RequestFilter = (msgHandler,r) => { 
        MqRequestContext.MqClient = msgHandler.MqClient; return r; },
    ResponseFilter = (msgHandler,r) => { 
        MqRequestContext.MqClient = null; return r; }
};

Then in your Service you could use the ThreadStatic instance or retrieve a new msg producer, e.g:

public class ActivityService : Service
{
    public object Post(RecordActivityRequest request)
    {
        var mqClient = MqRequestContext.MqClient ?? base.MessageProducer;
        mqClient.Publish(new DoSomethingElse1());
        mqClient.Publish(new DoSomethingElse2());
        mqClient.Publish(new DoSomethingElse3());
    }
}

Let me know if this would work for you and I’ll make the change?

Hmmm, not sure if it’s worth making a breaking change for this use case.

Probably better to do some basic connection pooling for the Rabbit MQ clients.