diff --git a/packages/nocodb/src/modules/jobs/export-import/duplicate.processor.ts b/packages/nocodb/src/modules/jobs/export-import/duplicate.processor.ts index 9ac86d5eef..83a20755ed 100644 --- a/packages/nocodb/src/modules/jobs/export-import/duplicate.processor.ts +++ b/packages/nocodb/src/modules/jobs/export-import/duplicate.processor.ts @@ -247,28 +247,7 @@ export class DuplicateProcessor { externalModels, } = param; - const handledLinks = []; - const lChunks: Record = {}; // fk_mm_model_id: { rowId, childId }[] - - const insertChunks = async () => { - for (const [k, v] of Object.entries(lChunks)) { - try { - if (v.length === 0) continue; - await this.bulkDataService.bulkDataInsert({ - projectName: destProject.id, - tableName: k, - body: v, - cookie: null, - chunkSize: 1000, - foreign_key_checks: false, - raw: true, - }); - lChunks[k] = []; - } catch (e) { - console.log(e); - } - } - }; + let handledLinks = []; for (const sourceModel of sourceModels) { const dataStream = new Readable({ @@ -279,7 +258,7 @@ export class DuplicateProcessor { read() {}, }); - this.exportService.streamModelData({ + this.exportService.streamModelDataAsCsv({ dataStream, linkStream, projectId: sourceProject.id, @@ -287,178 +266,24 @@ export class DuplicateProcessor { handledMmList: handledLinks, }); - const headers: string[] = []; - let chunk = []; - const model = await Model.get(findWithIdentifier(idMap, sourceModel.id)); - await new Promise((resolve) => { - papaparse.parse(dataStream, { - newline: '\r\n', - step: async (results, parser) => { - if (!headers.length) { - parser.pause(); - for (const header of results.data) { - const id = idMap.get(header); - if (id) { - const col = await Column.get({ - base_id: destBase.id, - colId: id, - }); - if (col.colOptions?.type === 'bt') { - const childCol = await Column.get({ - base_id: destBase.id, - colId: col.colOptions.fk_child_column_id, - }); - headers.push(childCol.column_name); - } else { - headers.push(col.column_name); - } - } else { - debugLog('header not found', header); - } - } - parser.resume(); - } else { - if (results.errors.length === 0) { - const row = {}; - for (let i = 0; i < headers.length; i++) { - if (results.data[i] !== '') { - row[headers[i]] = results.data[i]; - } - } - chunk.push(row); - if (chunk.length > 1000) { - parser.pause(); - try { - await this.bulkDataService.bulkDataInsert({ - projectName: destProject.id, - tableName: model.id, - body: chunk, - cookie: null, - chunkSize: chunk.length + 1, - foreign_key_checks: false, - raw: true, - }); - } catch (e) { - console.log(e); - } - chunk = []; - parser.resume(); - } - } - } - }, - complete: async () => { - if (chunk.length > 0) { - try { - await this.bulkDataService.bulkDataInsert({ - projectName: destProject.id, - tableName: model.id, - body: chunk, - cookie: null, - chunkSize: chunk.length + 1, - foreign_key_checks: false, - raw: true, - }); - } catch (e) { - console.log(e); - } - chunk = []; - } - resolve(null); - }, - }); + await this.importService.importDataFromCsvStream({ + idMap, + dataStream, + destProject, + destBase, + destModel: model, + debugLog, }); - let headersFound = false; - - let childIndex = -1; - let parentIndex = -1; - let columnIndex = -1; - - const mmColumns: Record = {}; - const mmParentChild: any = {}; - - await new Promise((resolve) => { - papaparse.parse(linkStream, { - newline: '\r\n', - step: async (results, parser) => { - if (!headersFound) { - for (const [i, header] of Object.entries(results.data)) { - if (header === 'child') { - childIndex = parseInt(i); - } else if (header === 'parent') { - parentIndex = parseInt(i); - } else if (header === 'column') { - columnIndex = parseInt(i); - } - } - headersFound = true; - } else { - if (results.errors.length === 0) { - const child = results.data[childIndex]; - const parent = results.data[parentIndex]; - const columnId = results.data[columnIndex]; - if (child && parent && columnId) { - if (mmColumns[columnId]) { - // push to chunk - const mmModelId = - mmColumns[columnId].colOptions.fk_mm_model_id; - const mm = mmParentChild[mmModelId]; - lChunks[mmModelId].push({ - [mm.parent]: parent, - [mm.child]: child, - }); - } else { - // get column for the first time - parser.pause(); - - await insertChunks(); - - const col = await Column.get({ - base_id: destBase.id, - colId: findWithIdentifier(idMap, columnId), - }); - - const colOptions = - await col.getColOptions(); - - const vChildCol = await colOptions.getMMChildColumn(); - const vParentCol = await colOptions.getMMParentColumn(); - - mmParentChild[col.colOptions.fk_mm_model_id] = { - parent: vParentCol.column_name, - child: vChildCol.column_name, - }; - - mmColumns[columnId] = col; - - handledLinks.push(col.colOptions.fk_mm_model_id); - - const mmModelId = col.colOptions.fk_mm_model_id; - - // create chunk - lChunks[mmModelId] = []; - - // push to chunk - const mm = mmParentChild[mmModelId]; - lChunks[mmModelId].push({ - [mm.parent]: parent, - [mm.child]: child, - }); - - parser.resume(); - } - } - } - } - }, - complete: async () => { - await insertChunks(); - resolve(null); - }, - }); + handledLinks = await this.importService.importLinkFromCsvStream({ + idMap, + linkStream, + destProject, + destBase, + handledLinks, + debugLog, }); elapsedTime(hrTime, model.title); @@ -479,7 +304,7 @@ export class DuplicateProcessor { read() {}, }); - this.exportService.streamModelData({ + this.exportService.streamModelDataAsCsv({ dataStream, linkStream, projectId: sourceProject.id, @@ -506,16 +331,27 @@ export class DuplicateProcessor { base_id: destBase.id, colId: id, }); - if (col.colOptions?.type === 'bt') { - const childCol = await Column.get({ - base_id: destBase.id, - colId: col.colOptions.fk_child_column_id, - }); - headers.push(childCol.column_name); + if (col) { + if (col.colOptions?.type === 'bt') { + const childCol = await Column.get({ + base_id: destBase.id, + colId: col.colOptions.fk_child_column_id, + }); + if (childCol) { + headers.push(childCol.column_name); + } else { + headers.push(null); + debugLog('child column not found', id); + } + } else { + headers.push(col.column_name); + } } else { - headers.push(col.column_name); + headers.push(null); + debugLog('column not found', id); } } else { + headers.push(null); debugLog('header not found', header); } } @@ -524,8 +360,10 @@ export class DuplicateProcessor { if (results.errors.length === 0) { const row = {}; for (let i = 0; i < headers.length; i++) { - if (results.data[i] !== '') { - row[headers[i]] = results.data[i]; + if (headers[i]) { + if (results.data[i] !== '') { + row[headers[i]] = results.data[i]; + } } } chunk.push(row); diff --git a/packages/nocodb/src/modules/jobs/export-import/export.service.ts b/packages/nocodb/src/modules/jobs/export-import/export.service.ts index 8617d48bfe..44f5e843dc 100644 --- a/packages/nocodb/src/modules/jobs/export-import/export.service.ts +++ b/packages/nocodb/src/modules/jobs/export-import/export.service.ts @@ -293,7 +293,7 @@ export class ExportService { return serializedModels; } - async streamModelData(param: { + async streamModelDataAsCsv(param: { dataStream: Readable; linkStream: Readable; projectId: string; @@ -679,7 +679,7 @@ export class ExportService { dataStream, ); - this.streamModelData({ + this.streamModelDataAsCsv({ dataStream, linkStream, projectId: project.id, diff --git a/packages/nocodb/src/modules/jobs/export-import/import.service.ts b/packages/nocodb/src/modules/jobs/export-import/import.service.ts index e10efdc96b..550a3e52a7 100644 --- a/packages/nocodb/src/modules/jobs/export-import/import.service.ts +++ b/packages/nocodb/src/modules/jobs/export-import/import.service.ts @@ -27,6 +27,7 @@ import { HooksService } from '../../../services/hooks.service'; import { ViewsService } from '../../../services/views.service'; import NcPluginMgrv2 from '../../../helpers/NcPluginMgrv2'; import { BulkDataAliasService } from '../../../services/bulk-data-alias.service'; +import type { Readable } from 'stream'; import type { ViewCreateReqType } from 'nocodb-sdk'; import type { LinkToAnotherRecordColumn, User, View } from '../../../models'; @@ -1125,6 +1126,13 @@ export class ImportService { console.log(...args); }; + const destProject = await Project.get(projectId); + const destBase = await Base.get(baseId); + + if (!destProject || !destBase) { + throw NcError.badRequest('Project or Base not found'); + } + let start = process.hrtime(); const elapsedTime = function (label?: string) { @@ -1148,7 +1156,7 @@ export class ImportService { elapsedTime('read schema'); // store fk_mm_model_id (mm) to link once - const handledLinks = []; + let handledLinks = []; const idMap = await this.importModels({ user, @@ -1174,9 +1182,6 @@ export class ImportService { `${path}/data/${file}`, ); - const headers: string[] = []; - let chunk = []; - const modelId = findWithIdentifier( idMap, file.replace(/\.csv$/, ''), @@ -1186,90 +1191,16 @@ export class ImportService { debugLog(`Importing ${model.title}...`); - await new Promise((resolve) => { - papaparse.parse(readStream, { - newline: '\r\n', - step: async (results, parser) => { - if (!headers.length) { - parser.pause(); - for (const header of results.data) { - const id = idMap.get(header); - if (id) { - const col = await Column.get({ - base_id: baseId, - colId: id, - }); - if (col.colOptions?.type === 'bt') { - const childCol = await Column.get({ - base_id: baseId, - colId: col.colOptions.fk_child_column_id, - }); - headers.push(childCol.column_name); - } else { - headers.push(col.column_name); - } - } else { - debugLog(header); - } - } - parser.resume(); - } else { - if (results.errors.length === 0) { - const row = {}; - for (let i = 0; i < headers.length; i++) { - if (results.data[i] !== '') { - row[headers[i]] = results.data[i]; - } - } - chunk.push(row); - if (chunk.length > 100) { - parser.pause(); - elapsedTime('before import chunk'); - try { - await this.bulkDataService.bulkDataInsert({ - projectName: projectId, - tableName: modelId, - body: chunk, - cookie: null, - chunkSize: chunk.length + 1, - foreign_key_checks: false, - raw: true, - }); - } catch (e) { - debugLog(`${model.title} import throwed an error!`); - console.log(e); - } - chunk = []; - elapsedTime('after import chunk'); - parser.resume(); - } - } - } - }, - complete: async () => { - if (chunk.length > 0) { - elapsedTime('before import chunk'); - try { - await this.bulkDataService.bulkDataInsert({ - projectName: projectId, - tableName: modelId, - body: chunk, - cookie: null, - chunkSize: chunk.length + 1, - foreign_key_checks: false, - raw: true, - }); - } catch (e) { - debugLog(chunk); - console.log(e); - } - chunk = []; - elapsedTime('after import chunk'); - } - resolve(null); - }, - }); + await this.importDataFromCsvStream({ + idMap, + dataStream: readStream, + destProject, + destBase, + destModel: model, + debugLog, }); + + elapsedTime(`import ${model.title}`); } // reset timer @@ -1279,113 +1210,13 @@ export class ImportService { storageAdapter as any ).fileReadByStream(linkFile); - const lChunk: Record = {}; // fk_mm_model_id: { rowId, childId }[] - - let headersFound = false; - - let childIndex = -1; - let parentIndex = -1; - let columnIndex = -1; - - const mmColumns: Record = {}; - const mmParentChild: any = {}; - - await new Promise((resolve) => { - papaparse.parse(linkReadStream, { - newline: '\r\n', - step: async (results, parser) => { - if (!headersFound) { - for (const [i, header] of Object.entries(results.data)) { - if (header === 'child') { - childIndex = parseInt(i); - } else if (header === 'parent') { - parentIndex = parseInt(i); - } else if (header === 'column') { - columnIndex = parseInt(i); - } - } - headersFound = true; - } else { - if (results.errors.length === 0) { - if ( - results.data[childIndex] === 'child' && - results.data[parentIndex] === 'parent' && - results.data[columnIndex] === 'column' - ) - return; - const child = results.data[childIndex]; - const parent = results.data[parentIndex]; - const columnId = results.data[columnIndex]; - if (child && parent && columnId) { - if (mmColumns[columnId]) { - // push to chunk - const mmModelId = - mmColumns[columnId].colOptions.fk_mm_model_id; - const mm = mmParentChild[mmModelId]; - lChunk[mmModelId].push({ - [mm.parent]: parent, - [mm.child]: child, - }); - } else { - // get column for the first time - parser.pause(); - const col = await Column.get({ - colId: findWithIdentifier(idMap, columnId), - }); - - const colOptions = - await col.getColOptions(); - - const vChildCol = await colOptions.getMMChildColumn(); - const vParentCol = - await colOptions.getMMParentColumn(); - - mmParentChild[col.colOptions.fk_mm_model_id] = { - parent: vParentCol.column_name, - child: vChildCol.column_name, - }; - - mmColumns[columnId] = col; - - handledLinks.push(col.colOptions.fk_mm_model_id); - - const mmModelId = col.colOptions.fk_mm_model_id; - - // create chunk - lChunk[mmModelId] = []; - - // push to chunk - const mm = mmParentChild[mmModelId]; - lChunk[mmModelId].push({ - [mm.parent]: parent, - [mm.child]: child, - }); - - parser.resume(); - } - } - } - } - }, - complete: async () => { - for (const [k, v] of Object.entries(lChunk)) { - try { - await this.bulkDataService.bulkDataInsert({ - projectName: projectId, - tableName: k, - body: v, - cookie: null, - chunkSize: 1000, - foreign_key_checks: false, - raw: true, - }); - } catch (e) { - console.log(e); - } - } - resolve(null); - }, - }); + handledLinks = await this.importLinkFromCsvStream({ + idMap, + linkStream: linkReadStream, + destProject, + destBase, + handledLinks, + debugLog, }); } } catch (e) { @@ -1399,4 +1230,242 @@ export class ImportService { break; } } + + importDataFromCsvStream(param: { + idMap: Map; + dataStream: Readable; + destProject: Project; + destBase: Base; + destModel: Model; + debugLog?: (...args: any[]) => void; + }): Promise { + const { idMap, dataStream, destBase, destProject, destModel } = param; + + const debugLog = param.debugLog || (() => {}); + + const headers: string[] = []; + let chunk = []; + + return new Promise((resolve) => { + papaparse.parse(dataStream, { + newline: '\r\n', + step: async (results, parser) => { + if (!headers.length) { + parser.pause(); + for (const header of results.data) { + const id = idMap.get(header); + if (id) { + const col = await Column.get({ + base_id: destBase.id, + colId: id, + }); + if (col) { + if (col.colOptions?.type === 'bt') { + const childCol = await Column.get({ + base_id: destBase.id, + colId: col.colOptions.fk_child_column_id, + }); + if (childCol) { + headers.push(childCol.column_name); + } else { + headers.push(null); + debugLog('child column not found', header); + } + } else { + headers.push(col.column_name); + } + } else { + headers.push(null); + debugLog('column not found', header); + } + } else { + headers.push(null); + debugLog('header not found', header); + } + } + parser.resume(); + } else { + if (results.errors.length === 0) { + const row = {}; + for (let i = 0; i < headers.length; i++) { + if (headers[i]) { + if (results.data[i] !== '') { + row[headers[i]] = results.data[i]; + } + } + } + chunk.push(row); + if (chunk.length > 1000) { + parser.pause(); + try { + await this.bulkDataService.bulkDataInsert({ + projectName: destProject.id, + tableName: destModel.id, + body: chunk, + cookie: null, + chunkSize: chunk.length + 1, + foreign_key_checks: false, + raw: true, + }); + } catch (e) { + console.log(e); + } + chunk = []; + parser.resume(); + } + } + } + }, + complete: async () => { + if (chunk.length > 0) { + try { + await this.bulkDataService.bulkDataInsert({ + projectName: destProject.id, + tableName: destModel.id, + body: chunk, + cookie: null, + chunkSize: chunk.length + 1, + foreign_key_checks: false, + raw: true, + }); + } catch (e) { + console.log(e); + } + chunk = []; + } + resolve(null); + }, + }); + }); + } + + // import links and return handled links + async importLinkFromCsvStream(param: { + idMap: Map; + linkStream: Readable; + destProject: Project; + destBase: Base; + handledLinks: string[]; + debugLog?: (...args: any[]) => void; + }): Promise { + const { idMap, linkStream, destBase, destProject, handledLinks } = param; + + const debugLog = param.debugLog || (() => {}); + + const lChunks: Record = {}; // fk_mm_model_id: { rowId, childId }[] + + const insertChunks = async () => { + for (const [k, v] of Object.entries(lChunks)) { + try { + if (v.length === 0) continue; + await this.bulkDataService.bulkDataInsert({ + projectName: destProject.id, + tableName: k, + body: v, + cookie: null, + chunkSize: 1000, + foreign_key_checks: false, + raw: true, + }); + lChunks[k] = []; + } catch (e) { + console.log(e); + } + } + }; + + let headersFound = false; + + let childIndex = -1; + let parentIndex = -1; + let columnIndex = -1; + + const mmColumns: Record = {}; + const mmParentChild: any = {}; + + return new Promise((resolve) => { + papaparse.parse(linkStream, { + newline: '\r\n', + step: async (results, parser) => { + if (!headersFound) { + for (const [i, header] of Object.entries(results.data)) { + if (header === 'child') { + childIndex = parseInt(i); + } else if (header === 'parent') { + parentIndex = parseInt(i); + } else if (header === 'column') { + columnIndex = parseInt(i); + } + } + headersFound = true; + } else { + if (results.errors.length === 0) { + const child = results.data[childIndex]; + const parent = results.data[parentIndex]; + const columnId = results.data[columnIndex]; + if (child && parent && columnId) { + if (mmColumns[columnId]) { + // push to chunk + const mmModelId = + mmColumns[columnId].colOptions.fk_mm_model_id; + const mm = mmParentChild[mmModelId]; + lChunks[mmModelId].push({ + [mm.parent]: parent, + [mm.child]: child, + }); + } else { + // get column for the first time + parser.pause(); + + await insertChunks(); + + const col = await Column.get({ + base_id: destBase.id, + colId: findWithIdentifier(idMap, columnId), + }); + + if (col) { + const colOptions = + await col.getColOptions(); + + const vChildCol = await colOptions.getMMChildColumn(); + const vParentCol = await colOptions.getMMParentColumn(); + + mmParentChild[col.colOptions.fk_mm_model_id] = { + parent: vParentCol.column_name, + child: vChildCol.column_name, + }; + + mmColumns[columnId] = col; + + handledLinks.push(col.colOptions.fk_mm_model_id); + + const mmModelId = col.colOptions.fk_mm_model_id; + + // create chunk + lChunks[mmModelId] = []; + + // push to chunk + const mm = mmParentChild[mmModelId]; + lChunks[mmModelId].push({ + [mm.parent]: parent, + [mm.child]: child, + }); + } else { + debugLog('column not found', columnId); + } + + parser.resume(); + } + } + } + } + }, + complete: async () => { + await insertChunks(); + resolve(handledLinks); + }, + }); + }); + } }