Browse Source

refactor: use enum for job status and job events

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest
mertmit 2 years ago
parent
commit
545e41f2c5
  1. 5
      packages/nc-gui/components/dashboard/TreeView.vue
  2. 14
      packages/nc-gui/components/dlg/AirtableImport.vue
  3. 2
      packages/nc-gui/components/smartsheet/Grid.vue
  4. 10
      packages/nc-gui/lib/enums.ts
  5. 4
      packages/nc-gui/nuxt-shim.d.ts
  6. 5
      packages/nc-gui/pages/index/index/index.vue
  7. 6
      packages/nc-gui/plugins/jobs.ts
  8. 15
      packages/nocodb/src/interface/Jobs.ts
  9. 22
      packages/nocodb/src/modules/jobs/fallback-queue.service.ts
  10. 16
      packages/nocodb/src/modules/jobs/jobs-event.service.ts
  11. 15
      packages/nocodb/src/modules/jobs/jobs.gateway.ts
  12. 17
      packages/nocodb/src/modules/jobs/jobs.service.ts

5
packages/nc-gui/components/dashboard/TreeView.vue

@ -9,6 +9,7 @@ import type { VNodeRef } from '#imports'
import { import {
ClientType, ClientType,
Empty, Empty,
JobStatus,
TabType, TabType,
computed, computed,
extractSdkResponseErrorMsg, extractSdkResponseErrorMsg,
@ -400,9 +401,9 @@ const duplicateTable = async (table: TableType) => {
'table': table, 'table': table,
'onOk': async (jobData: { name: string; id: string }) => { 'onOk': async (jobData: { name: string; id: string }) => {
$jobs.subscribe({ name: jobData.name, id: jobData.id }, undefined, async (status: string) => { $jobs.subscribe({ name: jobData.name, id: jobData.id }, undefined, async (status: string) => {
if (status === 'completed') { if (status === JobStatus.COMPLETED) {
await loadTables() await loadTables()
} else if (status === 'failed') { } else if (status === JobStatus.FAILED) {
message.error('Failed to duplicate table') message.error('Failed to duplicate table')
await loadTables() await loadTables()
} }

14
packages/nc-gui/components/dlg/AirtableImport.vue

@ -2,6 +2,7 @@
import type { Card as AntCard } from 'ant-design-vue' import type { Card as AntCard } from 'ant-design-vue'
import { import {
Form, Form,
JobStatus,
computed, computed,
extractSdkResponseErrorMsg, extractSdkResponseErrorMsg,
fieldRequiredValidator, fieldRequiredValidator,
@ -67,7 +68,7 @@ const syncSource = ref({
}, },
}) })
const pushProgress = async (message: string, status: 'completed' | 'failed' | 'progress') => { const pushProgress = async (message: string, status: JobStatus | 'progress') => {
progress.value.push({ msg: message, status }) progress.value.push({ msg: message, status })
await nextTick(() => { await nextTick(() => {
@ -81,13 +82,13 @@ const onSubscribe = () => {
step.value = 2 step.value = 2
} }
const onStatus = async (status: 'active' | 'completed' | 'failed' | 'refresh', error?: any) => { const onStatus = async (status: JobStatus, error?: any) => {
if (status === 'completed') { if (status === JobStatus.COMPLETED) {
showGoToDashboardButton.value = true showGoToDashboardButton.value = true
await loadTables() await loadTables()
pushProgress('Done!', status) pushProgress('Done!', status)
// TODO: add tab of the first table // TODO: add tab of the first table
} else if (status === 'failed') { } else if (status === JobStatus.FAILED) {
pushProgress(error, status) pushProgress(error, status)
} }
} }
@ -370,7 +371,7 @@ onMounted(async () => {
<a-card ref="logRef" :body-style="{ backgroundColor: '#000000', height: '400px', overflow: 'auto' }"> <a-card ref="logRef" :body-style="{ backgroundColor: '#000000', height: '400px', overflow: 'auto' }">
<div v-for="({ msg, status }, i) in progress" :key="i"> <div v-for="({ msg, status }, i) in progress" :key="i">
<div v-if="status === 'failed'" class="flex items-center"> <div v-if="status === JobStatus.FAILED" class="flex items-center">
<component :is="iconMap.closeCircle" class="text-red-500" /> <component :is="iconMap.closeCircle" class="text-red-500" />
<span class="text-red-500 ml-2">{{ msg }}</span> <span class="text-red-500 ml-2">{{ msg }}</span>
@ -387,7 +388,8 @@ onMounted(async () => {
v-if=" v-if="
!progress || !progress ||
!progress.length || !progress.length ||
(progress[progress.length - 1].status !== 'completed' && progress[progress.length - 1].status !== 'failed') (progress[progress.length - 1].status !== JobStatus.COMPLETED &&
progress[progress.length - 1].status !== JobStatus.FAILED)
" "
class="flex items-center" class="flex items-center"
> >

2
packages/nc-gui/components/smartsheet/Grid.vue

@ -756,7 +756,7 @@ const closeAddColumnDropdown = () => {
const confirmDeleteRow = (row: number) => { const confirmDeleteRow = (row: number) => {
Modal.confirm({ Modal.confirm({
title: `Do you want to delete this row?`, title: `Do you want to delete this row?`,
wrapClassName: 'nc-modal-attachment-delete', wrapClassName: 'nc-modal-row-delete',
okText: 'Yes', okText: 'Yes',
okType: 'danger', okType: 'danger',
cancelText: 'No', cancelText: 'No',

10
packages/nc-gui/lib/enums.ts

@ -105,3 +105,13 @@ export enum AutomationLogLevel {
ERROR = 'ERROR', ERROR = 'ERROR',
ALL = 'ALL', ALL = 'ALL',
} }
export enum JobStatus {
COMPLETED = 'completed',
WAITING = 'waiting',
ACTIVE = 'active',
DELAYED = 'delayed',
FAILED = 'failed',
PAUSED = 'paused',
REFRESH = 'refresh',
}

4
packages/nc-gui/nuxt-shim.d.ts vendored

@ -1,6 +1,6 @@
import type { Api as BaseAPI } from 'nocodb-sdk' import type { Api as BaseAPI } from 'nocodb-sdk'
import type { UseGlobalReturn } from './composables/useGlobal/types' import type { UseGlobalReturn } from './composables/useGlobal/types'
import type { NocoI18n } from './lib' import type { JobStatus, NocoI18n } from './lib'
import type { TabType } from './composables' import type { TabType } from './composables'
declare module '#app/nuxt' { declare module '#app/nuxt' {
@ -22,7 +22,7 @@ declare module '#app/nuxt' {
} }
| any, | any,
subscribedCb?: () => void, subscribedCb?: () => void,
statusCb?: ((status: 'active' | 'completed' | 'failed' | 'refresh', error?: any) => void) | undefined, statusCb?: ((status: JobStatus, error?: any) => void) | undefined,
logCb?: ((data: { message: string }) => void) | undefined, logCb?: ((data: { message: string }) => void) | undefined,
): void ): void
getStatus(name: string, id: string): Promise<string> getStatus(name: string, id: string): Promise<string>

5
packages/nc-gui/pages/index/index/index.vue

@ -4,6 +4,7 @@ import tinycolor from 'tinycolor2'
import { breakpointsTailwind } from '@vueuse/core' import { breakpointsTailwind } from '@vueuse/core'
import { import {
Empty, Empty,
JobStatus,
Modal, Modal,
computed, computed,
definePageMeta, definePageMeta,
@ -92,9 +93,9 @@ const duplicateProject = (project: ProjectType) => {
await loadProjects() await loadProjects()
$jobs.subscribe({ name: jobData.name, id: jobData.id }, undefined, async (status: string) => { $jobs.subscribe({ name: jobData.name, id: jobData.id }, undefined, async (status: string) => {
if (status === 'completed') { if (status === JobStatus.COMPLETED) {
await loadProjects() await loadProjects()
} else if (status === 'failed') { } else if (status === JobStatus.FAILED) {
message.error('Failed to duplicate project') message.error('Failed to duplicate project')
await loadProjects() await loadProjects()
} }

6
packages/nc-gui/plugins/jobs.ts

@ -1,6 +1,6 @@
import type { Socket } from 'socket.io-client' import type { Socket } from 'socket.io-client'
import io from 'socket.io-client' import io from 'socket.io-client'
import { defineNuxtPlugin, useGlobal, watch } from '#imports' import { JobStatus, defineNuxtPlugin, useGlobal, watch } from '#imports'
export default defineNuxtPlugin(async (nuxtApp) => { export default defineNuxtPlugin(async (nuxtApp) => {
const { appInfo } = $(useGlobal()) const { appInfo } = $(useGlobal())
@ -41,7 +41,7 @@ export default defineNuxtPlugin(async (nuxtApp) => {
subscribe( subscribe(
job: { id: string; name: string } | any, job: { id: string; name: string } | any,
subscribedCb?: () => void, subscribedCb?: () => void,
statusCb?: (status: 'active' | 'completed' | 'failed' | 'refresh', error?: any) => void, statusCb?: (status: JobStatus, error?: any) => void,
logCb?: (data: { message: string }) => void, logCb?: (data: { message: string }) => void,
) { ) {
const logFn = (data: { id: string; name: string; data: { message: string } }) => { const logFn = (data: { id: string; name: string; data: { message: string } }) => {
@ -52,7 +52,7 @@ export default defineNuxtPlugin(async (nuxtApp) => {
const statusFn = (data: any) => { const statusFn = (data: any) => {
if (data.id === job.id) { if (data.id === job.id) {
if (statusCb) statusCb(data.status, data.error) if (statusCb) statusCb(data.status, data.error)
if (data.status === 'completed' || data.status === 'failed') { if (data.status === JobStatus.COMPLETED || data.status === JobStatus.FAILED) {
socket?.off('status', statusFn) socket?.off('status', statusFn)
socket?.off('log', logFn) socket?.off('log', logFn)
} }

15
packages/nocodb/src/interface/Jobs.ts

@ -5,3 +5,18 @@ export enum JobTypes {
DuplicateModel = 'duplicate-model', DuplicateModel = 'duplicate-model',
AtImport = 'at-import', AtImport = 'at-import',
} }
export enum JobStatus {
COMPLETED = 'completed',
WAITING = 'waiting',
ACTIVE = 'active',
DELAYED = 'delayed',
FAILED = 'failed',
PAUSED = 'paused',
REFRESH = 'refresh',
}
export enum JobEvents {
STATUS = 'job.status',
LOG = 'job.log',
}

22
packages/nocodb/src/modules/jobs/fallback-queue.service.ts

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import PQueue from 'p-queue'; import PQueue from 'p-queue';
import Emittery from 'emittery'; import Emittery from 'emittery';
import { JobTypes } from '../../interface/Jobs'; import { JobStatus, JobTypes } from '../../interface/Jobs';
import { DuplicateProcessor } from './export-import/duplicate.processor'; import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobsEventService } from './jobs-event.service'; import { JobsEventService } from './jobs-event.service';
import { AtImportProcessor } from './at-import/at-import.processor'; import { AtImportProcessor } from './at-import/at-import.processor';
@ -26,27 +26,27 @@ export class QueueService {
private readonly duplicateProcessor: DuplicateProcessor, private readonly duplicateProcessor: DuplicateProcessor,
private readonly atImportProcessor: AtImportProcessor, private readonly atImportProcessor: AtImportProcessor,
) { ) {
this.emitter.on('active', (data: any) => { this.emitter.on(JobStatus.ACTIVE, (data: any) => {
const job = this.queueMemory.find( const job = this.queueMemory.find(
(job) => job.id === data.id && job.name === data.name, (job) => job.id === data.id && job.name === data.name,
); );
job.status = 'active'; job.status = JobStatus.ACTIVE;
this.jobsEventService.onActive.apply(this.jobsEventService, [job as any]); this.jobsEventService.onActive.apply(this.jobsEventService, [job as any]);
}); });
this.emitter.on('completed', (data: any) => { this.emitter.on(JobStatus.COMPLETED, (data: any) => {
const job = this.queueMemory.find( const job = this.queueMemory.find(
(job) => job.id === data.id && job.name === data.name, (job) => job.id === data.id && job.name === data.name,
); );
job.status = 'completed'; job.status = JobStatus.COMPLETED;
this.jobsEventService.onCompleted.apply(this.jobsEventService, [ this.jobsEventService.onCompleted.apply(this.jobsEventService, [
data as any, data as any,
]); ]);
}); });
this.emitter.on('failed', (data: { job: Job; error: Error }) => { this.emitter.on(JobStatus.FAILED, (data: { job: Job; error: Error }) => {
const job = this.queueMemory.find( const job = this.queueMemory.find(
(job) => job.id === data.job.id && job.name === data.job.name, (job) => job.id === data.job.id && job.name === data.job.name,
); );
job.status = 'failed'; job.status = JobStatus.FAILED;
this.jobsEventService.onFailed.apply(this.jobsEventService, [ this.jobsEventService.onFailed.apply(this.jobsEventService, [
data.job as any, data.job as any,
data.error, data.error,
@ -70,12 +70,12 @@ export class QueueService {
}; };
async jobWrapper(job: Job) { async jobWrapper(job: Job) {
this.emitter.emit('active', job); this.emitter.emit(JobStatus.ACTIVE, job);
try { try {
await this.jobMap[job.name].fn.apply(this.jobMap[job.name].this, [job]); await this.jobMap[job.name].fn.apply(this.jobMap[job.name].this, [job]);
this.emitter.emit('completed', job); this.emitter.emit(JobStatus.COMPLETED, job);
} catch (error) { } catch (error) {
this.emitter.emit('failed', { job, error }); this.emitter.emit(JobStatus.FAILED, { job, error });
} }
} }
@ -101,7 +101,7 @@ export class QueueService {
async add(name: string, data: any) { async add(name: string, data: any) {
const id = `${this.queueIndex++}`; const id = `${this.queueIndex++}`;
const job = { id: `${id}`, name, status: 'waiting', data }; const job = { id: `${id}`, name, status: JobStatus.WAITING, data };
this.queueMemory.push(job); this.queueMemory.push(job);
this.queue.add(() => this.jobWrapper(job)); this.queue.add(() => this.jobWrapper(job));
return { id, name }; return { id, name };

16
packages/nocodb/src/modules/jobs/jobs-event.service.ts

@ -7,7 +7,7 @@ import {
import { Job } from 'bull'; import { Job } from 'bull';
import boxen from 'boxen'; import boxen from 'boxen';
import { EventEmitter2 } from '@nestjs/event-emitter'; import { EventEmitter2 } from '@nestjs/event-emitter';
import { JOBS_QUEUE } from '../../interface/Jobs'; import { JobEvents, JOBS_QUEUE, JobStatus } from '../../interface/Jobs';
@Processor(JOBS_QUEUE) @Processor(JOBS_QUEUE)
export class JobsEventService { export class JobsEventService {
@ -15,10 +15,10 @@ export class JobsEventService {
@OnQueueActive() @OnQueueActive()
onActive(job: Job) { onActive(job: Job) {
this.eventEmitter.emit('job.status', { this.eventEmitter.emit(JobEvents.STATUS, {
name: job.name, name: job.name,
id: job.id.toString(), id: job.id.toString(),
status: 'active', status: JobStatus.ACTIVE,
}); });
} }
@ -35,25 +35,25 @@ export class JobsEventService {
), ),
); );
this.eventEmitter.emit('job.status', { this.eventEmitter.emit(JobEvents.STATUS, {
name: job.name, name: job.name,
id: job.id.toString(), id: job.id.toString(),
status: 'failed', status: JobStatus.FAILED,
error: error?.message, error: error?.message,
}); });
} }
@OnQueueCompleted() @OnQueueCompleted()
onCompleted(job: Job) { onCompleted(job: Job) {
this.eventEmitter.emit('job.status', { this.eventEmitter.emit(JobEvents.STATUS, {
name: job.name, name: job.name,
id: job.id.toString(), id: job.id.toString(),
status: 'completed', status: JobStatus.COMPLETED,
}); });
} }
sendLog(job: Job, data: { message: string }) { sendLog(job: Job, data: { message: string }) {
this.eventEmitter.emit('job.log', { this.eventEmitter.emit(JobEvents.LOG, {
name: job.name, name: job.name,
id: job.id.toString(), id: job.id.toString(),
data, data,

15
packages/nocodb/src/modules/jobs/jobs.gateway.ts

@ -9,7 +9,9 @@ import { Server, Socket } from 'socket.io';
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host'; import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
import { AuthGuard } from '@nestjs/passport'; import { AuthGuard } from '@nestjs/passport';
import { OnEvent } from '@nestjs/event-emitter'; import { OnEvent } from '@nestjs/event-emitter';
import { JobEvents } from '../../interface/Jobs';
import { JobsService } from './jobs.service'; import { JobsService } from './jobs.service';
import type { JobStatus } from '../../interface/Jobs';
import type { OnModuleInit } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common';
@WebSocketGateway({ @WebSocketGateway({
@ -89,18 +91,11 @@ export class JobsGateway implements OnModuleInit {
}); });
} }
@OnEvent('job.status') @OnEvent(JobEvents.STATUS)
async sendJobStatus(data: { async sendJobStatus(data: {
name: string; name: string;
id: string; id: string;
status: status: JobStatus;
| 'completed'
| 'waiting'
| 'active'
| 'delayed'
| 'failed'
| 'paused'
| 'refresh';
error?: any; error?: any;
}): Promise<void> { }): Promise<void> {
this.server.to(`${data.name}-${data.id}`).emit('status', { this.server.to(`${data.name}-${data.id}`).emit('status', {
@ -111,7 +106,7 @@ export class JobsGateway implements OnModuleInit {
}); });
} }
@OnEvent('job.log') @OnEvent(JobEvents.LOG)
async sendJobLog(data: { async sendJobLog(data: {
name: string; name: string;
id: string; id: string;

17
packages/nocodb/src/modules/jobs/jobs.service.ts

@ -1,7 +1,7 @@
import { InjectQueue } from '@nestjs/bull'; import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Queue } from 'bull'; import { Queue } from 'bull';
import { JOBS_QUEUE } from '../../interface/Jobs'; import { JOBS_QUEUE, JobStatus } from '../../interface/Jobs';
import { QueueService } from './fallback-queue.service'; import { QueueService } from './fallback-queue.service';
@Injectable() @Injectable()
@ -22,18 +22,23 @@ export class JobsService {
async jobList(jobType: string) { async jobList(jobType: string) {
return ( return (
await this.activeQueue.getJobs(['active', 'waiting', 'delayed', 'paused']) await this.activeQueue.getJobs([
JobStatus.ACTIVE,
JobStatus.WAITING,
JobStatus.DELAYED,
JobStatus.PAUSED,
])
).filter((j) => j.name === jobType); ).filter((j) => j.name === jobType);
} }
async getJobWithData(data: any) { async getJobWithData(data: any) {
const jobs = await this.activeQueue.getJobs([ const jobs = await this.activeQueue.getJobs([
// 'completed', // 'completed',
'waiting', JobStatus.WAITING,
'active', JobStatus.ACTIVE,
'delayed', JobStatus.DELAYED,
// 'failed', // 'failed',
'paused', JobStatus.PAUSED,
]); ]);
const job = jobs.find((j) => { const job = jobs.find((j) => {

Loading…
Cancel
Save