Browse Source

fix: wh jobs (#8561)

* fix: only pause queue once for non-worker

* fix: move error handling inside loop

* fix: avoid unhandled reject

* fix: return original template if parsing failed

* fix: remove jobs on complete

* fix: remove job on complete

Signed-off-by: mertmit <mertmit99@gmail.com>

* fix: move wh job handler to invokeWebhook

Signed-off-by: mertmit <mertmit99@gmail.com>

* refactor: import from barrel

Signed-off-by: mertmit <mertmit99@gmail.com>

* fix: improve reduce logic

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* fix: type issues

Signed-off-by: mertmit <mertmit99@gmail.com>

---------

Signed-off-by: mertmit <mertmit99@gmail.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
pull/8601/head
Mert E 6 months ago committed by GitHub
parent
commit
47713f16b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 48
      packages/nocodb/src/helpers/webhookHelpers.ts
  2. 11
      packages/nocodb/src/interface/Jobs.ts
  3. 2
      packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts
  4. 6
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  5. 5
      packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts
  6. 35
      packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts
  7. 5
      packages/nocodb/src/modules/jobs/redis/jobs-redis.ts
  8. 15
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts
  9. 153
      packages/nocodb/src/services/hook-handler.service.ts
  10. 22
      packages/nocodb/src/services/hooks.service.ts

48
packages/nocodb/src/helpers/webhookHelpers.ts

@ -28,10 +28,15 @@ export function parseBody(template: string, data: any): string {
return template;
}
return Handlebars.compile(template, { noEscape: true })({
data,
event: data,
});
try {
return Handlebars.compile(template, { noEscape: true })({
data,
event: data,
});
} catch (e) {
// if parsing fails then return the original template
return template;
}
}
export async function validateCondition(
@ -452,17 +457,30 @@ export function axiosRequestMake(_apiMeta, _user, data) {
return req;
}
export async function invokeWebhook(
hook: Hook,
model: Model,
view: View,
prevData,
newData,
user,
testFilters = null,
throwErrorOnFailure = false,
testHook = false,
) {
export async function invokeWebhook(param: {
hook: Hook;
model: Model;
view: View;
prevData;
newData;
user;
testFilters?;
throwErrorOnFailure?: boolean;
testHook?: boolean;
}) {
const {
hook,
model,
view,
prevData,
user,
testFilters = null,
throwErrorOnFailure = false,
testHook = false,
} = param;
let { newData } = param;
let hookLog: HookLogType;
const startTime = process.hrtime();
const source = await Source.get(model.source_id);

11
packages/nocodb/src/interface/Jobs.ts

@ -1,5 +1,3 @@
import type { UserType } from 'nocodb-sdk';
export const JOBS_QUEUE = 'jobs';
export enum JobTypes {
@ -46,11 +44,10 @@ export enum InstanceCommands {
}
export interface HandleWebhookJobData {
hookName: string;
hookId: string;
modelId: string;
viewId: string;
prevData;
newData;
user: UserType;
viewId: string;
modelId: string;
tnPath: string;
user;
}

2
packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts

@ -129,7 +129,7 @@ export class QueueService {
QueueService.queueIdCounter = index;
}
add(name: string, data: any) {
add(name: string, data: any, _opts = {}) {
const id = `${this.queueIndex++}`;
const job = { id: `${id}`, name, status: JobStatus.WAITING, data };
this.queueMemory.push(job);

6
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -112,7 +112,11 @@ export class JobsController {
) {
await JobsRedis.unsubscribe(jobId);
delete this.jobRooms[jobId];
this.closedJobs.push(jobId);
// close the job after 1 second (to allow the update of messages)
setTimeout(() => {
this.closedJobs.push(jobId);
}, 1000);
// remove the job after polling interval * 2
setTimeout(() => {
this.closedJobs = this.closedJobs.filter((j) => j !== jobId);
}, POLLING_INTERVAL * 2);

5
packages/nocodb/src/modules/jobs/jobs/at-import/at-import.controller.ts

@ -57,11 +57,6 @@ export class AtImportController {
authToken: '',
baseURL,
user: user,
req: {
user: req.user,
clientIp: req.clientIp,
headers: req.headers,
},
});
return { id: job.id };

35
packages/nocodb/src/modules/jobs/jobs/webhook-handler/webhook-handler.processor.ts

@ -1,24 +1,45 @@
import { Process, Processor } from '@nestjs/bull';
import { forwardRef, Inject, Logger } from '@nestjs/common';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
import { invokeWebhook } from '~/helpers/webhookHelpers';
import { Hook, Model, View } from '~/models';
import {
type HandleWebhookJobData,
JOBS_QUEUE,
JobTypes,
} from '~/interface/Jobs';
import { HookHandlerService } from '~/services/hook-handler.service';
@Processor(JOBS_QUEUE)
export class WebhookHandlerProcessor {
private logger = new Logger(WebhookHandlerProcessor.name);
constructor(
@Inject(forwardRef(() => HookHandlerService))
private readonly hookHandlerService: HookHandlerService,
) {}
constructor() {}
@Process(JobTypes.HandleWebhook)
async job(job: Job<HandleWebhookJobData>) {
await this.hookHandlerService.handleHooks(job.data);
const { hookId, modelId, viewId, prevData, newData, user } = job.data;
const hook = await Hook.get(hookId);
if (!hook) {
this.logger.error(`Hook not found for id: ${hookId}`);
return;
}
const model = await Model.get(modelId);
if (!model) {
this.logger.error(`Model not found for id: ${modelId}`);
return;
}
const view = viewId ? await View.get(viewId) : null;
await invokeWebhook({
hook,
model,
view,
prevData,
newData,
user,
});
}
}

5
packages/nocodb/src/modules/jobs/redis/jobs-redis.ts

@ -128,13 +128,14 @@ export class JobsRedis {
await this.init();
}
return new Promise((resolve, reject) => {
return new Promise((resolve) => {
this.redisClient.publish(
InstanceTypes.WORKER,
'count',
(error, numberOfSubscribers) => {
if (error) {
reject(0);
this.logger.warn(error);
resolve(0);
} else {
resolve(numberOfSubscribers);
}

15
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -13,6 +13,10 @@ export class JobsService implements OnModuleInit {
// pause primary instance queue
async onModuleInit() {
if (process.env.NC_WORKER_CONTAINER === 'false') {
await this.jobsQueue.pause(true);
}
await this.toggleQueue();
JobsRedis.workerCallbacks[InstanceCommands.RESUME_LOCAL] = async () => {
@ -26,9 +30,10 @@ export class JobsService implements OnModuleInit {
}
async toggleQueue() {
if (process.env.NC_WORKER_CONTAINER === 'false') {
await this.jobsQueue.pause(true);
} else if (process.env.NC_WORKER_CONTAINER !== 'true') {
if (
process.env.NC_WORKER_CONTAINER !== 'true' &&
process.env.NC_WORKER_CONTAINER !== 'false'
) {
// resume primary instance queue if there is no worker
const workerCount = await JobsRedis.workerCount();
const localWorkerPaused = await this.jobsQueue.isPaused(true);
@ -46,7 +51,9 @@ export class JobsService implements OnModuleInit {
async add(name: string, data: any) {
await this.toggleQueue();
const job = await this.jobsQueue.add(name, data);
const job = await this.jobsQueue.add(name, data, {
removeOnComplete: true,
});
return job;
}

153
packages/nocodb/src/services/hook-handler.service.ts

@ -1,16 +1,15 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { type HookType, UITypes, ViewTypes } from 'nocodb-sdk';
import { UITypes, ViewTypes } from 'nocodb-sdk';
import ejs from 'ejs';
import type { FormColumnType, HookType } from 'nocodb-sdk';
import type { ColumnType } from 'nocodb-sdk';
import type { OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2';
import {
_transformSubmittedFormDataForEmail,
invokeWebhook,
} from '~/helpers/webhookHelpers';
import { _transformSubmittedFormDataForEmail } from '~/helpers/webhookHelpers';
import { IEventEmitter } from '~/modules/event-emitter/event-emitter.interface';
import formSubmissionEmailTemplate from '~/utils/common/formSubmissionEmailTemplate';
import { FormView, Hook, Model, View } from '~/models';
import { type HandleWebhookJobData, JobTypes } from '~/interface/Jobs';
import { JobTypes } from '~/interface/Jobs';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
export const HANDLE_WEBHOOK = '__nc_handleHooks';
@ -33,7 +32,7 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy {
viewId,
modelId,
tnPath,
}: HandleWebhookJobData): Promise<void> {
}): Promise<void> {
const view = await View.get(viewId);
const model = await Model.get(modelId);
@ -44,52 +43,49 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy {
) {
try {
const formView = await view.getView<FormView>();
const { columns } = await FormView.getWithInfo(formView.fk_view_id);
const allColumns = await model.getColumns();
const fieldById = columns.reduce(
(o: Record<string, any>, f: Record<string, any>) => ({
...o,
[f.fk_column_id]: f,
}),
{},
);
let order = 1;
const filteredColumns = allColumns
?.map((c: Record<string, any>) => ({
...c,
fk_column_id: c.id,
fk_view_id: formView.fk_view_id,
...(fieldById[c.id] ? fieldById[c.id] : {}),
order: (fieldById[c.id] && fieldById[c.id].order) || order++,
id: fieldById[c.id] && fieldById[c.id].id,
}))
.sort(
(a: Record<string, any>, b: Record<string, any>) =>
a.order - b.order,
)
.filter(
(f: Record<string, any>) =>
f.show &&
f.uidt !== UITypes.Rollup &&
f.uidt !== UITypes.Lookup &&
f.uidt !== UITypes.Formula &&
f.uidt !== UITypes.QrCode &&
f.uidt !== UITypes.Barcode &&
f.uidt !== UITypes.SpecificDBType,
)
.sort(
(a: Record<string, any>, b: Record<string, any>) =>
a.order - b.order,
)
.map((c: Record<string, any>) => ({
...c,
required: !!(c.required || 0),
}));
const emails = Object.entries(JSON.parse(formView?.email) || {})
.filter((a) => a[1])
.map((a) => a[0]);
if (emails?.length) {
const { columns } = await FormView.getWithInfo(formView.fk_view_id);
const allColumns = await model.getColumns();
const fieldById = columns.reduce(
(o: Record<string, FormColumnType>, f: FormColumnType) => {
return Object.assign(o, { [f.fk_column_id]: f });
},
{},
);
let order = 1;
const filteredColumns = allColumns
?.map((c: ColumnType) => {
return {
...c,
fk_column_id: c.id,
fk_view_id: formView.fk_view_id,
...(fieldById[c.id] ? fieldById[c.id] : {}),
order: (fieldById[c.id] && fieldById[c.id].order) || order++,
id: fieldById[c.id] && fieldById[c.id].id,
};
})
.sort((a: ColumnType, b: ColumnType) => a.order - b.order)
.filter(
(f: ColumnType & FormColumnType) =>
f.show &&
f.uidt !== UITypes.Rollup &&
f.uidt !== UITypes.Lookup &&
f.uidt !== UITypes.Formula &&
f.uidt !== UITypes.QrCode &&
f.uidt !== UITypes.Barcode &&
f.uidt !== UITypes.SpecificDBType,
)
.sort((a: ColumnType, b: ColumnType) => a.order - b.order)
.map((c: ColumnType & FormColumnType) => {
c.required = !!(c.required || 0);
return c;
});
const transformedData = _transformSubmittedFormDataForEmail(
newData,
formView,
@ -114,51 +110,38 @@ export class HookHandlerService implements OnModuleInit, OnModuleDestroy {
}
}
try {
const [event, operation] = hookName.split('.');
const hooks = await Hook.list({
fk_model_id: modelId,
event: event as HookType['event'],
operation: operation as HookType['operation'],
});
for (const hook of hooks) {
if (hook.active) {
await invokeWebhook(hook, model, view, prevData, newData, user);
const [event, operation] = hookName.split('.');
const hooks = await Hook.list({
fk_model_id: modelId,
event: event as HookType['event'],
operation: operation as HookType['operation'],
});
for (const hook of hooks) {
if (hook.active) {
try {
await this.jobsService.add(JobTypes.HandleWebhook, {
hookId: hook.id,
modelId,
viewId,
prevData,
newData,
user,
});
} catch (e) {
this.logger.error({
error: e,
details: 'Error while invoking webhook',
hook: hook.id,
});
}
}
} catch (e) {
this.logger.error({
error: e,
details: 'Error while handling webhook',
hookName,
});
}
}
private async triggerHook({
hookName,
prevData,
newData,
user,
viewId,
modelId,
tnPath,
}: HandleWebhookJobData) {
await this.jobsService.add(JobTypes.HandleWebhook, {
hookName,
prevData,
newData,
user,
viewId,
modelId,
tnPath,
});
}
onModuleInit(): any {
this.unsubscribe = this.eventEmitter.on(
HANDLE_WEBHOOK,
this.triggerHook.bind(this),
this.handleHooks.bind(this),
);
}

22
packages/nocodb/src/services/hooks.service.ts

@ -122,17 +122,17 @@ export class HooksService {
payload: { data, user },
} = param.hookTest;
try {
await invokeWebhook(
new Hook(hook),
model,
null,
null,
data,
user,
(hook as any)?.filters,
true,
true,
);
await invokeWebhook({
hook: new Hook(hook),
model: model,
view: null,
prevData: null,
newData: data,
user: user,
testFilters: (hook as any)?.filters,
throwErrorOnFailure: true,
testHook: true,
});
} catch (e) {
throw e;
} finally {

Loading…
Cancel
Save