
feat: duplicate without writing to file

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest
mertmit 2 years ago
commit b771c188ba
  1. 258  packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts
  2. 83   packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts
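
In broad strokes, the commit drops the export-to-storage / import-from-file round trip: rows are serialized into in-memory Readable streams, parsed with papaparse as they arrive, and bulk-inserted into the duplicated base in batches. A minimal sketch of that streaming pattern, with an illustrative copyCsvStream helper and a generic BulkInsert sink standing in for BulkDataAliasService.bulkDataInsert (not the committed code):

import { Readable } from 'stream';
import papaparse from 'papaparse';

// Stand-in for BulkDataAliasService.bulkDataInsert; any async row sink works.
type BulkInsert = (rows: Record<string, any>[]) => Promise<void>;

// Parse a CSV stream row by row and flush batches to the sink without ever
// materializing the file on disk.
async function copyCsvStream(source: Readable, insert: BulkInsert): Promise<void> {
  let headers: string[] = [];
  let chunk: Record<string, any>[] = [];

  await new Promise<void>((resolve) => {
    papaparse.parse(source, {
      newline: '\r\n',
      step: async (results, parser) => {
        const row = results.data as string[];
        if (!headers.length) {
          headers = row; // first row carries the column names
          return;
        }
        chunk.push(Object.fromEntries(headers.map((h, i) => [h, row[i]])));
        if (chunk.length >= 1000) {
          parser.pause(); // stop reading while the batch is written
          await insert(chunk).catch((e) => console.error(e));
          chunk = [];
          parser.resume();
        }
      },
      complete: async () => {
        if (chunk.length) await insert(chunk).catch((e) => console.error(e));
        resolve();
      },
    });
  });
}

The processor below applies the same pause/flush/resume idea per model, plus a second pass over a links stream to rebuild many-to-many relations.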

258  packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts

@@ -1,3 +1,4 @@
+import { Readable } from 'stream';
 import {
   OnQueueActive,
   OnQueueCompleted,
@@ -5,13 +6,20 @@ import {
   Process,
   Processor,
 } from '@nestjs/bull';
-import { Base, Project } from 'src/models';
+import { Base, Column, Model, Project } from 'src/models';
 import { Job } from 'bull';
 import { ProjectsService } from 'src/services/projects.service';
 import boxen from 'boxen';
-import { generateUniqueName } from 'src/helpers/exportImportHelpers';
+import papaparse from 'papaparse';
+import {
+  findWithIdentifier,
+  generateUniqueName,
+} from 'src/helpers/exportImportHelpers';
+import { BulkDataAliasService } from 'src/services/bulk-data-alias.service';
+import { UITypes } from 'nocodb-sdk';
 import { ExportService } from './export.service';
 import { ImportService } from './import.service';
+import type { LinkToAnotherRecordColumn } from 'src/models';

 @Processor('duplicate')
 export class DuplicateProcessor {
@@ -19,6 +27,7 @@ export class DuplicateProcessor {
     private exportService: ExportService,
     private importService: ImportService,
     private projectsService: ProjectsService,
+    private bulkDataService: BulkDataAliasService,
   ) {}

   @OnQueueActive()
@@ -49,6 +58,16 @@
   @Process('duplicate')
   async duplicateBase(job: Job) {
+    console.time('duplicateBase');
+    let start = process.hrtime();
+    const elapsedTime = function (label?: string) {
+      const elapsedS = process.hrtime(start)[0].toFixed(3);
+      const elapsedMs = process.hrtime(start)[1] / 1000000;
+      if (label) console.log(`${label}: ${elapsedS}s ${elapsedMs}ms`);
+      start = process.hrtime();
+    };
+
     const param: { projectId: string; baseId?: string; req: any } = job.data;
     const user = (param.req as any).user;
@@ -67,12 +86,17 @@
       throw new Error(`Base not found!`);
     }

-    const exported = await this.exportService.exportBase({
-      path: `${job.name}_${job.id}`,
-      baseId: base.id,
+    const models = (await base.getModels()).filter(
+      (m) => !m.mm && m.type === 'table',
+    );
+
+    const exportedModels = await this.exportService.serializeModels({
+      modelIds: models.map((m) => m.id),
     });

-    if (!exported) {
+    elapsedTime('serializeModels');
+
+    if (!exportedModels) {
       throw new Error(`Export failed for base '${base.id}'`);
     }
@@ -88,15 +112,225 @@
       user: { id: user.id },
     });

-    await this.importService.importBase({
+    const dupBaseId = dupProject.bases[0].id;
+
+    elapsedTime('projectCreate');
+
+    const idMap = await this.importService.importModels({
       user,
       projectId: dupProject.id,
-      baseId: dupProject.bases[0].id,
-      src: {
-        type: 'local',
-        path: exported.path,
-      },
+      baseId: dupBaseId,
+      data: exportedModels,
       req: param.req,
     });
+
+    elapsedTime('importModels');
+
+    if (!idMap) {
+      throw new Error(`Import failed for base '${base.id}'`);
+    }
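
    // Data-copy phase: stream each source model's rows and link references as CSV
    // entirely in memory (no intermediate file) and bulk-insert them into the
    // duplicated base.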
    const handledLinks = [];
    const lChunk: Record<string, any[]> = {}; // colId: { rowId, childId }[]

    for (const sourceModel of models) {
      const dataStream = new Readable({
        read() {},
      });

      const linkStream = new Readable({
        read() {},
      });

      this.exportService.streamModelData({
        dataStream,
        linkStream,
        projectId: project.id,
        modelId: sourceModel.id,
      });

      const headers: string[] = [];
      let chunk = [];

      const model = await Model.get(findWithIdentifier(idMap, sourceModel.id));
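
      // The first CSV row carries the exported column identifiers; each is resolved
      // to the duplicated base's column name (belongsTo links map to their child FK
      // column). Subsequent rows are buffered and flushed via bulkDataInsert.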
      await new Promise((resolve) => {
        papaparse.parse(dataStream, {
          newline: '\r\n',
          step: async (results, parser) => {
            if (!headers.length) {
              parser.pause();
              for (const header of results.data) {
                const id = idMap.get(header);
                if (id) {
                  const col = await Column.get({
                    base_id: dupBaseId,
                    colId: id,
                  });
                  if (col.colOptions?.type === 'bt') {
                    const childCol = await Column.get({
                      base_id: dupBaseId,
                      colId: col.colOptions.fk_child_column_id,
                    });
                    headers.push(childCol.column_name);
                  } else {
                    headers.push(col.column_name);
                  }
                } else {
                  console.log('header not found', header);
                }
              }
              parser.resume();
            } else {
              if (results.errors.length === 0) {
                const row = {};
                for (let i = 0; i < headers.length; i++) {
                  if (results.data[i] !== '') {
                    row[headers[i]] = results.data[i];
                  }
                }
                chunk.push(row);
                if (chunk.length > 1000) {
                  parser.pause();
                  try {
                    await this.bulkDataService.bulkDataInsert({
                      projectName: dupProject.id,
                      tableName: model.id,
                      body: chunk,
                      cookie: null,
                      chunkSize: chunk.length + 1,
                      foreign_key_checks: false,
                      raw: true,
                    });
                  } catch (e) {
                    console.log(e);
                  }
                  chunk = [];
                  parser.resume();
                }
              }
            }
          },
          complete: async () => {
            if (chunk.length > 0) {
              try {
                await this.bulkDataService.bulkDataInsert({
                  projectName: dupProject.id,
                  tableName: model.id,
                  body: chunk,
                  cookie: null,
                  chunkSize: chunk.length + 1,
                  foreign_key_checks: false,
                  raw: true,
                });
              } catch (e) {
                console.log(e);
              }
              chunk = [];
            }
            resolve(null);
          },
        });
      });
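
      // Second pass over the link CSV: collect many-to-many junction rows per mm
      // model id. Each mm relation is recorded only once across the two related
      // tables; belongsTo links were already written with the data rows above.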
      const lHeaders: string[] = [];
      const mmParentChild: any = {};
      let pkIndex = -1;

      await new Promise((resolve) => {
        papaparse.parse(linkStream, {
          newline: '\r\n',
          step: async (results, parser) => {
            if (!lHeaders.length) {
              parser.pause();
              for (const header of results.data) {
                if (header === 'pk') {
                  lHeaders.push(null);
                  pkIndex = lHeaders.length - 1;
                  continue;
                }
                const id = idMap.get(header);
                if (id) {
                  const col = await Column.get({
                    base_id: dupBaseId,
                    colId: id,
                  });
                  if (
                    col.uidt === UITypes.LinkToAnotherRecord &&
                    col.colOptions.fk_mm_model_id &&
                    handledLinks.includes(col.colOptions.fk_mm_model_id)
                  ) {
                    lHeaders.push(null);
                  } else {
                    if (
                      col.uidt === UITypes.LinkToAnotherRecord &&
                      col.colOptions.fk_mm_model_id &&
                      !handledLinks.includes(col.colOptions.fk_mm_model_id)
                    ) {
                      const colOptions =
                        await col.getColOptions<LinkToAnotherRecordColumn>();
                      const vChildCol = await colOptions.getMMChildColumn();
                      const vParentCol = await colOptions.getMMParentColumn();
                      mmParentChild[col.colOptions.fk_mm_model_id] = {
                        parent: vParentCol.column_name,
                        child: vChildCol.column_name,
                      };
                      handledLinks.push(col.colOptions.fk_mm_model_id);
                    }
                    lHeaders.push(col.colOptions.fk_mm_model_id);
                    lChunk[col.colOptions.fk_mm_model_id] = [];
                  }
                }
              }
              parser.resume();
            } else {
              if (results.errors.length === 0) {
                for (let i = 0; i < lHeaders.length; i++) {
                  if (!lHeaders[i]) continue;
                  const mm = mmParentChild[lHeaders[i]];
                  for (const rel of results.data[i].split(',')) {
                    if (rel.trim() === '') continue;
                    lChunk[lHeaders[i]].push({
                      [mm.parent]: rel,
                      [mm.child]: results.data[pkIndex],
                    });
                  }
                }
              }
            }
          },
          complete: async () => {
            resolve(null);
          },
        });
      });
      elapsedTime(model.title);
    }
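
    // Insert the accumulated junction rows for every many-to-many link table.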
    for (const [k, v] of Object.entries(lChunk)) {
      try {
        await this.bulkDataService.bulkDataInsert({
          projectName: dupProject.id,
          tableName: k,
          body: v,
          cookie: null,
          chunkSize: 1000,
          foreign_key_checks: false,
          raw: true,
        });
      } catch (e) {
        console.log(e);
      }
    }

    elapsedTime('links');
    console.timeEnd('duplicateBase');
  }
}

83  packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts

@@ -233,13 +233,15 @@ export class ExportService {
     return serializedModels;
   }

-  async exportModelData(param: {
-    storageAdapter: IStorageAdapterV2;
-    path: string;
+  async streamModelData(param: {
+    dataStream: Readable;
+    linkStream: Readable;
     projectId: string;
     modelId: string;
     viewId?: string;
   }) {
+    const { dataStream, linkStream } = param;
+
     const { model, view } = await getViewAndModelByAliasOrId({
       projectName: param.projectId,
       tableName: param.modelId,
@@ -248,13 +250,13 @@
     await model.getColumns();

-    const hasLink = model.columns.some(
-      (c) =>
-        c.uidt === UITypes.LinkToAnotherRecord && c.colOptions?.type === 'mm',
-    );
-
     const pkMap = new Map<string, string>();

+    const fields = model.columns
+      .filter((c) => c.colOptions?.type !== 'hm')
+      .map((c) => c.title)
+      .join(',');
+
     for (const column of model.columns.filter(
       (c) =>
         c.uidt === UITypes.LinkToAnotherRecord && c.colOptions?.type !== 'hm',
@@ -268,31 +270,9 @@
       pkMap.set(column.id, relatedTable.primaryKey.title);
     }

-    const readableStream = new Readable({
-      read() {},
-    });
-
-    const readableLinkStream = new Readable({
-      read() {},
-    });
-
-    readableStream.setEncoding('utf8');
-    readableLinkStream.setEncoding('utf8');
-
-    const storageAdapter = param.storageAdapter;
-
-    const uploadPromise = storageAdapter.fileCreateByStream(
-      `${param.path}/${model.id}.csv`,
-      readableStream,
-    );
-
-    const uploadLinkPromise = hasLink
-      ? storageAdapter.fileCreateByStream(
-          `${param.path}/${model.id}_links.csv`,
-          readableLinkStream,
-        )
-      : Promise.resolve();
+    dataStream.setEncoding('utf8');
+    linkStream.setEncoding('utf8');

     const limit = 200;
     const offset = 0;
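
    // recursiveRead pages rows `limit` at a time and writes CSV straight into the
    // provided streams instead of going through a storage adapter.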
@@ -364,19 +344,16 @@
     try {
       await this.recursiveRead(
         formatData,
-        readableStream,
-        readableLinkStream,
+        dataStream,
+        linkStream,
         model,
         view,
         offset,
         limit,
+        fields,
         true,
       );
-
-      await uploadPromise;
-      await uploadLinkPromise;
     } catch (e) {
-      await storageAdapter.fileDelete(`${param.path}/${model.id}.csv`);
-      await storageAdapter.fileDelete(`${param.path}/${model.id}_links.csv`);
       console.error(e);
       throw e;
     }
@@ -390,11 +367,12 @@
     view: View,
     offset: number,
     limit: number,
+    fields: string,
     header = false,
   ): Promise<void> {
     return new Promise((resolve, reject) => {
       this.datasService
-        .getDataList({ model, view, query: { limit, offset } })
+        .getDataList({ model, view, query: { limit, offset, fields } })
         .then((result) => {
           try {
             if (!header) {
@@ -417,6 +395,7 @@
               view,
               offset + limit,
               limit,
+              fields,
             ).then(resolve);
           }
         } catch (e) {
@@ -469,12 +448,32 @@
       );

       for (const model of models) {
-        await this.exportModelData({
-          storageAdapter,
-          path: `${destPath}/data`,
+        const dataStream = new Readable({
+          read() {},
+        });
+
+        const linkStream = new Readable({
+          read() {},
+        });
+
+        const uploadPromise = storageAdapter.fileCreateByStream(
+          `${param.path}/data/${model.id}.csv`,
+          dataStream,
+        );
+
+        const uploadLinkPromise = storageAdapter.fileCreateByStream(
+          `${param.path}/data/${model.id}_links.csv`,
+          linkStream,
+        );
+
+        this.streamModelData({
+          dataStream,
+          linkStream,
           projectId: project.id,
           modelId: model.id,
         });
+
+        await Promise.all([uploadPromise, uploadLinkPromise]);
       }
     } catch (e) {
       throw NcError.badRequest(e);
