From 9b4cfa006a830f070b16991ef5730394f7fe19cc Mon Sep 17 00:00:00 2001 From: tada5hi Date: Tue, 27 Feb 2024 12:56:16 +0100 Subject: [PATCH] feat: bucket-file stream route handler & updated test suite --- package-lock.json | 5 +- packages/server-storage/package.json | 2 + .../controllers/bucket-file/handlers/index.ts | 1 + .../bucket-file/handlers/stream.ts | 72 +++++++++++++++++++ .../src/http/controllers/bucket-file/index.ts | 10 +++ .../controllers/bucket/handlers/stream.ts | 5 ++ .../test/unit/bucket-file.spec.ts | 45 ++++++++++++ 7 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 packages/server-storage/src/http/controllers/bucket-file/handlers/stream.ts diff --git a/package-lock.json b/package-lock.json index c99184c61..84387e707 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26237,15 +26237,16 @@ "@hapic/harbor": "^2.3.2", "@privateaim/core": "^0.1.0", "@privateaim/server-kit": "^0.1.0", + "@privateaim/storage-kit": "^0.0.0", "amqp-extension": "^2.0.1", "dockerode": "^4.0.2", "dotenv": "^16.4.5", "envix": "^1.3.0", "gunzip-maybe": "^1.4.2", "hapic": "^2.5.0", - "minio": "^7.1.3", "rapiq": "^0.9.0", "redis-extension": "^1.3.0", + "singa": "^1.0.0", "tar-stream": "^3.1.6", "uuid": "^9.0.1", "winston": "^3.11.0" @@ -26439,6 +26440,7 @@ "redis-extension": "^1.3.0", "routup": "^3.2.0", "singa": "^1.0.0", + "tar-stream": "^3.1.6", "typeorm": "^0.3.20", "typeorm-extension": "^3.5.0", "winston": "^3.11.0" @@ -26446,6 +26448,7 @@ "devDependencies": { "@types/busboy": "^1.5.3", "@types/cors": "^2.8.17", + "@types/tar-stream": "^2.2.3", "testcontainers": "^10.7.1" } }, diff --git a/packages/server-storage/package.json b/packages/server-storage/package.json index d067de1ba..7a90561b0 100644 --- a/packages/server-storage/package.json +++ b/packages/server-storage/package.json @@ -26,6 +26,7 @@ "redis-extension": "^1.3.0", "routup": "^3.2.0", "singa": "^1.0.0", + "tar-stream": "^3.1.6", "typeorm": "^0.3.20", "typeorm-extension": "^3.5.0", "winston": "^3.11.0" @@ -33,6 +34,7 @@ "devDependencies": { "@types/busboy": "^1.5.3", "@types/cors": "^2.8.17", + "@types/tar-stream": "^2.2.3", "testcontainers": "^10.7.1" }, "scripts": { diff --git a/packages/server-storage/src/http/controllers/bucket-file/handlers/index.ts b/packages/server-storage/src/http/controllers/bucket-file/handlers/index.ts index bc7afb784..20fdf601b 100644 --- a/packages/server-storage/src/http/controllers/bucket-file/handlers/index.ts +++ b/packages/server-storage/src/http/controllers/bucket-file/handlers/index.ts @@ -7,3 +7,4 @@ export * from './delete'; export * from './read'; +export * from './stream'; diff --git a/packages/server-storage/src/http/controllers/bucket-file/handlers/stream.ts b/packages/server-storage/src/http/controllers/bucket-file/handlers/stream.ts new file mode 100644 index 000000000..5144565d8 --- /dev/null +++ b/packages/server-storage/src/http/controllers/bucket-file/handlers/stream.ts @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2022-2024. + * Author Peter Placzek (tada5hi) + * For the full copyright and license information, + * view the LICENSE file that was distributed with this source code. + */ + +import { NotFoundError } from '@ebec/http'; +import type { Request, Response } from 'routup'; +import { useRequestParam } from 'routup'; +import type { Pack } from 'tar-stream'; +import tar from 'tar-stream'; +import { useDataSource } from 'typeorm-extension'; +import { streamToBuffer, useMinio } from '../../../../core'; +import { + BucketFileEntity, +} from '../../../../domains'; + +async function packFile( + pack: Pack, + file: BucketFileEntity, +) : Promise { + const minio = useMinio(); + return new Promise((resolve, reject) => { + minio.getObject(file.bucket.name, file.hash) + .then((stream) => streamToBuffer(stream)) + .then((data) => { + pack.entry({ + name: file.path, + size: data.byteLength, + }, data, (err) => { + if (err) { + reject(err); + + return; + } + + resolve(); + }); + }) + .catch((e) => reject(e)); + }); +} + +export async function executeBucketFileRouteStreamHandler(req: Request, res: Response) : Promise { + const id = useRequestParam(req, 'id'); + + const dataSource = await useDataSource(); + const repository = dataSource.getRepository(BucketFileEntity); + const entity = await repository.findOne({ + where: { + id, + }, + relations: ['bucket'], + }); + + if (!entity) { + throw new NotFoundError(); + } + + res.writeHead(200, { + 'Content-Type': 'application/x-tar', + 'Transfer-Encoding': 'chunked', + }); + + const pack = tar.pack(); + pack.pipe(res); + + await packFile(pack, entity); + + pack.finalize(); +} diff --git a/packages/server-storage/src/http/controllers/bucket-file/index.ts b/packages/server-storage/src/http/controllers/bucket-file/index.ts index 7a1c84ba8..cefcd33b6 100644 --- a/packages/server-storage/src/http/controllers/bucket-file/index.ts +++ b/packages/server-storage/src/http/controllers/bucket-file/index.ts @@ -14,6 +14,7 @@ import { executeBucketFileRouteDeleteHandler, executeBucketFileRouteGetManyHandler, executeBucketFileRouteGetOneHandler, + executeBucketFileRouteStreamHandler, } from './handlers'; type PartialBucketFile = Partial; @@ -29,6 +30,15 @@ export class BucketFileController { return await executeBucketFileRouteGetManyHandler(req, res) as PartialBucketFile[]; } + @DGet('/:id/stream', [ForceLoggedInMiddleware]) + async stream( + @DPath('id') id: string, + @DRequest() req: any, + @DResponse() res: any, + ): Promise { + return await executeBucketFileRouteStreamHandler(req, res) as ReadableStream; + } + @DGet('/:id', [ForceLoggedInMiddleware]) async getOne( @DPath('id') id: string, diff --git a/packages/server-storage/src/http/controllers/bucket/handlers/stream.ts b/packages/server-storage/src/http/controllers/bucket/handlers/stream.ts index aa49367be..85428fd39 100644 --- a/packages/server-storage/src/http/controllers/bucket/handlers/stream.ts +++ b/packages/server-storage/src/http/controllers/bucket/handlers/stream.ts @@ -72,6 +72,11 @@ export async function executeBucketRouteStreamHandler(req: Request, res: Respons bucket_id: entity.id, }); + res.writeHead(200, { + 'Content-Type': 'application/x-tar', + 'Transfer-Encoding': 'chunked', + }); + const pack = tar.pack(); pack.pipe(res); diff --git a/packages/server-storage/test/unit/bucket-file.spec.ts b/packages/server-storage/test/unit/bucket-file.spec.ts index a3f9d5dbb..3b4d4506a 100644 --- a/packages/server-storage/test/unit/bucket-file.spec.ts +++ b/packages/server-storage/test/unit/bucket-file.spec.ts @@ -6,6 +6,7 @@ */ import path from 'node:path'; +import tar from 'tar-stream'; import type { BucketFileEntity } from '../../src/domains'; import { dropTestDatabase, @@ -71,6 +72,50 @@ describe('controllers/bucket-file', () => { expectPropertiesEqualToSrc(details, response.body); }); + it('should stream resource', (done) => { + const extract = tar.extract(); + + const headers : Record[] = []; + + extract.on('entry', async (header, stream, callback) => { + headers.push(header); + + callback(); + }); + + extract.on('finish', () => { + expect(headers.length).toBeGreaterThanOrEqual(1); + done(); + }); + + superTest + .get(`/bucket-files/${details.id}/stream`) + .auth('admin', 'start123') + .pipe(extract); + }); + + it('should stream resource collection', (done) => { + const extract = tar.extract(); + + const headers : Record[] = []; + + extract.on('entry', async (header, stream, callback) => { + headers.push(header); + + callback(); + }); + + extract.on('finish', () => { + expect(headers.length).toBeGreaterThanOrEqual(1); + done(); + }); + + superTest + .get(`/buckets/${details.bucket_id}/stream`) + .auth('admin', 'start123') + .pipe(extract); + }); + it('should delete resource', async () => { const response = await superTest .delete(`/bucket-files/${details.id}`)