ServerEvents GetAllSubscriptionInfos() method contains unsubscribed connections when using Redis

Hello,

I have been using MemoryServerEvents without issue and would now like to use RedisServerEvents to add one or more extra web servers.

Currently, when the OnUnsubscribe event is triggered on the server I use GetAllSubscriptionInfos() to find out if there are any other sessions with the same custom session AccountGuid. If there is then I don’t send my custom message to the channel to say the device has gone offline.

After switching to RedisServerEvents it seems the GetAllSubscriptionInfos() method still contains the unsubscribed connection.

Is there something else I should be doing to ensure subscriptions are removed when they unsubscribe?

Here is how I am registering the plugin:

        this.Plugins.Add(new ServerEventsFeature() {
                        LimitToAuthenticatedUsers = true,                 
                        NotifyChannelOfSubscriptions = false,
                        OnSubscribe = async (sub) =>
                        {
                            using (var service = HostContext.Resolve<ServerEventsSubscribersService>())
                            {
                                var sessionKey = SessionFeature.GetSessionKey(sub.SessionId);
                                var session = HostContext.Cache.Get<CustomUserSession>(sessionKey);

                                var msg = new ServiceModel.ConnectionMessageDTO(ServiceModel.ConnectionMessageTypeEnumDTO.Subscribed, sub.SubscriptionId, session.AccountGuid);
                                service.ServerEvents.NotifyChannel("channel", msg);
                               
                            }
                        },
                        OnUnsubscribe = (sub) =>
                        {
                            using (var service = HostContext.Resolve<ServerEventsSubscribersService>())
                            {
                                var sessionKey = SessionFeature.GetSessionKey(sub.SessionId);
                                var session = HostContext.Cache.Get<CustomUserSession>(sessionKey);

                                if (session != null)
                                {
                                    foreach (var sub in service.ServerEvents.GetAllSubscriptionInfos())
                    {
                        var sessionKey = SessionFeature.GetSessionKey(sub.SessionId);
                        var session = HostContext.Cache.Get<CustomUserSession>(sessionKey);
                        
                        if (session != null)
                        {
                            if (session.AccountGuid == accountGuid)
                            {
                                // Account online but shouldn't be (unless there were more connections but this is not the case)
                                
                            }
                        }
                    }                        
                                }
                            }
                        }                
                    });

I found RedisServerEvents Subscriptions and tried using ((RedisServerEvents)_serverEvents).UnRegisterExpiredSubscriptions(); before GetAllSubscriptionInfos() but it didn’t help.

Thank is advance. :slightly_smiling_face:

Hi @VAST,

The Redis implementation of unsubscribing a client is asynchronous via a Redis event to all apphosts. AppHosts then process the event, remove it from a local memory set and then call Redis to remove the subscription info and related ID keys.

The Redis implementation of GetAllSubscriptionInfos scans keys related to channel subscription keys and returns all SubscriptionInfo based on those keys. This means that during an OnUnsubscribe event there could be pending messages yet to be processed by AppHosts to remove keys in Redis that are used in GetAllSubscriptionInfos.

What kind of guarantees does this system require? Fetching all subscriptions and then fetching each session one by one for any unsubscribe event will be a very chatty system that will exponentially increase as you get more users/subscriptions. You could manage the state of AccountId/sessions yourself in Redis (with expiry/timeout check) as switching to a distributed setup won’t have the same event guarantees as a single server in memory setup will.

Possible if the subscription hasn’t expired yet.

Hi @layoric,

Than you for your reply.

I see, that makes sense.

The reason I am fetching all subscriptions on the unsubscribe is because I was having issues where I would notifiy everyone that someone has unsubscribed which then showed them as “offline”. But in fact their connection had already dropped and reconnected and the unsubscribe was an old connection dying. This left the application showing them as offline because the unsubscribe was after the new connections subscribe event.

All I need is a reliable check to see if an AccountGuid has an active connection to show that they are online. What would be the easiest way to do this do you think?

If my best option is to manage the state of sessions in Redis, how would I go about doing this with ServiceStack? Are there any useful articles on this do you know?

Thank you for your help.

Hi @VAST, So I think I have a better idea of your problem, could you clarify if the AccountGuid is unique for a single user or can multiple users have the same AccountGuid?

If you are moving to multiple app servers, you should change your ICacheClient to a Redis as well which will then have your sessions stored there, possible it might be easier to manage online/offline status separately, but it will depend, any more info on your setup/goals would be appreciated.

Can you add the AccountGuid to the subscriptions Meta dictionary on the OnCreated event, that way it will get passed in the Join/Leave/Update events:

Plugins.Add(new ServerEventsFeature {
    OnCreated = (sub, req) => {
        sub.Meta["AccountGuid"] = ((MyUserSession)req.GetSession()).AccountGuid;
    }
});

