Browse Source

feat: improved migration logic

nc-feat/attachment-clean-up
mertmit 4 months ago
parent
commit
e6354ad6a7
  1. 529
      packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts

529
packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts

@ -152,326 +152,335 @@ export class AttachmentMigration {
.insert(fileReferenceBuffer);
}
// eslint-disable-next-line no-constant-condition
while (true) {
const modelLimit = 100;
let processedModelsCount = 0;
let modelOffset = 0;
const processModel = async (modelData) => {
const { fk_workspace_id, base_id, source_id, fk_model_id } = modelData;
const modelsWithAttachmentColumns = [];
const context = {
workspace_id: fk_workspace_id,
base_id,
};
// get models that have at least one attachment column and have not been processed yet
const source = await Source.get(context, source_id);
// eslint-disable-next-line no-constant-condition
while (true) {
const selectFields = [
...(Noco.isEE() ? ['fk_workspace_id'] : []),
'base_id',
'source_id',
'fk_model_id',
];
const models = await ncMeta
.knexConnection(MetaTable.COLUMNS)
.select(selectFields)
.where('uidt', UITypes.Attachment)
.whereNotIn(
'fk_model_id',
ncMeta
.knexConnection(temp_processed_models_table)
.select('fk_model_id')
.where('completed', true),
)
.groupBy(selectFields)
.limit(modelLimit)
.offset(modelOffset);
modelOffset += modelLimit;
if (!models?.length) {
break;
}
modelsWithAttachmentColumns.push(...models);
if (!source) {
this.log(`source not found for ${source_id}`);
return;
}
if (!modelsWithAttachmentColumns?.length) {
break;
}
const model = await Model.get(context, fk_model_id);
this.log(
`Found ${modelsWithAttachmentColumns.length} models with attachment columns`,
);
if (!model) {
this.log(`model not found for ${fk_model_id}`);
return;
}
let processedModelsCount = 0;
await model.getColumns(context);
for (const modelData of modelsWithAttachmentColumns) {
const { fk_workspace_id, base_id, source_id, fk_model_id } =
modelData;
const attachmentColumns = model.columns.filter(
(c) => c.uidt === UITypes.Attachment,
);
const context = {
workspace_id: fk_workspace_id,
base_id,
};
const dbDriver = await NcConnectionMgrv2.get(source);
const source = await Source.get(context, source_id);
if (!dbDriver) {
this.log(`connection can't achieved for ${source_id}`);
return;
}
if (!source) {
this.log(`source not found for ${source_id}`);
continue;
}
const baseModel = await Model.getBaseModelSQL(context, {
model,
dbDriver,
});
const model = await Model.get(context, fk_model_id);
const processedModel = await ncMeta
.knexConnection(temp_processed_models_table)
.where('fk_model_id', fk_model_id)
.first();
if (!model) {
this.log(`model not found for ${fk_model_id}`);
continue;
}
const dataLimit = 10;
let dataOffset = 0;
await model.getColumns(context);
if (!processedModel) {
await ncMeta
.knexConnection(temp_processed_models_table)
.insert({ fk_model_id, offset: 0 });
} else {
dataOffset = processedModel.offset;
}
const attachmentColumns = model.columns.filter(
(c) => c.uidt === UITypes.Attachment,
// eslint-disable-next-line no-constant-condition
while (true) {
const data = await baseModel.list(
{
fieldsSet: new Set(
model.primaryKeys
.map((c) => c.title)
.concat(attachmentColumns.map((c) => c.title)),
),
sort: model.primaryKeys.map((c) => c.title),
limit: dataLimit,
offset: dataOffset,
},
{
ignoreViewFilterAndSort: true,
},
);
const dbDriver = await NcConnectionMgrv2.get(source);
dataOffset += dataLimit;
if (!dbDriver) {
this.log(`connection can't achieved for ${source_id}`);
continue;
if (!data?.length) {
break;
}
const baseModel = await Model.getBaseModelSQL(context, {
model,
dbDriver,
});
const updatePayload = [];
const processedModel = await ncMeta
.knexConnection(temp_processed_models_table)
.where('fk_model_id', fk_model_id)
.first();
for (const row of data) {
const updateData = {};
const dataLimit = 10;
let dataOffset = 0;
let updateRequired = false;
if (!processedModel) {
await ncMeta
.knexConnection(temp_processed_models_table)
.insert({ fk_model_id, offset: 0 });
} else {
dataOffset = processedModel.offset;
}
// eslint-disable-next-line no-constant-condition
while (true) {
const data = await baseModel.list(
{
fieldsSet: new Set(
model.primaryKeys
.map((c) => c.title)
.concat(attachmentColumns.map((c) => c.title)),
),
sort: model.primaryKeys.map((c) => c.title),
limit: dataLimit,
offset: dataOffset,
},
{
ignoreViewFilterAndSort: true,
},
);
dataOffset += dataLimit;
if (!data?.length) {
break;
}
for (const column of attachmentColumns) {
let attachmentArr = row[column.title];
const updatePayload = [];
for (const row of data) {
const updateData = {};
let updateRequired = false;
for (const column of attachmentColumns) {
let attachmentArr = row[column.title];
if (!attachmentArr?.length) {
continue;
}
if (!attachmentArr?.length) {
continue;
try {
if (typeof attachmentArr === 'string') {
attachmentArr = JSON.parse(attachmentArr);
}
} catch (e) {
this.log(`error parsing attachment data ${attachmentArr}`);
continue;
}
try {
if (typeof attachmentArr === 'string') {
attachmentArr = JSON.parse(attachmentArr);
}
} catch (e) {
this.log(`error parsing attachment data ${attachmentArr}`);
continue;
}
if (Array.isArray(attachmentArr)) {
attachmentArr = attachmentArr.map((a) =>
extractProps(a, [
'id',
'url',
'path',
'title',
'mimetype',
'size',
'icon',
'width',
'height',
]),
);
if (Array.isArray(attachmentArr)) {
attachmentArr = attachmentArr.map((a) =>
extractProps(a, [
'id',
'url',
'path',
'title',
'mimetype',
'size',
'icon',
'width',
'height',
]),
);
for (const attachment of attachmentArr) {
try {
if ('path' in attachment || 'url' in attachment) {
const filePath = `nc/uploads/${
attachment.path?.replace(/^download\//, '') ||
this.normalizeUrl(attachment.url)
}`;
const isReferenced = await ncMeta
for (const attachment of attachmentArr) {
try {
if ('path' in attachment || 'url' in attachment) {
const filePath = `nc/uploads/${
attachment.path?.replace(/^download\//, '') ||
this.normalizeUrl(attachment.url)
}`;
const isReferenced = await ncMeta
.knexConnection(temp_file_references_table)
.where('file_path', filePath)
.first();
if (!isReferenced) {
// file is from another storage adapter
this.log(
`file not found in file references table ${
attachment.path || attachment.url
}, ${filePath}`,
);
} else if (isReferenced.referenced === false) {
const fileNameWithExt = path.basename(filePath);
const mimetype =
attachment.mimetype ||
mimetypes[path.extname(fileNameWithExt).slice(1)];
await ncMeta
.knexConnection(temp_file_references_table)
.where('file_path', filePath)
.update({
mimetype,
referenced: true,
});
// insert the file reference if it does not exist yet
const fileReference = await ncMeta
.knexConnection(MetaTable.FILE_REFERENCES)
.where('file_url', attachment.path || attachment.url)
.andWhere('storage', storageAdapterType)
.first();
if (!isReferenced) {
// file is from another storage adapter
this.log(
`file not found in file references table ${
attachment.path || attachment.url
}, ${filePath}`,
if (!fileReference) {
await FileReference.insert(
{
workspace_id: RootScopes.ROOT,
base_id: RootScopes.ROOT,
},
{
storage: storageAdapterType,
file_url: attachment.path || attachment.url,
file_size: attachment.size,
deleted: true,
},
);
} else if (isReferenced.referenced === false) {
const fileNameWithExt = path.basename(filePath);
const mimetype =
attachment.mimetype ||
mimetypes[path.extname(fileNameWithExt).slice(1)];
await ncMeta
.knexConnection(temp_file_references_table)
.where('file_path', filePath)
.update({
mimetype,
referenced: true,
});
// insert the file reference if it does not exist yet
const fileReference = await ncMeta
.knexConnection(MetaTable.FILE_REFERENCES)
.where(
'file_url',
attachment.path || attachment.url,
)
.andWhere('storage', storageAdapterType)
.first();
if (!fileReference) {
await FileReference.insert(
{
workspace_id: RootScopes.ROOT,
base_id: RootScopes.ROOT,
},
{
storage: storageAdapterType,
file_url: attachment.path || attachment.url,
file_size: attachment.size,
deleted: true,
},
);
}
}
}
if (!('id' in attachment)) {
attachment.id = await FileReference.insert(context, {
source_id: source.id,
fk_model_id,
fk_column_id: column.id,
file_url: attachment.path || attachment.url,
file_size: attachment.size,
is_external: !source.isMeta(),
deleted: false,
});
updateRequired = true;
}
if (!('id' in attachment)) {
attachment.id = await FileReference.insert(context, {
source_id: source.id,
fk_model_id,
fk_column_id: column.id,
file_url: attachment.path || attachment.url,
file_size: attachment.size,
is_external: !source.isMeta(),
deleted: false,
});
updateRequired = true;
}
} catch (e) {
this.log(
`Error processing attachment ${JSON.stringify(
attachment,
)}`,
);
this.log(e);
throw e;
}
} catch (e) {
this.log(
`Error processing attachment ${JSON.stringify(
attachment,
)}`,
);
this.log(e);
throw e;
}
}
if (updateRequired) {
updateData[column.column_name] =
JSON.stringify(attachmentArr);
}
}
if (Object.keys(updateData).length === 0) {
continue;
if (updateRequired) {
updateData[column.column_name] = JSON.stringify(attachmentArr);
}
}
for (const pk of model.primaryKeys) {
updateData[pk.column_name] = row[pk.title];
}
if (Object.keys(updateData).length === 0) {
continue;
}
updatePayload.push(updateData);
for (const pk of model.primaryKeys) {
updateData[pk.column_name] = row[pk.title];
}
if (updatePayload.length > 0) {
for (const updateData of updatePayload) {
const wherePk = await baseModel._wherePk(
baseModel._extractPksValues(updateData),
);
updatePayload.push(updateData);
}
if (!wherePk) {
this.log(`where pk not found for ${updateData}`);
continue;
}
if (updatePayload.length > 0) {
for (const updateData of updatePayload) {
const wherePk = await baseModel._wherePk(
baseModel._extractPksValues(updateData),
);
await baseModel.execAndParse(
baseModel
.dbDriver(baseModel.tnPath)
.update(updateData)
.where(wherePk),
null,
{
raw: true,
},
);
if (!wherePk) {
this.log(`where pk not found for ${updateData}`);
continue;
}
}
// update offset
await ncMeta
.knexConnection(temp_processed_models_table)
.where('fk_model_id', fk_model_id)
.update({ offset: dataOffset });
await baseModel.execAndParse(
baseModel
.dbDriver(baseModel.tnPath)
.update(updateData)
.where(wherePk),
null,
{
raw: true,
},
);
}
}
// mark model as processed
// update offset
await ncMeta
.knexConnection(temp_processed_models_table)
.where('fk_model_id', fk_model_id)
.update({ completed: true });
.update({ offset: dataOffset });
}
processedModelsCount += 1;
// mark model as processed
await ncMeta
.knexConnection(temp_processed_models_table)
.where('fk_model_id', fk_model_id)
.update({ completed: true });
processedModelsCount += 1;
};
const selectFields = [
...(Noco.isEE() ? ['fk_workspace_id'] : []),
'base_id',
'source_id',
'fk_model_id',
];
const numberOfModelsToBeProcessed = (
await ncMeta.knexConnection
.from(
ncMeta
.knexConnection(MetaTable.COLUMNS)
.where('uidt', UITypes.Attachment)
.whereNotIn(
'fk_model_id',
ncMeta
.knexConnection(temp_processed_models_table)
.select('fk_model_id')
.where('completed', true),
)
.groupBy(selectFields)
.count('*', { as: 'count' })
.as('t'),
)
.sum('count as count')
.first()
)?.count;
const processModelLimit = 100;
// get models that have at least one attachment column and have not been processed yet
this.log(
`Processed ${processedModelsCount} of ${modelsWithAttachmentColumns.length} models`,
);
// eslint-disable-next-line no-constant-condition
while (true) {
// this query keeps returning models until all of them are marked as processed
const models = await ncMeta
.knexConnection(MetaTable.COLUMNS)
.select(selectFields)
.where('uidt', UITypes.Attachment)
.whereNotIn(
'fk_model_id',
ncMeta
.knexConnection(temp_processed_models_table)
.select('fk_model_id')
.where('completed', true),
)
.groupBy(selectFields)
.limit(processModelLimit);
if (!models?.length) {
break;
}
for (const model of models) {
try {
await processModel(model);
this.log(
`Processed ${processedModelsCount} of ${numberOfModelsToBeProcessed} models`,
);
} catch (e) {
this.log(`Error processing model ${model.fk_model_id}`);
this.log(e);
}
}
}
this.log(
`Processed total of ${numberOfModelsToBeProcessed} models with attachments`,
);
} catch (e) {
this.log(`There was an error while processing attachment migration job`);
this.log(e);

Loading…
Cancel
Save