Browse Source

feat: improved jobs implementation

* separated fallback and redis solutions

* revised folder structure for jobs

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/5711/head
mertmit 2 years ago
parent
commit
a01380c73f
  1. 8
      packages/nocodb/src/app.module.ts
  2. 3
      packages/nocodb/src/main.ts
  3. 8
      packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts
  4. 9
      packages/nocodb/src/modules/jobs/fallback/jobs-event.service.ts
  5. 28
      packages/nocodb/src/modules/jobs/fallback/jobs.service.ts
  6. 6
      packages/nocodb/src/modules/jobs/jobs.gateway.ts
  7. 47
      packages/nocodb/src/modules/jobs/jobs.module.ts
  8. 22
      packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts
  9. 40
      packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts
  10. 0
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/EntityMap.ts
  11. 0
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/fetchAT.ts
  12. 4
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts
  13. 0
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/syncMap.ts
  14. 16
      packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts
  15. 12
      packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts
  16. 20
      packages/nocodb/src/modules/jobs/jobs/export-import/export.service.ts
  17. 40
      packages/nocodb/src/modules/jobs/jobs/export-import/import.service.ts
  18. 16
      packages/nocodb/src/modules/jobs/jobs/jobs-log.service.ts
  19. 71
      packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts
  20. 53
      packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts
  21. 90
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts
  22. 1
      packages/nocodb/src/modules/metas/metas.module.ts
  23. 78
      packages/nocodb/src/worker.module.ts

8
packages/nocodb/src/app.module.ts

@ -1,6 +1,5 @@
import { Module, RequestMethod } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { BullModule } from '@nestjs/bull';
import { EventEmitterModule as NestJsEventEmitter } from '@nestjs/event-emitter';
import { GlobalExceptionFilter } from './filters/global-exception/global-exception.filter';
import { GlobalMiddleware } from './middlewares/global/global.middleware';
@ -30,13 +29,6 @@ import type { MiddlewareConsumer } from '@nestjs/common';
EventEmitterModule,
JobsModule,
NestJsEventEmitter.forRoot(),
...(process.env['NC_REDIS_URL']
? [
BullModule.forRoot({
url: process.env.NC_REDIS_URL,
}),
]
: []),
],
controllers: [],
providers: [

3
packages/nocodb/src/main.ts

@ -2,7 +2,6 @@ import { NestFactory } from '@nestjs/core';
import cors from 'cors';
import express from 'express';
import { AppModule } from './app.module';
import { AppModule as WorkerModule } from './worker.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
@ -20,7 +19,7 @@ async function bootstrap() {
if (!process.env['NC_REDIS_URL']) {
throw new Error('NC_REDIS_URL is required');
}
const app = await NestFactory.create(WorkerModule);
const app = await NestFactory.create(AppModule);
await app.init();
}
}

8
packages/nocodb/src/modules/jobs/fallback-queue.service.ts → packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts

@ -1,12 +1,12 @@
import { Injectable } from '@nestjs/common';
import PQueue from 'p-queue';
import Emittery from 'emittery';
import { JobStatus, JobTypes } from '../../interface/Jobs';
import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobStatus, JobTypes } from '../../../interface/Jobs';
import { DuplicateProcessor } from '../jobs/export-import/duplicate.processor';
import { AtImportProcessor } from '../jobs/at-import/at-import.processor';
import { JobsEventService } from './jobs-event.service';
import { AtImportProcessor } from './at-import/at-import.processor';
interface Job {
export interface Job {
id: string;
name: string;
status: string;

9
packages/nocodb/src/modules/jobs/jobs-event.service.ts → packages/nocodb/src/modules/jobs/fallback/jobs-event.service.ts

@ -7,7 +7,7 @@ import {
import { Job } from 'bull';
import boxen from 'boxen';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { JobEvents, JOBS_QUEUE, JobStatus } from '../../interface/Jobs';
import { JobEvents, JOBS_QUEUE, JobStatus } from '../../../interface/Jobs';
@Processor(JOBS_QUEUE)
export class JobsEventService {
@ -55,11 +55,4 @@ export class JobsEventService {
},
});
}
sendLog(job: Job, data: { message: string }) {
this.eventEmitter.emit(JobEvents.LOG, {
id: job.id.toString(),
data,
});
}
}

