Browse Source

feat: move webhook handler to worker (#8525)

* feat: move webhook handler to worker

Signed-off-by: mertmit <mertmit99@gmail.com>

* fix: move job data type to interfaces

Signed-off-by: mertmit <mertmit99@gmail.com>

---------

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/8536/head
Mert E 6 months ago committed by GitHub
parent
commit
9881e8b547
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      packages/nocodb/src/app.module.ts
  2. 13
      packages/nocodb/src/interface/Jobs.ts
  3. 6
      packages/nocodb/src/modules/global/global.module.ts
  4. 6
      packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts
  5. 13
      packages/nocodb/src/modules/jobs/jobs-service.interface.ts
  6. 5
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  7. 2
      packages/nocodb/src/modules/jobs/jobs.module.ts
  8. 5
      packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts
  9. 3
      packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts
  10. 8
      packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts
  11. 5
      packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.controller.ts
  12. 5
      packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts
  13. 3
      packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts
  14. 24
      packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts
  15. 61
      packages/nocodb/src/services/hook-handler.service.ts

2
packages/nocodb/src/app.module.ts

@ -21,7 +21,6 @@ import { JobsModule } from '~/modules/jobs/jobs.module';
import appConfig from '~/app.config'; import appConfig from '~/app.config';
import { ExtractIdsMiddleware } from '~/middlewares/extract-ids/extract-ids.middleware'; import { ExtractIdsMiddleware } from '~/middlewares/extract-ids/extract-ids.middleware';
import { HookHandlerService } from '~/services/hook-handler.service';
import { BasicStrategy } from '~/strategies/basic.strategy/basic.strategy'; import { BasicStrategy } from '~/strategies/basic.strategy/basic.strategy';
import { UsersModule } from '~/modules/users/users.module'; import { UsersModule } from '~/modules/users/users.module';
import { AuthModule } from '~/modules/auth/auth.module'; import { AuthModule } from '~/modules/auth/auth.module';
@ -66,7 +65,6 @@ export const ceModuleConfig = {
LocalStrategy, LocalStrategy,
AuthTokenStrategy, AuthTokenStrategy,
BaseViewStrategy, BaseViewStrategy,
HookHandlerService,
BasicStrategy, BasicStrategy,
], ],
}; };

13
packages/nocodb/src/interface/Jobs.ts

