diff --git a/packages/nocodb/src/controllers/notifications.controller.ts b/packages/nocodb/src/controllers/notifications.controller.ts index e3d180d3e9..446be008e4 100644 --- a/packages/nocodb/src/controllers/notifications.controller.ts +++ b/packages/nocodb/src/controllers/notifications.controller.ts @@ -45,8 +45,10 @@ export class NotificationsController { this.notificationsService.addConnection(req.user.id, res); + let unsubscribeCallback: (keepRedisChannel?: boolean) => Promise = null; + if (PubSubRedis.available) { - await PubSubRedis.subscribe( + unsubscribeCallback = await PubSubRedis.subscribe( `notification:${req.user.id}`, async (data) => { this.notificationsService.sendToConnections(req.user.id, data); @@ -55,10 +57,7 @@ export class NotificationsController { } res.on('close', async () => { - this.notificationsService.removeConnection(req.user.id, res); - if (PubSubRedis.available) { - await PubSubRedis.unsubscribe(`notification:${req.user.id}`); - } + await this.notificationsService.removeConnection(req.user.id, res, unsubscribeCallback); }); setTimeout(() => { diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index fe4f48905b..c363544730 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -92,7 +92,7 @@ export class JobsController { }; // subscribe to job events if (JobsRedis.available) { - await JobsRedis.subscribe(jobId, async (data) => { + const unsubscribeCallback = await JobsRedis.subscribe(jobId, async (data) => { if (this.jobRooms[jobId]) { this.jobRooms[jobId].listeners.forEach((res) => { if (!res.headersSent) { @@ -110,7 +110,7 @@ export class JobsController { if ( [JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status) ) { - await JobsRedis.unsubscribe(jobId); + await unsubscribeCallback(); delete this.jobRooms[jobId]; // close the job after 1 second (to allow the update of messages) setTimeout(() => { diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts index 920ca25d2f..8256de8437 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts @@ -96,5 +96,4 @@ export class JobsRedis { static publish = PubSubRedis.publish; static subscribe = PubSubRedis.subscribe; - static unsubscribe = PubSubRedis.unsubscribe; } diff --git a/packages/nocodb/src/redis/pubsub-redis.ts b/packages/nocodb/src/redis/pubsub-redis.ts index a46247b90a..dd699afa63 100644 --- a/packages/nocodb/src/redis/pubsub-redis.ts +++ b/packages/nocodb/src/redis/pubsub-redis.ts @@ -10,8 +10,6 @@ export class PubSubRedis { public static redisClient: Redis; public static redisSubscriber: Redis; - private static unsubscribeCallbacks: { [key: string]: () => Promise } = - {}; public static async init() { if (!PubSubRedis.available) { @@ -43,25 +41,16 @@ export class PubSubRedis { } } - static async unsubscribe(channel: string) { - if (!PubSubRedis.initialized) { - if (!PubSubRedis.available) { - return; - } - - await PubSubRedis.init(); - } - - if (PubSubRedis.unsubscribeCallbacks[channel]) { - await PubSubRedis.unsubscribeCallbacks[channel](); - delete PubSubRedis.unsubscribeCallbacks[channel]; - } - } - + /** + * + * @param channel + * @param callback + * @returns Returns a callback to unsubscribe + */ static async subscribe( channel: string, callback: (message: any) => Promise, - ) { + ): Promise<(keepRedisChannel?: boolean) => Promise> { if (!PubSubRedis.initialized) { if (!PubSubRedis.available) { return; @@ -83,8 +72,9 @@ export class PubSubRedis { }; PubSubRedis.redisSubscriber.on('message', onMessage); - PubSubRedis.unsubscribeCallbacks[channel] = async () => { - await PubSubRedis.redisSubscriber.unsubscribe(channel); + return async (keepRedisChannel = false) => { + // keepRedisChannel is used to keep the channel open for other subscribers + if (!keepRedisChannel) await PubSubRedis.redisSubscriber.unsubscribe(channel); PubSubRedis.redisSubscriber.off('message', onMessage); }; } diff --git a/packages/nocodb/src/services/notifications/notifications.service.ts b/packages/nocodb/src/services/notifications/notifications.service.ts index 9705e3e6d9..e9c0e2b780 100644 --- a/packages/nocodb/src/services/notifications/notifications.service.ts +++ b/packages/nocodb/src/services/notifications/notifications.service.ts @@ -36,7 +36,11 @@ export class NotificationsService implements OnModuleInit, OnModuleDestroy { this.connections.get(userId).push(res); }; - removeConnection = (userId: string, res: Response & { resId: string }) => { + removeConnection = async ( + userId: string, + res: Response & { resId: string }, + unsubscribeCb: (keepRedisChannel?: boolean) => Promise | null, + ) => { if (!this.connections.has(userId)) { return; } @@ -51,8 +55,15 @@ export class NotificationsService implements OnModuleInit, OnModuleDestroy { if (userConnections.length === 0) { this.connections.delete(userId); + if (unsubscribeCb) { + await unsubscribeCb(); + } } else { this.connections.set(userId, userConnections); + if (unsubscribeCb) { + // if there are still connections, keep the redis channel + await unsubscribeCb(true); + } } };