Browse Source

feat: update export/import to new stream structure

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest-dir-restructure
mertmit 2 years ago committed by starbirdtech383
parent
commit
dd8094af4f
  1. 41
      packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts
  2. 127
      packages/nocodb-nest/src/modules/jobs/export-import/import.service.ts

41
packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts

@ -569,6 +569,17 @@ export class ExportService {
readableStream, readableStream,
); );
const handledMmList: string[] = [];
const combinedLinkStream = new Readable({
read() {},
});
const uploadLinkPromise = storageAdapter.fileCreateByStream(
`${destPath}/data/links.csv`,
combinedLinkStream,
);
for (const model of models) { for (const model of models) {
const dataStream = new Readable({ const dataStream = new Readable({
read() {}, read() {},
@ -578,25 +589,41 @@ export class ExportService {
read() {}, read() {},
}); });
const linkPromise = new Promise((resolve) => {
linkStream.on('data', (chunk) => {
combinedLinkStream.push(chunk);
});
linkStream.on('end', () => {
combinedLinkStream.push('\r\n');
resolve(null);
});
linkStream.on('error', (e) => {
console.error(e);
resolve(null);
});
});
const uploadPromise = storageAdapter.fileCreateByStream( const uploadPromise = storageAdapter.fileCreateByStream(
`${param.path}/data/${model.id}.csv`, `${destPath}/data/${model.id}.csv`,
dataStream, dataStream,
); );
const uploadLinkPromise = storageAdapter.fileCreateByStream(
`${param.path}/data/${model.id}_links.csv`,
linkStream,
);
this.streamModelData({ this.streamModelData({
dataStream, dataStream,
linkStream, linkStream,
projectId: project.id, projectId: project.id,
modelId: model.id, modelId: model.id,
handledMmList,
}); });
await Promise.all([uploadPromise, uploadLinkPromise]); await Promise.all([uploadPromise, linkPromise]);
} }
combinedLinkStream.push(null);
await uploadLinkPromise;
} catch (e) { } catch (e) {
throw NcError.badRequest(e); throw NcError.badRequest(e);
} }

127
packages/nocodb-nest/src/modules/jobs/export-import/import.service.ts

