Browse Source

feat: export as on background (#8890)

* feat: csv export job

* fix: duplicate job

* feat: csv export job final

* feat: data export extension POC

* feat: data export final

* fix: extensions & scroll

---------

Co-authored-by: Raju Udava <86527202+dstala@users.noreply.github.com>
pull/8891/head
Mert E 5 months ago committed by GitHub
parent
commit
fb213eee0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      packages/nc-gui/components/dlg/AirtableImport.vue
  2. 2
      packages/nc-gui/components/smartsheet/grid/PaginationV2.vue
  3. 2
      packages/nc-gui/components/smartsheet/grid/index.vue
  4. 63
      packages/nc-gui/composables/useJobs.ts
  5. BIN
      packages/nc-gui/extensions/data-exporter/icon.png
  6. 180
      packages/nc-gui/extensions/data-exporter/index.vue
  7. 11
      packages/nc-gui/extensions/data-exporter/manifest.json
  8. 21
      packages/nocodb/src/controllers/attachments-secure.controller.ts
  9. 44
      packages/nocodb/src/controllers/attachments.controller.ts
  10. 21
      packages/nocodb/src/controllers/jobs-meta.controller.spec.ts
  11. 28
      packages/nocodb/src/controllers/jobs-meta.controller.ts
  12. 7
      packages/nocodb/src/helpers/dataHelpers.ts
  13. 73
      packages/nocodb/src/interface/Jobs.ts
  14. 11
      packages/nocodb/src/meta/meta.service.ts
  15. 4
      packages/nocodb/src/meta/migrations/XcMigrationSourcev2.ts
  16. 30
      packages/nocodb/src/meta/migrations/v2/nc_053_jobs.ts
  17. 136
      packages/nocodb/src/models/Job.ts
  18. 17
      packages/nocodb/src/models/PresignedUrl.ts
  19. 1
      packages/nocodb/src/models/index.ts
  20. 12
      packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts
  21. 54
      packages/nocodb/src/modules/jobs/fallback/jobs-event.service.ts
  22. 42
      packages/nocodb/src/modules/jobs/fallback/jobs.service.ts
  23. 108
      packages/nocodb/src/modules/jobs/jobs-event.service.ts
  24. 1
      packages/nocodb/src/modules/jobs/jobs-service.interface.ts
  25. 107
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  26. 12
      packages/nocodb/src/modules/jobs/jobs.module.ts
  27. 6
      packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts
  28. 58
      packages/nocodb/src/modules/jobs/jobs/data-export/data-export.controller.ts
  29. 132
      packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts
  30. 9
      packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts
  31. 20
      packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts
  32. 164
      packages/nocodb/src/modules/jobs/jobs/export-import/export.service.ts
  33. 1
      packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts
  34. 2
      packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts
  35. 1
      packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts
  36. 2
      packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts
  37. 53
      packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts
  38. 43
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts
  39. 4
      packages/nocodb/src/modules/noco.module.ts
  40. 5
      packages/nocodb/src/plugins/s3/S3.ts
  41. 90
      packages/nocodb/src/schema/swagger-v2.json
  42. 74
      packages/nocodb/src/schema/swagger.json
  43. 19
      packages/nocodb/src/services/jobs-meta.service.spec.ts
  44. 83
      packages/nocodb/src/services/jobs-meta.service.ts
  45. 4
      packages/nocodb/src/utils/acl.ts
  46. 3
      packages/nocodb/src/utils/globals.ts

10
packages/nc-gui/components/dlg/AirtableImport.vue

@ -22,6 +22,8 @@ const { refreshCommandPalette } = useCommandPalette()
const { loadTables } = baseStore const { loadTables } = baseStore
const { getJobsForBase, loadJobsForBase } = useJobs()
const showGoToDashboardButton = ref(false) const showGoToDashboardButton = ref(false)
const step = ref(1) const step = ref(1)
@ -141,7 +143,13 @@ async function listenForUpdates(id?: string) {
listeningForUpdates.value = true listeningForUpdates.value = true
const job = id ? { id } : await $api.jobs.status({ syncId: syncSource.value.id }) await loadJobsForBase(baseId)
const jobs = await getJobsForBase(baseId)
const job = id
? { id }
: jobs.find((j) => j.base_id === baseId && j.status !== JobStatus.COMPLETED && j.status !== JobStatus.FAILED)
if (!job) { if (!job) {
listeningForUpdates.value = false listeningForUpdates.value = false

2
packages/nc-gui/components/smartsheet/grid/PaginationV2.vue

@ -256,7 +256,7 @@ const renderAltOrOptlKey = () => {
<div class="!pl-8 pr-60 !w-8 h-1"></div> <div class="!pl-8 pr-60 !w-8 h-1"></div>
<div class="fixed h-9 bg-white border-l-1 border-gray-200 px-1 flex items-center right-0"> <div class="absolute h-9 bg-white border-l-1 border-gray-200 px-1 flex items-center right-0">
<NcPaginationV2 <NcPaginationV2
v-if="count !== Infinity" v-if="count !== Infinity"
v-model:current="page" v-model:current="page"

2
packages/nc-gui/components/smartsheet/grid/index.vue

@ -235,7 +235,7 @@ onMounted(() => {
<div <div
class="relative flex flex-col h-full min-h-0 w-full nc-grid-wrapper" class="relative flex flex-col h-full min-h-0 w-full nc-grid-wrapper"
data-testid="nc-grid-wrapper" data-testid="nc-grid-wrapper"
:style="`width: ${viewWidth}px; background-color: ${isGroupBy ? `${baseColor}` : 'var(--nc-grid-bg)'};`" :style="`background-color: ${isGroupBy ? `${baseColor}` : 'var(--nc-grid-bg)'};`"
> >
<Table <Table
v-if="!isGroupBy" v-if="!isGroupBy"

63
packages/nc-gui/composables/useJobs.ts

@ -0,0 +1,63 @@
const jobsState = createGlobalState(() => {
const baseJobs = ref<Record<string, JobType[]>>({})
return { baseJobs }
})
interface JobType {
id: string
job: string
status: string
result: Record<string, any>
fk_user_id: string
fk_workspace_id: string
base_id: string
created_at: Date
updated_at: Date
}
export const useJobs = createSharedComposable(() => {
const { baseJobs } = jobsState()
const { $api } = useNuxtApp()
const { base } = storeToRefs(useBase())
const activeBaseJobs = computed(() => {
if (!base.value || !base.value.id) {
return null
}
return baseJobs.value[base.value.id]
})
const jobList = computed<JobType[]>(() => {
return activeBaseJobs.value || []
})
const getJobsForBase = (baseId: string) => {
return baseJobs.value[baseId] || []
}
const loadJobsForBase = async (baseId?: string) => {
if (!baseId) {
baseId = base.value.id
if (!baseId) {
return
}
}
const jobs: JobType[] = await $api.jobs.list(baseId, {})
if (baseJobs.value[baseId]) {
baseJobs.value[baseId] = jobs || baseJobs.value[baseId]
} else {
baseJobs.value[baseId] = jobs || []
}
}
return {
jobList,
loadJobsForBase,
getJobsForBase,
}
})

BIN
packages/nc-gui/extensions/data-exporter/icon.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

180
packages/nc-gui/extensions/data-exporter/index.vue

@ -0,0 +1,180 @@
<script setup lang="ts">
import dayjs from 'dayjs'
import { type ViewType, ViewTypes } from 'nocodb-sdk'
const { $api, $poller } = useNuxtApp()
const { appInfo } = useGlobal()
const { extension, tables, fullscreen, getViewsForTable } = useExtensionHelperOrThrow()
const { jobList, loadJobsForBase } = useJobs()
const views = ref<ViewType[]>([])
const exportedFiles = computed(() => {
return jobList.value
.filter((job) => job.job === 'data-export')
.map((job) => {
return {
...job,
result: (job.result || {}) as { url: string; type: 'csv' | 'json' | 'xlsx'; title: string; timestamp: number },
}
})
.sort((a, b) => dayjs(b.created_at).unix() - dayjs(a.created_at).unix())
})
const exportPayload = ref<{
tableId?: string
viewId?: string
}>({})
const tableList = computed(() => {
return tables.value.map((table) => {
return {
label: table.title,
value: table.id,
}
})
})
const viewList = computed(() => {
if (!exportPayload.value.tableId) return []
return (
views.value
.filter((view) => view.type === ViewTypes.GRID)
.map((view) => {
return {
label: view.is_default ? `Default View` : view.title,
value: view.id,
}
}) || []
)
})
const reloadViews = async () => {
if (exportPayload.value.tableId) {
views.value = await getViewsForTable(exportPayload.value.tableId)
}
}
const onTableSelect = async (tableId: string) => {
exportPayload.value.tableId = tableId
await reloadViews()
exportPayload.value.viewId = views.value.find((view) => view.is_default)?.id
await extension.value.kvStore.set('exportPayload', exportPayload.value)
}
const onViewSelect = async (viewId: string) => {
exportPayload.value.viewId = viewId
await extension.value.kvStore.set('exportPayload', exportPayload.value)
}
const isExporting = ref(false)
async function exportDataAsync() {
try {
if (isExporting.value || !exportPayload.value.viewId) return
isExporting.value = true
const jobData = await $api.export.data(exportPayload.value.viewId, 'csv', {})
jobList.value.unshift(jobData)
$poller.subscribe(
{ id: jobData.id },
async (data: {
id: string
status?: string
data?: {
error?: {
message: string
}
message?: string
result?: any
}
}) => {
if (data.status !== 'close') {
if (data.status === JobStatus.COMPLETED) {
// Export completed successfully
message.info('Successfully exported data!')
const job = jobList.value.find((j) => j.id === data.id)
if (job) {
job.status = JobStatus.COMPLETED
job.result = data.data?.result
}
isExporting.value = false
} else if (data.status === JobStatus.FAILED) {
message.error('Failed to export data!')
const job = jobList.value.find((j) => j.id === data.id)
if (job) {
job.status = JobStatus.FAILED
}
isExporting.value = false
}
}
},
)
} catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e))
}
}
const urlHelper = (url: string) => {
if (url.startsWith('http')) {
return url
} else {
return `${appInfo.value.ncSiteUrl || BASE_FALLBACK_URL}/${url}`
}
}
const titleHelper = () => {
const table = tables.value.find((t) => t.id === exportPayload.value.tableId)
const view = views.value.find((v) => v.id === exportPayload.value.viewId)
return `${table?.title} (${view?.is_default ? 'Default View' : view?.title})`
}
onMounted(() => {
exportPayload.value = extension.value.kvStore.get('exportPayload') || {}
reloadViews()
loadJobsForBase()
})
</script>
<template>
<div class="flex flex-col gap-2 p-2">
<NcSelect v-model:value="exportPayload.tableId" :options="tableList" @disabled="isExporting" @change="onTableSelect" />
<NcSelect v-model:value="exportPayload.viewId" :options="viewList" @disabled="isExporting" @change="onViewSelect" />
<NcButton @loading="isExporting" @click="exportDataAsync">Export</NcButton>
<div
class="flex flex-col"
:class="{
'max-h-[60px] overflow-auto': !fullscreen,
}"
>
<div v-for="exp in exportedFiles" :key="exp.id" class="flex items-center gap-1">
<template v-if="exp.status === JobStatus.COMPLETED && exp.result">
<GeneralIcon icon="file" />
<div>{{ exp.result.title }}</div>
<a :href="urlHelper(exp.result.url)" target="_blank">Download</a>
</template>
<template v-else-if="exp.status === JobStatus.FAILED">
<GeneralIcon icon="error" class="text-red-500" />
<div>{{ exp.result.title }}</div>
</template>
<template v-else>
<GeneralLoader size="small" />
<div>{{ titleHelper() }}</div>
</template>
</div>
</div>
</div>
</template>
<style lang="scss"></style>

11
packages/nc-gui/extensions/data-exporter/manifest.json

@ -0,0 +1,11 @@
{
"id": "nc-data-exporter",
"title": "Data Exporter",
"description": "Export any view in various formats",
"entry": "data-exporter",
"version": "0.1",
"iconUrl": "data-exporter/icon.png",
"publisherName": "NocoDB",
"publisherEmail": "contact@nocodb.com",
"publisherUrl": "https://www.nocodb.com"
}

21
packages/nocodb/src/controllers/attachments-secure.controller.ts

@ -69,16 +69,33 @@ export class AttachmentsSecureController {
@Get('/dltemp/:param(*)') @Get('/dltemp/:param(*)')
async fileReadv3(@Param('param') param: string, @Res() res: Response) { async fileReadv3(@Param('param') param: string, @Res() res: Response) {
try { try {
const fpath = await PresignedUrl.getPath(`dltemp/${param}`); const fullPath = await PresignedUrl.getPath(`dltemp/${param}`);
const queryHelper = fullPath.split('?');
const fpath = queryHelper[0];
let queryFilename = null;
if (queryHelper.length > 1) {
const query = new URLSearchParams(queryHelper[1]);
queryFilename = query.get('filename');
}
const file = await this.attachmentsService.getFile({ const file = await this.attachmentsService.getFile({
path: path.join('nc', 'uploads', fpath), path: path.join('nc', 'uploads', fpath),
}); });
if (this.attachmentsService.previewAvailable(file.type)) { if (this.attachmentsService.previewAvailable(file.type)) {
if (queryFilename) {
res.setHeader(
'Content-Disposition',
`attachment; filename=${queryFilename}`,
);
}
res.sendFile(file.path); res.sendFile(file.path);
} else { } else {
res.download(file.path); res.download(file.path, queryFilename);
} }
} catch (e) { } catch (e) {
res.status(404).send('Not found'); res.status(404).send('Not found');

44
packages/nocodb/src/controllers/attachments.controller.ts

@ -63,16 +63,26 @@ export class AttachmentsController {
// , getCacheMiddleware(), catchError(fileRead)); // , getCacheMiddleware(), catchError(fileRead));
@Get('/download/:filename(*)') @Get('/download/:filename(*)')
// This route will match any URL that starts with // This route will match any URL that starts with
async fileRead(@Param('filename') filename: string, @Res() res: Response) { async fileRead(
@Param('filename') filename: string,
@Res() res: Response,
@Query('filename') queryFilename?: string,
) {
try { try {
const file = await this.attachmentsService.getFile({ const file = await this.attachmentsService.getFile({
path: path.join('nc', 'uploads', filename), path: path.join('nc', 'uploads', filename),
}); });
if (this.attachmentsService.previewAvailable(file.type)) { if (this.attachmentsService.previewAvailable(file.type)) {
if (queryFilename) {
res.setHeader(
'Content-Disposition',
`attachment; filename=${queryFilename}`,
);
}
res.sendFile(file.path); res.sendFile(file.path);
} else { } else {
res.download(file.path); res.download(file.path, queryFilename);
} }
} catch (e) { } catch (e) {
res.status(404).send('Not found'); res.status(404).send('Not found');
@ -87,6 +97,7 @@ export class AttachmentsController {
@Param('param2') param2: string, @Param('param2') param2: string,
@Param('filename') filename: string, @Param('filename') filename: string,
@Res() res: Response, @Res() res: Response,
@Query('filename') queryFilename?: string,
) { ) {
try { try {
const file = await this.attachmentsService.getFile({ const file = await this.attachmentsService.getFile({
@ -100,9 +111,15 @@ export class AttachmentsController {
}); });
if (this.attachmentsService.previewAvailable(file.type)) { if (this.attachmentsService.previewAvailable(file.type)) {
if (queryFilename) {
res.setHeader(
'Content-Disposition',
`attachment; filename=${queryFilename}`,
);
}
res.sendFile(file.path); res.sendFile(file.path);
} else { } else {
res.download(file.path); res.download(file.path, queryFilename);
} }
} catch (e) { } catch (e) {
res.status(404).send('Not found'); res.status(404).send('Not found');
@ -112,16 +129,33 @@ export class AttachmentsController {
@Get('/dltemp/:param(*)') @Get('/dltemp/:param(*)')
async fileReadv3(@Param('param') param: string, @Res() res: Response) { async fileReadv3(@Param('param') param: string, @Res() res: Response) {
try { try {
const fpath = await PresignedUrl.getPath(`dltemp/${param}`); const fullPath = await PresignedUrl.getPath(`dltemp/${param}`);
const queryHelper = fullPath.split('?');
const fpath = queryHelper[0];
let queryFilename = null;
if (queryHelper.length > 1) {
const query = new URLSearchParams(queryHelper[1]);
queryFilename = query.get('filename');
}
const file = await this.attachmentsService.getFile({ const file = await this.attachmentsService.getFile({
path: path.join('nc', 'uploads', fpath), path: path.join('nc', 'uploads', fpath),
}); });
if (this.attachmentsService.previewAvailable(file.type)) { if (this.attachmentsService.previewAvailable(file.type)) {
if (queryFilename) {
res.setHeader(
'Content-Disposition',
`attachment; filename=${queryFilename}`,
);
}
res.sendFile(file.path); res.sendFile(file.path);
} else { } else {
res.download(file.path); res.download(file.path, queryFilename);
} }
} catch (e) { } catch (e) {
res.status(404).send('Not found'); res.status(404).send('Not found');

21
packages/nocodb/src/controllers/jobs-meta.controller.spec.ts

@ -0,0 +1,21 @@
import { Test } from '@nestjs/testing';
import { HooksService } from '../services/hooks.service';
import { JobsMetaController } from './jobs-meta.controller';
import type { TestingModule } from '@nestjs/testing';
describe('JobsMetaController', () => {
let controller: JobsMetaController;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [JobsMetaController],
providers: [HooksService],
}).compile();
controller = module.get<JobsMetaController>(JobsMetaController);
});
it('should be defined', () => {
expect(controller).toBeDefined();
});
});

28
packages/nocodb/src/controllers/jobs-meta.controller.ts

@ -0,0 +1,28 @@
import { Body, Controller, Post, Req, UseGuards } from '@nestjs/common';
import type { JobStatus, JobTypes } from '~/interface/Jobs';
import { GlobalGuard } from '~/guards/global/global.guard';
import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { TenantContext } from '~/decorators/tenant-context.decorator';
import { NcContext, NcRequest } from '~/interface/config';
import { JobsMetaService } from '~/services/jobs-meta.service';
@Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class JobsMetaController {
constructor(private readonly jobsMetaService: JobsMetaService) {}
@Post(['/api/v2/jobs/:baseId'])
@Acl('jobList')
async jobList(
@TenantContext() context: NcContext,
@Req() req: NcRequest,
@Body()
conditions?: {
job?: JobTypes;
status?: JobStatus;
},
) {
return await this.jobsMetaService.list(context, conditions, req);
}
}

7
packages/nocodb/src/helpers/dataHelpers.ts

@ -222,6 +222,13 @@ export async function serializeCellValue(
.join(', '); .join(', ');
} }
break; break;
case UITypes.Decimal:
{
if (isNaN(Number(value))) return null;
return Number(value).toFixed(column.meta?.precision ?? 1);
}
break;
default: default:
if (value && typeof value === 'object') { if (value && typeof value === 'object') {
return JSON.stringify(value); return JSON.stringify(value);

73
packages/nocodb/src/interface/Jobs.ts

@ -1,4 +1,5 @@
import type { NcContext } from '~/interface/config'; import type { UserType } from 'nocodb-sdk';
import type { NcContext, NcRequest } from '~/interface/config';
export const JOBS_QUEUE = 'jobs'; export const JOBS_QUEUE = 'jobs';
export enum JobTypes { export enum JobTypes {
@ -15,6 +16,7 @@ export enum JobTypes {
HealthCheck = 'health-check', HealthCheck = 'health-check',
HandleWebhook = 'handle-webhook', HandleWebhook = 'handle-webhook',
CleanUp = 'clean-up', CleanUp = 'clean-up',
DataExport = 'data-export',
} }
export enum JobStatus { export enum JobStatus {
@ -44,12 +46,77 @@ export enum InstanceCommands {
RELEASE = 'release', RELEASE = 'release',
} }
export interface HandleWebhookJobData { export interface JobData {
context: NcContext; context: NcContext;
user: Partial<UserType>;
}
export interface AtImportJobData extends JobData {
syncId: string;
baseId: string;
sourceId: string;
baseName: string;
authToken: string;
baseURL: string;
clientIp: string;
options?: {
syncViews?: boolean;
syncAttachment?: boolean;
syncLookup?: boolean;
syncRollup?: boolean;
syncUsers?: boolean;
syncData?: boolean;
};
user: any;
}
export interface DuplicateBaseJobData extends JobData {
sourceId: string;
dupProjectId: string;
req: NcRequest;
options: {
excludeData?: boolean;
excludeViews?: boolean;
excludeHooks?: boolean;
};
}
export interface DuplicateModelJobData extends JobData {
sourceId: string;
modelId: string;
title: string;
req: NcRequest;
options: {
excludeData?: boolean;
excludeViews?: boolean;
excludeHooks?: boolean;
};
}
export interface DuplicateColumnJobData extends JobData {
sourceId: string;
columnId: string;
extra: Record<string, any>; // extra data
req: NcRequest;
options: {
excludeData?: boolean;
};
}
export interface HandleWebhookJobData extends JobData {
hookId: string; hookId: string;
modelId: string; modelId: string;
viewId: string; viewId: string;
prevData; prevData;
newData; newData;
user; }
export interface DataExportJobData extends JobData {
options?: {
delimiter?: string;
};
modelId: string;
viewId: string;
exportAs: 'csv' | 'json' | 'xlsx';
ncSiteUrl: string;
} }

11
packages/nocodb/src/meta/meta.service.ts

@ -251,6 +251,7 @@ export class MetaService {
[MetaTable.COMMENTS]: 'com', [MetaTable.COMMENTS]: 'com',
[MetaTable.COMMENTS_REACTIONS]: 'cre', [MetaTable.COMMENTS_REACTIONS]: 'cre',
[MetaTable.USER_COMMENTS_NOTIFICATIONS_PREFERENCE]: 'cnp', [MetaTable.USER_COMMENTS_NOTIFICATIONS_PREFERENCE]: 'cnp',
[MetaTable.JOBS]: 'job',
}; };
const prefix = prefixMap[target] || 'nc'; const prefix = prefixMap[target] || 'nc';
@ -726,6 +727,16 @@ export class MetaService {
); );
} }
public formatDateTime(date: string): string {
return dayjs(date)
.utc()
.format(
this.isMySQL() || this.isMssql()
? 'YYYY-MM-DD HH:mm:ss'
: 'YYYY-MM-DD HH:mm:ssZ',
);
}
public async init(): Promise<boolean> { public async init(): Promise<boolean> {
await this.connection.migrate.latest({ await this.connection.migrate.latest({
migrationSource: new XcMigrationSource(), migrationSource: new XcMigrationSource(),

4
packages/nocodb/src/meta/migrations/XcMigrationSourcev2.ts

@ -39,6 +39,7 @@ import * as nc_049_clear_notifications from '~/meta/migrations/v2/nc_049_clear_n
import * as nc_050_tenant_isolation from '~/meta/migrations/v2/nc_050_tenant_isolation'; import * as nc_050_tenant_isolation from '~/meta/migrations/v2/nc_050_tenant_isolation';
import * as nc_051_source_readonly_columns from '~/meta/migrations/v2/nc_051_source_readonly_columns'; import * as nc_051_source_readonly_columns from '~/meta/migrations/v2/nc_051_source_readonly_columns';
import * as nc_052_field_aggregation from '~/meta/migrations/v2/nc_052_field_aggregation'; import * as nc_052_field_aggregation from '~/meta/migrations/v2/nc_052_field_aggregation';
import * as nc_053_jobs from '~/meta/migrations/v2/nc_053_jobs';
// Create a custom migration source class // Create a custom migration source class
export default class XcMigrationSourcev2 { export default class XcMigrationSourcev2 {
@ -89,6 +90,7 @@ export default class XcMigrationSourcev2 {
'nc_050_tenant_isolation', 'nc_050_tenant_isolation',
'nc_051_source_readonly_columns', 'nc_051_source_readonly_columns',
'nc_052_field_aggregation', 'nc_052_field_aggregation',
'nc_053_jobs',
]); ]);
} }
@ -180,6 +182,8 @@ export default class XcMigrationSourcev2 {
return nc_051_source_readonly_columns; return nc_051_source_readonly_columns;
case 'nc_052_field_aggregation': case 'nc_052_field_aggregation':
return nc_052_field_aggregation; return nc_052_field_aggregation;
case 'nc_053_jobs':
return nc_053_jobs;
} }
} }
} }

30
packages/nocodb/src/meta/migrations/v2/nc_053_jobs.ts

@ -0,0 +1,30 @@
import type { Knex } from 'knex';
import { MetaTable } from '~/utils/globals';
const up = async (knex: Knex) => {
await knex.schema.createTable(MetaTable.JOBS, (table) => {
table.string('id', 20).primary();
table.string('job', 255);
table.string('status', 20);
table.text('result');
table.string('fk_user_id', 20);
table.string('fk_workspace_id', 20);
table.string('base_id', 20);
table.timestamps(true, true);
// TODO - add indexes
});
};
const down = async (knex: Knex) => {
await knex.schema.dropTable(MetaTable.JOBS);
};
export { up, down };

136
packages/nocodb/src/models/Job.ts

@ -0,0 +1,136 @@
import type { NcContext } from '~/interface/config';
import type { Condition } from '~/db/CustomKnex';
import Noco from '~/Noco';
import {
CacheDelDirection,
CacheGetType,
CacheScope,
MetaTable,
} from '~/utils/globals';
import NocoCache from '~/cache/NocoCache';
import { extractProps } from '~/helpers/extractProps';
import { prepareForDb, prepareForResponse } from '~/utils/modelUtils';
export default class Job {
id: string;
job: string;
status: string;
result: string;
fk_user_id: string;
fk_workspace_id: string;
base_id: string;
created_at: Date;
updated_at: Date;
constructor(data: Partial<Job>) {
Object.assign(this, data);
}
public static async insert(
context: NcContext,
jobObj: Partial<Job>,
ncMeta = Noco.ncMeta,
) {
const insertObj = extractProps(jobObj, [
'job',
'status',
'result',
'fk_user_id',
]);
const { id } = await ncMeta.metaInsert2(
context.workspace_id,
context.base_id,
MetaTable.JOBS,
insertObj,
);
return this.get(context, id, ncMeta);
}
public static async update(
context: NcContext,
jobId: string,
jobObj: Partial<Job>,
ncMeta = Noco.ncMeta,
) {
const updateObj = extractProps(jobObj, ['status', 'result']);
const res = await ncMeta.metaUpdate(
context.workspace_id,
context.base_id,
MetaTable.JOBS,
prepareForDb(updateObj, 'result'),
jobId,
);
await NocoCache.update(
`${CacheScope.JOBS}:${jobId}`,
prepareForResponse(updateObj, 'result'),
);
return res;
}
public static async delete(
context: NcContext,
jobId: string,
ncMeta = Noco.ncMeta,
) {
await ncMeta.metaDelete(
context.workspace_id,
context.base_id,
MetaTable.JOBS,
jobId,
);
await NocoCache.deepDel(
`${CacheScope.JOBS}:${jobId}`,
CacheDelDirection.CHILD_TO_PARENT,
);
}
public static async get(context: NcContext, id: any, ncMeta = Noco.ncMeta) {
let jobData =
id &&
(await NocoCache.get(
`${CacheScope.JOBS}:${id}`,
CacheGetType.TYPE_OBJECT,
));
if (!jobData) {
jobData = await ncMeta.metaGet2(
context.workspace_id,
context.base_id,
MetaTable.JOBS,
id,
);
jobData = prepareForResponse(jobData, 'result');
await NocoCache.set(`${CacheScope.JOBS}:${id}`, jobData);
}
return jobData && new Job(jobData);
}
public static async list(
context: NcContext,
opts: {
condition?: Record<string, string>;
xcCondition?: Condition;
},
ncMeta = Noco.ncMeta,
): Promise<Job[]> {
const jobList = await ncMeta.metaList2(
context.workspace_id,
context.base_id,
MetaTable.JOBS,
opts,
);
return jobList.map((job) => {
return new Job(prepareForResponse(job, 'result'));
});
}
}

17
packages/nocodb/src/models/PresignedUrl.ts

@ -91,10 +91,18 @@ export default class PresignedUrl {
path: string; path: string;
expireSeconds?: number; expireSeconds?: number;
s3?: boolean; s3?: boolean;
filename?: string;
}, },
ncMeta = Noco.ncMeta, ncMeta = Noco.ncMeta,
) { ) {
const { path, expireSeconds = DEFAULT_EXPIRE_SECONDS, s3 = false } = param; let { path } = param;
const {
expireSeconds = DEFAULT_EXPIRE_SECONDS,
s3 = false,
filename,
} = param;
const expireAt = roundExpiry( const expireAt = roundExpiry(
new Date(new Date().getTime() + expireSeconds * 1000), new Date(new Date().getTime() + expireSeconds * 1000),
); // at least expireSeconds from now ); // at least expireSeconds from now
@ -129,6 +137,7 @@ export default class PresignedUrl {
tempUrl = await (storageAdapter as any).getSignedUrl( tempUrl = await (storageAdapter as any).getSignedUrl(
path, path,
expiresInSeconds, expiresInSeconds,
filename,
); );
await this.add({ await this.add({
path: path, path: path,
@ -139,6 +148,12 @@ export default class PresignedUrl {
} else { } else {
// if not present, create a new url // if not present, create a new url
tempUrl = `dltemp/${nanoid(16)}/${expireAt.getTime()}/${path}`; tempUrl = `dltemp/${nanoid(16)}/${expireAt.getTime()}/${path}`;
// if filename is present, add it to the destination
if (filename) {
path = `${path}?filename=${encodeURIComponent(filename)}`;
}
await this.add({ await this.add({
path: path, path: path,
url: tempUrl, url: tempUrl,

1
packages/nocodb/src/models/index.ts

@ -43,3 +43,4 @@ export { default as PresignedUrl } from './PresignedUrl';
export { default as UserRefreshToken } from './UserRefreshToken'; export { default as UserRefreshToken } from './UserRefreshToken';
export { default as Extension } from './Extension'; export { default as Extension } from './Extension';
export { default as Comment } from './Comment'; export { default as Comment } from './Comment';
export { default as Job } from './Job';

12
packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts

@ -7,7 +7,8 @@ import { MetaSyncProcessor } from '~/modules/jobs/jobs/meta-sync/meta-sync.proce
import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor'; import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-create.processor';
import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor';
import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor'; import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor';
import { JobsEventService } from '~/modules/jobs/fallback/jobs-event.service'; import { DataExportProcessor } from '~/modules/jobs/jobs/data-export/data-export.processor';
import { JobsEventService } from '~/modules/jobs/jobs-event.service';
import { JobStatus, JobTypes } from '~/interface/Jobs'; import { JobStatus, JobTypes } from '~/interface/Jobs';
export interface Job { export interface Job {
@ -33,6 +34,7 @@ export class QueueService {
protected readonly sourceCreateProcessor: SourceCreateProcessor, protected readonly sourceCreateProcessor: SourceCreateProcessor,
protected readonly sourceDeleteProcessor: SourceDeleteProcessor, protected readonly sourceDeleteProcessor: SourceDeleteProcessor,
protected readonly webhookHandlerProcessor: WebhookHandlerProcessor, protected readonly webhookHandlerProcessor: WebhookHandlerProcessor,
protected readonly dataExportProcessor: DataExportProcessor,
) { ) {
this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => { this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => {
const job = this.queueMemory.find((job) => job.id === data.job.id); const job = this.queueMemory.find((job) => job.id === data.job.id);
@ -94,6 +96,10 @@ export class QueueService {
this: this.webhookHandlerProcessor, this: this.webhookHandlerProcessor,
fn: this.webhookHandlerProcessor.job, fn: this.webhookHandlerProcessor.job,
}, },
[JobTypes.DataExport]: {
this: this.dataExportProcessor,
fn: this.dataExportProcessor.job,
},
}; };
async jobWrapper(job: Job) { async jobWrapper(job: Job) {
@ -129,8 +135,8 @@ export class QueueService {
QueueService.queueIdCounter = index; QueueService.queueIdCounter = index;
} }
add(name: string, data: any, _opts = {}) { add(name: string, data: any, opts?: { jobId?: string }) {
const id = `${this.queueIndex++}`; const id = opts?.jobId || `${this.queueIndex++}`;
const job = { id: `${id}`, name, status: JobStatus.WAITING, data }; const job = { id: `${id}`, name, status: JobStatus.WAITING, data };
this.queueMemory.push(job); this.queueMemory.push(job);
this.queue.add(() => this.jobWrapper(job)); this.queue.add(() => this.jobWrapper(job));

54
packages/nocodb/src/modules/jobs/fallback/jobs-event.service.ts

@ -1,54 +0,0 @@
import {
OnQueueActive,
OnQueueCompleted,
OnQueueFailed,
Processor,
} from '@nestjs/bull';
import { Job } from 'bull';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Logger } from '@nestjs/common';
import { JobEvents, JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
@Processor(JOBS_QUEUE)
export class JobsEventService {
protected logger = new Logger(JobsEventService.name);
constructor(private eventEmitter: EventEmitter2) {}
@OnQueueActive()
onActive(job: Job) {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
this.logger.error(
`---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
);
const newLocal = this;
newLocal.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
error: {
message: error?.message,
},
},
});
}
@OnQueueCompleted()
onCompleted(job: Job, data: any) {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
result: data,
},
});
}
}

42
packages/nocodb/src/modules/jobs/fallback/jobs.service.ts

@ -2,6 +2,8 @@ import { Injectable } from '@nestjs/common';
import type { OnModuleInit } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common';
import { QueueService } from '~/modules/jobs/fallback/fallback-queue.service'; import { QueueService } from '~/modules/jobs/fallback/fallback-queue.service';
import { JobStatus } from '~/interface/Jobs'; import { JobStatus } from '~/interface/Jobs';
import { Job } from '~/models';
import { RootScopes } from '~/utils/globals';
@Injectable() @Injectable()
export class JobsService implements OnModuleInit { export class JobsService implements OnModuleInit {
@ -10,7 +12,21 @@ export class JobsService implements OnModuleInit {
async onModuleInit() {} async onModuleInit() {}
async add(name: string, data: any) { async add(name: string, data: any) {
return this.fallbackQueueService.add(name, data); const context = {
workspace_id: RootScopes.ROOT,
base_id: RootScopes.ROOT,
...(data?.context || {}),
};
const jobData = await Job.insert(context, {
job: name,
status: JobStatus.WAITING,
fk_user_id: data?.user?.id,
});
this.fallbackQueueService.add(name, data, { jobId: jobData.id });
return jobData;
} }
async jobStatus(jobId: string) { async jobStatus(jobId: string) {
@ -28,30 +44,6 @@ export class JobsService implements OnModuleInit {
]); ]);
} }
async getJobWithData(data: any) {
const jobs = await this.fallbackQueueService.getJobs([
// 'completed',
JobStatus.WAITING,
JobStatus.ACTIVE,
JobStatus.DELAYED,
// 'failed',
JobStatus.PAUSED,
]);
const job = jobs.find((j) => {
for (const key in data) {
if (j.data[key]) {
if (j.data[key] !== data[key]) return false;
} else {
return false;
}
}
return true;
});
return job;
}
async resumeQueue() { async resumeQueue() {
await this.fallbackQueueService.queue.start(); await this.fallbackQueueService.queue.start();
} }

108
packages/nocodb/src/modules/jobs/jobs-event.service.ts

@ -0,0 +1,108 @@
import {
OnQueueActive,
OnQueueCompleted,
OnQueueFailed,
Processor,
} from '@nestjs/bull';
import { Job as BullJob } from 'bull';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Logger } from '@nestjs/common';
import { JobEvents, JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
import { Job } from '~/models';
import { RootScopes } from '~/utils/globals';
@Processor(JOBS_QUEUE)
export class JobsEventService {
protected logger = new Logger(JobsEventService.name);
constructor(private eventEmitter: EventEmitter2) {}
@OnQueueActive()
onActive(job: BullJob) {
Job.update(
{
workspace_id: RootScopes.ROOT,
base_id: RootScopes.ROOT,
},
job.id.toString(),
{
status: JobStatus.ACTIVE,
},
)
.then(() => {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
})
.catch((error) => {
this.logger.error(
`Failed to update job (${job.id}) status to active: ${error.message}`,
);
});
}
@OnQueueFailed()
onFailed(job: BullJob, error: Error) {
this.logger.error(
`---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
);
Job.update(
{
workspace_id: RootScopes.ROOT,
base_id: RootScopes.ROOT,
},
job.id.toString(),
{
status: JobStatus.FAILED,
},
)
.then(() => {
const newLocal = this;
newLocal.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
error: {
message: error?.message,
},
},
});
})
.catch((error) => {
this.logger.error(
`Failed to update job (${job.id}) status to failed: ${error.message}`,
);
});
}
@OnQueueCompleted()
onCompleted(job: BullJob, data: any) {
Job.update(
{
workspace_id: RootScopes.ROOT,
base_id: RootScopes.ROOT,
},
job.id.toString(),
{
status: JobStatus.COMPLETED,
result: data,
},
)
.then(() => {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
result: data,
},
});
})
.catch((error) => {
this.logger.error(
`Failed to update job (${job.id}) status to completed: ${error.message}`,
);
});
}
}

1
packages/nocodb/src/modules/jobs/jobs-service.interface.ts

@ -7,7 +7,6 @@ export interface IJobsService {
add(name: string, data: any): Promise<Bull.Job<any>>; add(name: string, data: any): Promise<Bull.Job<any>>;
jobStatus(jobId: string): Promise<JobStatus>; jobStatus(jobId: string): Promise<JobStatus>;
jobList(): Promise<Bull.Job<any>[]>; jobList(): Promise<Bull.Job<any>[]>;
getJobWithData(data: any): Promise<Bull.Job<any>>;
resumeQueue(): Promise<void>; resumeQueue(): Promise<void>;
pauseQueue(): Promise<void>; pauseQueue(): Promise<void>;
} }

107
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -56,7 +56,7 @@ export class JobsController {
} else { } else {
messages = ( messages = (
await NocoCache.get( await NocoCache.get(
`${CacheScope.JOBS}:${jobId}:messages`, `${CacheScope.JOBS_POLLING}:${jobId}:messages`,
CacheGetType.TYPE_OBJECT, CacheGetType.TYPE_OBJECT,
) )
)?.messages; )?.messages;
@ -92,38 +92,43 @@ export class JobsController {
}; };
// subscribe to job events // subscribe to job events
if (JobsRedis.available) { if (JobsRedis.available) {
const unsubscribeCallback = await JobsRedis.subscribe(jobId, async (data) => { const unsubscribeCallback = await JobsRedis.subscribe(
if (this.jobRooms[jobId]) { jobId,
this.jobRooms[jobId].listeners.forEach((res) => { async (data) => {
if (!res.headersSent) { if (this.jobRooms[jobId]) {
res.send({ this.jobRooms[jobId].listeners.forEach((res) => {
status: 'refresh', if (!res.headersSent) {
}); res.send({
} status: 'refresh',
}); });
} }
});
const cmd = data.cmd; }
delete data.cmd;
switch (cmd) { const cmd = data.cmd;
case JobEvents.STATUS: delete data.cmd;
if ( switch (cmd) {
[JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status) case JobEvents.STATUS:
) { if (
await unsubscribeCallback(); [JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)
delete this.jobRooms[jobId]; ) {
// close the job after 1 second (to allow the update of messages) await unsubscribeCallback();
setTimeout(() => { delete this.jobRooms[jobId];
this.closedJobs.push(jobId); // close the job after 1 second (to allow the update of messages)
}, 1000); setTimeout(() => {
// remove the job after polling interval * 2 this.closedJobs.push(jobId);
setTimeout(() => { }, 1000);
this.closedJobs = this.closedJobs.filter((j) => j !== jobId); // remove the job after polling interval * 2
}, POLLING_INTERVAL * 2); setTimeout(() => {
} this.closedJobs = this.closedJobs.filter(
break; (j) => j !== jobId,
} );
}); }, POLLING_INTERVAL * 2);
}
break;
}
},
);
} }
} }
@ -144,32 +149,6 @@ export class JobsController {
}, POLLING_INTERVAL); }, POLLING_INTERVAL);
} }
@Post('/jobs/status')
async status(@Body() data: { id: string } | any) {
let res: {
id?: string;
status?: JobStatus;
} | null = null;
if (Object.keys(data).every((k) => ['id'].includes(k)) && data?.id) {
const rooms = (await this.jobsService.jobList()).map(
(j) => `jobs-${j.id}`,
);
const room = rooms.find((r) => r === `jobs-${data.id}`);
if (room) {
res.id = data.id;
}
} else {
const job = await this.jobsService.getJobWithData(data);
if (job) {
res = {};
res.id = `${job.id}`;
res.status = await this.jobsService.jobStatus(data.id);
}
}
return res;
}
@OnEvent(JobEvents.STATUS) @OnEvent(JobEvents.STATUS)
async sendJobStatus(data: { async sendJobStatus(data: {
id: string; id: string;
@ -193,7 +172,7 @@ export class JobsController {
this.localJobs[jobId].messages.shift(); this.localJobs[jobId].messages.shift();
} }
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { await NocoCache.set(`${CacheScope.JOBS_POLLING}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages, messages: this.localJobs[jobId].messages,
}); });
} else { } else {
@ -208,7 +187,7 @@ export class JobsController {
_mid: 1, _mid: 1,
}; };
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { await NocoCache.set(`${CacheScope.JOBS_POLLING}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages, messages: this.localJobs[jobId].messages,
}); });
} }
@ -237,7 +216,7 @@ export class JobsController {
setTimeout(async () => { setTimeout(async () => {
delete this.jobRooms[jobId]; delete this.jobRooms[jobId];
delete this.localJobs[jobId]; delete this.localJobs[jobId];
await NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`); await NocoCache.del(`${CacheScope.JOBS_POLLING}:${jobId}:messages`);
}, POLLING_INTERVAL * 2); }, POLLING_INTERVAL * 2);
} }
} }
@ -265,7 +244,7 @@ export class JobsController {
this.localJobs[jobId].messages.shift(); this.localJobs[jobId].messages.shift();
} }
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { await NocoCache.set(`${CacheScope.JOBS_POLLING}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages, messages: this.localJobs[jobId].messages,
}); });
} else { } else {
@ -280,7 +259,7 @@ export class JobsController {
_mid: 1, _mid: 1,
}; };
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, { await NocoCache.set(`${CacheScope.JOBS_POLLING}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages, messages: this.localJobs[jobId].messages,
}); });
} }

12
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -16,18 +16,19 @@ import { SourceCreateProcessor } from '~/modules/jobs/jobs/source-create/source-
import { SourceDeleteController } from '~/modules/jobs/jobs/source-delete/source-delete.controller'; import { SourceDeleteController } from '~/modules/jobs/jobs/source-delete/source-delete.controller';
import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor'; import { SourceDeleteProcessor } from '~/modules/jobs/jobs/source-delete/source-delete.processor';
import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor'; import { WebhookHandlerProcessor } from '~/modules/jobs/jobs/webhook-handler/webhook-handler.processor';
import { DataExportProcessor } from '~/modules/jobs/jobs/data-export/data-export.processor';
import { DataExportController } from '~/modules/jobs/jobs/data-export/data-export.controller';
// Jobs Module Related // Jobs Module Related
import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service'; import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service';
// import { JobsGateway } from '~/modules/jobs/jobs.gateway'; // import { JobsGateway } from '~/modules/jobs/jobs.gateway';
import { JobsController } from '~/modules/jobs/jobs.controller'; import { JobsController } from '~/modules/jobs/jobs.controller';
import { JobsService } from '~/modules/jobs/redis/jobs.service'; import { JobsService } from '~/modules/jobs/redis/jobs.service';
import { JobsEventService } from '~/modules/jobs/redis/jobs-event.service'; import { JobsEventService } from '~/modules/jobs/jobs-event.service';
// Fallback // Fallback
import { JobsService as FallbackJobsService } from '~/modules/jobs/fallback/jobs.service'; import { JobsService as FallbackJobsService } from '~/modules/jobs/fallback/jobs.service';
import { QueueService as FallbackQueueService } from '~/modules/jobs/fallback/fallback-queue.service'; import { QueueService as FallbackQueueService } from '~/modules/jobs/fallback/fallback-queue.service';
import { JobsEventService as FallbackJobsEventService } from '~/modules/jobs/fallback/jobs-event.service';
import { JOBS_QUEUE } from '~/interface/Jobs'; import { JOBS_QUEUE } from '~/interface/Jobs';
export const JobsModuleMetadata = { export const JobsModuleMetadata = {
@ -53,14 +54,14 @@ export const JobsModuleMetadata = {
MetaSyncController, MetaSyncController,
SourceCreateController, SourceCreateController,
SourceDeleteController, SourceDeleteController,
DataExportController,
] ]
: []), : []),
], ],
providers: [ providers: [
...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []), ...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []),
...(process.env.NC_REDIS_JOB_URL JobsEventService,
? [JobsEventService] ...(process.env.NC_REDIS_JOB_URL ? [] : [FallbackQueueService]),
: [FallbackQueueService, FallbackJobsEventService]),
{ {
provide: 'JobsService', provide: 'JobsService',
useClass: process.env.NC_REDIS_JOB_URL useClass: process.env.NC_REDIS_JOB_URL
@ -76,6 +77,7 @@ export const JobsModuleMetadata = {
SourceCreateProcessor, SourceCreateProcessor,
SourceDeleteProcessor, SourceDeleteProcessor,
WebhookHandlerProcessor, WebhookHandlerProcessor,
DataExportProcessor,
], ],
exports: ['JobsService'], exports: ['JobsService'],
}; };

6
packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts

@ -31,7 +31,7 @@ import { TablesService } from '~/services/tables.service';
import { ViewColumnsService } from '~/services/view-columns.service'; import { ViewColumnsService } from '~/services/view-columns.service';
import { ViewsService } from '~/services/views.service'; import { ViewsService } from '~/services/views.service';
import { FormsService } from '~/services/forms.service'; import { FormsService } from '~/services/forms.service';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; import { AtImportJobData, JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { GridColumnsService } from '~/services/grid-columns.service'; import { GridColumnsService } from '~/services/grid-columns.service';
import { TelemetryService } from '~/services/telemetry.service'; import { TelemetryService } from '~/services/telemetry.service';
import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2'; import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2';
@ -112,7 +112,7 @@ export class AtImportProcessor {
) {} ) {}
@Process(JobTypes.AtImport) @Process(JobTypes.AtImport)
async job(job: Job) { async job(job: Job<AtImportJobData>) {
this.debugLog(`job started for ${job.id}`); this.debugLog(`job started for ${job.id}`);
const context = job.data.context; const context = job.data.context;
@ -2668,7 +2668,7 @@ export interface AirtableSyncConfig {
apiKey: string; apiKey: string;
appId?: string; appId?: string;
shareId: string; shareId: string;
user: UserType; user: Partial<UserType>;
options: { options: {
syncViews: boolean; syncViews: boolean;
syncData: boolean; syncData: boolean;

58
packages/nocodb/src/modules/jobs/jobs/data-export/data-export.controller.ts

@ -0,0 +1,58 @@
import {
Body,
Controller,
HttpCode,
Inject,
Param,
Post,
Req,
UseGuards,
} from '@nestjs/common';
import type { DataExportJobData } from '~/interface/Jobs';
import { GlobalGuard } from '~/guards/global/global.guard';
import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware';
import { BasesService } from '~/services/bases.service';
import { View } from '~/models';
import { JobTypes } from '~/interface/Jobs';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
import { TenantContext } from '~/decorators/tenant-context.decorator';
import { NcContext, NcRequest } from '~/interface/config';
import { NcError } from '~/helpers/catchError';
@Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class DataExportController {
constructor(
@Inject('JobsService') protected readonly jobsService: IJobsService,
protected readonly basesService: BasesService,
) {}
@Post(['/api/v2/export/:viewId/:exportAs'])
@HttpCode(200)
// TODO add new ACL
@Acl('dataList')
async exportModelData(
@TenantContext() context: NcContext,
@Req() req: NcRequest,
@Param('viewId') viewId: string,
@Param('exportAs') exportAs: 'csv' | 'json' | 'xlsx',
@Body() options: DataExportJobData['options'],
) {
const view = await View.get(context, viewId);
if (!view) NcError.viewNotFound(viewId);
const job = await this.jobsService.add(JobTypes.DataExport, {
context,
options,
modelId: view.fk_model_id,
viewId,
user: req.user,
exportAs,
ncSiteUrl: req.ncSiteUrl,
});
return job;
}
}

132
packages/nocodb/src/modules/jobs/jobs/data-export/data-export.processor.ts

@ -0,0 +1,132 @@
import { Readable } from 'stream';
import path from 'path';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
import moment from 'moment';
import { type DataExportJobData, JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { elapsedTime, initTime } from '~/modules/jobs/helpers';
import { ExportService } from '~/modules/jobs/jobs/export-import/export.service';
import { Model, PresignedUrl, View } from '~/models';
import { NcError } from '~/helpers/catchError';
import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2';
function getViewTitle(view: View) {
return view.is_default ? 'Default View' : view.title;
}
@Processor(JOBS_QUEUE)
export class DataExportProcessor {
private logger = new Logger(DataExportProcessor.name);
constructor(private readonly exportService: ExportService) {}
@Process(JobTypes.DataExport)
async job(job: Job<DataExportJobData>) {
const {
context,
options,
modelId,
viewId,
user: _user,
exportAs,
ncSiteUrl,
} = job.data;
if (exportAs !== 'csv') NcError.notImplemented(`Export as ${exportAs}`);
const hrTime = initTime();
const model = await Model.get(context, modelId);
if (!model) NcError.tableNotFound(modelId);
const view = await View.get(context, viewId);
if (!view) NcError.viewNotFound(viewId);
// date time as containing folder YYYY-MM-DD/HH
const dateFolder = moment().format('YYYY-MM-DD/HH');
const storageAdapter = await NcPluginMgrv2.storageAdapter();
const destPath = `nc/uploads/data-export/${dateFolder}/${modelId}/${
model.title
} (${getViewTitle(view)}) - ${Date.now()}.csv`;
let url = null;
try {
const dataStream = new Readable({
read() {},
});
dataStream.setEncoding('utf8');
let error = null;
const uploadFilePromise = (storageAdapter as any)
.fileCreateByStream(destPath, dataStream)
.catch((e) => {
this.logger.error(e);
error = e;
});
this.exportService
.streamModelDataAsCsv(context, {
dataStream,
linkStream: null,
baseId: model.base_id,
modelId: model.id,
viewId: view.id,
ncSiteUrl: ncSiteUrl,
delimiter: options?.delimiter,
})
.catch((e) => {
this.logger.debug(e);
dataStream.push(null);
error = e;
});
url = await uploadFilePromise;
// if url is not defined, it is local attachment
if (!url) {
url = await PresignedUrl.getSignedUrl({
path: path.join(destPath.replace('nc/uploads/', '')),
filename: `${model.title} (${getViewTitle(view)}).csv`,
expireSeconds: 3 * 60 * 60, // 3 hours
});
} else {
if (url.includes('.amazonaws.com/')) {
const relativePath = decodeURI(url.split('.amazonaws.com/')[1]);
url = await PresignedUrl.getSignedUrl({
path: relativePath,
filename: `${model.title} (${getViewTitle(view)}).csv`,
s3: true,
expireSeconds: 3 * 60 * 60, // 3 hours
});
}
}
if (error) {
throw error;
}
elapsedTime(
hrTime,
`exported data for model ${modelId} view ${viewId} as ${exportAs}`,
'exportData',
);
} catch (e) {
throw NcError.badRequest(e);
}
return {
timestamp: new Date(),
type: exportAs,
title: `${model.title} (${getViewTitle(view)})`,
url,
};
}
}

9
packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts

@ -8,10 +8,7 @@ import {
Req, Req,
UseGuards, UseGuards,
} from '@nestjs/common'; } from '@nestjs/common';
import { import { ProjectStatus, readonlyMetaAllowedTypes } from 'nocodb-sdk';
ProjectStatus,
readonlyMetaAllowedTypes,
} from 'nocodb-sdk';
import { GlobalGuard } from '~/guards/global/global.guard'; import { GlobalGuard } from '~/guards/global/global.guard';
import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware'; import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware';
import { BasesService } from '~/services/bases.service'; import { BasesService } from '~/services/bases.service';
@ -95,6 +92,7 @@ export class DuplicateController {
workspace_id: base.fk_workspace_id, workspace_id: base.fk_workspace_id,
base_id: base.id, base_id: base.id,
}, },
user: req.user,
baseId: base.id, baseId: base.id,
sourceId: source.id, sourceId: source.id,
dupProjectId: dupProject.id, dupProjectId: dupProject.id,
@ -168,6 +166,7 @@ export class DuplicateController {
const job = await this.jobsService.add(JobTypes.DuplicateBase, { const job = await this.jobsService.add(JobTypes.DuplicateBase, {
context, context,
user: req.user,
baseId: base.id, baseId: base.id,
sourceId: source.id, sourceId: source.id,
dupProjectId: dupProject.id, dupProjectId: dupProject.id,
@ -233,6 +232,7 @@ export class DuplicateController {
const job = await this.jobsService.add(JobTypes.DuplicateModel, { const job = await this.jobsService.add(JobTypes.DuplicateModel, {
context, context,
user: req.user,
baseId: base.id, baseId: base.id,
sourceId: source.id, sourceId: source.id,
modelId: model.id, modelId: model.id,
@ -302,6 +302,7 @@ export class DuplicateController {
const job = await this.jobsService.add(JobTypes.DuplicateColumn, { const job = await this.jobsService.add(JobTypes.DuplicateColumn, {
context, context,
user: req.user,
baseId: base.id, baseId: base.id,
sourceId: column.source_id, sourceId: column.source_id,
modelId: model.id, modelId: model.id,

20
packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts

@ -5,6 +5,11 @@ import papaparse from 'papaparse';
import debug from 'debug'; import debug from 'debug';
import { isLinksOrLTAR, isVirtualCol, RelationTypes } from 'nocodb-sdk'; import { isLinksOrLTAR, isVirtualCol, RelationTypes } from 'nocodb-sdk';
import type { NcContext } from '~/interface/config'; import type { NcContext } from '~/interface/config';
import type {
DuplicateBaseJobData,
DuplicateColumnJobData,
DuplicateModelJobData,
} from '~/interface/Jobs';
import { Base, Column, Model, Source } from '~/models'; import { Base, Column, Model, Source } from '~/models';
import { BasesService } from '~/services/bases.service'; import { BasesService } from '~/services/bases.service';
import { import {
@ -31,7 +36,7 @@ export class DuplicateProcessor {
) {} ) {}
@Process(JobTypes.DuplicateBase) @Process(JobTypes.DuplicateBase)
async duplicateBase(job: Job) { async duplicateBase(job: Job<DuplicateBaseJobData>) {
this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateBase})`); this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateBase})`);
const hrTime = initTime(); const hrTime = initTime();
@ -131,10 +136,12 @@ export class DuplicateProcessor {
} }
this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateBase})`); this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateBase})`);
return { id: dupProject.id };
} }
@Process(JobTypes.DuplicateModel) @Process(JobTypes.DuplicateModel)
async duplicateModel(job: Job) { async duplicateModel(job: Job<DuplicateModelJobData>) {
this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateModel})`); this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateModel})`);
const hrTime = initTime(); const hrTime = initTime();
@ -241,11 +248,11 @@ export class DuplicateProcessor {
this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateModel})`); this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateModel})`);
return await Model.get(context, findWithIdentifier(idMap, sourceModel.id)); return { id: findWithIdentifier(idMap, sourceModel.id) };
} }
@Process(JobTypes.DuplicateColumn) @Process(JobTypes.DuplicateColumn)
async duplicateColumn(job: Job) { async duplicateColumn(job: Job<DuplicateColumnJobData>) {
this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateColumn})`); this.debugLog(`job started for ${job.id} (${JobTypes.DuplicateColumn})`);
const hrTime = initTime(); const hrTime = initTime();
@ -398,10 +405,7 @@ export class DuplicateProcessor {
this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateModel})`); this.debugLog(`job completed for ${job.id} (${JobTypes.DuplicateModel})`);
return await Column.get(context, { return { id: findWithIdentifier(idMap, sourceColumn.id) };
source_id: base.id,
colId: findWithIdentifier(idMap, sourceColumn.id),
});
} }
async importModelsData( async importModelsData(

164
packages/nocodb/src/modules/jobs/jobs/export-import/export.service.ts

@ -9,7 +9,10 @@ import type { NcContext } from '~/interface/config';
import type { LinkToAnotherRecordColumn } from '~/models'; import type { LinkToAnotherRecordColumn } from '~/models';
import { Base, Filter, Hook, Model, Source, View } from '~/models'; import { Base, Filter, Hook, Model, Source, View } from '~/models';
import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2'; import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2';
import { getViewAndModelByAliasOrId } from '~/helpers/dataHelpers'; import {
getViewAndModelByAliasOrId,
serializeCellValue,
} from '~/helpers/dataHelpers';
import { import {
clearPrefix, clearPrefix,
generateBaseIdMap, generateBaseIdMap,
@ -447,10 +450,14 @@ export class ExportService {
viewId?: string; viewId?: string;
handledMmList?: string[]; handledMmList?: string[];
_fieldIds?: string[]; _fieldIds?: string[];
ncSiteUrl?: string;
delimiter?: string;
}, },
) { ) {
const { dataStream, linkStream, handledMmList } = param; const { dataStream, linkStream, handledMmList } = param;
const dataExportMode = !linkStream;
const { model, view } = await getViewAndModelByAliasOrId(context, { const { model, view } = await getViewAndModelByAliasOrId(context, {
baseName: param.baseId, baseName: param.baseId,
tableName: param.modelId, tableName: param.modelId,
@ -463,32 +470,35 @@ export class ExportService {
const btMap = new Map<string, string>(); const btMap = new Map<string, string>();
for (const column of model.columns.filter( if (!dataExportMode) {
(col) => for (const column of model.columns.filter(
col.uidt === UITypes.LinkToAnotherRecord && (col) =>
(col.colOptions?.type === RelationTypes.BELONGS_TO || col.uidt === UITypes.LinkToAnotherRecord &&
(col.colOptions?.type === RelationTypes.ONE_TO_ONE && col.meta?.bt)), (col.colOptions?.type === RelationTypes.BELONGS_TO ||
)) { (col.colOptions?.type === RelationTypes.ONE_TO_ONE &&
await column.getColOptions(context); col.meta?.bt)),
const fkCol = model.columns.find( )) {
(c) => c.id === column.colOptions?.fk_child_column_id, await column.getColOptions(context);
); const fkCol = model.columns.find(
if (fkCol) { (c) => c.id === column.colOptions?.fk_child_column_id,
// replace bt column with fk column if it is in _fieldIds
if (param._fieldIds && param._fieldIds.includes(column.id)) {
param._fieldIds.push(fkCol.id);
const btIndex = param._fieldIds.indexOf(column.id);
param._fieldIds.splice(btIndex, 1);
}
btMap.set(
fkCol.id,
`${column.base_id}::${column.source_id}::${column.fk_model_id}::${column.id}`,
); );
if (fkCol) {
// replace bt column with fk column if it is in _fieldIds
if (param._fieldIds && param._fieldIds.includes(column.id)) {
param._fieldIds.push(fkCol.id);
const btIndex = param._fieldIds.indexOf(column.id);
param._fieldIds.splice(btIndex, 1);
}
btMap.set(
fkCol.id,
`${column.base_id}::${column.source_id}::${column.fk_model_id}::${column.id}`,
);
}
} }
} }
const fields = param._fieldIds let fields = param._fieldIds
? model.columns ? model.columns
.filter((c) => param._fieldIds?.includes(c.id)) .filter((c) => param._fieldIds?.includes(c.id))
.map((c) => c.title) .map((c) => c.title)
@ -498,6 +508,16 @@ export class ExportService {
.map((c) => c.title) .map((c) => c.title)
.join(','); .join(',');
if (dataExportMode) {
const viewCols = await view.getColumns(context);
fields = viewCols
.sort((a, b) => a.order - b.order)
.filter((c) => c.show)
.map((vc) => model.columns.find((c) => c.id === vc.fk_column_id).title)
.join(',');
}
const mmColumns = param._fieldIds const mmColumns = param._fieldIds
? model.columns ? model.columns
.filter((c) => param._fieldIds?.includes(c.id)) .filter((c) => param._fieldIds?.includes(c.id))
@ -506,7 +526,7 @@ export class ExportService {
(col) => isLinksOrLTAR(col) && col.colOptions?.type === 'mm', (col) => isLinksOrLTAR(col) && col.colOptions?.type === 'mm',
); );
const hasLink = mmColumns.length > 0; const hasLink = !dataExportMode && mmColumns.length > 0;
dataStream.setEncoding('utf8'); dataStream.setEncoding('utf8');
@ -564,6 +584,22 @@ export class ExportService {
return { data }; return { data };
}; };
const formatAndSerialize = async (data: any) => {
for (const row of data) {
for (const [k, v] of Object.entries(row)) {
const col = model.columns.find((c) => c.title === k);
if (col) {
row[k] = await serializeCellValue(context, {
value: v,
column: col,
siteUrl: param.ncSiteUrl,
});
}
}
}
return { data };
};
const baseModel = await Model.getBaseModelSQL(context, { const baseModel = await Model.getBaseModelSQL(context, {
id: model.id, id: model.id,
viewId: view?.id, viewId: view?.id,
@ -576,7 +612,7 @@ export class ExportService {
try { try {
await this.recursiveRead( await this.recursiveRead(
context, context,
formatData, dataExportMode ? formatAndSerialize : formatData,
baseModel, baseModel,
dataStream, dataStream,
model, model,
@ -585,6 +621,8 @@ export class ExportService {
limit, limit,
fields, fields,
true, true,
param.delimiter,
dataExportMode,
); );
} catch (e) { } catch (e) {
this.debugLog(e); this.debugLog(e);
@ -670,13 +708,13 @@ export class ExportService {
linkStream.push(null); linkStream.push(null);
} else { } else {
linkStream.push(null); if (linkStream) linkStream.push(null);
} }
} }
async recursiveRead( async recursiveRead(
context: NcContext, context: NcContext,
formatter: (data: any) => { data: any }, formatter: (data: any) => { data: any } | Promise<{ data: any }>,
baseModel: BaseModelSqlv2, baseModel: BaseModelSqlv2,
stream: Readable, stream: Readable,
model: Model, model: Model,
@ -685,6 +723,8 @@ export class ExportService {
limit: number, limit: number,
fields: string, fields: string,
header = false, header = false,
delimiter = ',',
dataExportMode = false,
): Promise<void> { ): Promise<void> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.datasService this.datasService
@ -693,7 +733,7 @@ export class ExportService {
view, view,
query: { limit, offset, fields }, query: { limit, offset, fields },
baseModel, baseModel,
ignoreViewFilterAndSort: true, ignoreViewFilterAndSort: !dataExportMode,
limitOverride: limit, limitOverride: limit,
}) })
.then((result) => { .then((result) => {
@ -701,25 +741,57 @@ export class ExportService {
if (!header) { if (!header) {
stream.push('\r\n'); stream.push('\r\n');
} }
const { data } = formatter(result.list);
stream.push(unparse(data, { header })); // check if formatter is async
if (result.pageInfo.isLastPage) { const formatterPromise = formatter(result.list);
stream.push(null); if (formatterPromise instanceof Promise) {
resolve(); formatterPromise.then(({ data }) => {
stream.push(unparse(data, { header, delimiter }));
if (result.pageInfo.isLastPage) {
stream.push(null);
resolve();
} else {
this.recursiveRead(
context,
formatter,
baseModel,
stream,
model,
view,
offset + limit,
limit,
fields,
false,
delimiter,
dataExportMode,
)
.then(resolve)
.catch(reject);
}
});
} else { } else {
this.recursiveRead( stream.push(unparse(formatterPromise.data, { header }));
context, if (result.pageInfo.isLastPage) {
formatter, stream.push(null);
baseModel, resolve();
stream, } else {
model, this.recursiveRead(
view, context,
offset + limit, formatter,
limit, baseModel,
fields, stream,
) model,
.then(resolve) view,
.catch(reject); offset + limit,
limit,
fields,
false,
delimiter,
dataExportMode,
)
.then(resolve)
.catch(reject);
}
} }
} catch (e) { } catch (e) {
reject(e); reject(e);

1
packages/nocodb/src/modules/jobs/jobs/source-create/source-create.controller.ts

@ -50,6 +50,7 @@ export class SourceCreateController {
const job = await this.jobsService.add(JobTypes.SourceCreate, { const job = await this.jobsService.add(JobTypes.SourceCreate, {
context, context,
user: req.user,
baseId, baseId,
source: body, source: body,
req: { req: {

2
packages/nocodb/src/modules/jobs/jobs/source-create/source-create.processor.ts

@ -46,7 +46,5 @@ export class SourceCreateProcessor {
} }
this.debugLog(`job completed for ${job.id}`); this.debugLog(`job completed for ${job.id}`);
return createdSource;
} }
} }

1
packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.controller.ts

@ -47,6 +47,7 @@ export class SourceDeleteController {
const job = await this.jobsService.add(JobTypes.SourceDelete, { const job = await this.jobsService.add(JobTypes.SourceDelete, {
context, context,
user: req.user,
sourceId, sourceId,
req: { req: {
user: req.user, user: req.user,

2
packages/nocodb/src/modules/jobs/jobs/source-delete/source-delete.processor.ts

@ -22,7 +22,5 @@ export class SourceDeleteProcessor {
}); });
this.debugLog(`job completed for ${job.id}`); this.debugLog(`job completed for ${job.id}`);
return true;
} }
} }

53
packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts

@ -1,53 +0,0 @@
import {
OnQueueActive,
OnQueueCompleted,
OnQueueFailed,
Processor,
} from '@nestjs/bull';
import { Job } from 'bull';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Logger } from '@nestjs/common';
import { JobEvents, JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
@Processor(JOBS_QUEUE)
export class JobsEventService {
protected logger = new Logger(JobsEventService.name);
constructor(private eventEmitter: EventEmitter2) {}
@OnQueueActive()
onActive(job: Job) {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
this.logger.error(
`---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
);
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
error: {
message: error?.message,
},
},
});
}
@OnQueueCompleted()
onCompleted(job: Job, data: any) {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
result: data,
},
});
}
}

