RedisSubscription UnSubscribeFromAllChannels is blocking

I am using Rx to wrap a RedisSubscription.

When I dispose of my Rx Observable it should unsubscribe and release the client back to the the pool.

However, when I call UnSubscribeFromAllChannels on my RedisSubscription, it blocks.

Note: In the code below, I have commented out o.OnNext, to ensure all RedisSubscription.OnMessage is not being blocked or prevented from continuing by my code.

    public static IObservable<T> ToObservable<T>(IRedisClientsManager clientManager, params string[] channels)
    {
        IRedisClient client = null;
        IRedisSubscription subscription = null;
        return Observable.Using(() =>
        {
            client = clientManager.GetClient();
            subscription = client.CreateSubscription();
            return subscription;
        }, sub =>
        {
            return Observable.Create<T>(o =>
            {
                sub.OnMessage += (chan, data) =>
                {
                    try
                    {
                        var message = data.FromJson<T>();
                        //o.OnNext(message);
                    }
                    catch (Exception ex)
                    {
                        //o.OnError(ex);
                    }
                };

                CancellationTokenSource token = new CancellationTokenSource();
                Task.Run(() => sub.SubscribeToChannels(channels), token.Token);

                return Disposable.Create(() =>
                {
                    sub.UnSubscribeFromAllChannels(); // Code blocks here.
                    token.Cancel();
                    subscription?.Dispose();
                    client?.Dispose();
                    o.OnCompleted();
                });
            });
        });

I have read the documentation saying that no other operations can execute on the RedisClient whilst it is subscribed (you can see the port numbers for the subscribe calls are distinct - 50136), here is my Redis Log (abbreviated):

	1450786138.851690 [0 127.0.0.1:50136] "INFO"
	1450786138.852218 [0 127.0.0.1:50136] "ROLE"
	1450786138.864982 [0 127.0.0.1:50136] "SUBSCRIBE" "__keyspace@0__:Tx_BTC_Awaiting"
	1450786138.871703 [0 127.0.0.1:50137] "INFO"
	1450786138.872156 [0 127.0.0.1:50137] "ROLE"
	1450786138.934193 [0 127.0.0.1:50137] "ZADD" "Tx_BTC_Awaiting" "0" "10"
	1450786138.934225 [0 127.0.0.1:50137] "EXEC"
	1450786139.953101 [0 127.0.0.1:50136] "UNSUBSCRIBE"

You can see I am subscribing to __keyspace@0__:Tx_BTC_Awaiting and at the end the Observable issues calls RedisSubscription.UnSubscribeFromAllChannels and an UNSUBSCRIBE message is logged on the Redis client.

However, the application blocks when subscribing and does not complete the rest of the Dispose callback.

When I step over UnSubscribeFromAllChannels the thread is sat on Socket.Receive, looking for data that either does not arrive or has already been picked up.

Any help or suggestions would be appreciated. Thank you.

Edit: If I check activeChannels on the RedisSubscription before stepping over UnsubscribeFromAllChannels I can see 1, if I step over, have it block, then pause debugging and check this value again I can see it says it has 0 active channels, so it has unsubscribed, it just does not return.

Edit: I’ve traced this down through the ServiceStack.Redis source and found it is waiting for data using ReadMultiData but it looks like it either cannot find any data on the socket or something else is locking it.

You’re starting the subscriptions in a background thread and trying to close it in another thread, it needs to be unsubscribed from the same blocked thread the subscriptions we’re made on, e.g. in its callback handler. You can find some examples of this in RedisPubSubTests.cs.

Redis Subscriptions can be cumbersome to work with directly which is why we’ve encapsulated it in a friendlier RedisPubSubServer which manages the background subscription thread for you.