@ -1,5 +1,5 @@
import { UITypes , ViewTypes } from 'nocodb-sdk' ;
import { Injectable } from '@nestjs/common' ;
import { Injectable , Logger } from '@nestjs/common' ;
import papaparse from 'papaparse' ;
import {
findWithIdentifier ,
@ -27,11 +27,15 @@ import { HooksService } from '../../../services/hooks.service';
import { ViewsService } from '../../../services/views.service' ;
import NcPluginMgrv2 from '../../../helpers/NcPluginMgrv2' ;
import { BulkDataAliasService } from '../../../services/bulk-data-alias.service' ;
import { elapsedTime , initTime } from '../helpers' ;
import type { Readable } from 'stream' ;
import type { ViewCreateReqType } from 'nocodb-sdk' ;
import type { LinkToAnotherRecordColumn , User , View } from '../../../models' ;
@Injectable ( )
export class ImportService {
private readonly logger = new Logger ( ImportService . name ) ;
constructor (
private tablesService : TablesService ,
private columnsService : ColumnsService ,
@ -59,6 +63,8 @@ export class ImportService {
req : any ;
externalModels? : Model [ ] ;
} ) {
const hrTime = initTime ( ) ;
// structured id to db id
const idMap = new Map < string , string > ( ) ;
const externalIdMap = new Map < string , string > ( ) ;
@ -111,6 +117,8 @@ export class ImportService {
}
}
elapsedTime ( hrTime , 'generate id map for external models' , 'importModels' ) ;
// create tables with static columns
for ( const data of param . data ) {
const modelData = data . model ;
@ -148,9 +156,11 @@ export class ImportService {
tableReferences . set ( modelData . id , table ) ;
}
elapsedTime ( hrTime , 'create tables with static columns' , 'importModels' ) ;
const referencedColumnSet = [ ] ;
// create columns with reference to other columns
// create LTAR columns
for ( const data of param . data ) {
const modelData = data . model ;
const table = tableReferences . get ( modelData . id ) ;
@ -159,7 +169,6 @@ export class ImportService {
( a ) = > a . uidt === UITypes . LinkToAnotherRecord ,
) ;
// create columns with reference to other columns
for ( const col of linkedColumnSet ) {
if ( col . colOptions ) {
const colOptions = col . colOptions ;
@ -701,6 +710,8 @@ export class ImportService {
) ;
}
elapsedTime ( hrTime , 'create LTAR columns' , 'importModels' ) ;
const sortedReferencedColumnSet = [ ] ;
// sort referenced columns to avoid referencing before creation
@ -814,6 +825,8 @@ export class ImportService {
}
}
elapsedTime ( hrTime , 'create referenced columns' , 'importModels' ) ;
// create views
for ( const data of param . data ) {
const modelData = data . model ;
@ -930,6 +943,8 @@ export class ImportService {
}
}
elapsedTime ( hrTime , 'create views' , 'importModels' ) ;
// create hooks
for ( const data of param . data ) {
if ( ! data ? . hooks ) break ;
@ -972,6 +987,8 @@ export class ImportService {
}
}
elapsedTime ( hrTime , 'create hooks' , 'importModels' ) ;
return idMap ;
}
@ -1114,25 +1131,17 @@ export class ImportService {
file? : any ;
} ;
req : any ;
debug? : boolean ;
} ) {
const { user , projectId , baseId , src , req } = param ;
const hrTime = initTime ( ) ;
const debug = param ? . debug === true ;
const debugLog = ( . . . args : any [ ] ) = > {
if ( ! debug ) return ;
console . log ( . . . args ) ;
} ;
const { user , projectId , baseId , src , req } = param ;
let start = process . hrtime ( ) ;
const destProject = await Project . get ( projectId ) ;
const destBase = await Base . get ( baseId ) ;
const elapsedTime = function ( label? : string ) {
const elapsedS = process . hrtime ( start ) [ 0 ] . toFixed ( 3 ) ;
const elapsedMs = process . hrtime ( start ) [ 1 ] / 1000000 ;
if ( label ) debugLog ( ` ${ label } : ${ elapsedS } s ${ elapsedMs } ms ` ) ;
start = process . hrtime ( ) ;
} ;
if ( ! destProject || ! destBase ) {
throw NcError . badRequest ( 'Project or Base not found' ) ;
}
switch ( src . type ) {
case 'local' : {
@ -1145,10 +1154,10 @@ export class ImportService {
await storageAdapter . fileRead ( ` ${ path } /schema.json ` ) ,
) ;
elapsedTime ( 'read schema') ;
elapsedTime ( hrTime , 'read schema from file' , 'importBase ') ;
// store fk_mm_model_id (mm) to link once
cons t handledLinks = [ ] ;
le t handledLinks = [ ] ;
const idMap = await this . importModels ( {
user ,
@ -1158,7 +1167,7 @@ export class ImportService {
req ,
} ) ;
elapsedTime ( 'import models') ;
elapsedTime ( hrTime , 'import models schema' , 'importBase ') ;
if ( idMap ) {
const files = await ( storageAdapter as any ) . getDirectoryList (
@ -1174,9 +1183,6 @@ export class ImportService {
` ${ path } /data/ ${ file } ` ,
) ;
const headers : string [ ] = [ ] ;
let chunk = [ ] ;
const modelId = findWithIdentifier (
idMap ,
file . replace ( /\.csv$/ , '' ) ,
@ -1184,209 +1190,39 @@ export class ImportService {
const model = await Model . get ( modelId ) ;
debugLog ( ` Importing ${ model . title } ... ` ) ;
await new Promise ( ( resolve ) = > {
papaparse . parse ( readStream , {
newline : '\r\n' ,
step : async ( results , parser ) = > {
if ( ! headers . length ) {
parser . pause ( ) ;
for ( const header of results . data ) {
const id = idMap . get ( header ) ;
if ( id ) {
const col = await Column . get ( {
base_id : baseId ,
colId : id ,
} ) ;
if ( col . colOptions ? . type === 'bt' ) {
const childCol = await Column . get ( {
base_id : baseId ,
colId : col.colOptions.fk_child_column_id ,
} ) ;
headers . push ( childCol . column_name ) ;
} else {
headers . push ( col . column_name ) ;
}
} else {
debugLog ( header ) ;
}
}
parser . resume ( ) ;
} else {
if ( results . errors . length === 0 ) {
const row = { } ;
for ( let i = 0 ; i < headers . length ; i ++ ) {
if ( results . data [ i ] !== '' ) {
row [ headers [ i ] ] = results . data [ i ] ;
}
}
chunk . push ( row ) ;
if ( chunk . length > 100 ) {
parser . pause ( ) ;
elapsedTime ( 'before import chunk' ) ;
try {
await this . bulkDataService . bulkDataInsert ( {
projectName : projectId ,
tableName : modelId ,
body : chunk ,
cookie : null ,
chunkSize : chunk.length + 1 ,
foreign_key_checks : false ,
raw : true ,
} ) ;
} catch ( e ) {
debugLog ( ` ${ model . title } import throwed an error! ` ) ;
console . log ( e ) ;
}
chunk = [ ] ;
elapsedTime ( 'after import chunk' ) ;
parser . resume ( ) ;
}
}
}
} ,
complete : async ( ) = > {
if ( chunk . length > 0 ) {
elapsedTime ( 'before import chunk' ) ;
try {
await this . bulkDataService . bulkDataInsert ( {
projectName : projectId ,
tableName : modelId ,
body : chunk ,
cookie : null ,
chunkSize : chunk.length + 1 ,
foreign_key_checks : false ,
raw : true ,
} ) ;
} catch ( e ) {
debugLog ( chunk ) ;
console . log ( e ) ;
}
chunk = [ ] ;
elapsedTime ( 'after import chunk' ) ;
}
resolve ( null ) ;
} ,
} ) ;
this . logger . debug ( ` Importing ${ model . title } ... ` ) ;
await this . importDataFromCsvStream ( {
idMap ,
dataStream : readStream ,
destProject ,
destBase ,
destModel : model ,
} ) ;
elapsedTime (
hrTime ,
` import data for ${ model . title } ` ,
'importBase' ,
) ;
}
// reset timer
elapsedTime ( ) ;
elapsedTime ( hrTime ) ;
const linkReadStream = await (
storageAdapter as any
) . fileReadByStream ( linkFile ) ;
const lChunk : Record < string , any [ ] > = { } ; // fk_mm_model_id: { rowId, childId }[]
let headersFound = false ;
let childIndex = - 1 ;
let parentIndex = - 1 ;
let columnIndex = - 1 ;
const mmColumns : Record < string , Column > = { } ;
const mmParentChild : any = { } ;
await new Promise ( ( resolve ) = > {
papaparse . parse ( linkReadStream , {
newline : '\r\n' ,
step : async ( results , parser ) = > {
if ( ! headersFound ) {
for ( const [ i , header ] of Object . entries ( results . data ) ) {
if ( header === 'child' ) {
childIndex = parseInt ( i ) ;
} else if ( header === 'parent' ) {
parentIndex = parseInt ( i ) ;
} else if ( header === 'column' ) {
columnIndex = parseInt ( i ) ;
}
}
headersFound = true ;
} else {
if ( results . errors . length === 0 ) {
if (
results . data [ childIndex ] === 'child' &&
results . data [ parentIndex ] === 'parent' &&
results . data [ columnIndex ] === 'column'
)
return ;
const child = results . data [ childIndex ] ;
const parent = results . data [ parentIndex ] ;
const columnId = results . data [ columnIndex ] ;
if ( child && parent && columnId ) {
if ( mmColumns [ columnId ] ) {
// push to chunk
const mmModelId =
mmColumns [ columnId ] . colOptions . fk_mm_model_id ;
const mm = mmParentChild [ mmModelId ] ;
lChunk [ mmModelId ] . push ( {
[ mm . parent ] : parent ,
[ mm . child ] : child ,
} ) ;
} else {
// get column for the first time
parser . pause ( ) ;
const col = await Column . get ( {
colId : findWithIdentifier ( idMap , columnId ) ,
} ) ;
const colOptions =
await col . getColOptions < LinkToAnotherRecordColumn > ( ) ;
const vChildCol = await colOptions . getMMChildColumn ( ) ;
const vParentCol =
await colOptions . getMMParentColumn ( ) ;
mmParentChild [ col . colOptions . fk_mm_model_id ] = {
parent : vParentCol.column_name ,
child : vChildCol.column_name ,
} ;
mmColumns [ columnId ] = col ;
handledLinks . push ( col . colOptions . fk_mm_model_id ) ;
const mmModelId = col . colOptions . fk_mm_model_id ;
// create chunk
lChunk [ mmModelId ] = [ ] ;
// push to chunk
const mm = mmParentChild [ mmModelId ] ;
lChunk [ mmModelId ] . push ( {
[ mm . parent ] : parent ,
[ mm . child ] : child ,
} ) ;
parser . resume ( ) ;
}
}
}
}
} ,
complete : async ( ) = > {
for ( const [ k , v ] of Object . entries ( lChunk ) ) {
try {
await this . bulkDataService . bulkDataInsert ( {
projectName : projectId ,
tableName : k ,
body : v ,
cookie : null ,
chunkSize : 1000 ,
foreign_key_checks : false ,
raw : true ,
} ) ;
} catch ( e ) {
console . log ( e ) ;
}
}
resolve ( null ) ;
} ,
} ) ;
handledLinks = await this . importLinkFromCsvStream ( {
idMap ,
linkStream : linkReadStream ,
destProject ,
destBase ,
handledLinks ,
} ) ;
elapsedTime ( hrTime , ` import links ` , 'importBase' ) ;
}
} catch ( e ) {
throw new Error ( e ) ;
@ -1399,4 +1235,238 @@ export class ImportService {
break ;
}
}
importDataFromCsvStream ( param : {
idMap : Map < string , string > ;
dataStream : Readable ;
destProject : Project ;
destBase : Base ;
destModel : Model ;
} ) : Promise < void > {
const { idMap , dataStream , destBase , destProject , destModel } = param ;
const headers : string [ ] = [ ] ;
let chunk = [ ] ;
return new Promise ( ( resolve ) = > {
papaparse . parse ( dataStream , {
newline : '\r\n' ,
step : async ( results , parser ) = > {
if ( ! headers . length ) {
parser . pause ( ) ;
for ( const header of results . data ) {
const id = idMap . get ( header ) ;
if ( id ) {
const col = await Column . get ( {
base_id : destBase.id ,
colId : id ,
} ) ;
if ( col ) {
if ( col . colOptions ? . type === 'bt' ) {
const childCol = await Column . get ( {
base_id : destBase.id ,
colId : col.colOptions.fk_child_column_id ,
} ) ;
if ( childCol ) {
headers . push ( childCol . column_name ) ;
} else {
headers . push ( null ) ;
this . logger . error (
` child column not found ( ${ col . colOptions . fk_child_column_id } ) ` ,
) ;
}
} else {
headers . push ( col . column_name ) ;
}
} else {
headers . push ( null ) ;
this . logger . error ( ` column not found ( ${ id } ) ` ) ;
}
} else {
headers . push ( null ) ;
this . logger . error ( ` id not found ( ${ header } ) ` ) ;
}
}
parser . resume ( ) ;
} else {
if ( results . errors . length === 0 ) {
const row = { } ;
for ( let i = 0 ; i < headers . length ; i ++ ) {
if ( headers [ i ] ) {
if ( results . data [ i ] !== '' ) {
row [ headers [ i ] ] = results . data [ i ] ;
}
}
}
chunk . push ( row ) ;
if ( chunk . length > 1000 ) {
parser . pause ( ) ;
try {
await this . bulkDataService . bulkDataInsert ( {
projectName : destProject.id ,
tableName : destModel.id ,
body : chunk ,
cookie : null ,
chunkSize : chunk.length + 1 ,
foreign_key_checks : false ,
raw : true ,
} ) ;
} catch ( e ) {
this . logger . error ( e ) ;
}
chunk = [ ] ;
parser . resume ( ) ;
}
}
}
} ,
complete : async ( ) = > {
if ( chunk . length > 0 ) {
try {
await this . bulkDataService . bulkDataInsert ( {
projectName : destProject.id ,
tableName : destModel.id ,
body : chunk ,
cookie : null ,
chunkSize : chunk.length + 1 ,
foreign_key_checks : false ,
raw : true ,
} ) ;
} catch ( e ) {
this . logger . error ( e ) ;
}
chunk = [ ] ;
}
resolve ( null ) ;
} ,
} ) ;
} ) ;
}
// import links and return handled links
async importLinkFromCsvStream ( param : {
idMap : Map < string , string > ;
linkStream : Readable ;
destProject : Project ;
destBase : Base ;
handledLinks : string [ ] ;
} ) : Promise < string [ ] > {
const { idMap , linkStream , destBase , destProject , handledLinks } = param ;
const lChunks : Record < string , any [ ] > = { } ; // fk_mm_model_id: { rowId, childId }[]
const insertChunks = async ( ) = > {
for ( const [ k , v ] of Object . entries ( lChunks ) ) {
try {
if ( v . length === 0 ) continue ;
await this . bulkDataService . bulkDataInsert ( {
projectName : destProject.id ,
tableName : k ,
body : v ,
cookie : null ,
chunkSize : 1000 ,
foreign_key_checks : false ,
raw : true ,
} ) ;
lChunks [ k ] = [ ] ;
} catch ( e ) {
this . logger . error ( e ) ;
}
}
} ;
let headersFound = false ;
let childIndex = - 1 ;
let parentIndex = - 1 ;
let columnIndex = - 1 ;
const mmColumns : Record < string , Column > = { } ;
const mmParentChild : any = { } ;
return new Promise ( ( resolve ) = > {
papaparse . parse ( linkStream , {
newline : '\r\n' ,
step : async ( results , parser ) = > {
if ( ! headersFound ) {
for ( const [ i , header ] of Object . entries ( results . data ) ) {
if ( header === 'child' ) {
childIndex = parseInt ( i ) ;
} else if ( header === 'parent' ) {
parentIndex = parseInt ( i ) ;
} else if ( header === 'column' ) {
columnIndex = parseInt ( i ) ;
}
}
headersFound = true ;
} else {
if ( results . errors . length === 0 ) {
const child = results . data [ childIndex ] ;
const parent = results . data [ parentIndex ] ;
const columnId = results . data [ columnIndex ] ;
if ( child && parent && columnId ) {
if ( mmColumns [ columnId ] ) {
// push to chunk
const mmModelId =
mmColumns [ columnId ] . colOptions . fk_mm_model_id ;
const mm = mmParentChild [ mmModelId ] ;
lChunks [ mmModelId ] . push ( {
[ mm . parent ] : parent ,
[ mm . child ] : child ,
} ) ;
} else {
// get column for the first time
parser . pause ( ) ;
await insertChunks ( ) ;
const col = await Column . get ( {
base_id : destBase.id ,
colId : findWithIdentifier ( idMap , columnId ) ,
} ) ;
if ( col ) {
const colOptions =
await col . getColOptions < LinkToAnotherRecordColumn > ( ) ;
const vChildCol = await colOptions . getMMChildColumn ( ) ;
const vParentCol = await colOptions . getMMParentColumn ( ) ;
mmParentChild [ col . colOptions . fk_mm_model_id ] = {
parent : vParentCol.column_name ,
child : vChildCol.column_name ,
} ;
mmColumns [ columnId ] = col ;
handledLinks . push ( col . colOptions . fk_mm_model_id ) ;
const mmModelId = col . colOptions . fk_mm_model_id ;
// create chunk
lChunks [ mmModelId ] = [ ] ;
// push to chunk
const mm = mmParentChild [ mmModelId ] ;
lChunks [ mmModelId ] . push ( {
[ mm . parent ] : parent ,
[ mm . child ] : child ,
} ) ;
} else {
this . logger . error ( ` column not found ( ${ columnId } ) ` ) ;
}
parser . resume ( ) ;
}
}
}
}
} ,
complete : async ( ) = > {
await insertChunks ( ) ;
resolve ( handledLinks ) ;
} ,
} ) ;
} ) ;
}
}