Browse Source

feat: solely use id for jobs

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/5711/head
mertmit 1 year ago
parent
commit
7c4dfc99cc
  1. 4
      packages/nc-gui/components/dashboard/TreeView.vue
  2. 1
      packages/nc-gui/nuxt-shim.d.ts
  3. 4
      packages/nc-gui/pages/index/index/index.vue
  4. 17
      packages/nc-gui/plugins/jobs.ts
  5. 6
      packages/nocodb/src/modules/jobs/at-import/at-import.controller.ts
  6. 4
      packages/nocodb/src/modules/jobs/export-import/duplicate.controller.ts
  7. 16
      packages/nocodb/src/modules/jobs/fallback-queue.service.ts
  8. 6
      packages/nocodb/src/modules/jobs/jobs-event.service.ts
  9. 42
      packages/nocodb/src/modules/jobs/jobs.gateway.ts
  10. 16
      packages/nocodb/src/modules/jobs/jobs.service.ts

4
packages/nc-gui/components/dashboard/TreeView.vue

@ -399,8 +399,8 @@ const duplicateTable = async (table: TableType) => {
const { close } = useDialog(resolveComponent('DlgTableDuplicate'), {
'modelValue': isOpen,
'table': table,
'onOk': async (jobData: { name: string; id: string }) => {
$jobs.subscribe({ name: jobData.name, id: jobData.id }, undefined, async (status: string, data?: any) => {
'onOk': async (jobData: { id: string }) => {
$jobs.subscribe({ id: jobData.id }, undefined, async (status: string, data?: any) => {
if (status === JobStatus.COMPLETED) {
await loadTables()
const newTable = tables.value.find((el) => el.id === data?.result?.id)

1
packages/nc-gui/nuxt-shim.d.ts vendored

@ -18,7 +18,6 @@ declare module '#app/nuxt' {
job:
| {
id: string
name: string
}
| any,
subscribedCb?: () => void,

4
packages/nc-gui/pages/index/index/index.vue

@ -90,10 +90,10 @@ const duplicateProject = (project: ProjectType) => {
const { close } = useDialog(resolveComponent('DlgProjectDuplicate'), {
'modelValue': isOpen,
'project': project,
'onOk': async (jobData: { name: string; id: string }) => {
'onOk': async (jobData: { id: string }) => {
await loadProjects()
$jobs.subscribe({ name: jobData.name, id: jobData.id }, undefined, async (status: string) => {
$jobs.subscribe({ id: jobData.id }, undefined, async (status: string) => {
if (status === JobStatus.COMPLETED) {
await loadProjects()
} else if (status === JobStatus.FAILED) {

17
packages/nc-gui/plugins/jobs.ts

@ -29,22 +29,22 @@ export default defineNuxtPlugin(async (nuxtApp) => {
await init(nuxtApp.$state.token.value)
}
const send = (name: string, data: any) => {
const send = (evt: string, data: any) => {
if (socket) {
const _id = messageIndex++
socket.emit(name, { _id, data })
socket.emit(evt, { _id, data })
return _id
}
}
const jobs = {
subscribe(
job: { id: string; name: string } | any,
job: { id: string } | any,
subscribedCb?: () => void,
statusCb?: (status: JobStatus, data?: any) => void,
logCb?: (data: { message: string }) => void,
) {
const logFn = (data: { id: string; name: string; data: { message: string } }) => {
const logFn = (data: { id: string; data: { message: string } }) => {
if (data.id === job.id) {
if (logCb) logCb(data.data)
}
@ -61,11 +61,10 @@ export default defineNuxtPlugin(async (nuxtApp) => {
const _id = send('subscribe', job)
const subscribeFn = (data: { _id: number; name: string; id: string }) => {
const subscribeFn = (data: { _id: number; id: string }) => {
if (data._id === _id) {
if (data.id !== job.id || data.name !== job.name) {
if (data.id !== job.id) {
job.id = data.id
job.name = data.name
}
if (subscribedCb) subscribedCb()
socket?.on('log', logFn)
@ -75,10 +74,10 @@ export default defineNuxtPlugin(async (nuxtApp) => {
}
socket?.on('subscribed', subscribeFn)
},
getStatus(name: string, id: string): Promise<string> {
getStatus(id: string): Promise<string> {
return new Promise((resolve) => {
if (socket) {
const _id = send('status', { name, id })
const _id = send('status', { id })
const tempFn = (data: any) => {
if (data._id === _id) {
resolve(data.status)

6
packages/nocodb/src/modules/jobs/at-import/at-import.controller.ts

@ -18,13 +18,13 @@ export class AtImportController {
...req.body,
});
return { id: job.id, name: job.name };
return { id: job.id };
}
@Post('/api/v1/db/meta/syncs/:syncId/trigger')
@HttpCode(200)
async triggerSync(@Request() req) {
const jobs = await this.jobsService.jobList(JobTypes.AtImport);
const jobs = await this.jobsService.jobList();
const fnd = jobs.find((j) => j.data.syncId === req.params.syncId);
if (fnd) {
@ -54,7 +54,7 @@ export class AtImportController {
user: user,
});
return { id: job.id, name: job.name };
return { id: job.id };
}
@Post('/api/v1/db/meta/syncs/:syncId/abort')

4
packages/nocodb/src/modules/jobs/export-import/duplicate.controller.ts

@ -78,7 +78,7 @@ export class DuplicateController {
},
});
return { id: job.id, name: job.name };
return { id: job.id };
}
@Post('/api/v1/db/meta/duplicate/:projectId/table/:modelId')
@ -128,6 +128,6 @@ export class DuplicateController {
},
});
return { id: job.id, name: job.name };
return { id: job.id };
}
}

16
packages/nocodb/src/modules/jobs/fallback-queue.service.ts

@ -27,16 +27,12 @@ export class QueueService {
private readonly atImportProcessor: AtImportProcessor,
) {
this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => {
const job = this.queueMemory.find(
(job) => job.id === data.job.id && job.name === data.job.name,
);
const job = this.queueMemory.find((job) => job.id === data.job.id);
job.status = JobStatus.ACTIVE;
this.jobsEventService.onActive.apply(this.jobsEventService, [job as any]);
});
this.emitter.on(JobStatus.COMPLETED, (data: { job: Job; result: any }) => {
const job = this.queueMemory.find(
(job) => job.id === data.job.id && job.name === data.job.name,
);
const job = this.queueMemory.find((job) => job.id === data.job.id);
job.status = JobStatus.COMPLETED;
this.jobsEventService.onCompleted.apply(this.jobsEventService, [
job,
@ -46,9 +42,7 @@ export class QueueService {
this.removeJob(job);
});
this.emitter.on(JobStatus.FAILED, (data: { job: Job; error: Error }) => {
const job = this.queueMemory.find(
(job) => job.id === data.job.id && job.name === data.job.name,
);
const job = this.queueMemory.find((job) => job.id === data.job.id);
job.status = JobStatus.FAILED;
this.jobsEventService.onFailed.apply(this.jobsEventService, [
job,
@ -126,9 +120,7 @@ export class QueueService {
// remove job from memory
private removeJob(job: Job) {
const fIndex = this.queueMemory.findIndex(
(q) => q.id === job.id && q.name === job.name,
);
const fIndex = this.queueMemory.findIndex((q) => q.id === job.id);
if (fIndex) {
this.queueMemory.splice(fIndex, 1);
}

6
packages/nocodb/src/modules/jobs/jobs-event.service.ts

@ -16,7 +16,6 @@ export class JobsEventService {
@OnQueueActive()
onActive(job: Job) {
this.eventEmitter.emit(JobEvents.STATUS, {
name: job.name,
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
@ -26,7 +25,7 @@ export class JobsEventService {
onFailed(job: Job, error: Error) {
console.error(
boxen(
`---- !! JOB FAILED !! ----\nname: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
`---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
{
padding: 1,
borderStyle: 'double',
@ -36,7 +35,6 @@ export class JobsEventService {
);
this.eventEmitter.emit(JobEvents.STATUS, {
name: job.name,
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
@ -50,7 +48,6 @@ export class JobsEventService {
@OnQueueCompleted()
onCompleted(job: Job, data: any) {
this.eventEmitter.emit(JobEvents.STATUS, {
name: job.name,
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
@ -61,7 +58,6 @@ export class JobsEventService {
sendLog(job: Job, data: { message: string }) {
this.eventEmitter.emit(JobEvents.LOG, {
name: job.name,
id: job.id.toString(),
data,
});

42
packages/nocodb/src/modules/jobs/jobs.gateway.ts

@ -43,34 +43,28 @@ export class JobsGateway implements OnModuleInit {
@SubscribeMessage('subscribe')
async subscribe(
@MessageBody()
body: { _id: number; data: { id: string; name: string } | any },
body: { _id: number; data: { id: string } | any },
@ConnectedSocket() client: Socket,
): Promise<void> {
const { _id, data } = body;
if (
Object.keys(data).every((k) => ['name', 'id'].includes(k)) &&
data?.name &&
data?.id
) {
const rooms = (await this.jobsService.jobList(data.name)).map(
(j) => `${j.name}-${j.id}`,
if (Object.keys(data).every((k) => ['id'].includes(k)) && data?.id) {
const rooms = (await this.jobsService.jobList()).map(
(j) => `jobs-${j.id}`,
);
const room = rooms.find((r) => r === `${data.name}-${data.id}`);
const room = rooms.find((r) => r === `jobs-${data.id}`);
if (room) {
client.join(`${data.name}-${data.id}`);
client.join(`jobs-${data.id}`);
client.emit('subscribed', {
_id,
name: data.name,
id: data.id,
});
}
} else {
const job = await this.jobsService.getJobWithData(data);
if (job) {
client.join(`${job.name}-${job.id}`);
client.join(`jobs-${job.id}`);
client.emit('subscribed', {
_id,
name: job.name,
id: job.id,
});
}
@ -79,42 +73,30 @@ export class JobsGateway implements OnModuleInit {
@SubscribeMessage('status')
async status(
@MessageBody() body: { _id: number; data: { id: string; name: string } },
@MessageBody() body: { _id: number; data: { id: string } },
@ConnectedSocket() client: Socket,
): Promise<void> {
const { _id, data } = body;
client.emit('status', {
_id,
id: data.id,
name: data.name,
status: await this.jobsService.jobStatus(data.id),
});
}
@OnEvent(JobEvents.STATUS)
async sendJobStatus(data: {
name: string;
id: string;
status: JobStatus;
data?: any;
}): Promise<void> {
this.server.to(`${data.name}-${data.id}`).emit('status', {
sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void {
this.server.to(`jobs-${data.id}`).emit('status', {
id: data.id,
name: data.name,
status: data.status,
data: data.data,
});
}
@OnEvent(JobEvents.LOG)
async sendJobLog(data: {
name: string;
id: string;
data: { message: string };
}): Promise<void> {
this.server.to(`${data.name}-${data.id}`).emit('log', {
sendJobLog(data: { id: string; data: { message: string } }): void {
this.server.to(`jobs-${data.id}`).emit('log', {
id: data.id,
name: data.name,
data: data.data,
});
}

16
packages/nocodb/src/modules/jobs/jobs.service.ts

@ -28,15 +28,13 @@ export class JobsService {
return await (await this.activeQueue.getJob(jobId)).getState();
}
async jobList(jobType: string) {
return (
await this.activeQueue.getJobs([
JobStatus.ACTIVE,
JobStatus.WAITING,
JobStatus.DELAYED,
JobStatus.PAUSED,
])
).filter((j) => j.name === jobType);
async jobList() {
return await this.activeQueue.getJobs([
JobStatus.ACTIVE,
JobStatus.WAITING,
JobStatus.DELAYED,
JobStatus.PAUSED,
]);
}
async getJobWithData(data: any) {

Loading…
Cancel
Save