diff --git a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts
index 412552b519..eaf026d911 100644
--- a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts
+++ b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts
@@ -1,3 +1,4 @@
+import { Readable } from 'stream';
 import {
   OnQueueActive,
   OnQueueCompleted,
@@ -5,13 +6,20 @@ import {
   Process,
   Processor,
 } from '@nestjs/bull';
-import { Base, Project } from 'src/models';
+import { Base, Column, Model, Project } from 'src/models';
 import { Job } from 'bull';
 import { ProjectsService } from 'src/services/projects.service';
 import boxen from 'boxen';
-import { generateUniqueName } from 'src/helpers/exportImportHelpers';
+import papaparse from 'papaparse';
+import {
+  findWithIdentifier,
+  generateUniqueName,
+} from 'src/helpers/exportImportHelpers';
+import { BulkDataAliasService } from 'src/services/bulk-data-alias.service';
+import { UITypes } from 'nocodb-sdk';
 import { ExportService } from './export.service';
 import { ImportService } from './import.service';
+import type { LinkToAnotherRecordColumn } from 'src/models';
 
 @Processor('duplicate')
 export class DuplicateProcessor {
@@ -19,6 +27,7 @@ export class DuplicateProcessor {
     private exportService: ExportService,
     private importService: ImportService,
     private projectsService: ProjectsService,
+    private bulkDataService: BulkDataAliasService,
   ) {}
 
   @OnQueueActive()
@@ -49,6 +58,16 @@ export class DuplicateProcessor {
 
   @Process('duplicate')
   async duplicateBase(job: Job) {
+    console.time('duplicateBase');
+    let start = process.hrtime();
+
+    const elapsedTime = function (label?: string) {
+      const elapsedS = process.hrtime(start)[0].toFixed(3);
+      const elapsedMs = process.hrtime(start)[1] / 1000000;
+      if (label) console.log(`${label}: ${elapsedS}s ${elapsedMs}ms`);
+      start = process.hrtime();
+    };
+
     const param: { projectId: string; baseId?: string; req: any } = job.data;
 
     const user = (param.req as any).user;
@@ -67,12 +86,17 @@
       throw new Error(`Base not found!`);
     }
 
-    const exported = await this.exportService.exportBase({
-      path: `${job.name}_${job.id}`,
-      baseId: base.id,
+    const models = (await base.getModels()).filter(
+      (m) => !m.mm && m.type === 'table',
+    );
+
+    const exportedModels = await this.exportService.serializeModels({
+      modelIds: models.map((m) => m.id),
     });
 
-    if (!exported) {
+    elapsedTime('serializeModels');
+
+    if (!exportedModels) {
       throw new Error(`Export failed for base '${base.id}'`);
     }
 
@@ -88,15 +112,225 @@
       user: { id: user.id },
     });
 
-    await this.importService.importBase({
+    const dupBaseId = dupProject.bases[0].id;
+
+    elapsedTime('projectCreate');
+
+    const idMap = await this.importService.importModels({
       user,
       projectId: dupProject.id,
-      baseId: dupProject.bases[0].id,
-      src: {
-        type: 'local',
-        path: exported.path,
-      },
+      baseId: dupBaseId,
+      data: exportedModels,
       req: param.req,
     });
+
+    elapsedTime('importModels');
+
+    if (!idMap) {
+      throw new Error(`Import failed for base '${base.id}'`);
+    }
+
+    const handledLinks = [];
+    const lChunk: Record<string, any[]> = {}; // colId: { rowId, childId }[]
+
+    for (const sourceModel of models) {
+      const dataStream = new Readable({
+        read() {},
+      });
+
+      const linkStream = new Readable({
+        read() {},
+      });
+
+      this.exportService.streamModelData({
+        dataStream,
+        linkStream,
+        projectId: project.id,
+        modelId: sourceModel.id,
+      });
+
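+      // streamModelData is intentionally not awaited: it pushes CSV text into
+      // dataStream/linkStream while the papaparse passes below consume them,
+      // so no table is ever buffered in full. The first CSV row carries the
+      // source column ids, which idMap translates to the duplicated columns.
+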
+      const headers: string[] = [];
+      let chunk = [];
+
+      const model = await Model.get(findWithIdentifier(idMap, sourceModel.id));
+
+      await new Promise((resolve) => {
+        papaparse.parse(dataStream, {
+          newline: '\r\n',
+          step: async (results, parser) => {
+            if (!headers.length) {
+              parser.pause();
+              for (const header of results.data) {
+                const id = idMap.get(header);
+                if (id) {
+                  const col = await Column.get({
+                    base_id: dupBaseId,
+                    colId: id,
+                  });
+                  if (col.colOptions?.type === 'bt') {
+                    const childCol = await Column.get({
+                      base_id: dupBaseId,
+                      colId: col.colOptions.fk_child_column_id,
+                    });
+                    headers.push(childCol.column_name);
+                  } else {
+                    headers.push(col.column_name);
+                  }
+                } else {
+                  console.log('header not found', header);
+                }
+              }
+              parser.resume();
+            } else {
+              if (results.errors.length === 0) {
+                const row = {};
+                for (let i = 0; i < headers.length; i++) {
+                  if (results.data[i] !== '') {
+                    row[headers[i]] = results.data[i];
+                  }
+                }
+                chunk.push(row);
+                if (chunk.length > 1000) {
+                  parser.pause();
+                  try {
+                    await this.bulkDataService.bulkDataInsert({
+                      projectName: dupProject.id,
+                      tableName: model.id,
+                      body: chunk,
+                      cookie: null,
+                      chunkSize: chunk.length + 1,
+                      foreign_key_checks: false,
+                      raw: true,
+                    });
+                  } catch (e) {
+                    console.log(e);
+                  }
+                  chunk = [];
+                  parser.resume();
+                }
+              }
+            }
+          },
+          complete: async () => {
+            if (chunk.length > 0) {
+              try {
+                await this.bulkDataService.bulkDataInsert({
+                  projectName: dupProject.id,
+                  tableName: model.id,
+                  body: chunk,
+                  cookie: null,
+                  chunkSize: chunk.length + 1,
+                  foreign_key_checks: false,
+                  raw: true,
+                });
+              } catch (e) {
+                console.log(e);
+              }
+              chunk = [];
+            }
+            resolve(null);
+          },
+        });
+      });
+
+      const lHeaders: string[] = [];
+      const mmParentChild: any = {};
+
+      let pkIndex = -1;
+
+      await new Promise((resolve) => {
+        papaparse.parse(linkStream, {
+          newline: '\r\n',
+          step: async (results, parser) => {
+            if (!lHeaders.length) {
+              parser.pause();
+              for (const header of results.data) {
+                if (header === 'pk') {
+                  lHeaders.push(null);
+                  pkIndex = lHeaders.length - 1;
+                  continue;
+                }
+                const id = idMap.get(header);
+                if (id) {
+                  const col = await Column.get({
+                    base_id: dupBaseId,
+                    colId: id,
+                  });
+                  if (
+                    col.uidt === UITypes.LinkToAnotherRecord &&
+                    col.colOptions.fk_mm_model_id &&
+                    handledLinks.includes(col.colOptions.fk_mm_model_id)
+                  ) {
+                    lHeaders.push(null);
+                  } else {
+                    if (
+                      col.uidt === UITypes.LinkToAnotherRecord &&
+                      col.colOptions.fk_mm_model_id &&
+                      !handledLinks.includes(col.colOptions.fk_mm_model_id)
+                    ) {
+                      const colOptions =
+                        await col.getColOptions<LinkToAnotherRecordColumn>();
+
+                      const vChildCol = await colOptions.getMMChildColumn();
+                      const vParentCol = await colOptions.getMMParentColumn();
+
+                      mmParentChild[col.colOptions.fk_mm_model_id] = {
+                        parent: vParentCol.column_name,
+                        child: vChildCol.column_name,
+                      };
+
+                      handledLinks.push(col.colOptions.fk_mm_model_id);
+                    }
+                    lHeaders.push(col.colOptions.fk_mm_model_id);
+                    lChunk[col.colOptions.fk_mm_model_id] = [];
+                  }
+                }
+              }
+              parser.resume();
+            } else {
+              if (results.errors.length === 0) {
+                for (let i = 0; i < lHeaders.length; i++) {
+                  if (!lHeaders[i]) continue;
+
+                  const mm = mmParentChild[lHeaders[i]];
+
+                  for (const rel of results.data[i].split(',')) {
+                    if (rel.trim() === '') continue;
+                    lChunk[lHeaders[i]].push({
+                      [mm.parent]: rel,
+                      [mm.child]: results.data[pkIndex],
+                    });
+                  }
+                }
+              }
+            }
+          },
+          complete: async () => {
+            resolve(null);
+          },
+        });
+      });
+
+      elapsedTime(model.title);
+    }
+
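+    // Link rows were only collected while the link CSVs streamed past
+    // (lChunk is keyed by fk_mm_model_id); flushing them once at the end,
+    // with foreign_key_checks disabled, writes each junction table exactly
+    // once even though the link column appears on both related tables.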
+    for (const [k, v] of Object.entries(lChunk)) {
+      try {
+        await this.bulkDataService.bulkDataInsert({
+          projectName: dupProject.id,
+          tableName: k,
+          body: v,
+          cookie: null,
+          chunkSize: 1000,
+          foreign_key_checks: false,
+          raw: true,
+        });
+      } catch (e) {
+        console.log(e);
+      }
+    }
+
+    elapsedTime('links');
+    console.timeEnd('duplicateBase');
   }
 }
diff --git a/packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts b/packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts
index 29d1687f2c..5b185d8d24 100644
--- a/packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts
+++ b/packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts
@@ -233,13 +233,15 @@ export class ExportService {
     return serializedModels;
   }
 
-  async exportModelData(param: {
-    storageAdapter: IStorageAdapterV2;
-    path: string;
+  async streamModelData(param: {
+    dataStream: Readable;
+    linkStream: Readable;
     projectId: string;
     modelId: string;
     viewId?: string;
   }) {
+    const { dataStream, linkStream } = param;
+
     const { model, view } = await getViewAndModelByAliasOrId({
       projectName: param.projectId,
       tableName: param.modelId,
@@ -248,13 +250,13 @@
 
     await model.getColumns();
 
-    const hasLink = model.columns.some(
-      (c) =>
-        c.uidt === UITypes.LinkToAnotherRecord && c.colOptions?.type === 'mm',
-    );
-
     const pkMap = new Map();
 
+    const fields = model.columns
+      .filter((c) => c.colOptions?.type !== 'hm')
+      .map((c) => c.title)
+      .join(',');
+
     for (const column of model.columns.filter(
       (c) =>
         c.uidt === UITypes.LinkToAnotherRecord && c.colOptions?.type !== 'hm',
@@ -268,31 +270,9 @@
       pkMap.set(column.id, relatedTable.primaryKey.title);
     }
 
-    const readableStream = new Readable({
-      read() {},
-    });
+    dataStream.setEncoding('utf8');
 
-    const readableLinkStream = new Readable({
-      read() {},
-    });
-
-    readableStream.setEncoding('utf8');
-
-    readableLinkStream.setEncoding('utf8');
-
-    const storageAdapter = param.storageAdapter;
-
-    const uploadPromise = storageAdapter.fileCreateByStream(
-      `${param.path}/${model.id}.csv`,
-      readableStream,
-    );
-
-    const uploadLinkPromise = hasLink
-      ? storageAdapter.fileCreateByStream(
-          `${param.path}/${model.id}_links.csv`,
-          readableLinkStream,
-        )
-      : Promise.resolve();
+    linkStream.setEncoding('utf8');
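+
+    // The streams are now created and owned by the caller: exportBase pipes
+    // them into storage uploads, while duplicateBase parses them directly,
+    // so one export path serves both file export and live duplication.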
 
     const limit = 200;
     const offset = 0;
 
@@ -364,19 +344,16 @@
     try {
       await this.recursiveRead(
         formatData,
-        readableStream,
-        readableLinkStream,
+        dataStream,
+        linkStream,
         model,
         view,
         offset,
         limit,
+        fields,
         true,
       );
-      await uploadPromise;
-      await uploadLinkPromise;
     } catch (e) {
-      await storageAdapter.fileDelete(`${param.path}/${model.id}.csv`);
-      await storageAdapter.fileDelete(`${param.path}/${model.id}_links.csv`);
       console.error(e);
       throw e;
     }
@@ -390,11 +367,12 @@
     view: View,
     offset: number,
     limit: number,
+    fields: string,
     header = false,
   ): Promise<void> {
     return new Promise((resolve, reject) => {
       this.datasService
-        .getDataList({ model, view, query: { limit, offset } })
+        .getDataList({ model, view, query: { limit, offset, fields } })
        .then((result) => {
          try {
            if (!header) {
@@ -417,6 +395,7 @@
               view,
               offset + limit,
               limit,
+              fields,
             ).then(resolve);
           }
         } catch (e) {
@@ -469,12 +448,32 @@
     );
 
     for (const model of models) {
-      await this.exportModelData({
-        storageAdapter,
-        path: `${destPath}/data`,
+      const dataStream = new Readable({
+        read() {},
+      });
+
+      const linkStream = new Readable({
+        read() {},
+      });
+
+      const uploadPromise = storageAdapter.fileCreateByStream(
+        `${param.path}/data/${model.id}.csv`,
+        dataStream,
+      );
+
+      const uploadLinkPromise = storageAdapter.fileCreateByStream(
+        `${param.path}/data/${model.id}_links.csv`,
+        linkStream,
+      );
+
+      this.streamModelData({
+        dataStream,
+        linkStream,
         projectId: project.id,
         modelId: model.id,
       });
+
+      await Promise.all([uploadPromise, uploadLinkPromise]);
     }
   } catch (e) {
     throw NcError.badRequest(e);