Browse Source

feat: use a separate queue and job for progress (#2170)

Signed-off-by: Pranav C <pranavxc@gmail.com>
pull/2175/head
Pranav C 2 years ago committed by GitHub
parent
commit
36859d71e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      packages/nocodb/src/lib/noco-jobs/JobsMgr.ts
  2. 42
      packages/nocodb/src/lib/noco/meta/api/sync/importApis.ts

8
packages/nocodb/src/lib/noco-jobs/JobsMgr.ts

@ -42,7 +42,9 @@ export default abstract class JobsMgr {
} }
protected async invokeSuccessCbks(jobName: string, payload: any) { protected async invokeSuccessCbks(jobName: string, payload: any) {
await Promise.all(this.successCbks?.[jobName]?.map(cb => cb(payload))); await Promise.all(
this.successCbks?.[jobName]?.map(cb => cb(payload)) || []
);
} }
protected async invokeFailureCbks( protected async invokeFailureCbks(
jobName: string, jobName: string,
@ -50,7 +52,7 @@ export default abstract class JobsMgr {
error?: Error error?: Error
) { ) {
await Promise.all( await Promise.all(
this.failureCbks?.[jobName]?.map(cb => cb(payload, error)) this.failureCbks?.[jobName]?.map(cb => cb(payload, error)) || []
); );
} }
protected async invokeProgressCbks( protected async invokeProgressCbks(
@ -59,7 +61,7 @@ export default abstract class JobsMgr {
data?: any data?: any
) { ) {
await Promise.all( await Promise.all(
this.progressCbks?.[jobName]?.map(cb => cb(payload, data)) this.progressCbks?.[jobName]?.map(cb => cb(payload, data)) || []
); );
} }
} }

42
packages/nocodb/src/lib/noco/meta/api/sync/importApis.ts

@ -9,6 +9,7 @@ import SyncSource from '../../../../noco-models/SyncSource';
import Noco from '../../../Noco'; import Noco from '../../../Noco';
import * as jwt from 'jsonwebtoken'; import * as jwt from 'jsonwebtoken';
const AIRTABLE_IMPORT_JOB = 'AIRTABLE_IMPORT_JOB'; const AIRTABLE_IMPORT_JOB = 'AIRTABLE_IMPORT_JOB';
const AIRTABLE_PROGRESS_JOB = 'AIRTABLE_PROGRESS_JOB';
enum SyncStatus { enum SyncStatus {
PROGRESS = 'PROGRESS', PROGRESS = 'PROGRESS',
@ -17,24 +18,45 @@ enum SyncStatus {
} }
export default (router: Router, clients: { [id: string]: Socket }) => { export default (router: Router, clients: { [id: string]: Socket }) => {
// add importer job handler and progress notification job handler
NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, job); NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, job);
NocoJobs.jobsMgr.addJobWorker(
AIRTABLE_PROGRESS_JOB,
({ payload, progress }) => {
clients?.[payload?.id]?.emit('progress', {
msg: progress?.msg,
level: progress?.level,
status: progress?.status
});
}
);
NocoJobs.jobsMgr.addProgressCbk(AIRTABLE_IMPORT_JOB, (payload, progress) => { NocoJobs.jobsMgr.addProgressCbk(AIRTABLE_IMPORT_JOB, (payload, progress) => {
clients?.[payload?.id]?.emit('progress', { NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
msg: progress?.msg, payload,
level: progress?.level, progress: {
status: SyncStatus.PROGRESS msg: progress?.msg,
level: progress?.level,
status: progress?.status
}
}); });
}); });
NocoJobs.jobsMgr.addSuccessCbk(AIRTABLE_IMPORT_JOB, payload => { NocoJobs.jobsMgr.addSuccessCbk(AIRTABLE_IMPORT_JOB, payload => {
clients?.[payload?.id]?.emit('progress', { NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
msg: 'Complete!', payload,
status: SyncStatus.COMPLETED progress: {
msg: 'Complete!',
status: SyncStatus.COMPLETED
}
}); });
}); });
NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => { NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => {
clients?.[payload?.id]?.emit('progress', { NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
msg: error?.message || 'Failed due to some internal error', payload,
status: SyncStatus.FAILED progress: {
msg: error?.message || 'Failed due to some internal error',
status: SyncStatus.FAILED
}
}); });
}); });

Loading…
Cancel
Save