Browse Source

feat: S3 support

nc-feat/attachment-clean-up
mertmit 4 months ago
parent
commit
dcd0e4d3b0
  1. 17
      packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts
  2. 14
      packages/nocodb/src/modules/jobs/migration-jobs/nc_job_002_thumbnail.ts
  3. 52
      packages/nocodb/src/plugins/GenericS3/GenericS3.ts

17
packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts

@ -40,6 +40,8 @@ export class AttachmentMigrationProcessor {
const interval = setMigrationJobsStallInterval();
let err = null;
try {
const ncMeta = Noco.ncMeta;
@ -129,8 +131,15 @@ export class AttachmentMigrationProcessor {
await new Promise((resolve, reject) => {
fileScanStream.on('end', resolve);
fileScanStream.on('error', reject);
}).catch((e) => {
this.log(`error scanning files:`, e);
err = e;
});
if (err) {
throw err;
}
if (fileReferenceBuffer.length > 0) {
await ncMeta
.knexConnection(temp_file_references_table)
@ -451,7 +460,9 @@ export class AttachmentMigrationProcessor {
version: MIGRATION_JOB_VERSION,
});
} catch (e) {
this.log(`error processing attachment migration job:`, e);
this.log(`There was an error while processing attachment migration job`);
this.log(e);
err = e;
}
clearInterval(interval);
@ -461,8 +472,10 @@ export class AttachmentMigrationProcessor {
stall_check: Date.now(),
});
// call init migration job again
// call init migration job again if there is no error
if (!err) {
await this.jobsService.add(MigrationJobTypes.InitMigrationJobs, {});
}
this.debugLog(`job completed for ${job.id}`);
}

14
packages/nocodb/src/modules/jobs/migration-jobs/nc_job_002_thumbnail.ts

@ -37,6 +37,8 @@ export class ThumbnailMigrationProcessor {
const interval = setMigrationJobsStallInterval();
let err = null;
try {
const ncMeta = Noco.ncMeta;
@ -110,8 +112,15 @@ export class ThumbnailMigrationProcessor {
await new Promise((resolve, reject) => {
fileScanStream.on('end', resolve);
fileScanStream.on('error', reject);
}).catch((e) => {
this.log(`error scanning files:`, e);
err = e;
});
if (err) {
throw err;
}
if (fileReferenceBuffer.length > 0) {
await ncMeta
.knexConnection(temp_file_references_table)
@ -183,7 +192,10 @@ export class ThumbnailMigrationProcessor {
version: MIGRATION_JOB_VERSION,
});
} catch (e) {
this.log(`error processing attachment migration job:`, e);
this.log(
`There was an error while generating thumbnails for old attachments`,
);
this.log(e);
}
clearInterval(interval);

52
packages/nocodb/src/plugins/GenericS3/GenericS3.ts

@ -1,5 +1,6 @@
import fs from 'fs';
import { promisify } from 'util';
import { Readable } from 'stream';
import axios from 'axios';
import { useAgent } from 'request-filtering-agent';
import {
@ -10,7 +11,6 @@ import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { Upload } from '@aws-sdk/lib-storage';
import type { PutObjectRequest, S3 as S3Client } from '@aws-sdk/client-s3';
import type { IStorageAdapterV2, XcFile } from 'nc-plugin';
import type { Readable } from 'stream';
import { generateTempFilePath, waitForStreamClose } from '~/utils/pluginUtils';
interface GenerocObjectStorageInput {
@ -200,7 +200,53 @@ export default class GenericS3 implements IStorageAdapterV2 {
return Promise.resolve(undefined);
}
public async scanFiles(_globPattern: string): Promise<Readable> {
return Promise.resolve(undefined);
public async scanFiles(globPattern: string): Promise<Readable> {
// remove all dots from the glob pattern
globPattern = globPattern.replace(/\./g, '');
// remove the leading slash
globPattern = globPattern.replace(/^\//, '');
// make sure pattern starts with nc/uploads/
if (!globPattern.startsWith('nc/uploads/')) {
globPattern = `nc/uploads/${globPattern}`;
}
// S3 does not support glob so remove *
globPattern = globPattern.replace(/\*/g, '');
const stream = new Readable({
read() {},
});
stream.setEncoding('utf8');
const listObjects = async (continuationToken?: string) => {
this.s3Client
.listObjectsV2({
Bucket: this.input.bucket,
Prefix: globPattern,
...(continuationToken
? { ContinuationToken: continuationToken }
: {}),
})
.then((response) => {
response.Contents.forEach((content) => {
stream.push(content.Key);
});
if (response.IsTruncated) {
listObjects(response.NextContinuationToken);
} else {
stream.push(null);
}
});
};
listObjects().catch((error) => {
stream.emit('error', error);
});
return stream;
}
}

Loading…
Cancel
Save