From fe02f9eb3ebaca5fe5740d3c4b37001832eda451 Mon Sep 17 00:00:00 2001 From: Anbarasu Date: Sun, 15 Sep 2024 20:15:34 +0530 Subject: [PATCH] feat: Implement scanFiles for gcs and Minio (#9463) * fix: bump gcs version fix: implement scan files for gcs and minio * fix: stream files --- packages/nocodb/src/plugins/gcs/Gcs.ts | 212 +++++++++++++--------- packages/nocodb/src/plugins/mino/Minio.ts | 46 ++++- 2 files changed, 168 insertions(+), 90 deletions(-) diff --git a/packages/nocodb/src/plugins/gcs/Gcs.ts b/packages/nocodb/src/plugins/gcs/Gcs.ts index 34c40fd90c..a6da92471e 100644 --- a/packages/nocodb/src/plugins/gcs/Gcs.ts +++ b/packages/nocodb/src/plugins/gcs/Gcs.ts @@ -1,18 +1,18 @@ import fs from 'fs'; import { promisify } from 'util'; +import { Readable } from 'stream'; import { Storage } from '@google-cloud/storage'; import axios from 'axios'; import { useAgent } from 'request-filtering-agent'; import type { GetSignedUrlConfig, StorageOptions } from '@google-cloud/storage'; import type { IStorageAdapterV2, XcFile } from '~/types/nc-plugin'; -import type { Readable } from 'stream'; import { generateTempFilePath, waitForStreamClose } from '~/utils/pluginUtils'; interface GoogleCloudStorageInput { client_email: string; private_key: string; bucket: string; - project_id: string; + project_id?: string; } export default class Gcs implements IStorageAdapterV2 { @@ -22,23 +22,32 @@ export default class Gcs implements IStorageAdapterV2 { private bucketName: string; private input: GoogleCloudStorageInput; - constructor(input: unknown) { - this.input = input as GoogleCloudStorageInput; + constructor(input: GoogleCloudStorageInput) { + this.input = input; } - public async init(): Promise { - const options: StorageOptions = {}; - options.credentials = { - client_email: this.input.client_email, - // replace \n with real line breaks to avoid - // error:0909006C:PEM routines:get_name:no start line - private_key: this.input.private_key.replace(/\\n/gm, '\n'), + protected patchKey(key: string): string { + let patchedKey = decodeURIComponent(key); + if (patchedKey.startsWith(`${this.bucketName}/`)) { + patchedKey = patchedKey.replace(`${this.bucketName}/`, ''); + } + return patchedKey; + } + + public async init(): Promise { + const options: StorageOptions = { + credentials: { + client_email: this.input.client_email, + // replace \n with real line breaks to avoid + // error:0909006C:PEM routines:get_name:no start line + private_key: this.input.private_key.replace(/\\n/gm, '\n'), + }, }; - // default project ID would be used if it is not provided if (this.input.project_id) { options.projectId = this.input.project_id; } + this.bucketName = this.input.bucket; this.storageClient = new Storage(options); } @@ -50,9 +59,9 @@ export default class Gcs implements IStorageAdapterV2 { await waitForStreamClose(createStream); await this.fileCreate('nc-test-file.txt', { path: tempFile, - mimetype: '', + mimetype: 'text/plain', originalname: 'temp.txt', - size: '', + size: createStream.bytesWritten.toString(), }); await promisify(fs.unlink)(tempFile); return true; @@ -61,112 +70,104 @@ export default class Gcs implements IStorageAdapterV2 { } } - public fileRead(key: string): Promise { - return new Promise((resolve, reject) => { - const file = this.storageClient.bucket(this.bucketName).file(key); - // Check for existence, since gcloud-node seemed to be caching the result - file.exists((err, exists) => { - if (exists) { - file.download((downerr, data) => { - if (err) { - return reject(downerr); - } - return resolve(data); - }); - } else { - reject(err); - } - }); - }); + public async fileRead(key: string): Promise { + const file = this.storageClient + .bucket(this.bucketName) + .file(this.patchKey(key)); + const [exists] = await file.exists(); + if (!exists) { + throw new Error(`File ${this.patchKey(key)} does not exist`); + } + const [data] = await file.download(); + return data; } - async fileCreate(key: string, file: XcFile): Promise { - const uploadResponse = await this.storageClient + public async fileCreate(key: string, file: XcFile): Promise { + const [uploadResponse] = await this.storageClient .bucket(this.bucketName) .upload(file.path, { - destination: key, + destination: this.patchKey(key), contentType: file?.mimetype || 'application/octet-stream', gzip: true, + predefinedAcl: 'publicRead', metadata: { cacheControl: 'public, max-age=31536000', }, }); - return uploadResponse[0].publicUrl(); + return uploadResponse.publicUrl(); } - async fileCreateByStream( + public async fileCreateByStream( key: string, stream: Readable, - options?: { + options: { mimetype?: string; - }, - ): Promise { - const uploadResponse = await this.storageClient + size?: number; + } = {}, + ): Promise { + const file = this.storageClient .bucket(this.bucketName) - .file(key) - .save(stream, { - // Support for HTTP requests made with `Accept-Encoding: gzip` - gzip: true, - // By setting the option `destination`, you can change the name of the - // object you are uploading to a bucket. - metadata: { - contentType: options.mimetype || 'application/octet-stream', - // Enable long-lived HTTP caching headers - // Use only if the contents of the file will never change - // (If the contents will change, use cacheControl: 'no-cache') - cacheControl: 'public, max-age=31536000', - }, - }); + .file(this.patchKey(key)); + await new Promise((resolve, reject) => { + stream + .pipe( + file.createWriteStream({ + gzip: true, + predefinedAcl: 'publicRead', + metadata: { + contentType: options.mimetype || 'application/octet-stream', + cacheControl: 'public, max-age=31536000', + }, + }), + ) + .on('finish', () => resolve()) + .on('error', reject); + }); - return uploadResponse[0].publicUrl(); + return file.publicUrl(); } - async fileCreateByUrl( + public async fileCreateByUrl( destPath: string, url: string, { fetchOptions: { buffer } = { buffer: false } }, - ): Promise { - return new Promise((resolve, reject) => { - axios - .get(url, { - httpAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }), - httpsAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }), - responseType: buffer ? 'arraybuffer' : 'stream', - }) - .then((response) => { - this.storageClient - .bucket(this.bucketName) - .file(destPath) - .save(response.data) - .then((res) => resolve({ url: res, data: response.data })) - .catch(reject); - }) - .catch((error) => { - reject(error); - }); + ): Promise<{ url: string; data: any }> { + const response = await axios.get(url, { + httpAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }), + httpsAgent: useAgent(url, { stopPortScanningByUrlRedirection: true }), + responseType: buffer ? 'arraybuffer' : 'stream', }); + + const file = this.storageClient.bucket(this.bucketName).file(destPath); + await file.save(response.data); + + return { url: file.publicUrl(), data: response.data }; } - fileDelete(_path: string): Promise { - throw new Error('Method not implemented.'); + public async fileDelete(path: string): Promise { + await this.storageClient.bucket(this.bucketName).file(path).delete(); } - // TODO - implement - fileReadByStream(_key: string): Promise { - return Promise.resolve(undefined); + public async fileReadByStream(key: string): Promise { + return this.storageClient + .bucket(this.bucketName) + .file(this.patchKey(key)) + .createReadStream(); } - // TODO - implement - getDirectoryList(_path: string): Promise { - return Promise.resolve(undefined); + public async getDirectoryList(path: string): Promise { + const [files] = await this.storageClient.bucket(this.bucketName).getFiles({ + prefix: path, + }); + return files.map((file) => file.name); } public async getSignedUrl( - key, + key: string, expiresInSeconds = 7200, pathParameters?: { [key: string]: string }, - ) { + ): Promise { const options: GetSignedUrlConfig = { version: 'v4', action: 'read', @@ -176,13 +177,48 @@ export default class Gcs implements IStorageAdapterV2 { const [url] = await this.storageClient .bucket(this.bucketName) - .file(key) + .file(this.patchKey(key)) .getSignedUrl(options); return url; } - public async scanFiles(_globPattern: string): Promise { - return Promise.resolve(undefined); + public async scanFiles(globPattern: string): Promise { + // Remove all dots from the prefix + globPattern = globPattern.replace(/\./g, ''); + + // Remove the leading slash + globPattern = globPattern.replace(/^\//, ''); + + // Make sure pattern starts with nc/uploads/ + if (!globPattern.startsWith('nc/uploads/')) { + globPattern = `nc/uploads/${globPattern}`; + } + + const stream = new Readable({ + objectMode: true, + read() {}, + }); + + const fileStream = this.storageClient + .bucket(this.input.bucket) + .getFilesStream({ + prefix: globPattern, + autoPaginate: true, + }); + + fileStream.on('error', (error) => { + stream.emit('error', error); + }); + + fileStream.on('data', (file) => { + stream.push(file.name); + }); + + fileStream.on('end', () => { + stream.push(null); + }); + + return stream; } } diff --git a/packages/nocodb/src/plugins/mino/Minio.ts b/packages/nocodb/src/plugins/mino/Minio.ts index 0efa2c47ab..7304065428 100644 --- a/packages/nocodb/src/plugins/mino/Minio.ts +++ b/packages/nocodb/src/plugins/mino/Minio.ts @@ -204,7 +204,49 @@ export default class Minio implements IStorageAdapterV2 { }); } - public async scanFiles(_globPattern: string): Promise { - return Promise.resolve(undefined); + public async scanFiles(globPattern: string): Promise { + // Remove all dots from the glob pattern + globPattern = globPattern.replace(/\./g, ''); + + // Remove the leading slash + globPattern = globPattern.replace(/^\//, ''); + + // Make sure pattern starts with nc/uploads/ + if (!globPattern.startsWith('nc/uploads/')) { + globPattern = `nc/uploads/${globPattern}`; + } + + // Minio does not support glob so remove * + globPattern = globPattern.replace(/\*/g, ''); + + const stream = new Readable({ + read() {}, + }); + + stream.setEncoding('utf8'); + + const listObjects = async () => { + try { + const objectStream = this.minioClient.listObjectsV2( + this.input.bucket, + globPattern, + true, + ); + + for await (const item of objectStream) { + stream.push(item.name); + } + + stream.push(null); + } catch (error) { + stream.emit('error', error); + } + }; + + listObjects().catch((error) => { + stream.emit('error', error); + }); + + return stream; } }