Hi Mythz,
I thought you might have a better idea of how to achieve what we are trying to do here.
Here’s the bare minimum.
We are using Reactive Extensions to batch requests inside our IRequestBufferIndex: requests arrive individually and are collected into a batch. Once the batch reaches its size limit or the timeout elapses, we publish the packaged batch with Gateway.Publish(batchRequest).
public IMessageService MqService { get; set; }
public IRequestBufferIndex RequestBuffer { get; set; }

public object Any(SomeUpdate request)
{
    var buffer = RequestBuffer.GetBuffer<SomeUpdate>();
    // If no batch handler has been registered yet, configure the buffer and register one.
    if (buffer.OnBatchReady == null)
    {
        buffer.BatchSize = 1000;
        buffer.BatchTimeout = TimeSpan.FromMilliseconds(250);
        buffer.RegisterBatchHandler(batch => {
            var batchRequest = new SomeUpdateBatch();
            batchRequest.AddRange(batch);
            Gateway.Publish(batchRequest);
            // How can we Ack the whole batch once here, instead of returning the response
            // below, where the framework Acks each message individually?
        });
    }
    buffer.AddItem(request);
    return new SiteUpdateResponse();
}

// Handles the published batch
public void Any(SomeUpdateBatch request)
{
    var db = OpenDb();
    foreach (var item in request) db.Save(ExtractResponse(item));
    db.SaveChanges();
}

// DTOs
public class SomeUpdateBatch : List<SomeUpdate> {}
public class SomeUpdate {}
I hope the above shows what we are after: postponing the Ack of the individual messages until the packaged batch has been sent.
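For context, the size-or-timeout buffering behaves roughly like the sketch below. This is a simplified, hypothetical stand-in and not our actual IRequestBufferIndex code: the BatchBuffer class and its members are made up for illustration. It assumes the System.Reactive package, whose Buffer(TimeSpan, int) operator closes a batch on whichever of the timeout or the count comes first.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical illustration only; not the real IRequestBufferIndex implementation.
public sealed class BatchBuffer<T> : IDisposable
{
    private readonly object _gate = new object();
    private readonly Subject<T> _items = new Subject<T>();
    private readonly IDisposable _subscription;

    public BatchBuffer(int batchSize, TimeSpan batchTimeout, Action<IList<T>> onBatchReady)
    {
        _subscription = _items
            .Buffer(batchTimeout, batchSize)   // closes on timeout or count, whichever comes first
            .Where(batch => batch.Count > 0)   // Buffer emits empty lists when the timeout fires with no items
            .Subscribe(onBatchReady);
    }

    // Service methods can run concurrently, so serialize calls into the subject.
    public void AddItem(T item)
    {
        lock (_gate) _items.OnNext(item);
    }

    public void Dispose()
    {
        // Completing the subject flushes any partially filled batch to the handler.
        lock (_gate) _items.OnCompleted();
        _subscription.Dispose();
        _items.Dispose();
    }
}
```

The handler passed as onBatchReady is where Gateway.Publish(batchRequest) is called in our service, so that callback is the one place where the whole batch is known to have been sent.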
Thanks,