RabbitMQ Batching Requests

Hi Mythz,

I thought you might have a better idea of how to achieve what we are trying to do here.

Here’s the bare minimum.

We are using Reactive Extensions to buffer individual requests inside our IRequestBufferIndex as they arrive and package them into a batch. Once the batch reaches its size limit or the timeout elapses, we publish the packaged batch with Gateway.Publish(batchRequest).

public IMessageService MqService { get; set; }

public IRequestBufferIndex RequestBuffer { get; set; }

public object Any(SomeUpdate request) {
     var buffer = RequestBuffer.GetBuffer<SomeUpdate>();

     // If the action has not been specified yet, then provide one.
     if (buffer.OnBatchReady == null)
     {
         buffer.BatchSize = 1000;
         buffer.BatchTimeout = TimeSpan.FromMilliseconds(250);
         buffer.RegisterBatchHandler(batch => {
             var batchRequest = new SomeUpdateBatch();
             batchRequest.AddRange(batch);

             Gateway.Publish(batchRequest);
             // How can we Ack the whole batch once here, instead of returning the response below, where the framework Acks each message individually?
         });
      }

      buffer.AddItem(request);

      return new SiteUpdateResponse();
}

// Accepts the packaged batch
public void Any(SomeUpdateBatch request)
{
      var db = OpenDb();
      foreach(var item in request) db.Save(ExtractResponse(item));
      db.SaveChanges();
}

// DTOs
public class SomeUpdateBatch : List<SomeUpdate>
{}

public class SomeUpdate
{}

I hope the above shows what we are after: delaying the Ack of the individual messages until after the packaged batch has been sent.

Thanks,

There’s no batch/ack support for MQs. You could create a service whose Request DTO inherits List<T>, so that it would be sent and handled as a single Request DTO:

class BatchSiteUpdates : List<SomeUpdate> {}

That is what has already been tried. There is a service that accepts a single request, public object Any(SomeUpdate request), and a service whose Request DTO inherits List<SomeUpdate>, public void Any(SomeUpdateBatch request).

The issue is control over when the message is acknowledged. Is there a method for exerting more control over the message lifetime? We have investigated the “pull” workflow using RabbitMqServer.CreateMessageQueueClient, which provides the ability to explicitly Ack/Nack the message. The downside of this approach is that it would require us to set up some sort of timer/scheduler to pull in up to X messages at a time.
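To make the “pull” idea concrete, here is a minimal sketch of pulling up to X messages, processing them as one batch, and only then acknowledging them. It is not ServiceStack code: IPullQueue, Delivery, BatchPuller and InMemoryPullQueue are hypothetical names invented for illustration, and IPullQueue only stands in for whatever client exposes explicit Ack/Nak. The timer/scheduler that would call PullBatch periodically is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a pull-style MQ client with explicit Ack/Nak.
public interface IPullQueue<T>
{
    bool TryGet(out Delivery<T> delivery);   // false when nothing is waiting
    void Ack(Delivery<T> delivery);
    void Nak(Delivery<T> delivery, bool requeue);
}

// Hypothetical envelope pairing a message body with its delivery tag.
public sealed class Delivery<T>
{
    public Delivery(ulong tag, T body) { Tag = tag; Body = body; }
    public ulong Tag { get; }
    public T Body { get; }
}

public static class BatchPuller
{
    // Pulls up to maxBatch messages, processes them as one unit, and only
    // then acknowledges them. Returns the number of messages acknowledged.
    public static int PullBatch<T>(IPullQueue<T> queue, int maxBatch,
                                   Action<IReadOnlyList<T>> processBatch)
    {
        var pending = new List<Delivery<T>>();
        while (pending.Count < maxBatch && queue.TryGet(out var delivery))
            pending.Add(delivery);

        if (pending.Count == 0) return 0;

        try
        {
            processBatch(pending.ConvertAll(d => d.Body));
        }
        catch
        {
            // The batch failed: hand every message back so it can be retried.
            foreach (var d in pending) queue.Nak(d, requeue: true);
            throw;
        }

        // The batch succeeded: acknowledge every message that was part of it.
        foreach (var d in pending) queue.Ack(d);
        return pending.Count;
    }
}

// In-memory fake, used only to exercise the sketch without a broker.
public sealed class InMemoryPullQueue<T> : IPullQueue<T>
{
    private readonly Queue<Delivery<T>> ready = new Queue<Delivery<T>>();
    private ulong nextTag;

    public int Acked { get; private set; }
    public int ReadyCount => ready.Count;

    public void Enqueue(T body) => ready.Enqueue(new Delivery<T>(++nextTag, body));

    public bool TryGet(out Delivery<T> delivery)
    {
        if (ready.Count == 0) { delivery = null; return false; }
        delivery = ready.Dequeue();
        return true;
    }

    public void Ack(Delivery<T> delivery) => Acked++;

    public void Nak(Delivery<T> delivery, bool requeue)
    {
        if (requeue) ready.Enqueue(delivery);
    }
}
```

In this sketch a failed batch requeues all of its messages, so the batch handler would need to tolerate seeing the same message more than once.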

My comment shows how you can accept a batch of Request DTOs as a single Request DTO.

There isn’t finer-grained control available via the messaging abstraction.

Exactly, it’s how you can send multiple Request DTOs in a single Request DTO.

This isn’t Auto Batching, it’s sending a collection of Request DTOs in a single Request DTO.

All MQ messages are handled by MessageHandler.cs; the Ack is sent in the finally block after the message is processed.