Browse Source

feat: long polling instead of socket for jobs

pull/6528/head
mertmit 1 year ago
parent
commit
8e5bd56eab
  1. 52
      packages/nc-gui/components/dashboard/TreeView/ProjectNode.vue
  2. 43
      packages/nc-gui/components/dashboard/TreeView/index.vue
  3. 43
      packages/nc-gui/components/dashboard/settings/Metadata.vue
  4. 49
      packages/nc-gui/components/dashboard/settings/data-sources/CreateBase.vue
  5. 60
      packages/nc-gui/components/dlg/AirtableImport.vue
  6. 35
      packages/nc-gui/components/workspace/ProjectList.vue
  7. 26
      packages/nc-gui/nuxt-shim.d.ts
  8. 102
      packages/nc-gui/plugins/jobs.ts
  9. 92
      packages/nc-gui/plugins/poller.ts
  10. 47
      packages/nocodb/src/Noco.ts
  11. 2
      packages/nocodb/src/modules/jobs/fallback/jobs.service.ts
  12. 266
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  13. 110
      packages/nocodb/src/modules/jobs/jobs.gateway.ts
  14. 8
      packages/nocodb/src/modules/jobs/jobs.module.ts
  15. 91
      packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts
  16. 31
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts
  17. 48
      packages/nocodb/src/schema/swagger.json
  18. 1
      packages/nocodb/src/utils/globals.ts

52
packages/nc-gui/components/dashboard/TreeView/ProjectNode.vue

@ -344,30 +344,44 @@ const duplicateProject = (project: ProjectType) => {
selectedProjectToDuplicate.value = project
isDuplicateDlgOpen.value = true
}
const { $jobs } = useNuxtApp()
const { $poller } = useNuxtApp()
const DlgProjectDuplicateOnOk = async (jobData: { id: string; project_id: string }) => {
await loadProjects('workspace')
$jobs.subscribe({ id: jobData.id }, undefined, async (status: string) => {
if (status === JobStatus.COMPLETED) {
await loadProjects('workspace')
const project = projects.value.get(jobData.project_id)
// open project after duplication
if (project) {
await navigateToProject({
projectId: project.id,
type: project.type,
})
$poller.subscribe(
{ id: jobData.id },
async (data: {
id: string
status?: string
data?: {
error?: {
message: string
}
message?: string
result?: any
}
} else if (status === JobStatus.FAILED) {
message.error('Failed to duplicate project')
await loadProjects('workspace')
}
})
}) => {
if (data.status !== 'close') {
if (data.status === JobStatus.COMPLETED) {
await loadProjects('workspace')
const project = projects.value.get(jobData.project_id)
// open project after duplication
if (project) {
await navigateToProject({
projectId: project.id,
type: project.type,
})
}
} else if (data.status === JobStatus.FAILED) {
message.error('Failed to duplicate project')
await loadProjects('workspace')
}
}
},
)
$e('a:project:duplicate')
}
</script>

43
packages/nc-gui/components/dashboard/TreeView/index.vue

@ -31,7 +31,7 @@ const { isUIAllowed } = useRoles()
const { addTab } = useTabs()
const { $e, $jobs } = useNuxtApp()
const { $e, $poller } = useNuxtApp()
const router = useRouter()
@ -119,19 +119,34 @@ const duplicateTable = async (table: TableType) => {
'modelValue': isOpen,
'table': table,
'onOk': async (jobData: { id: string }) => {
$jobs.subscribe({ id: jobData.id }, undefined, async (status: string, data?: any) => {
if (status === JobStatus.COMPLETED) {
await loadTables()
refreshCommandPalette()
const newTable = tables.value.find((el) => el.id === data?.result?.id)
if (newTable) addTab({ title: newTable.title, id: newTable.id, type: newTable.type as TabType })
openTable(newTable!)
} else if (status === JobStatus.FAILED) {
message.error(t('msg.error.failedToDuplicateTable'))
await loadTables()
}
})
$poller.subscribe(
{ id: jobData.id },
async (data: {
id: string
status?: string
data?: {
error?: {
message: string
}
message?: string
result?: any
}
}) => {
if (data.status !== 'close') {
if (data.status === JobStatus.COMPLETED) {
await loadTables()
refreshCommandPalette()
const newTable = tables.value.find((el) => el.id === data?.data?.result?.id)
if (newTable) addTab({ title: newTable.title, id: newTable.id, type: newTable.type as TabType })
openTable(newTable!)
} else if (data.status === JobStatus.FAILED) {
message.error(t('msg.error.failedToDuplicateTable'))
await loadTables()
}
}
},
)
$e('a:table:duplicate')
},

43
packages/nc-gui/components/dashboard/settings/Metadata.vue

@ -41,7 +41,7 @@ async function loadMetaDiff() {
}
}
const { $jobs } = useNuxtApp()
const { $poller } = useNuxtApp()
async function syncMetaDiff() {
try {
@ -50,19 +50,34 @@ async function syncMetaDiff() {
isLoading.value = true
const jobData = await $api.base.metaDiffSync(project.value?.id, props.baseId)
$jobs.subscribe({ id: jobData.id }, undefined, async (status: string) => {
if (status === JobStatus.COMPLETED) {
// Table metadata recreated successfully
message.info(t('msg.info.metaDataRecreated'))
await loadTables()
await loadMetaDiff()
emit('baseSynced')
isLoading.value = false
} else if (status === JobStatus.FAILED) {
message.error('Failed to sync base metadata')
isLoading.value = false
}
})
$poller.subscribe(
{ id: jobData.id },
async (data: {
id: string
status?: string
data?: {
error?: {
message: string
}
message?: string
result?: any
}
}) => {
if (data.status !== 'close') {
if (data.status === JobStatus.COMPLETED) {
// Table metadata recreated successfully
message.info(t('msg.info.metaDataRecreated'))
await loadTables()
await loadMetaDiff()
emit('baseSynced')
isLoading.value = false
} else if (status === JobStatus.FAILED) {
message.error('Failed to sync base metadata')
isLoading.value = false
}
}
},
)
} catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e))
}

