SSE client in Pinia store and Vue3

Hello,
I am trying to use SSE in a Vue SPA

For example with the

If we add the TypeScript ServerEventsClient to a Pinia store
We just display a trace for all event handlers…

import { ref } from "vue"
import { defineStore } from "pinia"
import {
    ServerEventConnect,
    ServerEventJoin,
    ServerEventLeave,
    ServerEventMessage,
    ServerEventsClient,
    ServerEventUpdate
} from "@servicestack/client"

export const useSseStore = defineStore('sse', () => {
    const client = ref<ServerEventsClient|null>()
    
    const startServerEventsClient = () => {
        client.value = new ServerEventsClient("/", ["app"], {
            handlers: {
                onConnect: (sub:ServerEventConnect) => {  // Successful SSE connection
                    console.log("SSE onConnect");
                },
                onJoin: (msg:ServerEventJoin) => {        // User has joined subscribed channel
                    console.log("SSE onJoin");
                },
                onLeave: (msg:ServerEventLeave) => {      // User has left subscribed channel
                    console.log("SSE onLeave");
                },
                onUpdate: (msg:ServerEventUpdate) => {    // User channel subscription was changed
                    console.log("SSE onUpdate");
                },
                onMessage: (msg:ServerEventMessage) => {
                    console.log("SSE onMessage");
                } 
            },
            onException: (e:Error) => {
                console.log("SSE onException");
            },               
            onReconnect: (e:Error) => {
                console.log("SSE onReconnect");
            }
        }).start();
    }

    return {
        startServerEventsClient
    }
});

I put simple code in a Vue page that just starts the SSE client, nothing else:

import { useSseStore } from "@/stores/sse"
const storeSse = useSseStore()
storeSse.startServerEventsClient()

Then server-side, we add the plugin and just trace the events, nothing else:

        Plugins.Add(new ServerEventsFeature
        {
            HeartbeatInterval = TimeSpan.FromSeconds(30), 
            IdleTimeout = TimeSpan.FromSeconds(90),
            NotifyChannelOfSubscriptions = true,
            
            // Subscription pre-initialization callback
            OnInit = req =>
            {
                var subscriptionId = req.QueryString["id"];
                if(subscriptionId != null) MLog.Info($"OnInit: {subscriptionId}");
                else MLog.Info($"OnInit: ");
            },
            
            OnConnect = (eventSubscription, _) =>
            {
                MLog.Info($"OnConnect: {eventSubscription.SubscriptionId}");
            },
            
            // Subscription is created
            OnCreated = (eventSubscription, req) =>
            {
                MLog.Info($"OnCreated: {eventSubscription.SubscriptionId}");
            },
            
            OnUpdateAsync = eventSubscription =>
            {
                MLog.Info($"OnUpdateAsync: {eventSubscription.SubscriptionId}");
                return Task.CompletedTask;
            },
            
            OnSubscribeAsync = eventSubscription =>
            {
                MLog.Info($"OnSubscribe: {eventSubscription.SubscriptionId}");
                return Task.CompletedTask;
            },
            
            OnUnsubscribeAsync = eventSubscription =>
            {
                MLog.Info($"OnUnsubscribe: {eventSubscription.SubscriptionId}");
                return Task.CompletedTask;
            },
            
            OnHeartbeatInit = req =>
            {
                var subscriptionId = req.QueryString["id"];
                if(subscriptionId != null) MLog.Info($"OnHeartbeatInit: {subscriptionId}");
                
                var subscription = req.TryResolve<IServerEvents>().GetSubscriptionInfo(subscriptionId);
                if (subscription == null)
                {
                    MLog.Info("... subscription no longer exists");
                }
                else
                {
                    MLog.Info($"Found subscription for Username: {subscription.UserName}");
                }
            },
        });

I am getting an error on every heartbeat call client-side:

404 Subscription 'WB4Sa8sx9XM5SVqET1rT' does not exist

and the trace server-side is:

[info] OnInit:
[info] OnCreated: WB4Sa8sx9XM5SVqET1rT
[info] OnConnect: WB4Sa8sx9XM5SVqET1rT
[info] OnSubscribe: WB4Sa8sx9XM5SVqET1rT
[info] OnInit:
[info] OnCreated: nZDgBhAzOJNDHnxjmad2
[info] OnConnect: nZDgBhAzOJNDHnxjmad2
[info] OnSubscribe: nZDgBhAzOJNDHnxjmad2
[info] OnUnsubscribe: WB4Sa8sx9XM5SVqET1rT
[info] OnHeartbeatInit: WB4Sa8sx9XM5SVqET1rT
[info] ... subscription no longer exists
[info] OnInit:
[info] OnCreated: sj1ZsQwhj3CjcblVNHar
[info] OnConnect: sj1ZsQwhj3CjcblVNHar
[info] OnSubscribe: sj1ZsQwhj3CjcblVNHar
[info] OnHeartbeatInit: WB4Sa8sx9XM5SVqET1rT
[info] ... subscription no longer exists

See, we have an Unsubscribe…

OnUnsubscribe: WB4Sa8sx9XM5SVqET1rT

…and then new subscription IDs are generated, while the heartbeat keeps trying to use the first subscriptionId.

Do you have an idea?
Note that my code is doing nothing, I am just starting the app and tracing the events client and server side

My real project is Vue2.7 (with composition API and Pinia)… I can reproduce it from your Jamstack template with just the changes above and calling my backend directly

Thierry

Seems the priority should be finding why OnUnsubscribe is being called. Can you try logging Environment.StackTrace in OnUnsubscribeAsync to see if we can work out what’s triggering it?

What’s the trace on the client?

The client will keep sending heartbeats to its existing connection until a heartbeat fails; when it does, it will try reconnecting.
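
To make the heartbeat behaviour described above concrete, here is a minimal TypeScript sketch of the decision taken after each heartbeat. It is not the @servicestack/client source: `Connection`, `HeartbeatResult`, `afterHeartbeat` and the `reconnect` callback are hypothetical names used only for illustration.

```typescript
// Hypothetical model of "heartbeat until it fails, then reconnect".
// None of these names come from @servicestack/client.
type HeartbeatResult = { ok: boolean; status: number };

interface Connection {
    subscriptionId: string;
}

// Decide which connection to use after one heartbeat attempt:
// - heartbeat succeeded: keep the current connection and its subscriptionId
// - heartbeat failed (e.g. 404): ask for a new connection, which carries a new id
function afterHeartbeat(
    current: Connection,
    result: HeartbeatResult,
    reconnect: () => Connection
): Connection {
    if (result.ok) return current;
    return reconnect();
}
```

Under this model, a client that kept the old subscriptionId after a failed heartbeat, instead of adopting the id from the reconnect, would be expected to fail every later heartbeat in the same way.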

Will do some more testing shortly.

Here is a trace on my real project
Client side on the left - server side on the right

What is suspicious is that the client-side onConnect event takes a long time to arrive; I would have expected it right after the client starts.
It waits about as long as it takes for the first heartbeat to be called, and the server-side unsubscribe also happens around that time.
There is a first exception, but there is no message in it (it just says error),
and right after it the cycle of failing heartbeats begins.

It is as if the client starts, does not get the connect back from the server, maybe times out and sends an unsubscribe… and then the server eventually sends the connect event, too late and with the old subscriptionId, and the client does not realise it is wrong and keeps trying the heartbeat with that subscriptionId.

Finding the cause of the first exception is also important. What does e log as?

onException: (e:Error) => {
     console.log("SSE onException", e, typeof e)
},

You should also use unminified sources so you can debug the client. If the dependency only runs the minified version, you should be able to temporarily use an unminified ServerEventsClient with:

import { ServerEventsClient } from 'https://unpkg.com/@servicestack/client@2.0.1/dist/servicestack-client.mjs'

This should help client debugging.

Here is the first exception… nothing in it?

error

If you put a breakpoint on your handler are you able to see a stacktrace to see where it originates from?

Looks like it’s an error coming from the underlying EventSource.
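
A browser EventSource reports failures as a plain Event whose type is "error", with no message or stack, which would explain a log line that only says error. As a small sketch, assuming the value passed to onException is either an Error or that event, a helper could print the event type and the readyState of its target instead. `describeError` is a hypothetical helper, not part of @servicestack/client.

```typescript
// Hypothetical helper: turn whatever an onException-style handler receives
// into a readable string. EventSource readyState values are
// 0 = CONNECTING, 1 = OPEN, 2 = CLOSED.
function describeError(e: unknown): string {
    if (e instanceof Error) {
        return `${e.name}: ${e.message}`;
    }
    if (typeof e === "object" && e !== null && "type" in e) {
        const ev = e as { type: unknown; target?: { readyState?: number } | null };
        const names = ["CONNECTING", "OPEN", "CLOSED"];
        const state = ev.target?.readyState;
        const stateName =
            typeof state === "number" && names[state] !== undefined ? names[state] : "unknown";
        return `event '${String(ev.type)}' (readyState: ${stateName})`;
    }
    return String(e);
}
```

It could be called as `onException: e => console.log("SSE onException", describeError(e))`. A readyState of CONNECTING at that point would suggest the browser is already retrying the stream, while CLOSED would suggest it has given up.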

Like this?

Here is Environment.StackTrace in OnUnsubscribeAsync

[info] OnUnsubscribe: cAr9gBd7kgPWGiRVOzPg
[info]    at System.Environment.get_StackTrace()
   at Tiptopweb.MojoPortal.AppHost.<>c.<Configure>b__3_9(IEventSubscription eventSubscription) in C:\DATA\BITBUCKET\MojoDealerPortal-MDS2-Dev-AWS\Tiptopweb.MojoPortal\Tiptopweb.MojoPortal\Configure.AppHost.cs:line 157
   at ServiceStack.MemoryServerEvents.DoAsyncTasks(CancellationToken token)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.DoAsyncTasks(CancellationToken token)
   at ServiceStack.MemoryServerEvents.HandleUnsubscriptionAsync(IEventSubscription subscription, CancellationToken token) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1922
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.HandleUnsubscriptionAsync(IEventSubscription subscription, CancellationToken token)
   at ServiceStack.MemoryServerEvents.HandleUnsubscriptionAsync(IEventSubscription subscription) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1913
   at ServiceStack.EventSubscription.UnsubscribeAsync() in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 869
   at ServiceStack.MemoryServerEvents.DoAsyncTasks(CancellationToken token) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1341
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.DoAsyncTasks(CancellationToken token)
   at ServiceStack.MemoryServerEvents.NotifyRawAsync(ConcurrentDictionary`2 map, String key, String selector, String body, String channel, CancellationToken token) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1463
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.NotifyChannelsAsync(String[] channels, String selector, String body, CancellationToken token) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1183
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.NotifyChannelsAsync(String[] channels, String selector, String body, CancellationToken token)
   at ServiceStack.MemoryServerEvents.<.ctor>b__61_1(IEventSubscription s) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1076
   at ServiceStack.MemoryServerEvents.DoAsyncTasks(CancellationToken token) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1327
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.DoAsyncTasks(CancellationToken token)
   at ServiceStack.MemoryServerEvents.HandleUnsubscriptionAsync(IEventSubscription subscription, CancellationToken token) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1922
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.HandleUnsubscriptionAsync(IEventSubscription subscription, CancellationToken token)
   at ServiceStack.MemoryServerEvents.HandleUnsubscriptionAsync(IEventSubscription subscription) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1913
   at ServiceStack.EventSubscription.UnsubscribeAsync() in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 869
   at ServiceStack.MemoryServerEvents.DoAsyncTasks(CancellationToken token) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1341
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.DoAsyncTasks(CancellationToken token)
   at ServiceStack.MemoryServerEvents.RemoveExpiredSubscriptionsAsync(CancellationToken token) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 1653
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.MemoryServerEvents.RemoveExpiredSubscriptionsAsync(CancellationToken token)
   at ServiceStack.ServerEventsHeartbeatHandler.ProcessRequestAsync(IRequest req, IResponse res, String operationName) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/ServerEventsFeature.cs:line 411
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.ServerEventsHeartbeatHandler.ProcessRequestAsync(IRequest req, IResponse res, String operationName)
   at ServiceStack.AppHostBase.ProcessRequest(HttpContext context, Func`1 next) in /home/runner/work/ServiceStack/ServiceStack/ServiceStack/src/ServiceStack/AppHostBase.NetCore.cs:line 274
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at ServiceStack.AppHostBase.ProcessRequest(HttpContext context, Func`1 next)
   at Microsoft.AspNetCore.Builder.UseExtensions.<>c__DisplayClass0_1.<Use>b__1(HttpContext context)
   at Microsoft.AspNetCore.HostFiltering.HostFilteringMiddleware.Invoke(HttpContext context)
   at Microsoft.AspNetCore.Hosting.HostingApplication.ProcessRequestAsync(Context context)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequests[TContext](IHttpApplication`1 application)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequests[TContext](IHttpApplication`1 application)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequestsAsync[TContext](IHttpApplication`1 application)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequestsAsync[TContext](IHttpApplication`1 application)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.HttpConnection.ProcessRequestsAsync[TContext](IHttpApplication`1 httpApplication)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.HttpConnection.ProcessRequestsAsync[TContext](IHttpApplication`1 httpApplication)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.HttpConnectionMiddleware`1.OnConnectionAsync(ConnectionContext connectionContext)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.TransportManager.<>c__DisplayClass9_0.<BindAsync>b__0(ConnectionContext c)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.KestrelConnection`1.ExecuteAsync()
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.KestrelConnection`1.ExecuteAsync()
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.KestrelConnection`1.System.Threading.IThreadPoolWorkItem.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
   at System.Threading.Thread.StartCallback()

Yeah it’s coming from the EventSource object for some reason.

Can you try upgrading to the latest @servicestack/client v2.0.2? It’s been rewritten to use async/await for easier readability, and it now reads the empty heartbeat body since Chrome dev tools reports it as a failed fetch if it’s not read.

It shouldn’t change behavior, but should make it easier to debug heartbeat handling from unminified sources.

FYI I’m also able to run it from my browser by dynamically loading the module and running your JS above:

const M = await import('https://unpkg.com/@servicestack/client@2.0.2/dist/servicestack-client.mjs')
const { ServerEventsClient } = M

let sse = new ServerEventsClient("/", ["app"], {
    handlers: {
        onConnect: (sub) => {
            console.log("SSE onConnect");
        },
        onJoin: (msg) => {
            console.log("SSE onJoin");
        },
        onLeave: (msg) => {
            console.log("SSE onLeave");
        },
        onUpdate: (msg) => {
            console.log("SSE onUpdate");
        },
        onMessage: (msg) => {
            console.log("SSE onMessage");
        } 
    },
    onException: (e) => {
        console.log("SSE onException", e);
    },               
    onReconnect: (e) => {
        console.log("SSE onReconnect", e);
    }
}).start();

Which has been running for a while without issues, with all heartbeats passing and no reconnections:

Previously Chrome was reporting heartbeat fetches as failed, but doesn’t now that the heartbeat body is being read.

Yes, I saw the trace on my backend showing you could successfully connect…
It confused me a bit as I thought suddenly it was working :rofl:

I upgraded to @servicestack/client version 2.0.2

It must be something to do with Composition API in Vue / Pinia (my project is Vue2.7)
If I use your Jamstack Vue3 to connect to my backend, it is the same issue

Given that searching for "pinia" EventSource returns very few results, with the first one being about Alpine.js and this thread being the 2nd result, I’d say using SSE from inside a Pinia store isn’t done very much.

I’d look for an alternative approach, maybe a singleton or injecting it as a provider in the vue app.
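
As a rough sketch of the singleton idea, a module-level factory wrapper can make sure the client is created only once no matter how many components ask for it. `createSingleton` is a hypothetical helper written for illustration, not something Vue, Pinia or @servicestack/client provides.

```typescript
// Hypothetical lazy singleton: the factory runs at most once,
// and every later call returns the same instance.
function createSingleton<T>(factory: () => T): () => T {
    let instance: T | undefined;
    let created = false;
    return () => {
        if (!created) {
            instance = factory();
            created = true;
        }
        return instance as T;
    };
}
```

In the app this might be used as `export const getSseClient = createSingleton(() => new ServerEventsClient("/", ["app"], { /* handlers */ }).start())`, so the client lives outside any store or reactive state and a second call never opens a second subscription.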

Yes good point. Thank you.

I removed Pinia and we just have a composable left
It is still not working (exactly the same issue)
Maybe I cannot save the client in a ref

client.value = new ServerEventsClient().start()

Here is the code for the composable for reference:

import { ref } from "vue";
import { ServerEventsClient } from "@servicestack/client";

// global state
const client = ref({});
const userId = ref("");
const userName = ref("");

export function useServerEvent() {
    
    function startServerEventsClient(userData) {
        
        userId.value = userData.userId;
        userName.value = userData.userName;

        const channels = ["app"];

        console.log( "SSE start on channel app for " + userName.value + "(" + userId.value + ")");
        
        client.value = new ServerEventsClient("/", channels, {
            handlers: {
                onConnect: sub => {
                    console.log("SSE onConnect: " + sub.displayName);
                },
                onJoin: msg => {
                    console.log(
                        "SSE onJoin channel " + msg.channel + " : " + msg.displayName
                    );
                },
                onLeave: msg => {
                    console.log(
                        "SSE onLeave channel " + msg.channel + " : " + msg.displayName
                    );
                },
                onUpdate: msg => {
                    console.log(
                        msg.displayName + " channels subscription were updated"
                    );
                },
                onMessage: msg => {
                    console.log("SSE onMessage");
                }
                // END OF HANDLERS
            },
            onException: e => {
                console.log("SSE onException");
                //console.log("SSE onException", e, typeof e);
            },

            onReconnect: e => {
                console.log("SSE onReconnect");
                //console.log("SSE onReconnect", e, typeof e);
            },
        }).start();
    }

    return {
        startServerEventsClient
    }
}

Not sure if Vue’s reactivity system messes with it, try using it without.

VueUse has an EventSource composable, which is a good sign.

I actually do not need to make the global variables reactive (it is not a Pinia store anymore):

// global state
let client = null;
let userId = "";
let userName = "";

So we are not using the reactivity system

I got something interesting:
The call to start is blocked for 1 minute before succeeding and the first exception comes right after: probably a time-out.
I think it was happening also with the Pinia code, I just missed it yesterday…

Server-side, these arrive quickly (sorry, the IDs differ from the screenshot as it is a different run):

[info] OnInit:
[info] OnCreated: rODRzW3QF24UUsIJYhlD
[info] OnConnect: rODRzW3QF24UUsIJYhlD
[info] Tiptopweb.MojoPortal.ServiceModel.AdminGetBrands - admin {} - Execution time: 00.016
[info] Tiptopweb.MojoPortal.ServiceModel.AdminGetAdminRoles - admin {} - Execution time: 00.000
[info] OnSubscribe: rODRzW3QF24UUsIJYhlD

and then after one minute, it creates a new SubscriptionId, unsubscribes the first one, and the heartbeat is still calling the first one…

[info] OnInit:
[info] OnCreated: EgZdmGTfKAAHuzPMuiLM
[info] OnConnect: EgZdmGTfKAAHuzPMuiLM
[info] OnSubscribe: EgZdmGTfKAAHuzPMuiLM
[info] OnUnsubscribe: rODRzW3QF24UUsIJYhlD
[info] OnHeartbeatInit: rODRzW3QF24UUsIJYhlD
[info] ... subscription no longer exists

We get the same if I remove all the server-side code (in case my server-side traces were the problem) and just have

Plugins.Add(new ServerEventsFeature {});

You still need to find out what’s causing it to Unsubscribe. Can you debug the unminified client? Did you log Environment.StackTrace in OnUnsubscribeAsync on the server?

If it takes a long time for events to come down you may have some buffering somewhere. Does this happen when running locally? Are you going through a reverse proxy?

I created a blank page with almost nothing else (still Vue and plugins + router)
If you look here you can see the succession of events

Only the first call waits for a minute; the others return immediately.

https://mds2.mojomotorcycles.com.au

Note: when you used it directly via javascript without using Vue, it was working ok
So something to do with Vue 2.7 (using composition API) or Vue 3 / router

Note: I tried on your Jamstack template with Vue3 initially (see my top post) but connecting to my backend
I am sure that if you add the above code to your template, the issue will be there.
I have trouble running your whole template locally on my computer.

Note: I cannot find a way to get the generated code unminified.

The code is

<template>
  <div class="py-3 min-w-0 w-full">
    Blank Page
  </div>
</template>

<script>

import { onMounted } from "vue";
import { ServerEventsClient } from "@servicestack/client";

let globalServerEventsClient = null;

export default {

  setup() {
    
    onMounted(() => {

      const channels = ["app"];

      console.log("SSE start on channel app");

      globalServerEventsClient = new ServerEventsClient("/", channels, {
        handlers: {
          onConnect: sub => {
            // Successful SSE connection
            console.log("SSE onConnect: " + sub.displayName);
          },
          onJoin: msg => {
            // User has joined subscribed channel
            console.log("SSE onJoin channel " + msg.channel + " : " + msg.displayName);
          },
          onLeave: msg => {
            // User has left subscribed channel
            console.log("SSE onLeave channel " + msg.channel + " : " + msg.displayName);
          },
          onUpdate: msg => {
            // User channel subscription was changed
            console.log(msg.displayName + " channels subscription were updated");
          },
          onMessage: msg => {
            // Invoked for each other message
            console.log("SSE onMessage");
          }
          // END OF HANDLERS
        },
        onException: e => {
          // Invoked on each Error
          console.log("SSE onException");
          //console.log("SSE onException", e, typeof e);
        },

        onReconnect: e => {
          // Invoked after each auto-reconnect
          console.log("SSE onReconnect");
        },
        onTick: () => {
          //console.log("SSE onTick");
        },
      });

      console.log("SSE Initialised, calling start");

      globalServerEventsClient.start();

      console.log("SSE start complete");
    });
    
  }
};
</script>

That timing screenshot is correct, the /event-stream is supposed to stay running, that’s how EventStream works by keeping the HTTP Request open and flushing events as they’re sent.
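
To illustrate why the request stays open, here is a minimal sketch of how a text/event-stream body can be consumed incrementally: chunks arrive over the open response and an event is emitted each time a blank line completes one. This is a simplified reading of the standard SSE wire format, not the ServiceStack or browser implementation. `SseParser`, `SseEvent` and `parseBlock` are hypothetical names, and the sketch assumes LF line endings and ignores the `retry` field.

```typescript
interface SseEvent {
    event: string;
    data: string;
    id: string | undefined;
}

// Parse one block of lines (the text between two blank lines).
// Returns null when the block carries no data, e.g. a comment-only keep-alive.
function parseBlock(block: string): SseEvent | null {
    let event = "message"; // default event type
    let id: string | undefined;
    const data: string[] = [];
    for (const line of block.split("\n")) {
        if (line === "" || line.startsWith(":")) continue; // skip comments
        const colon = line.indexOf(":");
        const field = colon === -1 ? line : line.slice(0, colon);
        let value = colon === -1 ? "" : line.slice(colon + 1);
        if (value.startsWith(" ")) value = value.slice(1); // one optional space
        if (field === "event") event = value;
        else if (field === "data") data.push(value);
        else if (field === "id") id = value;
    }
    return data.length > 0 ? { event, data: data.join("\n"), id } : null;
}

class SseParser {
    private buffer = "";

    // Feed the next chunk of the stream; returns the events it completed.
    feed(chunk: string): SseEvent[] {
        this.buffer += chunk;
        const events: SseEvent[] = [];
        let end = this.buffer.indexOf("\n\n");
        while (end !== -1) {
            const parsed = parseBlock(this.buffer.slice(0, end));
            this.buffer = this.buffer.slice(end + 2);
            if (parsed) events.push(parsed);
            end = this.buffer.indexOf("\n\n");
        }
        return events;
    }
}
```

Because an event is only surfaced once its terminating blank line has arrived, anything that holds back bytes between server and browser would delay events such as the initial connect, even though the request itself looks healthy.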

The issue that needs investigating is the onException. If you can publish a stand-alone repro with the latest Jamstack template on GitHub I can take a look.