mirror of https://github.com/nocodb/nocodb
mertmit
2 years ago
committed by
starbirdtech383
11 changed files with 470 additions and 21 deletions
@ -0,0 +1,70 @@ |
|||||||
|
import type { Socket } from 'socket.io-client' |
||||||
|
import io from 'socket.io-client' |
||||||
|
import { defineNuxtPlugin, useGlobal, watch } from '#imports' |
||||||
|
|
||||||
|
export default defineNuxtPlugin(async (nuxtApp) => { |
||||||
|
const { appInfo } = $(useGlobal()) |
||||||
|
|
||||||
|
let socket: Socket | null = null |
||||||
|
|
||||||
|
const init = async (token: string) => { |
||||||
|
try { |
||||||
|
if (socket) socket.disconnect() |
||||||
|
|
||||||
|
const url = new URL(appInfo.ncSiteUrl, window.location.href.split(/[?#]/)[0]) |
||||||
|
|
||||||
|
url.port = '8081' |
||||||
|
|
||||||
|
socket = io(`${url.href}jobs`, { |
||||||
|
extraHeaders: { 'xc-auth': token }, |
||||||
|
}) |
||||||
|
|
||||||
|
socket.on('connect_error', (e) => { |
||||||
|
console.error(e) |
||||||
|
socket?.disconnect() |
||||||
|
}) |
||||||
|
} catch {} |
||||||
|
} |
||||||
|
|
||||||
|
if (nuxtApp.$state.signedIn.value) { |
||||||
|
await init(nuxtApp.$state.token.value) |
||||||
|
} |
||||||
|
|
||||||
|
const events = { |
||||||
|
subscribe(type: string, id: string, cb: (data: any) => void) { |
||||||
|
if (socket) { |
||||||
|
socket.emit('subscribe', { type, id }) |
||||||
|
const tempFn = (data: any) => { |
||||||
|
if (data.id === id && data.type === type) { |
||||||
|
cb(data) |
||||||
|
if (data.status === 'completed' || data.status === 'failed') { |
||||||
|
socket?.off('status', tempFn) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
socket.on('status', tempFn) |
||||||
|
} |
||||||
|
}, |
||||||
|
getStatus(type: string, id: string): Promise<string> { |
||||||
|
return new Promise((resolve) => { |
||||||
|
if (socket) { |
||||||
|
socket.emit('status', { type, id }) |
||||||
|
const tempFn = (data: any) => { |
||||||
|
if (data.id === id && data.type === type) { |
||||||
|
resolve(data.status) |
||||||
|
socket?.off('status', tempFn) |
||||||
|
} |
||||||
|
} |
||||||
|
socket.on('status', tempFn) |
||||||
|
} |
||||||
|
}) |
||||||
|
}, |
||||||
|
} |
||||||
|
|
||||||
|
watch((nuxtApp.$state as ReturnType<typeof useGlobal>).token, (newToken, oldToken) => { |
||||||
|
if (newToken && newToken !== oldToken) init(newToken) |
||||||
|
else if (!newToken) socket?.disconnect() |
||||||
|
}) |
||||||
|
|
||||||
|
nuxtApp.provide('events', events) |
||||||
|
}) |
@ -0,0 +1,71 @@ |
|||||||
|
import { |
||||||
|
ConnectedSocket, |
||||||
|
MessageBody, |
||||||
|
SubscribeMessage, |
||||||
|
WebSocketGateway, |
||||||
|
WebSocketServer, |
||||||
|
} from '@nestjs/websockets'; |
||||||
|
import { Server, Socket } from 'socket.io'; |
||||||
|
import { Injectable, UseGuards } from '@nestjs/common'; |
||||||
|
import { GlobalGuard } from 'src/guards/global/global.guard'; |
||||||
|
import { ExtractProjectIdMiddleware } from 'src/middlewares/extract-project-id/extract-project-id.middleware'; |
||||||
|
import { JobsService } from './jobs.service'; |
||||||
|
|
||||||
|
@WebSocketGateway(8081, { |
||||||
|
cors: { |
||||||
|
origin: '*', |
||||||
|
}, |
||||||
|
allowedHeaders: ['xc-auth'], |
||||||
|
credentials: true, |
||||||
|
namespace: 'jobs', |
||||||
|
}) |
||||||
|
@Injectable() |
||||||
|
export class JobsGateway { |
||||||
|
constructor(private readonly jobsService: JobsService) {} |
||||||
|
|
||||||
|
@WebSocketServer() |
||||||
|
server: Server; |
||||||
|
|
||||||
|
@SubscribeMessage('subscribe') |
||||||
|
async subscribe( |
||||||
|
@MessageBody() data: { type: string; id: string }, |
||||||
|
@ConnectedSocket() client: Socket, |
||||||
|
): Promise<void> { |
||||||
|
const rooms = await this.jobsService.jobList(data.type); |
||||||
|
const room = rooms.find((r) => r.id === data.id); |
||||||
|
if (room) { |
||||||
|
client.join(data.id); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@SubscribeMessage('status') |
||||||
|
async status( |
||||||
|
@MessageBody() data: { type: string; id: string }, |
||||||
|
@ConnectedSocket() client: Socket, |
||||||
|
): Promise<void> { |
||||||
|
client.emit('status', { |
||||||
|
id: data.id, |
||||||
|
type: data.type, |
||||||
|
status: await this.jobsService.jobStatus(data.type, data.id), |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
async jobStatus(data: { |
||||||
|
type: string; |
||||||
|
id: string; |
||||||
|
status: |
||||||
|
| 'completed' |
||||||
|
| 'waiting' |
||||||
|
| 'active' |
||||||
|
| 'delayed' |
||||||
|
| 'failed' |
||||||
|
| 'paused' |
||||||
|
| 'refresh'; |
||||||
|
}): Promise<void> { |
||||||
|
this.server.to(data.id).emit('status', { |
||||||
|
id: data.id, |
||||||
|
type: data.type, |
||||||
|
status: data.status, |
||||||
|
}); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,28 @@ |
|||||||
|
import { InjectQueue } from '@nestjs/bull'; |
||||||
|
import { Injectable } from '@nestjs/common'; |
||||||
|
import { Queue } from 'bull'; |
||||||
|
|
||||||
|
@Injectable() |
||||||
|
export class JobsService { |
||||||
|
constructor(@InjectQueue('duplicate') private duplicateQueue: Queue) {} |
||||||
|
|
||||||
|
async jobStatus(jobType: string, jobId: string) { |
||||||
|
switch (jobType) { |
||||||
|
case 'duplicate': |
||||||
|
default: |
||||||
|
return await (await this.duplicateQueue.getJob(jobId)).getState(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
async jobList(jobType: string) { |
||||||
|
switch (jobType) { |
||||||
|
case 'duplicate': |
||||||
|
default: |
||||||
|
return await this.duplicateQueue.getJobs([ |
||||||
|
'active', |
||||||
|
'waiting', |
||||||
|
'delayed', |
||||||
|
]); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue