diff --git a/packages/nc-gui/components/dlg/AirtableImport.vue b/packages/nc-gui/components/dlg/AirtableImport.vue index d33078bf37..aadfd13697 100644 --- a/packages/nc-gui/components/dlg/AirtableImport.vue +++ b/packages/nc-gui/components/dlg/AirtableImport.vue @@ -22,6 +22,8 @@ const { refreshCommandPalette } = useCommandPalette() const { loadTables } = baseStore +const { getJobsForBase, loadJobsForBase } = useJobs() + const showGoToDashboardButton = ref(false) const step = ref(1) @@ -141,7 +143,13 @@ async function listenForUpdates(id?: string) { listeningForUpdates.value = true - const job = id ? { id } : await $api.jobs.status({ syncId: syncSource.value.id }) + await loadJobsForBase(baseId) + + const jobs = await getJobsForBase(baseId) + + const job = id + ? { id } + : jobs.find((j) => j.base_id === baseId && j.status !== JobStatus.COMPLETED && j.status !== JobStatus.FAILED) if (!job) { listeningForUpdates.value = false diff --git a/packages/nc-gui/components/smartsheet/grid/PaginationV2.vue b/packages/nc-gui/components/smartsheet/grid/PaginationV2.vue index c42e7ccf8e..88cd79868b 100644 --- a/packages/nc-gui/components/smartsheet/grid/PaginationV2.vue +++ b/packages/nc-gui/components/smartsheet/grid/PaginationV2.vue @@ -256,7 +256,7 @@ const renderAltOrOptlKey = () => {
-
+
{
{ + const baseJobs = ref>({}) + return { baseJobs } +}) + +interface JobType { + id: string + job: string + status: string + result: Record + fk_user_id: string + fk_workspace_id: string + base_id: string + created_at: Date + updated_at: Date +} + +export const useJobs = createSharedComposable(() => { + const { baseJobs } = jobsState() + + const { $api } = useNuxtApp() + + const { base } = storeToRefs(useBase()) + + const activeBaseJobs = computed(() => { + if (!base.value || !base.value.id) { + return null + } + return baseJobs.value[base.value.id] + }) + + const jobList = computed(() => { + return activeBaseJobs.value || [] + }) + + const getJobsForBase = (baseId: string) => { + return baseJobs.value[baseId] || [] + } + + const loadJobsForBase = async (baseId?: string) => { + if (!baseId) { + baseId = base.value.id + + if (!baseId) { + return + } + } + + const jobs: JobType[] = await $api.jobs.list(baseId, {}) + + if (baseJobs.value[baseId]) { + baseJobs.value[baseId] = jobs || baseJobs.value[baseId] + } else { + baseJobs.value[baseId] = jobs || [] + } + } + + return { + jobList, + loadJobsForBase, + getJobsForBase, + } +}) diff --git a/packages/nc-gui/extensions/data-exporter/icon.png b/packages/nc-gui/extensions/data-exporter/icon.png new file mode 100644 index 0000000000..112d158118 Binary files /dev/null and b/packages/nc-gui/extensions/data-exporter/icon.png differ diff --git a/packages/nc-gui/extensions/data-exporter/index.vue b/packages/nc-gui/extensions/data-exporter/index.vue new file mode 100644 index 0000000000..8c42f37ede --- /dev/null +++ b/packages/nc-gui/extensions/data-exporter/index.vue @@ -0,0 +1,180 @@ + + + + + diff --git a/packages/nc-gui/extensions/data-exporter/manifest.json b/packages/nc-gui/extensions/data-exporter/manifest.json new file mode 100644 index 0000000000..60a38170a9 --- /dev/null +++ b/packages/nc-gui/extensions/data-exporter/manifest.json @@ -0,0 +1,11 @@ +{ + "id": "nc-data-exporter", + "title": "Data Exporter", + "description": "Export any view in various formats", + "entry": "data-exporter", + "version": "0.1", + "iconUrl": "data-exporter/icon.png", + "publisherName": "NocoDB", + "publisherEmail": "contact@nocodb.com", + "publisherUrl": "https://www.nocodb.com" +} diff --git a/packages/nocodb/src/controllers/attachments-secure.controller.ts b/packages/nocodb/src/controllers/attachments-secure.controller.ts index d39d310145..9d5a69a8e1 100644 --- a/packages/nocodb/src/controllers/attachments-secure.controller.ts +++ b/packages/nocodb/src/controllers/attachments-secure.controller.ts @@ -69,16 +69,33 @@ export class AttachmentsSecureController { @Get('/dltemp/:param(*)') async fileReadv3(@Param('param') param: string, @Res() res: Response) { try { - const fpath = await PresignedUrl.getPath(`dltemp/${param}`); + const fullPath = await PresignedUrl.getPath(`dltemp/${param}`); + + const queryHelper = fullPath.split('?'); + + const fpath = queryHelper[0]; + + let queryFilename = null; + + if (queryHelper.length > 1) { + const query = new URLSearchParams(queryHelper[1]); + queryFilename = query.get('filename'); + } const file = await this.attachmentsService.getFile({ path: path.join('nc', 'uploads', fpath), }); if (this.attachmentsService.previewAvailable(file.type)) { + if (queryFilename) { + res.setHeader( + 'Content-Disposition', + `attachment; filename=${queryFilename}`, + ); + } res.sendFile(file.path); } else { - res.download(file.path); + res.download(file.path, queryFilename); } } catch (e) { res.status(404).send('Not found'); diff --git a/packages/nocodb/src/controllers/attachments.controller.ts b/packages/nocodb/src/controllers/attachments.controller.ts index 5e6f621545..1ae7d13f8a 100644 --- a/packages/nocodb/src/controllers/attachments.controller.ts +++ b/packages/nocodb/src/controllers/attachments.controller.ts @@ -63,16 +63,26 @@ export class AttachmentsController { // , getCacheMiddleware(), catchError(fileRead)); @Get('/download/:filename(*)') // This route will match any URL that starts with - async fileRead(@Param('filename') filename: string, @Res() res: Response) { + async fileRead( + @Param('filename') filename: string, + @Res() res: Response, + @Query('filename') queryFilename?: string, + ) { try { const file = await this.attachmentsService.getFile({ path: path.join('nc', 'uploads', filename), }); if (this.attachmentsService.previewAvailable(file.type)) { + if (queryFilename) { + res.setHeader( + 'Content-Disposition', + `attachment; filename=${queryFilename}`, + ); + } res.sendFile(file.path); } else { - res.download(file.path); + res.download(file.path, queryFilename); } } catch (e) { res.status(404).send('Not found'); @@ -87,6 +97,7 @@ export class AttachmentsController { @Param('param2') param2: string, @Param('filename') filename: string, @Res() res: Response, + @Query('filename') queryFilename?: string, ) { try { const file = await this.attachmentsService.getFile({ @@ -100,9 +111,15 @@ export class AttachmentsController { }); if (this.attachmentsService.previewAvailable(file.type)) { + if (queryFilename) { + res.setHeader( + 'Content-Disposition', + `attachment; filename=${queryFilename}`, + ); + } res.sendFile(file.path); } else { - res.download(file.path); + res.download(file.path, queryFilename); } } catch (e) { res.status(404).send('Not found'); @@ -112,16 +129,33 @@ export class AttachmentsController { @Get('/dltemp/:param(*)') async fileReadv3(@Param('param') param: string, @Res() res: Response) { try { - const fpath = await PresignedUrl.getPath(`dltemp/${param}`); + const fullPath = await PresignedUrl.getPath(`dltemp/${param}`); + + const queryHelper = fullPath.split('?'); + + const fpath = queryHelper[0]; + + let queryFilename = null; + + if (queryHelper.length > 1) { + const query = new URLSearchParams(queryHelper[1]); + queryFilename = query.get('filename'); + } const file = await this.attachmentsService.getFile({ path: path.join('nc', 'uploads', fpath), }); if (this.attachmentsService.previewAvailable(file.type)) { + if (queryFilename) { + res.setHeader( + 'Content-Disposition', + `attachment; filename=${queryFilename}`, + ); + } res.sendFile(file.path); } else { - res.download(file.path); + res.download(file.path, queryFilename); } } catch (e) { res.status(404).send('Not found'); diff --git a/packages/nocodb/src/controllers/jobs-meta.controller.spec.ts b/packages/nocodb/src/controllers/jobs-meta.controller.spec.ts new file mode 100644 index 0000000000..fa030595a9 --- /dev/null +++ b/packages/nocodb/src/controllers/jobs-meta.controller.spec.ts @@ -0,0 +1,21 @@ +import { Test } from '@nestjs/testing'; +import { HooksService } from '../services/hooks.service'; +import { JobsMetaController } from './jobs-meta.controller'; +import type { TestingModule } from '@nestjs/testing'; + +describe('JobsMetaController', () => { + let controller: JobsMetaController; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + controllers: [JobsMetaController], + providers: [HooksService], + }).compile(); + + controller = module.get(JobsMetaController); + }); + + it('should be defined', () => { + expect(controller).toBeDefined(); + }); +}); diff --git a/packages/nocodb/src/controllers/jobs-meta.controller.ts b/packages/nocodb/src/controllers/jobs-meta.controller.ts new file mode 100644 index 0000000000..aea3b38618 --- /dev/null +++ b/packages/nocodb/src/controllers/jobs-meta.controller.ts @@ -0,0 +1,28 @@ +import { Body, Controller, Post, Req, UseGuards } from '@nestjs/common'; +import type { JobStatus, JobTypes } from '~/interface/Jobs'; +import { GlobalGuard } from '~/guards/global/global.guard'; +import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware'; +import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; +import { TenantContext } from '~/decorators/tenant-context.decorator'; +import { NcContext, NcRequest } from '~/interface/config'; +import { JobsMetaService } from '~/services/jobs-meta.service'; + +@Controller() +@UseGuards(MetaApiLimiterGuard, GlobalGuard) +export class JobsMetaController { + constructor(private readonly jobsMetaService: JobsMetaService) {} + + @Post(['/api/v2/jobs/:baseId']) + @Acl('jobList') + async jobList( + @TenantContext() context: NcContext, + @Req() req: NcRequest, + @Body() + conditions?: { + job?: JobTypes; + status?: JobStatus; + }, + ) { + return await this.jobsMetaService.list(context, conditions, req); + } +} diff --git a/packages/nocodb/src/helpers/dataHelpers.ts b/packages/nocodb/src/helpers/dataHelpers.ts index 6eb82bd3a3..d0a2a1709f 100644 --- a/packages/nocodb/src/helpers/dataHelpers.ts +++ b/packages/nocodb/src/helpers/dataHelpers.ts @@ -222,6 +222,13 @@ export async function serializeCellValue( .join(', '); } break; + case UITypes.Decimal: + { + if (isNaN(Number(value))) return null; + + return Number(value).toFixed(column.meta?.precision ?? 1); + } + break; default: if (value && typeof value === 'object') { return JSON.stringify(value); diff --git a/packages/nocodb/src/interface/Jobs.ts b/packages/nocodb/src/interface/Jobs.ts index 893bcc56d3..406c55c9dc 100644 --- a/packages/nocodb/src/interface/Jobs.ts +++ b/packages/nocodb/src/interface/Jobs.ts @@ -1,4 +1,5 @@ -import type { NcContext } from '~/interface/config'; +import type { UserType } from 'nocodb-sdk'; +import type { NcContext, NcRequest } from '~/interface/config'; export const JOBS_QUEUE = 'jobs'; export enum JobTypes { @@ -15,6 +16,7 @@ export enum JobTypes { HealthCheck = 'health-check', HandleWebhook = 'handle-webhook', CleanUp = 'clean-up', + DataExport = 'data-export', } export enum JobStatus { @@ -44,12 +46,77 @@ export enum InstanceCommands { RELEASE = 'release', } -export interface HandleWebhookJobData { +export interface JobData { context: NcContext; + user: Partial; +} + +export interface AtImportJobData extends JobData { + syncId: string; + baseId: string; + sourceId: string; + baseName: string; + authToken: string; + baseURL: string; + clientIp: string; + options?: { + syncViews?: boolean; + syncAttachment?: boolean; + syncLookup?: boolean; + syncRollup?: boolean; + syncUsers?: boolean; + syncData?: boolean; + }; + user: any; +} + +export interface DuplicateBaseJobData extends JobData { + sourceId: string; + dupProjectId: string; + req: NcRequest; + options: { + excludeData?: boolean; + excludeViews?: boolean; + excludeHooks?: boolean; + }; +} + +export interface DuplicateModelJobData extends JobData { + sourceId: string; + modelId: string; + title: string; + req: NcRequest; + options: { + excludeData?: boolean; + excludeViews?: boolean; + excludeHooks?: boolean; + }; +} + +export interface DuplicateColumnJobData extends JobData { + sourceId: string; + columnId: string; + extra: Record; // extra data + req: NcRequest; + options: { + excludeData?: boolean; + }; +} + +export interface HandleWebhookJobData extends JobData { hookId: string; modelId: string; viewId: string; prevData; newData; - user; +} + +export interface DataExportJobData extends JobData { + options?: { + delimiter?: string; + }; + modelId: string; + viewId: string; + exportAs: 'csv' | 'json' | 'xlsx'; + ncSiteUrl: string; } diff --git a/packages/nocodb/src/meta/meta.service.ts b/packages/nocodb/src/meta/meta.service.ts index 7eba9a7215..25582842db 100644 --- a/packages/nocodb/src/meta/meta.service.ts +++ b/packages/nocodb/src/meta/meta.service.ts @@ -251,6 +251,7 @@ export class MetaService { [MetaTable.COMMENTS]: 'com', [MetaTable.COMMENTS_REACTIONS]: 'cre', [MetaTable.USER_COMMENTS_NOTIFICATIONS_PREFERENCE]: 'cnp', + [MetaTable.JOBS]: 'job', }; const prefix = prefixMap[target] || 'nc'; @@ -726,6 +727,16 @@ export class MetaService { ); } + public formatDateTime(date: string): string { + return dayjs(date) + .utc() + .format( + this.isMySQL() || this.isMssql() + ? 'YYYY-MM-DD HH:mm:ss' + : 'YYYY-MM-DD HH:mm:ssZ', + ); + } + public async init(): Promise { await this.connection.migrate.latest({ migrationSource: new XcMigrationSource(), diff --git a/packages/nocodb/src/meta/migrations/XcMigrationSourcev2.ts b/packages/nocodb/src/meta/migrations/XcMigrationSourcev2.ts index d8733403a0..b6f946ced4 100644 --- a/packages/nocodb/src/meta/migrations/XcMigrationSourcev2.ts +++ b/packages/nocodb/src/meta/migrations/XcMigrationSourcev2.ts @@ -39,6 +39,7 @@ import * as nc_049_clear_notifications from '~/meta/migrations/v2/nc_049_clear_n import * as nc_050_tenant_isolation from '~/meta/migrations/v2/nc_050_tenant_isolation'; import * as nc_051_source_readonly_columns from '~/meta/migrations/v2/nc_051_source_readonly_columns'; import * as nc_052_field_aggregation from '~/meta/migrations/v2/nc_052_field_aggregation'; +import * as nc_053_jobs from '~/meta/migrations/v2/nc_053_jobs'; // Create a custom migration source class export default class XcMigrationSourcev2 { @@ -89,6 +90,7 @@ export default class XcMigrationSourcev2 { 'nc_050_tenant_isolation', 'nc_051_source_readonly_columns', 'nc_052_field_aggregation', + 'nc_053_jobs', ]); } @@ -180,6 +182,8 @@ export default class XcMigrationSourcev2 { return nc_051_source_readonly_columns; case 'nc_052_field_aggregation': return nc_052_field_aggregation; + case 'nc_053_jobs': + return nc_053_jobs; } } } diff --git a/packages/nocodb/src/meta/migrations/v2/nc_053_jobs.ts b/packages/nocodb/src/meta/migrations/v2/nc_053_jobs.ts new file mode 100644 index 0000000000..13580eaa26 --- /dev/null +++ b/packages/nocodb/src/meta/migrations/v2/nc_053_jobs.ts @@ -0,0 +1,30 @@ +import type { Knex } from 'knex'; +import { MetaTable } from '~/utils/globals'; + +const up = async (knex: Knex) => { + await knex.schema.createTable(MetaTable.JOBS, (table) => { + table.string('id', 20).primary(); + + table.string('job', 255); + + table.string('status', 20); + + table.text('result'); + + table.string('fk_user_id', 20); + + table.string('fk_workspace_id', 20); + + table.string('base_id', 20); + + table.timestamps(true, true); + + // TODO - add indexes + }); +}; + +const down = async (knex: Knex) => { + await knex.schema.dropTable(MetaTable.JOBS); +}; + +export { up, down }; diff --git a/packages/nocodb/src/models/Job.ts b/packages/nocodb/src/models/Job.ts new file mode 100644 index 0000000000..e1b63f12a1 --- /dev/null +++ b/packages/nocodb/src/models/Job.ts @@ -0,0 +1,136 @@ +import type { NcContext } from '~/interface/config'; +import type { Condition } from '~/db/CustomKnex'; +import Noco from '~/Noco'; +import { + CacheDelDirection, + CacheGetType, + CacheScope, + MetaTable, +} from '~/utils/globals'; +import NocoCache from '~/cache/NocoCache'; +import { extractProps } from '~/helpers/extractProps'; +import { prepareForDb, prepareForResponse } from '~/utils/modelUtils'; + +export default class Job { + id: string; + job: string; + status: string; + result: string; + fk_user_id: string; + fk_workspace_id: string; + base_id: string; + created_at: Date; + updated_at: Date; + + constructor(data: Partial) { + Object.assign(this, data); + } + + public static async insert( + context: NcContext, + jobObj: Partial, + ncMeta = Noco.ncMeta, + ) { + const insertObj = extractProps(jobObj, [ + 'job', + 'status', + 'result', + 'fk_user_id', + ]); + + const { id } = await ncMeta.metaInsert2( + context.workspace_id, + context.base_id, + MetaTable.JOBS, + insertObj, + ); + + return this.get(context, id, ncMeta); + } + + public static async update( + context: NcContext, + jobId: string, + jobObj: Partial, + ncMeta = Noco.ncMeta, + ) { + const updateObj = extractProps(jobObj, ['status', 'result']); + + const res = await ncMeta.metaUpdate( + context.workspace_id, + context.base_id, + MetaTable.JOBS, + prepareForDb(updateObj, 'result'), + jobId, + ); + + await NocoCache.update( + `${CacheScope.JOBS}:${jobId}`, + prepareForResponse(updateObj, 'result'), + ); + + return res; + } + + public static async delete( + context: NcContext, + jobId: string, + ncMeta = Noco.ncMeta, + ) { + await ncMeta.metaDelete( + context.workspace_id, + context.base_id, + MetaTable.JOBS, + jobId, + ); + + await NocoCache.deepDel( + `${CacheScope.JOBS}:${jobId}`, + CacheDelDirection.CHILD_TO_PARENT, + ); + } + + public static async get(context: NcContext, id: any, ncMeta = Noco.ncMeta) { + let jobData = + id && + (await NocoCache.get( + `${CacheScope.JOBS}:${id}`, + CacheGetType.TYPE_OBJECT, + )); + + if (!jobData) { + jobData = await ncMeta.metaGet2( + context.workspace_id, + context.base_id, + MetaTable.JOBS, + id, + ); + + jobData = prepareForResponse(jobData, 'result'); + + await NocoCache.set(`${CacheScope.JOBS}:${id}`, jobData); + } + + return jobData && new Job(jobData); + } + + public static async list( + context: NcContext, + opts: { + condition?: Record; + xcCondition?: Condition; + }, + ncMeta = Noco.ncMeta, + ): Promise { + const jobList = await ncMeta.metaList2( + context.workspace_id, + context.base_id, + MetaTable.JOBS, + opts, + ); + + return jobList.map((job) => { + return new Job(prepareForResponse(job, 'result')); + }); + } +} diff --git a/packages/nocodb/src/models/PresignedUrl.ts b/packages/nocodb/src/models/PresignedUrl.ts index 1e9213f31a..a344997263 100644 --- a/packages/nocodb/src/models/PresignedUrl.ts +++ b/packages/nocodb/src/models/PresignedUrl.ts @@ -91,10 +91,18 @@ export default class PresignedUrl { path: string; expireSeconds?: number; s3?: boolean; + filename?: string; }, ncMeta = Noco.ncMeta, ) { - const { path, expireSeconds = DEFAULT_EXPIRE_SECONDS, s3 = false } = param; + let { path } = param; + + const { + expireSeconds = DEFAULT_EXPIRE_SECONDS, + s3 = false, + filename, + } = param; + const expireAt = roundExpiry( new Date(new Date().getTime() + expireSeconds * 1000), ); // at least expireSeconds from now @@ -129,6 +137,7 @@ export default class PresignedUrl { tempUrl = await (storageAdapter as any).getSignedUrl( path, expiresInSeconds, + filename, ); await this.add({ path: path, @@ -139,6 +148,12 @@ export default class PresignedUrl { } else { // if not present, create a new url tempUrl = `dltemp/${nanoid(16)}/${expireAt.getTime()}/${path}`; + + // if filename is present, add it to the destination + if (filename) { + path = `${path}?filename=${encodeURIComponent(filename)}`; + } + await this.add({ path: path, url: tempUrl, diff --git a/packages/nocodb/src/models/index.ts b/packages/nocodb/src/models/index.ts index ae0c876191..e79c8f07ec 100644 --- a/packages/nocodb/src/models/index.ts +++ b/packages/nocodb/src/models/index.ts @@ -43,3 +43,4 @@ export { default as PresignedUrl } from './PresignedUrl'; export { default as UserRefreshToken } from './UserRefreshToken'; export { default as Extension } from './Extension'; export { default as Comment } from './Comment'; +export { default as Job } from './Job'; diff --git a/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts b/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts index 04b47e041a..37b8323477 100644 --- a/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts @@ -7,7 +7,8 @@ import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.proce import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor'; import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor'; -import { JobsEventService } from '~/modules/jobs/fallback/jobs-event.service'; +import { DataExportProcessor } from '~/modules/jobs/jobs/data-export/data-export.processor'; +import { JobsEventService } from '~/modules/jobs/jobs-event.service'; import { JobStatus, JobTypes } from '~/interface/Jobs'; export interface Job { @@ -33,6 +34,7 @@ export class QueueService { protected readonly sourceCreateProcessor: SourceCreateProcessor, protected readonly sourceDeleteProcessor: SourceDeleteProcessor, protected readonly webhookHandlerProcessor: WebhookHandlerProcessor, + protected readonly dataExportProcessor: DataExportProcessor, ) { this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => { const job = this.queueMemory.find((job) => job.id === data.job.id); @@ -94,6 +96,10 @@ export class QueueService { this: this.webhookHandlerProcessor, fn: this.webhookHandlerProcessor.job, }, + [JobTypes.DataExport]: { + this: this.dataExportProcessor, + fn: this.dataExportProcessor.job, + }, }; async jobWrapper(job: Job) { @@ -129,8 +135,8 @@ export class QueueService { QueueService.queueIdCounter = index; } - add(name: string, data: any, _opts = {}) { - const id = `${this.queueIndex++}`; + add(name: string, data: any, opts?: { jobId?: string }) { + const id = opts?.jobId || `${this.queueIndex++}`; const job = { id: `${id}`, name, status: JobStatus.WAITING, data }; this.queueMemory.push(job); this.queue.add(() => this.jobWrapper(job)); diff --git a/packages/nocodb/src/modules/jobs/fallback/jobs-event.service.ts b/packages/nocodb/src/modules/jobs/fallback/jobs-event.service.ts deleted file mode 100644 index 9abbb7ae37..0000000000 --- a/packages/nocodb/src/modules/jobs/fallback/jobs-event.service.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { - OnQueueActive, - OnQueueCompleted, - OnQueueFailed, - Processor, -} from '@nestjs/bull'; -import { Job } from 'bull'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { Logger } from '@nestjs/common'; -import { JobEvents, JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; - -@Processor(JOBS_QUEUE) -export class JobsEventService { - protected logger = new Logger(JobsEventService.name); - - constructor(private eventEmitter: EventEmitter2) {} - - @OnQueueActive() - onActive(job: Job) { - this.eventEmitter.emit(JobEvents.STATUS, { - id: job.id.toString(), - status: JobStatus.ACTIVE, - }); - } - - @OnQueueFailed() - onFailed(job: Job, error: Error) { - this.logger.error( - `---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, - ); - - const newLocal = this; - newLocal.eventEmitter.emit(JobEvents.STATUS, { - id: job.id.toString(), - status: JobStatus.FAILED, - data: { - error: { - message: error?.message, - }, - }, - }); - } - - @OnQueueCompleted() - onCompleted(job: Job, data: any) { - this.eventEmitter.emit(JobEvents.STATUS, { - id: job.id.toString(), - status: JobStatus.COMPLETED, - data: { - result: data, - }, - }); - } -} diff --git a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts index c8f1676723..0b42c98266 100644 --- a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts @@ -2,6 +2,8 @@ import { Injectable } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common'; import { QueueService } from '~/modules/jobs/fallback/fallback-queue.service'; import { JobStatus } from '~/interface/Jobs'; +import { Job } from '~/models'; +import { RootScopes } from '~/utils/globals'; @Injectable() export class JobsService implements OnModuleInit { @@ -10,7 +12,21 @@ export class JobsService implements OnModuleInit { async onModuleInit() {} async add(name: string, data: any) { - return this.fallbackQueueService.add(name, data); + const context = { + workspace_id: RootScopes.ROOT, + base_id: RootScopes.ROOT, + ...(data?.context || {}), + }; + + const jobData = await Job.insert(context, { + job: name, + status: JobStatus.WAITING, + fk_user_id: data?.user?.id, + }); + + this.fallbackQueueService.add(name, data, { jobId: jobData.id }); + + return jobData; } async jobStatus(jobId: string) { @@ -28,30 +44,6 @@ export class JobsService implements OnModuleInit { ]); } - async getJobWithData(data: any) { - const jobs = await this.fallbackQueueService.getJobs([ - // 'completed', - JobStatus.WAITING, - JobStatus.ACTIVE, - JobStatus.DELAYED, - // 'failed', - JobStatus.PAUSED, - ]); - - const job = jobs.find((j) => { - for (const key in data) { - if (j.data[key]) { - if (j.data[key] !== data[key]) return false; - } else { - return false; - } - } - return true; - }); - - return job; - } - async resumeQueue() { await this.fallbackQueueService.queue.start(); } diff --git a/packages/nocodb/src/modules/jobs/jobs-event.service.ts b/packages/nocodb/src/modules/jobs/jobs-event.service.ts new file mode 100644 index 0000000000..2badfa1122 --- /dev/null +++ b/packages/nocodb/src/modules/jobs/jobs-event.service.ts @@ -0,0 +1,108 @@ +import { + OnQueueActive, + OnQueueCompleted, + OnQueueFailed, + Processor, +} from '@nestjs/bull'; +import { Job as BullJob } from 'bull'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { Logger } from '@nestjs/common'; +import { JobEvents, JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; +import { Job } from '~/models'; +import { RootScopes } from '~/utils/globals'; + +@Processor(JOBS_QUEUE) +export class JobsEventService { + protected logger = new Logger(JobsEventService.name); + + constructor(private eventEmitter: EventEmitter2) {} + + @OnQueueActive() + onActive(job: BullJob) { + Job.update( + { + workspace_id: RootScopes.ROOT, + base_id: RootScopes.ROOT, + }, + job.id.toString(), + { + status: JobStatus.ACTIVE, + }, + ) + .then(() => { + this.eventEmitter.emit(JobEvents.STATUS, { + id: job.id.toString(), + status: JobStatus.ACTIVE, + }); + }) + .catch((error) => { + this.logger.error( + `Failed to update job (${job.id}) status to active: ${error.message}`, + ); + }); + } + + @OnQueueFailed() + onFailed(job: BullJob, error: Error) { + this.logger.error( + `---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, + ); + + Job.update( + { + workspace_id: RootScopes.ROOT, + base_id: RootScopes.ROOT, + }, + job.id.toString(), + { + status: JobStatus.FAILED, + }, + ) + .then(() => { + const newLocal = this; + newLocal.eventEmitter.emit(JobEvents.STATUS, { + id: job.id.toString(), + status: JobStatus.FAILED, + data: { + error: { + message: error?.message, + }, + }, + }); + }) + .catch((error) => { + this.logger.error( + `Failed to update job (${job.id}) status to failed: ${error.message}`, + ); + }); + } + + @OnQueueCompleted() + onCompleted(job: BullJob, data: any) { + Job.update( + { + workspace_id: RootScopes.ROOT, + base_id: RootScopes.ROOT, + }, + job.id.toString(), + { + status: JobStatus.COMPLETED, + result: data, + }, + ) + .then(() => { + this.eventEmitter.emit(JobEvents.STATUS, { + id: job.id.toString(), + status: JobStatus.COMPLETED, + data: { + result: data, + }, + }); + }) + .catch((error) => { + this.logger.error( + `Failed to update job (${job.id}) status to completed: ${error.message}`, + ); + }); + } +} diff --git a/packages/nocodb/src/modules/jobs/jobs-service.interface.ts b/packages/nocodb/src/modules/jobs/jobs-service.interface.ts index 7f083582f2..3696067b71 100644 --- a/packages/nocodb/src/modules/jobs/jobs-service.interface.ts +++ b/packages/nocodb/src/modules/jobs/jobs-service.interface.ts @@ -7,7 +7,6 @@ export interface IJobsService { add(name: string, data: any): Promise>; jobStatus(jobId: string): Promise; jobList(): Promise[]>; - getJobWithData(data: any): Promise>; resumeQueue(): Promise; pauseQueue(): Promise; } diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index c363544730..045cc60da2 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -56,7 +56,7 @@ export class JobsController { } else { messages = ( await NocoCache.get( - `${CacheScope.JOBS}:${jobId}:messages`, + `${CacheScope.JOBS_POLLING}:${jobId}:messages`, CacheGetType.TYPE_OBJECT, ) )?.messages; @@ -92,38 +92,43 @@ export class JobsController { }; // subscribe to job events if (JobsRedis.available) { - const unsubscribeCallback = await JobsRedis.subscribe(jobId, async (data) => { - if (this.jobRooms[jobId]) { - this.jobRooms[jobId].listeners.forEach((res) => { - if (!res.headersSent) { - res.send({ - status: 'refresh', - }); - } - }); - } - - const cmd = data.cmd; - delete data.cmd; - switch (cmd) { - case JobEvents.STATUS: - if ( - [JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status) - ) { - await unsubscribeCallback(); - delete this.jobRooms[jobId]; - // close the job after 1 second (to allow the update of messages) - setTimeout(() => { - this.closedJobs.push(jobId); - }, 1000); - // remove the job after polling interval * 2 - setTimeout(() => { - this.closedJobs = this.closedJobs.filter((j) => j !== jobId); - }, POLLING_INTERVAL * 2); - } - break; - } - }); + const unsubscribeCallback = await JobsRedis.subscribe( + jobId, + async (data) => { + if (this.jobRooms[jobId]) { + this.jobRooms[jobId].listeners.forEach((res) => { + if (!res.headersSent) { + res.send({ + status: 'refresh', + }); + } + }); + } + + const cmd = data.cmd; + delete data.cmd; + switch (cmd) { + case JobEvents.STATUS: + if ( + [JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status) + ) { + await unsubscribeCallback(); + delete this.jobRooms[jobId]; + // close the job after 1 second (to allow the update of messages) + setTimeout(() => { + this.closedJobs.push(jobId); + }, 1000); + // remove the job after polling interval * 2 + setTimeout(() => { + this.closedJobs = this.closedJobs.filter( + (j) => j !== jobId, + ); + }, POLLING_INTERVAL * 2); + } + break; + } + }, + ); } } @@ -144,32 +149,6 @@ export class JobsController { }, POLLING_INTERVAL); } - @Post('/jobs/status') - async status(@Body() data: { id: string } | any) { - let res: { - id?: string; - status?: JobStatus; - } | null = null; - if (Object.keys(data).every((k) => ['id'].includes(k)) && data?.id) { - const rooms = (await this.jobsService.jobList()).map( - (j) => `jobs-${j.id}`, - ); - const room = rooms.find((r) => r === `jobs-${data.id}`); - if (room) { - res.id = data.id; - } - } else { - const job = await this.jobsService.getJobWithData(data); - if (job) { - res = {}; - res.id = `${job.id}`; - res.status = await this.jobsService.jobStatus(data.id); - } - } - - return res; - } - @OnEvent(JobEvents.STATUS) async sendJobStatus(data: { id: string; @@ -193,7 +172,7 @@ export class JobsController { this.localJobs[jobId].messages.shift(); } - await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { + await NocoCache.set(`${CacheScope.JOBS_POLLING}:${jobId}:messages`, { messages: this.localJobs[jobId].messages, }); } else { @@ -208,7 +187,7 @@ export class JobsController { _mid: 1, }; - await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { + await NocoCache.set(`${CacheScope.JOBS_POLLING}:${jobId}:messages`, { messages: this.localJobs[jobId].messages, }); } @@ -237,7 +216,7 @@ export class JobsController { setTimeout(async () => { delete this.jobRooms[jobId]; delete this.localJobs[jobId]; - await NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`); + await NocoCache.del(`${CacheScope.JOBS_POLLING}:${jobId}:messages`); }, POLLING_INTERVAL * 2); } } @@ -265,7 +244,7 @@ export class JobsController { this.localJobs[jobId].messages.shift(); } - await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { + await NocoCache.set(`${CacheScope.JOBS_POLLING}:${jobId}:messages`, { messages: this.localJobs[jobId].messages, }); } else { @@ -280,7 +259,7 @@ export class JobsController { _mid: 1, }; - await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { + await NocoCache.set(`${CacheScope.JOBS_POLLING}:${jobId}:messages`, { messages: this.localJobs[jobId].messages, }); } diff --git a/packages/nocodb/src/modules/jobs/jobs.module.ts b/packages/nocodb/src/modules/jobs/jobs.module.ts index 6f26ad3ddf..6f718d7efe 100644 --- a/packages/nocodb/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb/src/modules/jobs/jobs.module.ts @@ -16,18 +16,19 @@ import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source- import { SourceDeleteController } from '~/modules/jobs/jobs/source-delete/source-delete.controller'; import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor'; +import { DataExportProcessor } from '~/modules/jobs/jobs/data-export/data-export.processor'; +import { DataExportController } from '~/modules/jobs/jobs/data-export/data-export.controller'; // Jobs Module Related import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service'; // import { JobsGateway } from '~/modules/jobs/jobs.gateway'; import { JobsController } from '~/modules/jobs/jobs.controller'; import { JobsService } from '~/modules/jobs/redis/jobs.service'; -import { JobsEventService } from '~/modules/jobs/redis/jobs-event.service'; +import { JobsEventService } from '~/modules/jobs/jobs-event.service'; // Fallback import { JobsService as FallbackJobsService } from '~/modules/jobs/fallback/jobs.service'; import { QueueService as FallbackQueueService } from '~/modules/jobs/fallback/fallback-queue.service'; -import { JobsEventService as FallbackJobsEventService } from '~/modules/jobs/fallback/jobs-event.service'; import { JOBS_QUEUE } from '~/interface/Jobs'; export const JobsModuleMetadata = { @@ -53,14 +54,14 @@ export const JobsModuleMetadata = { MetaSyncController, SourceCreateController, SourceDeleteController, + DataExportController, ] : []), ], providers: [ ...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []), - ...(process.env.NC_REDIS_JOB_URL - ? [JobsEventService] - : [FallbackQueueService, FallbackJobsEventService]), + JobsEventService, + ...(process.env.NC_REDIS_JOB_URL ? [] : [FallbackQueueService]), { provide: 'JobsService', useClass: process.env.NC_REDIS_JOB_URL @@ -76,6 +77,7 @@ export const JobsModuleMetadata = { SourceCreateProcessor, SourceDeleteProcessor, WebhookHandlerProcessor, + DataExportProcessor, ], exports: ['JobsService'], }; diff --git a/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts b/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts index e21f6ca9c0..e011cff638 100644 --- a/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts @@ -31,7 +31,7 @@ import { TablesService } from '~/services/tables.service'; import { ViewColumnsService } from '~/services/view-columns.service'; import { ViewsService } from '~/services/views.service'; import { FormsService } from '~/services/forms.service'; -import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; +import { AtImportJobData, JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; import { GridColumnsService } from '~/services/grid-columns.service'; import { TelemetryService } from '~/services/telemetry.service'; import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2'; @@ -112,7 +112,7 @@ export class AtImportProcessor { ) {} @Process(JobTypes.AtImport) - async job(job: Job) { + async job(job: Job) { this.debugLog(`job started for ${job.id}`); const context = job.data.context; @@ -2668,7 +2668,7 @@ export interface AirtableSyncConfig { apiKey: string; appId?: string; shareId: string; - user: UserType; + user: Partial; options: { syncViews: boolean; syncData: boolean; diff --git a/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.controller.ts b/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.controller.ts new file mode 100644 index 0000000000..f754a1677b --- /dev/null +++ b/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.controller.ts @@ -0,0 +1,58 @@ +import { + Body, + Controller, + HttpCode, + Inject, + Param, + Post, + Req, + UseGuards, +} from '@nestjs/common'; +import type { DataExportJobData } from '~/interface/Jobs'; +import { GlobalGuard } from '~/guards/global/global.guard'; +import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware'; +import { BasesService } from '~/services/bases.service'; +import { View } from '~/models'; +import { JobTypes } from '~/interface/Jobs'; +import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; +import { IJobsService } from '~/modules/jobs/jobs-service.interface'; +import { TenantContext } from '~/decorators/tenant-context.decorator'; +import { NcContext, NcRequest } from '~/interface/config'; +import { NcError } from '~/helpers/catchError'; + +@Controller() +@UseGuards(MetaApiLimiterGuard, GlobalGuard) +export class DataExportController { + constructor( + @Inject('JobsService') protected readonly jobsService: IJobsService, + protected readonly basesService: BasesService, + ) {} + + @Post(['/api/v2/export/:viewId/:exportAs']) + @HttpCode(200) + // TODO add new ACL + @Acl('dataList') + async exportModelData( + @TenantContext() context: NcContext, + @Req() req: NcRequest, + @Param('viewId') viewId: string, + @Param('exportAs') exportAs: 'csv' | 'json' | 'xlsx', + @Body() options: DataExportJobData['options'], + ) { + const view = await View.get(context, viewId); + + if (!view) NcError.viewNotFound(viewId); + + const job = await this.jobsService.add(JobTypes.DataExport, { + context, + options, + modelId: view.fk_model_id, + viewId, + user: req.user, + exportAs, + ncSiteUrl: req.ncSiteUrl, + }); + + return job; + } +} diff --git a/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts b/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts new file mode 100644 index 0000000000..4a76bf0c78 --- /dev/null +++ b/packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts @@ -0,0 +1,132 @@ +import { Readable } from 'stream'; +import path from 'path'; +import { Process, Processor } from '@nestjs/bull'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bull'; +import moment from 'moment'; +import { type DataExportJobData, JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; +import { elapsedTime, initTime } from '~/modules/jobs/helpers'; +import { ExportService } from '~/modules/jobs/jobs/export-import/export.service'; +import { Model, PresignedUrl, View } from '~/models'; +import { NcError } from '~/helpers/catchError'; +import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2'; + +function getViewTitle(view: View) { + return view.is_default ? 'Default View' : view.title; +} + +@Processor(JOBS_QUEUE) +export class DataExportProcessor { + private logger = new Logger(DataExportProcessor.name); + + constructor(private readonly exportService: ExportService) {} + + @Process(JobTypes.DataExport) + async job(job: Job) { + const { + context, + options, + modelId, + viewId, + user: _user, + exportAs, + ncSiteUrl, + } = job.data; + + if (exportAs !== 'csv') NcError.notImplemented(`Export as ${exportAs}`); + + const hrTime = initTime(); + + const model = await Model.get(context, modelId); + + if (!model) NcError.tableNotFound(modelId); + + const view = await View.get(context, viewId); + + if (!view) NcError.viewNotFound(viewId); + + // date time as containing folder YYYY-MM-DD/HH + const dateFolder = moment().format('YYYY-MM-DD/HH'); + + const storageAdapter = await NcPluginMgrv2.storageAdapter(); + + const destPath = `nc/uploads/data-export/${dateFolder}/${modelId}/${ + model.title + } (${getViewTitle(view)}) - ${Date.now()}.csv`; + + let url = null; + + try { + const dataStream = new Readable({ + read() {}, + }); + + dataStream.setEncoding('utf8'); + + let error = null; + + const uploadFilePromise = (storageAdapter as any) + .fileCreateByStream(destPath, dataStream) + .catch((e) => { + this.logger.error(e); + error = e; + }); + + this.exportService + .streamModelDataAsCsv(context, { + dataStream, + linkStream: null, + baseId: model.base_id, + modelId: model.id, + viewId: view.id, + ncSiteUrl: ncSiteUrl, + delimiter: options?.delimiter, + }) + .catch((e) => { + this.logger.debug(e); + dataStream.push(null); + error = e; + }); + + url = await uploadFilePromise; + + // if url is not defined, it is local attachment + if (!url) { + url = await PresignedUrl.getSignedUrl({ + path: path.join(destPath.replace('nc/uploads/', '')), + filename: `${model.title} (${getViewTitle(view)}).csv`, + expireSeconds: 3 * 60 * 60, // 3 hours + }); + } else { + if (url.includes('.amazonaws.com/')) { + const relativePath = decodeURI(url.split('.amazonaws.com/')[1]); + url = await PresignedUrl.getSignedUrl({ + path: relativePath, + filename: `${model.title} (${getViewTitle(view)}).csv`, + s3: true, + expireSeconds: 3 * 60 * 60, // 3 hours + }); + } + } + + if (error) { + throw error; + } + + elapsedTime( + hrTime, + `exported data for model ${modelId} view ${viewId} as ${exportAs}`, + 'exportData', + ); + } catch (e) { + throw NcError.badRequest(e); + } + + return { + timestamp: new Date(), + type: exportAs, + title: `${model.title} (${getViewTitle(view)})`, + url, + }; + } +} diff --git a/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts b/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts index 9984b757c5..05f4cdc987 100644 --- a/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts @@ -8,10 +8,7 @@ import { Req, UseGuards, } from '@nestjs/common'; -import { - ProjectStatus, - readonlyMetaAllowedTypes, -} from 'nocodb-sdk'; +import { ProjectStatus, readonlyMetaAllowedTypes } from 'nocodb-sdk'; import { GlobalGuard } from '~/guards/global/global.guard'; import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware'; import { BasesService } from '~/services/bases.service'; @@ -95,6 +92,7 @@ export class DuplicateController { workspace_id: base.fk_workspace_id, base_id: base.id, }, + user: req.user, baseId: base.id, sourceId: source.id, dupProjectId: dupProject.id, @@ -168,6 +166,7 @@ export class DuplicateController { const job = await this.jobsService.add(JobTypes.DuplicateBase, { context, + user: req.user, baseId: base.id, sourceId: source.id, dupProjectId: dupProject.id, @@ -233,6 +232,7 @@ export class DuplicateController { const job = await this.jobsService.add(JobTypes.DuplicateModel, { context, + user: req.user, baseId: base.id, sourceId: source.id, modelId: model.id, @@ -302,6 +302,7 @@ export class DuplicateController { const job = await this.jobsService.add(JobTypes.DuplicateColumn, { context, + user: req.user, baseId: base.id, sourceId: column.source_id, modelId: model.id, diff --git a/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts b/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts index ff60b6dc5c..be8c647033 100644 --- a/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts @@ -5,6 +5,11 @@ import papaparse from 'papaparse'; import debug from 'debug'; import { isLinksOrLTAR, isVirtualCol, RelationTypes } from 'nocodb-sdk'; import type { NcContext } from '~/interface/config'; +import type { + DuplicateBaseJobData, + DuplicateColumnJobData, + DuplicateModelJobData, +} from '~/interface/Jobs'; import { Base, Column, Model, Source } from '~/models'; import { BasesService } from '~/services/bases.service'; import { @@ -31,7 +36,7 @@ export class DuplicateProcessor { ) {} @Process(JobTypes.DuplicateBase) - async duplicateBase(job: Job) { + async duplicateBase(job: Job) { this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateBase})`); const hrTime = initTime(); @@ -131,10 +136,12 @@ export class DuplicateProcessor { } this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateBase})`); + + return { id: dupProject.id }; } @Process(JobTypes.DuplicateModel) - async duplicateModel(job: Job) { + async duplicateModel(job: Job) { this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateModel})`); const hrTime = initTime(); @@ -241,11 +248,11 @@ export class DuplicateProcessor { this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateModel})`); - return await Model.get(context, findWithIdentifier(idMap, sourceModel.id)); + return { id: findWithIdentifier(idMap, sourceModel.id) }; } @Process(JobTypes.DuplicateColumn) - async duplicateColumn(job: Job) { + async duplicateColumn(job: Job) { this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateColumn})`); const hrTime = initTime(); @@ -398,10 +405,7 @@ export class DuplicateProcessor { this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateModel})`); - return await Column.get(context, { - source_id: base.id, - colId: findWithIdentifier(idMap, sourceColumn.id), - }); + return { id: findWithIdentifier(idMap, sourceColumn.id) }; } async importModelsData( diff --git a/packages/nocodb/src/modules/jobs/jobs/export-import/export.service.ts b/packages/nocodb/src/modules/jobs/jobs/export-import/export.service.ts index 2866fb01de..05e063961c 100644 --- a/packages/nocodb/src/modules/jobs/jobs/export-import/export.service.ts +++ b/packages/nocodb/src/modules/jobs/jobs/export-import/export.service.ts @@ -9,7 +9,10 @@ import type { NcContext } from '~/interface/config'; import type { LinkToAnotherRecordColumn } from '~/models'; import { Base, Filter, Hook, Model, Source, View } from '~/models'; import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2'; -import { getViewAndModelByAliasOrId } from '~/helpers/dataHelpers'; +import { + getViewAndModelByAliasOrId, + serializeCellValue, +} from '~/helpers/dataHelpers'; import { clearPrefix, generateBaseIdMap, @@ -447,10 +450,14 @@ export class ExportService { viewId?: string; handledMmList?: string[]; _fieldIds?: string[]; + ncSiteUrl?: string; + delimiter?: string; }, ) { const { dataStream, linkStream, handledMmList } = param; + const dataExportMode = !linkStream; + const { model, view } = await getViewAndModelByAliasOrId(context, { baseName: param.baseId, tableName: param.modelId, @@ -463,32 +470,35 @@ export class ExportService { const btMap = new Map(); - for (const column of model.columns.filter( - (col) => - col.uidt === UITypes.LinkToAnotherRecord && - (col.colOptions?.type === RelationTypes.BELONGS_TO || - (col.colOptions?.type === RelationTypes.ONE_TO_ONE && col.meta?.bt)), - )) { - await column.getColOptions(context); - const fkCol = model.columns.find( - (c) => c.id === column.colOptions?.fk_child_column_id, - ); - if (fkCol) { - // replace bt column with fk column if it is in _fieldIds - if (param._fieldIds && param._fieldIds.includes(column.id)) { - param._fieldIds.push(fkCol.id); - const btIndex = param._fieldIds.indexOf(column.id); - param._fieldIds.splice(btIndex, 1); - } - - btMap.set( - fkCol.id, - `${column.base_id}::${column.source_id}::${column.fk_model_id}::${column.id}`, + if (!dataExportMode) { + for (const column of model.columns.filter( + (col) => + col.uidt === UITypes.LinkToAnotherRecord && + (col.colOptions?.type === RelationTypes.BELONGS_TO || + (col.colOptions?.type === RelationTypes.ONE_TO_ONE && + col.meta?.bt)), + )) { + await column.getColOptions(context); + const fkCol = model.columns.find( + (c) => c.id === column.colOptions?.fk_child_column_id, ); + if (fkCol) { + // replace bt column with fk column if it is in _fieldIds + if (param._fieldIds && param._fieldIds.includes(column.id)) { + param._fieldIds.push(fkCol.id); + const btIndex = param._fieldIds.indexOf(column.id); + param._fieldIds.splice(btIndex, 1); + } + + btMap.set( + fkCol.id, + `${column.base_id}::${column.source_id}::${column.fk_model_id}::${column.id}`, + ); + } } } - const fields = param._fieldIds + let fields = param._fieldIds ? model.columns .filter((c) => param._fieldIds?.includes(c.id)) .map((c) => c.title) @@ -498,6 +508,16 @@ export class ExportService { .map((c) => c.title) .join(','); + if (dataExportMode) { + const viewCols = await view.getColumns(context); + + fields = viewCols + .sort((a, b) => a.order - b.order) + .filter((c) => c.show) + .map((vc) => model.columns.find((c) => c.id === vc.fk_column_id).title) + .join(','); + } + const mmColumns = param._fieldIds ? model.columns .filter((c) => param._fieldIds?.includes(c.id)) @@ -506,7 +526,7 @@ export class ExportService { (col) => isLinksOrLTAR(col) && col.colOptions?.type === 'mm', ); - const hasLink = mmColumns.length > 0; + const hasLink = !dataExportMode && mmColumns.length > 0; dataStream.setEncoding('utf8'); @@ -564,6 +584,22 @@ export class ExportService { return { data }; }; + const formatAndSerialize = async (data: any) => { + for (const row of data) { + for (const [k, v] of Object.entries(row)) { + const col = model.columns.find((c) => c.title === k); + if (col) { + row[k] = await serializeCellValue(context, { + value: v, + column: col, + siteUrl: param.ncSiteUrl, + }); + } + } + } + return { data }; + }; + const baseModel = await Model.getBaseModelSQL(context, { id: model.id, viewId: view?.id, @@ -576,7 +612,7 @@ export class ExportService { try { await this.recursiveRead( context, - formatData, + dataExportMode ? formatAndSerialize : formatData, baseModel, dataStream, model, @@ -585,6 +621,8 @@ export class ExportService { limit, fields, true, + param.delimiter, + dataExportMode, ); } catch (e) { this.debugLog(e); @@ -670,13 +708,13 @@ export class ExportService { linkStream.push(null); } else { - linkStream.push(null); + if (linkStream) linkStream.push(null); } } async recursiveRead( context: NcContext, - formatter: (data: any) => { data: any }, + formatter: (data: any) => { data: any } | Promise<{ data: any }>, baseModel: BaseModelSqlv2, stream: Readable, model: Model, @@ -685,6 +723,8 @@ export class ExportService { limit: number, fields: string, header = false, + delimiter = ',', + dataExportMode = false, ): Promise { return new Promise((resolve, reject) => { this.datasService @@ -693,7 +733,7 @@ export class ExportService { view, query: { limit, offset, fields }, baseModel, - ignoreViewFilterAndSort: true, + ignoreViewFilterAndSort: !dataExportMode, limitOverride: limit, }) .then((result) => { @@ -701,25 +741,57 @@ export class ExportService { if (!header) { stream.push('\r\n'); } - const { data } = formatter(result.list); - stream.push(unparse(data, { header })); - if (result.pageInfo.isLastPage) { - stream.push(null); - resolve(); + + // check if formatter is async + const formatterPromise = formatter(result.list); + if (formatterPromise instanceof Promise) { + formatterPromise.then(({ data }) => { + stream.push(unparse(data, { header, delimiter })); + if (result.pageInfo.isLastPage) { + stream.push(null); + resolve(); + } else { + this.recursiveRead( + context, + formatter, + baseModel, + stream, + model, + view, + offset + limit, + limit, + fields, + false, + delimiter, + dataExportMode, + ) + .then(resolve) + .catch(reject); + } + }); } else { - this.recursiveRead( - context, - formatter, - baseModel, - stream, - model, - view, - offset + limit, - limit, - fields, - ) - .then(resolve) - .catch(reject); + stream.push(unparse(formatterPromise.data, { header })); + if (result.pageInfo.isLastPage) { + stream.push(null); + resolve(); + } else { + this.recursiveRead( + context, + formatter, + baseModel, + stream, + model, + view, + offset + limit, + limit, + fields, + false, + delimiter, + dataExportMode, + ) + .then(resolve) + .catch(reject); + } } } catch (e) { reject(e); diff --git a/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts b/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts index 99d7a1985a..0ce45081ac 100644 --- a/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts @@ -50,6 +50,7 @@ export class SourceCreateController { const job = await this.jobsService.add(JobTypes.SourceCreate, { context, + user: req.user, baseId, source: body, req: { diff --git a/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts b/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts index dfeaa495c3..e80cf83763 100644 --- a/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts @@ -46,7 +46,5 @@ export class SourceCreateProcessor { } this.debugLog(`job completed for ${job.id}`); - - return createdSource; } } diff --git a/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts b/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts index f6b4d0e777..c28673ce63 100644 --- a/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts @@ -47,6 +47,7 @@ export class SourceDeleteController { const job = await this.jobsService.add(JobTypes.SourceDelete, { context, + user: req.user, sourceId, req: { user: req.user, diff --git a/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts b/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts index 5cc31e7ca0..fd2b2539ee 100644 --- a/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts @@ -22,7 +22,5 @@ export class SourceDeleteProcessor { }); this.debugLog(`job completed for ${job.id}`); - - return true; } } diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts deleted file mode 100644 index f044ac1f1b..0000000000 --- a/packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { - OnQueueActive, - OnQueueCompleted, - OnQueueFailed, - Processor, -} from '@nestjs/bull'; -import { Job } from 'bull'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { Logger } from '@nestjs/common'; -import { JobEvents, JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; - -@Processor(JOBS_QUEUE) -export class JobsEventService { - protected logger = new Logger(JobsEventService.name); - - constructor(private eventEmitter: EventEmitter2) {} - - @OnQueueActive() - onActive(job: Job) { - this.eventEmitter.emit(JobEvents.STATUS, { - id: job.id.toString(), - status: JobStatus.ACTIVE, - }); - } - - @OnQueueFailed() - onFailed(job: Job, error: Error) { - this.logger.error( - `---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, - ); - - this.eventEmitter.emit(JobEvents.STATUS, { - id: job.id.toString(), - status: JobStatus.FAILED, - data: { - error: { - message: error?.message, - }, - }, - }); - } - - @OnQueueCompleted() - onCompleted(job: Job, data: any) { - this.eventEmitter.emit(JobEvents.STATUS, { - id: job.id.toString(), - status: JobStatus.COMPLETED, - data: { - result: data, - }, - }); - } -} diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 0a581f2d0b..3b0cb097f8 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -4,6 +4,8 @@ import { Queue } from 'bull'; import type { OnModuleInit } from '@nestjs/common'; import { InstanceCommands, JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; import { JobsRedis } from '~/modules/jobs/redis/jobs-redis'; +import { Job } from '~/models'; +import { RootScopes } from '~/utils/globals'; @Injectable() export class JobsService implements OnModuleInit { @@ -51,11 +53,24 @@ export class JobsService implements OnModuleInit { async add(name: string, data: any) { await this.toggleQueue(); - const job = await this.jobsQueue.add(name, data, { + const context = { + workspace_id: RootScopes.ROOT, + base_id: RootScopes.ROOT, + ...(data?.context || {}), + }; + + const jobData = await Job.insert(context, { + job: name, + status: JobStatus.WAITING, + fk_user_id: data?.user?.id, + }); + + await this.jobsQueue.add(name, data, { + jobId: jobData.id, removeOnComplete: true, }); - return job; + return jobData; } async jobStatus(jobId: string) { @@ -74,30 +89,6 @@ export class JobsService implements OnModuleInit { ]); } - async getJobWithData(data: any) { - const jobs = await this.jobsQueue.getJobs([ - // 'completed', - JobStatus.WAITING, - JobStatus.ACTIVE, - JobStatus.DELAYED, - // 'failed', - JobStatus.PAUSED, - ]); - - const job = jobs.find((j) => { - for (const key in data) { - if (j.data[key]) { - if (j.data[key] !== data[key]) return false; - } else { - return false; - } - } - return true; - }); - - return job; - } - async resumeQueue() { this.logger.log('Resuming global queue'); await this.jobsQueue.resume(); diff --git a/packages/nocodb/src/modules/noco.module.ts b/packages/nocodb/src/modules/noco.module.ts index 4cc916c988..7866bd16b5 100644 --- a/packages/nocodb/src/modules/noco.module.ts +++ b/packages/nocodb/src/modules/noco.module.ts @@ -100,6 +100,8 @@ import { CommandPaletteService } from '~/services/command-palette.service'; import { CommandPaletteController } from '~/controllers/command-palette.controller'; import { ExtensionsService } from '~/services/extensions.service'; import { ExtensionsController } from '~/controllers/extensions.controller'; +import { JobsMetaService } from '~/services/jobs-meta.service'; +import { JobsMetaController } from '~/controllers/jobs-meta.controller'; /* Datas */ import { DataTableController } from '~/controllers/data-table.controller'; @@ -178,6 +180,7 @@ export const nocoModuleMetadata = { NotificationsController, CommandPaletteController, ExtensionsController, + JobsMetaController, /* Datas */ DataTableController, @@ -246,6 +249,7 @@ export const nocoModuleMetadata = { NotificationsService, CommandPaletteService, ExtensionsService, + JobsMetaService, /* Datas */ DataTableService, diff --git a/packages/nocodb/src/plugins/s3/S3.ts b/packages/nocodb/src/plugins/s3/S3.ts index 5908fed023..4d40424f40 100644 --- a/packages/nocodb/src/plugins/s3/S3.ts +++ b/packages/nocodb/src/plugins/s3/S3.ts @@ -108,10 +108,13 @@ export default class S3 implements IStorageAdapterV2 { }); } - public async getSignedUrl(key, expiresInSeconds = 7200) { + public async getSignedUrl(key, expiresInSeconds = 7200, filename?: string) { const command = new GetObjectCommand({ Key: key, Bucket: this.input.bucket, + ...(filename + ? { ResponseContentDisposition: `attachment; filename="${filename}" ` } + : {}), }); return getSignedUrl(this.s3Client, command, { expiresIn: expiresInSeconds, diff --git a/packages/nocodb/src/schema/swagger-v2.json b/packages/nocodb/src/schema/swagger-v2.json index b60f29dfe3..ecaed2488d 100644 --- a/packages/nocodb/src/schema/swagger-v2.json +++ b/packages/nocodb/src/schema/swagger-v2.json @@ -11817,6 +11817,96 @@ } ] } + }, + "/api/v2/jobs/{baseId}": { + "post": { + "summary": "Get Jobs", + "operationId": "jobs-list", + "description": "Get list of jobs for a given base for the user", + "tags": [ + "Jobs" + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "job": { + "type": "string" + }, + "status": { + "type": "string" + } + } + } + } + } + } + }, + "parameters": [ + { + "schema": { + "$ref": "#/components/schemas/Id", + "example": "p124dflkcvasewh", + "type": "string" + }, + "name": "baseId", + "in": "path", + "required": true, + "description": "Unique Base ID" + }, + { + "$ref": "#/components/parameters/xc-auth" + } + ] + }, + "/api/v2/export/{viewId}/{exportAs}": { + "post": { + "summary": "Trigger export as job", + "operationId": "export-data", + "description": "Trigger export as job", + "tags": [ + "Export" + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object" + } + } + } + } + }, + "parameters": [ + { + "schema": { + "$ref": "#/components/schemas/Id", + "example": "vw124dflkcvasewh", + "type": "string" + }, + "name": "viewId", + "in": "path", + "required": true, + "description": "Unique View ID" + }, + { + "schema": { + "type": "string", + "enum": [ + "csv" + ] + }, + "name": "exportAs", + "in": "path", + "required": true, + "description": "Export as format" + }, + { + "$ref": "#/components/parameters/xc-auth" + } + ] } }, "components": { diff --git a/packages/nocodb/src/schema/swagger.json b/packages/nocodb/src/schema/swagger.json index ba546821b3..5437020383 100644 --- a/packages/nocodb/src/schema/swagger.json +++ b/packages/nocodb/src/schema/swagger.json @@ -17943,14 +17943,57 @@ } ] }, - "/jobs/status": { + "/api/v2/jobs/{baseId}": { "post": { - "summary": "Jobs Status", - "operationId": "jobs-status", - "description": "Get job status", + "summary": "Get Jobs", + "operationId": "jobs-list", + "description": "Get list of jobs for a given base for the user", "tags": [ "Jobs" ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "job": { + "type": "string" + }, + "status": { + "type": "string" + } + } + } + } + } + } + }, + "parameters": [ + { + "schema": { + "$ref": "#/components/schemas/Id", + "example": "p124dflkcvasewh", + "type": "string" + }, + "name": "baseId", + "in": "path", + "required": true, + "description": "Unique Base ID" + }, + { + "$ref": "#/components/parameters/xc-auth" + } + ] + }, + "/api/v2/export/{viewId}/{exportAs}": { + "post": { + "summary": "Trigger export as job", + "operationId": "export-data", + "description": "Trigger export as job", + "tags": [ + "Export" + ], "requestBody": { "content": { "application/json": { @@ -17962,6 +18005,29 @@ } }, "parameters": [ + { + "schema": { + "$ref": "#/components/schemas/Id", + "example": "vw124dflkcvasewh", + "type": "string" + }, + "name": "viewId", + "in": "path", + "required": true, + "description": "Unique View ID" + }, + { + "schema": { + "type": "string", + "enum": [ + "csv" + ] + }, + "name": "exportAs", + "in": "path", + "required": true, + "description": "Export as format" + }, { "$ref": "#/components/parameters/xc-auth" } diff --git a/packages/nocodb/src/services/jobs-meta.service.spec.ts b/packages/nocodb/src/services/jobs-meta.service.spec.ts new file mode 100644 index 0000000000..d95546a254 --- /dev/null +++ b/packages/nocodb/src/services/jobs-meta.service.spec.ts @@ -0,0 +1,19 @@ +import { Test } from '@nestjs/testing'; +import { JobsMetaService } from './jobs-meta.service'; +import type { TestingModule } from '@nestjs/testing'; + +describe('JobsMetaService', () => { + let service: JobsMetaService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [JobsMetaService], + }).compile(); + + service = module.get(JobsMetaService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/packages/nocodb/src/services/jobs-meta.service.ts b/packages/nocodb/src/services/jobs-meta.service.ts new file mode 100644 index 0000000000..cada6cef87 --- /dev/null +++ b/packages/nocodb/src/services/jobs-meta.service.ts @@ -0,0 +1,83 @@ +import { Injectable } from '@nestjs/common'; +import dayjs from 'dayjs'; +import type { NcContext, NcRequest } from '~/interface/config'; +import type { JobTypes } from '~/interface/Jobs'; +import { JobStatus } from '~/interface/Jobs'; +import { Job } from '~/models'; +import Noco from '~/Noco'; + +@Injectable() +export class JobsMetaService { + constructor() {} + + async list( + context: NcContext, + param: { job?: JobTypes; status?: JobStatus }, + req: NcRequest, + ) { + /* + * List jobs for the current base. + * If the job is not created by the current user, exclude the result. + * List jobs updated in the last 1 hour or jobs that are still active(, waiting, or delayed). + */ + return Job.list(context, { + xcCondition: { + _and: [ + ...(param.job + ? [ + { + job: { + eq: param.job, + }, + }, + ] + : []), + ...(param.status + ? [ + { + status: { + eq: param.status, + }, + }, + ] + : []), + { + _or: [ + { + updated_at: { + gt: Noco.ncMeta.formatDateTime( + dayjs().subtract(1, 'hour').toISOString(), + ), + }, + }, + { + status: { + eq: JobStatus.ACTIVE, + }, + }, + { + status: { + eq: JobStatus.WAITING, + }, + }, + { + status: { + eq: JobStatus.DELAYED, + }, + }, + ], + }, + ], + }, + }).then((jobs) => { + return jobs.map((job) => { + if (job.fk_user_id === req.user.id) { + return job; + } else { + const { result, ...rest } = job; + return rest; + } + }); + }); + } +} diff --git a/packages/nocodb/src/utils/acl.ts b/packages/nocodb/src/utils/acl.ts index f32253a540..ee98a7b39d 100644 --- a/packages/nocodb/src/utils/acl.ts +++ b/packages/nocodb/src/utils/acl.ts @@ -143,6 +143,9 @@ const permissionScopes = { 'extensionCreate', 'extensionUpdate', 'extensionDelete', + + // Jobs + 'jobList', ], }; @@ -209,6 +212,7 @@ const rolePermissions: extensionList: true, extensionRead: true, + jobList: true, commentList: true, commentsCount: true, auditListRow: true, diff --git a/packages/nocodb/src/utils/globals.ts b/packages/nocodb/src/utils/globals.ts index e4fcc14ad5..e70c7c8ff6 100644 --- a/packages/nocodb/src/utils/globals.ts +++ b/packages/nocodb/src/utils/globals.ts @@ -51,6 +51,7 @@ export enum MetaTable { COMMENTS = 'nc_comments', USER_COMMENTS_NOTIFICATIONS_PREFERENCE = 'nc_user_comment_notifications_preference', COMMENTS_REACTIONS = 'nc_comment_reactions', + JOBS = 'nc_jobs', } export enum MetaTableOldV2 { @@ -171,6 +172,7 @@ export enum CacheScope { DASHBOARD_PROJECT_DB_PROJECT_LINKING = 'dashboardProjectDBProjectLinking', SINGLE_QUERY = 'singleQuery', JOBS = 'nc_jobs', + JOBS_POLLING = 'nc_jobs_polling', PRESIGNED_URL = 'presignedUrl', STORE = 'store', PROJECT_ALIAS = 'baseAlias', @@ -281,6 +283,7 @@ export const RootScopeTables = { MetaTable.PLUGIN, MetaTable.STORE, MetaTable.NOTIFICATION, + MetaTable.JOBS, // Temporarily added need to be discussed within team MetaTable.AUDIT, ],