49
packages/nc-gui/components/dashboard/settings/data-sources/CreateBase.vue

@ -230,7 +230,7 @@ const focusInvalidInput = () => {
form.value?.$el.querySelector('.ant-form-item-explain-error')?.parentNode?.parentNode?.querySelector('input')?.focus()
}
const { $jobs } = useNuxtApp()
const { $poller } = useNuxtApp()
const createBase = async () => {
try {
@ -257,23 +257,38 @@ const createBase = async () => {
inflection_table: formState.value.inflection.inflectionTable,
})
$jobs.subscribe({ id: jobData.id }, undefined, async (status: string) => {
if (status === JobStatus.COMPLETED) {
$e('a:base:create:extdb')
if (projectId.value) {
await loadProject(projectId.value, true)
await loadProjectTables(projectId.value, true)
$poller.subscribe(
{ id: jobData.id },
async (data: {
id: string
status?: string
data?: {
error?: {
message: string
}
message?: string
result?: any
}
emit('baseCreated')
emit('close')
} else if (status === JobStatus.FAILED) {
message.error('Failed to create base')
}
creatingBase.value = false
})
}) => {
if (data.status !== 'close') {
if (data.status === JobStatus.COMPLETED) {
$e('a:base:create:extdb')
if (projectId.value) {
await loadProject(projectId.value, true)
await loadProjectTables(projectId.value, true)
}
emit('baseCreated')
emit('close')
creatingBase.value = false
} else if (status === JobStatus.FAILED) {
message.error('Failed to create base')
creatingBase.value = false
}
}
},
)
} catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e))
}

60
packages/nc-gui/components/dlg/AirtableImport.vue

