I am trying to use two dispatchers / threads using C# / WPF to Redis. One thread would get and set from the server. Other thread would listen to PubSub and respond appropriately. I wrote some simple code to start the two threads, do a few basic operations, and then shut down the threads. My problem is with shutting down the thread that’s being blocked by SubscribeToChannels. I tried to call redisSubscription.UnSubscribeFromAllChannels() from my main/controlling thread, but it looks like the code is just getting stuck in that function. How do I tell the thread that’s being blocked by SubscribeToChannels to exit? My code is below.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Threading;
namespace RedisInterface
{
public class ThreadDispatcher
{
protected System.Threading.Thread tThread;
protected System.Windows.Threading.Dispatcher oDispatcher;
protected string sName;
public ThreadDispatcher(string _sName, int iThreadDispatcherTimeoutToMakeThreadAndEnterLoop_ms)
{
Constructor(_sName, iThreadDispatcherTimeoutToMakeThreadAndEnterLoop_ms, null);
}
public ThreadDispatcher(string _sName, int iThreadDispatcherTimeoutToMakeThreadAndEnterLoop_ms, Action oPreDispatcherRun)
{
Constructor(_sName, iThreadDispatcherTimeoutToMakeThreadAndEnterLoop_ms, oPreDispatcherRun);
}
void Constructor(string _sName, int iThreadDispatcherTimeoutToMakeThreadAndEnterLoop_ms, Action oPreDispatcherRun)
{
System.Threading.AutoResetEvent oComThreadInstantiationLock = new System.Threading.AutoResetEvent(false);//starting state is non-signaled/red light/reset
sName = _sName;
tThread = new System.Threading.Thread(ThreadMain);
tThread.Name = sName;
tThread.SetApartmentState(System.Threading.ApartmentState.STA);
tThread.Start(new object[] { oComThreadInstantiationLock, oPreDispatcherRun });
if (!oComThreadInstantiationLock.WaitOne(iThreadDispatcherTimeoutToMakeThreadAndEnterLoop_ms))
{
throw new Exception("Failed to start the thread " + sName);
}
}
~ThreadDispatcher()
{
TerminateNicely();
}
void ThreadMain(object oArgs)
{
// unpack the arguments
object[] args = (object[])oArgs;
System.Threading.AutoResetEvent oComThreadInstantiationLock = (System.Threading.AutoResetEvent)args[0];
Action oPreDispatcherRun = (Action)args[1];
// initialize
oDispatcher = System.Windows.Threading.Dispatcher.CurrentDispatcher;
// flag initialization as complete
oPreDispatcherRun?.Invoke();
oComThreadInstantiationLock.Set();
// Start the message loop on this thread
System.Windows.Threading.Dispatcher.Run();
}
public string GetName()
{
return sName;
}
public System.Windows.Threading.Dispatcher GetDispatcher()
{
return oDispatcher;
}
public bool IsOnThread()
{
return System.Threading.Thread.CurrentThread == oDispatcher.Thread;
}
public void TerminateNicely()
{
if (oDispatcher != null)
oDispatcher.BeginInvokeShutdown(System.Windows.Threading.DispatcherPriority.Background);
}
}
class Program
{
static void Main(string[] args)
{
Program pr = new Program();
pr.RunMain();
}
//https://servicestack.net/redis
//https://github.com/ServiceStack/ServiceStack.Redis
//https://servicestack.net/buy/BUS-REDIS
private void RunMain()
{
ThreadDispatcher td1 = new ThreadDispatcher("Redis Read-Write", 30000);
ThreadDispatcher td2 = new ThreadDispatcher("Redis PubSub", 30000);
ServiceStack.Redis.IRedisSubscription redisSubscription = null;
ServiceStack.Redis.RedisClient redisClientListener = null;
td1.GetDispatcher().BeginInvoke(() => //using System.Windows.Threading in System.Windows.Presentation.dll
{
redisClientListener = new ServiceStack.Redis.RedisClient("localhost", 6379);
redisSubscription = redisClientListener.CreateSubscription();
redisSubscription.OnMessage += RedisMessageReceived;
redisSubscription.SubscribeToChannels("cmd_tracker2"); // Thread Blocking
});
td2.GetDispatcher().BeginInvoke(() => //using System.Windows.Threading in System.Windows.Presentation.dll
{
ServiceStack.Redis.RedisClient redisClient = new ServiceStack.Redis.RedisClient("localhost", 6379);
byte[] response = redisClient.Get("cmd_tracker2");
string sResponse = Encoding.UTF8.GetString(response);
Console.WriteLine(sResponse);
redisClient.PublishMessage("cmd_tracker2", "1@1@5.04.004_4");
redisClient.Set("cmd_tracker2", "1@1@5.04.004_4");
});
Console.ReadKey();
td2.TerminateNicely();
td1.TerminateNicely();
Console.ReadKey();
redisSubscription.UnSubscribeFromAllChannels();
Console.ReadKey();
}
private void RedisMessageReceived(string arg1, string arg2)
{
Console.WriteLine(arg1);
Console.WriteLine(arg2);
}
}
}