Hi @mythz,

Yes that will help with not having to try and get the session during the GetAllSubscriptionInfos() loop. So my code will look like:

OnUnsubscribe = (sub) =>
{
	Guid accountGuid = sub.Meta["AccountGuid"] != null && !String.IsNullOrWhiteSpace(sub.Meta["AccountGuid"]) ? new Guid(sub.Meta["AccountGuid"]) : Guid.Empty;
                    
	if (accountGuid != Guid.Empty)
	{
		using (var service = HostContext.Resolve<ServerEventsSubscribersService>())
		{
			foreach (var sub in service.ServerEvents.GetAllSubscriptionInfos())
            {
				if (new Guid(sub.Meta["AccountGuid"]) == accountGuid)
                {
					// Account online but shouldn't be (unless there were more connections but this is not the case)
                }
            }
        }
	}
},
OnCreated = (sub, req) =>
{                    
	sub.Meta["AccountGuid"] = ((CustomUserSession)req.GetSession()).AccountGuid.ToString();                    
}

So now I just need to find a better way to find out if an AccountGuid is “online” rather than using GetAllSubscriptionInfos() as there is a delay when using Redis. Do you have any ideas?

Thanks.

The builtin join/leave/update events will send clients notifications when any subscriptions join & leave channels. Clients can use it to maintain active subscribers within their channel.

Hi @mythz,

Yes but for example, what if a client logs in, gets a list of other clients from the database and wants to know if they are currently connected.

This is where I am using GetAllSubscriptionInfos() at the server to return a list of clients which have a subscription. The problem is that it is returning subscriptions that have been disconnected.

I also set NotifyChannelOfSubscriptions = false as I only want certain clients to know of subscription events which is why I am sending a custom NotifyChannel when a client subscribes or unsubscribes.

What can I do in that situation?

Thanks!

If you turn off events you’ll have no way of getting callbacks when subscriptions join/leave a channel.

The callbacks that do fire are OnSubscribeAsync and OnUnsubscribeAsync which you could use to maintain a set of AccountGuid, to be concurrently safe you’ll likely need a concurrent dictionary with an integer ref counter value where you’d only remove the entry when the ref counter hits 0, so a subscription that subs/unsubs in parallel but receives set instructions out of order, e.g. ADD, ADD, REM is still subscribed with a ref counter of 1, whereas if you used a standard Set the last remove operations wins. To maintain this in a distributed collection you could use a Redis Sorted Set.

I’ve just made the internal local MemoryServerEvents used in RedisServerEvents accessible in this commit so you can access the internal subscribers for a server with:

public IServerEvents ServerEvents { get; set; }

public object Any(MyRequest request)
{
    var local = ((RedisServerEvents)ServerEvents).Local;
    var subInfos = local.GetAllSubscriptionInfos();
}

this change is available in the latest v5.12.1+ that’s now available on MyGet.

You’d need to make the same call on all servers to get a staggered snapshot of all connected subscribers, even then it’s only “what the server thinks is subscribed” at a moment in time, i.e. clients can disconnect without notifying the server in which case the server only acknowledges they’ve become unsubscribed when their heartbeat expires.

Ok @mythz, thank you.

I will take a look at using a

Thank you for your help.

Given this requirement, I would probably look at managing the event yourself on the client by sending custom requests. This will allow you to manage the process yourself of updating specific clients as well as give you more flexibility in the future if requirements change.

Also worth keeping in mind, Redis PubSub which RedisServerEvents is based is highly performant system that can handle a large amount of throughput to lots of clients but doesn’t provide guarantees like “At Least Once Delivery” or “Exactly Once Delivery” like other messaging systems, since messages are not persisted anywhere. Though errors might be rare and small, it is good to keep in mind these known limitations and have fallbacks built in. For example, for someones visible online status, you will likely want a way for a client to fetch a list of known other users and see their current status based on ‘last seen’ or something similar. Same with previous messages etc, SSE gives you that instant feedback to push details to clients but best to keep in mind how users will get that information if they were not connected during the event for whatever reason (network, system issues etc) or an event was lost due to a server outage.

Same with servers going offline, if a server crashes for whatever reason, unsubscribe events will be lost for subscriptions that server was tracking. Storing information like number of sub/unsub event can be useful but you’ll probably want a fallback to something with persisted date information so you can have a timeout to show that user as offline if they haven’t been heard from since.

Hope that helps.

Hi @mythz, I think I have spotted why i’m running into some issues.

When I use MemoryServerEvents and a client calls serverEventsClient.Stop(); the UnSubscribe event is triggered instantly on the server but when I use RedisServerEvents the UnSubscribe event is only triggered when another client or the same client Subscribes.