@ -28,7 +28,7 @@ const { $api } = useNuxtApp()
const baseURL = $api.instance.defaults.baseURL
const { $state, $jobs } = useNuxtApp()
const { $state, $poller } = useNuxtApp()
const projectStore = useProject()
@ -48,6 +48,10 @@ const logRef = ref<typeof AntCard>()
const enableAbort = ref(false)
const goBack = ref(false)
const listeningForUpdates = ref(false)
const syncSource = ref({
id: '',
type: 'Airtable',
@ -81,10 +85,6 @@ const pushProgress = async (message: string, status: JobStatus | 'progress') =>
})
}
const onSubscribe = () => {
step.value = 2
}
const onStatus = async (status: JobStatus, data?: any) => {
if (status === JobStatus.COMPLETED) {
showGoToDashboardButton.value = true
@ -93,6 +93,7 @@ const onStatus = async (status: JobStatus, data?: any) => {
refreshCommandPalette()
// TODO: add tab of the first table
} else if (status === JobStatus.FAILED) {
goBack.value = true
pushProgress(data.error.message, status)
}
}
@ -146,6 +147,45 @@ async function createOrUpdate() {
}
}
async function listenForUpdates() {
if (listeningForUpdates.value) return
listeningForUpdates.value = true
const job = await $api.jobs.status({ syncId: syncSource.value.id })
if (!job) {
listeningForUpdates.value = false
return
}
$poller.subscribe(
{ id: job.id },
(data: {
id: string
status?: string
data?: {
error?: {
message: string
}
message?: string
result?: any
}
}) => {
if (data.status !== 'close') {
step.value = 2
if (data.status) {
onStatus(data.status as JobStatus, data.data)
} else {
onLog(data.data as any)
}
} else {
listeningForUpdates.value = false
}
},
)
}
async function loadSyncSrc() {
const data: any = await $fetch(`/api/v1/db/meta/projects/${project.value.id}/syncs/${baseId}`, {
baseURL,
@ -160,7 +200,7 @@ async function loadSyncSrc() {
syncSource.value = migrateSync(srcs[0])
syncSource.value.details.syncSourceUrlOrId =
srcs[0].details.appId && srcs[0].details.appId.length > 0 ? srcs[0].details.syncSourceUrlOrId : srcs[0].details.shareId
$jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
listenForUpdates()
} else {
syncSource.value = {
id: '',
@ -194,7 +234,7 @@ async function sync() {
method: 'POST',
headers: { 'xc-auth': $state.token.value as string },
})
$jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
listenForUpdates()
} catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e))
}
@ -252,9 +292,8 @@ watch(
onMounted(async () => {
if (syncSource.value.id) {
$jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
listenForUpdates()
}
await loadSyncSrc()
})
</script>
@ -420,6 +459,9 @@ onMounted(async () => {
<a-button v-if="showGoToDashboardButton" class="mt-4" size="large" @click="dialogShow = false">
{{ $t('labels.goToDashboard') }}
</a-button>
<a-button v-else-if="goBack" class="mt-4 uppercase" size="large" danger @click="step = 1">{{
$t('general.cancel')
}}</a-button>
<a-button v-else-if="enableAbort" class="mt-4 uppercase" size="large" danger @click="abort()">{{
$t('general.abort')
}}</a-button>

35
packages/nc-gui/components/workspace/ProjectList.vue

@ -18,7 +18,7 @@ const { navigateToProject } = useGlobal()
// const filteredProjects = computed(() => projects.value?.filter((p) => !p.deleted) || [])
const { $e, $jobs } = useNuxtApp()
const { $e, $poller } = useNuxtApp()
const { isUIAllowed } = useRoles()
@ -145,15 +145,30 @@ const selectedProjectToDuplicate = ref()
const DlgProjectDuplicateOnOk = async (jobData: { id: string }) => {
await loadProjects('workspace')
$jobs.subscribe({ id: jobData.id }, undefined, async (status: string) => {
if (status === JobStatus.COMPLETED) {
await loadProjects('workspace')
refreshCommandPalette()
} else if (status === JobStatus.FAILED) {
message.error('Failed to duplicate project')
await loadProjects('workspace')
}
})
$poller.subscribe(
{ id: jobData.id },
async (data: {
id: string
status?: string
data?: {
error?: {
message: string
}
message?: string
result?: any
}
}) => {
if (data.status !== 'close') {
if (data.status === JobStatus.COMPLETED) {
await loadProjects('workspace')
refreshCommandPalette()
} else if (data.status === JobStatus.FAILED) {
message.error('Failed to duplicate project')
await loadProjects('workspace')
}
}
},
)
$e('a:project:duplicate')
}

26
packages/nc-gui/nuxt-shim.d.ts vendored

@ -1,6 +1,6 @@
import type { Api as BaseAPI } from 'nocodb-sdk'
import type { UseGlobalReturn } from './composables/useGlobal/types'
import type { JobStatus, NocoI18n } from './lib'
import type { NocoI18n } from './lib'
import type { TabType } from './composables'
declare module '#app/nuxt' {
@ -13,18 +13,22 @@ declare module '#app/nuxt' {
/** {@link import('./plugins/tele') Telemetry} Emit telemetry event */
$e: (event: string, data?: any) => void
$state: UseGlobalReturn
$jobs: {
$poller: {
subscribe(
job:
| {
id: string
topic: { id: string },
cb: (data: {
id: string
status?: string
data?: {
error?: {
message: string
}
| any,
subscribedCb?: () => void,
statusCb?: ((status: JobStatus, error?: any) => void) | undefined,
logCb?: ((data: { message: string }) => void) | undefined,
): void
getStatus(name: string, id: string): Promise<string>
message?: string
result?: any
}
}) => void,
_mid = 0,
): Promise<void>
}
}
}

102
packages/nc-gui/plugins/jobs.ts

@ -1,102 +0,0 @@
import type { Socket } from 'socket.io-client'
import io from 'socket.io-client'
import { JobStatus, defineNuxtPlugin, useGlobal, watch } from '#imports'
export default defineNuxtPlugin(async (nuxtApp) => {
const { appInfo } = useGlobal()
let socket: Socket | null = null
let messageIndex = 0
const init = async (token: string) => {
try {
if (socket) socket.disconnect()
const url = new URL(appInfo.value.ncSiteUrl, window.location.href.split(/[?#]/)[0])
let socketPath = url.pathname
socketPath += socketPath.endsWith('/') ? 'socket.io' : '/socket.io'
socket = io(`${url.href}jobs`, {
extraHeaders: { 'xc-auth': token },
path: socketPath,
})
socket.on('connect_error', (e) => {
console.error(e)
socket?.disconnect()
})
} catch {}
}
if (nuxtApp.$state.signedIn.value) {
await init(nuxtApp.$state.token.value)
}
const send = (evt: string, data: any) => {
if (socket) {
const _id = messageIndex++
socket.emit(evt, { _id, data })
return _id
}
}
const jobs = {
subscribe(
job: { id: string } | any,
subscribedCb?: () => void,
statusCb?: (status: JobStatus, data?: any) => void,
logCb?: (data: { message: string }) => void,
) {
const logFn = (data: { id: string; data: { message: string } }) => {
if (data.id === job.id) {
if (logCb) logCb(data.data)
}
}
const statusFn = (data: any) => {
if (data.id === job.id) {
if (statusCb) statusCb(data.status, data.data)
if (data.status === JobStatus.COMPLETED || data.status === JobStatus.FAILED) {
socket?.off('status', statusFn)
socket?.off('log', logFn)
}
}
}
const _id = send('subscribe', job)
const subscribeFn = (data: { _id: number; id: string }) => {
if (data._id === _id) {
if (data.id !== job.id) {
job.id = data.id
}
if (subscribedCb) subscribedCb()
socket?.on('log', logFn)
socket?.on('status', statusFn)
socket?.off('subscribed', subscribeFn)
}
}
socket?.on('subscribed', subscribeFn)
},
getStatus(id: string): Promise<string> {
return new Promise((resolve) => {
if (socket) {
const _id = send('status', { id })
const tempFn = (data: any) => {
if (data._id === _id) {
resolve(data.status)
socket?.off('status', tempFn)
}
}
socket.on('status', tempFn)
}
})
},
}
watch((nuxtApp.$state as ReturnType<typeof useGlobal>).token, (newToken, oldToken) => {
if (newToken && newToken !== oldToken) init(newToken)
else if (!newToken) socket?.disconnect()
})
nuxtApp.provide('jobs', jobs)
})

92
packages/nc-gui/plugins/poller.ts

@ -0,0 +1,92 @@
import type { Api as BaseAPI } from 'nocodb-sdk'
import { defineNuxtPlugin } from '#imports'
export default defineNuxtPlugin(async (nuxtApp) => {
const api: BaseAPI<any> = nuxtApp.$api as any
// unsubscribe all if signed out
let unsub = false
const subscribe = async (
topic: { id: string } | any,
cb: (data: {
id: string
status?: string
data?: {
error?: {
message: string
}
message?: string
result?: any
}
}) => void,
_mid = 0,
) => {
if (unsub) return
try {
const response:
| {
_mid: number
id: string
status: 'refresh' | 'update' | 'close'
data: any
}
| {
_mid: number
id: string
status: 'refresh' | 'update' | 'close'
data: any
}[] = await api.jobs.listen({ _mid, data: topic })
if (Array.isArray(response)) {
let lastMid = 0
for (const r of response) {
if (r.status === 'close') {
return cb(r)
} else {
if (r.status === 'update') {
cb(r.data)
}
lastMid = r._mid
}
}
await subscribe(topic, cb, lastMid)
} else {
if (response.status === 'close') {
return cb(response)
} else if (response.status === 'update') {
cb(response.data)
await subscribe(topic, cb, response._mid)
} else if (response.status === 'refresh') {
await subscribe(topic, cb, _mid)
}
}
} catch (e) {
setTimeout(() => {
subscribe(topic, cb, _mid)
}, 1000)
}
}
const init = () => {
unsub = false
}
if ((nuxtApp.$state as ReturnType<typeof useGlobal>).signedIn.value) {
await init()
}
watch((nuxtApp.$state as ReturnType<typeof useGlobal>).token, (newToken, oldToken) => {
if (newToken && newToken !== oldToken) init()
else if (!newToken) {
unsub = true
}
})
const poller = {
subscribe,
}
nuxtApp.provide('poller', poller)
})

47
packages/nocodb/src/Noco.ts

@ -105,35 +105,44 @@ export default class Noco {
// new ExpressAdapter(server),
);
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
if (process.env.NC_WORKER_CONTAINER === 'true') {
if (!process.env.NC_REDIS_URL) {
throw new Error('NC_REDIS_URL is required');
}
process.env.NC_DISABLE_TELE = 'true';
this._httpServer = nestApp.getHttpAdapter().getInstance();
this._server = server;
nestApp.init();
} else {
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
nestApp.use(requestIp.mw());
nestApp.use(cookieParser());
this._httpServer = nestApp.getHttpAdapter().getInstance();
this._server = server;
this.initSentry(nestApp);
nestApp.use(requestIp.mw());
nestApp.use(cookieParser());
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
this.initSentry(nestApp);
nestApp.use(
express.json({ limit: process.env.NC_REQUEST_BODY_SIZE || '50mb' }),
);
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
await nestApp.init();
nestApp.use(
express.json({ limit: process.env.NC_REQUEST_BODY_SIZE || '50mb' }),
);
const dashboardPath = process.env.NC_DASHBOARD_URL ?? '/dashboard';
server.use(NcToolGui.expressMiddleware(dashboardPath));
server.use(express.static(path.join(__dirname, 'public')));
await nestApp.init();
if (dashboardPath !== '/' && dashboardPath !== '') {
server.get('/', (_req, res) => res.redirect(dashboardPath));
}
const dashboardPath = process.env.NC_DASHBOARD_URL ?? '/dashboard';
server.use(NcToolGui.expressMiddleware(dashboardPath));
server.use(express.static(path.join(__dirname, 'public')));
this.initSentryErrorHandler(server);
if (dashboardPath !== '/' && dashboardPath !== '') {
server.get('/', (_req, res) => res.redirect(dashboardPath));
}
this.initSentryErrorHandler(server);
return nestApp.getHttpAdapter().getInstance();
return nestApp.getHttpAdapter().getInstance();
}
}
public static get httpServer(): http.Server {

2
packages/nocodb/src/modules/jobs/fallback/jobs.service.ts

@ -13,7 +13,7 @@ export class JobsService {
async jobStatus(jobId: string) {
return await (
await this.fallbackQueueService.getJob(jobId)
).status;
)?.status;
}
async jobList() {

266
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -0,0 +1,266 @@
import {
Body,
Controller,
HttpCode,
Inject,
Post,
Request,
Response,
UseGuards,
} from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { customAlphabet } from 'nanoid';
import { JobsRedisService } from './redis/jobs-redis.service';
import { JobStatus } from '~/interface/Jobs';
import { JobEvents } from '~/interface/Jobs';
import { GlobalGuard } from '~/guards/global/global.guard';
import NocoCache from '~/cache/NocoCache';
import { CacheDelDirection, CacheGetType, CacheScope } from '~/utils/globals';
const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14);
const POLLING_INTERVAL = 10000;
@Controller()
@UseGuards(GlobalGuard)
export class JobsController {
constructor(
@Inject('JobsService') private readonly jobsService,
private readonly jobsRedisService: JobsRedisService,
) {}
private jobRooms = {};
private localJobs = {};
private closedJobs = [];
@Post('/jobs/listen')
@HttpCode(200)
async listen(
@Response() res,
@Request() req,
@Body() body: { _mid: number; data: { id: string } },
) {
const { _mid = 0, data } = body;
const jobId = data.id;
res.setHeader('Cache-Control', 'no-cache, must-revalidate');
res.resId = nanoidv2();
let messages;
if (this.localJobs[jobId]) {
messages = this.localJobs[jobId].messages;
} else {
messages = (
await NocoCache.get(
`${CacheScope.JOBS}:${jobId}:messages`,
CacheGetType.TYPE_OBJECT,
)
)?.messages;
}
const newMessages: any[] = [];
if (messages) {
messages.forEach((m) => {
if (m._mid > _mid) {
newMessages.push(m);
}
});
}
if (newMessages.length > 0) {
res.send(newMessages);
return;
}
if (this.closedJobs.includes(jobId)) {
res.send({
status: 'close',
});
return;
}
if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.push(res);
} else {
this.jobRooms[jobId] = {
listeners: [res],
};
// subscribe to job events
this.jobsRedisService.subscribe(jobId, (data) => {
if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.forEach((res) => {
if (!res.headersSent) {
res.send({
status: 'refresh',
});
}
});
}
const cmd = data.cmd;
delete data.cmd;
switch (cmd) {
case JobEvents.STATUS:
if ([JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)) {
this.jobsRedisService.unsubscribe(jobId);
delete this.jobRooms[jobId];
this.closedJobs.push(jobId);
setTimeout(() => {
this.closedJobs = this.closedJobs.filter((j) => j !== jobId);
}, POLLING_INTERVAL * 2);
}
break;
}
});
}
res.on('close', () => {
if (jobId && this.jobRooms[jobId]?.listeners) {
this.jobRooms[jobId].listeners = this.jobRooms[jobId].listeners.filter(
(r) => r.resId !== res.resId,
);
}
});
setTimeout(() => {
if (!res.headersSent) {
res.send({
status: 'refresh',
});
}
}, POLLING_INTERVAL);
}
@Post('/jobs/status')
async status(@Body() data: { id: string } | any) {
let res: {
id?: string;
status?: JobStatus;
} | null = null;
if (Object.keys(data).every((k) => ['id'].includes(k)) && data?.id) {
const rooms = (await this.jobsService.jobList()).map(
(j) => `jobs-${j.id}`,
);
const room = rooms.find((r) => r === `jobs-${data.id}`);
if (room) {
res.id = data.id;
}
} else {
const job = await this.jobsService.getJobWithData(data);
if (job) {
res = {};
res.id = job.id;
res.status = await this.jobsService.jobStatus(data.id);
}
}
return res;
}
@OnEvent(JobEvents.STATUS)
sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void {
let response;
const jobId = data.id;
if (this.localJobs[jobId]) {
response = {
status: 'update',
data,
_mid: this.localJobs[jobId].messages.length + 1,
};
this.localJobs[jobId].messages.push(response);
NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
} else {
response = {
status: 'update',
data,
_mid: 1,
};
this.localJobs[jobId] = {
messages: [response],
};
NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
}
if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.forEach((res) => {
if (!res.headersSent) {
res.send(response);
}
});
}
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.jobsRedisService.publish(jobId, {
cmd: JobEvents.STATUS,
...data,
});
}
if ([JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)) {
this.closedJobs.push(jobId);
setTimeout(() => {
this.closedJobs = this.closedJobs.filter((j) => j !== jobId);
}, POLLING_INTERVAL * 2);
setTimeout(() => {
delete this.jobRooms[jobId];
delete this.localJobs[jobId];
NocoCache.deepDel(`jobs`, jobId, CacheDelDirection.CHILD_TO_PARENT);
}, POLLING_INTERVAL);
}
}
@OnEvent(JobEvents.LOG)
sendJobLog(data: { id: string; data: { message: string } }): void {
let response;
const jobId = data.id;
if (this.localJobs[jobId]) {
response = {
status: 'update',
data,
_mid: this.localJobs[jobId].messages.length + 1,
};
this.localJobs[jobId].messages.push(response);
NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
} else {
response = {
status: 'update',
data,
_mid: 1,
};
this.localJobs[jobId] = {
messages: [response],
};
NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
}
if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.forEach((res) => {
if (!res.headersSent) {
res.send(response);
}
});
}
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.jobsRedisService.publish(jobId, {
cmd: JobEvents.LOG,
...data,
});
}
}
}

110
packages/nocodb/src/modules/jobs/jobs.gateway.ts

@ -1,110 +0,0 @@
import {
ConnectedSocket,
MessageBody,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
import { AuthGuard } from '@nestjs/passport';
import { OnEvent } from '@nestjs/event-emitter';
import { Inject } from '@nestjs/common';
import type { OnModuleInit } from '@nestjs/common';
import type { JobStatus } from '~/interface/Jobs';
import { JobEvents } from '~/interface/Jobs';
const url = new URL(
process.env.NC_PUBLIC_URL ||
`http://localhost:${process.env.PORT || '8080'}/`,
);
let namespace = url.pathname;
namespace += namespace.endsWith('/') ? 'jobs' : '/jobs';
@WebSocketGateway({
cors: {
origin: '*',
allowedHeaders: ['xc-auth'],
credentials: true,
},
namespace,
})
export class JobsGateway implements OnModuleInit {
constructor(@Inject('JobsService') private readonly jobsService) {}
@WebSocketServer()
server: Server;
async onModuleInit() {
this.server.use(async (socket, next) => {
try {
const context = new ExecutionContextHost([socket.handshake as any]);
const guard = new (AuthGuard('jwt'))(context);
await guard.canActivate(context);
} catch {}
next();
});
}
@SubscribeMessage('subscribe')
async subscribe(
@MessageBody()
body: { _id: number; data: { id: string } | any },
@ConnectedSocket() client: Socket,
): Promise<void> {
const { _id, data } = body;
if (Object.keys(data).every((k) => ['id'].includes(k)) && data?.id) {
const rooms = (await this.jobsService.jobList()).map(
(j) => `jobs-${j.id}`,
);
const room = rooms.find((r) => r === `jobs-${data.id}`);
if (room) {
client.join(`jobs-${data.id}`);
client.emit('subscribed', {
_id,
id: data.id,
});
}
} else {
const job = await this.jobsService.getJobWithData(data);
if (job) {
client.join(`jobs-${job.id}`);
client.emit('subscribed', {
_id,
id: job.id,
});
}
}
}
@SubscribeMessage('status')
async status(
@MessageBody() body: { _id: number; data: { id: string } },
@ConnectedSocket() client: Socket,
): Promise<void> {
const { _id, data } = body;
client.emit('status', {
_id,
id: data.id,
status: await this.jobsService.jobStatus(data.id),
});
}
@OnEvent(JobEvents.STATUS)
sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void {
this.server.to(`jobs-${data.id}`).emit('status', {
id: data.id,
status: data.status,
data: data.data,
});
}
@OnEvent(JobEvents.LOG)
sendJobLog(data: { id: string; data: { message: string } }): void {
this.server.to(`jobs-${data.id}`).emit('log', {
id: data.id,
data: data.data,
});
}
}

8
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -17,9 +17,8 @@ import { BaseDeleteProcessor } from './jobs/base-delete/base-delete.processor';
// Jobs Module Related
import { JobsLogService } from './jobs/jobs-log.service';
import { JobsGateway } from './jobs.gateway';
// Redis
// import { JobsGateway } from './jobs.gateway';
import { JobsController } from './jobs.controller';
import { JobsService } from './redis/jobs.service';
import { JobsRedisService } from './redis/jobs-redis.service';
import { JobsEventService } from './redis/jobs-event.service';
@ -50,6 +49,7 @@ import { GlobalModule } from '~/modules/global/global.module';
: []),
],
controllers: [
JobsController,
...(process.env.NC_WORKER_CONTAINER !== 'true'
? [
DuplicateController,
@ -61,7 +61,7 @@ import { GlobalModule } from '~/modules/global/global.module';
: []),
],
providers: [
...(process.env.NC_WORKER_CONTAINER !== 'true' ? [JobsGateway] : []),
...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []),
...(process.env.NC_REDIS_JOB_URL
? [JobsRedisService, JobsEventService]
: [FallbackQueueService, FallbackJobsEventService]),

91
packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts

@ -6,31 +6,19 @@ import {
} from '@nestjs/bull';
import { Job } from 'bull';
import boxen from 'boxen';
import { EventEmitter2, OnEvent } from '@nestjs/event-emitter';
import { JobsRedisService } from './jobs-redis.service';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { JobEvents, JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
@Processor(JOBS_QUEUE)
export class JobsEventService {
constructor(
private jobsRedisService: JobsRedisService,
private eventEmitter: EventEmitter2,
) {}
constructor(private eventEmitter: EventEmitter2) {}
@OnQueueActive()
onActive(job: Job) {
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
} else {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
}
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
}
@OnQueueFailed()
@ -46,62 +34,25 @@ export class JobsEventService {
),
);
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
error: {
message: error?.message,
},
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
error: {
message: error?.message,
},
});
} else {
this.jobsRedisService.unsubscribe(`jobs-${job.id.toString()}`);
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
error: {
message: error?.message,
},
},
});
}
},
});
}
@OnQueueCompleted()
onCompleted(job: Job, data: any) {
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
result: data,
},
});
} else {
this.jobsRedisService.unsubscribe(`jobs-${job.id.toString()}`);
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
result: data,
},
});
}
}
@OnEvent(JobEvents.LOG)
onLog(data: { id: string; data: { message: string } }) {
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.jobsRedisService.publish(`jobs-${data.id}`, {
cmd: JobEvents.LOG,
id: data.id,
data: data.data,
});
}
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
result: data,
},
});
}
}

