Browse Source

fix: use PubSubRedis instead of this

nc-fix/pubsub
mertmit 6 months ago
parent
commit
f132495e56
  1. 16
      packages/nocodb/src/modules/jobs/redis/jobs-redis.ts
  2. 6
      packages/nocodb/src/redis/pubsub-redis.ts

16
packages/nocodb/src/modules/jobs/redis/jobs-redis.ts

@ -46,28 +46,28 @@ export class JobsRedis {
PubSubRedis.redisSubscriber.on('message', onMessage);
if (process.env.NC_WORKER_CONTAINER === 'true') {
await this.subscribe(InstanceTypes.WORKER, async (message) => {
await PubSubRedis.subscribe(InstanceTypes.WORKER, async (message) => {
await onMessage(InstanceTypes.WORKER, message);
});
} else {
await this.subscribe(InstanceTypes.PRIMARY, async (message) => {
await PubSubRedis.subscribe(InstanceTypes.PRIMARY, async (message) => {
await onMessage(InstanceTypes.PRIMARY, message);
});
}
}
static async workerCount(): Promise<number> {
if (!this.initialized) {
if (!this.available) {
if (!PubSubRedis.initialized) {
if (!PubSubRedis.available) {
return;
}
await this.init();
await PubSubRedis.init();
await this.initJobs();
}
return new Promise((resolve) => {
this.redisClient.publish(
PubSubRedis.redisClient.publish(
InstanceTypes.WORKER,
'count',
(error, numberOfSubscribers) => {
@ -84,11 +84,11 @@ export class JobsRedis {
static async emitWorkerCommand(command: InstanceCommands, ...args: any[]) {
const data = `${command}${args.length ? `:${args.join(':')}` : ''}`;
await this.publish(InstanceTypes.WORKER, data);
await PubSubRedis.publish(InstanceTypes.WORKER, data);
}
static async emitPrimaryCommand(command: InstanceCommands, ...args: any[]) {
const data = `${command}${args.length ? `:${args.join(':')}` : ''}`;
await this.publish(InstanceTypes.PRIMARY, data);
await PubSubRedis.publish(InstanceTypes.PRIMARY, data);
}
}

6
packages/nocodb/src/redis/pubsub-redis.ts

@ -71,7 +71,11 @@ export class PubSubRedis {
await PubSubRedis.redisSubscriber.subscribe(channel);
const onMessage = async (_channel, message) => {
const onMessage = async (channel, message) => {
if (channel !== channel) {
return;
}
try {
message = JSON.parse(message);
} catch (e) {}

Loading…
Cancel
Save