From b646aa2bbd595e2618a3ee2f5084e7b0e54195c7 Mon Sep 17 00:00:00 2001 From: "Mert E." Date: Fri, 16 Aug 2024 12:23:22 +0300 Subject: [PATCH] feat: improve queue logic (#9255) * feat: improve worker logic * feat: job requeue & version handling --- packages/nocodb/src/interface/Jobs.ts | 17 ++- packages/nocodb/src/meta/meta.service.ts | 2 +- .../jobs/fallback/fallback-queue.service.ts | 116 ++++++------------ .../src/modules/jobs/fallback/jobs.service.ts | 51 ++++++-- .../src/modules/jobs/jobs-event.service.ts | 9 ++ .../src/modules/jobs/jobs-map.service.ts | 79 ++++++++++++ .../modules/jobs/jobs-service.interface.ts | 3 +- .../src/modules/jobs/jobs.controller.ts | 18 +++ .../nocodb/src/modules/jobs/jobs.module.ts | 10 +- .../nocodb/src/modules/jobs/jobs.processor.ts | 82 +++++++++++++ .../jobs/at-import/at-import.processor.ts | 9 +- .../attachment-clean-up.ts | 8 +- .../jobs/data-export/data-export.processor.ts | 10 +- .../jobs/export-import/duplicate.processor.ts | 11 +- .../jobs/meta-sync/meta-sync.processor.ts | 8 +- .../source-create/source-create.processor.ts | 8 +- .../source-delete/source-delete.processor.ts | 8 +- .../thumbnail-generator.processor.ts | 8 +- .../webhook-handler.processor.ts | 13 +- .../migration-jobs/init-migration-jobs.ts | 14 +-- .../migration-jobs/nc_job_002_thumbnail.ts | 3 - .../src/modules/jobs/redis/jobs.service.ts | 45 +++++-- 22 files changed, 358 insertions(+), 174 deletions(-) create mode 100644 packages/nocodb/src/modules/jobs/jobs-map.service.ts create mode 100644 packages/nocodb/src/modules/jobs/jobs.processor.ts diff --git a/packages/nocodb/src/interface/Jobs.ts b/packages/nocodb/src/interface/Jobs.ts index d2291ee0fa..d4dd99d8b6 100644 --- a/packages/nocodb/src/interface/Jobs.ts +++ b/packages/nocodb/src/interface/Jobs.ts @@ -3,7 +3,6 @@ import type { NcContext, NcRequest } from '~/interface/config'; export const JOBS_QUEUE = 'jobs'; export enum MigrationJobTypes { - InitMigrationJobs = 'init-migration-jobs', Attachment = 'attachment', Thumbnail = 'thumbnail', } @@ -25,6 +24,7 @@ export enum JobTypes { DataExport = 'data-export', ThumbnailGenerator = 'thumbnail-generator', AttachmentCleanUp = 'attachment-clean-up', + InitMigrationJobs = 'init-migration-jobs', } export enum JobStatus { @@ -35,6 +35,7 @@ export enum JobStatus { FAILED = 'failed', PAUSED = 'paused', REFRESH = 'refresh', + REQUEUED = 'requeued', } export enum JobEvents { @@ -42,6 +43,14 @@ export enum JobEvents { LOG = 'job.log', } +export const JobVersions: { + [key in JobTypes]?: number; +} = {}; + +export const JOB_REQUEUED = 'job.requeued'; + +export const JOB_REQUEUE_LIMIT = 10; + export const InstanceTypes = { PRIMARY: `${process.env.NC_ENV ?? 'default'}-primary`, WORKER: `${process.env.NC_ENV ?? 'default'}-worker`, @@ -55,6 +64,12 @@ export enum InstanceCommands { } export interface JobData { + // meta info + jobName: string; + _jobDelay?: number; + _jobAttempt?: number; + _jobVersion?: number; + // context context: NcContext; user: Partial; } diff --git a/packages/nocodb/src/meta/meta.service.ts b/packages/nocodb/src/meta/meta.service.ts index dab73a65b8..98b4f15c9f 100644 --- a/packages/nocodb/src/meta/meta.service.ts +++ b/packages/nocodb/src/meta/meta.service.ts @@ -223,7 +223,7 @@ export class MetaService { target: string, data: any | any[], ids: string[], - condition?: { [key: string]: any }, + condition?: { [p: string]: any }, ): Promise { if (Array.isArray(data) ? !data.length : !data) { return []; diff --git a/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts b/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts index 8f2de949fb..d51289c39d 100644 --- a/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts @@ -1,18 +1,9 @@ -import { Injectable } from '@nestjs/common'; +import { forwardRef, Inject, Injectable } from '@nestjs/common'; import PQueue from 'p-queue'; import Emittery from 'emittery'; -import { DuplicateProcessor } from '~/modules/jobs/jobs/export-import/duplicate.processor'; -import { AtImportProcessor } from '~/modules/jobs/jobs/at-import/at-import.processor'; -import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.processor'; -import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor'; -import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; -import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor'; -import { DataExportProcessor } from '~/modules/jobs/jobs/data-export/data-export.processor'; import { JobsEventService } from '~/modules/jobs/jobs-event.service'; -import { JobStatus, JobTypes, MigrationJobTypes } from '~/interface/Jobs'; -import { ThumbnailGeneratorProcessor } from '~/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor'; -import { AttachmentCleanUpProcessor } from '~/modules/jobs/jobs/attachment-clean-up/attachment-clean-up'; -import { InitMigrationJobs } from '~/modules/jobs/migration-jobs/init-migration-jobs'; +import { JobStatus } from '~/interface/Jobs'; +import { JobsMap } from '~/modules/jobs/jobs-map.service'; export interface Job { id: string; @@ -31,16 +22,7 @@ export class QueueService { constructor( protected readonly jobsEventService: JobsEventService, - protected readonly duplicateProcessor: DuplicateProcessor, - protected readonly atImportProcessor: AtImportProcessor, - protected readonly metaSyncProcessor: MetaSyncProcessor, - protected readonly sourceCreateProcessor: SourceCreateProcessor, - protected readonly sourceDeleteProcessor: SourceDeleteProcessor, - protected readonly webhookHandlerProcessor: WebhookHandlerProcessor, - protected readonly dataExportProcessor: DataExportProcessor, - protected readonly thumbnailGeneratorProcessor: ThumbnailGeneratorProcessor, - protected readonly attachmentCleanUpProcessor: AttachmentCleanUpProcessor, - protected readonly initMigrationJobs: InitMigrationJobs, + @Inject(forwardRef(() => JobsMap)) protected readonly jobsMap: JobsMap, ) { this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => { const job = this.queueMemory.find((job) => job.id === data.job.id); @@ -69,64 +51,12 @@ export class QueueService { }); } - jobMap = { - [JobTypes.DuplicateBase]: { - this: this.duplicateProcessor, - fn: this.duplicateProcessor.duplicateBase, - }, - [JobTypes.DuplicateModel]: { - this: this.duplicateProcessor, - fn: this.duplicateProcessor.duplicateModel, - }, - [JobTypes.DuplicateColumn]: { - this: this.duplicateProcessor, - fn: this.duplicateProcessor.duplicateColumn, - }, - [JobTypes.AtImport]: { - this: this.atImportProcessor, - fn: this.atImportProcessor.job, - }, - [JobTypes.MetaSync]: { - this: this.metaSyncProcessor, - fn: this.metaSyncProcessor.job, - }, - [JobTypes.SourceCreate]: { - this: this.sourceCreateProcessor, - fn: this.sourceCreateProcessor.job, - }, - [JobTypes.SourceDelete]: { - this: this.sourceDeleteProcessor, - fn: this.sourceDeleteProcessor.job, - }, - [JobTypes.HandleWebhook]: { - this: this.webhookHandlerProcessor, - fn: this.webhookHandlerProcessor.job, - }, - [JobTypes.DataExport]: { - this: this.dataExportProcessor, - fn: this.dataExportProcessor.job, - }, - [JobTypes.ThumbnailGenerator]: { - this: this.thumbnailGeneratorProcessor, - fn: this.thumbnailGeneratorProcessor.job, - }, - [JobTypes.AttachmentCleanUp]: { - this: this.attachmentCleanUpProcessor, - fn: this.attachmentCleanUpProcessor.job, - }, - [MigrationJobTypes.InitMigrationJobs]: { - this: this.initMigrationJobs, - fn: this.initMigrationJobs.job, - }, - }; - async jobWrapper(job: Job) { this.emitter.emit(JobStatus.ACTIVE, { job }); + try { - const result = await this.jobMap[job.name].fn.apply( - this.jobMap[job.name].this, - [job], - ); + const { this: processor, fn = 'job' } = this.jobsMap.jobs[job.name]; + const result = await processor[fn](job); this.emitter.emit(JobStatus.COMPLETED, { job, result }); } catch (error) { this.emitter.emit(JobStatus.FAILED, { job, error }); @@ -153,11 +83,35 @@ export class QueueService { QueueService.queueIdCounter = index; } - add(name: string, data: any, opts?: { jobId?: string }) { + add(name: string, data: any, opts?: { jobId?: string; delay?: number }) { const id = opts?.jobId || `${this.queueIndex++}`; - const job = { id: `${id}`, name, status: JobStatus.WAITING, data }; - this.queueMemory.push(job); - this.queue.add(() => this.jobWrapper(job)); + const existingJob = this.queueMemory.find((q) => q.id === id); + + let job; + + if (existingJob) { + if (existingJob.status !== JobStatus.WAITING) { + existingJob.status = JobStatus.WAITING; + } + job = existingJob; + } else { + job = { id: `${id}`, name, status: JobStatus.WAITING, data }; + } + + if (opts?.delay) { + setTimeout(() => { + if (!existingJob) { + this.queueMemory.push(job); + } + this.queue.add(() => this.jobWrapper(job)); + }, opts.delay); + } else { + if (!existingJob) { + this.queueMemory.push(job); + } + this.queue.add(() => this.jobWrapper(job)); + } + return { id, name }; } diff --git a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts index 7db63a8fad..aa0580420e 100644 --- a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts @@ -1,7 +1,7 @@ import { Injectable } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common'; import { QueueService } from '~/modules/jobs/fallback/fallback-queue.service'; -import { JobStatus, MigrationJobTypes } from '~/interface/Jobs'; +import { JobStatus, JobTypes, JobVersions } from '~/interface/Jobs'; import { Job } from '~/models'; import { RootScopes } from '~/utils/globals'; @@ -10,23 +10,56 @@ export class JobsService implements OnModuleInit { constructor(private readonly fallbackQueueService: QueueService) {} async onModuleInit() { - await this.add(MigrationJobTypes.InitMigrationJobs, {}); + await this.add(JobTypes.InitMigrationJobs, {}); } - async add(name: string, data: any) { + async add( + name: string, + data: any, + options?: { + jobId?: string; + delay?: number; // delay in ms + }, + ) { const context = { workspace_id: RootScopes.ROOT, base_id: RootScopes.ROOT, ...(data?.context || {}), }; - const jobData = await Job.insert(context, { - job: name, - status: JobStatus.WAITING, - fk_user_id: data?.user?.id, - }); + let jobData; + + if (options?.jobId) { + const existingJob = await Job.get(context, options.jobId); + if (existingJob) { + jobData = existingJob; + + if (existingJob.status !== JobStatus.WAITING) { + await Job.update(context, existingJob.id, { + status: JobStatus.WAITING, + }); + } + } + } + + if (!jobData) { + jobData = await Job.insert(context, { + job: name, + status: JobStatus.WAITING, + fk_user_id: data?.user?.id, + }); + } - this.fallbackQueueService.add(name, data, { jobId: jobData.id }); + data.jobName = name; + + if (JobVersions?.[name]) { + data._jobVersion = JobVersions[name]; + } + + this.fallbackQueueService.add(name, data, { + jobId: jobData.id, + ...options, + }); return jobData; } diff --git a/packages/nocodb/src/modules/jobs/jobs-event.service.ts b/packages/nocodb/src/modules/jobs/jobs-event.service.ts index 9ea4b2b635..c0bfe372a8 100644 --- a/packages/nocodb/src/modules/jobs/jobs-event.service.ts +++ b/packages/nocodb/src/modules/jobs/jobs-event.service.ts @@ -81,6 +81,15 @@ export class JobsEventService { @OnQueueCompleted() onCompleted(job: BullJob, data: any) { + // If job was requeued, don't update the status + if (data === JobStatus.REQUEUED) { + this.eventEmitter.emit(JobEvents.STATUS, { + id: job.id.toString(), + status: JobStatus.REQUEUED, + }); + return; + } + Job.update( { workspace_id: RootScopes.ROOT, diff --git a/packages/nocodb/src/modules/jobs/jobs-map.service.ts b/packages/nocodb/src/modules/jobs/jobs-map.service.ts new file mode 100644 index 0000000000..142b4d2a98 --- /dev/null +++ b/packages/nocodb/src/modules/jobs/jobs-map.service.ts @@ -0,0 +1,79 @@ +import { Injectable } from '@nestjs/common'; +import { DuplicateProcessor } from '~/modules/jobs/jobs/export-import/duplicate.processor'; +import { AtImportProcessor } from '~/modules/jobs/jobs/at-import/at-import.processor'; +import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.processor'; +import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor'; +import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; +import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor'; +import { DataExportProcessor } from '~/modules/jobs/jobs/data-export/data-export.processor'; +import { ThumbnailGeneratorProcessor } from '~/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor'; +import { AttachmentCleanUpProcessor } from '~/modules/jobs/jobs/attachment-clean-up/attachment-clean-up'; +import { InitMigrationJobs } from '~/modules/jobs/migration-jobs/init-migration-jobs'; +import { JobTypes } from '~/interface/Jobs'; + +@Injectable() +export class JobsMap { + constructor( + protected readonly duplicateProcessor: DuplicateProcessor, + protected readonly atImportProcessor: AtImportProcessor, + protected readonly metaSyncProcessor: MetaSyncProcessor, + protected readonly sourceCreateProcessor: SourceCreateProcessor, + protected readonly sourceDeleteProcessor: SourceDeleteProcessor, + protected readonly webhookHandlerProcessor: WebhookHandlerProcessor, + protected readonly dataExportProcessor: DataExportProcessor, + protected readonly thumbnailGeneratorProcessor: ThumbnailGeneratorProcessor, + protected readonly attachmentCleanUpProcessor: AttachmentCleanUpProcessor, + protected readonly initMigrationJobs: InitMigrationJobs, + ) {} + + protected _jobMap: { + [key in JobTypes]?: { + this: any; + fn?: string; + }; + } = { + [JobTypes.DuplicateBase]: { + this: this.duplicateProcessor, + fn: 'duplicateBase', + }, + [JobTypes.DuplicateModel]: { + this: this.duplicateProcessor, + fn: 'duplicateModel', + }, + [JobTypes.DuplicateColumn]: { + this: this.duplicateProcessor, + fn: 'duplicateColumn', + }, + [JobTypes.AtImport]: { + this: this.atImportProcessor, + }, + [JobTypes.MetaSync]: { + this: this.metaSyncProcessor, + }, + [JobTypes.SourceCreate]: { + this: this.sourceCreateProcessor, + }, + [JobTypes.SourceDelete]: { + this: this.sourceDeleteProcessor, + }, + [JobTypes.HandleWebhook]: { + this: this.webhookHandlerProcessor, + }, + [JobTypes.DataExport]: { + this: this.dataExportProcessor, + }, + [JobTypes.ThumbnailGenerator]: { + this: this.thumbnailGeneratorProcessor, + }, + [JobTypes.AttachmentCleanUp]: { + this: this.attachmentCleanUpProcessor, + }, + [JobTypes.InitMigrationJobs]: { + this: this.initMigrationJobs, + }, + }; + + public get jobs() { + return this._jobMap; + } +} diff --git a/packages/nocodb/src/modules/jobs/jobs-service.interface.ts b/packages/nocodb/src/modules/jobs/jobs-service.interface.ts index 3696067b71..ef7763759b 100644 --- a/packages/nocodb/src/modules/jobs/jobs-service.interface.ts +++ b/packages/nocodb/src/modules/jobs/jobs-service.interface.ts @@ -1,10 +1,11 @@ +import type { JobOptions } from 'bull'; import type Bull from 'bull'; import type { JobStatus } from '~/interface/Jobs'; export interface IJobsService { jobsQueue: Bull.Queue; toggleQueue(): Promise; - add(name: string, data: any): Promise>; + add(name: string, data: any, options?: JobOptions): Promise>; jobStatus(jobId: string): Promise; jobList(): Promise[]>; resumeQueue(): Promise; diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index 045cc60da2..8614f1d071 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -159,6 +159,24 @@ export class JobsController { const jobId = data.id; + // clean as it might be taken by another worker + if (data.status === JobStatus.REQUEUED) { + if (this.jobRooms[jobId]) { + this.jobRooms[jobId].listeners.forEach((res) => { + if (!res.headersSent) { + res.send({ + status: 'refresh', + }); + } + }); + } + + delete this.jobRooms[jobId]; + delete this.localJobs[jobId]; + await NocoCache.del(`${CacheScope.JOBS_POLLING}:${jobId}:messages`); + return; + } + if (this.localJobs[jobId]) { response = { status: 'update', diff --git a/packages/nocodb/src/modules/jobs/jobs.module.ts b/packages/nocodb/src/modules/jobs/jobs.module.ts index 46567ea5b1..47397dfd69 100644 --- a/packages/nocodb/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb/src/modules/jobs/jobs.module.ts @@ -21,6 +21,10 @@ import { DataExportController } from '~/modules/jobs/jobs/data-export/data-expor import { ThumbnailGeneratorProcessor } from '~/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor'; import { AttachmentCleanUpProcessor } from '~/modules/jobs/jobs/attachment-clean-up/attachment-clean-up'; +// Job Processor +import { JobsProcessor } from '~/modules/jobs/jobs.processor'; +import { JobsMap } from '~/modules/jobs/jobs-map.service'; + // Migration Jobs import { InitMigrationJobs } from '~/modules/jobs/migration-jobs/init-migration-jobs'; import { AttachmentMigration } from '~/modules/jobs/migration-jobs/nc_job_001_attachment'; @@ -48,6 +52,9 @@ export const JobsModuleMetadata = { }), BullModule.registerQueue({ name: JOBS_QUEUE, + defaultJobOptions: { + removeOnComplete: true, + }, }), ] : []), @@ -66,7 +73,7 @@ export const JobsModuleMetadata = { : []), ], providers: [ - ...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []), + JobsMap, JobsEventService, ...(process.env.NC_REDIS_JOB_URL ? [] : [FallbackQueueService]), { @@ -76,6 +83,7 @@ export const JobsModuleMetadata = { : FallbackJobsService, }, JobsLogService, + JobsProcessor, ExportService, ImportService, DuplicateProcessor, diff --git a/packages/nocodb/src/modules/jobs/jobs.processor.ts b/packages/nocodb/src/modules/jobs/jobs.processor.ts new file mode 100644 index 0000000000..26abdf178a --- /dev/null +++ b/packages/nocodb/src/modules/jobs/jobs.processor.ts @@ -0,0 +1,82 @@ +import { Process, Processor } from '@nestjs/bull'; +import { Inject, Logger } from '@nestjs/common'; +import { Job } from 'bull'; +import type { JobData } from '~/interface/Jobs'; +import { JOB_REQUEUE_LIMIT, JOBS_QUEUE, JobVersions } from '~/interface/Jobs'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; +import { JobsMap } from '~/modules/jobs/jobs-map.service'; +import { JobsEventService } from '~/modules/jobs/jobs-event.service'; +import { JobStatus } from '~/interface/Jobs'; + +const NC_WORKER_CONCURRENCY = process.env.NC_WORKER_CONCURRENCY ?? 5; + +@Processor(JOBS_QUEUE) +export class JobsProcessor { + private logger = new Logger(JobsProcessor.name); + + constructor( + @Inject('JobsService') protected readonly jobsService: IJobsService, + protected readonly jobsEventService: JobsEventService, + protected readonly jobsMap: JobsMap, + ) {} + + @Process({ + concurrency: +NC_WORKER_CONCURRENCY, + }) + async process(job: Job) { + const { jobName } = job.data; + + if (!this.jobsMap.jobs[jobName]) { + this.logger.error(`Job not found for ${jobName}`); + await this.requeue(job); + return; + } + + const { this: processor, fn = 'job' } = this.jobsMap.jobs[jobName]; + + if (!processor[fn]) { + this.logger.error(`Job function not found for ${jobName}`); + await this.requeue(job); + return; + } + + if (JobVersions[jobName] || job.data?._jobVersion) { + if (JobVersions[jobName] !== job.data._jobVersion) { + this.logger.error(`Job version mismatch for ${jobName}`); + await this.requeue(job); + return; + } + } + + try { + return await processor[fn](job); + } catch (e) { + this.logger.error(`Error processing job ${jobName}`, e); + throw e; + } + } + + async requeue(job: Job) { + // Remove the job from the queue otherwise ids will clash + await job.releaseLock(); + await job.remove(); + + await this.jobsEventService.onCompleted(job, JobStatus.REQUEUED); + + const _jobDelay = job.data?._jobDelay ?? 0; + const _jobAttempt = job.data?._jobAttempt ?? 1; + + if (_jobAttempt > JOB_REQUEUE_LIMIT) { + this.logger.error(`Job ${job.data.jobName} failed after 10 attempts`); + return; + } + + job.data._jobDelay = _jobDelay + 5000; + job.data._jobAttempt = _jobAttempt + 1; + + return this.jobsService.add(job.data.jobName, job.data, { + jobId: job.id.toString(), + delay: job.data._jobDelay, + }); + } +} diff --git a/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts b/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts index e4979aac2d..0725743064 100644 --- a/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts @@ -5,15 +5,14 @@ import hash from 'object-hash'; import dayjs from 'dayjs'; import utc from 'dayjs/plugin/utc'; import tinycolor from 'tinycolor2'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; import { isLinksOrLTAR } from 'nocodb-sdk'; import debug from 'debug'; -import { Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { JobsLogService } from '../jobs-log.service'; import FetchAT from './helpers/fetchAT'; import { importData } from './helpers/readAndProcessData'; import EntityMap from './helpers/EntityMap'; +import type { Job } from 'bull'; import type { UserType } from 'nocodb-sdk'; import type { AtImportJobData } from '~/interface/Jobs'; import { type Base, Model, Source } from '~/models'; @@ -32,7 +31,6 @@ import { TablesService } from '~/services/tables.service'; import { ViewColumnsService } from '~/services/view-columns.service'; import { ViewsService } from '~/services/views.service'; import { FormsService } from '~/services/forms.service'; -import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; import { GridColumnsService } from '~/services/grid-columns.service'; import { TelemetryService } from '~/services/telemetry.service'; import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2'; @@ -88,7 +86,7 @@ const selectColors = { grayDarker: '#444', }; -@Processor(JOBS_QUEUE) +@Injectable() export class AtImportProcessor { private readonly debugLog = debug('nc:jobs:at-import'); @@ -112,7 +110,6 @@ export class AtImportProcessor { private readonly telemetryService: TelemetryService, ) {} - @Process(JobTypes.AtImport) async job(job: Job) { this.debugLog(`job started for ${job.id}`); diff --git a/packages/nocodb/src/modules/jobs/jobs/attachment-clean-up/attachment-clean-up.ts b/packages/nocodb/src/modules/jobs/jobs/attachment-clean-up/attachment-clean-up.ts index 3f040944f4..076e01f338 100644 --- a/packages/nocodb/src/modules/jobs/jobs/attachment-clean-up/attachment-clean-up.ts +++ b/packages/nocodb/src/modules/jobs/jobs/attachment-clean-up/attachment-clean-up.ts @@ -1,8 +1,6 @@ import path from 'path'; import debug from 'debug'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; -import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; +import type { Job } from 'bull'; import Noco from '~/Noco'; import { MetaTable } from '~/utils/globals'; import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2'; @@ -10,13 +8,9 @@ import { getPathFromUrl } from '~/helpers/attachmentHelpers'; const retentionDays = process.env.NC_ATTACHMENT_RETENTION_DAYS || 10; -@Processor(JOBS_QUEUE) export class AttachmentCleanUpProcessor { private readonly debugLog = debug('nc:jobs:attachment-clean-up'); - constructor() {} - - @Process(JobTypes.AttachmentCleanUp) async job(job: Job) { // if retentionDays is set to 0, clean up is disabled if (+retentionDays === 0) { diff --git a/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts b/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts index 6bfb579dbb..18801ded6a 100644 --- a/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts @@ -1,10 +1,9 @@ import { Readable } from 'stream'; import path from 'path'; -import { Process, Processor } from '@nestjs/bull'; -import { Logger } from '@nestjs/common'; -import { Job } from 'bull'; +import { Injectable, Logger } from '@nestjs/common'; import moment from 'moment'; -import { type DataExportJobData, JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; +import type { Job } from 'bull'; +import { type DataExportJobData } from '~/interface/Jobs'; import { elapsedTime, initTime } from '~/modules/jobs/helpers'; import { ExportService } from '~/modules/jobs/jobs/export-import/export.service'; import { Model, PresignedUrl, View } from '~/models'; @@ -15,13 +14,12 @@ function getViewTitle(view: View) { return view?.is_default ? 'Default View' : view?.title; } -@Processor(JOBS_QUEUE) +@Injectable() export class DataExportProcessor { private logger = new Logger(DataExportProcessor.name); constructor(private readonly exportService: ExportService) {} - @Process(JobTypes.DataExport) async job(job: Job) { const { context, diff --git a/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts b/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts index 4da5d905ed..6d5be8f364 100644 --- a/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts @@ -1,9 +1,9 @@ import { Readable } from 'stream'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; import papaparse from 'papaparse'; import debug from 'debug'; import { isLinksOrLTAR, isVirtualCol, RelationTypes } from 'nocodb-sdk'; +import { Injectable } from '@nestjs/common'; +import type { Job } from 'bull'; import type { NcContext } from '~/interface/config'; import type { DuplicateBaseJobData, @@ -18,12 +18,12 @@ import { } from '~/helpers/exportImportHelpers'; import { BulkDataAliasService } from '~/services/bulk-data-alias.service'; import { ColumnsService } from '~/services/columns.service'; -import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; +import { JobTypes } from '~/interface/Jobs'; import { elapsedTime, initTime } from '~/modules/jobs/helpers'; import { ExportService } from '~/modules/jobs/jobs/export-import/export.service'; import { ImportService } from '~/modules/jobs/jobs/export-import/import.service'; -@Processor(JOBS_QUEUE) +@Injectable() export class DuplicateProcessor { private readonly debugLog = debug('nc:jobs:duplicate'); @@ -35,7 +35,6 @@ export class DuplicateProcessor { private readonly columnsService: ColumnsService, ) {} - @Process(JobTypes.DuplicateBase) async duplicateBase(job: Job) { this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateBase})`); @@ -146,7 +145,6 @@ export class DuplicateProcessor { return { id: dupProject.id }; } - @Process(JobTypes.DuplicateModel) async duplicateModel(job: Job) { this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateModel})`); @@ -258,7 +256,6 @@ export class DuplicateProcessor { return { id: findWithIdentifier(idMap, sourceModel.id) }; } - @Process(JobTypes.DuplicateColumn) async duplicateColumn(job: Job) { this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateColumn})`); diff --git a/packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.processor.ts b/packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.processor.ts index 56f09bfe21..504f7bfa2a 100644 --- a/packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.processor.ts @@ -1,17 +1,15 @@ import debug from 'debug'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; +import { Injectable } from '@nestjs/common'; +import type { Job } from 'bull'; import type { NcContext, NcRequest } from '~/interface/config'; -import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; import { MetaDiffsService } from '~/services/meta-diffs.service'; -@Processor(JOBS_QUEUE) +@Injectable() export class MetaSyncProcessor { private readonly debugLog = debug('nc:jobs:meta-sync'); constructor(private readonly metaDiffsService: MetaDiffsService) {} - @Process(JobTypes.MetaSync) async job(job: Job) { this.debugLog(`job started for ${job.id}`); diff --git a/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts b/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts index e80cf83763..cad48aca06 100644 --- a/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts @@ -1,11 +1,10 @@ import debug from 'debug'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; -import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; +import { Injectable } from '@nestjs/common'; +import type { Job } from 'bull'; import { SourcesService } from '~/services/sources.service'; import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service'; -@Processor(JOBS_QUEUE) +@Injectable() export class SourceCreateProcessor { private readonly debugLog = debug('nc:jobs:source-create'); @@ -14,7 +13,6 @@ export class SourceCreateProcessor { private readonly jobsLogService: JobsLogService, ) {} - @Process(JobTypes.SourceCreate) async job(job: Job) { this.debugLog(`job started for ${job.id}`); diff --git a/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts b/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts index fd2b2539ee..2335c88a66 100644 --- a/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts @@ -1,16 +1,14 @@ import debug from 'debug'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; -import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; +import { Injectable } from '@nestjs/common'; +import type { Job } from 'bull'; import { SourcesService } from '~/services/sources.service'; -@Processor(JOBS_QUEUE) +@Injectable() export class SourceDeleteProcessor { private readonly debugLog = debug('nc:jobs:source-delete'); constructor(private readonly sourcesService: SourcesService) {} - @Process(JobTypes.SourceDelete) async job(job: Job) { this.debugLog(`job started for ${job.id}`); diff --git a/packages/nocodb/src/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor.ts b/packages/nocodb/src/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor.ts index 199f54ccee..c6d41f8cb4 100644 --- a/packages/nocodb/src/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor.ts @@ -1,24 +1,18 @@ import path from 'path'; import { Readable } from 'stream'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; import { Logger } from '@nestjs/common'; import slash from 'slash'; import type { IStorageAdapterV2 } from '~/types/nc-plugin'; +import type { Job } from 'bull'; import type { AttachmentResType } from 'nocodb-sdk'; import type { ThumbnailGeneratorJobData } from '~/interface/Jobs'; import type Sharp from 'sharp'; -import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2'; import { getPathFromUrl } from '~/helpers/attachmentHelpers'; -@Processor(JOBS_QUEUE) export class ThumbnailGeneratorProcessor { - constructor() {} - private logger = new Logger(ThumbnailGeneratorProcessor.name); - @Process(JobTypes.ThumbnailGenerator) async job(job: Job) { const { attachments } = job.data; diff --git a/packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts b/packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts index dc51844619..36b3cf1966 100644 --- a/packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts @@ -1,21 +1,12 @@ -import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; -import { Job } from 'bull'; +import type { Job } from 'bull'; import { invokeWebhook } from '~/helpers/webhookHelpers'; import { Hook, Model, View } from '~/models'; -import { - type HandleWebhookJobData, - JOBS_QUEUE, - JobTypes, -} from '~/interface/Jobs'; +import { type HandleWebhookJobData } from '~/interface/Jobs'; -@Processor(JOBS_QUEUE) export class WebhookHandlerProcessor { private logger = new Logger(WebhookHandlerProcessor.name); - constructor() {} - - @Process(JobTypes.HandleWebhook) async job(job: Job) { const { context, hookId, modelId, viewId, prevData, newData, user } = job.data; diff --git a/packages/nocodb/src/modules/jobs/migration-jobs/init-migration-jobs.ts b/packages/nocodb/src/modules/jobs/migration-jobs/init-migration-jobs.ts index e2befcd22c..2ab96bedca 100644 --- a/packages/nocodb/src/modules/jobs/migration-jobs/init-migration-jobs.ts +++ b/packages/nocodb/src/modules/jobs/migration-jobs/init-migration-jobs.ts @@ -1,8 +1,7 @@ import debug from 'debug'; -import { Process, Processor } from '@nestjs/bull'; -import { Job } from 'bull'; -import { forwardRef, Inject } from '@nestjs/common'; -import { JOBS_QUEUE, MigrationJobTypes } from '~/interface/Jobs'; +import { forwardRef, Inject, Injectable } from '@nestjs/common'; +import type { Job } from 'bull'; +import { JobTypes, MigrationJobTypes } from '~/interface/Jobs'; import { IJobsService } from '~/modules/jobs/jobs-service.interface'; import { AttachmentMigration } from '~/modules/jobs/migration-jobs/nc_job_001_attachment'; import { ThumbnailMigration } from '~/modules/jobs/migration-jobs/nc_job_002_thumbnail'; @@ -13,7 +12,7 @@ import { updateMigrationJobsState, } from '~/helpers/migrationJobs'; -@Processor(JOBS_QUEUE) +@Injectable() export class InitMigrationJobs { migrationJobsList = [ { @@ -41,7 +40,6 @@ export class InitMigrationJobs { console.log('[init-migration-jobs]: ', ...msgs); }; - @Process(MigrationJobTypes.InitMigrationJobs) async job(job: Job) { this.debugLog(`job started for ${job.id}`); @@ -67,7 +65,7 @@ export class InitMigrationJobs { // migration job is running, make sure it's not stalled by checking after 10 mins // stall check is updated every 5 mins setTimeout(() => { - this.jobsService.add(MigrationJobTypes.InitMigrationJobs, {}); + this.jobsService.add(JobTypes.InitMigrationJobs, {}); }, 10 * 60 * 1000); return; } @@ -122,7 +120,7 @@ export class InitMigrationJobs { // run the job again if successful if (migrated) { - await this.jobsService.add(MigrationJobTypes.InitMigrationJobs, {}); + await this.jobsService.add(JobTypes.InitMigrationJobs, {}); } else { this.log('A migration job failed!'); } diff --git a/packages/nocodb/src/modules/jobs/migration-jobs/nc_job_002_thumbnail.ts b/packages/nocodb/src/modules/jobs/migration-jobs/nc_job_002_thumbnail.ts index 41931dd059..e59094dca9 100644 --- a/packages/nocodb/src/modules/jobs/migration-jobs/nc_job_002_thumbnail.ts +++ b/packages/nocodb/src/modules/jobs/migration-jobs/nc_job_002_thumbnail.ts @@ -1,8 +1,6 @@ import path from 'path'; import debug from 'debug'; -import { Process } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; -import { MigrationJobTypes } from '~/interface/Jobs'; import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2'; import Noco from '~/Noco'; import mimetypes from '~/utils/mimeTypes'; @@ -22,7 +20,6 @@ export class ThumbnailMigration { console.log('[nc_job_002_thumbnail]: ', ...msgs); }; - @Process(MigrationJobTypes.Thumbnail) async job() { try { const ncMeta = Noco.ncMeta; diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 1ed136fdb0..83c4aac83f 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -1,12 +1,14 @@ import { InjectQueue } from '@nestjs/bull'; import { Injectable, Logger } from '@nestjs/common'; import { Queue } from 'bull'; +import type { JobOptions } from 'bull'; import type { OnModuleInit } from '@nestjs/common'; import { InstanceCommands, JOBS_QUEUE, JobStatus, - MigrationJobTypes, + JobTypes, + JobVersions, } from '~/interface/Jobs'; import { JobsRedis } from '~/modules/jobs/redis/jobs-redis'; import { Job } from '~/models'; @@ -35,7 +37,7 @@ export class JobsService implements OnModuleInit { await this.jobsQueue.pause(true); }; - await this.add(MigrationJobTypes.InitMigrationJobs, {}); + await this.add(JobTypes.InitMigrationJobs, {}); } async toggleQueue() { @@ -57,7 +59,7 @@ export class JobsService implements OnModuleInit { } } - async add(name: string, data: any) { + async add(name: string, data: any, options?: JobOptions) { await this.toggleQueue(); const context = { @@ -66,15 +68,38 @@ export class JobsService implements OnModuleInit { ...(data?.context || {}), }; - const jobData = await Job.insert(context, { - job: name, - status: JobStatus.WAITING, - fk_user_id: data?.user?.id, - }); + let jobData; + + if (options?.jobId) { + const existingJob = await Job.get(context, options.jobId); + if (existingJob) { + jobData = existingJob; + + if (existingJob.status !== JobStatus.WAITING) { + await Job.update(context, existingJob.id, { + status: JobStatus.WAITING, + }); + } + } + } + + if (!jobData) { + jobData = await Job.insert(context, { + job: name, + status: JobStatus.WAITING, + fk_user_id: data?.user?.id, + }); + } + + data.jobName = name; + + if (JobVersions?.[name]) { + data._jobVersion = JobVersions[name]; + } - await this.jobsQueue.add(name, data, { + await this.jobsQueue.add(data, { jobId: jobData.id, - removeOnComplete: true, + ...options, }); return jobData;