Race condition in SqsMqBufferFactory

We’re running ServiceStack 6.4.0 on .NET 6. We’re using AWS SQS for deferred messages.

We are switching to a larger fleet of smaller servers on Kubernetes, and we’re starting to see application instances stop sending messages.

We’re registering SqsMqServer as IMessageService and using IMessageFactory.CreateMessageProducer().Publish() to send the messages.

Sometimes the messages get stuck: Publish() accepts them and doesn’t throw any errors, but the messages are never actually sent to SQS. The only cure is to restart the application.

We’ve taken memory dumps of the application when it was in that state. There are some outbound messages that are permanently stuck in the queue:

It looks like the problem is in this part of the code:

The timer seems to end up not being scheduled to run, and the outbound queue is never getting drained:

I’m pretty sure that the problem is in the timer callback, which is not really reentrant:

On line 62, timer.Change() re-arms the timer to invoke the callback (OnTimerElapsed) again. Under high CPU load with lots of thread switching, there’s a chance that a concurrent invocation of the callback will fire between line 62 and line 65, where processingTimer is reset:

Invocation 1:

private void OnTimerElapsed(object state)
{
    if (Interlocked.CompareExchange(ref processingTimer, 1, 0) > 0)
        return;

    try
    {
        foreach (var buffer in queueNameBuffers)
        {
            buffer.Value.Drain(fullDrain: false);
        }
    }
    finally
    {
        if (bufferFlushIntervalSeconds <= 0)
        {
            timer.Dispose();
            timer = null;
        }
        else
        {
            timer.Change(bufferFlushIntervalSeconds, Timeout.Infinite);
        }

        // Invocation 2 starts here
        // Invocation 2 ends here

        Interlocked.CompareExchange(ref processingTimer, 0, 1);
    }
}

Invocation 2:

private void OnTimerElapsed(object state)
{
    if (Interlocked.CompareExchange(ref processingTimer, 1, 0) > 0)
        return; // taken, because processingTimer is still 1
    // the rest of the method is not reached
}

The concurrent invocation will return right away without doing anything (because processingTimer is still 1 at this point), so the timer doesn’t end up getting scheduled, and is left in this state forever. Or at least until someone sets BufferFlushIntervalSeconds, which we only do once on application startup.
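The interleaving described above can be replayed deterministically without a real timer. The following is a minimal sketch, not ServiceStack code: `OneShotTimerModel`, `Armed`, `OnRearmed` and `StaysArmed` are hypothetical names, and the `Armed` flag merely stands in for "the one-shot timer has a pending due time". It injects a second callback right after the re-arm and compares the original ordering (re-arm, then release the guard) with the reversed ordering.

```csharp
#nullable enable
using System;
using System.Threading;

// Hypothetical model of the timer logic discussed above; not the library's code.
public sealed class OneShotTimerModel
{
    private readonly bool releaseGuardBeforeRearm;
    private int processingTimer;

    // True while the one-shot timer has a pending due time.
    public bool Armed { get; private set; } = true;

    // Hook that runs right after the re-arm, standing in for a callback that
    // fires before the current invocation has finished.
    public Action? OnRearmed { get; set; }

    public OneShotTimerModel(bool releaseGuardBeforeRearm)
    {
        this.releaseGuardBeforeRearm = releaseGuardBeforeRearm;
    }

    public void OnTimerElapsed()
    {
        Armed = false; // firing consumes the one-shot schedule

        if (Interlocked.CompareExchange(ref processingTimer, 1, 0) > 0)
            return; // guard is held: exits without re-arming

        try
        {
            // the buffers would be drained here
        }
        finally
        {
            if (releaseGuardBeforeRearm)
                Interlocked.Exchange(ref processingTimer, 0);

            Armed = true;        // models timer.Change(dueTime, Timeout.Infinite)
            OnRearmed?.Invoke(); // a second invocation may land here

            if (!releaseGuardBeforeRearm)
                Interlocked.Exchange(ref processingTimer, 0);
        }
    }

    // Runs one tick with a second tick injected right after the re-arm and
    // reports whether the timer is still armed afterwards.
    public static bool StaysArmed(bool releaseGuardBeforeRearm)
    {
        var model = new OneShotTimerModel(releaseGuardBeforeRearm);
        model.OnRearmed = () =>
        {
            model.OnRearmed = null; // inject the early tick only once
            model.OnTimerElapsed();
        };
        model.OnTimerElapsed();
        return model.Armed;
    }
}
```

In this model, `OneShotTimerModel.StaysArmed(false)` reproduces the stall (the injected tick consumes the schedule and returns early), while `OneShotTimerModel.StaysArmed(true)` leaves the timer armed because the injected tick can take the guard and re-arm by itself.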

You can easily reproduce it by running the application and putting a breakpoint on line 65:

This never happens if the server has plenty of RAM and CPUs, but if the server is small and there’s a lot of CPU activity (especially on application startup), it happens often enough to become a problem. It’s not reported to the caller as an exception, so we lose user data without even noticing it.

We’ve fixed it with the following patch:

diff --git a/ServiceStack.Aws/src/Directory.Build.props b/ServiceStack.Aws/src/Directory.Build.props
index 4b03e8e52..76380c0fe 100644
--- a/ServiceStack.Aws/src/Directory.Build.props
+++ b/ServiceStack.Aws/src/Directory.Build.props
@@ -1,7 +1,7 @@
 <Project>

   <PropertyGroup>
-    <Version>6.4.0</Version>
+    <Version>6.4.0.1-custom</Version>
     <Authors>ServiceStack</Authors>
     <Company>ServiceStack, Inc.</Company>
     <Copyright>&#169; 2008-2022 ServiceStack, Inc</Copyright>
diff --git a/ServiceStack.Aws/src/ServiceStack.Aws/Sqs/SqsMqBufferFactory.cs b/ServiceStack.Aws/src/ServiceStack.Aws/Sqs/SqsMqBufferFactory.cs
index 3f928d256..b07caccf0 100644
--- a/ServiceStack.Aws/src/ServiceStack.Aws/Sqs/SqsMqBufferFactory.cs
+++ b/ServiceStack.Aws/src/ServiceStack.Aws/Sqs/SqsMqBufferFactory.cs
@@ -8,7 +8,11 @@ namespace ServiceStack.Aws.Sqs
     public class SqsMqBufferFactory : ISqsMqBufferFactory
     {
         private readonly SqsConnectionFactory sqsConnectionFactory;
-        private static readonly ConcurrentDictionary<string, ISqsMqBuffer> queueNameBuffers = new ConcurrentDictionary<string, ISqsMqBuffer>();
+
+        private static readonly ConcurrentDictionary<string, ISqsMqBuffer> queueNameBuffers =
+            new ConcurrentDictionary<string, ISqsMqBuffer>();
+
+        private int disposing;
         private Timer timer;
         private int processingTimer = 0;

@@ -22,27 +26,36 @@ public SqsMqBufferFactory(SqsConnectionFactory sqsConnectionFactory)
         public Action<Exception> ErrorHandler { get; set; }

         private int bufferFlushIntervalSeconds = 0;
+
         public int BufferFlushIntervalSeconds
         {
             get { return bufferFlushIntervalSeconds; }
             set
             {
+                if (disposing > 0)
+                {
+                    return;
+                }
+
                 bufferFlushIntervalSeconds = value > 0
                     ? value
                     : 0;

                 if (timer != null)
-                    return;
-
-                timer = new Timer(OnTimerElapsed, null, bufferFlushIntervalSeconds, Timeout.Infinite);
+                {
+                    timer.Change(bufferFlushIntervalSeconds, Timeout.Infinite);
+                }
+                else
+                {
+                    timer = new Timer(OnTimerElapsed, null, bufferFlushIntervalSeconds, Timeout.Infinite);
+                }
             }
         }

         private void OnTimerElapsed(object state)
         {
-            if (Interlocked.CompareExchange(ref processingTimer, 1, 0) > 0)
+            if (disposing > 0 || Interlocked.CompareExchange(ref processingTimer, 1, 0) > 0)
                 return;
-
             try
             {
                 foreach (var buffer in queueNameBuffers)
@@ -52,19 +65,13 @@ private void OnTimerElapsed(object state)
             }
             finally
             {
-                if (bufferFlushIntervalSeconds <= 0)
-                {
-                    timer.Dispose();
-                    timer = null;
-                }
-                else
+                Interlocked.CompareExchange(ref processingTimer, 0, 1);
+                var currentBufferFlushIntervalSeconds = bufferFlushIntervalSeconds;
+                if (disposing == 0 && currentBufferFlushIntervalSeconds > 0)
                 {
-                    timer.Change(bufferFlushIntervalSeconds, Timeout.Infinite);
+                    timer.Change(currentBufferFlushIntervalSeconds, Timeout.Infinite);
                 }
-
-                Interlocked.CompareExchange(ref processingTimer, 0, 1);
             }
-
         }

         public ISqsMqBuffer GetOrCreate(SqsQueueDefinition queueDefinition)
@@ -87,6 +94,12 @@ public ISqsMqBuffer GetOrCreate(SqsQueueDefinition queueDefinition)

         public void Dispose()
         {
+            if (Interlocked.CompareExchange(ref disposing, 1, 0) > 0)
+            {
+                return;
+            }
+
+            timer?.Dispose();
             foreach (var buffer in queueNameBuffers)
             {
                 buffer.Value.Dispose();

The 8.8 version of this code is not materially different.
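For reference, the ordering used in the patch (release the guard first, then re-arm, and never re-arm once disposal has started) can be sketched in isolation with a plain System.Threading.Timer. This is a simplified illustration under stated assumptions, not the upstream fix: `RearmingFlusher` and its `drain` delegate are hypothetical names standing in for the factory and its buffer draining.

```csharp
#nullable enable
using System;
using System.Threading;

// Hypothetical stand-alone sketch of a self-re-arming one-shot timer.
public sealed class RearmingFlusher : IDisposable
{
    private readonly Action drain;      // stand-in for draining the buffers
    private readonly TimeSpan interval;
    private readonly Timer timer;
    private int processing;
    private int disposed;

    public RearmingFlusher(Action drain, TimeSpan interval)
    {
        this.drain = drain;
        this.interval = interval;
        // Create the timer unarmed so the field is assigned before the first tick.
        timer = new Timer(OnTimerElapsed, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        timer.Change(interval, Timeout.InfiniteTimeSpan);
    }

    private void OnTimerElapsed(object? state)
    {
        if (Volatile.Read(ref disposed) != 0 ||
            Interlocked.CompareExchange(ref processing, 1, 0) != 0)
            return;

        try
        {
            drain();
        }
        finally
        {
            // Release the guard BEFORE re-arming, so a tick that fires
            // immediately can still enter and re-arm the timer itself.
            Interlocked.Exchange(ref processing, 0);

            if (Volatile.Read(ref disposed) == 0)
            {
                try { timer.Change(interval, Timeout.InfiniteTimeSpan); }
                catch (ObjectDisposedException) { /* lost a race with Dispose */ }
            }
        }
    }

    public void Dispose()
    {
        if (Interlocked.Exchange(ref disposed, 1) == 0)
            timer.Dispose();
    }
}
```

A caller would construct it once, for example `new RearmingFlusher(DrainAll, TimeSpan.FromSeconds(5))` with a hypothetical `DrainAll` method, and dispose it on shutdown. Note that an exception escaping `drain` on a timer thread would still need to be handled by the caller's delegate.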

Can you please apply the same or a similar fix upstream, and back-port it to 6.4?

Thank you!


P. S.

On this line of code:

it says bufferFlushIntervalSeconds, but the Timer constructor treats this value as milliseconds. You might want to rename it.
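To illustrate the unit mismatch: the `int` overloads of the System.Threading.Timer constructor and of `Timer.Change` take the due time and period in milliseconds, so passing a value named "seconds" directly schedules the callback 1000 times sooner than the name suggests. One way to avoid the ambiguity is the `TimeSpan` overload. A minimal sketch, where `FlushTimer`, `ToDueTime` and `CreateOneShot` are hypothetical helper names and not part of ServiceStack:

```csharp
#nullable enable
using System;
using System.Threading;

// Hypothetical helper that makes the unit explicit.
public static class FlushTimer
{
    // Converts a non-negative interval in seconds to a TimeSpan due time.
    public static TimeSpan ToDueTime(int seconds) => TimeSpan.FromSeconds(seconds);

    // Creates a one-shot timer that fires once after the given number of seconds.
    public static Timer CreateOneShot(TimerCallback callback, int seconds) =>
        new Timer(callback, null, ToDueTime(seconds), Timeout.InfiniteTimeSpan);
}
```

With this helper, `FlushTimer.ToDueTime(5)` is a five-second due time, whereas `new Timer(callback, null, 5, Timeout.Infinite)` fires after five milliseconds.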

I’ve added these changes to SqsMqBufferFactory in this commit.

This change is available from v8.8.1+, which is now available in the pre-release packages.

Note: ServiceStack only ships forward-only releases, i.e. bug fixes and new features are first deployed to pre-release packages before being released to NuGet in the next version. If you can’t upgrade to the latest version, you can download the v6.4 source code and apply the fix to a local snapshot:

https://github.com/ServiceStack/ServiceStack/archive/refs/tags/v6.4.zip

Thank you! Is there a chance you could rename the property BufferFlushIntervalSeconds as well, so no one gets burned by it?

That would be a breaking change, so instead I changed the timer to use milliseconds (e.g. bufferFlushIntervalSeconds * 1000).