@ -713,11 +713,9 @@ export class ImportService {
if (idMap) { if (idMap) {
const files = await storageAdapter.getDirectoryList(`${path}/data`); const files = await storageAdapter.getDirectoryList(`${path}/data`);
const dataFiles = files.filter( const dataFiles = files.filter(
(file) => !file.match(/_links\.csv$/), (file) => !file.match(/links\.csv$/),
);
const linkFiles = files.filter((file) =>
file.match(/_links\.csv$/),
); );
const linkFile = `${path}/data/links.csv`;
for (const file of dataFiles) { for (const file of dataFiles) {
const readStream = await storageAdapter.fileReadByStream( const readStream = await storageAdapter.fileReadByStream(
@ -825,62 +823,68 @@ export class ImportService {
// reset timer // reset timer
elapsedTime(); elapsedTime();
for (const file of linkFiles) { const linkReadStream = await storageAdapter.fileReadByStream(
const readStream = await storageAdapter.fileReadByStream( linkFile,
`${path}/data/${file}`,
); );
const headers: string[] = []; const lChunk: Record<string, any[]> = {}; // fk_mm_model_id: { rowId, childId }[]
const mmParentChild: any = {};
const chunk: Record<string, any[]> = {}; // colId: { rowId, childId }[]
const modelId = findWithIdentifier( let headersFound = false;
idMap,
file.replace(/_links\.csv$/, ''),
);
const model = await Model.get(modelId);
let pkIndex = -1; let childIndex = -1;
let parentIndex = -1;
let columnIndex = -1;
debugLog(`Linking ${model.title}...`); const mmColumns: Record<string, Column> = {};
const mmParentChild: any = {};
await new Promise((resolve) => { await new Promise((resolve) => {
papaparse.parse(readStream, { papaparse.parse(linkReadStream, {
newline: '\r\n', newline: '\r\n',
step: async (results, parser) => { step: async (results, parser) => {
if (!headers.length) { if (!headersFound) {
parser.pause(); for (const [i, header] of Object.entries(results.data)) {
for (const header of results.data) { if (header === 'child') {
if (header === 'pk') { childIndex = parseInt(i);
headers.push(null); } else if (header === 'parent') {
pkIndex = headers.length - 1; parentIndex = parseInt(i);
continue; } else if (header === 'column') {
columnIndex = parseInt(i);
} }
const id = idMap.get(header); }
if (id) { headersFound = true;
const col = await Column.get({
base_id: baseId,
colId: id,
});
if (
col.uidt === UITypes.LinkToAnotherRecord &&
col.colOptions.fk_mm_model_id &&
handledLinks.includes(col.colOptions.fk_mm_model_id)
) {
headers.push(null);
} else { } else {
if (results.errors.length === 0) {
if ( if (
col.uidt === UITypes.LinkToAnotherRecord && results.data[childIndex] === 'child' &&
col.colOptions.fk_mm_model_id && results.data[parentIndex] === 'parent' &&
!handledLinks.includes( results.data[columnIndex] === 'column'
col.colOptions.fk_mm_model_id,
) )
) { return;
const child = results.data[childIndex];
const parent = results.data[parentIndex];
const columnId = results.data[columnIndex];
if (child && parent && columnId) {
if (mmColumns[columnId]) {
// push to chunk
const mmModelId =
mmColumns[columnId].colOptions.fk_mm_model_id;
const mm = mmParentChild[mmModelId];
lChunk[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
} else {
// get column for the first time
parser.pause();
const col = await Column.get({
colId: findWithIdentifier(idMap, columnId),
});
const colOptions = const colOptions =
await col.getColOptions<LinkToAnotherRecordColumn>(); await col.getColOptions<LinkToAnotherRecordColumn>();
const vChildCol = const vChildCol = await colOptions.getMMChildColumn();
await colOptions.getMMChildColumn();
const vParentCol = const vParentCol =
await colOptions.getMMParentColumn(); await colOptions.getMMParentColumn();
@ -889,36 +893,31 @@ export class ImportService {
child: vChildCol.column_name, child: vChildCol.column_name,
}; };
mmColumns[columnId] = col;
handledLinks.push(col.colOptions.fk_mm_model_id); handledLinks.push(col.colOptions.fk_mm_model_id);
}
headers.push(col.colOptions.fk_mm_model_id);
chunk[col.colOptions.fk_mm_model_id] = [];
}
}
}
parser.resume();
} else {
if (results.errors.length === 0) {
for (let i = 0; i < headers.length; i++) {
if (!headers[i]) continue;
const mm = mmParentChild[headers[i]]; const mmModelId = col.colOptions.fk_mm_model_id;
// create chunk
lChunk[mmModelId] = [];
for (const rel of results.data[i].split(',')) { // push to chunk
if (rel.trim() === '') continue; const mm = mmParentChild[mmModelId];
chunk[headers[i]].push({ lChunk[mmModelId].push({
[mm.parent]: rel, [mm.parent]: parent,
[mm.child]: results.data[pkIndex], [mm.child]: child,
}); });
parser.resume();
} }
} }
} }
} }
}, },
complete: async () => { complete: async () => {
for (const [k, v] of Object.entries(chunk)) { for (const [k, v] of Object.entries(lChunk)) {
try { try {
elapsedTime('prepare link chunk');
await this.bulkDataService.bulkDataInsert({ await this.bulkDataService.bulkDataInsert({
projectName: projectId, projectName: projectId,
tableName: k, tableName: k,
@ -928,7 +927,6 @@ export class ImportService {
foreign_key_checks: false, foreign_key_checks: false,
raw: true, raw: true,
}); });
elapsedTime('insert link chunk');
} catch (e) { } catch (e) {
console.log(e); console.log(e);
} }
@ -938,7 +936,6 @@ export class ImportService {
}); });
}); });
} }
}
} catch (e) { } catch (e) {
throw new Error(e); throw new Error(e);
} }

Loading…
Cancel
Save