Browse Source

fix: pubsub logic

nc-fix/pubsub
mertmit 6 months ago
parent
commit
21f777167d
  1. 22
      packages/nocodb/src/modules/jobs/redis/jobs-redis.ts
  2. 10
      packages/nocodb/src/redis/pubsub-redis.ts

22
packages/nocodb/src/modules/jobs/redis/jobs-redis.ts

@ -3,7 +3,7 @@ import type { InstanceCommands } from '~/interface/Jobs';
import { PubSubRedis } from '~/redis/pubsub-redis'; import { PubSubRedis } from '~/redis/pubsub-redis';
import { InstanceTypes } from '~/interface/Jobs'; import { InstanceTypes } from '~/interface/Jobs';
export class JobsRedis extends PubSubRedis { export class JobsRedis {
protected static logger = new Logger(JobsRedis.name); protected static logger = new Logger(JobsRedis.name);
public static primaryCallbacks: { public static primaryCallbacks: {
@ -13,16 +13,22 @@ export class JobsRedis extends PubSubRedis {
{}; {};
static async initJobs() { static async initJobs() {
if (!this.initialized) { if (!PubSubRedis.initialized) {
if (!this.available) { if (!PubSubRedis.available) {
return; return;
} }
await this.init(); await PubSubRedis.init();
} }
const onMessage = async (channel, message) => { const onMessage = async (channel, message) => {
try {
if (!message) {
return;
}
const args = message.split(':'); const args = message.split(':');
const command = args.shift(); const command = args.shift();
if (channel === InstanceTypes.WORKER) { if (channel === InstanceTypes.WORKER) {
this.workerCallbacks[command] && this.workerCallbacks[command] &&
(await this.workerCallbacks[command](...args)); (await this.workerCallbacks[command](...args));
@ -30,7 +36,15 @@ export class JobsRedis extends PubSubRedis {
this.primaryCallbacks[command] && this.primaryCallbacks[command] &&
(await this.primaryCallbacks[command](...args)); (await this.primaryCallbacks[command](...args));
} }
} catch (error) {
this.logger.error({
message: `Error processing redis pub-sub message ${message}`,
});
}
}; };
PubSubRedis.redisSubscriber.on('message', onMessage);
if (process.env.NC_WORKER_CONTAINER === 'true') { if (process.env.NC_WORKER_CONTAINER === 'true') {
await this.subscribe(InstanceTypes.WORKER, async (message) => { await this.subscribe(InstanceTypes.WORKER, async (message) => {
await onMessage(InstanceTypes.WORKER, message); await onMessage(InstanceTypes.WORKER, message);

10
packages/nocodb/src/redis/pubsub-redis.ts

@ -8,8 +8,8 @@ export class PubSubRedis {
protected static logger = new Logger(PubSubRedis.name); protected static logger = new Logger(PubSubRedis.name);
static redisClient: Redis; public static redisClient: Redis;
private static redisSubscriber: Redis; public static redisSubscriber: Redis;
private static unsubscribeCallbacks: { [key: string]: () => Promise<void> } = private static unsubscribeCallbacks: { [key: string]: () => Promise<void> } =
{}; {};
private static callbacks: Record<string, (...args) => Promise<void>> = {}; private static callbacks: Record<string, (...args) => Promise<void>> = {};
@ -22,12 +22,6 @@ export class PubSubRedis {
PubSubRedis.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); PubSubRedis.redisClient = new Redis(process.env.NC_REDIS_JOB_URL);
PubSubRedis.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); PubSubRedis.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL);
PubSubRedis.redisSubscriber.on('message', async (channel, message) => {
const [command, ...args] = message.split(':');
const callback = PubSubRedis.callbacks[command];
if (callback) await callback(...args);
});
PubSubRedis.initialized = true; PubSubRedis.initialized = true;
} }

Loading…
Cancel
Save