diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts index c8128eeeb0..62dc1de00a 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts @@ -3,7 +3,7 @@ import type { InstanceCommands } from '~/interface/Jobs'; import { PubSubRedis } from '~/redis/pubsub-redis'; import { InstanceTypes } from '~/interface/Jobs'; -export class JobsRedis extends PubSubRedis { +export class JobsRedis { protected static logger = new Logger(JobsRedis.name); public static primaryCallbacks: { @@ -13,24 +13,38 @@ export class JobsRedis extends PubSubRedis { {}; static async initJobs() { - if (!this.initialized) { - if (!this.available) { + if (!PubSubRedis.initialized) { + if (!PubSubRedis.available) { return; } - await this.init(); + await PubSubRedis.init(); } + const onMessage = async (channel, message) => { - const args = message.split(':'); - const command = args.shift(); - if (channel === InstanceTypes.WORKER) { - this.workerCallbacks[command] && - (await this.workerCallbacks[command](...args)); - } else if (channel === InstanceTypes.PRIMARY) { - this.primaryCallbacks[command] && - (await this.primaryCallbacks[command](...args)); + try { + if (!message) { + return; + } + const args = message.split(':'); + const command = args.shift(); + + if (channel === InstanceTypes.WORKER) { + this.workerCallbacks[command] && + (await this.workerCallbacks[command](...args)); + } else if (channel === InstanceTypes.PRIMARY) { + this.primaryCallbacks[command] && + (await this.primaryCallbacks[command](...args)); + } + } catch (error) { + this.logger.error({ + message: `Error processing redis pub-sub message ${message}`, + }); } }; + + PubSubRedis.redisSubscriber.on('message', onMessage); + if (process.env.NC_WORKER_CONTAINER === 'true') { await this.subscribe(InstanceTypes.WORKER, async (message) => { await onMessage(InstanceTypes.WORKER, message); diff --git a/packages/nocodb/src/redis/pubsub-redis.ts b/packages/nocodb/src/redis/pubsub-redis.ts index 1b06f55459..7df22f26b3 100644 --- a/packages/nocodb/src/redis/pubsub-redis.ts +++ b/packages/nocodb/src/redis/pubsub-redis.ts @@ -8,8 +8,8 @@ export class PubSubRedis { protected static logger = new Logger(PubSubRedis.name); - static redisClient: Redis; - private static redisSubscriber: Redis; + public static redisClient: Redis; + public static redisSubscriber: Redis; private static unsubscribeCallbacks: { [key: string]: () => Promise } = {}; private static callbacks: Record Promise> = {}; @@ -22,12 +22,6 @@ export class PubSubRedis { PubSubRedis.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); PubSubRedis.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); - PubSubRedis.redisSubscriber.on('message', async (channel, message) => { - const [command, ...args] = message.split(':'); - const callback = PubSubRedis.callbacks[command]; - if (callback) await callback(...args); - }); - PubSubRedis.initialized = true; }