Browse Source

feat: update export/import to new stream structure

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest
mertmit 2 years ago
parent
commit
33d8350462
  1. 41
      packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts
  2. 207
      packages/nocodb-nest/src/modules/jobs/export-import/import.service.ts

41
packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts

@ -569,6 +569,17 @@ export class ExportService {
readableStream,
);
const handledMmList: string[] = [];
const combinedLinkStream = new Readable({
read() {},
});
const uploadLinkPromise = storageAdapter.fileCreateByStream(
`${destPath}/data/links.csv`,
combinedLinkStream,
);
for (const model of models) {
const dataStream = new Readable({
read() {},
@ -578,25 +589,41 @@ export class ExportService {
read() {},
});
const linkPromise = new Promise((resolve) => {
linkStream.on('data', (chunk) => {
combinedLinkStream.push(chunk);
});
linkStream.on('end', () => {
combinedLinkStream.push('\r\n');
resolve(null);
});
linkStream.on('error', (e) => {
console.error(e);
resolve(null);
});
});
const uploadPromise = storageAdapter.fileCreateByStream(
`${param.path}/data/${model.id}.csv`,
`${destPath}/data/${model.id}.csv`,
dataStream,
);
const uploadLinkPromise = storageAdapter.fileCreateByStream(
`${param.path}/data/${model.id}_links.csv`,
linkStream,
);
this.streamModelData({
dataStream,
linkStream,
projectId: project.id,
modelId: model.id,
handledMmList,
});
await Promise.all([uploadPromise, uploadLinkPromise]);
await Promise.all([uploadPromise, linkPromise]);
}
combinedLinkStream.push(null);
await uploadLinkPromise;
} catch (e) {
throw NcError.badRequest(e);
}

207
packages/nocodb-nest/src/modules/jobs/export-import/import.service.ts

@ -713,11 +713,9 @@ export class ImportService {
if (idMap) {
const files = await storageAdapter.getDirectoryList(`${path}/data`);
const dataFiles = files.filter(
(file) => !file.match(/_links\.csv$/),
);
const linkFiles = files.filter((file) =>
file.match(/_links\.csv$/),
(file) => !file.match(/links\.csv$/),
);
const linkFile = `${path}/data/links.csv`;
for (const file of dataFiles) {
const readStream = await storageAdapter.fileReadByStream(
@ -825,119 +823,118 @@ export class ImportService {
// reset timer
elapsedTime();
for (const file of linkFiles) {
const readStream = await storageAdapter.fileReadByStream(
`${path}/data/${file}`,
);
const linkReadStream = await storageAdapter.fileReadByStream(
linkFile,
);
const headers: string[] = [];
const mmParentChild: any = {};
const chunk: Record<string, any[]> = {}; // colId: { rowId, childId }[]
const lChunk: Record<string, any[]> = {}; // fk_mm_model_id: { rowId, childId }[]
let headersFound = false;
let childIndex = -1;
let parentIndex = -1;
let columnIndex = -1;
const mmColumns: Record<string, Column> = {};
const mmParentChild: any = {};
await new Promise((resolve) => {
papaparse.parse(linkReadStream, {
newline: '\r\n',
step: async (results, parser) => {
if (!headersFound) {
for (const [i, header] of Object.entries(results.data)) {
if (header === 'child') {
childIndex = parseInt(i);
} else if (header === 'parent') {
parentIndex = parseInt(i);
} else if (header === 'column') {
columnIndex = parseInt(i);
}
}
headersFound = true;
} else {
if (results.errors.length === 0) {
if (
results.data[childIndex] === 'child' &&
results.data[parentIndex] === 'parent' &&
results.data[columnIndex] === 'column'
)
return;
const child = results.data[childIndex];
const parent = results.data[parentIndex];
const columnId = results.data[columnIndex];
if (child && parent && columnId) {
if (mmColumns[columnId]) {
// push to chunk
const mmModelId =
mmColumns[columnId].colOptions.fk_mm_model_id;
const mm = mmParentChild[mmModelId];
lChunk[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
} else {
// get column for the first time
parser.pause();
const col = await Column.get({
colId: findWithIdentifier(idMap, columnId),
});
const modelId = findWithIdentifier(
idMap,
file.replace(/_links\.csv$/, ''),
);
const model = await Model.get(modelId);
const colOptions =
await col.getColOptions<LinkToAnotherRecordColumn>();
let pkIndex = -1;
const vChildCol = await colOptions.getMMChildColumn();
const vParentCol =
await colOptions.getMMParentColumn();
debugLog(`Linking ${model.title}...`);
mmParentChild[col.colOptions.fk_mm_model_id] = {
parent: vParentCol.column_name,
child: vChildCol.column_name,
};
await new Promise((resolve) => {
papaparse.parse(readStream, {
newline: '\r\n',
step: async (results, parser) => {
if (!headers.length) {
parser.pause();
for (const header of results.data) {
if (header === 'pk') {
headers.push(null);
pkIndex = headers.length - 1;
continue;
}
const id = idMap.get(header);
if (id) {
const col = await Column.get({
base_id: baseId,
colId: id,
});
if (
col.uidt === UITypes.LinkToAnotherRecord &&
col.colOptions.fk_mm_model_id &&
handledLinks.includes(col.colOptions.fk_mm_model_id)
) {
headers.push(null);
} else {
if (
col.uidt === UITypes.LinkToAnotherRecord &&
col.colOptions.fk_mm_model_id &&
!handledLinks.includes(
col.colOptions.fk_mm_model_id,
)
) {
const colOptions =
await col.getColOptions<LinkToAnotherRecordColumn>();
const vChildCol =
await colOptions.getMMChildColumn();
const vParentCol =
await colOptions.getMMParentColumn();
mmParentChild[col.colOptions.fk_mm_model_id] = {
parent: vParentCol.column_name,
child: vChildCol.column_name,
};
handledLinks.push(col.colOptions.fk_mm_model_id);
}
headers.push(col.colOptions.fk_mm_model_id);
chunk[col.colOptions.fk_mm_model_id] = [];
}
}
}
parser.resume();
} else {
if (results.errors.length === 0) {
for (let i = 0; i < headers.length; i++) {
if (!headers[i]) continue;
mmColumns[columnId] = col;
const mm = mmParentChild[headers[i]];
handledLinks.push(col.colOptions.fk_mm_model_id);
for (const rel of results.data[i].split(',')) {
if (rel.trim() === '') continue;
chunk[headers[i]].push({
[mm.parent]: rel,
[mm.child]: results.data[pkIndex],
});
}
const mmModelId = col.colOptions.fk_mm_model_id;
// create chunk
lChunk[mmModelId] = [];
// push to chunk
const mm = mmParentChild[mmModelId];
lChunk[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
parser.resume();
}
}
}
},
complete: async () => {
for (const [k, v] of Object.entries(chunk)) {
try {
elapsedTime('prepare link chunk');
await this.bulkDataService.bulkDataInsert({
projectName: projectId,
tableName: k,
body: v,
cookie: null,
chunkSize: 1000,
foreign_key_checks: false,
raw: true,
});
elapsedTime('insert link chunk');
} catch (e) {
console.log(e);
}
}
},
complete: async () => {
for (const [k, v] of Object.entries(lChunk)) {
try {
await this.bulkDataService.bulkDataInsert({
projectName: projectId,
tableName: k,
body: v,
cookie: null,
chunkSize: 1000,
foreign_key_checks: false,
raw: true,
});
} catch (e) {
console.log(e);
}
resolve(null);
},
});
}
resolve(null);
},
});
}
});
}
} catch (e) {
throw new Error(e);

Loading…
Cancel
Save