I find this really odd as I can see

https://localhost:xxxx/event-unregister?id=xxxxxxxxxxxx

being called at the server but then the UnSubscribe event doesn’t get called.

Is this expected behavior when using RedisServerEvents?

I’m using ServerStack 5.12.1 with .NET 5.0.

Thanks again.

Hello again @mythz,

I have just cleared the Nuget cache to make sure I have the latest version of v5.12.1 to see if it helps with the UnSubscribe issue.

For some reason this has caused an issue with authentication. First it looks like the IAuthResponseFilter has changed public void Execute(AuthFilterContext authContext) to Task ExecuteAsync(AuthFilterContext authContext) which is fine but any services with [RequiresAnyRole("ROLE_HERE", "ROLE_HERE")] fail with

DEBUG ServiceStack.JsonHttpClient [(null)] - Status Code : Unauthorized
DEBUG ServiceStack.JsonHttpClient [(null)] - Status Description : Unauthorized
ERROR ServiceStack.JsonHttpClient [(null)] - HttpClient Exception: Not Authenticated
401 Unauthorized
Code: Unauthorized, Message: Not Authenticated

Has something changed with the way roles work with the new update?

If in the Plugins.Add AuthFeature I set IncludeRolesInAuthenticateResponse = true I can see on the client that the AuthenticateResponse does in fact include the roles.

Thank you.

The Redis Server Events implementation of stop is to just stop its Redis Pub/Sub subscription to stop receiving notifications sent to the topic and then to stop the internal MemoryServerEvents implementation which will remove all subscribers on that server only, i.e. it doesn’t stop all SSE servers connected to redis.

I did resolve an issue where the Server OnUnsubscribeAsync wasn’t getting called, not sure if this is related to the behavior you’re mentioning.

We have changed the role logic to reduce the I/O calls required to validate multiple roles, although that shouldn’t change behavior if you’re using the built-in Auth Repositories, but if you had custom logic in HasRole/Async that API is no longer being called to validate roles, it’s instead calling GetRoles/Async so only 1 I/O call is necessary instead of checking HasRole for each role.

I’ve just made a change where the high-level APIs to validate roles is now overridable in your custom user session which you can override to check if the built-in implementation contains your specified roles, if you’re using [RequiresAnyRole] it will call HasAnyRolesAsync() to check if the user is in any of the specified roles, e.g:

public class CustomUserSession : AuthUserSession
{
    public override async Task<bool> HasAllRolesAsync(ICollection<string> requiredRoles, IAuthRepositoryAsync authRepo, IRequest req,
        CancellationToken token = default)
    {
        // called by [RequiredRole] attribute
        var result = await base.HasAllRolesAsync(requiredRoles, authRepo, req, token);
        return result;
    }

    public override async Task<bool> HasAnyRolesAsync(ICollection<string> roles, IAuthRepositoryAsync authRepo, IRequest req,
        CancellationToken token = default)
    {
        // called by [RequiresAnyRole] attribute
        var result = await base.HasAnyRolesAsync(roles, authRepo, req, token);
        return result;
    }
}

You’ll need to clear your NuGet packages cache to download the latest v5.12.1.

Hi @mythz,

Sorry but I urgently need to revert my ServiceStack dlls to the previous v5.12.1 before the roles were changed, fix the issue with the roles or roll back to any version previous to the roles change. We have an urgent issue with our product and I cannot change anything on it now because the roles are broken and reverting to a backup doesn’t help as it still uses the latest v5.12.1.

If it helps here is my setup:

Startup:

Plugins.Add(new AuthFeature(() => new CustomUserSession(),
                new IAuthProvider[] {
                    new CustomCredentialsAuthProvider(container.Resolve<IAuthenticationRepository>()),
                }
            )
            { IncludeDefaultLogin = false, IncludeAssignRoleServices = false });

CustomCredentialsAuthProvider:

public class CustomCredentialsAuthProvider : CredentialsAuthProvider, IAuthResponseFilter
    {
        private IAuthenticationRepository authenticationRepository;

        public CustomCredentialsAuthProvider(IAuthenticationRepository AuthenticationRepository)
        {
            authenticationRepository = AuthenticationRepository;
        }

        public override Task<bool> TryAuthenticateAsync(IServiceBase authService, string userName, string password, CancellationToken token = default)
        {
            //Return true if credentials are valid, otherwise false
            if (authenticationRepository.ValidateAccount(userName, password))
            {
                return Task.FromResult(true);
            }

            return Task.FromResult(false);
        }

        public override Task<IHttpResult> OnAuthenticatedAsync(IServiceBase authService, IAuthSession session, IAuthTokens tokens, Dictionary<string, string> authInfo, CancellationToken token = default)
        {
            //Fill IAuthSession with data you want to retrieve in the app eg:
            session.UserName = session.UserAuthName;

            AccountDTO item = authenticationRepository.GetAccount(session.UserAuthName);
            ((AuthExtension.CustomUserSession)session).AccountGuid = item.AccountGuid;
            session.Roles = authenticationRepository.GetRoles(session.UserAuthName);

            //Call base method to Save Session and fire Auth/Session callbacks:
            return base.OnAuthenticatedAsync(authService, session, tokens, authInfo);
        }        

        public Task ResultFilterAsync(AuthResultContext authContext, CancellationToken token = default)
        {
            throw new NotImplementedException();
        }

        public async Task ExecuteAsync(AuthFilterContext authContext)
        {
            authContext.AuthResponse.Roles = authContext.Session.Roles;
            authContext.AuthResponse.Permissions = authContext.Session.Permissions;
        }
	}

