diff --git a/packages/nocodb/src/interface/Jobs.ts b/packages/nocodb/src/interface/Jobs.ts index c73353cd9b..90ec4739cb 100644 --- a/packages/nocodb/src/interface/Jobs.ts +++ b/packages/nocodb/src/interface/Jobs.ts @@ -15,6 +15,7 @@ export enum JobTypes { UpdateSrcStat = 'update-source-stat', HealthCheck = 'health-check', HandleWebhook = 'handle-webhook', + CleanUp = 'clean-up', } export enum JobStatus { diff --git a/packages/nocodb/src/models/Source.ts b/packages/nocodb/src/models/Source.ts index f6f478e6b2..54de376512 100644 --- a/packages/nocodb/src/models/Source.ts +++ b/packages/nocodb/src/models/Source.ts @@ -21,6 +21,8 @@ import { prepareForResponse, stringifyMetaProp, } from '~/utils/modelUtils'; +import { JobsRedis } from '~/modules/jobs/redis/jobs-redis'; +import { InstanceCommands } from '~/interface/Jobs'; // todo: hide credentials export default class Source implements SourceType { @@ -182,6 +184,11 @@ export default class Source implements SourceType { prepareForResponse(updateObj), ); + if (JobsRedis.available) { + await JobsRedis.emitWorkerCommand(InstanceCommands.RELEASE, sourceId); + await JobsRedis.emitPrimaryCommand(InstanceCommands.RELEASE, sourceId); + } + // call before reorder to update cache const returnBase = await this.get(oldBase.id, false, ncMeta); @@ -351,6 +358,15 @@ export default class Source implements SourceType { return Base.get(this.base_id, ncMeta); } + async sourceCleanup(_ncMeta = Noco.ncMeta) { + await NcConnectionMgrv2.deleteAwait(this); + + if (JobsRedis.available) { + await JobsRedis.emitWorkerCommand(InstanceCommands.RELEASE, this.id); + await JobsRedis.emitPrimaryCommand(InstanceCommands.RELEASE, this.id); + } + } + async delete(ncMeta = Noco.ncMeta, { force }: { force?: boolean } = {}) { const sources = await Source.list({ baseId: this.base_id }, ncMeta); @@ -422,7 +438,7 @@ export default class Source implements SourceType { await SyncSource.delete(syncSource.id, ncMeta); } - await NcConnectionMgrv2.deleteAwait(this); + await this.sourceCleanup(ncMeta); const res = await ncMeta.metaDelete(null, null, MetaTable.BASES, this.id); diff --git a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts index 594e806a8e..c8f1676723 100644 --- a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts @@ -1,11 +1,14 @@ import { Injectable } from '@nestjs/common'; +import type { OnModuleInit } from '@nestjs/common'; import { QueueService } from '~/modules/jobs/fallback/fallback-queue.service'; import { JobStatus } from '~/interface/Jobs'; @Injectable() -export class JobsService { +export class JobsService implements OnModuleInit { constructor(private readonly fallbackQueueService: QueueService) {} + async onModuleInit() {} + async add(name: string, data: any) { return this.fallbackQueueService.add(name, data); } diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index 393b4dfcae..e9ea0b6995 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -11,10 +11,7 @@ import { import { Request } from 'express'; import { OnEvent } from '@nestjs/event-emitter'; import { customAlphabet } from 'nanoid'; -import { ModuleRef } from '@nestjs/core'; -import { JobsRedisService } from './redis/jobs-redis.service'; import type { Response } from 'express'; -import type { OnModuleInit } from '@nestjs/common'; import { JobStatus } from '~/interface/Jobs'; import { JobEvents } from '~/interface/Jobs'; import { GlobalGuard } from '~/guards/global/global.guard'; @@ -22,26 +19,18 @@ import NocoCache from '~/cache/NocoCache'; import { CacheGetType, CacheScope } from '~/utils/globals'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; import { IJobsService } from '~/modules/jobs/jobs-service.interface'; +import { JobsRedis } from '~/modules/jobs/redis/jobs-redis'; const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14); const POLLING_INTERVAL = 30000; @Controller() @UseGuards(MetaApiLimiterGuard, GlobalGuard) -export class JobsController implements OnModuleInit { - jobsRedisService: JobsRedisService; - +export class JobsController { constructor( @Inject('JobsService') private readonly jobsService: IJobsService, - private moduleRef: ModuleRef, ) {} - onModuleInit() { - if (process.env.NC_REDIS_JOB_URL) { - this.jobsRedisService = this.moduleRef.get(JobsRedisService); - } - } - private jobRooms = {}; private localJobs = {}; private closedJobs = []; @@ -102,8 +91,8 @@ export class JobsController implements OnModuleInit { listeners: [res], }; // subscribe to job events - if (this.jobsRedisService) { - this.jobsRedisService.subscribe(jobId, (data) => { + if (JobsRedis.available) { + await JobsRedis.subscribe(jobId, async (data) => { if (this.jobRooms[jobId]) { this.jobRooms[jobId].listeners.forEach((res) => { if (!res.headersSent) { @@ -121,7 +110,7 @@ export class JobsController implements OnModuleInit { if ( [JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status) ) { - this.jobsRedisService.unsubscribe(jobId); + await JobsRedis.unsubscribe(jobId); delete this.jobRooms[jobId]; this.closedJobs.push(jobId); setTimeout(() => { @@ -178,7 +167,11 @@ export class JobsController implements OnModuleInit { } @OnEvent(JobEvents.STATUS) - sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void { + async sendJobStatus(data: { + id: string; + status: JobStatus; + data?: any; + }): Promise { let response; const jobId = data.id; @@ -196,7 +189,7 @@ export class JobsController implements OnModuleInit { this.localJobs[jobId].messages.shift(); } - NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { + await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { messages: this.localJobs[jobId].messages, }); } else { @@ -211,7 +204,7 @@ export class JobsController implements OnModuleInit { _mid: 1, }; - NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { + await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { messages: this.localJobs[jobId].messages, }); } @@ -224,8 +217,8 @@ export class JobsController implements OnModuleInit { }); } - if (this.jobsRedisService) { - this.jobsRedisService.publish(jobId, { + if (JobsRedis.available) { + await JobsRedis.publish(jobId, { cmd: JobEvents.STATUS, ...data, }); @@ -237,16 +230,19 @@ export class JobsController implements OnModuleInit { this.closedJobs = this.closedJobs.filter((j) => j !== jobId); }, POLLING_INTERVAL * 2); - setTimeout(() => { + setTimeout(async () => { delete this.jobRooms[jobId]; delete this.localJobs[jobId]; - NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`); + await NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`); }, POLLING_INTERVAL * 2); } } @OnEvent(JobEvents.LOG) - sendJobLog(data: { id: string; data: { message: string } }): void { + async sendJobLog(data: { + id: string; + data: { message: string }; + }): Promise { let response; const jobId = data.id; @@ -265,7 +261,7 @@ export class JobsController implements OnModuleInit { this.localJobs[jobId].messages.shift(); } - NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { + await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { messages: this.localJobs[jobId].messages, }); } else { @@ -280,7 +276,7 @@ export class JobsController implements OnModuleInit { _mid: 1, }; - NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { + await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { messages: this.localJobs[jobId].messages, }); } @@ -293,8 +289,8 @@ export class JobsController implements OnModuleInit { }); } - if (this.jobsRedisService) { - this.jobsRedisService.publish(jobId, { + if (JobsRedis.available) { + await JobsRedis.publish(jobId, { cmd: JobEvents.LOG, ...data, }); diff --git a/packages/nocodb/src/modules/jobs/jobs.module.ts b/packages/nocodb/src/modules/jobs/jobs.module.ts index 0d53ce224d..6f26ad3ddf 100644 --- a/packages/nocodb/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb/src/modules/jobs/jobs.module.ts @@ -22,7 +22,6 @@ import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service'; // import { JobsGateway } from '~/modules/jobs/jobs.gateway'; import { JobsController } from '~/modules/jobs/jobs.controller'; import { JobsService } from '~/modules/jobs/redis/jobs.service'; -import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; import { JobsEventService } from '~/modules/jobs/redis/jobs-event.service'; // Fallback @@ -60,7 +59,7 @@ export const JobsModuleMetadata = { providers: [ ...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []), ...(process.env.NC_REDIS_JOB_URL - ? [JobsRedisService, JobsEventService] + ? [JobsEventService] : [FallbackQueueService, FallbackJobsEventService]), { provide: 'JobsService', diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts deleted file mode 100644 index 67af1d76eb..0000000000 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; -import Redis from 'ioredis'; -import { InstanceTypes } from '~/interface/Jobs'; - -@Injectable() -export class JobsRedisService { - protected logger = new Logger(JobsRedisService.name); - - private redisClient: Redis; - private redisSubscriber: Redis; - private unsubscribeCallbacks: { [key: string]: () => void } = {}; - - public primaryCallbacks: { [key: string]: (...args) => void } = {}; - public workerCallbacks: { [key: string]: (...args) => void } = {}; - - constructor() { - this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); - this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); - - if (process.env.NC_WORKER_CONTAINER === 'true') { - this.redisSubscriber.subscribe(InstanceTypes.WORKER); - } else { - this.redisSubscriber.subscribe(InstanceTypes.PRIMARY); - } - - const onMessage = (channel, message) => { - const args = message.split(':'); - const command = args.shift(); - if (channel === InstanceTypes.WORKER) { - this.workerCallbacks[command] && this.workerCallbacks[command](...args); - } else if (channel === InstanceTypes.PRIMARY) { - this.primaryCallbacks[command] && - this.primaryCallbacks[command](...args); - } - }; - - this.redisSubscriber.on('message', onMessage); - } - - publish(channel: string, message: string | any) { - if (typeof message === 'string') { - this.redisClient.publish(channel, message); - } else { - try { - this.redisClient.publish(channel, JSON.stringify(message)); - } catch (e) { - this.logger.error(e); - } - } - } - - subscribe(channel: string, callback: (message: any) => void) { - this.redisSubscriber.subscribe(channel); - - const onMessage = (_channel, message) => { - try { - message = JSON.parse(message); - } catch (e) {} - callback(message); - }; - - this.redisSubscriber.on('message', onMessage); - this.unsubscribeCallbacks[channel] = () => { - this.redisSubscriber.unsubscribe(channel); - this.redisSubscriber.off('message', onMessage); - }; - } - - unsubscribe(channel: string) { - if (this.unsubscribeCallbacks[channel]) { - this.unsubscribeCallbacks[channel](); - delete this.unsubscribeCallbacks[channel]; - } - } - - workerCount(): Promise { - return new Promise((resolve, reject) => { - this.redisClient.publish( - InstanceTypes.WORKER, - 'count', - (error, numberOfSubscribers) => { - if (error) { - reject(0); - } else { - resolve(numberOfSubscribers); - } - }, - ); - }); - } -} diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts new file mode 100644 index 0000000000..50cc0851a2 --- /dev/null +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.ts @@ -0,0 +1,155 @@ +import { Logger } from '@nestjs/common'; +import Redis from 'ioredis'; +import type { InstanceCommands } from '~/interface/Jobs'; +import { InstanceTypes } from '~/interface/Jobs'; + +export class JobsRedis { + private static initialized = false; + + public static available = process.env.NC_REDIS_JOB_URL ? true : false; + + protected static logger = new Logger(JobsRedis.name); + + private static redisClient: Redis; + private static redisSubscriber: Redis; + private static unsubscribeCallbacks: { [key: string]: () => Promise } = + {}; + + public static primaryCallbacks: { + [key: string]: (...args) => Promise; + } = {}; + public static workerCallbacks: { [key: string]: (...args) => Promise } = + {}; + + static async init() { + if (this.initialized) { + return; + } + + if (!JobsRedis.available) { + return; + } + + this.initialized = true; + + this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); + this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); + + if (process.env.NC_WORKER_CONTAINER === 'true') { + await this.redisSubscriber.subscribe(InstanceTypes.WORKER); + } else { + await this.redisSubscriber.subscribe(InstanceTypes.PRIMARY); + } + + const onMessage = async (channel, message) => { + const args = message.split(':'); + const command = args.shift(); + if (channel === InstanceTypes.WORKER) { + this.workerCallbacks[command] && + (await this.workerCallbacks[command](...args)); + } else if (channel === InstanceTypes.PRIMARY) { + this.primaryCallbacks[command] && + (await this.primaryCallbacks[command](...args)); + } + }; + + this.redisSubscriber.on('message', onMessage); + } + + static async publish(channel: string, message: string | any) { + if (!this.initialized) { + if (!JobsRedis.available) { + return; + } + + await this.init(); + } + + if (typeof message === 'string') { + await this.redisClient.publish(channel, message); + } else { + try { + await this.redisClient.publish(channel, JSON.stringify(message)); + } catch (e) { + this.logger.error(e); + } + } + } + + static async subscribe( + channel: string, + callback: (message: any) => Promise, + ) { + if (!this.initialized) { + if (!JobsRedis.available) { + return; + } + + await this.init(); + } + + await this.redisSubscriber.subscribe(channel); + + const onMessage = async (_channel, message) => { + try { + message = JSON.parse(message); + } catch (e) {} + await callback(message); + }; + + this.redisSubscriber.on('message', onMessage); + this.unsubscribeCallbacks[channel] = async () => { + await this.redisSubscriber.unsubscribe(channel); + this.redisSubscriber.off('message', onMessage); + }; + } + + static async unsubscribe(channel: string) { + if (!this.initialized) { + if (!JobsRedis.available) { + return; + } + + await this.init(); + } + + if (this.unsubscribeCallbacks[channel]) { + await this.unsubscribeCallbacks[channel](); + delete this.unsubscribeCallbacks[channel]; + } + } + + static async workerCount(): Promise { + if (!this.initialized) { + if (!JobsRedis.available) { + return; + } + + await this.init(); + } + + return new Promise((resolve, reject) => { + this.redisClient.publish( + InstanceTypes.WORKER, + 'count', + (error, numberOfSubscribers) => { + if (error) { + reject(0); + } else { + resolve(numberOfSubscribers); + } + }, + ); + }); + } + + static async emitWorkerCommand(command: InstanceCommands, ...args: any[]) { + const data = `${command}${args.length ? `:${args.join(':')}` : ''}`; + await JobsRedis.publish(InstanceTypes.WORKER, data); + } + + static async emitPrimaryCommand(command: InstanceCommands, ...args: any[]) { + const data = `${command}${args.length ? `:${args.join(':')}` : ''}`; + await JobsRedis.publish(InstanceTypes.PRIMARY, data); + } +} diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 23add1c064..a720da68aa 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -2,37 +2,27 @@ import { InjectQueue } from '@nestjs/bull'; import { Injectable, Logger } from '@nestjs/common'; import { Queue } from 'bull'; import type { OnModuleInit } from '@nestjs/common'; -import { - InstanceCommands, - InstanceTypes, - JOBS_QUEUE, - JobStatus, -} from '~/interface/Jobs'; -import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; +import { InstanceCommands, JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; +import { JobsRedis } from '~/modules/jobs/redis/jobs-redis'; @Injectable() export class JobsService implements OnModuleInit { protected logger = new Logger(JobsService.name); - constructor( - @InjectQueue(JOBS_QUEUE) public readonly jobsQueue: Queue, - protected readonly jobsRedisService: JobsRedisService, - ) {} + constructor(@InjectQueue(JOBS_QUEUE) public readonly jobsQueue: Queue) {} // pause primary instance queue async onModuleInit() { await this.toggleQueue(); - this.jobsRedisService.workerCallbacks[InstanceCommands.RESUME_LOCAL] = - async () => { - this.logger.log('Resuming local queue'); - await this.jobsQueue.resume(true); - }; - this.jobsRedisService.workerCallbacks[InstanceCommands.PAUSE_LOCAL] = - async () => { - this.logger.log('Pausing local queue'); - await this.jobsQueue.pause(true); - }; + JobsRedis.workerCallbacks[InstanceCommands.RESUME_LOCAL] = async () => { + this.logger.log('Resuming local queue'); + await this.jobsQueue.resume(true); + }; + JobsRedis.workerCallbacks[InstanceCommands.PAUSE_LOCAL] = async () => { + this.logger.log('Pausing local queue'); + await this.jobsQueue.pause(true); + }; } async toggleQueue() { @@ -40,7 +30,7 @@ export class JobsService implements OnModuleInit { await this.jobsQueue.pause(true); } else if (process.env.NC_WORKER_CONTAINER !== 'true') { // resume primary instance queue if there is no worker - const workerCount = await this.jobsRedisService.workerCount(); + const workerCount = await JobsRedis.workerCount(); const localWorkerPaused = await this.jobsQueue.isPaused(true); // if there is no worker and primary instance queue is paused, resume it @@ -112,12 +102,10 @@ export class JobsService implements OnModuleInit { } async emitWorkerCommand(command: InstanceCommands, ...args: any[]) { - const data = `${command}${args.length ? `:${args.join(':')}` : ''}`; - await this.jobsRedisService.publish(InstanceTypes.WORKER, data); + return JobsRedis.emitWorkerCommand(command, ...args); } async emitPrimaryCommand(command: InstanceCommands, ...args: any[]) { - const data = `${command}${args.length ? `:${args.join(':')}` : ''}`; - await this.jobsRedisService.publish(InstanceTypes.PRIMARY, data); + return JobsRedis.emitPrimaryCommand(command, ...args); } } diff --git a/packages/nocodb/src/utils/common/NcConnectionMgrv2.ts b/packages/nocodb/src/utils/common/NcConnectionMgrv2.ts index aca16dd7c2..5afba858c3 100644 --- a/packages/nocodb/src/utils/common/NcConnectionMgrv2.ts +++ b/packages/nocodb/src/utils/common/NcConnectionMgrv2.ts @@ -1,3 +1,4 @@ +import { Logger } from '@nestjs/common'; import type Source from '~/models/Source'; import { defaultConnectionConfig, @@ -8,6 +9,8 @@ import { XKnex } from '~/db/CustomKnex'; import Noco from '~/Noco'; export default class NcConnectionMgrv2 { + private static logger = new Logger('NcConnectionMgrv2'); + protected static connectionRefs: { [baseId: string]: { [sourceId: string]: XKnex; @@ -22,31 +25,39 @@ export default class NcConnectionMgrv2 { } } - // Todo: Should await on connection destroy - public static delete(source: Source) { + public static async deleteAwait(source: Source) { // todo: ignore meta bases if (this.connectionRefs?.[source.base_id]?.[source.id]) { try { const conn = this.connectionRefs?.[source.base_id]?.[source.id]; - conn.destroy(); + await conn.destroy(); delete this.connectionRefs?.[source.base_id][source.id]; } catch (e) { - console.log(e); + this.logger.error({ + error: e, + details: 'Error deleting connection ref', + }); } } } - public static async deleteAwait(source: Source) { - // todo: ignore meta bases - if (this.connectionRefs?.[source.base_id]?.[source.id]) { + public static async deleteConnectionRef(sourceId: string) { + let deleted = false; + for (const baseId in this.connectionRefs) { try { - const conn = this.connectionRefs?.[source.base_id]?.[source.id]; - await conn.destroy(); - delete this.connectionRefs?.[source.base_id][source.id]; + if (this.connectionRefs[baseId][sourceId]) { + await this.connectionRefs[baseId][sourceId].destroy(); + delete this.connectionRefs[baseId][sourceId]; + deleted = true; + } } catch (e) { - console.log(e); + this.logger.error({ + error: e, + details: 'Error deleting connection ref', + }); } } + return deleted; } public static async get(source: Source): Promise {