diff --git a/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts b/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts index 73924d3ac3..dc88f80232 100644 --- a/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts +++ b/packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts @@ -9,12 +9,13 @@ import { Process, Processor } from '@nestjs/bull'; import { Job } from 'bull'; import { isLinksOrLTAR } from 'nocodb-sdk'; import debug from 'debug'; +import { Logger } from '@nestjs/common'; import { JobsLogService } from '../jobs-log.service'; import FetchAT from './helpers/fetchAT'; -import { importData, importLTARData } from './helpers/readAndProcessData'; +import { importData } from './helpers/readAndProcessData'; import EntityMap from './helpers/EntityMap'; import type { UserType } from 'nocodb-sdk'; -import type { Base } from '~/models'; +import { type Base, Source } from '~/models'; import { sanitizeColumnName } from '~/helpers'; import { AttachmentsService } from '~/services/attachments.service'; import { ColumnsService } from '~/services/columns.service'; @@ -34,6 +35,8 @@ import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; import { GridColumnsService } from '~/services/grid-columns.service'; import { TelemetryService } from '~/services/telemetry.service'; +const logger = new Logger('at-import'); + dayjs.extend(utc); const selectColors = { @@ -221,7 +224,11 @@ export class AtImportProcessor { rtc.migrationSkipLog.log.push( `tn[${tbl}] cn[${col}] type[${type}] :: ${reason}`, ); - logWarning(`Skipped ${tbl} :: ${col} (${type}) :: ${reason}`); + logWarning( + `Skipped${tbl ? ` ${tbl} :: ` : ``}${col ? `${col}` : ``}${ + type ? ` (${type})` : `` + } :: ${reason}`, + ); }; // mapping table @@ -937,7 +944,7 @@ export class AtImportProcessor { ?.length ) { if (enableErrorLogs) - console.log(`## Invalid column IDs mapped; skip`); + logger.log(`## Invalid column IDs mapped; skip`); updateMigrationSkipLog( srcTableSchema.title, @@ -1017,7 +1024,7 @@ export class AtImportProcessor { ); } if (enableErrorLogs) - console.log( + logger.log( `## Failed to configure ${nestedLookupTbl.length} lookups`, ); break; @@ -1156,7 +1163,7 @@ export class AtImportProcessor { ?.length ) { if (enableErrorLogs) - console.log(`## Invalid column IDs mapped; skip`); + logger.log(`## Invalid column IDs mapped; skip`); updateMigrationSkipLog( srcTableSchema.title, @@ -1514,7 +1521,7 @@ export class AtImportProcessor { req: {}, }); } catch (e) { - console.log(e); + logger.log(e); } rec[key] = JSON.stringify(tempArr); @@ -1841,7 +1848,7 @@ export class AtImportProcessor { // TODO enable after fixing user invite role issue // logWarning(e.message); } else { - console.log(e); + logger.log(e); } }), ); @@ -2052,7 +2059,7 @@ export class AtImportProcessor { const datatype = colSchema.uidt; const ncFilters = []; - // console.log(filter) + // logger.log(filter) if (datatype === UITypes.Links) { // skip filters for links; Link filters in NocoDB are only rollup counts // where-as in airtable, filter can be textual @@ -2457,12 +2464,13 @@ export class AtImportProcessor { sourceId: syncDB.sourceId, roles: { ...userRole, owner: true }, }); + + const source = await Source.get(syncDB.sourceId); + recordPerfStats(_perfStart, 'base.tableList'); logBasic('Reading Records...'); - const recordsMap = {}; - for (let i = 0; i < ncTblList.list.length; i++) { // not a migrated table, skip if ( @@ -2479,59 +2487,29 @@ export class AtImportProcessor { }); 
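// Each iteration of this loop now delegates both record import and nested link
// (LTAR) import to a single importData() call, which resolves with
// { importedCount, nestedLinkCount }; the separate "Configuring Record Links"
// pass that previously re-read every table is removed later in this hunk.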
recordPerfStats(_perfStart, 'dbTable.read'); - recordsMap[ncTbl.id] = await importData({ + const importStats = await importData({ baseName: syncDB.baseId, table: ncTbl, atBase, nocoBaseDataProcessing_v2, - sDB: syncDB, + syncDB, + source, services: { tableService: this.tablesService, bulkDataService: this.bulkDataAliasService, }, - logBasic, - logDetailed, - logWarning, - }); - rtc.data.records += await recordsMap[ncTbl.id].getCount(); - - logDetailed(`Data inserted from ${ncTbl.title}`); - } - - logBasic('Configuring Record Links...'); - for (let i = 0; i < ncTblList.list.length; i++) { - // not a migrated table, skip - if ( - undefined === - aTblSchema.find((x) => x.name === ncTblList.list[i].title) - ) - continue; - - // const ncTbl = await api.dbTable.read(ncTblList.list[i].id); - const ncTbl: any = - await this.tablesService.getTableWithAccessibleViews({ - tableId: ncTblList.list[i].id, - user: { ...syncDB.user, base_roles: { owner: true } }, - }); - - rtc.data.nestedLinks += await importLTARData({ - table: ncTbl, - baseName: syncDB.baseId, - atBase, fields: null, //Object.values(tblLinkGroup).flat(), insertedAssocRef, - records: recordsMap[ncTbl.id], atNcAliasRef, ncLinkMappingTable, - syncDB, - services: { - tableService: this.tablesService, - bulkDataService: this.bulkDataAliasService, - }, logBasic, logDetailed, logWarning, }); + rtc.data.records += importStats.importedCount; + rtc.data.nestedLinks += importStats.nestedLinkCount; + + logDetailed(`Data inserted from ${ncTbl.title}`); } } catch (error) { logBasic( @@ -2559,7 +2537,7 @@ export class AtImportProcessor { email: syncDB.user.email, data: { error: e.message }, }); - console.log(e); + logger.log(e); throw new Error(e.message); } throw e; diff --git a/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/fetchAT.ts b/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/fetchAT.ts index 471dd9eae8..e6144eca0c 100644 --- a/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/fetchAT.ts +++ b/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/fetchAT.ts @@ -43,7 +43,8 @@ async function initialize(shareId, appId?: string) { } return response.data; }) - .catch(() => { + .catch((e) => { + console.log(e); throw { message: 'Invalid Shared Base ID :: Ensure www.airtable.com/ is accessible. Refer https://bit.ly/3x0OdXI for details', @@ -68,6 +69,11 @@ async function initialize(shareId, appId?: string) { hreq.match(/(?<=fetch\(")(\\.*)(?=")/g)[0].trim(), ); + info.link = info.link.replace( + '%22mayExcludeCellDataForLargeViews%22%3Afalse', + '%22mayExcludeCellDataForLargeViews%22%3Atrue', + ); + info.baseInfo = decodeURIComponent(info.link) .match(/{(.*)}/g)[0] .split('&') @@ -128,7 +134,8 @@ async function read() { .then((response) => { return response.data; }) - .catch(() => { + .catch((e) => { + console.log(e); throw { message: 'Error Reading :: Ensure www.airtable.com/ is accessible. 
Refer https://bit.ly/3x0OdXI for details', @@ -151,7 +158,12 @@ async function readView(viewId) { if (info.initialized) { const resreq = await axios( `https://airtable.com/v0.3/view/${viewId}/readData?` + - `stringifiedObjectParams=${encodeURIComponent('{}')}&requestId=${ + `stringifiedObjectParams=${encodeURIComponent( + JSON.stringify({ + mayOnlyIncludeRowAndCellDataForIncludedViews: true, + mayExcludeCellDataForLargeViews: true, + }), + )}&requestId=${ info.baseInfo.requestId }&accessPolicy=${encodeURIComponent( JSON.stringify({ @@ -189,7 +201,8 @@ async function readView(viewId) { .then((response) => { return response.data; }) - .catch(() => { + .catch((e) => { + console.log(e); throw { message: 'Error Reading View :: Ensure www.airtable.com/ is accessible. Refer https://bit.ly/3x0OdXI for details', @@ -238,7 +251,8 @@ async function readTemplate(templateId) { .then((response) => { return response.data; }) - .catch(() => { + .catch((e) => { + console.log(e); throw { message: 'Error Fetching :: Ensure www.airtable.com/templates/featured/ is accessible.', diff --git a/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts b/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts index 41bce5e203..4e697ac531 100644 --- a/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts +++ b/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts @@ -1,20 +1,26 @@ /* eslint-disable no-async-promise-executor */ +import { Readable } from 'stream'; import { isLinksOrLTAR, RelationTypes } from 'nocodb-sdk'; import sizeof from 'object-sizeof'; import { Logger } from '@nestjs/common'; -import EntityMap from './EntityMap'; -import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service'; -import type { TablesService } from '../../../../../services/tables.service'; -// @ts-ignore +import PQueue from 'p-queue'; +import type { BulkDataAliasService } from '~/services/bulk-data-alias.service'; +import type { TablesService } from '~/services/tables.service'; import type { AirtableBase } from 'airtable/lib/airtable_base'; import type { TableType } from 'nocodb-sdk'; +import type { Source } from '~/models'; -const logger = new Logger('BaseModelSqlv2'); +const logger = new Logger('at-import:readAndProcessData'); -const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records -const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes -const BULK_LINK_BATCH_COUNT = 1000; // process 1000 records at a time -const BULK_PARALLEL_PROCESS = 5; +const BULK_DATA_BATCH_COUNT = + +process.env.AT_IMPORT_BULK_DATA_BATCH_COUNT || 10; // check size for every N records +const BULK_DATA_BATCH_SIZE = + +process.env.AT_IMPORT_BULK_DATA_BATCH_SIZE || 20 * 1024; // import N bytes at a time +const BULK_LINK_BATCH_COUNT = + +process.env.AT_IMPORT_BULK_LINK_BATCH_COUNT || 200; // import N links at a time +const BULK_PARALLEL_PROCESS = +process.env.AT_IMPORT_BULK_PARALLEL_PROCESS || 2; // process N records at a time +const STREAM_BUFFER_LIMIT = +process.env.AT_IMPORT_STREAM_BUFFER_LIMIT || 100; // pause reading if we have more than N records to avoid backpressure +const QUEUE_BUFFER_LIMIT = +process.env.AT_IMPORT_QUEUE_BUFFER_LIMIT || 20; // pause streaming if we have more than N records in the queue interface AirtableImportContext { bulkDataService: BulkDataAliasService; @@ -25,53 +31,62 @@ async function readAllData({ table, fields, atBase, + dataStream, + counter, logBasic = (_str) => {}, logWarning = (_str) => {}, }: { 
table: { title?: string }; fields?; atBase: AirtableBase; + dataStream: Readable; + counter?: { streamingCounter: number }; logBasic?: (string) => void; logDetailed?: (string) => void; logWarning?: (string) => void; -}): Promise { +}): Promise { return new Promise((resolve) => { - let data = null; - const selectParams: any = { pageSize: 100, }; if (fields) selectParams.fields = fields; + let recordCounter = 0; + atBase(table.title) .select(selectParams) .eachPage( async function page(records, fetchNextPage) { - if (!data) { - /* - EntityMap is a sqlite3 table dynamically populated based on json data provided - It is used to store data temporarily and then stream it in bulk to import - - This is done to avoid memory issues - heap out of memory - while importing large data - */ - data = new EntityMap(); - await data.init(); - } - - for await (const record of records) { - await data.addRow({ id: record.id, ...record.fields }); + let tempCounter = 0; + for (const record of records) { + dataStream.push( + JSON.stringify({ id: record.id, ...record.fields }), + ); + counter.streamingCounter++; + recordCounter++; + tempCounter++; } - const tmpLength = await data.getCount(); - logBasic( `:: Reading '${table.title}' data :: ${Math.max( 1, - tmpLength - records.length, - )} - ${tmpLength}`, + recordCounter - tempCounter, + )} - ${recordCounter}`, ); + // pause reading if we have more than STREAM_BUFFER_LIMIT to avoid backpressure + if (counter && counter.streamingCounter >= STREAM_BUFFER_LIMIT) { + await new Promise((resolve) => { + const interval = setInterval(() => { + if (counter.streamingCounter < STREAM_BUFFER_LIMIT / 2) { + clearInterval(interval); + resolve(true); + } + }, 100); + }); + } + // To fetch the next page of records, call `fetchNextPage`. // If there are more records, `page` will get called again. // If there are no more records, `done` will get called. 
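// A minimal standalone sketch of the producer-side throttling used by
// readAllData above, assuming a hypothetical `fetchPages` async iterator in
// place of Airtable's eachPage(); only STREAM_BUFFER_LIMIT mirrors the
// constant introduced in this file.
import { Readable } from 'stream';

const STREAM_BUFFER_LIMIT = +process.env.AT_IMPORT_STREAM_BUFFER_LIMIT || 100;

async function produce(
  fetchPages: () => AsyncIterable<Record<string, any>[]>,
  dataStream: Readable,
  counter: { streamingCounter: number },
) {
  for await (const page of fetchPages()) {
    for (const record of page) {
      dataStream.push(JSON.stringify(record)); // consumer JSON.parse()s each chunk
      counter.streamingCounter++;
    }
    // back off until the consumer has drained the buffer to half the limit
    if (counter.streamingCounter >= STREAM_BUFFER_LIMIT) {
      while (counter.streamingCounter >= STREAM_BUFFER_LIMIT / 2) {
        await new Promise((r) => setTimeout(r, 100));
      }
    }
  }
  dataStream.push(null); // end-of-data marker for the consumer
}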
@@ -84,7 +99,8 @@ async function readAllData({ `There were errors on reading '${table.title}' data :: ${err}`, ); } - resolve(data); + dataStream.push(null); + resolve(true); }, ); }); @@ -95,107 +111,154 @@ export async function importData({ table, atBase, nocoBaseDataProcessing_v2, - sDB, + syncDB, + source, logBasic = (_str) => {}, logDetailed = (_str) => {}, logWarning = (_str) => {}, services, + // link related props start + insertedAssocRef = {}, + atNcAliasRef, + ncLinkMappingTable, }: { baseName: string; table: { title?: string; id?: string }; fields?; atBase: AirtableBase; + source: Source; logBasic: (string) => void; logDetailed: (string) => void; logWarning: (string) => void; nocoBaseDataProcessing_v2; - sDB; + // link related props start + insertedAssocRef: { [assocTableId: string]: boolean }; + atNcAliasRef: { + [ncTableId: string]: { + [ncTitle: string]: string; + }; + }; + ncLinkMappingTable: Record>[]; + // link related props end + syncDB; services: AirtableImportContext; -}): Promise { +}): Promise<{ + nestedLinkCount: number; + importedCount: number; +}> { try { - // returns EntityMap which allows us to stream data - const records: EntityMap = await readAllData({ + const counter = { + streamingCounter: 0, + }; + + const dataStream = new Readable({ + read() {}, + }); + + dataStream.pause(); + + readAllData({ table, atBase, - logDetailed, + dataStream, + counter, logBasic, + logDetailed, + logWarning, + }).catch((e) => { + logWarning(`There were errors on reading '${table.title}' data :: ${e}`); }); - await new Promise(async (resolve) => { - const readable = records.getStream(); - const allRecordsCount = await records.getCount(); - const promises = []; + return new Promise(async (resolve) => { + const queue = new PQueue({ concurrency: BULK_PARALLEL_PROCESS }); + + const ltarPromise = importLTARData({ + table, + baseName, + insertedAssocRef, + dataStream, + atNcAliasRef, + ncLinkMappingTable, + syncDB, + source, + services, + queue, + logBasic, + logDetailed, + logWarning, + }).catch((e) => { + logWarning( + `There were errors on importing '${table.title}' LTAR data :: ${e}`, + ); + }); let tempData = []; let importedCount = 0; + let nestedLinkCount = 0; let tempCount = 0; - // we keep track of active process to pause and resume the stream as we have async calls within the stream and we don't want to load all data in memory - let activeProcess = 0; - - readable.on('data', async (record) => { - promises.push( - new Promise(async (resolve) => { - try { - activeProcess++; - if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause(); - - const { id: rid, ...fields } = record; - const r = await nocoBaseDataProcessing_v2(sDB, table, { - id: rid, - fields, - }); - tempData.push(r); - tempCount++; - - if (tempCount >= BULK_DATA_BATCH_COUNT) { - if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) { - readable.pause(); - - let insertArray = tempData.splice(0, tempData.length); - - await services.bulkDataService.bulkDataInsert({ - baseName, - tableName: table.id, - body: insertArray, - cookie: {}, - skip_hooks: true, - }); - - logBasic( - `:: Importing '${ - table.title - }' data :: ${importedCount} - ${Math.min( - importedCount + insertArray.length, - allRecordsCount, - )}`, - ); + dataStream.on('data', async (record) => { + counter.streamingCounter--; + record = JSON.parse(record); + queue.add( + () => + new Promise(async (resolve) => { + try { + const { id: rid, ...fields } = record; + const r = await nocoBaseDataProcessing_v2(syncDB, table, { + id: rid, + fields, + }); + 
tempData.push(r); + tempCount++; + + if (tempCount >= BULK_DATA_BATCH_COUNT) { + if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) { + let insertArray = tempData.splice(0, tempData.length); + + await services.bulkDataService.bulkDataInsert({ + baseName, + tableName: table.id, + body: insertArray, + cookie: {}, + skip_hooks: true, + foreign_key_checks: !!source.isMeta(), + }); + + logBasic( + `:: Importing '${ + table.title + }' data :: ${importedCount} - ${ + importedCount + insertArray.length + }`, + ); + + importedCount += insertArray.length; + insertArray = []; + } + tempCount = 0; + } - importedCount += insertArray.length; - insertArray = []; + if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume(); - readable.resume(); - } - tempCount = 0; + resolve(true); + } catch (e) { + logger.error(e); + logWarning( + `There were errors on importing '${table.title}' data :: ${e}`, + ); + if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume(); + resolve(true); } - activeProcess--; - if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume(); - resolve(true); - } catch (e) { - logger.error(e); - logWarning( - `There were errors on importing '${table.title}' data :: ${e}`, - ); - readable.resume(); - resolve(true); - } - }), + }), ); + + if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause(); }); - readable.on('end', async () => { + dataStream.on('end', async () => { try { // ensure all chunks are processed - await Promise.all(promises); + await queue.onIdle(); // insert remaining data if (tempData.length > 0) { @@ -205,31 +268,36 @@ export async function importData({ body: tempData, cookie: {}, skip_hooks: true, + foreign_key_checks: !!source.isMeta(), }); logBasic( - `:: Importing '${ - table.title - }' data :: ${importedCount} - ${Math.min( - importedCount + tempData.length, - allRecordsCount, - )}`, + `:: Importing '${table.title}' data :: ${importedCount} - ${ + importedCount + tempData.length + }`, ); importedCount += tempData.length; tempData = []; } - resolve(true); + + nestedLinkCount = (await ltarPromise) as number; + + resolve({ + importedCount, + nestedLinkCount, + }); } catch (e) { logger.error(e); logWarning( `There were errors on importing '${table.title}' data :: ${e}`, ); - resolve(true); + resolve({ + importedCount, + nestedLinkCount, + }); } }); }); - - return records; } catch (e) { throw e; } @@ -237,25 +305,22 @@ export async function importData({ export async function importLTARData({ table, - fields, - atBase, baseName, insertedAssocRef = {}, - records, + dataStream, atNcAliasRef, ncLinkMappingTable, syncDB, + source, services, + queue, logBasic = (_str) => {}, - logDetailed = (_str) => {}, logWarning = (_str) => {}, }: { baseName: string; table: { title?: string; id?: string }; - fields; - atBase: AirtableBase; insertedAssocRef: { [assocTableId: string]: boolean }; - records?: EntityMap; + dataStream?: Readable; atNcAliasRef: { [ncTableId: string]: { [ncTitle: string]: string; @@ -263,26 +328,19 @@ export async function importLTARData({ }; ncLinkMappingTable: Record>[]; syncDB; + source: Source; services: AirtableImportContext; + queue: PQueue; logBasic: (string) => void; logDetailed: (string) => void; logWarning: (string) => void; -}) { +}): Promise { const assocTableMetas: Array<{ modelMeta: { id?: string; title?: string }; colMeta: { title?: string }; curCol: { title?: string }; refCol: { title?: string }; }> = []; - const allData: EntityMap = - records || - (await readAllData({ - table, - fields, - atBase, - logDetailed, - logBasic, - })); const modelMeta: any 
= await services.tableService.getTableWithAccessibleViews({ @@ -329,108 +387,116 @@ export async function importLTARData({ let nestedLinkCnt = 0; let importedCount = 0; - let assocTableData = []; - // Iterate over all related M2M associative table - for (const assocMeta of assocTableMetas) { - // extract link data from records - await new Promise((resolve) => { - const promises = []; - const readable = allData.getStream(); - - readable.on('data', async (record) => { - promises.push( - new Promise(async (resolve) => { - try { - const { id: _atId, ...rec } = record; - - // todo: use actual alias instead of sanitized - assocTableData.push( - ...( - rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || [] - ).map((id) => ({ - [assocMeta.curCol.title]: record.id, - [assocMeta.refCol.title]: id, - })), - ); - - if (assocTableData.length >= BULK_LINK_BATCH_COUNT) { - readable.pause(); - - let insertArray = assocTableData.splice( - 0, - assocTableData.length, + const assocTableData = {}; + // extract link data from records + return new Promise((resolve, reject) => { + dataStream.on('data', async (record) => { + record = JSON.parse(record); + // Iterate over all related M2M associative table + for (const assocMeta of assocTableMetas) { + if (!assocTableData[assocMeta.modelMeta.id]) { + assocTableData[assocMeta.modelMeta.id] = []; + } + queue.add( + () => + new Promise(async (resolve) => { + try { + const { id: _atId, ...rec } = record; + + // todo: use actual alias instead of sanitized + assocTableData[assocMeta.modelMeta.id].push( + ...( + rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || [] + ).map((id) => ({ + [assocMeta.curCol.title]: record.id, + [assocMeta.refCol.title]: id, + })), ); - logBasic( - `:: Importing '${ - table.title - }' LTAR data :: ${importedCount} - ${ - importedCount + insertArray.length - }`, - ); + if ( + assocTableData[assocMeta.modelMeta.id].length >= + BULK_LINK_BATCH_COUNT + ) { + let insertArray = assocTableData[ + assocMeta.modelMeta.id + ].splice(0, assocTableData[assocMeta.modelMeta.id].length); - await services.bulkDataService.bulkDataInsert({ - baseName, - tableName: assocMeta.modelMeta.id, - body: insertArray, - cookie: {}, - skip_hooks: true, - }); + const lastImportedCount = importedCount; + importedCount += insertArray.length; + + logBasic( + `:: Importing '${ + table.title + }' LTAR data :: ${lastImportedCount} - ${ + lastImportedCount + insertArray.length + }`, + ); + + await services.bulkDataService.bulkDataInsert({ + baseName, + tableName: assocMeta.modelMeta.id, + body: insertArray, + cookie: {}, + skip_hooks: true, + foreign_key_checks: !!source.isMeta(), + }); - importedCount += insertArray.length; - insertArray = []; + insertArray = []; + } - readable.resume(); + if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume(); + resolve(true); + } catch (e) { + logger.error(e); + logWarning( + `There were errors on importing '${table.title}' LTAR data :: ${e}`, + ); + if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume(); + resolve(true); } - resolve(true); - } catch (e) { - logger.error(e); - logWarning( - `There were errors on importing '${table.title}' LTAR data :: ${e}`, - ); - readable.resume(); - resolve(true); - } - }), + }), ); - }); - readable.on('end', async () => { - try { - // ensure all chunks are processed - await Promise.all(promises); + } + + if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause(); + }); + dataStream.on('end', async () => { + try { + // ensure all chunks are processed + await queue.onIdle(); + for 
(const assocMeta of assocTableMetas) { // insert remaining data - if (assocTableData.length >= 0) { + if (assocTableData[assocMeta.modelMeta.id].length >= 0) { logBasic( `:: Importing '${table.title}' LTAR data :: ${importedCount} - ${ - importedCount + assocTableData.length + importedCount + assocTableData[assocMeta.modelMeta.id].length }`, ); await services.bulkDataService.bulkDataInsert({ baseName, tableName: assocMeta.modelMeta.id, - body: assocTableData, + body: assocTableData[assocMeta.modelMeta.id], cookie: {}, skip_hooks: true, + foreign_key_checks: !!source.isMeta(), }); - importedCount += assocTableData.length; - assocTableData = []; + importedCount += assocTableData[assocMeta.modelMeta.id].length; + assocTableData[assocMeta.modelMeta.id] = []; } + } - nestedLinkCnt += importedCount; + nestedLinkCnt += importedCount; - resolve(true); - } catch (e) { - logger.error(e); - logWarning( - `There were errors on importing '${table.title}' LTAR data :: ${e}`, - ); - resolve(true); - } - }); + resolve(nestedLinkCnt); + } catch (e) { + reject(e); + } }); - } - return nestedLinkCnt; + + // resume the stream after all listeners are attached + dataStream.resume(); + }); } diff --git a/packages/nocodb/src/plugins/s3/S3.ts b/packages/nocodb/src/plugins/s3/S3.ts index 866f7b9fad..5908fed023 100644 --- a/packages/nocodb/src/plugins/s3/S3.ts +++ b/packages/nocodb/src/plugins/s3/S3.ts @@ -56,27 +56,23 @@ export default class S3 implements IStorageAdapterV2 { const uploadParams: any = { ...this.defaultParams, }; - return new Promise((resolve, reject) => { - axios - .get(url, { - httpAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }), - httpsAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }), - responseType: 'stream', - }) - .then((response) => { - uploadParams.Body = response.data; - uploadParams.Key = key; - uploadParams.ContentType = response.headers['content-type']; - - // call S3 to retrieve upload file to specified bucket - this.upload(uploadParams).then((data) => { - resolve(data); - }); - }) - .catch((error) => { - reject(error); - }); - }); + + try { + const response = await axios.get(url, { + httpAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }), + httpsAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }), + responseType: 'stream', + }); + + uploadParams.Body = response.data; + uploadParams.Key = key; + uploadParams.ContentType = response.headers['content-type']; + + const data = await this.upload(uploadParams); + return data; + } catch (error) { + throw error; + } } // TODO - implement @@ -161,26 +157,23 @@ export default class S3 implements IStorageAdapterV2 { } private async upload(uploadParams): Promise { - return new Promise((resolve, reject) => { + try { // call S3 to retrieve upload file to specified bucket const upload = new Upload({ client: this.s3Client, params: { ...this.defaultParams, ...uploadParams }, }); - upload - .done() - .then((data) => { - if (data) { - resolve( - `https://${this.input.bucket}.s3.${this.input.region}.amazonaws.com/${uploadParams.Key}`, - ); - } - }) - .catch((err) => { - console.error(err); - reject(err); - }); - }); + const data = await upload.done(); + + if (data) { + return `https://${this.input.bucket}.s3.${this.input.region}.amazonaws.com/${uploadParams.Key}`; + } else { + throw new Error('Upload failed or no data returned.'); + } + } catch (error) { + console.error(error); + throw error; + } } } diff --git a/packages/nocodb/src/services/attachments.service.ts 
b/packages/nocodb/src/services/attachments.service.ts index ff6ba485ea..71f1c18532 100644 --- a/packages/nocodb/src/services/attachments.service.ts +++ b/packages/nocodb/src/services/attachments.service.ts @@ -1,8 +1,9 @@ import path from 'path'; import { AppEvents } from 'nocodb-sdk'; -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { nanoid } from 'nanoid'; import slash from 'slash'; +import PQueue from 'p-queue'; import type { AttachmentReqType, FileType } from 'nocodb-sdk'; import type { NcRequest } from '~/interface/config'; import { AppHooksService } from '~/services/app-hooks/app-hooks.service'; @@ -14,6 +15,8 @@ import { utf8ify } from '~/helpers/stringHelpers'; @Injectable() export class AttachmentsService { + protected logger = new Logger(AttachmentsService.name); + constructor(private readonly appHooksService: AppHooksService) {} async upload(param: { path?: string; files: FileType[]; req: NcRequest }) { @@ -25,63 +28,78 @@ export class AttachmentsService { const storageAdapter = await NcPluginMgrv2.storageAdapter(); - const attachments = await Promise.all( - param.files?.map(async (file) => { - const originalName = utf8ify(file.originalname); - const fileName = `${nanoid(18)}${path.extname(originalName)}`; - - const url = await storageAdapter.fileCreate( - slash(path.join(destPath, fileName)), - file, - ); - - const attachment: { - url?: string; - path?: string; - title: string; - mimetype: string; - size: number; - icon?: string; - signedPath?: string; - signedUrl?: string; - } = { - ...(url ? { url } : {}), - title: originalName, - mimetype: file.mimetype, - size: file.size, - icon: mimeIcons[path.extname(originalName).slice(1)] || undefined, - }; - - const promises = []; - // if `url` is null, then it is local attachment - if (!url) { - // then store the attachment path only - // url will be constructed in `useAttachmentCell` - attachment.path = `download/${filePath.join('/')}/${fileName}`; - - promises.push( - PresignedUrl.getSignedUrl({ - path: attachment.path.replace(/^download\//, ''), - }).then((r) => (attachment.signedPath = r)), + // just in case we want to increase concurrency in future + const queue = new PQueue({ concurrency: 1 }); + + const attachments = []; + const errors = []; + + queue.addAll( + param.files?.map((file) => async () => { + try { + const originalName = utf8ify(file.originalname); + const fileName = `${nanoid(18)}${path.extname(originalName)}`; + + const url = await storageAdapter.fileCreate( + slash(path.join(destPath, fileName)), + file, ); - } else { - if (attachment.url.includes('.amazonaws.com/')) { - const relativePath = decodeURI( - attachment.url.split('.amazonaws.com/')[1], - ); - promises.push( - PresignedUrl.getSignedUrl({ + + const attachment: { + url?: string; + path?: string; + title: string; + mimetype: string; + size: number; + icon?: string; + signedPath?: string; + signedUrl?: string; + } = { + ...(url ? 
{ url } : {}), + title: originalName, + mimetype: file.mimetype, + size: file.size, + icon: mimeIcons[path.extname(originalName).slice(1)] || undefined, + }; + + // if `url` is null, then it is local attachment + if (!url) { + // then store the attachment path only + // url will be constructed in `useAttachmentCell` + attachment.path = `download/${filePath.join('/')}/${fileName}`; + + attachment.signedPath = await PresignedUrl.getSignedUrl({ + path: attachment.path.replace(/^download\//, ''), + }); + } else { + if (attachment.url.includes('.amazonaws.com/')) { + const relativePath = decodeURI( + attachment.url.split('.amazonaws.com/')[1], + ); + + attachment.signedUrl = await PresignedUrl.getSignedUrl({ path: relativePath, s3: true, - }).then((r) => (attachment.signedUrl = r)), - ); + }); + } } - } - return Promise.all(promises).then(() => attachment); + attachments.push(attachment); + } catch (e) { + errors.push(e); + } }), ); + await queue.onIdle(); + + if (errors.length) { + for (const error of errors) { + this.logger.error(error); + } + throw errors[0]; + } + this.appHooksService.emit(AppEvents.ATTACHMENT_UPLOAD, { type: 'file', req: param.req, @@ -103,40 +121,59 @@ export class AttachmentsService { const storageAdapter = await NcPluginMgrv2.storageAdapter(); - const attachments = await Promise.all( - param.urls?.map?.(async (urlMeta) => { - const { url, fileName: _fileName } = urlMeta; + // just in case we want to increase concurrency in future + const queue = new PQueue({ concurrency: 1 }); - const fileName = `${nanoid(18)}${path.extname( - _fileName || url.split('/').pop(), - )}`; + const attachments = []; + const errors = []; - const attachmentUrl: string | null = - await storageAdapter.fileCreateByUrl( - slash(path.join(destPath, fileName)), - url, - ); + queue.addAll( + param.urls?.map?.((urlMeta) => async () => { + try { + const { url, fileName: _fileName } = urlMeta; - let attachmentPath: string | undefined; + const fileName = `${nanoid(18)}${path.extname( + _fileName || url.split('/').pop(), + )}`; - // if `attachmentUrl` is null, then it is local attachment - if (!attachmentUrl) { - // then store the attachment path only - // url will be constructed in `useAttachmentCell` - attachmentPath = `download/${filePath.join('/')}/${fileName}`; - } + const attachmentUrl: string | null = + await storageAdapter.fileCreateByUrl( + slash(path.join(destPath, fileName)), + url, + ); + + let attachmentPath: string | undefined; - return { - ...(attachmentUrl ? { url: attachmentUrl } : {}), - ...(attachmentPath ? { path: attachmentPath } : {}), - title: _fileName, - mimetype: urlMeta.mimetype, - size: urlMeta.size, - icon: mimeIcons[path.extname(fileName).slice(1)] || undefined, - }; + // if `attachmentUrl` is null, then it is local attachment + if (!attachmentUrl) { + // then store the attachment path only + // url will be constructed in `useAttachmentCell` + attachmentPath = `download/${filePath.join('/')}/${fileName}`; + } + + attachments.push({ + ...(attachmentUrl ? { url: attachmentUrl } : {}), + ...(attachmentPath ? { path: attachmentPath } : {}), + title: _fileName, + mimetype: urlMeta.mimetype, + size: urlMeta.size, + icon: mimeIcons[path.extname(fileName).slice(1)] || undefined, + }); + } catch (e) { + errors.push(e); + } }), ); + await queue.onIdle(); + + if (errors.length) { + for (const error of errors) { + this.logger.error(error); + } + throw errors[0]; + } + this.appHooksService.emit(AppEvents.ATTACHMENT_UPLOAD, { type: 'url', req: param.req,
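// A minimal standalone sketch of the queue-and-collect pattern both attachment
// handlers above now follow: tasks run through a PQueue (concurrency 1 for
// now), per-task failures are collected instead of rejecting a Promise.all,
// and the first error is rethrown once the queue is idle. `processOne` and
// `Item`/`Result` are hypothetical placeholders, not part of the patch.
import PQueue from 'p-queue';

async function processAll<Item, Result>(
  items: Item[],
  processOne: (item: Item) => Promise<Result>,
): Promise<Result[]> {
  const queue = new PQueue({ concurrency: 1 });
  const results: Result[] = [];
  const errors: Error[] = [];

  queue.addAll(
    items.map((item) => async () => {
      try {
        results.push(await processOne(item));
      } catch (e) {
        errors.push(e as Error);
      }
    }),
  );

  await queue.onIdle();

  if (errors.length) {
    for (const error of errors) console.error(error); // surface every failure
    throw errors[0]; // fail the request with the first one
  }
  return results;
}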