CustomUserSession:

public class CustomUserSession : AuthUserSession
    {
        public Guid GuidId { get; set; }
        public Guid TenantGuid { get; set; }
        public Guid AccountGuid { get; set; }
    }

I have no other custom logic in HasRole/Async and my service uses [RequiresAnyRole("ROLE_HERE", "ROLE_HERE")] for almost every method.

I can’t see why else my roles will not work. Is it a requirement that I override HasAnyRolesAsync in my CustomUserSession?

Thank you.

Hi @mythz,

If it helps, I just added

public override Task<bool> HasAnyRolesAsync(ICollection<string> roles, IAuthRepositoryAsync authRepo, IRequest req, CancellationToken token = default)

to my CustomUserSession class and it was never called which I guess is the issue?

Thanks.

Hello again @mythz,

I have managed to work around the issue by restoring from a backup.

The roles problem is still an issue for me but as I have worked round it we can continue looking into next week as I am finished for the weekend now.

Thank you.

Ok then I’m assuming your authenticated user session doesn’t populate UserAuthId? i.e. that should hold the User Id that the session is for, maybe that’s the value of AccountGuid in your case. Anyway I’ve made the AssertAuthenticated() logic overridable in your AppHost in this commit and removed the requirement that UserAuthId needs to be populated which should pass the Authenticated User Requirement to then call HasAnyRolesAsync() to validate the user roles.

This change is available from the latest v5.12.1 that’s now available on MyGet.

Hi @mythz, yes I use AccountGuid as the Id of the user. I have changed my OnAuthenticatedAsync method to set the UserAuthId to the AccountGuid which has solved the roles issue.

I can also see that the OnUnsubscribeAsync is now triggering. I’m hoping that this will resolve my original issue.

@layoric & @mythz do you think that I should still be looking into using Redis Sorted Set to record who is “online” instead of GetAllSubscriptionInfos()? My new UnSubscribe code is:

   OnUnsubscribeAsync = async (sub) =>
   {
             Guid accountGuid = sub.ServerArgs["AccountGuid"] != null && !String.IsNullOrWhiteSpace(sub.ServerArgs["AccountGuid"]) ? new Guid(sub.ServerArgs["AccountGuid"]) : Guid.Empty;

             if (accountGuid != Guid.Empty)
             {
                  using (var service = HostContext.Resolve<ServerEventsSubscribersService>())
                  {
                       bool isAccountOnline = service.ServerEvents.GetAllSubscriptionInfos().Any(x =>
                                    x.ServerArgs["AccountGuid"] != null && !String.IsNullOrWhiteSpace(x.ServerArgs["AccountGuid"]) && new Guid(x.ServerArgs["AccountGuid"]) == accountGuid &&                            
                                    x.SubscriptionId != sub.SubscriptionId);

                       if (!isAccountOnline)
                       {
                            //Send a ServerEvents message to other clients notifying this client has disconnected
                       }
                  }
             }
    } 

I have not specified a CacheClient so I assume it will be using MemoryCacheClient. Do you recommend I change this to Redis? I have setup sticky connections at the gateway so clients should go back to the same web server for each request until the sticky connection times out.

Once again, thank you for your help.

I’m expecting with the latest v5.12.1 on MyGet this wont be necessary, but IMO it’s better to use UserAuthId to refer to the User Id as that’s its purpose.

I’d only look at maintaining your own online presence if it’s necessary, Server Events does have its own built-in Join/Leave/Update events which clients can handle to get notified when someone enters/leaves their subscribed channels, but you can manage your own if you have differing requirements.

If you’ve not registered an ICacheClient/Async and you have Redis configured ServiceStack automatically registers a Redis Cache Client since it’s assumed if you have Redis configured you’re going to want to use it for caching. Using Redis for caching (which is used to store server sessions) means that an authenticated request is valid in any of the load-balanced App Servers configured to use Redis.