
Have you ever been in a situation where you needed to notify your client agents regarding events that had occurred in your system? There are a few approaches to solving this. One is a client agent constantly pinging your downstream server for updates at an x interval. This is commonly referred to as "long polling." Another approach is to have the downstream server notify the client. This works by the client establishing a long-lived connection with the downstream server, which then allows the downstream server to send events or streams when it has new updates for it. Another approach is a hybrid approach, in which both techniques are leveraged. Of course, all of these have their own cons, but we wouldn’t be dwelling on that today. Our focus today is maintaining a stable SSE connection across scaled downstream servers. When you begin to deal with a certain scale, it suffices to say one downstream server isn’t enough anymore, or when you want a highly available system. But with this, if we have a system where we maintain open SSE connections to our scaled system, we have a scenario where events or updates would be missed and not sent to the client because a scaled instance has no idea of the client it ought to broadcast the event to. Below, find a snippet of a very dumbed-down implementation of what a simple naive publisher might look like \ import { EventEmitter } from 'events'; import { singleton } from 'tsyringe'; import { SSEEvent } from '@/common/types/events'; @singleton() export class SampleEventBus extends EventEmitter { constructor() { super(); this.setMaxListeners(500); } channelKey(orgId: string, eventId: string) { return `sample:${orgId}:${eventId}`; } publish(orgId: string, eventId: string, event: SSEEvent){ const channel = this.channelKey(orgId, eventId); super.emit(channel, event); } } Now that is a simple snippet written in TypeScript that extends the base Event emitter, and all it does is emit events to connected clients. So what is the issue with this? Let’s say you have two instances of this service running behind a load balancer, Instance A and Instance B. A client opens an SSE connection, and the load balancer routes it to Instance A. That connection now lives entirely in Instance A's memory, tied to Instance A’s EventEmitter . Now imagine a request that triggers publish() , but it lands on Instance B instead, say, a webhook, for instance. Instance B EventEmitter emits the event just fine. The problem is that the client's listener was never there. It's sitting on Instance A, which has no idea that anything happened. And here’s the part that makes this an unacceptable solution. The event is just never seen. In this scenario, this can be where a fallback polling mechanism could catch what SSE missed. What we actually need What we need is a way for every instance to hear about every event, regardless of which instance triggered it. If you are thinking of a light fan-out to all instances, you are right. And we can achieve exactly this with Redis Pub/Sub (one instance publishes to a channel, and every subscribed instance receives the message). Each instance can then check if they have a client connected who needs this event. If yes, write it to that connection’s stream. If not, just ignore it. This is what it might look like \ import { EventEmitter } from 'events'; import { inject, singleton } from 'tsyringe'; import type { RedisClientType } from 'redis'; import { SSEEvent } from '@/common/types/events'; import { Redis } from '@/common/utils/redis/redis'; @singleton() export class SampleEventBus extends EventEmitter { private subscriber: RedisClientType | null = null; private readonly publisher: RedisClientType; constructor(@inject(Redis) redis: Redis) { super(); this.setMaxListeners(500); this.publisher = redis.getInstance(); this.initSubscriber().catch((err) => console.error('subscriber init failed', err)); } private async initSubscriber() { const sub = this.publisher.duplicate() as RedisClientType; await sub.connect(); await sub.pSubscribe('sample:*', (message: string, channel: string) => { try { super.emit(channel, JSON.parse(message) as SSEEvent); } catch (err) { console.error('Error parsing sse event', err) } }); this.subscriber = sub; } channelKey(orgId: string, eventId: string) { return `sample:${orgId}:${eventId}`; } publish(orgId: string, eventId: string, event: SSEEvent) { const channel = this.channelKey(orgId, eventId); if (!this.subscriber) { super.emit(channel, event); return; } this.publisher.publish(channel, JSON.stringify(event)); } } \ So what changed between the two versions? Not much, actually, channelKey() and publish() still look the same from the outside. Anywhere else in your codebase, calling eventBus.publish(...) has no clue that Redis is even involved. That’s really the whole fix. A second Redis connection for subscribing, a pattern match so one subscription covers every channel, and a fallback to local emit if Redis isn’t ready yet. If you already run Redis for caching, you already have what you need. One thing to know Redis Pub/Sub is like a dumb funnel, it doesn’t remember anything. If a client’s connection drops for a few seconds and an event fires during that gap, it’s gone. If you need something stronger than that, you’d be looking at Redis Streams with some last event timestamp, but that is beyond the scope of this article. For now, you don’t need Pusher, Ably or any real-time service product to make SSE work across multiple instances. Just an EventEmitter, a Redis connection you probably already have, and a few extra lines of code. If you got to this point, thanks for reading. \
View original source — Hacker Noon ↗

