From 36859d71e08b97e848d57f6c5bcea52c296dbb06 Mon Sep 17 00:00:00 2001 From: Pranav C Date: Fri, 27 May 2022 14:25:28 +0530 Subject: [PATCH] feat: use a separate queue and job for progress (#2170) Signed-off-by: Pranav C --- packages/nocodb/src/lib/noco-jobs/JobsMgr.ts | 8 ++-- .../src/lib/noco/meta/api/sync/importApis.ts | 42 ++++++++++++++----- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/packages/nocodb/src/lib/noco-jobs/JobsMgr.ts b/packages/nocodb/src/lib/noco-jobs/JobsMgr.ts index 458d41a057..dc906c19e9 100644 --- a/packages/nocodb/src/lib/noco-jobs/JobsMgr.ts +++ b/packages/nocodb/src/lib/noco-jobs/JobsMgr.ts @@ -42,7 +42,9 @@ export default abstract class JobsMgr { } protected async invokeSuccessCbks(jobName: string, payload: any) { - await Promise.all(this.successCbks?.[jobName]?.map(cb => cb(payload))); + await Promise.all( + this.successCbks?.[jobName]?.map(cb => cb(payload)) || [] + ); } protected async invokeFailureCbks( jobName: string, @@ -50,7 +52,7 @@ export default abstract class JobsMgr { error?: Error ) { await Promise.all( - this.failureCbks?.[jobName]?.map(cb => cb(payload, error)) + this.failureCbks?.[jobName]?.map(cb => cb(payload, error)) || [] ); } protected async invokeProgressCbks( @@ -59,7 +61,7 @@ export default abstract class JobsMgr { data?: any ) { await Promise.all( - this.progressCbks?.[jobName]?.map(cb => cb(payload, data)) + this.progressCbks?.[jobName]?.map(cb => cb(payload, data)) || [] ); } } diff --git a/packages/nocodb/src/lib/noco/meta/api/sync/importApis.ts b/packages/nocodb/src/lib/noco/meta/api/sync/importApis.ts index bd9d1c3295..d53952514a 100644 --- a/packages/nocodb/src/lib/noco/meta/api/sync/importApis.ts +++ b/packages/nocodb/src/lib/noco/meta/api/sync/importApis.ts @@ -9,6 +9,7 @@ import SyncSource from '../../../../noco-models/SyncSource'; import Noco from '../../../Noco'; import * as jwt from 'jsonwebtoken'; const AIRTABLE_IMPORT_JOB = 'AIRTABLE_IMPORT_JOB'; +const AIRTABLE_PROGRESS_JOB = 'AIRTABLE_PROGRESS_JOB'; enum SyncStatus { PROGRESS = 'PROGRESS', @@ -17,24 +18,45 @@ enum SyncStatus { } export default (router: Router, clients: { [id: string]: Socket }) => { + // add importer job handler and progress notification job handler NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, job); + NocoJobs.jobsMgr.addJobWorker( + AIRTABLE_PROGRESS_JOB, + ({ payload, progress }) => { + clients?.[payload?.id]?.emit('progress', { + msg: progress?.msg, + level: progress?.level, + status: progress?.status + }); + } + ); + NocoJobs.jobsMgr.addProgressCbk(AIRTABLE_IMPORT_JOB, (payload, progress) => { - clients?.[payload?.id]?.emit('progress', { - msg: progress?.msg, - level: progress?.level, - status: SyncStatus.PROGRESS + NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, { + payload, + progress: { + msg: progress?.msg, + level: progress?.level, + status: progress?.status + } }); }); NocoJobs.jobsMgr.addSuccessCbk(AIRTABLE_IMPORT_JOB, payload => { - clients?.[payload?.id]?.emit('progress', { - msg: 'Complete!', - status: SyncStatus.COMPLETED + NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, { + payload, + progress: { + msg: 'Complete!', + status: SyncStatus.COMPLETED + } }); }); NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => { - clients?.[payload?.id]?.emit('progress', { - msg: error?.message || 'Failed due to some internal error', - status: SyncStatus.FAILED + NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, { + payload, + progress: { + msg: error?.message || 'Failed due to some internal error', + status: SyncStatus.FAILED + } }); });