31
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -1,18 +1,12 @@
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { JobsRedisService } from './jobs-redis.service';
import type { OnModuleInit } from '@nestjs/common';
import { JobEvents, JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
@Injectable()
export class JobsService implements OnModuleInit {
constructor(
@InjectQueue(JOBS_QUEUE) private readonly jobsQueue: Queue,
private jobsRedisService: JobsRedisService,
private eventEmitter: EventEmitter2,
) {}
constructor(@InjectQueue(JOBS_QUEUE) private readonly jobsQueue: Queue) {}
// pause primary instance queue
async onModuleInit() {
@ -28,31 +22,14 @@ export class JobsService implements OnModuleInit {
// if there is no worker and primary instance queue is paused, resume it
// if there is any worker and primary instance queue is not paused, pause it
if (workerCount < 1 && localWorkerPaused) {
if (workerCount === 1 && localWorkerPaused) {
await this.jobsQueue.resume(true);
} else if (workerCount > 0 && !localWorkerPaused) {
} else if (workerCount > 1 && !localWorkerPaused) {
await this.jobsQueue.pause(true);
}
const job = await this.jobsQueue.add(name, data);
// subscribe to job events
this.jobsRedisService.subscribe(`jobs-${job.id.toString()}`, (data) => {
const cmd = data.cmd;
delete data.cmd;
switch (cmd) {
case JobEvents.STATUS:
this.eventEmitter.emit(JobEvents.STATUS, data);
if ([JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)) {
this.jobsRedisService.unsubscribe(`jobs-${data.id.toString()}`);
}
break;
case JobEvents.LOG:
this.eventEmitter.emit(JobEvents.LOG, data);
break;
}
});
return job;
}

48
packages/nocodb/src/schema/swagger.json

@ -16227,6 +16227,54 @@
}
]
}
},
"/jobs/listen": {
"post": {
"summary": "Jobs Listen",
"operationId": "jobs-listen",
"description": "Listen for job events",
"tags": [
"Jobs"
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"type": "object"
}
}
}
}
},
"parameters": [
{
"$ref": "#/components/parameters/xc-auth"
}
]
},
"/jobs/status": {
"post": {
"summary": "Jobs Status",
"operationId": "jobs-status",
"description": "Get job status",
"tags": [
"Jobs"
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"type": "object"
}
}
}
}
},
"parameters": [
{
"$ref": "#/components/parameters/xc-auth"
}
]
}
},
"components": {

1
packages/nocodb/src/utils/globals.ts

@ -150,6 +150,7 @@ export enum CacheScope {
USER_PROJECT = 'userProject',
DASHBOARD_PROJECT_DB_PROJECT_LINKING = 'dashboardProjectDBProjectLinking',
SINGLE_QUERY = 'singleQuery',
JOBS = 'nc_jobs',
}
export enum CacheGetType {

Loading…
Cancel
Save