Skip to content

Commit

Permalink
added upload file using streams
Browse files Browse the repository at this point in the history
  • Loading branch information
larryrider committed Mar 21, 2024
1 parent 4b63ad3 commit 0f1c654
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/commands/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export default class Upload extends Command {
linewrap: true,
});
progressBar.start(1, 0);
const [uploadPromise, abortable] = await networkFacade.uploadFromStream(
const [uploadPromise, abortable] = await networkFacade.uploadFromStreamUsingBlob(
user.bucket,
mnemonic,
stat.size,
Expand Down
18 changes: 16 additions & 2 deletions src/services/crypto.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class CryptoService {
return Buffer.concat([decipher.update(contentsToDecrypt), decipher.final()]).toString('utf8');
};

private encryptReadable(readable: ReadableStream<Uint8Array>, cipher: Cipher): ReadableStream<Uint8Array> {
public encryptReadable(readable: ReadableStream<Uint8Array>, cipher: Cipher): ReadableStream<Uint8Array> {
const reader = readable.getReader();

const encryptedFileReadable = new ReadableStream({
Expand Down Expand Up @@ -161,7 +161,7 @@ export class CryptoService {
return decryptedStream;
}

public async encryptStream(
public async encryptStreamToFile(
input: ReadableStream<Uint8Array>,
key: Buffer,
iv: Buffer,
Expand Down Expand Up @@ -191,6 +191,20 @@ export class CryptoService {
};
}

public async readableToHash(readable: ReadableStream<Uint8Array>): Promise<Buffer> {
const reader = readable.getReader();
const hasher = crypto.createHash('sha256');
let done = false;
while (!done) {
const status = await reader.read();
if (!status.done) {
hasher.update(status.value);
}
done = status.done;
}
return createHash('ripemd160').update(hasher.digest()).digest();
}

/**
* Generates the key and the iv by transforming a secret and a salt.
* It will generate the same key and iv if the same secret and salt is used.
Expand Down
79 changes: 72 additions & 7 deletions src/services/network/network-facade.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Network } from '@internxt/sdk';
import * as NetworkUpload from '@internxt/sdk/dist/network/upload';
import * as NetworkDownload from '@internxt/sdk/dist/network/download';

import { Environment } from '@internxt/inxt-js';
import crypto from 'crypto';
import { DownloadOptions, UploadOptions, UploadProgressCallback } from '../../types/network.types';
import crypto, { createCipheriv } from 'crypto';
import { DownloadOptions, UploadOptions, ProgressCallback } from '../../types/network.types';
import {
DecryptFileFunction,
DownloadFileFunction,
Expand Down Expand Up @@ -58,7 +57,7 @@ export class NetworkFacade {
let fileStream: ReadableStream<Uint8Array>;
const abortable = options?.abortController ?? new AbortController();

const onProgress: UploadProgressCallback = (progress: number) => {
const onProgress: ProgressCallback = (progress: number) => {
if (!options?.progressCallback) return;
options.progressCallback(progress);
};
Expand Down Expand Up @@ -118,7 +117,7 @@ export class NetworkFacade {
* @param options The upload options
* @returns A promise to execute the upload and an abort controller to cancel the upload
*/
async uploadFromStream(
async uploadFromStreamUsingBlob(
bucketId: string,
mnemonic: string,
size: number,
Expand All @@ -129,7 +128,7 @@ export class NetworkFacade {
let fileHash: Buffer;
let encryptedBlob: Blob;

const onProgress: UploadProgressCallback = (progress: number) => {
const onProgress: ProgressCallback = (progress: number) => {
if (!options?.progressCallback) return;
options.progressCallback(progress);
};
Expand All @@ -139,7 +138,7 @@ export class NetworkFacade {
};

const encryptFile: EncryptFileFunction = async (_, key, iv) => {
const { blob, hash } = await this.cryptoService.encryptStream(
const { blob, hash } = await this.cryptoService.encryptStreamToFile(
from,
Buffer.from(key as ArrayBuffer),
Buffer.from(iv as ArrayBuffer),
Expand Down Expand Up @@ -177,4 +176,70 @@ export class NetworkFacade {

return [uploadOperation(), abortable];
}

/**
* Performs an upload encrypting the stream content
*
* @param bucketId The bucket where the file will be uploaded
* @param mnemonic The plain mnemonic of the user
* @param size The total size of the stream content
* @param type The mime type of the stream content
* @param from The source ReadStream to upload from
* @param options The upload options
* @returns A promise to execute the upload and an abort controller to cancel the upload
*/
async uploadFromStreamUsingStream(
bucketId: string,
mnemonic: string,
size: number,
from: ReadableStream<Uint8Array>,
options?: UploadOptions,
): Promise<[Promise<{ fileId: string; hash: Buffer }>, AbortController]> {
const abortable = options?.abortController ?? new AbortController();
let encryptedStream: ReadableStream<Uint8Array>;
let fileHash: Buffer;

const onProgress: ProgressCallback = (progress: number) => {
if (!options?.progressCallback) return;
options.progressCallback(progress);
};

const onUploadProgress = (progress: number) => {
onProgress(progress);
};

const encryptFile: EncryptFileFunction = async (algorithm, key, iv) => {
const cipher = createCipheriv('aes-256-ctr', key as Buffer, iv as Buffer);
encryptedStream = this.cryptoService.encryptReadable(from, cipher);
};

const uploadFile: UploadFileFunction = async (url) => {
const { hash } = await this.uploadService.uploadReadableStream(url, size, encryptedStream, {
abortController: abortable,
progressCallback: onUploadProgress,
});
fileHash = hash;
return fileHash.toString('hex');
};

const uploadOperation = async () => {
const uploadResult = await NetworkUpload.uploadFile(
this.network,
this.cryptoLib,
bucketId,
mnemonic,
size,
encryptFile,
uploadFile,
);

onProgress(1);
return {
fileId: uploadResult,
hash: fileHash,
};
};

return [uploadOperation(), abortable];
}
}
46 changes: 45 additions & 1 deletion src/services/network/upload.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { UploadOptions } from '../../types/network.types';
import axios from 'axios';
import superagent from 'superagent';
import { UploadOptions } from '../../types/network.types';
import { CryptoService } from '../crypto.service';

export class UploadService {
public static readonly instance: UploadService = new UploadService();
Expand Down Expand Up @@ -30,4 +32,46 @@ export class UploadService {
}
return { etag };
}

async uploadReadableStream(
url: string,
size: number,
data: ReadableStream<Uint8Array>,
options: UploadOptions,
): Promise<{ etag: string; hash: Buffer }> {
const [dataReadableHash, dataReadableRequest] = data.tee();

const request = axios.put(url, dataReadableRequest, {
headers: {
'Content-Type': 'application/octet-stream',
'Content-Length': size.toString(),
maxContentLength: Infinity,
maxBodyLength: Infinity,
},
onUploadProgress: (progressEvent) => {
console.info({ progressEvent });
if (options.progressCallback && progressEvent.total) {
const reportedProgress = progressEvent.loaded / progressEvent.total;
options.progressCallback(reportedProgress);
}
},
signal: options.abortController?.signal,
cancelToken: new axios.CancelToken((canceler) => {
options.abortController?.signal.addEventListener('abort', () => {
canceler();
});
}),
});

const getFileHash = CryptoService.instance.readableToHash(dataReadableHash);

const [response, hash] = await Promise.all([request, getFileHash]);

const etag = response.headers.etag;
options.progressCallback(1);
if (!etag) {
throw new Error('Missing Etag in response when uploading file');
}
return { etag, hash };
}
}
4 changes: 2 additions & 2 deletions src/types/network.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ export interface NetworkCredentials {
pass: string;
}

export type UploadProgressCallback = (progress: number) => void;
export type ProgressCallback = (progress: number) => void;
export interface NetworkOperationBaseOptions {
progressCallback: UploadProgressCallback;
progressCallback: ProgressCallback;
abortController?: AbortController;
}

Expand Down
8 changes: 3 additions & 5 deletions src/webdav/handlers/PUT.handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Request, Response } from 'express';
import path from 'path';
import { DriveFileService } from '../../services/drive/drive-file.service';
import { DriveRealmManager } from '../../services/realms/drive-realm-manager.service';
import { NetworkFacade } from '../../services/network/network-facade.service';
Expand Down Expand Up @@ -43,7 +42,7 @@ export class PUTRequestHandler implements WebDavMethodHandler {

const { user, mnemonic } = await this.dependencies.authService.getAuthDetails();

const [uploadPromise] = await this.dependencies.networkFacade.uploadFromStream(
const [uploadPromise, _] = await this.dependencies.networkFacade.uploadFromStreamUsingStream(
user.bucket,
mnemonic,
contentLength,
Expand All @@ -54,8 +53,7 @@ export class PUTRequestHandler implements WebDavMethodHandler {
},
},
);

const uploadResult = await uploadPromise;
const { fileId } = await uploadPromise;

webdavLogger.info('✅ File uploaded to network');

Expand All @@ -64,7 +62,7 @@ export class PUTRequestHandler implements WebDavMethodHandler {
type: resource.path.ext.replaceAll('.', ''),
size: contentLength,
folderId: driveFolder.id,
fileId: uploadResult.fileId,
fileId: fileId,
bucket: user.bucket,
});

Expand Down
2 changes: 1 addition & 1 deletion test/services/crypto.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ describe('Crypto service', () => {
const file = path.join(process.cwd(), 'test/fixtures/test-content.fixture.txt');
const readStream = createReadStream(file);

const result = await CryptoService.instance.encryptStream(
const result = await CryptoService.instance.encryptStreamToFile(
StreamUtils.readStreamToReadableStream(readStream),
key,
iv,
Expand Down
4 changes: 2 additions & 2 deletions test/services/network/network-facade.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('Network Facade Service', () => {
abortController: new AbortController(),
};

const result = await sut.uploadFromStream(
const result = await sut.uploadFromStreamUsingBlob(
'f1858bc9675f9e4f7ab29429',
'animal fog wink trade december thumb sight cousin crunch plunge captain enforce letter creek text',
100,
Expand All @@ -70,7 +70,7 @@ describe('Network Facade Service', () => {
};

networkFacadeSandbox.stub(NetworkUpload, 'uploadFile').resolves('uploaded_file_id');
const [executeUpload] = await sut.uploadFromStream(
const [executeUpload] = await sut.uploadFromStreamUsingBlob(
'f1858bc9675f9e4f7ab29429',
'animal fog wink trade december thumb sight cousin crunch plunge captain enforce letter creek text',
100,
Expand Down
2 changes: 1 addition & 1 deletion test/webdav/handlers/PUT.handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ describe('PUT request handler', () => {
.resolves({ mnemonic: 'MNEMONIC', token: 'TOKEN', newToken: 'NEW_TOKEN', user: UserFixture });

sandbox
.stub(networkFacade, 'uploadFromStream')
.stub(networkFacade, 'uploadFromStreamUsingStream')
.resolves([Promise.resolve({ fileId: '09218313209', hash: Buffer.from('test') }), new AbortController()]);
sandbox.stub(DriveFileService.instance, 'createFile').resolves();
sandbox.stub(driveRealmManager, 'createFile').resolves();
Expand Down

0 comments on commit 0f1c654

Please sign in to comment.