28
packages/nocodb/src/modules/jobs/jobs.service.ts → packages/nocodb/src/modules/jobs/fallback/jobs.service.ts

@ -1,35 +1,23 @@
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { JOBS_QUEUE, JobStatus } from '../../interface/Jobs';
import Noco from '../../Noco';
import { JobStatus } from '../../../interface/Jobs';
import { QueueService } from './fallback-queue.service';
@Injectable()
export class JobsService {
public activeQueue;
constructor(
@InjectQueue(JOBS_QUEUE) private readonly jobsQueue: Queue,
private readonly fallbackQueueService: QueueService,
) {
this.activeQueue = process.env['NC_REDIS_URL']
? this.jobsQueue
: this.fallbackQueueService;
if (process.env['NC_REDIS_URL'] && !process.env['NC_WORKER_CONTAINER']) {
this.jobsQueue.pause(true);
}
}
constructor(private readonly fallbackQueueService: QueueService) {}
async add(name: string, data: any) {
return this.activeQueue.add(name, data);
return this.fallbackQueueService.add(name, data);
}
async jobStatus(jobId: string) {
return await (await this.activeQueue.getJob(jobId)).getState();
return await (
await this.fallbackQueueService.getJob(jobId)
).status;
}
async jobList() {
return await this.activeQueue.getJobs([
return await this.fallbackQueueService.getJobs([
JobStatus.ACTIVE,
JobStatus.WAITING,
JobStatus.DELAYED,
@ -38,7 +26,7 @@ export class JobsService {
}
async getJobWithData(data: any) {
const jobs = await this.activeQueue.getJobs([
const jobs = await this.fallbackQueueService.getJobs([
// 'completed',
JobStatus.WAITING,
JobStatus.ACTIVE,

6
packages/nocodb/src/modules/jobs/jobs.gateway.ts

@ -9,10 +9,10 @@ import { Server, Socket } from 'socket.io';
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
import { AuthGuard } from '@nestjs/passport';
import { OnEvent } from '@nestjs/event-emitter';
import { Inject } from '@nestjs/common';
import { JobEvents } from '../../interface/Jobs';
import { JobsService } from './jobs.service';
import type { JobStatus } from '../../interface/Jobs';
import type { OnModuleInit } from '@nestjs/common';
import type { JobStatus } from '../../interface/Jobs';
@WebSocketGateway({
cors: {
@ -23,7 +23,7 @@ import type { OnModuleInit } from '@nestjs/common';
namespace: 'jobs',
})
export class JobsGateway implements OnModuleInit {
constructor(private readonly jobsService: JobsService) {}
constructor(@Inject('JobsService') private readonly jobsService) {}
@WebSocketServer()
server: Server;

47
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -4,25 +4,40 @@ import { GlobalModule } from '../global/global.module';
import { DatasModule } from '../datas/datas.module';
import { MetasModule } from '../metas/metas.module';
import { JOBS_QUEUE } from '../../interface/Jobs';
import { JobsService } from './jobs.service';
import { ExportService } from './export-import/export.service';
import { ImportService } from './export-import/import.service';
import { DuplicateController } from './export-import/duplicate.controller';
import { DuplicateProcessor } from './export-import/duplicate.processor';
import { ExportService } from './jobs/export-import/export.service';
import { ImportService } from './jobs/export-import/import.service';
import { AtImportController } from './jobs/at-import/at-import.controller';
import { AtImportProcessor } from './jobs/at-import/at-import.processor';
import { DuplicateController } from './jobs/export-import/duplicate.controller';
import { DuplicateProcessor } from './jobs/export-import/duplicate.processor';
import { JobsLogService } from './jobs/jobs-log.service';
import { JobsGateway } from './jobs.gateway';
import { QueueService } from './fallback-queue.service';
import { JobsEventService } from './jobs-event.service';
import { AtImportController } from './at-import/at-import.controller';
import { AtImportProcessor } from './at-import/at-import.processor';
// Redis
import { JobsService } from './redis/jobs.service';
import { JobsRedisService } from './redis/jobs-redis.service';
import { JobsEventService } from './redis/jobs-event.service';
// Fallback
import { JobsService as FallbackJobsService } from './fallback/jobs.service';
import { QueueService as FallbackQueueService } from './fallback/fallback-queue.service';
import { JobsEventService as FallbackJobsEventService } from './fallback/jobs-event.service';
@Module({
imports: [
GlobalModule,
DatasModule,
MetasModule,
...(process.env['NC_REDIS_URL']
? [
BullModule.forRoot({
url: process.env.NC_REDIS_URL,
}),
BullModule.registerQueue({
name: JOBS_QUEUE,
}),
]
: []),
],
controllers: [
...(!process.env['NC_WORKER_CONTAINER']
@ -31,9 +46,17 @@ import { AtImportProcessor } from './at-import/at-import.processor';
],
providers: [
...(!process.env['NC_WORKER_CONTAINER'] ? [JobsGateway] : []),
QueueService,
JobsService,
JobsEventService,
...(process.env['NC_REDIS_URL']
? [
JobsRedisService,
...(process.env['NC_WORKER_CONTAINER'] ? [JobsEventService] : []),
]
: [FallbackQueueService, FallbackJobsEventService]),
{
provide: 'JobsService',
useClass: process.env['NC_REDIS_URL'] ? JobsService : FallbackJobsService,
},
JobsLogService,
ExportService,
ImportService,
DuplicateProcessor,

22
packages/nocodb/src/modules/jobs/at-import/at-import.controller.ts → packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts

@ -1,15 +1,21 @@
import { Controller, HttpCode, Post, Request, UseGuards } from '@nestjs/common';
import { GlobalGuard } from '../../../guards/global/global.guard';
import { ExtractProjectIdMiddleware } from '../../../middlewares/extract-project-id/extract-project-id.middleware';
import { SyncSource } from '../../../models';
import { NcError } from '../../../helpers/catchError';
import { JobsService } from '../jobs.service';
import { JobTypes } from '../../../interface/Jobs';
import {
Controller,
HttpCode,
Inject,
Post,
Request,
UseGuards,
} from '@nestjs/common';
import { GlobalGuard } from '../../../../guards/global/global.guard';
import { ExtractProjectIdMiddleware } from '../../../../middlewares/extract-project-id/extract-project-id.middleware';
import { SyncSource } from '../../../../models';
import { NcError } from '../../../../helpers/catchError';
import { JobTypes } from '../../../../interface/Jobs';
@Controller()
@UseGuards(ExtractProjectIdMiddleware, GlobalGuard)
export class AtImportController {
constructor(private readonly jobsService: JobsService) {}
constructor(@Inject('JobsService') private readonly jobsService) {}
@Post('/api/v1/db/meta/import/airtable')
@HttpCode(200)

40
packages/nocodb/src/modules/jobs/at-import/at-import.processor.ts → packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts

@ -9,23 +9,23 @@ import utc from 'dayjs/plugin/utc';
import tinycolor from 'tinycolor2';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import extractRolesObj from '../../../utils/extractRolesObj';
import { AttachmentsService } from '../../../services/attachments.service';
import { ColumnsService } from '../../../services/columns.service';
import { BulkDataAliasService } from '../../../services/bulk-data-alias.service';
import { FiltersService } from '../../../services/filters.service';
import { FormColumnsService } from '../../../services/form-columns.service';
import { GalleriesService } from '../../../services/galleries.service';
import { GridsService } from '../../../services/grids.service';
import { ProjectUsersService } from '../../../services/project-users/project-users.service';
import { ProjectsService } from '../../../services/projects.service';
import { SortsService } from '../../../services/sorts.service';
import { TablesService } from '../../../services/tables.service';
import { ViewColumnsService } from '../../../services/view-columns.service';
import { ViewsService } from '../../../services/views.service';
import { FormsService } from '../../../services/forms.service';
import { JobsEventService } from '../jobs-event.service';
import { JOBS_QUEUE, JobTypes } from '../../../interface/Jobs';
import extractRolesObj from '../../../../utils/extractRolesObj';
import { AttachmentsService } from '../../../../services/attachments.service';
import { ColumnsService } from '../../../../services/columns.service';
import { BulkDataAliasService } from '../../../../services/bulk-data-alias.service';
import { FiltersService } from '../../../../services/filters.service';
import { FormColumnsService } from '../../../../services/form-columns.service';
import { GalleriesService } from '../../../../services/galleries.service';
import { GridsService } from '../../../../services/grids.service';
import { ProjectUsersService } from '../../../../services/project-users/project-users.service';
import { ProjectsService } from '../../../../services/projects.service';
import { SortsService } from '../../../../services/sorts.service';
import { TablesService } from '../../../../services/tables.service';
import { ViewColumnsService } from '../../../../services/view-columns.service';
import { ViewsService } from '../../../../services/views.service';
import { FormsService } from '../../../../services/forms.service';
import { JOBS_QUEUE, JobTypes } from '../../../../interface/Jobs';
import { JobsLogService } from '../jobs-log.service';
import FetchAT from './helpers/fetchAT';
import { importData, importLTARData } from './helpers/readAndProcessData';
import EntityMap from './helpers/EntityMap';
@ -99,7 +99,7 @@ export class AtImportProcessor {
private readonly viewColumnsService: ViewColumnsService,
private readonly sortsService: SortsService,
private readonly bulkDataAliasService: BulkDataAliasService,
private readonly jobsEventService: JobsEventService,
private readonly jobsLogService: JobsLogService,
) {}
@Process(JobTypes.AtImport)
@ -135,11 +135,11 @@ export class AtImportProcessor {
};
const logBasic = (log) => {
this.jobsEventService.sendLog(job, { message: log });
this.jobsLogService.sendLog(job, { message: log });
};
const logDetailed = (log) => {
if (debugMode) this.jobsEventService.sendLog(job, { message: log });
if (debugMode) this.jobsLogService.sendLog(job, { message: log });
};
const perfStats = [];

0
packages/nocodb/src/modules/jobs/at-import/helpers/EntityMap.ts → packages/nocodb/src/modules/jobs/jobs/at-import/helpers/EntityMap.ts

0
packages/nocodb/src/modules/jobs/at-import/helpers/fetchAT.ts → packages/nocodb/src/modules/jobs/jobs/at-import/helpers/fetchAT.ts

4
packages/nocodb/src/modules/jobs/at-import/helpers/readAndProcessData.ts → packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

@ -1,8 +1,8 @@
/* eslint-disable no-async-promise-executor */
import { RelationTypes, UITypes } from 'nocodb-sdk';
import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../services/tables.service';
import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../../services/tables.service';
// @ts-ignore
import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk';

0
packages/nocodb/src/modules/jobs/at-import/helpers/syncMap.ts → packages/nocodb/src/modules/jobs/jobs/at-import/helpers/syncMap.ts

16
packages/nocodb/src/modules/jobs/export-import/duplicate.controller.ts → packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts

@ -2,28 +2,28 @@ import {
Body,
Controller,
HttpCode,
Inject,
Param,
Post,
Request,
UseGuards,
} from '@nestjs/common';
import { ProjectStatus } from 'nocodb-sdk';
import { GlobalGuard } from '../../../guards/global/global.guard';
import { GlobalGuard } from '../../../../guards/global/global.guard';
import {
Acl,
ExtractProjectIdMiddleware,
} from '../../../middlewares/extract-project-id/extract-project-id.middleware';
import { ProjectsService } from '../../../services/projects.service';
import { Base, Model, Project } from '../../../models';
import { generateUniqueName } from '../../../helpers/exportImportHelpers';
import { JobsService } from '../jobs.service';
import { JobTypes } from '../../../interface/Jobs';
} from '../../../../middlewares/extract-project-id/extract-project-id.middleware';
import { ProjectsService } from '../../../../services/projects.service';
import { Base, Model, Project } from '../../../../models';
import { generateUniqueName } from '../../../../helpers/exportImportHelpers';
import { JobTypes } from '../../../../interface/Jobs';
@Controller()
@UseGuards(ExtractProjectIdMiddleware, GlobalGuard)
export class DuplicateController {
constructor(
private readonly jobsService: JobsService,
@Inject('JobsService') private readonly jobsService,
private readonly projectsService: ProjectsService,
) {}

12
packages/nocodb/src/modules/jobs/export-import/duplicate.processor.ts → packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts

@ -4,12 +4,12 @@ import { Job } from 'bull';
import papaparse from 'papaparse';
import { UITypes } from 'nocodb-sdk';
import { Logger } from '@nestjs/common';
import { Base, Column, Model, Project } from '../../../models';
import { ProjectsService } from '../../../services/projects.service';
import { findWithIdentifier } from '../../../helpers/exportImportHelpers';
import { BulkDataAliasService } from '../../../services/bulk-data-alias.service';
import { JOBS_QUEUE, JobTypes } from '../../../interface/Jobs';
import { elapsedTime, initTime } from '../helpers';
import { Base, Column, Model, Project } from '../../../../models';
import { ProjectsService } from '../../../../services/projects.service';
import { findWithIdentifier } from '../../../../helpers/exportImportHelpers';
import { BulkDataAliasService } from '../../../../services/bulk-data-alias.service';
import { JOBS_QUEUE, JobTypes } from '../../../../interface/Jobs';
import { elapsedTime, initTime } from '../../helpers';
import { ExportService } from './export.service';
import { ImportService } from './import.service';

20
packages/nocodb/src/modules/jobs/export-import/export.service.ts → packages/nocodb/src/modules/jobs/jobs/export-import/export.service.ts

@ -2,19 +2,19 @@ import { Readable } from 'stream';
import { UITypes, ViewTypes } from 'nocodb-sdk';
import { unparse } from 'papaparse';
import { Injectable, Logger } from '@nestjs/common';
import NcConnectionMgrv2 from '../../../utils/common/NcConnectionMgrv2';
import { getViewAndModelByAliasOrId } from '../../../modules/datas/helpers';
import NcConnectionMgrv2 from '../../../../utils/common/NcConnectionMgrv2';
import { getViewAndModelByAliasOrId } from '../../../datas/helpers';
import {
clearPrefix,
generateBaseIdMap,
} from '../../../helpers/exportImportHelpers';
import NcPluginMgrv2 from '../../../helpers/NcPluginMgrv2';
import { NcError } from '../../../helpers/catchError';
import { Base, Hook, Model, Project } from '../../../models';
import { DatasService } from '../../../services/datas.service';
import { elapsedTime, initTime } from '../helpers';
import type { BaseModelSqlv2 } from '../../../db/BaseModelSqlv2';
import type { View } from '../../../models';
} from '../../../../helpers/exportImportHelpers';
import NcPluginMgrv2 from '../../../../helpers/NcPluginMgrv2';
import { NcError } from '../../../../helpers/catchError';
import { Base, Hook, Model, Project } from '../../../../models';
import { DatasService } from '../../../../services/datas.service';
import { elapsedTime, initTime } from '../../helpers';
import type { BaseModelSqlv2 } from '../../../../db/BaseModelSqlv2';
import type { View } from '../../../../models';
@Injectable()
export class ExportService {

40
packages/nocodb/src/modules/jobs/export-import/import.service.ts → packages/nocodb/src/modules/jobs/jobs/export-import/import.service.ts

@ -9,28 +9,28 @@ import {
reverseGet,
withoutId,
withoutNull,
} from '../../../helpers/exportImportHelpers';
import { NcError } from '../../../helpers/catchError';
import { Base, Column, Model, Project } from '../../../models';
import { TablesService } from '../../../services/tables.service';
import { ColumnsService } from '../../../services/columns.service';
import { FiltersService } from '../../../services/filters.service';
import { SortsService } from '../../../services/sorts.service';
import { ViewColumnsService } from '../../../services/view-columns.service';
import { GridColumnsService } from '../../../services/grid-columns.service';
import { FormColumnsService } from '../../../services/form-columns.service';
import { GridsService } from '../../../services/grids.service';
import { FormsService } from '../../../services/forms.service';
import { GalleriesService } from '../../../services/galleries.service';
import { KanbansService } from '../../../services/kanbans.service';
import { HooksService } from '../../../services/hooks.service';
import { ViewsService } from '../../../services/views.service';
import NcPluginMgrv2 from '../../../helpers/NcPluginMgrv2';
import { BulkDataAliasService } from '../../../services/bulk-data-alias.service';
import { elapsedTime, initTime } from '../helpers';
} from '../../../../helpers/exportImportHelpers';
import { NcError } from '../../../../helpers/catchError';
import { Base, Column, Model, Project } from '../../../../models';
import { TablesService } from '../../../../services/tables.service';
import { ColumnsService } from '../../../../services/columns.service';
import { FiltersService } from '../../../../services/filters.service';
import { SortsService } from '../../../../services/sorts.service';
import { ViewColumnsService } from '../../../../services/view-columns.service';
import { GridColumnsService } from '../../../../services/grid-columns.service';
import { FormColumnsService } from '../../../../services/form-columns.service';
import { GridsService } from '../../../../services/grids.service';
import { FormsService } from '../../../../services/forms.service';
import { GalleriesService } from '../../../../services/galleries.service';
import { KanbansService } from '../../../../services/kanbans.service';
import { HooksService } from '../../../../services/hooks.service';
import { ViewsService } from '../../../../services/views.service';
import NcPluginMgrv2 from '../../../../helpers/NcPluginMgrv2';
import { BulkDataAliasService } from '../../../../services/bulk-data-alias.service';
import { elapsedTime, initTime } from '../../helpers';
import type { Readable } from 'stream';
import type { ViewCreateReqType } from 'nocodb-sdk';
import type { LinkToAnotherRecordColumn, User, View } from '../../../models';
import type { LinkToAnotherRecordColumn, User, View } from '../../../../models';
@Injectable()
export class ImportService {

16
packages/nocodb/src/modules/jobs/jobs/jobs-log.service.ts

@ -0,0 +1,16 @@
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { JobEvents } from '../../../interface/Jobs';
import type { Job } from 'bull';
@Injectable()
export class JobsLogService {
constructor(private eventEmitter: EventEmitter2) {}
sendLog(job: Job, data: { message: string }) {
this.eventEmitter.emit(JobEvents.LOG, {
id: job.id.toString(),
data,
});
}
}

71
packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts

@ -0,0 +1,71 @@
import {
OnQueueActive,
OnQueueCompleted,
OnQueueFailed,
Processor,
} from '@nestjs/bull';
import { Job } from 'bull';
import boxen from 'boxen';
import { OnEvent } from '@nestjs/event-emitter';
import { JobEvents, JOBS_QUEUE, JobStatus } from '../../../interface/Jobs';
import { JobsRedisService } from './jobs-redis.service';
@Processor(JOBS_QUEUE)
export class JobsEventService {
constructor(private jobsRedisService: JobsRedisService) {}
@OnQueueActive()
onActive(job: Job) {
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(
boxen(
`---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
{
padding: 1,
borderStyle: 'double',
borderColor: 'yellow',
},
),
);
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
error: {
message: error?.message,
},
},
});
}
@OnQueueCompleted()
onCompleted(job: Job, data: any) {
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
result: data,
},
});
}
@OnEvent(JobEvents.LOG)
onLog(data: { id: string; data: { message: string } }) {
this.jobsRedisService.publish(`jobs-${data.id}`, {
cmd: JobEvents.LOG,
id: data.id,
data: data.data,
});
}
}

53
packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts

@ -0,0 +1,53 @@
import { Injectable } from '@nestjs/common';
import Redis from 'ioredis';
@Injectable()
export class JobsRedisService {
private redisClient: Redis;
private redisSubscriber: Redis;
private unsubscribeCallbacks: { [key: string]: () => void } = {};
constructor() {
if (process.env['NC_WORKER_CONTAINER']) {
this.redisClient = new Redis(process.env.NC_REDIS_URL);
return;
}
this.redisSubscriber = new Redis(process.env.NC_REDIS_URL);
}
publish(channel: string, message: string | any) {
if (typeof message === 'string') {
this.redisClient.publish(channel, message);
} else {
try {
this.redisClient.publish(channel, JSON.stringify(message));
} catch (e) {
console.error(e);
}
}
}
subscribe(channel: string, callback: (message: any) => void) {
this.redisSubscriber.subscribe(channel);
const onMessage = (_channel, message) => {
try {
message = JSON.parse(message);
} catch (e) {}
callback(message);
};
this.redisSubscriber.on('message', onMessage);
this.unsubscribeCallbacks[channel] = () => {
this.redisSubscriber.unsubscribe(channel);
this.redisSubscriber.off('message', onMessage);
};
}
unsubscribe(channel: string) {
if (this.unsubscribeCallbacks[channel]) {
this.unsubscribeCallbacks[channel]();
delete this.unsubscribeCallbacks[channel];
}
}
}

90
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -0,0 +1,90 @@
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { JobEvents, JOBS_QUEUE, JobStatus } from '../../../interface/Jobs';
import { JobsRedisService } from './jobs-redis.service';
@Injectable()
export class JobsService {
private localJobs: string[] = [];
constructor(
@InjectQueue(JOBS_QUEUE) private readonly jobsQueue: Queue,
private jobsRedisService: JobsRedisService,
private eventEmitter: EventEmitter2,
) {
if (process.env['NC_REDIS_URL'] && !process.env['NC_WORKER_CONTAINER']) {
this.jobsQueue.pause(true);
}
}
async add(name: string, data: any) {
const job = await this.jobsQueue.add(name, data);
this.localJobs.push(job.id.toString());
this.jobsRedisService.subscribe(`jobs-${job.id.toString()}`, (data) => {
const cmd = data.cmd;
delete data.cmd;
switch (cmd) {
case JobEvents.STATUS:
this.eventEmitter.emit(JobEvents.STATUS, data);
if ([JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)) {
this.jobsRedisService.unsubscribe(`jobs-${data.id.toString()}`);
}
break;
case JobEvents.LOG:
this.eventEmitter.emit(JobEvents.LOG, data);
break;
}
});
return job;
}
async isLocalJob(jobId: string) {
return this.localJobs.includes(jobId);
}
async removeLocalJob(jobId: string) {
this.localJobs = this.localJobs.filter((j) => j !== jobId);
}
async jobStatus(jobId: string) {
const job = await this.jobsQueue.getJob(jobId);
if (job) {
return await job.getState();
}
}
async jobList() {
return await this.jobsQueue.getJobs([
JobStatus.ACTIVE,
JobStatus.WAITING,
JobStatus.DELAYED,
JobStatus.PAUSED,
]);
}
async getJobWithData(data: any) {
const jobs = await this.jobsQueue.getJobs([
// 'completed',
JobStatus.WAITING,
JobStatus.ACTIVE,
JobStatus.DELAYED,
// 'failed',
JobStatus.PAUSED,
]);
const job = jobs.find((j) => {
for (const key in data) {
if (j.data[key]) {
if (j.data[key] !== data[key]) return false;
} else {
return false;
}
}
return true;
});
return job;
}
}

1
packages/nocodb/src/modules/metas/metas.module.ts

@ -67,7 +67,6 @@ import { UtilsService } from '../../services/utils.service';
import { ViewColumnsService } from '../../services/view-columns.service';
import { ViewsService } from '../../services/views.service';
import { ApiDocsService } from '../../services/api-docs/api-docs.service';
import { EventEmitterModule } from '../event-emitter/event-emitter.module';
import { GlobalModule } from '../global/global.module';
import { ProjectUsersController } from '../../controllers/project-users.controller';
import { ProjectUsersService } from '../../services/project-users/project-users.service';

78
packages/nocodb/src/worker.module.ts

@ -1,78 +0,0 @@
import { Inject, Module, RequestMethod } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { BullModule } from '@nestjs/bull';
import { EventEmitterModule as NestJsEventEmitter } from '@nestjs/event-emitter';
import { Connection } from './connection/connection';
import { GlobalExceptionFilter } from './filters/global-exception/global-exception.filter';
import NcPluginMgrv2 from './helpers/NcPluginMgrv2';
import { DatasModule } from './modules/datas/datas.module';
import { IEventEmitter } from './modules/event-emitter/event-emitter.interface';
import { EventEmitterModule } from './modules/event-emitter/event-emitter.module';
import { AuthService } from './services/auth.service';
import { UsersModule } from './modules/users/users.module';
import { MetaService } from './meta/meta.service';
import Noco from './Noco';
import { TestModule } from './modules/test/test.module';
import { GlobalModule } from './modules/global/global.module';
import { HookHandlerService } from './services/hook-handler.service';
import { LocalStrategy } from './strategies/local.strategy';
import { AuthTokenStrategy } from './strategies/authtoken.strategy/authtoken.strategy';
import { BaseViewStrategy } from './strategies/base-view.strategy/base-view.strategy';
import { MetasModule } from './modules/metas/metas.module';
import NocoCache from './cache/NocoCache';
import { JobsModule } from './modules/jobs/jobs.module';
import type { OnApplicationBootstrap } from '@nestjs/common';
@Module({
imports: [
GlobalModule,
UsersModule,
...(process.env['PLAYWRIGHT_TEST'] === 'true' ? [TestModule] : []),
MetasModule,
DatasModule,
EventEmitterModule,
JobsModule,
NestJsEventEmitter.forRoot(),
...(process.env['NC_REDIS_URL']
? [
BullModule.forRoot({
url: process.env.NC_REDIS_URL,
}),
]
: []),
],
controllers: [],
providers: [
AuthService,
{
provide: APP_FILTER,
useClass: GlobalExceptionFilter,
},
LocalStrategy,
AuthTokenStrategy,
BaseViewStrategy,
HookHandlerService,
],
})
export class AppModule implements OnApplicationBootstrap {
constructor(
private readonly connection: Connection,
private readonly metaService: MetaService,
@Inject('IEventEmitter') private readonly eventEmitter: IEventEmitter,
) {}
// app init
async onApplicationBootstrap(): Promise<void> {
process.env.NC_VERSION = '0105004';
await NocoCache.init();
// todo: remove
// temporary hack
Noco._ncMeta = this.metaService;
Noco.config = this.connection.config;
Noco.eventEmitter = this.eventEmitter;
await NcPluginMgrv2.init(Noco.ncMeta);
}
}
Loading…
Cancel
Save