Browse Source

fix: return unsubscribe callback on subscribing (#8811)

* fix: return unsubscribe callback on subscribing

* fix: keep redis channel if subscribers available
pull/8821/head
Mert E 3 weeks ago committed by GitHub
parent
commit
5f490fb84d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      packages/nocodb/src/controllers/notifications.controller.ts
  2. 4
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  3. 1
      packages/nocodb/src/modules/jobs/redis/jobs-redis.ts
  4. 30
      packages/nocodb/src/redis/pubsub-redis.ts
  5. 13
      packages/nocodb/src/services/notifications/notifications.service.ts

9
packages/nocodb/src/controllers/notifications.controller.ts

@ -45,8 +45,10 @@ export class NotificationsController {
this.notificationsService.addConnection(req.user.id, res); this.notificationsService.addConnection(req.user.id, res);
let unsubscribeCallback: (keepRedisChannel?: boolean) => Promise<void> = null;
if (PubSubRedis.available) { if (PubSubRedis.available) {
await PubSubRedis.subscribe( unsubscribeCallback = await PubSubRedis.subscribe(
`notification:${req.user.id}`, `notification:${req.user.id}`,
async (data) => { async (data) => {
this.notificationsService.sendToConnections(req.user.id, data); this.notificationsService.sendToConnections(req.user.id, data);
@ -55,10 +57,7 @@ export class NotificationsController {
} }
res.on('close', async () => { res.on('close', async () => {
this.notificationsService.removeConnection(req.user.id, res); await this.notificationsService.removeConnection(req.user.id, res, unsubscribeCallback);
if (PubSubRedis.available) {
await PubSubRedis.unsubscribe(`notification:${req.user.id}`);
}
}); });
setTimeout(() => { setTimeout(() => {

4
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -92,7 +92,7 @@ export class JobsController {
}; };
// subscribe to job events // subscribe to job events
if (JobsRedis.available) { if (JobsRedis.available) {
await JobsRedis.subscribe(jobId, async (data) => { const unsubscribeCallback = await JobsRedis.subscribe(jobId, async (data) => {
if (this.jobRooms[jobId]) { if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.forEach((res) => { this.jobRooms[jobId].listeners.forEach((res) => {
if (!res.headersSent) { if (!res.headersSent) {
@ -110,7 +110,7 @@ export class JobsController {
if ( if (
[JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status) [JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)
) { ) {
await JobsRedis.unsubscribe(jobId); await unsubscribeCallback();
delete this.jobRooms[jobId]; delete this.jobRooms[jobId];
// close the job after 1 second (to allow the update of messages) // close the job after 1 second (to allow the update of messages)
setTimeout(() => { setTimeout(() => {

1
packages/nocodb/src/modules/jobs/redis/jobs-redis.ts

@ -96,5 +96,4 @@ export class JobsRedis {
static publish = PubSubRedis.publish; static publish = PubSubRedis.publish;
static subscribe = PubSubRedis.subscribe; static subscribe = PubSubRedis.subscribe;
static unsubscribe = PubSubRedis.unsubscribe;
} }

30
packages/nocodb/src/redis/pubsub-redis.ts

@ -10,8 +10,6 @@ export class PubSubRedis {
public static redisClient: Redis; public static redisClient: Redis;
public static redisSubscriber: Redis; public static redisSubscriber: Redis;
private static unsubscribeCallbacks: { [key: string]: () => Promise<void> } =
{};
public static async init() { public static async init() {
if (!PubSubRedis.available) { if (!PubSubRedis.available) {
@ -43,25 +41,16 @@ export class PubSubRedis {
} }
} }
static async unsubscribe(channel: string) { /**
if (!PubSubRedis.initialized) { *
if (!PubSubRedis.available) { * @param channel
return; * @param callback
} * @returns Returns a callback to unsubscribe
*/
await PubSubRedis.init();
}
if (PubSubRedis.unsubscribeCallbacks[channel]) {
await PubSubRedis.unsubscribeCallbacks[channel]();
delete PubSubRedis.unsubscribeCallbacks[channel];
}
}
static async subscribe( static async subscribe(
channel: string, channel: string,
callback: (message: any) => Promise<void>, callback: (message: any) => Promise<void>,
) { ): Promise<(keepRedisChannel?: boolean) => Promise<void>> {
if (!PubSubRedis.initialized) { if (!PubSubRedis.initialized) {
if (!PubSubRedis.available) { if (!PubSubRedis.available) {
return; return;
@ -83,8 +72,9 @@ export class PubSubRedis {
}; };
PubSubRedis.redisSubscriber.on('message', onMessage); PubSubRedis.redisSubscriber.on('message', onMessage);
PubSubRedis.unsubscribeCallbacks[channel] = async () => { return async (keepRedisChannel = false) => {
await PubSubRedis.redisSubscriber.unsubscribe(channel); // keepRedisChannel is used to keep the channel open for other subscribers
if (!keepRedisChannel) await PubSubRedis.redisSubscriber.unsubscribe(channel);
PubSubRedis.redisSubscriber.off('message', onMessage); PubSubRedis.redisSubscriber.off('message', onMessage);
}; };
} }

13
packages/nocodb/src/services/notifications/notifications.service.ts

@ -36,7 +36,11 @@ export class NotificationsService implements OnModuleInit, OnModuleDestroy {
this.connections.get(userId).push(res); this.connections.get(userId).push(res);
}; };
removeConnection = (userId: string, res: Response & { resId: string }) => { removeConnection = async (
userId: string,
res: Response & { resId: string },
unsubscribeCb: (keepRedisChannel?: boolean) => Promise<void> | null,
) => {
if (!this.connections.has(userId)) { if (!this.connections.has(userId)) {
return; return;
} }
@ -51,8 +55,15 @@ export class NotificationsService implements OnModuleInit, OnModuleDestroy {
if (userConnections.length === 0) { if (userConnections.length === 0) {
this.connections.delete(userId); this.connections.delete(userId);
if (unsubscribeCb) {
await unsubscribeCb();
}
} else { } else {
this.connections.set(userId, userConnections); this.connections.set(userId, userConnections);
if (unsubscribeCb) {
// if there are still connections, keep the redis channel
await unsubscribeCb(true);
}
} }
}; };

Loading…
Cancel
Save