From 9881e8b547e91b6e4389a4cc8accb5711e7a51da Mon Sep 17 00:00:00 2001 From: Mert E Date: Mon, 20 May 2024 19:50:45 +0300 Subject: [PATCH] feat: move webhook handler to worker (#8525) * feat: move webhook handler to worker Signed-off-by: mertmit * fix: move job data type to interfaces Signed-off-by: mertmit --------- Signed-off-by: mertmit --- packages/nocodb/src/app.module.ts | 2 - packages/nocodb/src/interface/Jobs.ts | 13 ++++ .../src/modules/global/global.module.ts | 6 +- .../jobs/fallback/fallback-queue.service.ts | 6 ++ .../modules/jobs/jobs-service.interface.ts | 13 ++++ .../src/modules/jobs/jobs.controller.ts | 5 +- .../nocodb/src/modules/jobs/jobs.module.ts | 2 + .../jobs/at-import/at-import.controller.ts | 5 +- .../export-import/duplicate.controller.ts | 3 +- .../jobs/jobs/health-check.processor.ts | 8 ++- .../jobs/meta-sync/meta-sync.controller.ts | 5 +- .../source-create/source-create.controller.ts | 5 +- .../source-delete/source-delete.controller.ts | 3 +- .../webhook-handler.processor.ts | 24 ++++++++ .../src/services/hook-handler.service.ts | 61 +++++++++++++------ 15 files changed, 129 insertions(+), 32 deletions(-) create mode 100644 packages/nocodb/src/modules/jobs/jobs-service.interface.ts create mode 100644 packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts diff --git a/packages/nocodb/src/app.module.ts b/packages/nocodb/src/app.module.ts index 108b769b95..55e46d5742 100644 --- a/packages/nocodb/src/app.module.ts +++ b/packages/nocodb/src/app.module.ts @@ -21,7 +21,6 @@ import { JobsModule } from '~/modules/jobs/jobs.module'; import appConfig from '~/app.config'; import { ExtractIdsMiddleware } from '~/middlewares/extract-ids/extract-ids.middleware'; -import { HookHandlerService } from '~/services/hook-handler.service'; import { BasicStrategy } from '~/strategies/basic.strategy/basic.strategy'; import { UsersModule } from '~/modules/users/users.module'; import { AuthModule } from '~/modules/auth/auth.module'; @@ -66,7 +65,6 @@ export const ceModuleConfig = { LocalStrategy, AuthTokenStrategy, BaseViewStrategy, - HookHandlerService, BasicStrategy, ], }; diff --git a/packages/nocodb/src/interface/Jobs.ts b/packages/nocodb/src/interface/Jobs.ts index b1ee06ca94..c73353cd9b 100644 --- a/packages/nocodb/src/interface/Jobs.ts +++ b/packages/nocodb/src/interface/Jobs.ts @@ -1,3 +1,5 @@ +import type { UserType } from 'nocodb-sdk'; + export const JOBS_QUEUE = 'jobs'; export enum JobTypes { @@ -12,6 +14,7 @@ export enum JobTypes { UpdateWsStat = 'update-ws-stats', UpdateSrcStat = 'update-source-stat', HealthCheck = 'health-check', + HandleWebhook = 'handle-webhook', } export enum JobStatus { @@ -40,3 +43,13 @@ export enum InstanceCommands { RESET = 'reset', RELEASE = 'release', } + +export interface HandleWebhookJobData { + hookName: string; + prevData; + newData; + user: UserType; + viewId: string; + modelId: string; + tnPath: string; +} diff --git a/packages/nocodb/src/modules/global/global.module.ts b/packages/nocodb/src/modules/global/global.module.ts index 1f4abce3d9..4d1412bf1b 100644 --- a/packages/nocodb/src/modules/global/global.module.ts +++ b/packages/nocodb/src/modules/global/global.module.ts @@ -12,7 +12,9 @@ import { JwtStrategy } from '~/strategies/jwt.strategy'; import { UsersService } from '~/services/users/users.service'; import { TelemetryService } from '~/services/telemetry.service'; import { AppHooksListenerService } from '~/services/app-hooks-listener.service'; +import { HookHandlerService } from '~/services/hook-handler.service'; import { UsersModule } from '~/modules/users/users.module'; +import { JobsModule } from '~/modules/jobs/jobs.module'; export const JwtStrategyProvider: Provider = { provide: JwtStrategy, @@ -36,7 +38,7 @@ export const JwtStrategyProvider: Provider = { }; export const globalModuleMetadata = { - imports: [EventEmitterModule, forwardRef(() => UsersModule)], + imports: [EventEmitterModule, forwardRef(() => UsersModule), JobsModule], providers: [ InitMetaServiceProvider, AppHooksService, @@ -46,6 +48,7 @@ export const globalModuleMetadata = { AppHooksService, AppHooksListenerService, TelemetryService, + HookHandlerService, ], exports: [ MetaService, @@ -54,6 +57,7 @@ export const globalModuleMetadata = { AppHooksService, AppHooksListenerService, TelemetryService, + HookHandlerService, ...(process.env.NC_WORKER_CONTAINER !== 'true' ? [SocketGateway] : []), ], }; diff --git a/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts b/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts index 49a52c8ad2..2166434b4d 100644 --- a/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts @@ -6,6 +6,7 @@ import { AtImportProcessor } from '~/modules/jobs/jobs/at-import/at-import.proce import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.processor'; import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor'; import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; +import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor'; import { JobsEventService } from '~/modules/jobs/fallback/jobs-event.service'; import { JobStatus, JobTypes } from '~/interface/Jobs'; @@ -31,6 +32,7 @@ export class QueueService { protected readonly metaSyncProcessor: MetaSyncProcessor, protected readonly sourceCreateProcessor: SourceCreateProcessor, protected readonly sourceDeleteProcessor: SourceDeleteProcessor, + protected readonly webhookHandlerProcessor: WebhookHandlerProcessor, ) { this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => { const job = this.queueMemory.find((job) => job.id === data.job.id); @@ -88,6 +90,10 @@ export class QueueService { this: this.sourceDeleteProcessor, fn: this.sourceDeleteProcessor.job, }, + [JobTypes.HandleWebhook]: { + this: this.webhookHandlerProcessor, + fn: this.webhookHandlerProcessor.job, + }, }; async jobWrapper(job: Job) { diff --git a/packages/nocodb/src/modules/jobs/jobs-service.interface.ts b/packages/nocodb/src/modules/jobs/jobs-service.interface.ts new file mode 100644 index 0000000000..7f083582f2 --- /dev/null +++ b/packages/nocodb/src/modules/jobs/jobs-service.interface.ts @@ -0,0 +1,13 @@ +import type Bull from 'bull'; +import type { JobStatus } from '~/interface/Jobs'; + +export interface IJobsService { + jobsQueue: Bull.Queue; + toggleQueue(): Promise; + add(name: string, data: any): Promise>; + jobStatus(jobId: string): Promise; + jobList(): Promise[]>; + getJobWithData(data: any): Promise>; + resumeQueue(): Promise; + pauseQueue(): Promise; +} diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index 7d371f9821..393b4dfcae 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -21,6 +21,7 @@ import { GlobalGuard } from '~/guards/global/global.guard'; import NocoCache from '~/cache/NocoCache'; import { CacheGetType, CacheScope } from '~/utils/globals'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14); const POLLING_INTERVAL = 30000; @@ -31,7 +32,7 @@ export class JobsController implements OnModuleInit { jobsRedisService: JobsRedisService; constructor( - @Inject('JobsService') private readonly jobsService, + @Inject('JobsService') private readonly jobsService: IJobsService, private moduleRef: ModuleRef, ) {} @@ -168,7 +169,7 @@ export class JobsController implements OnModuleInit { const job = await this.jobsService.getJobWithData(data); if (job) { res = {}; - res.id = job.id; + res.id = `${job.id}`; res.status = await this.jobsService.jobStatus(data.id); } } diff --git a/packages/nocodb/src/modules/jobs/jobs.module.ts b/packages/nocodb/src/modules/jobs/jobs.module.ts index 736d0c7ddd..c29620aca4 100644 --- a/packages/nocodb/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb/src/modules/jobs/jobs.module.ts @@ -14,6 +14,7 @@ import { SourceCreateController } from '~/modules/jobs/jobs/source-create/source import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor'; import { SourceDeleteController } from '~/modules/jobs/jobs/source-delete/source-delete.controller'; import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; +import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor'; // Jobs Module Related import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service'; @@ -77,6 +78,7 @@ export const JobsModuleMetadata = { MetaSyncProcessor, SourceCreateProcessor, SourceDeleteProcessor, + WebhookHandlerProcessor, ], exports: ['JobsService'], }; diff --git a/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts b/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts index 129a75036a..f268250b0f 100644 --- a/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts @@ -13,11 +13,14 @@ import { SyncSource } from '~/models'; import { NcError } from '~/helpers/catchError'; import { JobTypes } from '~/interface/Jobs'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; @Controller() @UseGuards(MetaApiLimiterGuard, GlobalGuard) export class AtImportController { - constructor(@Inject('JobsService') private readonly jobsService) {} + constructor( + @Inject('JobsService') private readonly jobsService: IJobsService, + ) {} @Post([ '/api/v1/db/meta/syncs/:syncId/trigger', diff --git a/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts b/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts index 73b68c142a..213042b83d 100644 --- a/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts @@ -17,12 +17,13 @@ import { Base, Column, Model, Source } from '~/models'; import { generateUniqueName } from '~/helpers/exportImportHelpers'; import { JobTypes } from '~/interface/Jobs'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; @Controller() @UseGuards(MetaApiLimiterGuard, GlobalGuard) export class DuplicateController { constructor( - @Inject('JobsService') protected readonly jobsService, + @Inject('JobsService') protected readonly jobsService: IJobsService, protected readonly basesService: BasesService, ) {} diff --git a/packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts b/packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts index d3c839a558..bdf2955336 100644 --- a/packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts @@ -1,17 +1,19 @@ import { Process, Processor } from '@nestjs/bull'; import { Inject, Logger } from '@nestjs/common'; -import type { Queue } from 'bull'; import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; @Processor(JOBS_QUEUE) export class HealthCheckProcessor { private logger = new Logger(HealthCheckProcessor.name); - constructor(@Inject('JobsService') protected readonly jobsService) {} + constructor( + @Inject('JobsService') protected readonly jobsService: IJobsService, + ) {} @Process(JobTypes.HealthCheck) async healthCheck() { - const queue = this.jobsService.jobsQueue as Queue; + const queue = this.jobsService.jobsQueue; if (queue) { queue diff --git a/packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.controller.ts b/packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.controller.ts index 061cab7467..dfccac10d5 100644 --- a/packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.controller.ts @@ -13,11 +13,14 @@ import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware'; import { NcError } from '~/helpers/catchError'; import { JobTypes } from '~/interface/Jobs'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; @Controller() @UseGuards(MetaApiLimiterGuard, GlobalGuard) export class MetaSyncController { - constructor(@Inject('JobsService') private readonly jobsService) {} + constructor( + @Inject('JobsService') private readonly jobsService: IJobsService, + ) {} @Post([ '/api/v1/db/meta/projects/:baseId/meta-diff', diff --git a/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts b/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts index 8f13b67c86..e262c121ed 100644 --- a/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts @@ -15,11 +15,14 @@ import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware'; import { NcError } from '~/helpers/catchError'; import { JobTypes } from '~/interface/Jobs'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; @Controller() @UseGuards(MetaApiLimiterGuard, GlobalGuard) export class SourceCreateController { - constructor(@Inject('JobsService') private readonly jobsService) {} + constructor( + @Inject('JobsService') private readonly jobsService: IJobsService, + ) {} @Post([ '/api/v1/db/meta/projects/:baseId/bases', diff --git a/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts b/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts index c6ec323938..f86d9ad8d7 100644 --- a/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts @@ -13,12 +13,13 @@ import { NcError } from '~/helpers/catchError'; import { JobTypes } from '~/interface/Jobs'; import { SourcesService } from '~/services/sources.service'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; @Controller() @UseGuards(MetaApiLimiterGuard, GlobalGuard) export class SourceDeleteController { constructor( - @Inject('JobsService') private readonly jobsService, + @Inject('JobsService') private readonly jobsService: IJobsService, private readonly sourcesService: SourcesService, ) {} diff --git a/packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts b/packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts new file mode 100644 index 0000000000..6e47501aa9 --- /dev/null +++ b/packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts @@ -0,0 +1,24 @@ +import { Process, Processor } from '@nestjs/bull'; +import { forwardRef, Inject, Logger } from '@nestjs/common'; +import { Job } from 'bull'; +import { + type HandleWebhookJobData, + JOBS_QUEUE, + JobTypes, +} from '~/interface/Jobs'; +import { HookHandlerService } from '~/services/hook-handler.service'; + +@Processor(JOBS_QUEUE) +export class WebhookHandlerProcessor { + private logger = new Logger(WebhookHandlerProcessor.name); + + constructor( + @Inject(forwardRef(() => HookHandlerService)) + private readonly hookHandlerService: HookHandlerService, + ) {} + + @Process(JobTypes.HandleWebhook) + async job(job: Job) { + await this.hookHandlerService.handleHooks(job.data); + } +} diff --git a/packages/nocodb/src/services/hook-handler.service.ts b/packages/nocodb/src/services/hook-handler.service.ts index d4191ff008..7ef6f1c38e 100644 --- a/packages/nocodb/src/services/hook-handler.service.ts +++ b/packages/nocodb/src/services/hook-handler.service.ts @@ -1,8 +1,7 @@ -import { Inject, Injectable } from '@nestjs/common'; -import { UITypes, ViewTypes } from 'nocodb-sdk'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { type HookType, UITypes, ViewTypes } from 'nocodb-sdk'; import ejs from 'ejs'; import type { OnModuleDestroy, OnModuleInit } from '@nestjs/common'; -import type { UserType } from 'nocodb-sdk'; import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2'; import { _transformSubmittedFormDataForEmail, @@ -11,18 +10,22 @@ import { import { IEventEmitter } from '~/modules/event-emitter/event-emitter.interface'; import formSubmissionEmailTemplate from '~/utils/common/formSubmissionEmailTemplate'; import { FormView, Hook, Model, View } from '~/models'; +import { type HandleWebhookJobData, JobTypes } from '~/interface/Jobs'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; export const HANDLE_WEBHOOK = '__nc_handleHooks'; @Injectable() export class HookHandlerService implements OnModuleInit, OnModuleDestroy { + private logger = new Logger(HookHandlerService.name); private unsubscribe: () => void; constructor( @Inject('IEventEmitter') private readonly eventEmitter: IEventEmitter, + @Inject('JobsService') private readonly jobsService: IJobsService, ) {} - private async handleHooks({ + public async handleHooks({ hookName, prevData, newData, @@ -30,15 +33,7 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy { viewId, modelId, tnPath, - }: { - hookName; - prevData; - newData; - user: UserType; - viewId: string; - modelId: string; - tnPath: string; - }): Promise { + }: HandleWebhookJobData): Promise { const view = await View.get(viewId); const model = await Model.get(modelId); @@ -111,7 +106,11 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy { }); } } catch (e) { - console.log(e); + this.logger.error({ + error: e, + details: 'Error while sending form submission email', + hookName, + }); } } @@ -119,23 +118,47 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy { const [event, operation] = hookName.split('.'); const hooks = await Hook.list({ fk_model_id: modelId, - event, - operation, + event: event as HookType['event'], + operation: operation as HookType['operation'], }); for (const hook of hooks) { if (hook.active) { - invokeWebhook(hook, model, view, prevData, newData, user); + await invokeWebhook(hook, model, view, prevData, newData, user); } } } catch (e) { - console.log('hooks :: error', hookName, e); + this.logger.error({ + error: e, + details: 'Error while handling webhook', + hookName, + }); } } + private async triggerHook({ + hookName, + prevData, + newData, + user, + viewId, + modelId, + tnPath, + }: HandleWebhookJobData) { + await this.jobsService.add(JobTypes.HandleWebhook, { + hookName, + prevData, + newData, + user, + viewId, + modelId, + tnPath, + }); + } + onModuleInit(): any { this.unsubscribe = this.eventEmitter.on( HANDLE_WEBHOOK, - this.handleHooks.bind(this), + this.triggerHook.bind(this), ); }