@ -1,3 +1,5 @@
import type { UserType } from 'nocodb-sdk';
export const JOBS_QUEUE = 'jobs'; export const JOBS_QUEUE = 'jobs';
export enum JobTypes { export enum JobTypes {
@ -12,6 +14,7 @@ export enum JobTypes {
UpdateWsStat = 'update-ws-stats', UpdateWsStat = 'update-ws-stats',
UpdateSrcStat = 'update-source-stat', UpdateSrcStat = 'update-source-stat',
HealthCheck = 'health-check', HealthCheck = 'health-check',
HandleWebhook = 'handle-webhook',
} }
export enum JobStatus { export enum JobStatus {
@ -40,3 +43,13 @@ export enum InstanceCommands {
RESET = 'reset', RESET = 'reset',
RELEASE = 'release', RELEASE = 'release',
} }
export interface HandleWebhookJobData {
hookName: string;
prevData;
newData;
user: UserType;
viewId: string;
modelId: string;
tnPath: string;
}

6
packages/nocodb/src/modules/global/global.module.ts

@ -12,7 +12,9 @@ import { JwtStrategy } from '~/strategies/jwt.strategy';
import { UsersService } from '~/services/users/users.service'; import { UsersService } from '~/services/users/users.service';
import { TelemetryService } from '~/services/telemetry.service'; import { TelemetryService } from '~/services/telemetry.service';
import { AppHooksListenerService } from '~/services/app-hooks-listener.service'; import { AppHooksListenerService } from '~/services/app-hooks-listener.service';
import { HookHandlerService } from '~/services/hook-handler.service';
import { UsersModule } from '~/modules/users/users.module'; import { UsersModule } from '~/modules/users/users.module';
import { JobsModule } from '~/modules/jobs/jobs.module';
export const JwtStrategyProvider: Provider = { export const JwtStrategyProvider: Provider = {
provide: JwtStrategy, provide: JwtStrategy,
@ -36,7 +38,7 @@ export const JwtStrategyProvider: Provider = {
}; };
export const globalModuleMetadata = { export const globalModuleMetadata = {
imports: [EventEmitterModule, forwardRef(() => UsersModule)], imports: [EventEmitterModule, forwardRef(() => UsersModule), JobsModule],
providers: [ providers: [
InitMetaServiceProvider, InitMetaServiceProvider,
AppHooksService, AppHooksService,
@ -46,6 +48,7 @@ export const globalModuleMetadata = {
AppHooksService, AppHooksService,
AppHooksListenerService, AppHooksListenerService,
TelemetryService, TelemetryService,
HookHandlerService,
], ],
exports: [ exports: [
MetaService, MetaService,
@ -54,6 +57,7 @@ export const globalModuleMetadata = {
AppHooksService, AppHooksService,
AppHooksListenerService, AppHooksListenerService,
TelemetryService, TelemetryService,
HookHandlerService,
...(process.env.NC_WORKER_CONTAINER !== 'true' ? [SocketGateway] : []), ...(process.env.NC_WORKER_CONTAINER !== 'true' ? [SocketGateway] : []),
], ],
}; };

6
packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts

@ -6,6 +6,7 @@ import { AtImportProcessor } from '~/modules/jobs/jobs/at-import/at-import.proce
import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.processor'; import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.processor';
import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor'; import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor';
import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor';
import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor';
import { JobsEventService } from '~/modules/jobs/fallback/jobs-event.service'; import { JobsEventService } from '~/modules/jobs/fallback/jobs-event.service';
import { JobStatus, JobTypes } from '~/interface/Jobs'; import { JobStatus, JobTypes } from '~/interface/Jobs';
@ -31,6 +32,7 @@ export class QueueService {
protected readonly metaSyncProcessor: MetaSyncProcessor, protected readonly metaSyncProcessor: MetaSyncProcessor,
protected readonly sourceCreateProcessor: SourceCreateProcessor, protected readonly sourceCreateProcessor: SourceCreateProcessor,
protected readonly sourceDeleteProcessor: SourceDeleteProcessor, protected readonly sourceDeleteProcessor: SourceDeleteProcessor,
protected readonly webhookHandlerProcessor: WebhookHandlerProcessor,
) { ) {
this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => { this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => {
const job = this.queueMemory.find((job) => job.id === data.job.id); const job = this.queueMemory.find((job) => job.id === data.job.id);
@ -88,6 +90,10 @@ export class QueueService {
this: this.sourceDeleteProcessor, this: this.sourceDeleteProcessor,
fn: this.sourceDeleteProcessor.job, fn: this.sourceDeleteProcessor.job,
}, },
[JobTypes.HandleWebhook]: {
this: this.webhookHandlerProcessor,
fn: this.webhookHandlerProcessor.job,
},
}; };
async jobWrapper(job: Job) { async jobWrapper(job: Job) {

13
packages/nocodb/src/modules/jobs/jobs-service.interface.ts

@ -0,0 +1,13 @@
import type Bull from 'bull';
import type { JobStatus } from '~/interface/Jobs';
export interface IJobsService {
jobsQueue: Bull.Queue;
toggleQueue(): Promise<void>;
add(name: string, data: any): Promise<Bull.Job<any>>;
jobStatus(jobId: string): Promise<JobStatus>;
jobList(): Promise<Bull.Job<any>[]>;
getJobWithData(data: any): Promise<Bull.Job<any>>;
resumeQueue(): Promise<void>;
pauseQueue(): Promise<void>;
}

5
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -21,6 +21,7 @@ import { GlobalGuard } from '~/guards/global/global.guard';
import NocoCache from '~/cache/NocoCache'; import NocoCache from '~/cache/NocoCache';
import { CacheGetType, CacheScope } from '~/utils/globals'; import { CacheGetType, CacheScope } from '~/utils/globals';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14); const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14);
const POLLING_INTERVAL = 30000; const POLLING_INTERVAL = 30000;
@ -31,7 +32,7 @@ export class JobsController implements OnModuleInit {
jobsRedisService: JobsRedisService; jobsRedisService: JobsRedisService;
constructor( constructor(
@Inject('JobsService') private readonly jobsService, @Inject('JobsService') private readonly jobsService: IJobsService,
private moduleRef: ModuleRef, private moduleRef: ModuleRef,
) {} ) {}
@ -168,7 +169,7 @@ export class JobsController implements OnModuleInit {
const job = await this.jobsService.getJobWithData(data); const job = await this.jobsService.getJobWithData(data);
if (job) { if (job) {
res = {}; res = {};
res.id = job.id; res.id = `${job.id}`;
res.status = await this.jobsService.jobStatus(data.id); res.status = await this.jobsService.jobStatus(data.id);
} }
} }

2
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -14,6 +14,7 @@ import { SourceCreateController } from '~/modules/jobs/jobs/source-create/source
import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor'; import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor';
import { SourceDeleteController } from '~/modules/jobs/jobs/source-delete/source-delete.controller'; import { SourceDeleteController } from '~/modules/jobs/jobs/source-delete/source-delete.controller';
import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor';
import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor';
// Jobs Module Related // Jobs Module Related
import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service'; import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service';
@ -77,6 +78,7 @@ export const JobsModuleMetadata = {
MetaSyncProcessor, MetaSyncProcessor,
SourceCreateProcessor, SourceCreateProcessor,
SourceDeleteProcessor, SourceDeleteProcessor,
WebhookHandlerProcessor,
], ],
exports: ['JobsService'], exports: ['JobsService'],
}; };

5
packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts

@ -13,11 +13,14 @@ import { SyncSource } from '~/models';
import { NcError } from '~/helpers/catchError'; import { NcError } from '~/helpers/catchError';
import { JobTypes } from '~/interface/Jobs'; import { JobTypes } from '~/interface/Jobs';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
@Controller() @Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard) @UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class AtImportController { export class AtImportController {
constructor(@Inject('JobsService') private readonly jobsService) {} constructor(
@Inject('JobsService') private readonly jobsService: IJobsService,
) {}
@Post([ @Post([
'/api/v1/db/meta/syncs/:syncId/trigger', '/api/v1/db/meta/syncs/:syncId/trigger',

3
packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts

@ -17,12 +17,13 @@ import { Base, Column, Model, Source } from '~/models';
import { generateUniqueName } from '~/helpers/exportImportHelpers'; import { generateUniqueName } from '~/helpers/exportImportHelpers';
import { JobTypes } from '~/interface/Jobs'; import { JobTypes } from '~/interface/Jobs';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
@Controller() @Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard) @UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class DuplicateController { export class DuplicateController {
constructor( constructor(
@Inject('JobsService') protected readonly jobsService, @Inject('JobsService') protected readonly jobsService: IJobsService,
protected readonly basesService: BasesService, protected readonly basesService: BasesService,
) {} ) {}

8
packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts

@ -1,17 +1,19 @@
import { Process, Processor } from '@nestjs/bull'; import { Process, Processor } from '@nestjs/bull';
import { Inject, Logger } from '@nestjs/common'; import { Inject, Logger } from '@nestjs/common';
import type { Queue } from 'bull';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
@Processor(JOBS_QUEUE) @Processor(JOBS_QUEUE)
export class HealthCheckProcessor { export class HealthCheckProcessor {
private logger = new Logger(HealthCheckProcessor.name); private logger = new Logger(HealthCheckProcessor.name);
constructor(@Inject('JobsService') protected readonly jobsService) {} constructor(
@Inject('JobsService') protected readonly jobsService: IJobsService,
) {}
@Process(JobTypes.HealthCheck) @Process(JobTypes.HealthCheck)
async healthCheck() { async healthCheck() {
const queue = this.jobsService.jobsQueue as Queue; const queue = this.jobsService.jobsQueue;
if (queue) { if (queue) {
queue queue

5
packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.controller.ts

@ -13,11 +13,14 @@ import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware';
import { NcError } from '~/helpers/catchError'; import { NcError } from '~/helpers/catchError';
import { JobTypes } from '~/interface/Jobs'; import { JobTypes } from '~/interface/Jobs';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
@Controller() @Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard) @UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class MetaSyncController { export class MetaSyncController {
constructor(@Inject('JobsService') private readonly jobsService) {} constructor(
@Inject('JobsService') private readonly jobsService: IJobsService,
) {}
@Post([ @Post([
'/api/v1/db/meta/projects/:baseId/meta-diff', '/api/v1/db/meta/projects/:baseId/meta-diff',

5
packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts

@ -15,11 +15,14 @@ import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware';
import { NcError } from '~/helpers/catchError'; import { NcError } from '~/helpers/catchError';
import { JobTypes } from '~/interface/Jobs'; import { JobTypes } from '~/interface/Jobs';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
@Controller() @Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard) @UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class SourceCreateController { export class SourceCreateController {
constructor(@Inject('JobsService') private readonly jobsService) {} constructor(
@Inject('JobsService') private readonly jobsService: IJobsService,
) {}
@Post([ @Post([
'/api/v1/db/meta/projects/:baseId/bases', '/api/v1/db/meta/projects/:baseId/bases',

3
packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts

@ -13,12 +13,13 @@ import { NcError } from '~/helpers/catchError';
import { JobTypes } from '~/interface/Jobs'; import { JobTypes } from '~/interface/Jobs';
import { SourcesService } from '~/services/sources.service'; import { SourcesService } from '~/services/sources.service';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
@Controller() @Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard) @UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class SourceDeleteController { export class SourceDeleteController {
constructor( constructor(
@Inject('JobsService') private readonly jobsService, @Inject('JobsService') private readonly jobsService: IJobsService,
private readonly sourcesService: SourcesService, private readonly sourcesService: SourcesService,
) {} ) {}

24
packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts

@ -0,0 +1,24 @@
import { Process, Processor } from '@nestjs/bull';
import { forwardRef, Inject, Logger } from '@nestjs/common';
import { Job } from 'bull';
import {
type HandleWebhookJobData,
JOBS_QUEUE,
JobTypes,
} from '~/interface/Jobs';
import { HookHandlerService } from '~/services/hook-handler.service';
@Processor(JOBS_QUEUE)
export class WebhookHandlerProcessor {
private logger = new Logger(WebhookHandlerProcessor.name);
constructor(
@Inject(forwardRef(() => HookHandlerService))
private readonly hookHandlerService: HookHandlerService,
) {}
@Process(JobTypes.HandleWebhook)
async job(job: Job<HandleWebhookJobData>) {
await this.hookHandlerService.handleHooks(job.data);
}
}

61
packages/nocodb/src/services/hook-handler.service.ts

@ -1,8 +1,7 @@
import { Inject, Injectable } from '@nestjs/common'; import { Inject, Injectable, Logger } from '@nestjs/common';
import { UITypes, ViewTypes } from 'nocodb-sdk'; import { type HookType, UITypes, ViewTypes } from 'nocodb-sdk';
import ejs from 'ejs'; import ejs from 'ejs';
import type { OnModuleDestroy, OnModuleInit } from '@nestjs/common'; import type { OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import type { UserType } from 'nocodb-sdk';
import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2'; import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2';
import { import {
_transformSubmittedFormDataForEmail, _transformSubmittedFormDataForEmail,
@ -11,18 +10,22 @@ import {
import { IEventEmitter } from '~/modules/event-emitter/event-emitter.interface'; import { IEventEmitter } from '~/modules/event-emitter/event-emitter.interface';
import formSubmissionEmailTemplate from '~/utils/common/formSubmissionEmailTemplate'; import formSubmissionEmailTemplate from '~/utils/common/formSubmissionEmailTemplate';
import { FormView, Hook, Model, View } from '~/models'; import { FormView, Hook, Model, View } from '~/models';
import { type HandleWebhookJobData, JobTypes } from '~/interface/Jobs';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
export const HANDLE_WEBHOOK = '__nc_handleHooks'; export const HANDLE_WEBHOOK = '__nc_handleHooks';
@Injectable() @Injectable()
export class HookHandlerService implements OnModuleInit, OnModuleDestroy { export class HookHandlerService implements OnModuleInit, OnModuleDestroy {
private logger = new Logger(HookHandlerService.name);
private unsubscribe: () => void; private unsubscribe: () => void;
constructor( constructor(
@Inject('IEventEmitter') private readonly eventEmitter: IEventEmitter, @Inject('IEventEmitter') private readonly eventEmitter: IEventEmitter,
@Inject('JobsService') private readonly jobsService: IJobsService,
) {} ) {}
private async handleHooks({ public async handleHooks({
hookName, hookName,
prevData, prevData,
newData, newData,
@ -30,15 +33,7 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy {
viewId, viewId,
modelId, modelId,
tnPath, tnPath,
}: { }: HandleWebhookJobData): Promise<void> {
hookName;
prevData;
newData;
user: UserType;
viewId: string;
modelId: string;
tnPath: string;
}): Promise<void> {
const view = await View.get(viewId); const view = await View.get(viewId);
const model = await Model.get(modelId); const model = await Model.get(modelId);
@ -111,7 +106,11 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy {
}); });
} }
} catch (e) { } catch (e) {
console.log(e); this.logger.error({
error: e,
details: 'Error while sending form submission email',
hookName,
});
} }
} }
@ -119,23 +118,47 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy {
const [event, operation] = hookName.split('.'); const [event, operation] = hookName.split('.');
const hooks = await Hook.list({ const hooks = await Hook.list({
fk_model_id: modelId, fk_model_id: modelId,
event, event: event as HookType['event'],
operation, operation: operation as HookType['operation'],
}); });
for (const hook of hooks) { for (const hook of hooks) {
if (hook.active) { if (hook.active) {
invokeWebhook(hook, model, view, prevData, newData, user); await invokeWebhook(hook, model, view, prevData, newData, user);
} }
} }
} catch (e) { } catch (e) {
console.log('hooks :: error', hookName, e); this.logger.error({
error: e,
details: 'Error while handling webhook',
hookName,
});
} }
} }
private async triggerHook({
hookName,
prevData,
newData,
user,
viewId,
modelId,
tnPath,
}: HandleWebhookJobData) {
await this.jobsService.add(JobTypes.HandleWebhook, {
hookName,
prevData,
newData,
user,
viewId,
modelId,
tnPath,
});
}
onModuleInit(): any { onModuleInit(): any {
this.unsubscribe = this.eventEmitter.on( this.unsubscribe = this.eventEmitter.on(
HANDLE_WEBHOOK, HANDLE_WEBHOOK,
this.handleHooks.bind(this), this.triggerHook.bind(this),
); );
} }

Loading…
Cancel
Save