Browse Source

refactor: duplicated code to reusable function

* improved naming on functions

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/5621/head
mertmit 1 year ago
parent
commit
9fab04b265
  1. 240
      packages/nocodb/src/modules/jobs/export-import/duplicate.processor.ts
  2. 4
      packages/nocodb/src/modules/jobs/export-import/export.service.ts
  3. 457
      packages/nocodb/src/modules/jobs/export-import/import.service.ts

240
packages/nocodb/src/modules/jobs/export-import/duplicate.processor.ts

@ -247,28 +247,7 @@ export class DuplicateProcessor {
externalModels,
} = param;
const handledLinks = [];
const lChunks: Record<string, any[]> = {}; // fk_mm_model_id: { rowId, childId }[]
const insertChunks = async () => {
for (const [k, v] of Object.entries(lChunks)) {
try {
if (v.length === 0) continue;
await this.bulkDataService.bulkDataInsert({
projectName: destProject.id,
tableName: k,
body: v,
cookie: null,
chunkSize: 1000,
foreign_key_checks: false,
raw: true,
});
lChunks[k] = [];
} catch (e) {
console.log(e);
}
}
};
let handledLinks = [];
for (const sourceModel of sourceModels) {
const dataStream = new Readable({
@ -279,7 +258,7 @@ export class DuplicateProcessor {
read() {},
});
this.exportService.streamModelData({
this.exportService.streamModelDataAsCsv({
dataStream,
linkStream,
projectId: sourceProject.id,
@ -287,178 +266,24 @@ export class DuplicateProcessor {
handledMmList: handledLinks,
});
const headers: string[] = [];
let chunk = [];
const model = await Model.get(findWithIdentifier(idMap, sourceModel.id));
await new Promise((resolve) => {
papaparse.parse(dataStream, {
newline: '\r\n',
step: async (results, parser) => {
if (!headers.length) {
parser.pause();
for (const header of results.data) {
const id = idMap.get(header);
if (id) {
const col = await Column.get({
base_id: destBase.id,
colId: id,
});
if (col.colOptions?.type === 'bt') {
const childCol = await Column.get({
base_id: destBase.id,
colId: col.colOptions.fk_child_column_id,
});
headers.push(childCol.column_name);
} else {
headers.push(col.column_name);
}
} else {
debugLog('header not found', header);
}
}
parser.resume();
} else {
if (results.errors.length === 0) {
const row = {};
for (let i = 0; i < headers.length; i++) {
if (results.data[i] !== '') {
row[headers[i]] = results.data[i];
}
}
chunk.push(row);
if (chunk.length > 1000) {
parser.pause();
try {
await this.bulkDataService.bulkDataInsert({
projectName: destProject.id,
tableName: model.id,
body: chunk,
cookie: null,
chunkSize: chunk.length + 1,
foreign_key_checks: false,
raw: true,
});
} catch (e) {
console.log(e);
}
chunk = [];
parser.resume();
}
}
}
},
complete: async () => {
if (chunk.length > 0) {
try {
await this.bulkDataService.bulkDataInsert({
projectName: destProject.id,
tableName: model.id,
body: chunk,
cookie: null,
chunkSize: chunk.length + 1,
foreign_key_checks: false,
raw: true,
});
} catch (e) {
console.log(e);
}
chunk = [];
}
resolve(null);
},
});
await this.importService.importDataFromCsvStream({
idMap,
dataStream,
destProject,
destBase,
destModel: model,
debugLog,
});
let headersFound = false;
let childIndex = -1;
let parentIndex = -1;
let columnIndex = -1;
const mmColumns: Record<string, Column> = {};
const mmParentChild: any = {};
await new Promise((resolve) => {
papaparse.parse(linkStream, {
newline: '\r\n',
step: async (results, parser) => {
if (!headersFound) {
for (const [i, header] of Object.entries(results.data)) {
if (header === 'child') {
childIndex = parseInt(i);
} else if (header === 'parent') {
parentIndex = parseInt(i);
} else if (header === 'column') {
columnIndex = parseInt(i);
}
}
headersFound = true;
} else {
if (results.errors.length === 0) {
const child = results.data[childIndex];
const parent = results.data[parentIndex];
const columnId = results.data[columnIndex];
if (child && parent && columnId) {
if (mmColumns[columnId]) {
// push to chunk
const mmModelId =
mmColumns[columnId].colOptions.fk_mm_model_id;
const mm = mmParentChild[mmModelId];
lChunks[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
} else {
// get column for the first time
parser.pause();
await insertChunks();
const col = await Column.get({
base_id: destBase.id,
colId: findWithIdentifier(idMap, columnId),
});
const colOptions =
await col.getColOptions<LinkToAnotherRecordColumn>();
const vChildCol = await colOptions.getMMChildColumn();
const vParentCol = await colOptions.getMMParentColumn();
mmParentChild[col.colOptions.fk_mm_model_id] = {
parent: vParentCol.column_name,
child: vChildCol.column_name,
};
mmColumns[columnId] = col;
handledLinks.push(col.colOptions.fk_mm_model_id);
const mmModelId = col.colOptions.fk_mm_model_id;
// create chunk
lChunks[mmModelId] = [];
// push to chunk
const mm = mmParentChild[mmModelId];
lChunks[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
parser.resume();
}
}
}
}
},
complete: async () => {
await insertChunks();
resolve(null);
},
});
handledLinks = await this.importService.importLinkFromCsvStream({
idMap,
linkStream,
destProject,
destBase,
handledLinks,
debugLog,
});
elapsedTime(hrTime, model.title);
@ -479,7 +304,7 @@ export class DuplicateProcessor {
read() {},
});
this.exportService.streamModelData({
this.exportService.streamModelDataAsCsv({
dataStream,
linkStream,
projectId: sourceProject.id,
@ -506,16 +331,27 @@ export class DuplicateProcessor {
base_id: destBase.id,
colId: id,
});
if (col.colOptions?.type === 'bt') {
const childCol = await Column.get({
base_id: destBase.id,
colId: col.colOptions.fk_child_column_id,
});
headers.push(childCol.column_name);
if (col) {
if (col.colOptions?.type === 'bt') {
const childCol = await Column.get({
base_id: destBase.id,
colId: col.colOptions.fk_child_column_id,
});
if (childCol) {
headers.push(childCol.column_name);
} else {
headers.push(null);
debugLog('child column not found', id);
}
} else {
headers.push(col.column_name);
}
} else {
headers.push(col.column_name);
headers.push(null);
debugLog('column not found', id);
}
} else {
headers.push(null);
debugLog('header not found', header);
}
}
@ -524,8 +360,10 @@ export class DuplicateProcessor {
if (results.errors.length === 0) {
const row = {};
for (let i = 0; i < headers.length; i++) {
if (results.data[i] !== '') {
row[headers[i]] = results.data[i];
if (headers[i]) {
if (results.data[i] !== '') {
row[headers[i]] = results.data[i];
}
}
}
chunk.push(row);

4
packages/nocodb/src/modules/jobs/export-import/export.service.ts

@ -293,7 +293,7 @@ export class ExportService {
return serializedModels;
}
async streamModelData(param: {
async streamModelDataAsCsv(param: {
dataStream: Readable;
linkStream: Readable;
projectId: string;
@ -679,7 +679,7 @@ export class ExportService {
dataStream,
);
this.streamModelData({
this.streamModelDataAsCsv({
dataStream,
linkStream,
projectId: project.id,

457
packages/nocodb/src/modules/jobs/export-import/import.service.ts

@ -27,6 +27,7 @@ import { HooksService } from '../../../services/hooks.service';
import { ViewsService } from '../../../services/views.service';
import NcPluginMgrv2 from '../../../helpers/NcPluginMgrv2';
import { BulkDataAliasService } from '../../../services/bulk-data-alias.service';
import type { Readable } from 'stream';
import type { ViewCreateReqType } from 'nocodb-sdk';
import type { LinkToAnotherRecordColumn, User, View } from '../../../models';
@ -1125,6 +1126,13 @@ export class ImportService {
console.log(...args);
};
const destProject = await Project.get(projectId);
const destBase = await Base.get(baseId);
if (!destProject || !destBase) {
throw NcError.badRequest('Project or Base not found');
}
let start = process.hrtime();
const elapsedTime = function (label?: string) {
@ -1148,7 +1156,7 @@ export class ImportService {
elapsedTime('read schema');
// store fk_mm_model_id (mm) to link once
const handledLinks = [];
let handledLinks = [];
const idMap = await this.importModels({
user,
@ -1174,9 +1182,6 @@ export class ImportService {
`${path}/data/${file}`,
);
const headers: string[] = [];
let chunk = [];
const modelId = findWithIdentifier(
idMap,
file.replace(/\.csv$/, ''),
@ -1186,90 +1191,16 @@ export class ImportService {
debugLog(`Importing ${model.title}...`);
await new Promise((resolve) => {
papaparse.parse(readStream, {
newline: '\r\n',
step: async (results, parser) => {
if (!headers.length) {
parser.pause();
for (const header of results.data) {
const id = idMap.get(header);
if (id) {
const col = await Column.get({
base_id: baseId,
colId: id,
});
if (col.colOptions?.type === 'bt') {
const childCol = await Column.get({
base_id: baseId,
colId: col.colOptions.fk_child_column_id,
});
headers.push(childCol.column_name);
} else {
headers.push(col.column_name);
}
} else {
debugLog(header);
}
}
parser.resume();
} else {
if (results.errors.length === 0) {
const row = {};
for (let i = 0; i < headers.length; i++) {
if (results.data[i] !== '') {
row[headers[i]] = results.data[i];
}
}
chunk.push(row);
if (chunk.length > 100) {
parser.pause();
elapsedTime('before import chunk');
try {
await this.bulkDataService.bulkDataInsert({
projectName: projectId,
tableName: modelId,
body: chunk,
cookie: null,
chunkSize: chunk.length + 1,
foreign_key_checks: false,
raw: true,
});
} catch (e) {
debugLog(`${model.title} import throwed an error!`);
console.log(e);
}
chunk = [];
elapsedTime('after import chunk');
parser.resume();
}
}
}
},
complete: async () => {
if (chunk.length > 0) {
elapsedTime('before import chunk');
try {
await this.bulkDataService.bulkDataInsert({
projectName: projectId,
tableName: modelId,
body: chunk,
cookie: null,
chunkSize: chunk.length + 1,
foreign_key_checks: false,
raw: true,
});
} catch (e) {
debugLog(chunk);
console.log(e);
}
chunk = [];
elapsedTime('after import chunk');
}
resolve(null);
},
});
await this.importDataFromCsvStream({
idMap,
dataStream: readStream,
destProject,
destBase,
destModel: model,
debugLog,
});
elapsedTime(`import ${model.title}`);
}
// reset timer
@ -1279,113 +1210,13 @@ export class ImportService {
storageAdapter as any
).fileReadByStream(linkFile);
const lChunk: Record<string, any[]> = {}; // fk_mm_model_id: { rowId, childId }[]
let headersFound = false;
let childIndex = -1;
let parentIndex = -1;
let columnIndex = -1;
const mmColumns: Record<string, Column> = {};
const mmParentChild: any = {};
await new Promise((resolve) => {
papaparse.parse(linkReadStream, {
newline: '\r\n',
step: async (results, parser) => {
if (!headersFound) {
for (const [i, header] of Object.entries(results.data)) {
if (header === 'child') {
childIndex = parseInt(i);
} else if (header === 'parent') {
parentIndex = parseInt(i);
} else if (header === 'column') {
columnIndex = parseInt(i);
}
}
headersFound = true;
} else {
if (results.errors.length === 0) {
if (
results.data[childIndex] === 'child' &&
results.data[parentIndex] === 'parent' &&
results.data[columnIndex] === 'column'
)
return;
const child = results.data[childIndex];
const parent = results.data[parentIndex];
const columnId = results.data[columnIndex];
if (child && parent && columnId) {
if (mmColumns[columnId]) {
// push to chunk
const mmModelId =
mmColumns[columnId].colOptions.fk_mm_model_id;
const mm = mmParentChild[mmModelId];
lChunk[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
} else {
// get column for the first time
parser.pause();
const col = await Column.get({
colId: findWithIdentifier(idMap, columnId),
});
const colOptions =
await col.getColOptions<LinkToAnotherRecordColumn>();
const vChildCol = await colOptions.getMMChildColumn();
const vParentCol =
await colOptions.getMMParentColumn();
mmParentChild[col.colOptions.fk_mm_model_id] = {
parent: vParentCol.column_name,
child: vChildCol.column_name,
};
mmColumns[columnId] = col;
handledLinks.push(col.colOptions.fk_mm_model_id);
const mmModelId = col.colOptions.fk_mm_model_id;
// create chunk
lChunk[mmModelId] = [];
// push to chunk
const mm = mmParentChild[mmModelId];
lChunk[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
parser.resume();
}
}
}
}
},
complete: async () => {
for (const [k, v] of Object.entries(lChunk)) {
try {
await this.bulkDataService.bulkDataInsert({
projectName: projectId,
tableName: k,
body: v,
cookie: null,
chunkSize: 1000,
foreign_key_checks: false,
raw: true,
});
} catch (e) {
console.log(e);
}
}
resolve(null);
},
});
handledLinks = await this.importLinkFromCsvStream({
idMap,
linkStream: linkReadStream,
destProject,
destBase,
handledLinks,
debugLog,
});
}
} catch (e) {
@ -1399,4 +1230,242 @@ export class ImportService {
break;
}
}
importDataFromCsvStream(param: {
idMap: Map<string, string>;
dataStream: Readable;
destProject: Project;
destBase: Base;
destModel: Model;
debugLog?: (...args: any[]) => void;
}): Promise<void> {
const { idMap, dataStream, destBase, destProject, destModel } = param;
const debugLog = param.debugLog || (() => {});
const headers: string[] = [];
let chunk = [];
return new Promise((resolve) => {
papaparse.parse(dataStream, {
newline: '\r\n',
step: async (results, parser) => {
if (!headers.length) {
parser.pause();
for (const header of results.data) {
const id = idMap.get(header);
if (id) {
const col = await Column.get({
base_id: destBase.id,
colId: id,
});
if (col) {
if (col.colOptions?.type === 'bt') {
const childCol = await Column.get({
base_id: destBase.id,
colId: col.colOptions.fk_child_column_id,
});
if (childCol) {
headers.push(childCol.column_name);
} else {
headers.push(null);
debugLog('child column not found', header);
}
} else {
headers.push(col.column_name);
}
} else {
headers.push(null);
debugLog('column not found', header);
}
} else {
headers.push(null);
debugLog('header not found', header);
}
}
parser.resume();
} else {
if (results.errors.length === 0) {
const row = {};
for (let i = 0; i < headers.length; i++) {
if (headers[i]) {
if (results.data[i] !== '') {
row[headers[i]] = results.data[i];
}
}
}
chunk.push(row);
if (chunk.length > 1000) {
parser.pause();
try {
await this.bulkDataService.bulkDataInsert({
projectName: destProject.id,
tableName: destModel.id,
body: chunk,
cookie: null,
chunkSize: chunk.length + 1,
foreign_key_checks: false,
raw: true,
});
} catch (e) {
console.log(e);
}
chunk = [];
parser.resume();
}
}
}
},
complete: async () => {
if (chunk.length > 0) {
try {
await this.bulkDataService.bulkDataInsert({
projectName: destProject.id,
tableName: destModel.id,
body: chunk,
cookie: null,
chunkSize: chunk.length + 1,
foreign_key_checks: false,
raw: true,
});
} catch (e) {
console.log(e);
}
chunk = [];
}
resolve(null);
},
});
});
}
// import links and return handled links
async importLinkFromCsvStream(param: {
idMap: Map<string, string>;
linkStream: Readable;
destProject: Project;
destBase: Base;
handledLinks: string[];
debugLog?: (...args: any[]) => void;
}): Promise<string[]> {
const { idMap, linkStream, destBase, destProject, handledLinks } = param;
const debugLog = param.debugLog || (() => {});
const lChunks: Record<string, any[]> = {}; // fk_mm_model_id: { rowId, childId }[]
const insertChunks = async () => {
for (const [k, v] of Object.entries(lChunks)) {
try {
if (v.length === 0) continue;
await this.bulkDataService.bulkDataInsert({
projectName: destProject.id,
tableName: k,
body: v,
cookie: null,
chunkSize: 1000,
foreign_key_checks: false,
raw: true,
});
lChunks[k] = [];
} catch (e) {
console.log(e);
}
}
};
let headersFound = false;
let childIndex = -1;
let parentIndex = -1;
let columnIndex = -1;
const mmColumns: Record<string, Column> = {};
const mmParentChild: any = {};
return new Promise((resolve) => {
papaparse.parse(linkStream, {
newline: '\r\n',
step: async (results, parser) => {
if (!headersFound) {
for (const [i, header] of Object.entries(results.data)) {
if (header === 'child') {
childIndex = parseInt(i);
} else if (header === 'parent') {
parentIndex = parseInt(i);
} else if (header === 'column') {
columnIndex = parseInt(i);
}
}
headersFound = true;
} else {
if (results.errors.length === 0) {
const child = results.data[childIndex];
const parent = results.data[parentIndex];
const columnId = results.data[columnIndex];
if (child && parent && columnId) {
if (mmColumns[columnId]) {
// push to chunk
const mmModelId =
mmColumns[columnId].colOptions.fk_mm_model_id;
const mm = mmParentChild[mmModelId];
lChunks[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
} else {
// get column for the first time
parser.pause();
await insertChunks();
const col = await Column.get({
base_id: destBase.id,
colId: findWithIdentifier(idMap, columnId),
});
if (col) {
const colOptions =
await col.getColOptions<LinkToAnotherRecordColumn>();
const vChildCol = await colOptions.getMMChildColumn();
const vParentCol = await colOptions.getMMParentColumn();
mmParentChild[col.colOptions.fk_mm_model_id] = {
parent: vParentCol.column_name,
child: vChildCol.column_name,
};
mmColumns[columnId] = col;
handledLinks.push(col.colOptions.fk_mm_model_id);
const mmModelId = col.colOptions.fk_mm_model_id;
// create chunk
lChunks[mmModelId] = [];
// push to chunk
const mm = mmParentChild[mmModelId];
lChunks[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
} else {
debugLog('column not found', columnId);
}
parser.resume();
}
}
}
}
},
complete: async () => {
await insertChunks();
resolve(handledLinks);
},
});
});
}
}

Loading…
Cancel
Save