Skip to content

Commit

Permalink
feat: bucket stream route handler
Browse files Browse the repository at this point in the history
  • Loading branch information
tada5hi committed Feb 27, 2024
1 parent 2f27f9b commit 84f7de9
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ export async function packContainerWithTrain(container: Container, context: Cont
const client = useStorageClient();

return new Promise<void>((resolve, reject) => {
client.request({
url: client.bucket.getDownloadPath(context.train.id),
responseType: 'stream',
})
client.bucket.stream(context.train.id)
.then((response) => {
const extract = tar.extract();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
export * from './create';
export * from './delete';
export * from './read';
export * from './stream';
export * from './update';
export * from './upload';
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2022-2024.
* Author Peter Placzek (tada5hi)
* For the full copyright and license information,
* view the LICENSE file that was distributed with this source code.
*/

import { NotFoundError } from '@ebec/http';
import type { Request, Response } from 'routup';
import { useRequestParam } from 'routup';
import type { Pack } from 'tar-stream';
import tar from 'tar-stream';
import { useDataSource } from 'typeorm-extension';
import { streamToBuffer, useMinio } from '../../../../core';
import {
BucketEntity, BucketFileEntity,
} from '../../../../domains';

async function packFile(
pack: Pack,
name: string,
file: BucketFileEntity,
) : Promise<void> {
const minio = useMinio();
return new Promise<void>((resolve, reject) => {
minio.getObject(name, file.hash)
.then((stream) => streamToBuffer(stream))
.then((data) => {
pack.entry({
name: file.path,
size: data.byteLength,
}, data, (err) => {
if (err) {
reject(err);

return;
}

resolve();
});
})
.catch((e) => reject(e));
});
}

async function packFiles(
pack: Pack,
name: string,
files: BucketFileEntity[],
) {
const promises : Promise<void>[] = [];

for (let i = 0; i < files.length; i++) {
promises.push(packFile(pack, name, files[i]));
}

await Promise.all(promises);
}

export async function executeBucketRouteStreamHandler(req: Request, res: Response) : Promise<any> {
const id = useRequestParam(req, 'id');

const dataSource = await useDataSource();
const repository = dataSource.getRepository(BucketEntity);
const entity = await repository.findOneBy({ id });
if (!entity) {
throw new NotFoundError();
}

const fileRepository = dataSource.getRepository(BucketFileEntity);
const files = await fileRepository.findBy({
bucket_id: entity.id,
});

const pack = tar.pack();
pack.pipe(res);

await packFiles(pack, entity.name, files);

pack.finalize();
}
10 changes: 10 additions & 0 deletions packages/server-storage/src/http/controllers/bucket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
executeBucketRouteDeleteHandler,
executeBucketRouteGetManyHandler,
executeBucketRouteGetOneHandler,
executeBucketRouteStreamHandler,
executeBucketRouteUpdateHandler,
executeBucketRouteUploadHandler,
} from './handlers';
Expand All @@ -36,6 +37,15 @@ export class BucketController {
return await executeBucketRouteGetManyHandler(req, res) as PartialBucket[];
}

@DGet('/:id/stream', [ForceLoggedInMiddleware])
async stream(
@DPath('id') id: string,
@DRequest() req: any,
@DResponse() res: any,
): Promise<ReadableStream> {
return await executeBucketRouteStreamHandler(req, res) as ReadableStream;
}

@DPost('/:id/upload', [ForceLoggedInMiddleware])
async upload(
@DPath('id') id: string,
Expand Down
14 changes: 14 additions & 0 deletions packages/storage-kit/src/domains/bucket-file/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* view the LICENSE file that was distributed with this source code.
*/

import type { Analysis } from '@privateaim/core/src';
import type { BuildInput } from 'rapiq';
import { buildQuery } from 'rapiq';
import { nullifyEmptyObjectProperties } from '@privateaim/core';
Expand Down Expand Up @@ -39,4 +40,17 @@ export class BucketFileAPI extends BaseAPI {
const response = await this.client.post(`bucket-files/${id}`, nullifyEmptyObjectProperties(data));
return response.data;
}

getStreamPath(id: Analysis['id']): string {
return `bucket-files/${id}/stream`;
}

async stream(id: BucketFile['id']) : Promise<ReadableStream<any>> {
const response = await this.client.request({
url: this.getStreamPath(id),
responseType: 'stream',
});

return response.data;
}
}
13 changes: 11 additions & 2 deletions packages/storage-kit/src/domains/bucket/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,16 @@ export class BucketAPI extends BaseAPI {
return response.data;
}

getDownloadPath(id: Analysis['id']): string {
return `buckets/${id}/files/download`;
getStreamPath(id: Analysis['id']): string {
return `buckets/${id}/stream`;
}

async stream(id: Bucket['id']) : Promise<ReadableStream<any>> {
const response = await this.client.request({
url: this.getStreamPath(id),
responseType: 'stream',
});

return response.data;
}
}

0 comments on commit 84f7de9

Please sign in to comment.