Browse Source

Merge pull request #7497 from nocodb/fix/at-imp-var

feat: at import improved
pull/7496/head
Mert E 9 months ago committed by GitHub
parent
commit
7ceba8ce32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 76
      packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts
  2. 24
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/fetchAT.ts
  3. 476
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts
  4. 65
      packages/nocodb/src/plugins/s3/S3.ts
  5. 189
      packages/nocodb/src/services/attachments.service.ts

76
packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts

@ -9,12 +9,13 @@ import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { isLinksOrLTAR } from 'nocodb-sdk';
import debug from 'debug';
import { Logger } from '@nestjs/common';
import { JobsLogService } from '../jobs-log.service';
import FetchAT from './helpers/fetchAT';
import { importData, importLTARData } from './helpers/readAndProcessData';
import { importData } from './helpers/readAndProcessData';
import EntityMap from './helpers/EntityMap';
import type { UserType } from 'nocodb-sdk';
import type { Base } from '~/models';
import { type Base, Source } from '~/models';
import { sanitizeColumnName } from '~/helpers';
import { AttachmentsService } from '~/services/attachments.service';
import { ColumnsService } from '~/services/columns.service';
@ -34,6 +35,8 @@ import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { GridColumnsService } from '~/services/grid-columns.service';
import { TelemetryService } from '~/services/telemetry.service';
const logger = new Logger('at-import');
dayjs.extend(utc);
const selectColors = {
@ -221,7 +224,11 @@ export class AtImportProcessor {
rtc.migrationSkipLog.log.push(
`tn[${tbl}] cn[${col}] type[${type}] :: ${reason}`,
);
logWarning(`Skipped ${tbl} :: ${col} (${type}) :: ${reason}`);
logWarning(
`Skipped${tbl ? ` ${tbl} :: ` : ``}${col ? `${col}` : ``}${
type ? ` (${type})` : ``
} :: ${reason}`,
);
};
// mapping table
@ -937,7 +944,7 @@ export class AtImportProcessor {
?.length
) {
if (enableErrorLogs)
console.log(`## Invalid column IDs mapped; skip`);
logger.log(`## Invalid column IDs mapped; skip`);
updateMigrationSkipLog(
srcTableSchema.title,
@ -1017,7 +1024,7 @@ export class AtImportProcessor {
);
}
if (enableErrorLogs)
console.log(
logger.log(
`## Failed to configure ${nestedLookupTbl.length} lookups`,
);
break;
@ -1156,7 +1163,7 @@ export class AtImportProcessor {
?.length
) {
if (enableErrorLogs)
console.log(`## Invalid column IDs mapped; skip`);
logger.log(`## Invalid column IDs mapped; skip`);
updateMigrationSkipLog(
srcTableSchema.title,
@ -1514,7 +1521,7 @@ export class AtImportProcessor {
req: {},
});
} catch (e) {
console.log(e);
logger.log(e);
}
rec[key] = JSON.stringify(tempArr);
@ -1841,7 +1848,7 @@ export class AtImportProcessor {
// TODO enable after fixing user invite role issue
// logWarning(e.message);
} else {
console.log(e);
logger.log(e);
}
}),
);
@ -2052,7 +2059,7 @@ export class AtImportProcessor {
const datatype = colSchema.uidt;
const ncFilters = [];
// console.log(filter)
// logger.log(filter)
if (datatype === UITypes.Links) {
// skip filters for links; Link filters in NocoDB are only rollup counts
// where-as in airtable, filter can be textual
@ -2457,12 +2464,13 @@ export class AtImportProcessor {
sourceId: syncDB.sourceId,
roles: { ...userRole, owner: true },
});
const source = await Source.get(syncDB.sourceId);
recordPerfStats(_perfStart, 'base.tableList');
logBasic('Reading Records...');
const recordsMap = {};
for (let i = 0; i < ncTblList.list.length; i++) {
// not a migrated table, skip
if (
@ -2479,59 +2487,29 @@ export class AtImportProcessor {
});
recordPerfStats(_perfStart, 'dbTable.read');
recordsMap[ncTbl.id] = await importData({
const importStats = await importData({
baseName: syncDB.baseId,
table: ncTbl,
atBase,
nocoBaseDataProcessing_v2,
sDB: syncDB,
syncDB,
source,
services: {
tableService: this.tablesService,
bulkDataService: this.bulkDataAliasService,
},
logBasic,
logDetailed,
logWarning,
});
rtc.data.records += await recordsMap[ncTbl.id].getCount();
logDetailed(`Data inserted from ${ncTbl.title}`);
}
logBasic('Configuring Record Links...');
for (let i = 0; i < ncTblList.list.length; i++) {
// not a migrated table, skip
if (
undefined ===
aTblSchema.find((x) => x.name === ncTblList.list[i].title)
)
continue;
// const ncTbl = await api.dbTable.read(ncTblList.list[i].id);
const ncTbl: any =
await this.tablesService.getTableWithAccessibleViews({
tableId: ncTblList.list[i].id,
user: { ...syncDB.user, base_roles: { owner: true } },
});
rtc.data.nestedLinks += await importLTARData({
table: ncTbl,
baseName: syncDB.baseId,
atBase,
fields: null, //Object.values(tblLinkGroup).flat(),
insertedAssocRef,
records: recordsMap[ncTbl.id],
atNcAliasRef,
ncLinkMappingTable,
syncDB,
services: {
tableService: this.tablesService,
bulkDataService: this.bulkDataAliasService,
},
logBasic,
logDetailed,
logWarning,
});
rtc.data.records += importStats.importedCount;
rtc.data.nestedLinks += importStats.nestedLinkCount;
logDetailed(`Data inserted from ${ncTbl.title}`);
}
} catch (error) {
logBasic(
@ -2559,7 +2537,7 @@ export class AtImportProcessor {
email: syncDB.user.email,
data: { error: e.message },
});
console.log(e);
logger.log(e);
throw new Error(e.message);
}
throw e;

24
packages/nocodb/src/modules/jobs/jobs/at-import/helpers/fetchAT.ts

@ -43,7 +43,8 @@ async function initialize(shareId, appId?: string) {
}
return response.data;
})
.catch(() => {
.catch((e) => {
console.log(e);
throw {
message:
'Invalid Shared Base ID :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
@ -68,6 +69,11 @@ async function initialize(shareId, appId?: string) {
hreq.match(/(?<=fetch\(")(\\.*)(?=")/g)[0].trim(),
);
info.link = info.link.replace(
'%22mayExcludeCellDataForLargeViews%22%3Afalse',
'%22mayExcludeCellDataForLargeViews%22%3Atrue',
);
info.baseInfo = decodeURIComponent(info.link)
.match(/{(.*)}/g)[0]
.split('&')
@ -128,7 +134,8 @@ async function read() {
.then((response) => {
return response.data;
})
.catch(() => {
.catch((e) => {
console.log(e);
throw {
message:
'Error Reading :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
@ -151,7 +158,12 @@ async function readView(viewId) {
if (info.initialized) {
const resreq = await axios(
`https://airtable.com/v0.3/view/${viewId}/readData?` +
`stringifiedObjectParams=${encodeURIComponent('{}')}&requestId=${
`stringifiedObjectParams=${encodeURIComponent(
JSON.stringify({
mayOnlyIncludeRowAndCellDataForIncludedViews: true,
mayExcludeCellDataForLargeViews: true,
}),
)}&requestId=${
info.baseInfo.requestId
}&accessPolicy=${encodeURIComponent(
JSON.stringify({
@ -189,7 +201,8 @@ async function readView(viewId) {
.then((response) => {
return response.data;
})
.catch(() => {
.catch((e) => {
console.log(e);
throw {
message:
'Error Reading View :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
@ -238,7 +251,8 @@ async function readTemplate(templateId) {
.then((response) => {
return response.data;
})
.catch(() => {
.catch((e) => {
console.log(e);
throw {
message:
'Error Fetching :: Ensure www.airtable.com/templates/featured/<TemplateID> is accessible.',

476
packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

@ -1,20 +1,26 @@
/* eslint-disable no-async-promise-executor */
import { Readable } from 'stream';
import { isLinksOrLTAR, RelationTypes } from 'nocodb-sdk';
import sizeof from 'object-sizeof';
import { Logger } from '@nestjs/common';
import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../../services/tables.service';
// @ts-ignore
import PQueue from 'p-queue';
import type { BulkDataAliasService } from '~/services/bulk-data-alias.service';
import type { TablesService } from '~/services/tables.service';
import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk';
import type { Source } from '~/models';
const logger = new Logger('BaseModelSqlv2');
const logger = new Logger('at-import:readAndProcessData');
const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records
const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes
const BULK_LINK_BATCH_COUNT = 1000; // process 1000 records at a time
const BULK_PARALLEL_PROCESS = 5;
const BULK_DATA_BATCH_COUNT =
+process.env.AT_IMPORT_BULK_DATA_BATCH_COUNT || 10; // check size for every N records
const BULK_DATA_BATCH_SIZE =
+process.env.AT_IMPORT_BULK_DATA_BATCH_SIZE || 20 * 1024; // import N bytes at a time
const BULK_LINK_BATCH_COUNT =
+process.env.AT_IMPORT_BULK_LINK_BATCH_COUNT || 200; // import N links at a time
const BULK_PARALLEL_PROCESS = +process.env.AT_IMPORT_BULK_PARALLEL_PROCESS || 2; // process N records at a time
const STREAM_BUFFER_LIMIT = +process.env.AT_IMPORT_STREAM_BUFFER_LIMIT || 100; // pause reading if we have more than N records to avoid backpressure
const QUEUE_BUFFER_LIMIT = +process.env.AT_IMPORT_QUEUE_BUFFER_LIMIT || 20; // pause streaming if we have more than N records in the queue
interface AirtableImportContext {
bulkDataService: BulkDataAliasService;
@ -25,53 +31,62 @@ async function readAllData({
table,
fields,
atBase,
dataStream,
counter,
logBasic = (_str) => {},
logWarning = (_str) => {},
}: {
table: { title?: string };
fields?;
atBase: AirtableBase;
dataStream: Readable;
counter?: { streamingCounter: number };
logBasic?: (string) => void;
logDetailed?: (string) => void;
logWarning?: (string) => void;
}): Promise<EntityMap> {
}): Promise<boolean> {
return new Promise((resolve) => {
let data = null;
const selectParams: any = {
pageSize: 100,
};
if (fields) selectParams.fields = fields;
let recordCounter = 0;
atBase(table.title)
.select(selectParams)
.eachPage(
async function page(records, fetchNextPage) {
if (!data) {
/*
EntityMap is a sqlite3 table dynamically populated based on json data provided
It is used to store data temporarily and then stream it in bulk to import
This is done to avoid memory issues - heap out of memory - while importing large data
*/
data = new EntityMap();
await data.init();
}
for await (const record of records) {
await data.addRow({ id: record.id, ...record.fields });
let tempCounter = 0;
for (const record of records) {
dataStream.push(
JSON.stringify({ id: record.id, ...record.fields }),
);
counter.streamingCounter++;
recordCounter++;
tempCounter++;
}
const tmpLength = await data.getCount();
logBasic(
`:: Reading '${table.title}' data :: ${Math.max(
1,
tmpLength - records.length,
)} - ${tmpLength}`,
recordCounter - tempCounter,
)} - ${recordCounter}`,
);
// pause reading if we have more than STREAM_BUFFER_LIMIT to avoid backpressure
if (counter && counter.streamingCounter >= STREAM_BUFFER_LIMIT) {
await new Promise((resolve) => {
const interval = setInterval(() => {
if (counter.streamingCounter < STREAM_BUFFER_LIMIT / 2) {
clearInterval(interval);
resolve(true);
}
}, 100);
});
}
// To fetch the next page of records, call `fetchNextPage`.
// If there are more records, `page` will get called again.
// If there are no more records, `done` will get called.
@ -84,7 +99,8 @@ async function readAllData({
`There were errors on reading '${table.title}' data :: ${err}`,
);
}
resolve(data);
dataStream.push(null);
resolve(true);
},
);
});
@ -95,107 +111,154 @@ export async function importData({
table,
atBase,
nocoBaseDataProcessing_v2,
sDB,
syncDB,
source,
logBasic = (_str) => {},
logDetailed = (_str) => {},
logWarning = (_str) => {},
services,
// link related props start
insertedAssocRef = {},
atNcAliasRef,
ncLinkMappingTable,
}: {
baseName: string;
table: { title?: string; id?: string };
fields?;
atBase: AirtableBase;
source: Source;
logBasic: (string) => void;
logDetailed: (string) => void;
logWarning: (string) => void;
nocoBaseDataProcessing_v2;
sDB;
// link related props start
insertedAssocRef: { [assocTableId: string]: boolean };
atNcAliasRef: {
[ncTableId: string]: {
[ncTitle: string]: string;
};
};
ncLinkMappingTable: Record<string, Record<string, any>>[];
// link related props end
syncDB;
services: AirtableImportContext;
}): Promise<EntityMap> {
}): Promise<{
nestedLinkCount: number;
importedCount: number;
}> {
try {
// returns EntityMap which allows us to stream data
const records: EntityMap = await readAllData({
const counter = {
streamingCounter: 0,
};
const dataStream = new Readable({
read() {},
});
dataStream.pause();
readAllData({
table,
atBase,
logDetailed,
dataStream,
counter,
logBasic,
logDetailed,
logWarning,
}).catch((e) => {
logWarning(`There were errors on reading '${table.title}' data :: ${e}`);
});
await new Promise(async (resolve) => {
const readable = records.getStream();
const allRecordsCount = await records.getCount();
const promises = [];
return new Promise(async (resolve) => {
const queue = new PQueue({ concurrency: BULK_PARALLEL_PROCESS });
const ltarPromise = importLTARData({
table,
baseName,
insertedAssocRef,
dataStream,
atNcAliasRef,
ncLinkMappingTable,
syncDB,
source,
services,
queue,
logBasic,
logDetailed,
logWarning,
}).catch((e) => {
logWarning(
`There were errors on importing '${table.title}' LTAR data :: ${e}`,
);
});
let tempData = [];
let importedCount = 0;
let nestedLinkCount = 0;
let tempCount = 0;
// we keep track of active process to pause and resume the stream as we have async calls within the stream and we don't want to load all data in memory
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve) => {
try {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: rid, ...fields } = record;
const r = await nocoBaseDataProcessing_v2(sDB, table, {
id: rid,
fields,
});
tempData.push(r);
tempCount++;
if (tempCount >= BULK_DATA_BATCH_COUNT) {
if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
readable.pause();
let insertArray = tempData.splice(0, tempData.length);
await services.bulkDataService.bulkDataInsert({
baseName,
tableName: table.id,
body: insertArray,
cookie: {},
skip_hooks: true,
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + insertArray.length,
allRecordsCount,
)}`,
);
dataStream.on('data', async (record) => {
counter.streamingCounter--;
record = JSON.parse(record);
queue.add(
() =>
new Promise(async (resolve) => {
try {
const { id: rid, ...fields } = record;
const r = await nocoBaseDataProcessing_v2(syncDB, table, {
id: rid,
fields,
});
tempData.push(r);
tempCount++;
if (tempCount >= BULK_DATA_BATCH_COUNT) {
if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
let insertArray = tempData.splice(0, tempData.length);
await services.bulkDataService.bulkDataInsert({
baseName,
tableName: table.id,
body: insertArray,
cookie: {},
skip_hooks: true,
foreign_key_checks: !!source.isMeta(),
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${
importedCount + insertArray.length
}`,
);
importedCount += insertArray.length;
insertArray = [];
}
tempCount = 0;
}
importedCount += insertArray.length;
insertArray = [];
if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
readable.resume();
}
tempCount = 0;
resolve(true);
} catch (e) {
logger.error(e);
logWarning(
`There were errors on importing '${table.title}' data :: ${e}`,
);
if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
resolve(true);
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
} catch (e) {
logger.error(e);
logWarning(
`There were errors on importing '${table.title}' data :: ${e}`,
);
readable.resume();
resolve(true);
}
}),
}),
);
if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause();
});
readable.on('end', async () => {
dataStream.on('end', async () => {
try {
// ensure all chunks are processed
await Promise.all(promises);
await queue.onIdle();
// insert remaining data
if (tempData.length > 0) {
@ -205,31 +268,36 @@ export async function importData({
body: tempData,
cookie: {},
skip_hooks: true,
foreign_key_checks: !!source.isMeta(),
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + tempData.length,
allRecordsCount,
)}`,
`:: Importing '${table.title}' data :: ${importedCount} - ${
importedCount + tempData.length
}`,
);
importedCount += tempData.length;
tempData = [];
}
resolve(true);
nestedLinkCount = (await ltarPromise) as number;
resolve({
importedCount,
nestedLinkCount,
});
} catch (e) {
logger.error(e);
logWarning(
`There were errors on importing '${table.title}' data :: ${e}`,
);
resolve(true);
resolve({
importedCount,
nestedLinkCount,
});
}
});
});
return records;
} catch (e) {
throw e;
}
@ -237,25 +305,22 @@ export async function importData({
export async function importLTARData({
table,
fields,
atBase,
baseName,
insertedAssocRef = {},
records,
dataStream,
atNcAliasRef,
ncLinkMappingTable,
syncDB,
source,
services,
queue,
logBasic = (_str) => {},
logDetailed = (_str) => {},
logWarning = (_str) => {},
}: {
baseName: string;
table: { title?: string; id?: string };
fields;
atBase: AirtableBase;
insertedAssocRef: { [assocTableId: string]: boolean };
records?: EntityMap;
dataStream?: Readable;
atNcAliasRef: {
[ncTableId: string]: {
[ncTitle: string]: string;
@ -263,26 +328,19 @@ export async function importLTARData({
};
ncLinkMappingTable: Record<string, Record<string, any>>[];
syncDB;
source: Source;
services: AirtableImportContext;
queue: PQueue;
logBasic: (string) => void;
logDetailed: (string) => void;
logWarning: (string) => void;
}) {
}): Promise<number> {
const assocTableMetas: Array<{
modelMeta: { id?: string; title?: string };
colMeta: { title?: string };
curCol: { title?: string };
refCol: { title?: string };
}> = [];
const allData: EntityMap =
records ||
(await readAllData({
table,
fields,
atBase,
logDetailed,
logBasic,
}));
const modelMeta: any =
await services.tableService.getTableWithAccessibleViews({
@ -329,108 +387,116 @@ export async function importLTARData({
let nestedLinkCnt = 0;
let importedCount = 0;
let assocTableData = [];
// Iterate over all related M2M associative table
for (const assocMeta of assocTableMetas) {
// extract link data from records
await new Promise((resolve) => {
const promises = [];
const readable = allData.getStream();
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve) => {
try {
const { id: _atId, ...rec } = record;
// todo: use actual alias instead of sanitized
assocTableData.push(
...(
rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []
).map((id) => ({
[assocMeta.curCol.title]: record.id,
[assocMeta.refCol.title]: id,
})),
);
if (assocTableData.length >= BULK_LINK_BATCH_COUNT) {
readable.pause();
let insertArray = assocTableData.splice(
0,
assocTableData.length,
const assocTableData = {};
// extract link data from records
return new Promise((resolve, reject) => {
dataStream.on('data', async (record) => {
record = JSON.parse(record);
// Iterate over all related M2M associative table
for (const assocMeta of assocTableMetas) {
if (!assocTableData[assocMeta.modelMeta.id]) {
assocTableData[assocMeta.modelMeta.id] = [];
}
queue.add(
() =>
new Promise(async (resolve) => {
try {
const { id: _atId, ...rec } = record;
// todo: use actual alias instead of sanitized
assocTableData[assocMeta.modelMeta.id].push(
...(
rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []
).map((id) => ({
[assocMeta.curCol.title]: record.id,
[assocMeta.refCol.title]: id,
})),
);
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${
importedCount + insertArray.length
}`,
);
if (
assocTableData[assocMeta.modelMeta.id].length >=
BULK_LINK_BATCH_COUNT
) {
let insertArray = assocTableData[
assocMeta.modelMeta.id
].splice(0, assocTableData[assocMeta.modelMeta.id].length);
await services.bulkDataService.bulkDataInsert({
baseName,
tableName: assocMeta.modelMeta.id,
body: insertArray,
cookie: {},
skip_hooks: true,
});
const lastImportedCount = importedCount;
importedCount += insertArray.length;
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${lastImportedCount} - ${
lastImportedCount + insertArray.length
}`,
);
await services.bulkDataService.bulkDataInsert({
baseName,
tableName: assocMeta.modelMeta.id,
body: insertArray,
cookie: {},
skip_hooks: true,
foreign_key_checks: !!source.isMeta(),
});
importedCount += insertArray.length;
insertArray = [];
insertArray = [];
}
readable.resume();
if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
resolve(true);
} catch (e) {
logger.error(e);
logWarning(
`There were errors on importing '${table.title}' LTAR data :: ${e}`,
);
if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
resolve(true);
}
resolve(true);
} catch (e) {
logger.error(e);
logWarning(
`There were errors on importing '${table.title}' LTAR data :: ${e}`,
);
readable.resume();
resolve(true);
}
}),
}),
);
});
readable.on('end', async () => {
try {
// ensure all chunks are processed
await Promise.all(promises);
}
if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause();
});
dataStream.on('end', async () => {
try {
// ensure all chunks are processed
await queue.onIdle();
for (const assocMeta of assocTableMetas) {
// insert remaining data
if (assocTableData.length >= 0) {
if (assocTableData[assocMeta.modelMeta.id].length >= 0) {
logBasic(
`:: Importing '${table.title}' LTAR data :: ${importedCount} - ${
importedCount + assocTableData.length
importedCount + assocTableData[assocMeta.modelMeta.id].length
}`,
);
await services.bulkDataService.bulkDataInsert({
baseName,
tableName: assocMeta.modelMeta.id,
body: assocTableData,
body: assocTableData[assocMeta.modelMeta.id],
cookie: {},
skip_hooks: true,
foreign_key_checks: !!source.isMeta(),
});
importedCount += assocTableData.length;
assocTableData = [];
importedCount += assocTableData[assocMeta.modelMeta.id].length;
assocTableData[assocMeta.modelMeta.id] = [];
}
}
nestedLinkCnt += importedCount;
nestedLinkCnt += importedCount;
resolve(true);
} catch (e) {
logger.error(e);
logWarning(
`There were errors on importing '${table.title}' LTAR data :: ${e}`,
);
resolve(true);
}
});
resolve(nestedLinkCnt);
} catch (e) {
reject(e);
}
});
}
return nestedLinkCnt;
// resume the stream after all listeners are attached
dataStream.resume();
});
}

65
packages/nocodb/src/plugins/s3/S3.ts

@ -56,27 +56,23 @@ export default class S3 implements IStorageAdapterV2 {
const uploadParams: any = {
...this.defaultParams,
};
return new Promise((resolve, reject) => {
axios
.get(url, {
httpAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }),
httpsAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }),
responseType: 'stream',
})
.then((response) => {
uploadParams.Body = response.data;
uploadParams.Key = key;
uploadParams.ContentType = response.headers['content-type'];
// call S3 to retrieve upload file to specified bucket
this.upload(uploadParams).then((data) => {
resolve(data);
});
})
.catch((error) => {
reject(error);
});
});
try {
const response = await axios.get(url, {
httpAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }),
httpsAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }),
responseType: 'stream',
});
uploadParams.Body = response.data;
uploadParams.Key = key;
uploadParams.ContentType = response.headers['content-type'];
const data = await this.upload(uploadParams);
return data;
} catch (error) {
throw error;
}
}
// TODO - implement
@ -161,26 +157,23 @@ export default class S3 implements IStorageAdapterV2 {
}
private async upload(uploadParams): Promise<any> {
return new Promise((resolve, reject) => {
try {
// call S3 to retrieve upload file to specified bucket
const upload = new Upload({
client: this.s3Client,
params: { ...this.defaultParams, ...uploadParams },
});
upload
.done()
.then((data) => {
if (data) {
resolve(
`https://${this.input.bucket}.s3.${this.input.region}.amazonaws.com/${uploadParams.Key}`,
);
}
})
.catch((err) => {
console.error(err);
reject(err);
});
});
const data = await upload.done();
if (data) {
return `https://${this.input.bucket}.s3.${this.input.region}.amazonaws.com/${uploadParams.Key}`;
} else {
throw new Error('Upload failed or no data returned.');
}
} catch (error) {
console.error(error);
throw error;
}
}
}

189
packages/nocodb/src/services/attachments.service.ts

@ -1,8 +1,9 @@
import path from 'path';
import { AppEvents } from 'nocodb-sdk';
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { nanoid } from 'nanoid';
import slash from 'slash';
import PQueue from 'p-queue';
import type { AttachmentReqType, FileType } from 'nocodb-sdk';
import type { NcRequest } from '~/interface/config';
import { AppHooksService } from '~/services/app-hooks/app-hooks.service';
@ -14,6 +15,8 @@ import { utf8ify } from '~/helpers/stringHelpers';
@Injectable()
export class AttachmentsService {
protected logger = new Logger(AttachmentsService.name);
constructor(private readonly appHooksService: AppHooksService) {}
async upload(param: { path?: string; files: FileType[]; req: NcRequest }) {
@ -25,63 +28,78 @@ export class AttachmentsService {
const storageAdapter = await NcPluginMgrv2.storageAdapter();
const attachments = await Promise.all(
param.files?.map(async (file) => {
const originalName = utf8ify(file.originalname);
const fileName = `${nanoid(18)}${path.extname(originalName)}`;
const url = await storageAdapter.fileCreate(
slash(path.join(destPath, fileName)),
file,
);
const attachment: {
url?: string;
path?: string;
title: string;
mimetype: string;
size: number;
icon?: string;
signedPath?: string;
signedUrl?: string;
} = {
...(url ? { url } : {}),
title: originalName,
mimetype: file.mimetype,
size: file.size,
icon: mimeIcons[path.extname(originalName).slice(1)] || undefined,
};
const promises = [];
// if `url` is null, then it is local attachment
if (!url) {
// then store the attachment path only
// url will be constructed in `useAttachmentCell`
attachment.path = `download/${filePath.join('/')}/${fileName}`;
promises.push(
PresignedUrl.getSignedUrl({
path: attachment.path.replace(/^download\//, ''),
}).then((r) => (attachment.signedPath = r)),
// just in case we want to increase concurrency in future
const queue = new PQueue({ concurrency: 1 });
const attachments = [];
const errors = [];
queue.addAll(
param.files?.map((file) => async () => {
try {
const originalName = utf8ify(file.originalname);
const fileName = `${nanoid(18)}${path.extname(originalName)}`;
const url = await storageAdapter.fileCreate(
slash(path.join(destPath, fileName)),
file,
);
} else {
if (attachment.url.includes('.amazonaws.com/')) {
const relativePath = decodeURI(
attachment.url.split('.amazonaws.com/')[1],
);
promises.push(
PresignedUrl.getSignedUrl({
const attachment: {
url?: string;
path?: string;
title: string;
mimetype: string;
size: number;
icon?: string;
signedPath?: string;
signedUrl?: string;
} = {
...(url ? { url } : {}),
title: originalName,
mimetype: file.mimetype,
size: file.size,
icon: mimeIcons[path.extname(originalName).slice(1)] || undefined,
};
// if `url` is null, then it is local attachment
if (!url) {
// then store the attachment path only
// url will be constructed in `useAttachmentCell`
attachment.path = `download/${filePath.join('/')}/${fileName}`;
attachment.signedPath = await PresignedUrl.getSignedUrl({
path: attachment.path.replace(/^download\//, ''),
});
} else {
if (attachment.url.includes('.amazonaws.com/')) {
const relativePath = decodeURI(
attachment.url.split('.amazonaws.com/')[1],
);
attachment.signedUrl = await PresignedUrl.getSignedUrl({
path: relativePath,
s3: true,
}).then((r) => (attachment.signedUrl = r)),
);
});
}
}
}
return Promise.all(promises).then(() => attachment);
attachments.push(attachment);
} catch (e) {
errors.push(e);
}
}),
);
await queue.onIdle();
if (errors.length) {
for (const error of errors) {
this.logger.error(error);
}
throw errors[0];
}
this.appHooksService.emit(AppEvents.ATTACHMENT_UPLOAD, {
type: 'file',
req: param.req,
@ -103,40 +121,59 @@ export class AttachmentsService {
const storageAdapter = await NcPluginMgrv2.storageAdapter();
const attachments = await Promise.all(
param.urls?.map?.(async (urlMeta) => {
const { url, fileName: _fileName } = urlMeta;
// just in case we want to increase concurrency in future
const queue = new PQueue({ concurrency: 1 });
const fileName = `${nanoid(18)}${path.extname(
_fileName || url.split('/').pop(),
)}`;
const attachments = [];
const errors = [];
const attachmentUrl: string | null =
await storageAdapter.fileCreateByUrl(
slash(path.join(destPath, fileName)),
url,
);
queue.addAll(
param.urls?.map?.((urlMeta) => async () => {
try {
const { url, fileName: _fileName } = urlMeta;
let attachmentPath: string | undefined;
const fileName = `${nanoid(18)}${path.extname(
_fileName || url.split('/').pop(),
)}`;
// if `attachmentUrl` is null, then it is local attachment
if (!attachmentUrl) {
// then store the attachment path only
// url will be constructed in `useAttachmentCell`
attachmentPath = `download/${filePath.join('/')}/${fileName}`;
}
const attachmentUrl: string | null =
await storageAdapter.fileCreateByUrl(
slash(path.join(destPath, fileName)),
url,
);
let attachmentPath: string | undefined;
return {
...(attachmentUrl ? { url: attachmentUrl } : {}),
...(attachmentPath ? { path: attachmentPath } : {}),
title: _fileName,
mimetype: urlMeta.mimetype,
size: urlMeta.size,
icon: mimeIcons[path.extname(fileName).slice(1)] || undefined,
};
// if `attachmentUrl` is null, then it is local attachment
if (!attachmentUrl) {
// then store the attachment path only
// url will be constructed in `useAttachmentCell`
attachmentPath = `download/${filePath.join('/')}/${fileName}`;
}
attachments.push({
...(attachmentUrl ? { url: attachmentUrl } : {}),
...(attachmentPath ? { path: attachmentPath } : {}),
title: _fileName,
mimetype: urlMeta.mimetype,
size: urlMeta.size,
icon: mimeIcons[path.extname(fileName).slice(1)] || undefined,
});
} catch (e) {
errors.push(e);
}
}),
);
await queue.onIdle();
if (errors.length) {
for (const error of errors) {
this.logger.error(error);
}
throw errors[0];
}
this.appHooksService.emit(AppEvents.ATTACHMENT_UPLOAD, {
type: 'url',
req: param.req,

Loading…
Cancel
Save