From e6354ad6a7d0340146eeb40571089b0d764aadb2 Mon Sep 17 00:00:00 2001 From: mertmit Date: Sat, 10 Aug 2024 07:32:00 +0000 Subject: [PATCH] feat: improved migration logic --- .../migration-jobs/nc_job_001_attachment.ts | 529 +++++++++--------- 1 file changed, 269 insertions(+), 260 deletions(-) diff --git a/packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts b/packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts index f4b02500f5..bc70e0b818 100644 --- a/packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts +++ b/packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts @@ -152,326 +152,335 @@ export class AttachmentMigration { .insert(fileReferenceBuffer); } - // eslint-disable-next-line no-constant-condition - while (true) { - const modelLimit = 100; + let processedModelsCount = 0; - let modelOffset = 0; + const processModel = async (modelData) => { + const { fk_workspace_id, base_id, source_id, fk_model_id } = modelData; - const modelsWithAttachmentColumns = []; + const context = { + workspace_id: fk_workspace_id, + base_id, + }; - // get models that have at least one attachment column, and not processed + const source = await Source.get(context, source_id); - // eslint-disable-next-line no-constant-condition - while (true) { - const selectFields = [ - ...(Noco.isEE() ? ['fk_workspace_id'] : []), - 'base_id', - 'source_id', - 'fk_model_id', - ]; - - const models = await ncMeta - .knexConnection(MetaTable.COLUMNS) - .select(selectFields) - .where('uidt', UITypes.Attachment) - .whereNotIn( - 'fk_model_id', - ncMeta - .knexConnection(temp_processed_models_table) - .select('fk_model_id') - .where('completed', true), - ) - .groupBy(selectFields) - .limit(modelLimit) - .offset(modelOffset); - - modelOffset += modelLimit; - - if (!models?.length) { - break; - } - - modelsWithAttachmentColumns.push(...models); + if (!source) { + this.log(`source not found for ${source_id}`); + return; } - if (!modelsWithAttachmentColumns?.length) { - break; - } + const model = await Model.get(context, fk_model_id); - this.log( - `Found ${modelsWithAttachmentColumns.length} models with attachment columns`, - ); + if (!model) { + this.log(`model not found for ${fk_model_id}`); + return; + } - let processedModelsCount = 0; + await model.getColumns(context); - for (const modelData of modelsWithAttachmentColumns) { - const { fk_workspace_id, base_id, source_id, fk_model_id } = - modelData; + const attachmentColumns = model.columns.filter( + (c) => c.uidt === UITypes.Attachment, + ); - const context = { - workspace_id: fk_workspace_id, - base_id, - }; + const dbDriver = await NcConnectionMgrv2.get(source); - const source = await Source.get(context, source_id); + if (!dbDriver) { + this.log(`connection can't achieved for ${source_id}`); + return; + } - if (!source) { - this.log(`source not found for ${source_id}`); - continue; - } + const baseModel = await Model.getBaseModelSQL(context, { + model, + dbDriver, + }); - const model = await Model.get(context, fk_model_id); + const processedModel = await ncMeta + .knexConnection(temp_processed_models_table) + .where('fk_model_id', fk_model_id) + .first(); - if (!model) { - this.log(`model not found for ${fk_model_id}`); - continue; - } + const dataLimit = 10; + let dataOffset = 0; - await model.getColumns(context); + if (!processedModel) { + await ncMeta + .knexConnection(temp_processed_models_table) + .insert({ fk_model_id, offset: 0 }); + } else { + dataOffset = processedModel.offset; + } - const attachmentColumns = model.columns.filter( - (c) => c.uidt === UITypes.Attachment, + // eslint-disable-next-line no-constant-condition + while (true) { + const data = await baseModel.list( + { + fieldsSet: new Set( + model.primaryKeys + .map((c) => c.title) + .concat(attachmentColumns.map((c) => c.title)), + ), + sort: model.primaryKeys.map((c) => c.title), + limit: dataLimit, + offset: dataOffset, + }, + { + ignoreViewFilterAndSort: true, + }, ); - const dbDriver = await NcConnectionMgrv2.get(source); + dataOffset += dataLimit; - if (!dbDriver) { - this.log(`connection can't achieved for ${source_id}`); - continue; + if (!data?.length) { + break; } - const baseModel = await Model.getBaseModelSQL(context, { - model, - dbDriver, - }); + const updatePayload = []; - const processedModel = await ncMeta - .knexConnection(temp_processed_models_table) - .where('fk_model_id', fk_model_id) - .first(); + for (const row of data) { + const updateData = {}; - const dataLimit = 10; - let dataOffset = 0; + let updateRequired = false; - if (!processedModel) { - await ncMeta - .knexConnection(temp_processed_models_table) - .insert({ fk_model_id, offset: 0 }); - } else { - dataOffset = processedModel.offset; - } - - // eslint-disable-next-line no-constant-condition - while (true) { - const data = await baseModel.list( - { - fieldsSet: new Set( - model.primaryKeys - .map((c) => c.title) - .concat(attachmentColumns.map((c) => c.title)), - ), - sort: model.primaryKeys.map((c) => c.title), - limit: dataLimit, - offset: dataOffset, - }, - { - ignoreViewFilterAndSort: true, - }, - ); - - dataOffset += dataLimit; - - if (!data?.length) { - break; - } + for (const column of attachmentColumns) { + let attachmentArr = row[column.title]; - const updatePayload = []; - - for (const row of data) { - const updateData = {}; - - let updateRequired = false; - - for (const column of attachmentColumns) { - let attachmentArr = row[column.title]; + if (!attachmentArr?.length) { + continue; + } - if (!attachmentArr?.length) { - continue; + try { + if (typeof attachmentArr === 'string') { + attachmentArr = JSON.parse(attachmentArr); } + } catch (e) { + this.log(`error parsing attachment data ${attachmentArr}`); + continue; + } - try { - if (typeof attachmentArr === 'string') { - attachmentArr = JSON.parse(attachmentArr); - } - } catch (e) { - this.log(`error parsing attachment data ${attachmentArr}`); - continue; - } + if (Array.isArray(attachmentArr)) { + attachmentArr = attachmentArr.map((a) => + extractProps(a, [ + 'id', + 'url', + 'path', + 'title', + 'mimetype', + 'size', + 'icon', + 'width', + 'height', + ]), + ); - if (Array.isArray(attachmentArr)) { - attachmentArr = attachmentArr.map((a) => - extractProps(a, [ - 'id', - 'url', - 'path', - 'title', - 'mimetype', - 'size', - 'icon', - 'width', - 'height', - ]), - ); - - for (const attachment of attachmentArr) { - try { - if ('path' in attachment || 'url' in attachment) { - const filePath = `nc/uploads/${ - attachment.path?.replace(/^download\//, '') || - this.normalizeUrl(attachment.url) - }`; - - const isReferenced = await ncMeta + for (const attachment of attachmentArr) { + try { + if ('path' in attachment || 'url' in attachment) { + const filePath = `nc/uploads/${ + attachment.path?.replace(/^download\//, '') || + this.normalizeUrl(attachment.url) + }`; + + const isReferenced = await ncMeta + .knexConnection(temp_file_references_table) + .where('file_path', filePath) + .first(); + + if (!isReferenced) { + // file is from another storage adapter + this.log( + `file not found in file references table ${ + attachment.path || attachment.url + }, ${filePath}`, + ); + } else if (isReferenced.referenced === false) { + const fileNameWithExt = path.basename(filePath); + + const mimetype = + attachment.mimetype || + mimetypes[path.extname(fileNameWithExt).slice(1)]; + + await ncMeta .knexConnection(temp_file_references_table) .where('file_path', filePath) + .update({ + mimetype, + referenced: true, + }); + + // insert file reference if not exists + const fileReference = await ncMeta + .knexConnection(MetaTable.FILE_REFERENCES) + .where('file_url', attachment.path || attachment.url) + .andWhere('storage', storageAdapterType) .first(); - if (!isReferenced) { - // file is from another storage adapter - this.log( - `file not found in file references table ${ - attachment.path || attachment.url - }, ${filePath}`, + if (!fileReference) { + await FileReference.insert( + { + workspace_id: RootScopes.ROOT, + base_id: RootScopes.ROOT, + }, + { + storage: storageAdapterType, + file_url: attachment.path || attachment.url, + file_size: attachment.size, + deleted: true, + }, ); - } else if (isReferenced.referenced === false) { - const fileNameWithExt = path.basename(filePath); - - const mimetype = - attachment.mimetype || - mimetypes[path.extname(fileNameWithExt).slice(1)]; - - await ncMeta - .knexConnection(temp_file_references_table) - .where('file_path', filePath) - .update({ - mimetype, - referenced: true, - }); - - // insert file reference if not exists - const fileReference = await ncMeta - .knexConnection(MetaTable.FILE_REFERENCES) - .where( - 'file_url', - attachment.path || attachment.url, - ) - .andWhere('storage', storageAdapterType) - .first(); - - if (!fileReference) { - await FileReference.insert( - { - workspace_id: RootScopes.ROOT, - base_id: RootScopes.ROOT, - }, - { - storage: storageAdapterType, - file_url: attachment.path || attachment.url, - file_size: attachment.size, - deleted: true, - }, - ); - } } + } - if (!('id' in attachment)) { - attachment.id = await FileReference.insert(context, { - source_id: source.id, - fk_model_id, - fk_column_id: column.id, - file_url: attachment.path || attachment.url, - file_size: attachment.size, - is_external: !source.isMeta(), - deleted: false, - }); - - updateRequired = true; - } + if (!('id' in attachment)) { + attachment.id = await FileReference.insert(context, { + source_id: source.id, + fk_model_id, + fk_column_id: column.id, + file_url: attachment.path || attachment.url, + file_size: attachment.size, + is_external: !source.isMeta(), + deleted: false, + }); + + updateRequired = true; } - } catch (e) { - this.log( - `Error processing attachment ${JSON.stringify( - attachment, - )}`, - ); - this.log(e); - throw e; } + } catch (e) { + this.log( + `Error processing attachment ${JSON.stringify( + attachment, + )}`, + ); + this.log(e); + throw e; } } - - if (updateRequired) { - updateData[column.column_name] = - JSON.stringify(attachmentArr); - } } - if (Object.keys(updateData).length === 0) { - continue; + if (updateRequired) { + updateData[column.column_name] = JSON.stringify(attachmentArr); } + } - for (const pk of model.primaryKeys) { - updateData[pk.column_name] = row[pk.title]; - } + if (Object.keys(updateData).length === 0) { + continue; + } - updatePayload.push(updateData); + for (const pk of model.primaryKeys) { + updateData[pk.column_name] = row[pk.title]; } - if (updatePayload.length > 0) { - for (const updateData of updatePayload) { - const wherePk = await baseModel._wherePk( - baseModel._extractPksValues(updateData), - ); + updatePayload.push(updateData); + } - if (!wherePk) { - this.log(`where pk not found for ${updateData}`); - continue; - } + if (updatePayload.length > 0) { + for (const updateData of updatePayload) { + const wherePk = await baseModel._wherePk( + baseModel._extractPksValues(updateData), + ); - await baseModel.execAndParse( - baseModel - .dbDriver(baseModel.tnPath) - .update(updateData) - .where(wherePk), - null, - { - raw: true, - }, - ); + if (!wherePk) { + this.log(`where pk not found for ${updateData}`); + continue; } - } - // update offset - await ncMeta - .knexConnection(temp_processed_models_table) - .where('fk_model_id', fk_model_id) - .update({ offset: dataOffset }); + await baseModel.execAndParse( + baseModel + .dbDriver(baseModel.tnPath) + .update(updateData) + .where(wherePk), + null, + { + raw: true, + }, + ); + } } - // mark model as processed + // update offset await ncMeta .knexConnection(temp_processed_models_table) .where('fk_model_id', fk_model_id) - .update({ completed: true }); + .update({ offset: dataOffset }); + } - processedModelsCount += 1; + // mark model as processed + await ncMeta + .knexConnection(temp_processed_models_table) + .where('fk_model_id', fk_model_id) + .update({ completed: true }); + + processedModelsCount += 1; + }; + + const selectFields = [ + ...(Noco.isEE() ? ['fk_workspace_id'] : []), + 'base_id', + 'source_id', + 'fk_model_id', + ]; + + const numberOfModelsToBeProcessed = ( + await ncMeta.knexConnection + .from( + ncMeta + .knexConnection(MetaTable.COLUMNS) + .where('uidt', UITypes.Attachment) + .whereNotIn( + 'fk_model_id', + ncMeta + .knexConnection(temp_processed_models_table) + .select('fk_model_id') + .where('completed', true), + ) + .groupBy(selectFields) + .count('*', { as: 'count' }) + .as('t'), + ) + .sum('count as count') + .first() + )?.count; + + const processModelLimit = 100; + + // get models that have at least one attachment column, and not processed - this.log( - `Processed ${processedModelsCount} of ${modelsWithAttachmentColumns.length} models`, - ); + // eslint-disable-next-line no-constant-condition + while (true) { + // this will return until all models marked as processed + const models = await ncMeta + .knexConnection(MetaTable.COLUMNS) + .select(selectFields) + .where('uidt', UITypes.Attachment) + .whereNotIn( + 'fk_model_id', + ncMeta + .knexConnection(temp_processed_models_table) + .select('fk_model_id') + .where('completed', true), + ) + .groupBy(selectFields) + .limit(processModelLimit); + + if (!models?.length) { + break; + } + + for (const model of models) { + try { + await processModel(model); + this.log( + `Processed ${processedModelsCount} of ${numberOfModelsToBeProcessed} models`, + ); + } catch (e) { + this.log(`Error processing model ${model.fk_model_id}`); + this.log(e); + } } } + + this.log( + `Processed total of ${numberOfModelsToBeProcessed} models with attachments`, + ); } catch (e) { this.log(`There was an error while processing attachment migration job`); this.log(e);