Browse Source

feat: improve queue logic (#9255)

* feat: improve worker logic

* feat: job requeue & version handling
pull/9273/head
Mert E. 3 months ago committed by GitHub
parent
commit
b646aa2bbd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 17
      packages/nocodb/src/interface/Jobs.ts
  2. 2
      packages/nocodb/src/meta/meta.service.ts
  3. 116
      packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts
  4. 51
      packages/nocodb/src/modules/jobs/fallback/jobs.service.ts
  5. 9
      packages/nocodb/src/modules/jobs/jobs-event.service.ts
  6. 79
      packages/nocodb/src/modules/jobs/jobs-map.service.ts
  7. 3
      packages/nocodb/src/modules/jobs/jobs-service.interface.ts
  8. 18
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  9. 10
      packages/nocodb/src/modules/jobs/jobs.module.ts
  10. 82
      packages/nocodb/src/modules/jobs/jobs.processor.ts
  11. 9
      packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts
  12. 8
      packages/nocodb/src/modules/jobs/jobs/attachment-clean-up/attachment-clean-up.ts
  13. 10
      packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts
  14. 11
      packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts
  15. 8
      packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.processor.ts
  16. 8
      packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts
  17. 8
      packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts
  18. 8
      packages/nocodb/src/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor.ts
  19. 13
      packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts
  20. 14
      packages/nocodb/src/modules/jobs/migration-jobs/init-migration-jobs.ts
  21. 3
      packages/nocodb/src/modules/jobs/migration-jobs/nc_job_002_thumbnail.ts
  22. 45
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts

17
packages/nocodb/src/interface/Jobs.ts

@ -3,7 +3,6 @@ import type { NcContext, NcRequest } from '~/interface/config';
export const JOBS_QUEUE = 'jobs';
export enum MigrationJobTypes {
InitMigrationJobs = 'init-migration-jobs',
Attachment = 'attachment',
Thumbnail = 'thumbnail',
}
@ -25,6 +24,7 @@ export enum JobTypes {
DataExport = 'data-export',
ThumbnailGenerator = 'thumbnail-generator',
AttachmentCleanUp = 'attachment-clean-up',
InitMigrationJobs = 'init-migration-jobs',
}
export enum JobStatus {
@ -35,6 +35,7 @@ export enum JobStatus {
FAILED = 'failed',
PAUSED = 'paused',
REFRESH = 'refresh',
REQUEUED = 'requeued',
}
export enum JobEvents {
@ -42,6 +43,14 @@ export enum JobEvents {
LOG = 'job.log',
}
export const JobVersions: {
[key in JobTypes]?: number;
} = {};
export const JOB_REQUEUED = 'job.requeued';
export const JOB_REQUEUE_LIMIT = 10;
export const InstanceTypes = {
PRIMARY: `${process.env.NC_ENV ?? 'default'}-primary`,
WORKER: `${process.env.NC_ENV ?? 'default'}-worker`,
@ -55,6 +64,12 @@ export enum InstanceCommands {
}
export interface JobData {
// meta info
jobName: string;
_jobDelay?: number;
_jobAttempt?: number;
_jobVersion?: number;
// context
context: NcContext;
user: Partial<UserType>;
}

2
packages/nocodb/src/meta/meta.service.ts

@ -223,7 +223,7 @@ export class MetaService {
target: string,
data: any | any[],
ids: string[],
condition?: { [key: string]: any },
condition?: { [p: string]: any },
): Promise<any> {
if (Array.isArray(data) ? !data.length : !data) {
return [];

116
packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts

@ -1,18 +1,9 @@
import { Injectable } from '@nestjs/common';
import { forwardRef, Inject, Injectable } from '@nestjs/common';
import PQueue from 'p-queue';
import Emittery from 'emittery';
import { DuplicateProcessor } from '~/modules/jobs/jobs/export-import/duplicate.processor';
import { AtImportProcessor } from '~/modules/jobs/jobs/at-import/at-import.processor';
import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.processor';
import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor';
import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor';
import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor';
import { DataExportProcessor } from '~/modules/jobs/jobs/data-export/data-export.processor';
import { JobsEventService } from '~/modules/jobs/jobs-event.service';
import { JobStatus, JobTypes, MigrationJobTypes } from '~/interface/Jobs';
import { ThumbnailGeneratorProcessor } from '~/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor';
import { AttachmentCleanUpProcessor } from '~/modules/jobs/jobs/attachment-clean-up/attachment-clean-up';
import { InitMigrationJobs } from '~/modules/jobs/migration-jobs/init-migration-jobs';
import { JobStatus } from '~/interface/Jobs';
import { JobsMap } from '~/modules/jobs/jobs-map.service';
export interface Job {
id: string;
@ -31,16 +22,7 @@ export class QueueService {
constructor(
protected readonly jobsEventService: JobsEventService,
protected readonly duplicateProcessor: DuplicateProcessor,
protected readonly atImportProcessor: AtImportProcessor,
protected readonly metaSyncProcessor: MetaSyncProcessor,
protected readonly sourceCreateProcessor: SourceCreateProcessor,
protected readonly sourceDeleteProcessor: SourceDeleteProcessor,
protected readonly webhookHandlerProcessor: WebhookHandlerProcessor,
protected readonly dataExportProcessor: DataExportProcessor,
protected readonly thumbnailGeneratorProcessor: ThumbnailGeneratorProcessor,
protected readonly attachmentCleanUpProcessor: AttachmentCleanUpProcessor,
protected readonly initMigrationJobs: InitMigrationJobs,
@Inject(forwardRef(() => JobsMap)) protected readonly jobsMap: JobsMap,
) {
this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => {
const job = this.queueMemory.find((job) => job.id === data.job.id);
@ -69,64 +51,12 @@ export class QueueService {
});
}
jobMap = {
[JobTypes.DuplicateBase]: {
this: this.duplicateProcessor,
fn: this.duplicateProcessor.duplicateBase,
},
[JobTypes.DuplicateModel]: {
this: this.duplicateProcessor,
fn: this.duplicateProcessor.duplicateModel,
},
[JobTypes.DuplicateColumn]: {
this: this.duplicateProcessor,
fn: this.duplicateProcessor.duplicateColumn,
},
[JobTypes.AtImport]: {
this: this.atImportProcessor,
fn: this.atImportProcessor.job,
},
[JobTypes.MetaSync]: {
this: this.metaSyncProcessor,
fn: this.metaSyncProcessor.job,
},
[JobTypes.SourceCreate]: {
this: this.sourceCreateProcessor,
fn: this.sourceCreateProcessor.job,
},
[JobTypes.SourceDelete]: {
this: this.sourceDeleteProcessor,
fn: this.sourceDeleteProcessor.job,
},
[JobTypes.HandleWebhook]: {
this: this.webhookHandlerProcessor,
fn: this.webhookHandlerProcessor.job,
},
[JobTypes.DataExport]: {
this: this.dataExportProcessor,
fn: this.dataExportProcessor.job,
},
[JobTypes.ThumbnailGenerator]: {
this: this.thumbnailGeneratorProcessor,
fn: this.thumbnailGeneratorProcessor.job,
},
[JobTypes.AttachmentCleanUp]: {
this: this.attachmentCleanUpProcessor,
fn: this.attachmentCleanUpProcessor.job,
},
[MigrationJobTypes.InitMigrationJobs]: {
this: this.initMigrationJobs,
fn: this.initMigrationJobs.job,
},
};
async jobWrapper(job: Job) {
this.emitter.emit(JobStatus.ACTIVE, { job });
try {
const result = await this.jobMap[job.name].fn.apply(
this.jobMap[job.name].this,
[job],
);
const { this: processor, fn = 'job' } = this.jobsMap.jobs[job.name];
const result = await processor[fn](job);
this.emitter.emit(JobStatus.COMPLETED, { job, result });
} catch (error) {
this.emitter.emit(JobStatus.FAILED, { job, error });
@ -153,11 +83,35 @@ export class QueueService {
QueueService.queueIdCounter = index;
}
add(name: string, data: any, opts?: { jobId?: string }) {
add(name: string, data: any, opts?: { jobId?: string; delay?: number }) {
const id = opts?.jobId || `${this.queueIndex++}`;
const job = { id: `${id}`, name, status: JobStatus.WAITING, data };
this.queueMemory.push(job);
this.queue.add(() => this.jobWrapper(job));
const existingJob = this.queueMemory.find((q) => q.id === id);
let job;
if (existingJob) {
if (existingJob.status !== JobStatus.WAITING) {
existingJob.status = JobStatus.WAITING;
}
job = existingJob;
} else {
job = { id: `${id}`, name, status: JobStatus.WAITING, data };
}
if (opts?.delay) {
setTimeout(() => {
if (!existingJob) {
this.queueMemory.push(job);
}
this.queue.add(() => this.jobWrapper(job));
}, opts.delay);
} else {
if (!existingJob) {
this.queueMemory.push(job);
}
this.queue.add(() => this.jobWrapper(job));
}
return { id, name };
}

51
packages/nocodb/src/modules/jobs/fallback/jobs.service.ts

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import type { OnModuleInit } from '@nestjs/common';
import { QueueService } from '~/modules/jobs/fallback/fallback-queue.service';
import { JobStatus, MigrationJobTypes } from '~/interface/Jobs';
import { JobStatus, JobTypes, JobVersions } from '~/interface/Jobs';
import { Job } from '~/models';
import { RootScopes } from '~/utils/globals';
@ -10,23 +10,56 @@ export class JobsService implements OnModuleInit {
constructor(private readonly fallbackQueueService: QueueService) {}
async onModuleInit() {
await this.add(MigrationJobTypes.InitMigrationJobs, {});
await this.add(JobTypes.InitMigrationJobs, {});
}
async add(name: string, data: any) {
async add(
name: string,
data: any,
options?: {
jobId?: string;
delay?: number; // delay in ms
},
) {
const context = {
workspace_id: RootScopes.ROOT,
base_id: RootScopes.ROOT,
...(data?.context || {}),
};
const jobData = await Job.insert(context, {
job: name,
status: JobStatus.WAITING,
fk_user_id: data?.user?.id,
});
let jobData;
if (options?.jobId) {
const existingJob = await Job.get(context, options.jobId);
if (existingJob) {
jobData = existingJob;
if (existingJob.status !== JobStatus.WAITING) {
await Job.update(context, existingJob.id, {
status: JobStatus.WAITING,
});
}
}
}
if (!jobData) {
jobData = await Job.insert(context, {
job: name,
status: JobStatus.WAITING,
fk_user_id: data?.user?.id,
});
}
this.fallbackQueueService.add(name, data, { jobId: jobData.id });
data.jobName = name;
if (JobVersions?.[name]) {
data._jobVersion = JobVersions[name];
}
this.fallbackQueueService.add(name, data, {
jobId: jobData.id,
...options,
});
return jobData;
}

9
packages/nocodb/src/modules/jobs/jobs-event.service.ts

@ -81,6 +81,15 @@ export class JobsEventService {
@OnQueueCompleted()
onCompleted(job: BullJob, data: any) {
// If job was requeued, don't update the status
if (data === JobStatus.REQUEUED) {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.REQUEUED,
});
return;
}
Job.update(
{
workspace_id: RootScopes.ROOT,

79
packages/nocodb/src/modules/jobs/jobs-map.service.ts

@ -0,0 +1,79 @@
import { Injectable } from '@nestjs/common';
import { DuplicateProcessor } from '~/modules/jobs/jobs/export-import/duplicate.processor';
import { AtImportProcessor } from '~/modules/jobs/jobs/at-import/at-import.processor';
import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.processor';
import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor';
import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor';
import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor';
import { DataExportProcessor } from '~/modules/jobs/jobs/data-export/data-export.processor';
import { ThumbnailGeneratorProcessor } from '~/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor';
import { AttachmentCleanUpProcessor } from '~/modules/jobs/jobs/attachment-clean-up/attachment-clean-up';
import { InitMigrationJobs } from '~/modules/jobs/migration-jobs/init-migration-jobs';
import { JobTypes } from '~/interface/Jobs';
@Injectable()
export class JobsMap {
constructor(
protected readonly duplicateProcessor: DuplicateProcessor,
protected readonly atImportProcessor: AtImportProcessor,
protected readonly metaSyncProcessor: MetaSyncProcessor,
protected readonly sourceCreateProcessor: SourceCreateProcessor,
protected readonly sourceDeleteProcessor: SourceDeleteProcessor,
protected readonly webhookHandlerProcessor: WebhookHandlerProcessor,
protected readonly dataExportProcessor: DataExportProcessor,
protected readonly thumbnailGeneratorProcessor: ThumbnailGeneratorProcessor,
protected readonly attachmentCleanUpProcessor: AttachmentCleanUpProcessor,
protected readonly initMigrationJobs: InitMigrationJobs,
) {}
protected _jobMap: {
[key in JobTypes]?: {
this: any;
fn?: string;
};
} = {
[JobTypes.DuplicateBase]: {
this: this.duplicateProcessor,
fn: 'duplicateBase',
},
[JobTypes.DuplicateModel]: {
this: this.duplicateProcessor,
fn: 'duplicateModel',
},
[JobTypes.DuplicateColumn]: {
this: this.duplicateProcessor,
fn: 'duplicateColumn',
},
[JobTypes.AtImport]: {
this: this.atImportProcessor,
},
[JobTypes.MetaSync]: {
this: this.metaSyncProcessor,
},
[JobTypes.SourceCreate]: {
this: this.sourceCreateProcessor,
},
[JobTypes.SourceDelete]: {
this: this.sourceDeleteProcessor,
},
[JobTypes.HandleWebhook]: {
this: this.webhookHandlerProcessor,
},
[JobTypes.DataExport]: {
this: this.dataExportProcessor,
},
[JobTypes.ThumbnailGenerator]: {
this: this.thumbnailGeneratorProcessor,
},
[JobTypes.AttachmentCleanUp]: {
this: this.attachmentCleanUpProcessor,
},
[JobTypes.InitMigrationJobs]: {
this: this.initMigrationJobs,
},
};
public get jobs() {
return this._jobMap;
}
}

3
packages/nocodb/src/modules/jobs/jobs-service.interface.ts

@ -1,10 +1,11 @@
import type { JobOptions } from 'bull';
import type Bull from 'bull';
import type { JobStatus } from '~/interface/Jobs';
export interface IJobsService {
jobsQueue: Bull.Queue;
toggleQueue(): Promise<void>;
add(name: string, data: any): Promise<Bull.Job<any>>;
add(name: string, data: any, options?: JobOptions): Promise<Bull.Job<any>>;
jobStatus(jobId: string): Promise<JobStatus>;
jobList(): Promise<Bull.Job<any>[]>;
resumeQueue(): Promise<void>;

18
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -159,6 +159,24 @@ export class JobsController {
const jobId = data.id;
// clean as it might be taken by another worker
if (data.status === JobStatus.REQUEUED) {
if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.forEach((res) => {
if (!res.headersSent) {
res.send({
status: 'refresh',
});
}
});
}
delete this.jobRooms[jobId];
delete this.localJobs[jobId];
await NocoCache.del(`${CacheScope.JOBS_POLLING}:${jobId}:messages`);
return;
}
if (this.localJobs[jobId]) {
response = {
status: 'update',

10
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -21,6 +21,10 @@ import { DataExportController } from '~/modules/jobs/jobs/data-export/data-expor
import { ThumbnailGeneratorProcessor } from '~/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor';
import { AttachmentCleanUpProcessor } from '~/modules/jobs/jobs/attachment-clean-up/attachment-clean-up';
// Job Processor
import { JobsProcessor } from '~/modules/jobs/jobs.processor';
import { JobsMap } from '~/modules/jobs/jobs-map.service';
// Migration Jobs
import { InitMigrationJobs } from '~/modules/jobs/migration-jobs/init-migration-jobs';
import { AttachmentMigration } from '~/modules/jobs/migration-jobs/nc_job_001_attachment';
@ -48,6 +52,9 @@ export const JobsModuleMetadata = {
}),
BullModule.registerQueue({
name: JOBS_QUEUE,
defaultJobOptions: {
removeOnComplete: true,
},
}),
]
: []),
@ -66,7 +73,7 @@ export const JobsModuleMetadata = {
: []),
],
providers: [
...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []),
JobsMap,
JobsEventService,
...(process.env.NC_REDIS_JOB_URL ? [] : [FallbackQueueService]),
{
@ -76,6 +83,7 @@ export const JobsModuleMetadata = {
: FallbackJobsService,
},
JobsLogService,
JobsProcessor,
ExportService,
ImportService,
DuplicateProcessor,

82
packages/nocodb/src/modules/jobs/jobs.processor.ts

@ -0,0 +1,82 @@
import { Process, Processor } from '@nestjs/bull';
import { Inject, Logger } from '@nestjs/common';
import { Job } from 'bull';
import type { JobData } from '~/interface/Jobs';
import { JOB_REQUEUE_LIMIT, JOBS_QUEUE, JobVersions } from '~/interface/Jobs';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
import { JobsMap } from '~/modules/jobs/jobs-map.service';
import { JobsEventService } from '~/modules/jobs/jobs-event.service';
import { JobStatus } from '~/interface/Jobs';
const NC_WORKER_CONCURRENCY = process.env.NC_WORKER_CONCURRENCY ?? 5;
@Processor(JOBS_QUEUE)
export class JobsProcessor {
private logger = new Logger(JobsProcessor.name);
constructor(
@Inject('JobsService') protected readonly jobsService: IJobsService,
protected readonly jobsEventService: JobsEventService,
protected readonly jobsMap: JobsMap,
) {}
@Process({
concurrency: +NC_WORKER_CONCURRENCY,
})
async process(job: Job<JobData>) {
const { jobName } = job.data;
if (!this.jobsMap.jobs[jobName]) {
this.logger.error(`Job not found for ${jobName}`);
await this.requeue(job);
return;
}
const { this: processor, fn = 'job' } = this.jobsMap.jobs[jobName];
if (!processor[fn]) {
this.logger.error(`Job function not found for ${jobName}`);
await this.requeue(job);
return;
}
if (JobVersions[jobName] || job.data?._jobVersion) {
if (JobVersions[jobName] !== job.data._jobVersion) {
this.logger.error(`Job version mismatch for ${jobName}`);
await this.requeue(job);
return;
}
}
try {
return await processor[fn](job);
} catch (e) {
this.logger.error(`Error processing job ${jobName}`, e);
throw e;
}
}
async requeue(job: Job<JobData>) {
// Remove the job from the queue otherwise ids will clash
await job.releaseLock();
await job.remove();
await this.jobsEventService.onCompleted(job, JobStatus.REQUEUED);
const _jobDelay = job.data?._jobDelay ?? 0;
const _jobAttempt = job.data?._jobAttempt ?? 1;
if (_jobAttempt > JOB_REQUEUE_LIMIT) {
this.logger.error(`Job ${job.data.jobName} failed after 10 attempts`);
return;
}
job.data._jobDelay = _jobDelay + 5000;
job.data._jobAttempt = _jobAttempt + 1;
return this.jobsService.add(job.data.jobName, job.data, {
jobId: job.id.toString(),
delay: job.data._jobDelay,
});
}
}

9
packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts

@ -5,15 +5,14 @@ import hash from 'object-hash';
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc';
import tinycolor from 'tinycolor2';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { isLinksOrLTAR } from 'nocodb-sdk';
import debug from 'debug';
import { Logger } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { JobsLogService } from '../jobs-log.service';
import FetchAT from './helpers/fetchAT';
import { importData } from './helpers/readAndProcessData';
import EntityMap from './helpers/EntityMap';
import type { Job } from 'bull';
import type { UserType } from 'nocodb-sdk';
import type { AtImportJobData } from '~/interface/Jobs';
import { type Base, Model, Source } from '~/models';
@ -32,7 +31,6 @@ import { TablesService } from '~/services/tables.service';
import { ViewColumnsService } from '~/services/view-columns.service';
import { ViewsService } from '~/services/views.service';
import { FormsService } from '~/services/forms.service';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { GridColumnsService } from '~/services/grid-columns.service';
import { TelemetryService } from '~/services/telemetry.service';
import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2';
@ -88,7 +86,7 @@ const selectColors = {
grayDarker: '#444',
};
@Processor(JOBS_QUEUE)
@Injectable()
export class AtImportProcessor {
private readonly debugLog = debug('nc:jobs:at-import');
@ -112,7 +110,6 @@ export class AtImportProcessor {
private readonly telemetryService: TelemetryService,
) {}
@Process(JobTypes.AtImport)
async job(job: Job<AtImportJobData>) {
this.debugLog(`job started for ${job.id}`);

8
packages/nocodb/src/modules/jobs/jobs/attachment-clean-up/attachment-clean-up.ts

@ -1,8 +1,6 @@
import path from 'path';
import debug from 'debug';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import type { Job } from 'bull';
import Noco from '~/Noco';
import { MetaTable } from '~/utils/globals';
import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2';
@ -10,13 +8,9 @@ import { getPathFromUrl } from '~/helpers/attachmentHelpers';
const retentionDays = process.env.NC_ATTACHMENT_RETENTION_DAYS || 10;
@Processor(JOBS_QUEUE)
export class AttachmentCleanUpProcessor {
private readonly debugLog = debug('nc:jobs:attachment-clean-up');
constructor() {}
@Process(JobTypes.AttachmentCleanUp)
async job(job: Job) {
// if retentionDays is set to 0, clean up is disabled
if (+retentionDays === 0) {

10
packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts

@ -1,10 +1,9 @@
import { Readable } from 'stream';
import path from 'path';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
import { Injectable, Logger } from '@nestjs/common';
import moment from 'moment';
import { type DataExportJobData, JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import type { Job } from 'bull';
import { type DataExportJobData } from '~/interface/Jobs';
import { elapsedTime, initTime } from '~/modules/jobs/helpers';
import { ExportService } from '~/modules/jobs/jobs/export-import/export.service';
import { Model, PresignedUrl, View } from '~/models';
@ -15,13 +14,12 @@ function getViewTitle(view: View) {
return view?.is_default ? 'Default View' : view?.title;
}
@Processor(JOBS_QUEUE)
@Injectable()
export class DataExportProcessor {
private logger = new Logger(DataExportProcessor.name);
constructor(private readonly exportService: ExportService) {}
@Process(JobTypes.DataExport)
async job(job: Job<DataExportJobData>) {
const {
context,

11
packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts

@ -1,9 +1,9 @@
import { Readable } from 'stream';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import papaparse from 'papaparse';
import debug from 'debug';
import { isLinksOrLTAR, isVirtualCol, RelationTypes } from 'nocodb-sdk';
import { Injectable } from '@nestjs/common';
import type { Job } from 'bull';
import type { NcContext } from '~/interface/config';
import type {
DuplicateBaseJobData,
@ -18,12 +18,12 @@ import {
} from '~/helpers/exportImportHelpers';
import { BulkDataAliasService } from '~/services/bulk-data-alias.service';
import { ColumnsService } from '~/services/columns.service';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { JobTypes } from '~/interface/Jobs';
import { elapsedTime, initTime } from '~/modules/jobs/helpers';
import { ExportService } from '~/modules/jobs/jobs/export-import/export.service';
import { ImportService } from '~/modules/jobs/jobs/export-import/import.service';
@Processor(JOBS_QUEUE)
@Injectable()
export class DuplicateProcessor {
private readonly debugLog = debug('nc:jobs:duplicate');
@ -35,7 +35,6 @@ export class DuplicateProcessor {
private readonly columnsService: ColumnsService,
) {}
@Process(JobTypes.DuplicateBase)
async duplicateBase(job: Job<DuplicateBaseJobData>) {
this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateBase})`);
@ -146,7 +145,6 @@ export class DuplicateProcessor {
return { id: dupProject.id };
}
@Process(JobTypes.DuplicateModel)
async duplicateModel(job: Job<DuplicateModelJobData>) {
this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateModel})`);
@ -258,7 +256,6 @@ export class DuplicateProcessor {
return { id: findWithIdentifier(idMap, sourceModel.id) };
}
@Process(JobTypes.DuplicateColumn)
async duplicateColumn(job: Job<DuplicateColumnJobData>) {
this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateColumn})`);

8
packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.processor.ts

@ -1,17 +1,15 @@
import debug from 'debug';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { Injectable } from '@nestjs/common';
import type { Job } from 'bull';
import type { NcContext, NcRequest } from '~/interface/config';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { MetaDiffsService } from '~/services/meta-diffs.service';
@Processor(JOBS_QUEUE)
@Injectable()
export class MetaSyncProcessor {
private readonly debugLog = debug('nc:jobs:meta-sync');
constructor(private readonly metaDiffsService: MetaDiffsService) {}
@Process(JobTypes.MetaSync)
async job(job: Job) {
this.debugLog(`job started for ${job.id}`);

8
packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts

@ -1,11 +1,10 @@
import debug from 'debug';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { Injectable } from '@nestjs/common';
import type { Job } from 'bull';
import { SourcesService } from '~/services/sources.service';
import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service';
@Processor(JOBS_QUEUE)
@Injectable()
export class SourceCreateProcessor {
private readonly debugLog = debug('nc:jobs:source-create');
@ -14,7 +13,6 @@ export class SourceCreateProcessor {
private readonly jobsLogService: JobsLogService,
) {}
@Process(JobTypes.SourceCreate)
async job(job: Job) {
this.debugLog(`job started for ${job.id}`);

8
packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts

@ -1,16 +1,14 @@
import debug from 'debug';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { Injectable } from '@nestjs/common';
import type { Job } from 'bull';
import { SourcesService } from '~/services/sources.service';
@Processor(JOBS_QUEUE)
@Injectable()
export class SourceDeleteProcessor {
private readonly debugLog = debug('nc:jobs:source-delete');
constructor(private readonly sourcesService: SourcesService) {}
@Process(JobTypes.SourceDelete)
async job(job: Job) {
this.debugLog(`job started for ${job.id}`);

8
packages/nocodb/src/modules/jobs/jobs/thumbnail-generator/thumbnail-generator.processor.ts

@ -1,24 +1,18 @@
import path from 'path';
import { Readable } from 'stream';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { Logger } from '@nestjs/common';
import slash from 'slash';
import type { IStorageAdapterV2 } from '~/types/nc-plugin';
import type { Job } from 'bull';
import type { AttachmentResType } from 'nocodb-sdk';
import type { ThumbnailGeneratorJobData } from '~/interface/Jobs';
import type Sharp from 'sharp';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2';
import { getPathFromUrl } from '~/helpers/attachmentHelpers';
@Processor(JOBS_QUEUE)
export class ThumbnailGeneratorProcessor {
constructor() {}
private logger = new Logger(ThumbnailGeneratorProcessor.name);
@Process(JobTypes.ThumbnailGenerator)
async job(job: Job<ThumbnailGeneratorJobData>) {
const { attachments } = job.data;

13
packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts

@ -1,21 +1,12 @@
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
import type { Job } from 'bull';
import { invokeWebhook } from '~/helpers/webhookHelpers';
import { Hook, Model, View } from '~/models';
import {
type HandleWebhookJobData,
JOBS_QUEUE,
JobTypes,
} from '~/interface/Jobs';
import { type HandleWebhookJobData } from '~/interface/Jobs';
@Processor(JOBS_QUEUE)
export class WebhookHandlerProcessor {
private logger = new Logger(WebhookHandlerProcessor.name);
constructor() {}
@Process(JobTypes.HandleWebhook)
async job(job: Job<HandleWebhookJobData>) {
const { context, hookId, modelId, viewId, prevData, newData, user } =
job.data;

14
packages/nocodb/src/modules/jobs/migration-jobs/init-migration-jobs.ts

@ -1,8 +1,7 @@
import debug from 'debug';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { forwardRef, Inject } from '@nestjs/common';
import { JOBS_QUEUE, MigrationJobTypes } from '~/interface/Jobs';
import { forwardRef, Inject, Injectable } from '@nestjs/common';
import type { Job } from 'bull';
import { JobTypes, MigrationJobTypes } from '~/interface/Jobs';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
import { AttachmentMigration } from '~/modules/jobs/migration-jobs/nc_job_001_attachment';
import { ThumbnailMigration } from '~/modules/jobs/migration-jobs/nc_job_002_thumbnail';
@ -13,7 +12,7 @@ import {
updateMigrationJobsState,
} from '~/helpers/migrationJobs';
@Processor(JOBS_QUEUE)
@Injectable()
export class InitMigrationJobs {
migrationJobsList = [
{
@ -41,7 +40,6 @@ export class InitMigrationJobs {
console.log('[init-migration-jobs]: ', ...msgs);
};
@Process(MigrationJobTypes.InitMigrationJobs)
async job(job: Job) {
this.debugLog(`job started for ${job.id}`);
@ -67,7 +65,7 @@ export class InitMigrationJobs {
// migration job is running, make sure it's not stalled by checking after 10 mins
// stall check is updated every 5 mins
setTimeout(() => {
this.jobsService.add(MigrationJobTypes.InitMigrationJobs, {});
this.jobsService.add(JobTypes.InitMigrationJobs, {});
}, 10 * 60 * 1000);
return;
}
@ -122,7 +120,7 @@ export class InitMigrationJobs {
// run the job again if successful
if (migrated) {
await this.jobsService.add(MigrationJobTypes.InitMigrationJobs, {});
await this.jobsService.add(JobTypes.InitMigrationJobs, {});
} else {
this.log('A migration job failed!');
}

3
packages/nocodb/src/modules/jobs/migration-jobs/nc_job_002_thumbnail.ts

@ -1,8 +1,6 @@
import path from 'path';
import debug from 'debug';
import { Process } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { MigrationJobTypes } from '~/interface/Jobs';
import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2';
import Noco from '~/Noco';
import mimetypes from '~/utils/mimeTypes';
@ -22,7 +20,6 @@ export class ThumbnailMigration {
console.log('[nc_job_002_thumbnail]: ', ...msgs);
};
@Process(MigrationJobTypes.Thumbnail)
async job() {
try {
const ncMeta = Noco.ncMeta;

45
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -1,12 +1,14 @@
import { InjectQueue } from '@nestjs/bull';
import { Injectable, Logger } from '@nestjs/common';
import { Queue } from 'bull';
import type { JobOptions } from 'bull';
import type { OnModuleInit } from '@nestjs/common';
import {
InstanceCommands,
JOBS_QUEUE,
JobStatus,
MigrationJobTypes,
JobTypes,
JobVersions,
} from '~/interface/Jobs';
import { JobsRedis } from '~/modules/jobs/redis/jobs-redis';
import { Job } from '~/models';
@ -35,7 +37,7 @@ export class JobsService implements OnModuleInit {
await this.jobsQueue.pause(true);
};
await this.add(MigrationJobTypes.InitMigrationJobs, {});
await this.add(JobTypes.InitMigrationJobs, {});
}
async toggleQueue() {
@ -57,7 +59,7 @@ export class JobsService implements OnModuleInit {
}
}
async add(name: string, data: any) {
async add(name: string, data: any, options?: JobOptions) {
await this.toggleQueue();
const context = {
@ -66,15 +68,38 @@ export class JobsService implements OnModuleInit {
...(data?.context || {}),
};
const jobData = await Job.insert(context, {
job: name,
status: JobStatus.WAITING,
fk_user_id: data?.user?.id,
});
let jobData;
if (options?.jobId) {
const existingJob = await Job.get(context, options.jobId);
if (existingJob) {
jobData = existingJob;
if (existingJob.status !== JobStatus.WAITING) {
await Job.update(context, existingJob.id, {
status: JobStatus.WAITING,
});
}
}
}
if (!jobData) {
jobData = await Job.insert(context, {
job: name,
status: JobStatus.WAITING,
fk_user_id: data?.user?.id,
});
}
data.jobName = name;
if (JobVersions?.[name]) {
data._jobVersion = JobVersions[name];
}
await this.jobsQueue.add(name, data, {
await this.jobsQueue.add(data, {
jobId: jobData.id,
removeOnComplete: true,
...options,
});
return jobData;

Loading…
Cancel
Save