mirror of https://github.com/nocodb/nocodb
mertmit
2 years ago
11 changed files with 470 additions and 21 deletions
@ -0,0 +1,70 @@
|
||||
import type { Socket } from 'socket.io-client' |
||||
import io from 'socket.io-client' |
||||
import { defineNuxtPlugin, useGlobal, watch } from '#imports' |
||||
|
||||
export default defineNuxtPlugin(async (nuxtApp) => { |
||||
const { appInfo } = $(useGlobal()) |
||||
|
||||
let socket: Socket | null = null |
||||
|
||||
const init = async (token: string) => { |
||||
try { |
||||
if (socket) socket.disconnect() |
||||
|
||||
const url = new URL(appInfo.ncSiteUrl, window.location.href.split(/[?#]/)[0]) |
||||
|
||||
url.port = '8081' |
||||
|
||||
socket = io(`${url.href}jobs`, { |
||||
extraHeaders: { 'xc-auth': token }, |
||||
}) |
||||
|
||||
socket.on('connect_error', (e) => { |
||||
console.error(e) |
||||
socket?.disconnect() |
||||
}) |
||||
} catch {} |
||||
} |
||||
|
||||
if (nuxtApp.$state.signedIn.value) { |
||||
await init(nuxtApp.$state.token.value) |
||||
} |
||||
|
||||
const events = { |
||||
subscribe(type: string, id: string, cb: (data: any) => void) { |
||||
if (socket) { |
||||
socket.emit('subscribe', { type, id }) |
||||
const tempFn = (data: any) => { |
||||
if (data.id === id && data.type === type) { |
||||
cb(data) |
||||
if (data.status === 'completed' || data.status === 'failed') { |
||||
socket?.off('status', tempFn) |
||||
} |
||||
} |
||||
} |
||||
socket.on('status', tempFn) |
||||
} |
||||
}, |
||||
getStatus(type: string, id: string): Promise<string> { |
||||
return new Promise((resolve) => { |
||||
if (socket) { |
||||
socket.emit('status', { type, id }) |
||||
const tempFn = (data: any) => { |
||||
if (data.id === id && data.type === type) { |
||||
resolve(data.status) |
||||
socket?.off('status', tempFn) |
||||
} |
||||
} |
||||
socket.on('status', tempFn) |
||||
} |
||||
}) |
||||
}, |
||||
} |
||||
|
||||
watch((nuxtApp.$state as ReturnType<typeof useGlobal>).token, (newToken, oldToken) => { |
||||
if (newToken && newToken !== oldToken) init(newToken) |
||||
else if (!newToken) socket?.disconnect() |
||||
}) |
||||
|
||||
nuxtApp.provide('events', events) |
||||
}) |
@ -0,0 +1,71 @@
|
||||
import { |
||||
ConnectedSocket, |
||||
MessageBody, |
||||
SubscribeMessage, |
||||
WebSocketGateway, |
||||
WebSocketServer, |
||||
} from '@nestjs/websockets'; |
||||
import { Server, Socket } from 'socket.io'; |
||||
import { Injectable, UseGuards } from '@nestjs/common'; |
||||
import { GlobalGuard } from 'src/guards/global/global.guard'; |
||||
import { ExtractProjectIdMiddleware } from 'src/middlewares/extract-project-id/extract-project-id.middleware'; |
||||
import { JobsService } from './jobs.service'; |
||||
|
||||
@WebSocketGateway(8081, { |
||||
cors: { |
||||
origin: '*', |
||||
}, |
||||
allowedHeaders: ['xc-auth'], |
||||
credentials: true, |
||||
namespace: 'jobs', |
||||
}) |
||||
@Injectable() |
||||
export class JobsGateway { |
||||
constructor(private readonly jobsService: JobsService) {} |
||||
|
||||
@WebSocketServer() |
||||
server: Server; |
||||
|
||||
@SubscribeMessage('subscribe') |
||||
async subscribe( |
||||
@MessageBody() data: { type: string; id: string }, |
||||
@ConnectedSocket() client: Socket, |
||||
): Promise<void> { |
||||
const rooms = await this.jobsService.jobList(data.type); |
||||
const room = rooms.find((r) => r.id === data.id); |
||||
if (room) { |
||||
client.join(data.id); |
||||
} |
||||
} |
||||
|
||||
@SubscribeMessage('status') |
||||
async status( |
||||
@MessageBody() data: { type: string; id: string }, |
||||
@ConnectedSocket() client: Socket, |
||||
): Promise<void> { |
||||
client.emit('status', { |
||||
id: data.id, |
||||
type: data.type, |
||||
status: await this.jobsService.jobStatus(data.type, data.id), |
||||
}); |
||||
} |
||||
|
||||
async jobStatus(data: { |
||||
type: string; |
||||
id: string; |
||||
status: |
||||
| 'completed' |
||||
| 'waiting' |
||||
| 'active' |
||||
| 'delayed' |
||||
| 'failed' |
||||
| 'paused' |
||||
| 'refresh'; |
||||
}): Promise<void> { |
||||
this.server.to(data.id).emit('status', { |
||||
id: data.id, |
||||
type: data.type, |
||||
status: data.status, |
||||
}); |
||||
} |
||||
} |
@ -0,0 +1,28 @@
|
||||
import { InjectQueue } from '@nestjs/bull'; |
||||
import { Injectable } from '@nestjs/common'; |
||||
import { Queue } from 'bull'; |
||||
|
||||
@Injectable() |
||||
export class JobsService { |
||||
constructor(@InjectQueue('duplicate') private duplicateQueue: Queue) {} |
||||
|
||||
async jobStatus(jobType: string, jobId: string) { |
||||
switch (jobType) { |
||||
case 'duplicate': |
||||
default: |
||||
return await (await this.duplicateQueue.getJob(jobId)).getState(); |
||||
} |
||||
} |
||||
|
||||
async jobList(jobType: string) { |
||||
switch (jobType) { |
||||
case 'duplicate': |
||||
default: |
||||
return await this.duplicateQueue.getJobs([ |
||||
'active', |
||||
'waiting', |
||||
'delayed', |
||||
]); |
||||
} |
||||
} |
||||
} |
Loading…
Reference in new issue