From 84f7de90d09dc7a8d95386c52b1242d0df4084dc Mon Sep 17 00:00:00 2001 From: tada5hi Date: Tue, 27 Feb 2024 12:48:46 +0100 Subject: [PATCH] feat: bucket stream route handler --- .../builder/helpers/container-pack.ts | 5 +- .../http/controllers/bucket/handlers/index.ts | 1 + .../controllers/bucket/handlers/stream.ts | 81 +++++++++++++++++++ .../src/http/controllers/bucket/index.ts | 10 +++ .../src/domains/bucket-file/api.ts | 14 ++++ .../storage-kit/src/domains/bucket/api.ts | 13 ++- 6 files changed, 118 insertions(+), 6 deletions(-) create mode 100644 packages/server-storage/src/http/controllers/bucket/handlers/stream.ts diff --git a/packages/server-analysis-manager/src/components/builder/helpers/container-pack.ts b/packages/server-analysis-manager/src/components/builder/helpers/container-pack.ts index c7a192807..84fadb666 100644 --- a/packages/server-analysis-manager/src/components/builder/helpers/container-pack.ts +++ b/packages/server-analysis-manager/src/components/builder/helpers/container-pack.ts @@ -30,10 +30,7 @@ export async function packContainerWithTrain(container: Container, context: Cont const client = useStorageClient(); return new Promise((resolve, reject) => { - client.request({ - url: client.bucket.getDownloadPath(context.train.id), - responseType: 'stream', - }) + client.bucket.stream(context.train.id) .then((response) => { const extract = tar.extract(); diff --git a/packages/server-storage/src/http/controllers/bucket/handlers/index.ts b/packages/server-storage/src/http/controllers/bucket/handlers/index.ts index f6ff37546..25bb85cb4 100644 --- a/packages/server-storage/src/http/controllers/bucket/handlers/index.ts +++ b/packages/server-storage/src/http/controllers/bucket/handlers/index.ts @@ -8,5 +8,6 @@ export * from './create'; export * from './delete'; export * from './read'; +export * from './stream'; export * from './update'; export * from './upload'; diff --git a/packages/server-storage/src/http/controllers/bucket/handlers/stream.ts b/packages/server-storage/src/http/controllers/bucket/handlers/stream.ts new file mode 100644 index 000000000..aa49367be --- /dev/null +++ b/packages/server-storage/src/http/controllers/bucket/handlers/stream.ts @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2022-2024. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { NotFoundError } from '@ebec/http'; +import type { Request, Response } from 'routup'; +import { useRequestParam } from 'routup'; +import type { Pack } from 'tar-stream'; +import tar from 'tar-stream'; +import { useDataSource } from 'typeorm-extension'; +import { streamToBuffer, useMinio } from '../../../../core'; +import { + BucketEntity, BucketFileEntity, +} from '../../../../domains'; + +async function packFile( + pack: Pack, + name: string, + file: BucketFileEntity, +) : Promise { + const minio = useMinio(); + return new Promise((resolve, reject) => { + minio.getObject(name, file.hash) + .then((stream) => streamToBuffer(stream)) + .then((data) => { + pack.entry({ + name: file.path, + size: data.byteLength, + }, data, (err) => { + if (err) { + reject(err); + + return; + } + + resolve(); + }); + }) + .catch((e) => reject(e)); + }); +} + +async function packFiles( + pack: Pack, + name: string, + files: BucketFileEntity[], +) { + const promises : Promise[] = []; + + for (let i = 0; i < files.length; i++) { + promises.push(packFile(pack, name, files[i])); + } + + await Promise.all(promises); +} + +export async function executeBucketRouteStreamHandler(req: Request, res: Response) : Promise { + const id = useRequestParam(req, 'id'); + + const dataSource = await useDataSource(); + const repository = dataSource.getRepository(BucketEntity); + const entity = await repository.findOneBy({ id }); + if (!entity) { + throw new NotFoundError(); + } + + const fileRepository = dataSource.getRepository(BucketFileEntity); + const files = await fileRepository.findBy({ + bucket_id: entity.id, + }); + + const pack = tar.pack(); + pack.pipe(res); + + await packFiles(pack, entity.name, files); + + pack.finalize(); +} diff --git a/packages/server-storage/src/http/controllers/bucket/index.ts b/packages/server-storage/src/http/controllers/bucket/index.ts index 8cf4a3e00..985981af0 100644 --- a/packages/server-storage/src/http/controllers/bucket/index.ts +++ b/packages/server-storage/src/http/controllers/bucket/index.ts @@ -19,6 +19,7 @@ import { executeBucketRouteDeleteHandler, executeBucketRouteGetManyHandler, executeBucketRouteGetOneHandler, + executeBucketRouteStreamHandler, executeBucketRouteUpdateHandler, executeBucketRouteUploadHandler, } from './handlers'; @@ -36,6 +37,15 @@ export class BucketController { return await executeBucketRouteGetManyHandler(req, res) as PartialBucket[]; } + @DGet('/:id/stream', [ForceLoggedInMiddleware]) + async stream( + @DPath('id') id: string, + @DRequest() req: any, + @DResponse() res: any, + ): Promise { + return await executeBucketRouteStreamHandler(req, res) as ReadableStream; + } + @DPost('/:id/upload', [ForceLoggedInMiddleware]) async upload( @DPath('id') id: string, diff --git a/packages/storage-kit/src/domains/bucket-file/api.ts b/packages/storage-kit/src/domains/bucket-file/api.ts index e34f8435c..63b98c17d 100644 --- a/packages/storage-kit/src/domains/bucket-file/api.ts +++ b/packages/storage-kit/src/domains/bucket-file/api.ts @@ -5,6 +5,7 @@ * view the LICENSE file that was distributed with this source code. */ +import type { Analysis } from '@privateaim/core/src'; import type { BuildInput } from 'rapiq'; import { buildQuery } from 'rapiq'; import { nullifyEmptyObjectProperties } from '@privateaim/core'; @@ -39,4 +40,17 @@ export class BucketFileAPI extends BaseAPI { const response = await this.client.post(`bucket-files/${id}`, nullifyEmptyObjectProperties(data)); return response.data; } + + getStreamPath(id: Analysis['id']): string { + return `bucket-files/${id}/stream`; + } + + async stream(id: BucketFile['id']) : Promise> { + const response = await this.client.request({ + url: this.getStreamPath(id), + responseType: 'stream', + }); + + return response.data; + } } diff --git a/packages/storage-kit/src/domains/bucket/api.ts b/packages/storage-kit/src/domains/bucket/api.ts index 083bf2ef4..6d08a1701 100644 --- a/packages/storage-kit/src/domains/bucket/api.ts +++ b/packages/storage-kit/src/domains/bucket/api.ts @@ -52,7 +52,16 @@ export class BucketAPI extends BaseAPI { return response.data; } - getDownloadPath(id: Analysis['id']): string { - return `buckets/${id}/files/download`; + getStreamPath(id: Analysis['id']): string { + return `buckets/${id}/stream`; + } + + async stream(id: Bucket['id']) : Promise> { + const response = await this.client.request({ + url: this.getStreamPath(id), + responseType: 'stream', + }); + + return response.data; } }