From f132495e56c93d6866879f3ca75707cd8171b4a8 Mon Sep 17 00:00:00 2001 From: mertmit Date: Sun, 9 Jun 2024 09:11:40 +0000 Subject: [PATCH] fix: use PubSubRedis instead of this --- .../nocodb/src/modules/jobs/redis/jobs-redis.ts | 16 ++++++++-------- packages/nocodb/src/redis/pubsub-redis.ts | 6 +++++- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts index 62dc1de00a..1fe34e2a38 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts @@ -46,28 +46,28 @@ export class JobsRedis { PubSubRedis.redisSubscriber.on('message', onMessage); if (process.env.NC_WORKER_CONTAINER === 'true') { - await this.subscribe(InstanceTypes.WORKER, async (message) => { + await PubSubRedis.subscribe(InstanceTypes.WORKER, async (message) => { await onMessage(InstanceTypes.WORKER, message); }); } else { - await this.subscribe(InstanceTypes.PRIMARY, async (message) => { + await PubSubRedis.subscribe(InstanceTypes.PRIMARY, async (message) => { await onMessage(InstanceTypes.PRIMARY, message); }); } } static async workerCount(): Promise { - if (!this.initialized) { - if (!this.available) { + if (!PubSubRedis.initialized) { + if (!PubSubRedis.available) { return; } - await this.init(); + await PubSubRedis.init(); await this.initJobs(); } return new Promise((resolve) => { - this.redisClient.publish( + PubSubRedis.redisClient.publish( InstanceTypes.WORKER, 'count', (error, numberOfSubscribers) => { @@ -84,11 +84,11 @@ export class JobsRedis { static async emitWorkerCommand(command: InstanceCommands, ...args: any[]) { const data = `${command}${args.length ? `:${args.join(':')}` : ''}`; - await this.publish(InstanceTypes.WORKER, data); + await PubSubRedis.publish(InstanceTypes.WORKER, data); } static async emitPrimaryCommand(command: InstanceCommands, ...args: any[]) { const data = `${command}${args.length ? `:${args.join(':')}` : ''}`; - await this.publish(InstanceTypes.PRIMARY, data); + await PubSubRedis.publish(InstanceTypes.PRIMARY, data); } } diff --git a/packages/nocodb/src/redis/pubsub-redis.ts b/packages/nocodb/src/redis/pubsub-redis.ts index 13e56c6cbd..99fb5e69cf 100644 --- a/packages/nocodb/src/redis/pubsub-redis.ts +++ b/packages/nocodb/src/redis/pubsub-redis.ts @@ -71,7 +71,11 @@ export class PubSubRedis { await PubSubRedis.redisSubscriber.subscribe(channel); - const onMessage = async (_channel, message) => { + const onMessage = async (channel, message) => { + if (channel !== channel) { + return; + } + try { message = JSON.parse(message); } catch (e) {}