@@ -1,5 +1,6 @@
 /* eslint-disable no-async-promise-executor */
 import { RelationTypes, UITypes } from 'nocodb-sdk';
+import sizeof from 'object-sizeof';
 import EntityMap from './EntityMap';
 import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service';
 import type { TablesService } from '../../../../../services/tables.service';
@@ -7,8 +8,8 @@ import type { TablesService } from '../../../../../services/tables.service';
 import type { AirtableBase } from 'airtable/lib/airtable_base';
 import type { TableType } from 'nocodb-sdk';
 
-const BULK_DATA_BATCH_SIZE = 500;
-const ASSOC_BULK_DATA_BATCH_SIZE = 1000;
+const BULK_DATA_BATCH_COUNT = 20; // check size every 20 records
+const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes
 const BULK_PARALLEL_PROCESS = 5;
 
 interface AirtableImportContext {
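The two new constants change the flush policy: instead of inserting every 500 rows (and every 1000 link rows), the buffer is flushed once its in-memory footprint crosses roughly 50 KB, and the comparatively expensive size check only runs every 20 records. A minimal sketch of that rule in isolation, where `flushBatch` is a hypothetical stand-in for `services.bulkDataService.bulkDataInsert`:

```ts
import sizeof from 'object-sizeof';

const BULK_DATA_BATCH_COUNT = 20; // re-check the buffer size every 20 records
const BULK_DATA_BATCH_SIZE = 50 * 1024; // flush once the buffer exceeds ~50 KB

async function pushRecord(
  buffer: Record<string, any>[],
  record: Record<string, any>,
  counter: { sinceLastCheck: number },
  flushBatch: (rows: Record<string, any>[]) => Promise<void>,
) {
  buffer.push(record);
  counter.sinceLastCheck++;

  // sizeof() walks the whole buffer, so it is only evaluated every
  // BULK_DATA_BATCH_COUNT records rather than on every push
  if (counter.sinceLastCheck >= BULK_DATA_BATCH_COUNT) {
    if (sizeof(buffer) >= BULK_DATA_BATCH_SIZE) {
      // splice(0, length) drains the shared buffer in place and hands the
      // drained copy to the insert call
      await flushBatch(buffer.splice(0, buffer.length));
    }
    counter.sinceLastCheck = 0;
  }
}
```

Sizing by bytes rather than row count keeps memory bounded even when individual Airtable records are very large (long text, many attachments), which a fixed 500-row batch could not guarantee.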
@@ -42,6 +43,12 @@ async function readAllData({
       .eachPage(
         async function page(records, fetchNextPage) {
           if (!data) {
+            /*
+              EntityMap is a sqlite3 table dynamically populated based on json data provided
+              It is used to store data temporarily and then stream it in bulk to import
+
+              This is done to avoid memory issues - heap out of memory - while importing large data
+            */
             data = new EntityMap();
             await data.init();
           }
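The comment added here describes the pattern readAllData relies on: Airtable pages are written into a sqlite3-backed EntityMap instead of a growing JS array, and callers later consume the rows as a stream. A rough sketch of that buffer-then-stream shape, assuming this module's EntityMap; only init(), getStream() and getCount() appear in this diff, so `addRow` below is a hypothetical stand-in for whatever write API readAllData actually uses:

```ts
import EntityMap from './EntityMap';

async function bufferThenStream(pages: Record<string, any>[][]) {
  const data = new EntityMap(); // backed by sqlite3, not the JS heap
  await data.init();

  for (const page of pages) {
    for (const record of page) {
      await (data as any).addRow(record); // hypothetical write API, not shown in this diff
    }
  }

  // consumers read a Node stream back instead of one giant in-memory array
  const readable = data.getStream();
  readable.on('data', (record) => {
    /* process one record at a time */
  });

  return data.getCount();
}
```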
@@ -96,8 +103,8 @@ export async function importData({
   services: AirtableImportContext;
 }): Promise<EntityMap> {
   try {
-    // @ts-ignore
-    const records = await readAllData({
+    // returns EntityMap which allows us to stream data
+    const records: EntityMap = await readAllData({
       table,
       base,
       logDetailed,
@@ -108,22 +115,32 @@ export async function importData({
     const readable = records.getStream();
     const allRecordsCount = await records.getCount();
     const promises = [];
     let tempData = [];
     let importedCount = 0;
+    let tempCount = 0;
+
+    // we keep track of active process to pause and resume the stream as we have async calls within the stream and we don't want to load all data in memory
     let activeProcess = 0;
     readable.on('data', async (record) => {
       promises.push(
         new Promise(async (resolve) => {
           activeProcess++;
           if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
           const { id: rid, ...fields } = record;
           const r = await nocoBaseDataProcessing_v2(sDB, table, {
             id: rid,
             fields,
           });
           tempData.push(r);
+          tempCount++;
-          if (tempData.length >= BULK_DATA_BATCH_SIZE) {
+          if (tempCount >= BULK_DATA_BATCH_COUNT) {
+            if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
+              readable.pause();
               let insertArray = tempData.splice(0, tempData.length);
               await services.bulkDataService.bulkDataInsert({
@@ -131,18 +148,24 @@ export async function importData({
                 tableName: table.title,
                 body: insertArray,
                 cookie: {},
+                skip_hooks: true,
               });
               logBasic(
                 `:: Importing '${
                   table.title
                 }' data :: ${importedCount} - ${Math.min(
-                  importedCount + BULK_DATA_BATCH_SIZE,
+                  importedCount + insertArray.length,
                   allRecordsCount,
                 )}`,
               );
               importedCount += insertArray.length;
               insertArray = [];
+              readable.resume();
+            }
+            tempCount = 0;
           }
           activeProcess--;
           if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
@@ -151,26 +174,31 @@ export async function importData({
       );
     });
     readable.on('end', async () => {
+      // ensure all chunks are processed
       await Promise.all(promises);
+      // insert remaining data
       if (tempData.length > 0) {
         await services.bulkDataService.bulkDataInsert({
           projectName,
           tableName: table.title,
           body: tempData,
           cookie: {},
+          skip_hooks: true,
         });
         logBasic(
           `:: Importing '${
             table.title
           }' data :: ${importedCount} - ${Math.min(
-            importedCount + BULK_DATA_BATCH_SIZE,
+            importedCount + tempData.length,
             allRecordsCount,
           )}`,
         );
         importedCount += tempData.length;
         tempData = [];
       }
       resolve(true);
     });
   });
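importData's stream handling combines two forms of backpressure: the activeProcess counter caps how many records are processed concurrently, and the stream is additionally paused while a batch is being flushed. The skeleton below isolates the counter-based part, with `handleRecord` standing in (hypothetically) for nocoBaseDataProcessing_v2 plus the batched insert:

```ts
import { Readable } from 'stream';

const BULK_PARALLEL_PROCESS = 5;

function processWithBackpressure(
  readable: Readable,
  handleRecord: (record: unknown) => Promise<void>,
): Promise<void> {
  return new Promise((resolveAll) => {
    const promises: Promise<void>[] = [];
    let active = 0;

    readable.on('data', (record) => {
      promises.push(
        (async () => {
          active++;
          if (active >= BULK_PARALLEL_PROCESS) readable.pause(); // stop pulling more rows

          await handleRecord(record);

          active--;
          if (active < BULK_PARALLEL_PROCESS) readable.resume(); // safe to pull again
        })(),
      );
    });

    readable.on('end', async () => {
      await Promise.all(promises); // make sure every in-flight record finished
      resolveAll();
    });
  });
}
```

Using an async IIFE instead of `new Promise(async (resolve) => ...)` also sidesteps the no-async-promise-executor pattern the file currently has to disable at the top.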
@@ -219,7 +247,7 @@ export async function importLTARData({
     curCol: { title?: string };
     refCol: { title?: string };
   }> = [];
-  const allData =
+  const allData: EntityMap =
     records ||
     (await readAllData({
       table,
@@ -277,17 +305,16 @@ export async function importLTARData({
   for await (const assocMeta of assocTableMetas) {
     let assocTableData = [];
     let importedCount = 0;
+    let tempCount = 0;
-    // extract insert data from records
+    // extract link data from records
     await new Promise((resolve) => {
       const promises = [];
       const readable = allData.getStream();
-      let activeProcess = 0;
       readable.on('data', async (record) => {
         promises.push(
           new Promise(async (resolve) => {
-            activeProcess++;
-            if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
             const { id: _atId, ...rec } = record;
             // todo: use actual alias instead of sanitized
@@ -299,14 +326,22 @@ export async function importLTARData({
                 [assocMeta.refCol.title]: id,
               })),
             );
+            tempCount++;
-            if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) {
-              let insertArray = assocTableData.splice(0, assocTableData.length);
+            if (tempCount >= BULK_DATA_BATCH_COUNT) {
+              if (sizeof(assocTableData) >= BULK_DATA_BATCH_SIZE) {
+                readable.pause();
+                let insertArray = assocTableData.splice(
+                  0,
+                  assocTableData.length,
+                );
                 logBasic(
                   `:: Importing '${
                     table.title
                   }' LTAR data :: ${importedCount} - ${Math.min(
-                    importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
+                    importedCount + insertArray.length,
                     insertArray.length,
                   )}`,
                 );
@@ -316,25 +351,31 @@ export async function importLTARData({
                   tableName: assocMeta.modelMeta.title,
                   body: insertArray,
                   cookie: {},
+                  skip_hooks: true,
                 });
                 importedCount += insertArray.length;
                 insertArray = [];
+                readable.resume();
+              }
+              tempCount = 0;
             }
-            activeProcess--;
-            if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
             resolve(true);
           }),
         );
       });
       readable.on('end', async () => {
+        // ensure all chunks are processed
         await Promise.all(promises);
+        // insert remaining data
         if (assocTableData.length >= 0) {
           logBasic(
             `:: Importing '${
               table.title
             }' LTAR data :: ${importedCount} - ${Math.min(
-              importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
+              importedCount + assocTableData.length,
               assocTableData.length,
             )}`,
           );
@@ -344,11 +385,13 @@ export async function importLTARData({
             tableName: assocMeta.modelMeta.title,
             body: assocTableData,
             cookie: {},
+            skip_hooks: true,
           });
           importedCount += assocTableData.length;
           assocTableData = [];
         }
         resolve(true);
       });
     });
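For context on the LTAR hunks above: each Airtable record carries arrays of linked record ids, and importLTARData expands them into rows for the generated association (junction) table before bulk-inserting them in the same size-capped batches. A small illustration of that expansion; the column titles and field name are made up for the example, and titles are typed as required here for brevity even though the real metas mark them optional:

```ts
type AssocMeta = { curCol: { title: string }; refCol: { title: string } };

function toAssocRows(
  record: { id: string; [field: string]: any },
  linkFieldName: string,
  assocMeta: AssocMeta,
): Record<string, string>[] {
  // each linked record id becomes one junction row: (current record, linked record)
  return (record[linkFieldName] || []).map((linkedId: string) => ({
    [assocMeta.curCol.title]: record.id,
    [assocMeta.refCol.title]: linkedId,
  }));
}

// e.g. { id: 'rec1', Tasks: ['recA', 'recB'] } with titles Project/Task becomes
// [{ Project: 'rec1', Task: 'recA' }, { Project: 'rec1', Task: 'recB' }]
```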