|
|
|
@ -96,32 +96,44 @@ export class AttachmentMigrationProcessor {
|
|
|
|
|
|
|
|
|
|
const fileReferenceBuffer = []; |
|
|
|
|
|
|
|
|
|
let filesCount = 0; |
|
|
|
|
|
|
|
|
|
fileScanStream.on('data', async (file) => { |
|
|
|
|
fileReferenceBuffer.push({ file_path: file }); |
|
|
|
|
|
|
|
|
|
if (fileReferenceBuffer.length >= 100) { |
|
|
|
|
fileScanStream.pause(); |
|
|
|
|
|
|
|
|
|
const processBuffer = fileReferenceBuffer.splice(0); |
|
|
|
|
try { |
|
|
|
|
const processBuffer = fileReferenceBuffer.splice(0); |
|
|
|
|
|
|
|
|
|
// skip or insert file references
|
|
|
|
|
const toSkip = await ncMeta |
|
|
|
|
.knexConnection(temp_file_references_table) |
|
|
|
|
.whereIn( |
|
|
|
|
'file_path', |
|
|
|
|
fileReferenceBuffer.map((f) => f.file_path), |
|
|
|
|
); |
|
|
|
|
filesCount += processBuffer.length; |
|
|
|
|
|
|
|
|
|
const toSkipPaths = toSkip.map((f) => f.file_path); |
|
|
|
|
// skip or insert file references
|
|
|
|
|
const toSkip = await ncMeta |
|
|
|
|
.knexConnection(temp_file_references_table) |
|
|
|
|
.whereIn( |
|
|
|
|
'file_path', |
|
|
|
|
fileReferenceBuffer.map((f) => f.file_path), |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
const toInsert = processBuffer.filter( |
|
|
|
|
(f) => !toSkipPaths.includes(f.file_path), |
|
|
|
|
); |
|
|
|
|
const toSkipPaths = toSkip.map((f) => f.file_path); |
|
|
|
|
|
|
|
|
|
if (toInsert.length > 0) { |
|
|
|
|
await ncMeta |
|
|
|
|
.knexConnection(temp_file_references_table) |
|
|
|
|
.insert(toInsert); |
|
|
|
|
const toInsert = processBuffer.filter( |
|
|
|
|
(f) => !toSkipPaths.includes(f.file_path), |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
if (toInsert.length > 0) { |
|
|
|
|
await ncMeta |
|
|
|
|
.knexConnection(temp_file_references_table) |
|
|
|
|
.insert(toInsert); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.log(`Scanned ${filesCount} files`); |
|
|
|
|
} catch (e) { |
|
|
|
|
this.log(`There was an error while scanning files`); |
|
|
|
|
this.log(e); |
|
|
|
|
err = e; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fileScanStream.resume(); |
|
|
|
@ -131,10 +143,16 @@ export class AttachmentMigrationProcessor {
|
|
|
|
|
await new Promise((resolve, reject) => { |
|
|
|
|
fileScanStream.on('end', resolve); |
|
|
|
|
fileScanStream.on('error', reject); |
|
|
|
|
}).catch((e) => { |
|
|
|
|
this.log(`error scanning files:`, e); |
|
|
|
|
err = e; |
|
|
|
|
}); |
|
|
|
|
}) |
|
|
|
|
.then(() => { |
|
|
|
|
filesCount += fileReferenceBuffer.length; |
|
|
|
|
this.log(`Completed scanning with ${filesCount} files`); |
|
|
|
|
}) |
|
|
|
|
.catch((e) => { |
|
|
|
|
this.log(`There was an error while scanning files`); |
|
|
|
|
this.log(e); |
|
|
|
|
err = e; |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
if (err) { |
|
|
|
|
throw err; |
|
|
|
@ -193,6 +211,12 @@ export class AttachmentMigrationProcessor {
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.log( |
|
|
|
|
`Found ${modelsWithAttachmentColumns.length} models with attachment columns`, |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
let processedModelsCount = 0; |
|
|
|
|
|
|
|
|
|
for (const modelData of modelsWithAttachmentColumns) { |
|
|
|
|
const { fk_workspace_id, base_id, source_id, fk_model_id } = |
|
|
|
|
modelData; |
|
|
|
@ -445,6 +469,12 @@ export class AttachmentMigrationProcessor {
|
|
|
|
|
.knexConnection(temp_processed_models_table) |
|
|
|
|
.where('fk_model_id', fk_model_id) |
|
|
|
|
.update({ completed: true }); |
|
|
|
|
|
|
|
|
|
processedModelsCount += 1; |
|
|
|
|
|
|
|
|
|
this.log( |
|
|
|
|
`Processed ${processedModelsCount} of ${modelsWithAttachmentColumns.length} models`, |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|