43
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -4,6 +4,8 @@ import { Queue } from 'bull';
import type { OnModuleInit } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common';
import { InstanceCommands, JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; import { InstanceCommands, JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
import { JobsRedis } from '~/modules/jobs/redis/jobs-redis'; import { JobsRedis } from '~/modules/jobs/redis/jobs-redis';
import { Job } from '~/models';
import { RootScopes } from '~/utils/globals';
@Injectable() @Injectable()
export class JobsService implements OnModuleInit { export class JobsService implements OnModuleInit {
@ -51,11 +53,24 @@ export class JobsService implements OnModuleInit {
async add(name: string, data: any) { async add(name: string, data: any) {
await this.toggleQueue(); await this.toggleQueue();
const job = await this.jobsQueue.add(name, data, { const context = {
workspace_id: RootScopes.ROOT,
base_id: RootScopes.ROOT,
...(data?.context || {}),
};
const jobData = await Job.insert(context, {
job: name,
status: JobStatus.WAITING,
fk_user_id: data?.user?.id,
});
await this.jobsQueue.add(name, data, {
jobId: jobData.id,
removeOnComplete: true, removeOnComplete: true,
}); });
return job; return jobData;
} }
async jobStatus(jobId: string) { async jobStatus(jobId: string) {
@ -74,30 +89,6 @@ export class JobsService implements OnModuleInit {
]); ]);
} }
async getJobWithData(data: any) {
const jobs = await this.jobsQueue.getJobs([
// 'completed',
JobStatus.WAITING,
JobStatus.ACTIVE,
JobStatus.DELAYED,
// 'failed',
JobStatus.PAUSED,
]);
const job = jobs.find((j) => {
for (const key in data) {
if (j.data[key]) {
if (j.data[key] !== data[key]) return false;
} else {
return false;
}
}
return true;
});
return job;
}
async resumeQueue() { async resumeQueue() {
this.logger.log('Resuming global queue'); this.logger.log('Resuming global queue');
await this.jobsQueue.resume(); await this.jobsQueue.resume();

4
packages/nocodb/src/modules/noco.module.ts

@ -100,6 +100,8 @@ import { CommandPaletteService } from '~/services/command-palette.service';
import { CommandPaletteController } from '~/controllers/command-palette.controller'; import { CommandPaletteController } from '~/controllers/command-palette.controller';
import { ExtensionsService } from '~/services/extensions.service'; import { ExtensionsService } from '~/services/extensions.service';
import { ExtensionsController } from '~/controllers/extensions.controller'; import { ExtensionsController } from '~/controllers/extensions.controller';
import { JobsMetaService } from '~/services/jobs-meta.service';
import { JobsMetaController } from '~/controllers/jobs-meta.controller';
/* Datas */ /* Datas */
import { DataTableController } from '~/controllers/data-table.controller'; import { DataTableController } from '~/controllers/data-table.controller';
@ -178,6 +180,7 @@ export const nocoModuleMetadata = {
NotificationsController, NotificationsController,
CommandPaletteController, CommandPaletteController,
ExtensionsController, ExtensionsController,
JobsMetaController,
/* Datas */ /* Datas */
DataTableController, DataTableController,
@ -246,6 +249,7 @@ export const nocoModuleMetadata = {
NotificationsService, NotificationsService,
CommandPaletteService, CommandPaletteService,
ExtensionsService, ExtensionsService,
JobsMetaService,
/* Datas */ /* Datas */
DataTableService, DataTableService,

5
packages/nocodb/src/plugins/s3/S3.ts

@ -108,10 +108,13 @@ export default class S3 implements IStorageAdapterV2 {
}); });
} }
public async getSignedUrl(key, expiresInSeconds = 7200) { public async getSignedUrl(key, expiresInSeconds = 7200, filename?: string) {
const command = new GetObjectCommand({ const command = new GetObjectCommand({
Key: key, Key: key,
Bucket: this.input.bucket, Bucket: this.input.bucket,
...(filename
? { ResponseContentDisposition: `attachment; filename="${filename}" ` }
: {}),
}); });
return getSignedUrl(this.s3Client, command, { return getSignedUrl(this.s3Client, command, {
expiresIn: expiresInSeconds, expiresIn: expiresInSeconds,

90
packages/nocodb/src/schema/swagger-v2.json

@ -11817,6 +11817,96 @@
} }
] ]
} }
},
"/api/v2/jobs/{baseId}": {
"post": {
"summary": "Get Jobs",
"operationId": "jobs-list",
"description": "Get list of jobs for a given base for the user",
"tags": [
"Jobs"
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"job": {
"type": "string"
},
"status": {
"type": "string"
}
}
}
}
}
}
},
"parameters": [
{
"schema": {
"$ref": "#/components/schemas/Id",
"example": "p124dflkcvasewh",
"type": "string"
},
"name": "baseId",
"in": "path",
"required": true,
"description": "Unique Base ID"
},
{
"$ref": "#/components/parameters/xc-auth"
}
]
},
"/api/v2/export/{viewId}/{exportAs}": {
"post": {
"summary": "Trigger export as job",
"operationId": "export-data",
"description": "Trigger export as job",
"tags": [
"Export"
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"type": "object"
}
}
}
}
},
"parameters": [
{
"schema": {
"$ref": "#/components/schemas/Id",
"example": "vw124dflkcvasewh",
"type": "string"
},
"name": "viewId",
"in": "path",
"required": true,
"description": "Unique View ID"
},
{
"schema": {
"type": "string",
"enum": [
"csv"
]
},
"name": "exportAs",
"in": "path",
"required": true,
"description": "Export as format"
},
{
"$ref": "#/components/parameters/xc-auth"
}
]
} }
}, },
"components": { "components": {

74
packages/nocodb/src/schema/swagger.json

@ -17943,14 +17943,57 @@
} }
] ]
}, },
"/jobs/status": { "/api/v2/jobs/{baseId}": {
"post": { "post": {
"summary": "Jobs Status", "summary": "Get Jobs",
"operationId": "jobs-status", "operationId": "jobs-list",
"description": "Get job status", "description": "Get list of jobs for a given base for the user",
"tags": [ "tags": [
"Jobs" "Jobs"
], ],
"requestBody": {
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"job": {
"type": "string"
},
"status": {
"type": "string"
}
}
}
}
}
}
},
"parameters": [
{
"schema": {
"$ref": "#/components/schemas/Id",
"example": "p124dflkcvasewh",
"type": "string"
},
"name": "baseId",
"in": "path",
"required": true,
"description": "Unique Base ID"
},
{
"$ref": "#/components/parameters/xc-auth"
}
]
},
"/api/v2/export/{viewId}/{exportAs}": {
"post": {
"summary": "Trigger export as job",
"operationId": "export-data",
"description": "Trigger export as job",
"tags": [
"Export"
],
"requestBody": { "requestBody": {
"content": { "content": {
"application/json": { "application/json": {
@ -17962,6 +18005,29 @@
} }
}, },
"parameters": [ "parameters": [
{
"schema": {
"$ref": "#/components/schemas/Id",
"example": "vw124dflkcvasewh",
"type": "string"
},
"name": "viewId",
"in": "path",
"required": true,
"description": "Unique View ID"
},
{
"schema": {
"type": "string",
"enum": [
"csv"
]
},
"name": "exportAs",
"in": "path",
"required": true,
"description": "Export as format"
},
{ {
"$ref": "#/components/parameters/xc-auth" "$ref": "#/components/parameters/xc-auth"
} }

19
packages/nocodb/src/services/jobs-meta.service.spec.ts

@ -0,0 +1,19 @@
import { Test } from '@nestjs/testing';
import { JobsMetaService } from './jobs-meta.service';
import type { TestingModule } from '@nestjs/testing';
describe('JobsMetaService', () => {
let service: JobsMetaService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [JobsMetaService],
}).compile();
service = module.get<JobsMetaService>(JobsMetaService);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
});

83
packages/nocodb/src/services/jobs-meta.service.ts

@ -0,0 +1,83 @@
import { Injectable } from '@nestjs/common';
import dayjs from 'dayjs';
import type { NcContext, NcRequest } from '~/interface/config';
import type { JobTypes } from '~/interface/Jobs';
import { JobStatus } from '~/interface/Jobs';
import { Job } from '~/models';
import Noco from '~/Noco';
@Injectable()
export class JobsMetaService {
constructor() {}
async list(
context: NcContext,
param: { job?: JobTypes; status?: JobStatus },
req: NcRequest,
) {
/*
* List jobs for the current base.
* If the job is not created by the current user, exclude the result.
* List jobs updated in the last 1 hour or jobs that are still active(, waiting, or delayed).
*/
return Job.list(context, {
xcCondition: {
_and: [
...(param.job
? [
{
job: {
eq: param.job,
},
},
]
: []),
...(param.status
? [
{
status: {
eq: param.status,
},
},
]
: []),
{
_or: [
{
updated_at: {
gt: Noco.ncMeta.formatDateTime(
dayjs().subtract(1, 'hour').toISOString(),
),
},
},
{
status: {
eq: JobStatus.ACTIVE,
},
},
{
status: {
eq: JobStatus.WAITING,
},
},
{
status: {
eq: JobStatus.DELAYED,
},
},
],
},
],
},
}).then((jobs) => {
return jobs.map((job) => {
if (job.fk_user_id === req.user.id) {
return job;
} else {
const { result, ...rest } = job;
return rest;
}
});
});
}
}

4
packages/nocodb/src/utils/acl.ts

@ -143,6 +143,9 @@ const permissionScopes = {
'extensionCreate', 'extensionCreate',
'extensionUpdate', 'extensionUpdate',
'extensionDelete', 'extensionDelete',
// Jobs
'jobList',
], ],
}; };
@ -209,6 +212,7 @@ const rolePermissions:
extensionList: true, extensionList: true,
extensionRead: true, extensionRead: true,
jobList: true,
commentList: true, commentList: true,
commentsCount: true, commentsCount: true,
auditListRow: true, auditListRow: true,

3
packages/nocodb/src/utils/globals.ts

@ -51,6 +51,7 @@ export enum MetaTable {
COMMENTS = 'nc_comments', COMMENTS = 'nc_comments',
USER_COMMENTS_NOTIFICATIONS_PREFERENCE = 'nc_user_comment_notifications_preference', USER_COMMENTS_NOTIFICATIONS_PREFERENCE = 'nc_user_comment_notifications_preference',
COMMENTS_REACTIONS = 'nc_comment_reactions', COMMENTS_REACTIONS = 'nc_comment_reactions',
JOBS = 'nc_jobs',
} }
export enum MetaTableOldV2 { export enum MetaTableOldV2 {
@ -171,6 +172,7 @@ export enum CacheScope {
DASHBOARD_PROJECT_DB_PROJECT_LINKING = 'dashboardProjectDBProjectLinking', DASHBOARD_PROJECT_DB_PROJECT_LINKING = 'dashboardProjectDBProjectLinking',
SINGLE_QUERY = 'singleQuery', SINGLE_QUERY = 'singleQuery',
JOBS = 'nc_jobs', JOBS = 'nc_jobs',
JOBS_POLLING = 'nc_jobs_polling',
PRESIGNED_URL = 'presignedUrl', PRESIGNED_URL = 'presignedUrl',
STORE = 'store', STORE = 'store',
PROJECT_ALIAS = 'baseAlias', PROJECT_ALIAS = 'baseAlias',
@ -281,6 +283,7 @@ export const RootScopeTables = {
MetaTable.PLUGIN, MetaTable.PLUGIN,
MetaTable.STORE, MetaTable.STORE,
MetaTable.NOTIFICATION, MetaTable.NOTIFICATION,
MetaTable.JOBS,
// Temporarily added need to be discussed within team // Temporarily added need to be discussed within team
MetaTable.AUDIT, MetaTable.AUDIT,
], ],

Loading…
Cancel
Save