I am using Rx to wrap a RedisSubscription.
When I dispose of my Rx Observable, it should unsubscribe and release the client back to the pool. However, when I call UnSubscribeFromAllChannels on my RedisSubscription, it blocks.
Note: In the code below, I have commented out o.OnNext to ensure that RedisSubscription.OnMessage is not being blocked or prevented from continuing by my code.
public static IObservable<T> ToObservable<T>(IRedisClientsManager clientManager, params string[] channels)
{
    IRedisClient client = null;
    IRedisSubscription subscription = null;
    return Observable.Using(() =>
    {
        client = clientManager.GetClient();
        subscription = client.CreateSubscription();
        return subscription;
    }, sub =>
    {
        return Observable.Create<T>(o =>
        {
            sub.OnMessage += (chan, data) =>
            {
                try
                {
                    var message = data.FromJson<T>();
                    //o.OnNext(message);
                }
                catch (Exception ex)
                {
                    //o.OnError(ex);
                }
            };
            CancellationTokenSource token = new CancellationTokenSource();
            Task.Run(() => sub.SubscribeToChannels(channels), token.Token);
            return Disposable.Create(() =>
            {
                sub.UnSubscribeFromAllChannels(); // Code blocks here.
                token.Cancel();
                subscription?.Dispose();
                client?.Dispose();
                o.OnCompleted();
            });
        });
    });
}
I have read the documentation saying that no other operations can execute on the RedisClient whilst it is subscribed (you can see in the log that the subscribe calls use their own connection, on port 50136). Here is my Redis log (abbreviated):
1450786138.851690 [0 127.0.0.1:50136] "INFO"
1450786138.852218 [0 127.0.0.1:50136] "ROLE"
1450786138.864982 [0 127.0.0.1:50136] "SUBSCRIBE" "__keyspace@0__:Tx_BTC_Awaiting"
1450786138.871703 [0 127.0.0.1:50137] "INFO"
1450786138.872156 [0 127.0.0.1:50137] "ROLE"
1450786138.934193 [0 127.0.0.1:50137] "ZADD" "Tx_BTC_Awaiting" "0" "10"
1450786138.934225 [0 127.0.0.1:50137] "EXEC"
1450786139.953101 [0 127.0.0.1:50136] "UNSUBSCRIBE"
You can see I am subscribing to __keyspace@0__:Tx_BTC_Awaiting, and at the end the Observable calls RedisSubscription.UnSubscribeFromAllChannels and an UNSUBSCRIBE command is logged by Redis.
However, the application blocks when unsubscribing and does not complete the rest of the Dispose callback.
When I step over UnSubscribeFromAllChannels, the thread is sitting in Socket.Receive, waiting for data that either never arrives or has already been picked up.
Any help or suggestions would be appreciated. Thank you.
Edit: If I check activeChannels on the RedisSubscription before stepping over UnSubscribeFromAllChannels, I can see 1. If I step over, let it block, then pause the debugger and check the value again, it reports 0 active channels. So it has unsubscribed; it just does not return.
Edit: I've traced this through the ServiceStack.Redis source and found that it is waiting for data in ReadMultiData, but it looks like either there is no data on the socket or something else is locking it.