Mirror of https://github.com/nocodb/nocodb — commit by mertmit (4 months ago): 16 changed files, 674 additions and 11 deletions.
@ -0,0 +1,115 @@
|
||||
import Noco from '~/Noco'; |
||||
import { MetaTable, RootScopes } from '~/utils/globals'; |
||||
|
||||
export const MIGRATION_JOBS_STORE_KEY = 'NC_MIGRATION_JOBS'; |
||||
|
||||
const initState = { |
||||
version: '0', |
||||
stall_check: Date.now(), |
||||
locked: false, |
||||
}; |
||||
|
||||
export const getMigrationJobsState = async (): Promise<{ |
||||
version: string; |
||||
stall_check: number; |
||||
locked: boolean; |
||||
}> => { |
||||
const ncMeta = Noco.ncMeta; |
||||
|
||||
const qb = await ncMeta.metaGet( |
||||
RootScopes.ROOT, |
||||
RootScopes.ROOT, |
||||
MetaTable.STORE, |
||||
{ |
||||
key: MIGRATION_JOBS_STORE_KEY, |
||||
}, |
||||
); |
||||
|
||||
if (!qb) { |
||||
await ncMeta.metaInsert2( |
||||
RootScopes.ROOT, |
||||
RootScopes.ROOT, |
||||
MetaTable.STORE, |
||||
{ |
||||
key: MIGRATION_JOBS_STORE_KEY, |
||||
value: JSON.stringify(initState), |
||||
}, |
||||
true, |
||||
); |
||||
|
||||
return initState; |
||||
} |
||||
|
||||
try { |
||||
const migrationJobsState = JSON.parse(qb?.value || '{}'); |
||||
|
||||
if ('version' in migrationJobsState) { |
||||
return migrationJobsState; |
||||
} |
||||
|
||||
return initState; |
||||
} catch (e) { |
||||
console.error('Error parsing migration jobs state', e); |
||||
return initState; |
||||
} |
||||
}; |
||||
|
||||
export const updateMigrationJobsState = async ( |
||||
state: Partial<{ |
||||
version: string; |
||||
stall_check: number; |
||||
locked: boolean; |
||||
}>, |
||||
) => { |
||||
const ncMeta = Noco.ncMeta; |
||||
|
||||
const migrationJobsState = await getMigrationJobsState(); |
||||
|
||||
if (!migrationJobsState) { |
||||
const updatedState = { |
||||
...initState, |
||||
...state, |
||||
}; |
||||
|
||||
await ncMeta.metaInsert2( |
||||
RootScopes.ROOT, |
||||
RootScopes.ROOT, |
||||
MetaTable.STORE, |
||||
{ |
||||
key: MIGRATION_JOBS_STORE_KEY, |
||||
value: JSON.stringify(updatedState), |
||||
}, |
||||
true, |
||||
); |
||||
} else { |
||||
const updatedState = { |
||||
...migrationJobsState, |
||||
...state, |
||||
}; |
||||
|
||||
await ncMeta.metaUpdate( |
||||
RootScopes.ROOT, |
||||
RootScopes.ROOT, |
||||
MetaTable.STORE, |
||||
{ |
||||
value: JSON.stringify(updatedState), |
||||
}, |
||||
{ |
||||
key: MIGRATION_JOBS_STORE_KEY, |
||||
}, |
||||
); |
||||
} |
||||
}; |
||||
|
||||
export const setMigrationJobsStallInterval = () => { |
||||
// update stall check every 5 mins
|
||||
const interval = setInterval(async () => { |
||||
const migrationJobsState = await getMigrationJobsState(); |
||||
|
||||
migrationJobsState.stall_check = Date.now(); |
||||
|
||||
await updateMigrationJobsState(migrationJobsState); |
||||
}, 5 * 60 * 1000); |
||||
|
||||
return interval; |
||||
}; |
@ -0,0 +1,73 @@
|
||||
import debug from 'debug'; |
||||
import { Process, Processor } from '@nestjs/bull'; |
||||
import { Job } from 'bull'; |
||||
import { forwardRef, Inject } from '@nestjs/common'; |
||||
import { JOBS_QUEUE, MigrationJobTypes } from '~/interface/Jobs'; |
||||
import { IJobsService } from '~/modules/jobs/jobs-service.interface'; |
||||
import { |
||||
getMigrationJobsState, |
||||
updateMigrationJobsState, |
||||
} from '~/helpers/migrationJobs'; |
||||
|
||||
const migrationJobsList = [{ version: '1', job: MigrationJobTypes.Attachment }]; |
||||
|
||||
@Processor(JOBS_QUEUE) |
||||
export class InitMigrationJobs { |
||||
private readonly debugLog = debug('nc:migration-jobs:init'); |
||||
|
||||
constructor( |
||||
@Inject(forwardRef(() => 'JobsService')) |
||||
private readonly jobsService: IJobsService, |
||||
) {} |
||||
|
||||
log = (...msgs: string[]) => { |
||||
console.log('[init-migration-jobs]: ', ...msgs); |
||||
}; |
||||
|
||||
@Process(MigrationJobTypes.InitMigrationJobs) |
||||
async job(job: Job) { |
||||
this.debugLog(`job started for ${job.id}`); |
||||
|
||||
const migrationJobsState = await getMigrationJobsState(); |
||||
|
||||
// check for stall (no update for 10 mins)
|
||||
if (migrationJobsState.locked) { |
||||
if (Date.now() - migrationJobsState.stall_check > 10 * 60 * 1000) { |
||||
migrationJobsState.locked = false; |
||||
migrationJobsState.stall_check = Date.now(); |
||||
|
||||
await updateMigrationJobsState(migrationJobsState); |
||||
} |
||||
} |
||||
|
||||
// check for lock
|
||||
if (migrationJobsState.locked) { |
||||
// migration job is running, make sure it's not stalled by checking after 10 mins
|
||||
// stall check is updated every 5 mins
|
||||
setTimeout(() => { |
||||
this.jobsService.add(MigrationJobTypes.InitMigrationJobs, {}); |
||||
}, 10 * 60 * 1000); |
||||
return; |
||||
} |
||||
|
||||
// get migrations need to be applied
|
||||
const migrations = migrationJobsList.filter( |
||||
(m) => +m.version > +migrationJobsState.version, |
||||
); |
||||
|
||||
if (!migrations.length) { |
||||
return; |
||||
} |
||||
|
||||
// lock the migration job
|
||||
migrationJobsState.locked = true; |
||||
migrationJobsState.stall_check = Date.now(); |
||||
|
||||
await updateMigrationJobsState(migrationJobsState); |
||||
|
||||
// run first migration job
|
||||
await this.jobsService.add(migrations[0].job, {}); |
||||
|
||||
this.debugLog(`job completed for ${job.id}`); |
||||
} |
||||
} |
@ -0,0 +1,398 @@
|
||||
import debug from 'debug'; |
||||
import { Process, Processor } from '@nestjs/bull'; |
||||
import { Job } from 'bull'; |
||||
import { UITypes } from 'nocodb-sdk'; |
||||
import { forwardRef, Inject } from '@nestjs/common'; |
||||
import { Source } from '~/models'; |
||||
import { JOBS_QUEUE, MigrationJobTypes } from '~/interface/Jobs'; |
||||
import NcPluginMgrv2 from '~/helpers/NcPluginMgrv2'; |
||||
import Noco from '~/Noco'; |
||||
import { MetaTable } from '~/utils/globals'; |
||||
import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2'; |
||||
import { FileReference, Model } from '~/models'; |
||||
import { IJobsService } from '~/modules/jobs/jobs-service.interface'; |
||||
import { extractProps } from '~/helpers/extractProps'; |
||||
import { |
||||
setMigrationJobsStallInterval, |
||||
updateMigrationJobsState, |
||||
} from '~/helpers/migrationJobs'; |
||||
|
||||
const MIGRATION_JOB_VERSION = '1'; |
||||
|
||||
@Processor(JOBS_QUEUE) |
||||
export class AttachmentMigrationProcessor { |
||||
private readonly debugLog = debug('nc:migration-jobs:attachment'); |
||||
|
||||
constructor( |
||||
@Inject(forwardRef(() => 'JobsService')) |
||||
private readonly jobsService: IJobsService, |
||||
) {} |
||||
|
||||
log = (...msgs: string[]) => { |
||||
console.log('[nc_job_001_attachment]: ', ...msgs); |
||||
}; |
||||
|
||||
@Process(MigrationJobTypes.Attachment) |
||||
async job(job: Job) { |
||||
this.debugLog(`job started for ${job.id}`); |
||||
|
||||
const interval = setMigrationJobsStallInterval(); |
||||
|
||||
try { |
||||
const ncMeta = Noco.ncMeta; |
||||
|
||||
const temp_file_references_table = 'nc_temp_file_references'; |
||||
const temp_processed_models_table = 'nc_temp_processed_models'; |
||||
|
||||
const fileReferencesTableExists = |
||||
await ncMeta.knexConnection.schema.hasTable(temp_file_references_table); |
||||
|
||||
const processedModelsTableExists = |
||||
await ncMeta.knexConnection.schema.hasTable( |
||||
temp_processed_models_table, |
||||
); |
||||
|
||||
if (!fileReferencesTableExists) { |
||||
// create temp file references table if not exists
|
||||
await ncMeta.knexConnection.schema.createTable( |
||||
temp_file_references_table, |
||||
(table) => { |
||||
table.increments('id').primary(); |
||||
table.string('file_path').notNullable(); |
||||
table.boolean('referenced').defaultTo(false); |
||||
table.boolean('thumbnail_generated').defaultTo(false); |
||||
|
||||
table.index('file_path'); |
||||
}, |
||||
); |
||||
} |
||||
|
||||
if (!processedModelsTableExists) { |
||||
// create temp processed models table if not exists
|
||||
await ncMeta.knexConnection.schema.createTable( |
||||
temp_processed_models_table, |
||||
(table) => { |
||||
table.increments('id').primary(); |
||||
table.string('fk_model_id').notNullable(); |
||||
|
||||
table.index('fk_model_id'); |
||||
}, |
||||
); |
||||
} |
||||
|
||||
// get all file references
|
||||
const storageAdapter = await NcPluginMgrv2.storageAdapter(ncMeta); |
||||
|
||||
const fileScanStream = await storageAdapter.scanFiles('nc/uploads/**'); |
||||
|
||||
const fileReferenceBuffer = []; |
||||
|
||||
fileScanStream.on('data', async (file) => { |
||||
fileReferenceBuffer.push({ file_path: file }); |
||||
|
||||
if (fileReferenceBuffer.length >= 100) { |
||||
fileScanStream.pause(); |
||||
|
||||
const processBuffer = fileReferenceBuffer.splice(0); |
||||
|
||||
// skip or insert file references
|
||||
const toSkip = await ncMeta |
||||
.knexConnection(temp_file_references_table) |
||||
.whereIn( |
||||
'file_path', |
||||
fileReferenceBuffer.map((f) => f.file_path), |
||||
); |
||||
|
||||
const toSkipPaths = toSkip.map((f) => f.file_path); |
||||
|
||||
const toInsert = processBuffer.filter( |
||||
(f) => !toSkipPaths.includes(f.file_path), |
||||
); |
||||
|
||||
if (toInsert.length > 0) { |
||||
await ncMeta |
||||
.knexConnection(temp_file_references_table) |
||||
.insert(toInsert); |
||||
} |
||||
|
||||
fileScanStream.resume(); |
||||
} |
||||
}); |
||||
|
||||
await new Promise((resolve, reject) => { |
||||
fileScanStream.on('end', resolve); |
||||
fileScanStream.on('error', reject); |
||||
}); |
||||
|
||||
if (fileReferenceBuffer.length > 0) { |
||||
await ncMeta |
||||
.knexConnection(temp_file_references_table) |
||||
.insert(fileReferenceBuffer); |
||||
} |
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) { |
||||
const modelLimit = 100; |
||||
|
||||
let modelOffset = 0; |
||||
|
||||
const modelsWithAttachmentColumns = []; |
||||
|
||||
// get models that have at least one attachment column, and not processed
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) { |
||||
const models = await ncMeta |
||||
.knexConnection(MetaTable.COLUMNS) |
||||
.select('fk_workspace_id', 'base_id', 'source_id', 'fk_model_id') |
||||
.where('uidt', UITypes.Attachment) |
||||
.whereNotIn( |
||||
'fk_model_id', |
||||
ncMeta |
||||
.knexConnection(temp_processed_models_table) |
||||
.select('fk_model_id'), |
||||
) |
||||
.groupBy('fk_workspace_id', 'base_id', 'source_id', 'fk_model_id') |
||||
.limit(modelLimit) |
||||
.offset(modelOffset); |
||||
|
||||
modelOffset += modelLimit; |
||||
|
||||
if (!models?.length) { |
||||
break; |
||||
} |
||||
|
||||
modelsWithAttachmentColumns.push(...models); |
||||
} |
||||
|
||||
if (!modelsWithAttachmentColumns?.length) { |
||||
break; |
||||
} |
||||
|
||||
for (const modelData of modelsWithAttachmentColumns) { |
||||
const { fk_workspace_id, base_id, source_id, fk_model_id } = |
||||
modelData; |
||||
|
||||
const context = { |
||||
workspace_id: fk_workspace_id, |
||||
base_id, |
||||
}; |
||||
|
||||
const attachmentColumns = await ncMeta |
||||
.knexConnection(MetaTable.COLUMNS) |
||||
.select('id', 'title', 'column_name') |
||||
.where('uidt', UITypes.Attachment) |
||||
.where('fk_model_id', fk_model_id); |
||||
|
||||
if (!attachmentColumns?.length) { |
||||
this.log(`no attachment columns found for ${fk_model_id}`); |
||||
continue; |
||||
} |
||||
|
||||
const source = await Source.get(context, source_id); |
||||
|
||||
if (!source) { |
||||
this.log(`source not found for ${source_id}`); |
||||
continue; |
||||
} |
||||
|
||||
const model = await Model.get(context, fk_model_id); |
||||
|
||||
if (!model) { |
||||
this.log(`model not found for ${fk_model_id}`); |
||||
continue; |
||||
} |
||||
|
||||
await model.getColumns(context); |
||||
|
||||
const dbDriver = await NcConnectionMgrv2.get(source); |
||||
|
||||
if (!dbDriver) { |
||||
this.log(`connection can't achieved for ${source_id}`); |
||||
continue; |
||||
} |
||||
|
||||
const baseModel = await Model.getBaseModelSQL(context, { |
||||
model, |
||||
dbDriver, |
||||
}); |
||||
|
||||
const dataLimit = 10; |
||||
let dataOffset = 0; |
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) { |
||||
const data = await baseModel.list( |
||||
{ |
||||
fieldsSet: new Set( |
||||
model.primaryKeys |
||||
.map((c) => c.title) |
||||
.concat(attachmentColumns.map((c) => c.title)), |
||||
), |
||||
sort: model.primaryKeys.map((c) => c.title), |
||||
limit: dataLimit, |
||||
offset: dataOffset, |
||||
}, |
||||
{ |
||||
ignoreViewFilterAndSort: true, |
||||
}, |
||||
); |
||||
|
||||
dataOffset += dataLimit; |
||||
|
||||
if (!data?.length) { |
||||
break; |
||||
} |
||||
|
||||
const updatePayload = []; |
||||
|
||||
for (const row of data) { |
||||
const updateData = {}; |
||||
|
||||
let updateRequired = false; |
||||
|
||||
for (const column of attachmentColumns) { |
||||
let attachmentArr = row[column.title]; |
||||
|
||||
if (!attachmentArr?.length) { |
||||
continue; |
||||
} |
||||
|
||||
try { |
||||
if (typeof attachmentArr === 'string') { |
||||
attachmentArr = JSON.parse(attachmentArr); |
||||
} |
||||
} catch (e) { |
||||
this.log(`error parsing attachment data ${attachmentArr}`); |
||||
continue; |
||||
} |
||||
|
||||
if (Array.isArray(attachmentArr)) { |
||||
attachmentArr = attachmentArr.map((a) => |
||||
extractProps(a, [ |
||||
'id', |
||||
'url', |
||||
'path', |
||||
'title', |
||||
'mimetype', |
||||
'size', |
||||
'icon', |
||||
'width', |
||||
'height', |
||||
]), |
||||
); |
||||
|
||||
for (const attachment of attachmentArr) { |
||||
if ('path' in attachment || 'url' in attachment) { |
||||
const path = `nc/uploads/${ |
||||
attachment.path.replace(/^download\//, '') || |
||||
decodeURI( |
||||
`${new URL(attachment.url).pathname.replace( |
||||
/.*?nc\/uploads\//, |
||||
'', |
||||
)}`,
|
||||
) |
||||
}`;
|
||||
|
||||
const isReferenced = await ncMeta |
||||
.knexConnection(temp_file_references_table) |
||||
.where('file_path', path) |
||||
.first(); |
||||
|
||||
if (!isReferenced) { |
||||
// file is from another storage adapter
|
||||
this.log( |
||||
`file not found in file references table ${path}`, |
||||
); |
||||
continue; |
||||
} else if (isReferenced.referenced === false) { |
||||
await ncMeta |
||||
.knexConnection(temp_file_references_table) |
||||
.where('file_path', path) |
||||
.update({ |
||||
referenced: true, |
||||
}); |
||||
} |
||||
|
||||
if (!('id' in attachment)) { |
||||
attachment.id = await FileReference.insert(context, { |
||||
fk_model_id, |
||||
fk_column_id: column.id, |
||||
file_url: attachment.path || attachment.url, |
||||
file_size: attachment.size, |
||||
}); |
||||
updateRequired = true; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
if (updateRequired) { |
||||
updateData[column.column_name] = |
||||
JSON.stringify(attachmentArr); |
||||
} |
||||
} |
||||
|
||||
if (Object.keys(updateData).length === 0) { |
||||
continue; |
||||
} |
||||
|
||||
for (const pk of model.primaryKeys) { |
||||
updateData[pk.column_name] = row[pk.title]; |
||||
} |
||||
|
||||
updatePayload.push(updateData); |
||||
} |
||||
|
||||
if (updatePayload.length > 0) { |
||||
for (const updateData of updatePayload) { |
||||
const wherePk = await baseModel._wherePk( |
||||
baseModel._extractPksValues(updateData), |
||||
); |
||||
|
||||
if (!wherePk) { |
||||
this.log(`where pk not found for ${updateData}`); |
||||
continue; |
||||
} |
||||
|
||||
await baseModel.execAndParse( |
||||
baseModel |
||||
.dbDriver(baseModel.tnPath) |
||||
.update(updateData) |
||||
.where(wherePk), |
||||
null, |
||||
{ |
||||
raw: true, |
||||
}, |
||||
); |
||||
} |
||||
} |
||||
} |
||||
|
||||
await ncMeta |
||||
.knexConnection(temp_processed_models_table) |
||||
.insert({ fk_model_id }); |
||||
} |
||||
} |
||||
|
||||
// bump the version
|
||||
await updateMigrationJobsState({ |
||||
version: MIGRATION_JOB_VERSION, |
||||
}); |
||||
} catch (e) { |
||||
this.log(`error processing attachment migration job:`, e); |
||||
} |
||||
|
||||
clearInterval(interval); |
||||
|
||||
await updateMigrationJobsState({ |
||||
locked: false, |
||||
stall_check: Date.now(), |
||||
}); |
||||
|
||||
// call init migration job again
|
||||
await this.jobsService.add(MigrationJobTypes.InitMigrationJobs, {}); |
||||
|
||||
this.debugLog(`job completed for ${job.id}`); |
||||
} |
||||
} |
Loading…
Reference in new issue