From 0f1c654a20a2355ef8d2ece76df3e38f8f3bf44f Mon Sep 17 00:00:00 2001 From: larry-internxt Date: Thu, 21 Mar 2024 19:47:19 +0100 Subject: [PATCH] added upload file using streams --- src/commands/upload.ts | 2 +- src/services/crypto.service.ts | 18 ++++- .../network/network-facade.service.ts | 79 +++++++++++++++++-- src/services/network/upload.service.ts | 46 ++++++++++- src/types/network.types.ts | 4 +- src/webdav/handlers/PUT.handler.ts | 8 +- test/services/crypto.service.test.ts | 2 +- .../network/network-facade.service.test.ts | 4 +- test/webdav/handlers/PUT.handler.test.ts | 2 +- 9 files changed, 143 insertions(+), 22 deletions(-) diff --git a/src/commands/upload.ts b/src/commands/upload.ts index 5ea51e6..12190d0 100644 --- a/src/commands/upload.ts +++ b/src/commands/upload.ts @@ -72,7 +72,7 @@ export default class Upload extends Command { linewrap: true, }); progressBar.start(1, 0); - const [uploadPromise, abortable] = await networkFacade.uploadFromStream( + const [uploadPromise, abortable] = await networkFacade.uploadFromStreamUsingBlob( user.bucket, mnemonic, stat.size, diff --git a/src/services/crypto.service.ts b/src/services/crypto.service.ts index e5bb59e..14a28d3 100644 --- a/src/services/crypto.service.ts +++ b/src/services/crypto.service.ts @@ -109,7 +109,7 @@ export class CryptoService { return Buffer.concat([decipher.update(contentsToDecrypt), decipher.final()]).toString('utf8'); }; - private encryptReadable(readable: ReadableStream, cipher: Cipher): ReadableStream { + public encryptReadable(readable: ReadableStream, cipher: Cipher): ReadableStream { const reader = readable.getReader(); const encryptedFileReadable = new ReadableStream({ @@ -161,7 +161,7 @@ export class CryptoService { return decryptedStream; } - public async encryptStream( + public async encryptStreamToFile( input: ReadableStream, key: Buffer, iv: Buffer, @@ -191,6 +191,20 @@ export class CryptoService { }; } + public async readableToHash(readable: ReadableStream): Promise { + const reader = readable.getReader(); + const hasher = crypto.createHash('sha256'); + let done = false; + while (!done) { + const status = await reader.read(); + if (!status.done) { + hasher.update(status.value); + } + done = status.done; + } + return createHash('ripemd160').update(hasher.digest()).digest(); + } + /** * Generates the key and the iv by transforming a secret and a salt. * It will generate the same key and iv if the same secret and salt is used. diff --git a/src/services/network/network-facade.service.ts b/src/services/network/network-facade.service.ts index b628a19..797baf4 100644 --- a/src/services/network/network-facade.service.ts +++ b/src/services/network/network-facade.service.ts @@ -1,10 +1,9 @@ import { Network } from '@internxt/sdk'; import * as NetworkUpload from '@internxt/sdk/dist/network/upload'; import * as NetworkDownload from '@internxt/sdk/dist/network/download'; - import { Environment } from '@internxt/inxt-js'; -import crypto from 'crypto'; -import { DownloadOptions, UploadOptions, UploadProgressCallback } from '../../types/network.types'; +import crypto, { createCipheriv } from 'crypto'; +import { DownloadOptions, UploadOptions, ProgressCallback } from '../../types/network.types'; import { DecryptFileFunction, DownloadFileFunction, @@ -58,7 +57,7 @@ export class NetworkFacade { let fileStream: ReadableStream; const abortable = options?.abortController ?? new AbortController(); - const onProgress: UploadProgressCallback = (progress: number) => { + const onProgress: ProgressCallback = (progress: number) => { if (!options?.progressCallback) return; options.progressCallback(progress); }; @@ -118,7 +117,7 @@ export class NetworkFacade { * @param options The upload options * @returns A promise to execute the upload and an abort controller to cancel the upload */ - async uploadFromStream( + async uploadFromStreamUsingBlob( bucketId: string, mnemonic: string, size: number, @@ -129,7 +128,7 @@ export class NetworkFacade { let fileHash: Buffer; let encryptedBlob: Blob; - const onProgress: UploadProgressCallback = (progress: number) => { + const onProgress: ProgressCallback = (progress: number) => { if (!options?.progressCallback) return; options.progressCallback(progress); }; @@ -139,7 +138,7 @@ export class NetworkFacade { }; const encryptFile: EncryptFileFunction = async (_, key, iv) => { - const { blob, hash } = await this.cryptoService.encryptStream( + const { blob, hash } = await this.cryptoService.encryptStreamToFile( from, Buffer.from(key as ArrayBuffer), Buffer.from(iv as ArrayBuffer), @@ -177,4 +176,70 @@ export class NetworkFacade { return [uploadOperation(), abortable]; } + + /** + * Performs an upload encrypting the stream content + * + * @param bucketId The bucket where the file will be uploaded + * @param mnemonic The plain mnemonic of the user + * @param size The total size of the stream content + * @param type The mime type of the stream content + * @param from The source ReadStream to upload from + * @param options The upload options + * @returns A promise to execute the upload and an abort controller to cancel the upload + */ + async uploadFromStreamUsingStream( + bucketId: string, + mnemonic: string, + size: number, + from: ReadableStream, + options?: UploadOptions, + ): Promise<[Promise<{ fileId: string; hash: Buffer }>, AbortController]> { + const abortable = options?.abortController ?? new AbortController(); + let encryptedStream: ReadableStream; + let fileHash: Buffer; + + const onProgress: ProgressCallback = (progress: number) => { + if (!options?.progressCallback) return; + options.progressCallback(progress); + }; + + const onUploadProgress = (progress: number) => { + onProgress(progress); + }; + + const encryptFile: EncryptFileFunction = async (algorithm, key, iv) => { + const cipher = createCipheriv('aes-256-ctr', key as Buffer, iv as Buffer); + encryptedStream = this.cryptoService.encryptReadable(from, cipher); + }; + + const uploadFile: UploadFileFunction = async (url) => { + const { hash } = await this.uploadService.uploadReadableStream(url, size, encryptedStream, { + abortController: abortable, + progressCallback: onUploadProgress, + }); + fileHash = hash; + return fileHash.toString('hex'); + }; + + const uploadOperation = async () => { + const uploadResult = await NetworkUpload.uploadFile( + this.network, + this.cryptoLib, + bucketId, + mnemonic, + size, + encryptFile, + uploadFile, + ); + + onProgress(1); + return { + fileId: uploadResult, + hash: fileHash, + }; + }; + + return [uploadOperation(), abortable]; + } } diff --git a/src/services/network/upload.service.ts b/src/services/network/upload.service.ts index a809738..a442c29 100644 --- a/src/services/network/upload.service.ts +++ b/src/services/network/upload.service.ts @@ -1,5 +1,7 @@ -import { UploadOptions } from '../../types/network.types'; +import axios from 'axios'; import superagent from 'superagent'; +import { UploadOptions } from '../../types/network.types'; +import { CryptoService } from '../crypto.service'; export class UploadService { public static readonly instance: UploadService = new UploadService(); @@ -30,4 +32,46 @@ export class UploadService { } return { etag }; } + + async uploadReadableStream( + url: string, + size: number, + data: ReadableStream, + options: UploadOptions, + ): Promise<{ etag: string; hash: Buffer }> { + const [dataReadableHash, dataReadableRequest] = data.tee(); + + const request = axios.put(url, dataReadableRequest, { + headers: { + 'Content-Type': 'application/octet-stream', + 'Content-Length': size.toString(), + maxContentLength: Infinity, + maxBodyLength: Infinity, + }, + onUploadProgress: (progressEvent) => { + console.info({ progressEvent }); + if (options.progressCallback && progressEvent.total) { + const reportedProgress = progressEvent.loaded / progressEvent.total; + options.progressCallback(reportedProgress); + } + }, + signal: options.abortController?.signal, + cancelToken: new axios.CancelToken((canceler) => { + options.abortController?.signal.addEventListener('abort', () => { + canceler(); + }); + }), + }); + + const getFileHash = CryptoService.instance.readableToHash(dataReadableHash); + + const [response, hash] = await Promise.all([request, getFileHash]); + + const etag = response.headers.etag; + options.progressCallback(1); + if (!etag) { + throw new Error('Missing Etag in response when uploading file'); + } + return { etag, hash }; + } } diff --git a/src/types/network.types.ts b/src/types/network.types.ts index f2f9cb5..a7b1493 100644 --- a/src/types/network.types.ts +++ b/src/types/network.types.ts @@ -3,9 +3,9 @@ export interface NetworkCredentials { pass: string; } -export type UploadProgressCallback = (progress: number) => void; +export type ProgressCallback = (progress: number) => void; export interface NetworkOperationBaseOptions { - progressCallback: UploadProgressCallback; + progressCallback: ProgressCallback; abortController?: AbortController; } diff --git a/src/webdav/handlers/PUT.handler.ts b/src/webdav/handlers/PUT.handler.ts index e1ef9c6..6d87199 100644 --- a/src/webdav/handlers/PUT.handler.ts +++ b/src/webdav/handlers/PUT.handler.ts @@ -1,5 +1,4 @@ import { Request, Response } from 'express'; -import path from 'path'; import { DriveFileService } from '../../services/drive/drive-file.service'; import { DriveRealmManager } from '../../services/realms/drive-realm-manager.service'; import { NetworkFacade } from '../../services/network/network-facade.service'; @@ -43,7 +42,7 @@ export class PUTRequestHandler implements WebDavMethodHandler { const { user, mnemonic } = await this.dependencies.authService.getAuthDetails(); - const [uploadPromise] = await this.dependencies.networkFacade.uploadFromStream( + const [uploadPromise, _] = await this.dependencies.networkFacade.uploadFromStreamUsingStream( user.bucket, mnemonic, contentLength, @@ -54,8 +53,7 @@ export class PUTRequestHandler implements WebDavMethodHandler { }, }, ); - - const uploadResult = await uploadPromise; + const { fileId } = await uploadPromise; webdavLogger.info('✅ File uploaded to network'); @@ -64,7 +62,7 @@ export class PUTRequestHandler implements WebDavMethodHandler { type: resource.path.ext.replaceAll('.', ''), size: contentLength, folderId: driveFolder.id, - fileId: uploadResult.fileId, + fileId: fileId, bucket: user.bucket, }); diff --git a/test/services/crypto.service.test.ts b/test/services/crypto.service.test.ts index 1a58e79..85487c0 100644 --- a/test/services/crypto.service.test.ts +++ b/test/services/crypto.service.test.ts @@ -127,7 +127,7 @@ describe('Crypto service', () => { const file = path.join(process.cwd(), 'test/fixtures/test-content.fixture.txt'); const readStream = createReadStream(file); - const result = await CryptoService.instance.encryptStream( + const result = await CryptoService.instance.encryptStreamToFile( StreamUtils.readStreamToReadableStream(readStream), key, iv, diff --git a/test/services/network/network-facade.service.test.ts b/test/services/network/network-facade.service.test.ts index d09e76d..7339a68 100644 --- a/test/services/network/network-facade.service.test.ts +++ b/test/services/network/network-facade.service.test.ts @@ -43,7 +43,7 @@ describe('Network Facade Service', () => { abortController: new AbortController(), }; - const result = await sut.uploadFromStream( + const result = await sut.uploadFromStreamUsingBlob( 'f1858bc9675f9e4f7ab29429', 'animal fog wink trade december thumb sight cousin crunch plunge captain enforce letter creek text', 100, @@ -70,7 +70,7 @@ describe('Network Facade Service', () => { }; networkFacadeSandbox.stub(NetworkUpload, 'uploadFile').resolves('uploaded_file_id'); - const [executeUpload] = await sut.uploadFromStream( + const [executeUpload] = await sut.uploadFromStreamUsingBlob( 'f1858bc9675f9e4f7ab29429', 'animal fog wink trade december thumb sight cousin crunch plunge captain enforce letter creek text', 100, diff --git a/test/webdav/handlers/PUT.handler.test.ts b/test/webdav/handlers/PUT.handler.test.ts index 45d238e..cce200a 100644 --- a/test/webdav/handlers/PUT.handler.test.ts +++ b/test/webdav/handlers/PUT.handler.test.ts @@ -143,7 +143,7 @@ describe('PUT request handler', () => { .resolves({ mnemonic: 'MNEMONIC', token: 'TOKEN', newToken: 'NEW_TOKEN', user: UserFixture }); sandbox - .stub(networkFacade, 'uploadFromStream') + .stub(networkFacade, 'uploadFromStreamUsingStream') .resolves([Promise.resolve({ fileId: '09218313209', hash: Buffer.from('test') }), new AbortController()]); sandbox.stub(DriveFileService.instance, 'createFile').resolves(); sandbox.stub(driveRealmManager, 'createFile').resolves();