Browse Source

feat: revise link data streaming

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest-dir-restructure
mertmit 2 years ago committed by starbirdtech383
parent
commit
b4aa7c23be
  1. 97
      packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts
  2. 200
      packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts

97
packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts

@ -2,7 +2,6 @@ import { Readable } from 'stream';
import { Process, Processor } from '@nestjs/bull'; import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull'; import { Job } from 'bull';
import papaparse from 'papaparse'; import papaparse from 'papaparse';
import { UITypes } from 'nocodb-sdk';
import { Base, Column, Model, Project } from '../../../models'; import { Base, Column, Model, Project } from '../../../models';
import { ProjectsService } from '../../../services/projects.service'; import { ProjectsService } from '../../../services/projects.service';
import { findWithIdentifier } from '../../../helpers/exportImportHelpers'; import { findWithIdentifier } from '../../../helpers/exportImportHelpers';
@ -87,7 +86,7 @@ export class DuplicateProcessor {
} }
const handledLinks = []; const handledLinks = [];
const lChunk: Record<string, any[]> = {}; // colId: { rowId, childId }[] const lChunk: Record<string, any[]> = {}; // fk_mm_model_id: { rowId, childId }[]
for (const sourceModel of models) { for (const sourceModel of models) {
const dataStream = new Readable({ const dataStream = new Readable({
@ -103,6 +102,7 @@ export class DuplicateProcessor {
linkStream, linkStream,
projectId: project.id, projectId: project.id,
modelId: sourceModel.id, modelId: sourceModel.id,
handledMmList: handledLinks,
}); });
const headers: string[] = []; const headers: string[] = [];
@ -191,41 +191,54 @@ export class DuplicateProcessor {
}); });
}); });
const lHeaders: string[] = []; let headersFound = false;
const mmParentChild: any = {};
let childIndex = -1;
let parentIndex = -1;
let columnIndex = -1;
let pkIndex = -1; const mmColumns: Record<string, Column> = {};
const mmParentChild: any = {};
await new Promise((resolve) => { await new Promise((resolve) => {
papaparse.parse(linkStream, { papaparse.parse(linkStream, {
newline: '\r\n', newline: '\r\n',
step: async (results, parser) => { step: async (results, parser) => {
if (!lHeaders.length) { if (!headersFound) {
parser.pause(); for (const [i, header] of Object.entries(results.data)) {
for (const header of results.data) { if (header === 'child') {
if (header === 'pk') { childIndex = parseInt(i);
lHeaders.push(null); } else if (header === 'parent') {
pkIndex = lHeaders.length - 1; parentIndex = parseInt(i);
continue; } else if (header === 'column') {
columnIndex = parseInt(i);
} }
const id = idMap.get(header); }
if (id) { headersFound = true;
} else {
if (results.errors.length === 0) {
const child = results.data[childIndex];
const parent = results.data[parentIndex];
const columnId = results.data[columnIndex];
if (child && parent && columnId) {
if (mmColumns[columnId]) {
// push to chunk
const mmModelId =
mmColumns[columnId].colOptions.fk_mm_model_id;
const mm = mmParentChild[mmModelId];
lChunk[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
});
} else {
// get column for the first time
parser.pause();
const col = await Column.get({ const col = await Column.get({
base_id: dupBaseId, base_id: dupBaseId,
colId: id, colId: findWithIdentifier(idMap, columnId),
}); });
if (
col.uidt === UITypes.LinkToAnotherRecord &&
col.colOptions.fk_mm_model_id &&
handledLinks.includes(col.colOptions.fk_mm_model_id)
) {
lHeaders.push(null);
} else {
if (
col.uidt === UITypes.LinkToAnotherRecord &&
col.colOptions.fk_mm_model_id &&
!handledLinks.includes(col.colOptions.fk_mm_model_id)
) {
const colOptions = const colOptions =
await col.getColOptions<LinkToAnotherRecordColumn>(); await col.getColOptions<LinkToAnotherRecordColumn>();
@ -237,27 +250,23 @@ export class DuplicateProcessor {
child: vChildCol.column_name, child: vChildCol.column_name,
}; };
mmColumns[columnId] = col;
handledLinks.push(col.colOptions.fk_mm_model_id); handledLinks.push(col.colOptions.fk_mm_model_id);
}
lHeaders.push(col.colOptions.fk_mm_model_id);
lChunk[col.colOptions.fk_mm_model_id] = [];
}
}
}
parser.resume();
} else {
if (results.errors.length === 0) {
for (let i = 0; i < lHeaders.length; i++) {
if (!lHeaders[i]) continue;
const mm = mmParentChild[lHeaders[i]]; const mmModelId = col.colOptions.fk_mm_model_id;
for (const rel of results.data[i].split(',')) { // create chunk
if (rel.trim() === '') continue; lChunk[mmModelId] = [];
lChunk[lHeaders[i]].push({
[mm.parent]: rel, // push to chunk
[mm.child]: results.data[pkIndex], const mm = mmParentChild[mmModelId];
lChunk[mmModelId].push({
[mm.parent]: parent,
[mm.child]: child,
}); });
parser.resume();
} }
} }
} }

200
packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts

@ -2,6 +2,7 @@ import { Readable } from 'stream';
import { UITypes, ViewTypes } from 'nocodb-sdk'; import { UITypes, ViewTypes } from 'nocodb-sdk';
import { unparse } from 'papaparse'; import { unparse } from 'papaparse';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import NcConnectionMgrv2 from '../../../utils/common/NcConnectionMgrv2';
import { getViewAndModelByAliasOrId } from '../../../modules/datas/helpers'; import { getViewAndModelByAliasOrId } from '../../../modules/datas/helpers';
import { import {
clearPrefix, clearPrefix,
@ -11,6 +12,7 @@ import NcPluginMgrv2 from '../../../helpers/NcPluginMgrv2';
import { NcError } from '../../../helpers/catchError'; import { NcError } from '../../../helpers/catchError';
import { Base, Model, Project } from '../../../models'; import { Base, Model, Project } from '../../../models';
import { DatasService } from '../../../services/datas.service'; import { DatasService } from '../../../services/datas.service';
import type { BaseModelSqlv2 } from '../../../db/BaseModelSqlv2';
import type { LinkToAnotherRecordColumn, View } from '../../../models'; import type { LinkToAnotherRecordColumn, View } from '../../../models';
@Injectable() @Injectable()
@ -239,8 +241,9 @@ export class ExportService {
projectId: string; projectId: string;
modelId: string; modelId: string;
viewId?: string; viewId?: string;
handledMmList?: string[];
}) { }) {
const { dataStream, linkStream } = param; const { dataStream, linkStream, handledMmList } = param;
const { model, view } = await getViewAndModelByAliasOrId({ const { model, view } = await getViewAndModelByAliasOrId({
projectName: param.projectId, projectName: param.projectId,
@ -248,18 +251,21 @@ export class ExportService {
viewName: param.viewId, viewName: param.viewId,
}); });
const base = await Base.get(model.base_id);
await model.getColumns(); await model.getColumns();
const pkMap = new Map<string, string>(); const pkMap = new Map<string, string>();
const fields = model.columns const fields = model.columns
.filter((c) => c.colOptions?.type !== 'hm') .filter((c) => c.colOptions?.type !== 'hm' && c.colOptions?.type !== 'mm')
.map((c) => c.title) .map((c) => c.title)
.join(','); .join(',');
for (const column of model.columns.filter( for (const column of model.columns.filter(
(c) => (col) =>
c.uidt === UITypes.LinkToAnotherRecord && c.colOptions?.type !== 'hm', col.uidt === UITypes.LinkToAnotherRecord &&
col.colOptions?.type !== 'hm',
)) { )) {
const relatedTable = await ( const relatedTable = await (
(await column.getColOptions()) as LinkToAnotherRecordColumn (await column.getColOptions()) as LinkToAnotherRecordColumn
@ -270,49 +276,41 @@ export class ExportService {
pkMap.set(column.id, relatedTable.primaryKey.title); pkMap.set(column.id, relatedTable.primaryKey.title);
} }
dataStream.setEncoding('utf8'); const mmColumns = model.columns.filter(
(col) =>
col.uidt === UITypes.LinkToAnotherRecord &&
col.colOptions?.type === 'mm',
);
linkStream.setEncoding('utf8'); const hasLink = mmColumns.length > 0;
const limit = 200; dataStream.setEncoding('utf8');
const offset = 0;
const primaryKey = model.columns.find((c) => c.pk); const baseModel = await Model.getBaseModelSQL({
id: model.id,
viewId: view?.id,
dbDriver: await NcConnectionMgrv2.get(base),
});
const formatData = (data: any) => { const formatData = (data: any) => {
const linkData = [];
for (const row of data) { for (const row of data) {
const pkValue = primaryKey ? row[primaryKey.title] : undefined;
const linkRow = {};
for (const [k, v] of Object.entries(row)) { for (const [k, v] of Object.entries(row)) {
const col = model.columns.find((c) => c.title === k); const col = model.columns.find((c) => c.title === k);
if (col) { if (col) {
if (col.pk) linkRow['pk'] = pkValue;
const colId = `${col.project_id}::${col.base_id}::${col.fk_model_id}::${col.id}`; const colId = `${col.project_id}::${col.base_id}::${col.fk_model_id}::${col.id}`;
switch (col.uidt) { switch (col.uidt) {
case UITypes.LinkToAnotherRecord: case UITypes.LinkToAnotherRecord:
{ {
if (col.system || col.colOptions.type === 'hm') break; if (col.system || col.colOptions.type !== 'bt') break;
const pkList = [];
const links = Array.isArray(v) ? v : [v];
for (const link of links) { if (v) {
if (link) { for (const [k, val] of Object.entries(v)) {
for (const [k, val] of Object.entries(link)) {
if (k === pkMap.get(col.id)) { if (k === pkMap.get(col.id)) {
pkList.push(val); row[colId] = val;
} }
} }
} }
} }
if (col.colOptions.type === 'mm') {
linkRow[colId] = pkList.join(',');
} else {
row[colId] = pkList[0];
}
}
break; break;
case UITypes.Attachment: case UITypes.Attachment:
try { try {
@ -326,6 +324,7 @@ export class ExportService {
case UITypes.Lookup: case UITypes.Lookup:
case UITypes.Rollup: case UITypes.Rollup:
case UITypes.Barcode: case UITypes.Barcode:
case UITypes.QrCode:
// skip these types // skip these types
break; break;
default: default:
@ -335,16 +334,18 @@ export class ExportService {
delete row[k]; delete row[k];
} }
} }
linkData.push(linkRow);
} }
return { data, linkData }; return { data };
}; };
const limit = 200;
const offset = 0;
try { try {
await this.recursiveRead( await this.recursiveRead(
formatData, formatData,
baseModel,
dataStream, dataStream,
linkStream,
model, model,
view, view,
offset, offset,
@ -356,12 +357,86 @@ export class ExportService {
console.error(e); console.error(e);
throw e; throw e;
} }
if (hasLink) {
linkStream.setEncoding('utf8');
for (const mm of mmColumns) {
if (handledMmList.includes(mm.colOptions?.fk_mm_model_id)) continue;
const mmModel = await Model.get(mm.colOptions?.fk_mm_model_id);
await mmModel.getColumns();
const childColumn = mmModel.columns.find(
(col) => col.id === mm.colOptions?.fk_mm_child_column_id,
);
const parentColumn = mmModel.columns.find(
(col) => col.id === mm.colOptions?.fk_mm_parent_column_id,
);
const childColumnTitle = childColumn.title;
const parentColumnTitle = parentColumn.title;
const mmFields = mmModel.columns
.filter((c) => c.uidt === UITypes.ForeignKey)
.map((c) => c.title)
.join(',');
const mmFormatData = (data: any) => {
data.map((d) => {
d.column = mm.id;
d.child = d[childColumnTitle];
d.parent = d[parentColumnTitle];
delete d[childColumnTitle];
delete d[parentColumnTitle];
return d;
});
return { data };
};
const mmLimit = 200;
const mmOffset = 0;
const mmBase =
mmModel.base_id === base.id ? base : await Base.get(mmModel.base_id);
const mmBaseModel = await Model.getBaseModelSQL({
id: mmModel.id,
dbDriver: await NcConnectionMgrv2.get(mmBase),
});
try {
await this.recursiveLinkRead(
mmFormatData,
mmBaseModel,
linkStream,
mmModel,
undefined,
mmOffset,
mmLimit,
mmFields,
true,
);
} catch (e) {
console.error(e);
throw e;
}
handledMmList.push(mm.colOptions?.fk_mm_model_id);
}
linkStream.push(null);
} else {
linkStream.push(null);
}
} }
async recursiveRead( async recursiveRead(
formatter: (data: any) => { data: any; linkData: any }, formatter: (data: any) => { data: any },
baseModel: BaseModelSqlv2,
stream: Readable, stream: Readable,
linkStream: Readable,
model: Model, model: Model,
view: View, view: View,
offset: number, offset: number,
@ -371,24 +446,73 @@ export class ExportService {
): Promise<void> { ): Promise<void> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.datasService this.datasService
.getDataList({ model, view, query: { limit, offset, fields } }) .getDataList({
model,
view,
query: { limit, offset, fields },
baseModel,
})
.then((result) => { .then((result) => {
try { try {
if (!header) { if (!header) {
stream.push('\r\n'); stream.push('\r\n');
linkStream.push('\r\n');
} }
const { data, linkData } = formatter(result.list); const { data } = formatter(result.list);
stream.push(unparse(data, { header })); stream.push(unparse(data, { header }));
linkStream.push(unparse(linkData, { header }));
if (result.pageInfo.isLastPage) { if (result.pageInfo.isLastPage) {
stream.push(null); stream.push(null);
linkStream.push(null);
resolve(); resolve();
} else { } else {
this.recursiveRead( this.recursiveRead(
formatter, formatter,
baseModel,
stream, stream,
model,
view,
offset + limit,
limit,
fields,
).then(resolve);
}
} catch (e) {
reject(e);
}
});
});
}
async recursiveLinkRead(
formatter: (data: any) => { data: any },
baseModel: BaseModelSqlv2,
linkStream: Readable,
model: Model,
view: View,
offset: number,
limit: number,
fields: string,
header = false,
): Promise<void> {
return new Promise((resolve, reject) => {
this.datasService
.getDataList({
model,
view,
query: { limit, offset, fields },
baseModel,
})
.then((result) => {
try {
if (!header) {
linkStream.push('\r\n');
}
const { data } = formatter(result.list);
if (data) linkStream.push(unparse(data, { header }));
if (result.pageInfo.isLastPage) {
resolve();
} else {
this.recursiveLinkRead(
formatter,
baseModel,
linkStream, linkStream,
model, model,
view, view,

Loading…
Cancel
Save