Skip to content

Commit

Permalink
added multi-part upload functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
larryrider committed Mar 22, 2024
1 parent 0f1c654 commit 449b2b1
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 109 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"@internxt/prettier-config": "^1.0.2",
"@internxt/sdk": "^1.4.72",
"@oclif/core": "^3",
"axios": "^1.6.7",
"axios": "^1.6.8",
"bip39": "^3.1.0",
"body-parser": "^1.20.2",
"chalk": "^5.3.0",
Expand All @@ -55,6 +55,7 @@
"@oclif/prettier-config": "^0.2.1",
"@oclif/test": "^3",
"@openpgp/web-stream-tools": "0.0.11-patch-0",
"@types/async": "^3.2.24",
"@types/chai": "^4",
"@types/express": "^4.17.21",
"@types/mime-types": "^2.1.4",
Expand Down
55 changes: 50 additions & 5 deletions src/services/crypto.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class CryptoService {
return Buffer.concat([decipher.update(contentsToDecrypt), decipher.final()]).toString('utf8');
};

public encryptReadable(readable: ReadableStream<Uint8Array>, cipher: Cipher): ReadableStream<Uint8Array> {
private encryptReadable(readable: ReadableStream<Uint8Array>, cipher: Cipher): ReadableStream<Uint8Array> {
const reader = readable.getReader();

const encryptedFileReadable = new ReadableStream({
Expand All @@ -132,6 +132,28 @@ export class CryptoService {
return encryptedFileReadable;
}

/**
 * Wraps a source stream so each chunk is encrypted lazily, only when the
 * consumer pulls from the resulting stream.
 * @param readable Source stream of plain bytes
 * @param cipher Cipher applied to every chunk as it is pulled
 * @returns A readable stream emitting the encrypted content of the source
 */
public encryptReadableOnPull(readable: ReadableStream<Uint8Array>, cipher: Cipher): ReadableStream<Uint8Array> {
  const sourceReader = readable.getReader();

  return new ReadableStream({
    async pull(controller) {
      const { done, value } = await sourceReader.read();

      if (done) {
        // Source exhausted: signal end of the encrypted stream as well.
        controller.close();
        return;
      }

      controller.enqueue(cipher.update(value));
    },
  });
}

public async decryptStream(inputSlices: ReadableStream<Uint8Array>[], key: Buffer, iv: Buffer) {
const decipher = createDecipheriv('aes-256-ctr', key, iv);
const encryptedStream = StreamUtils.joinReadableBinaryStreams(inputSlices);
Expand Down Expand Up @@ -191,18 +213,41 @@ export class CryptoService {
};
}

public async readableToHash(readable: ReadableStream<Uint8Array>): Promise<Buffer> {
const reader = readable.getReader();
public async processEveryFileBlobReturnHash(
chunkedFileReadable: ReadableStream<Uint8Array>,
onEveryBlob: (blob: Blob) => Promise<void>,
): Promise<string> {
const reader = chunkedFileReadable.getReader();
const hasher = crypto.createHash('sha256');

let done = false;

while (!done) {
const status = await reader.read();
if (!status.done) {
hasher.update(status.value);
const value = status.value;
hasher.update(value);
const blob = new Blob([value], { type: 'application/octet-stream' });
await onEveryBlob(blob);
}
done = status.done;
}
return createHash('ripemd160').update(hasher.digest()).digest();

return createHash('ripemd160').update(hasher.digest()).digest('hex');
}

/**
 * Splits a stream into fixed-size chunks (one per upload part) and encrypts
 * them lazily on pull.
 *
 * @param stream Source stream of plain bytes
 * @param size Total size in bytes of the stream content
 * @param cipher Cipher applied to every chunk
 * @param parts Number of parts the upload is divided into
 * @returns A readable stream emitting the encrypted chunks
 */
public encryptStreamInParts(
  stream: ReadableStream<Uint8Array>,
  size: number,
  cipher: Cipher,
  parts: number,
): ReadableStream<Uint8Array> {
  // Safety margin: without it, rounding when splitting could yield one chunk
  // more than `parts`, and that extra chunk would have no pre-signed URL.
  const marginChunkSize = 1024;
  // Byte counts are integers — round the per-part size up so `size` bytes
  // always fit into at most `parts` chunks.
  const chunkSize = Math.ceil(size / parts) + marginChunkSize;
  const readableFileChunks = StreamUtils.streamReadableIntoChunks(stream, chunkSize);

  return this.encryptReadableOnPull(readableFileChunks, cipher);
}

/**
Expand Down
151 changes: 111 additions & 40 deletions src/services/network/network-facade.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,25 @@ import * as NetworkUpload from '@internxt/sdk/dist/network/upload';
import * as NetworkDownload from '@internxt/sdk/dist/network/download';
import { Environment } from '@internxt/inxt-js';
import crypto, { createCipheriv } from 'crypto';
import { DownloadOptions, UploadOptions, ProgressCallback } from '../../types/network.types';
import {
DownloadOptions,
UploadOptions,
ProgressCallback,
UploadMultipartOptions,
UploadTask,
} from '../../types/network.types';
import {
DecryptFileFunction,
DownloadFileFunction,
EncryptFileFunction,
UploadFileFunction,
UploadFileMultipartFunction,
} from '@internxt/sdk/dist/network';
import { CryptoService } from '../crypto.service';
import { UploadService } from './upload.service';
import { DownloadService } from './download.service';
import { ValidationService } from '../validation.service';
import { QueueObject, queue } from 'async';

export class NetworkFacade {
private readonly cryptoLib: Network.Crypto;
Expand Down Expand Up @@ -178,68 +186,131 @@ export class NetworkFacade {
}

/**
* Performs an upload encrypting the stream content
* Performs a multipart upload encrypting the stream content
*
* @param bucketId The bucket where the file will be uploaded
* @param mnemonic The plain mnemonic of the user
* @param size The total size of the stream content
* @param type The mime type of the stream content
* @param from The source ReadStream to upload from
* @param options The upload options
* @returns A promise to execute the upload and an abort controller to cancel the upload
* @param options The upload multipart options
* @returns A promise with the resulting fileId
*/
async uploadFromStreamUsingStream(
async uploadMultipart(
bucketId: string,
mnemonic: string,
size: number,
from: ReadableStream<Uint8Array>,
options?: UploadOptions,
): Promise<[Promise<{ fileId: string; hash: Buffer }>, AbortController]> {
const abortable = options?.abortController ?? new AbortController();
let encryptedStream: ReadableStream<Uint8Array>;
let fileHash: Buffer;
options: UploadMultipartOptions,
): Promise<string> {
const partsUploadedBytes: Record<number, number> = {};

const onProgress: ProgressCallback = (progress: number) => {
if (!options?.progressCallback) return;
options.progressCallback(progress);
};
function notifyProgress(partId: number, uploadedBytes: number) {
partsUploadedBytes[partId] = uploadedBytes;

const onUploadProgress = (progress: number) => {
onProgress(progress);
};
const totalUploaded = Object.values(partsUploadedBytes).reduce((a, p) => a + p, 0);
const progress = totalUploaded / size;

options.uploadingCallback(progress);
}

const uploadsAbortController = new AbortController();
options.abortController?.signal.addEventListener('abort', () => uploadsAbortController.abort());

let realError: Error | null = null;
let encryptedStream: ReadableStream<Uint8Array>;
const fileParts: { PartNumber: number; ETag: string }[] = [];

const encryptFile: EncryptFileFunction = async (algorithm, key, iv) => {
const cipher = createCipheriv('aes-256-ctr', key as Buffer, iv as Buffer);
encryptedStream = this.cryptoService.encryptReadable(from, cipher);
encryptedStream = this.cryptoService.encryptStreamInParts(from, size, cipher, options.parts);
};

const uploadFile: UploadFileFunction = async (url) => {
const { hash } = await this.uploadService.uploadReadableStream(url, size, encryptedStream, {
abortController: abortable,
progressCallback: onUploadProgress,
const uploadFileMultipart: UploadFileMultipartFunction = async (urls: string[]) => {
let partIndex = 0;
const limitConcurrency = 10;

const worker = async (upload: UploadTask) => {
const { etag } = await this.uploadService.uploadFile(upload.urlToUpload, upload.contentToUpload, {
progressCallback: (progress) => {
notifyProgress(upload.index, upload.contentToUpload.size * progress);
},
abortController: uploadsAbortController,
});

if (!etag) {
throw new Error('ETag header was not returned');
}
fileParts.push({
ETag: etag,
PartNumber: upload.index + 1,
});
};

const uploadQueue: QueueObject<UploadTask> = queue<UploadTask>(function (task, callback) {
worker(task)
.then(() => {
callback();
})
.catch((e) => {
callback(e);
});
}, limitConcurrency);

const fileHash = await this.cryptoService.processEveryFileBlobReturnHash(encryptedStream, async (blob) => {
if (uploadsAbortController.signal.aborted) {
if (realError) throw realError;
else throw new Error('Upload cancelled by user');
}

let errorAlreadyThrown = false;

uploadQueue
.pushAsync({
contentToUpload: blob,
urlToUpload: urls[partIndex],
index: partIndex++,
})
.catch((err: Error) => {
if (errorAlreadyThrown) return;

errorAlreadyThrown = true;
if (err) {
uploadQueue.kill();
if (!uploadsAbortController?.signal.aborted) {
// Failed due to other reason, so abort requests
uploadsAbortController.abort();
// TODO: Do it properly with ```options.abortController?.abort(err.message);``` available from Node 17.2.0 in advance
// https://github.com/node-fetch/node-fetch/issues/1462
realError = err;
}
}
});

/**
* TODO: Memory leak here, probably due to closures usage with this variable.
* Pending to be solved, do not remove this line unless the leak is solved.
*/
blob = new Blob([]);
});
fileHash = hash;
return fileHash.toString('hex');
};

const uploadOperation = async () => {
const uploadResult = await NetworkUpload.uploadFile(
this.network,
this.cryptoLib,
bucketId,
mnemonic,
size,
encryptFile,
uploadFile,
);
while (uploadQueue.running() > 0 || uploadQueue.length() > 0) {
await uploadQueue.drain();
}

onProgress(1);
return {
fileId: uploadResult,
hash: fileHash,
parts: fileParts.sort((pA, pB) => pA.PartNumber - pB.PartNumber),
};
};

return [uploadOperation(), abortable];
return NetworkUpload.uploadMultipartFile(
this.network,
this.cryptoLib,
bucketId,
mnemonic,
size,
encryptFile,
uploadFileMultipart,
options.parts,
);
}
}
45 changes: 0 additions & 45 deletions src/services/network/upload.service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import axios from 'axios';
import superagent from 'superagent';
import { UploadOptions } from '../../types/network.types';
import { CryptoService } from '../crypto.service';

export class UploadService {
public static readonly instance: UploadService = new UploadService();

Expand Down Expand Up @@ -32,46 +29,4 @@ export class UploadService {
}
return { etag };
}

async uploadReadableStream(
url: string,
size: number,
data: ReadableStream<Uint8Array>,
options: UploadOptions,
): Promise<{ etag: string; hash: Buffer }> {
const [dataReadableHash, dataReadableRequest] = data.tee();

const request = axios.put(url, dataReadableRequest, {
headers: {
'Content-Type': 'application/octet-stream',
'Content-Length': size.toString(),
maxContentLength: Infinity,
maxBodyLength: Infinity,
},
onUploadProgress: (progressEvent) => {
console.info({ progressEvent });
if (options.progressCallback && progressEvent.total) {
const reportedProgress = progressEvent.loaded / progressEvent.total;
options.progressCallback(reportedProgress);
}
},
signal: options.abortController?.signal,
cancelToken: new axios.CancelToken((canceler) => {
options.abortController?.signal.addEventListener('abort', () => {
canceler();
});
}),
});

const getFileHash = CryptoService.instance.readableToHash(dataReadableHash);

const [response, hash] = await Promise.all([request, getFileHash]);

const etag = response.headers.etag;
options.progressCallback(1);
if (!etag) {
throw new Error('Missing Etag in response when uploading file');
}
return { etag, hash };
}
}
16 changes: 16 additions & 0 deletions src/types/network.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,19 @@ export interface NetworkOperationBaseOptions {

export type UploadOptions = NetworkOperationBaseOptions;
export type DownloadOptions = NetworkOperationBaseOptions;

/**
 * Options controlling a multipart upload.
 */
export interface UploadMultipartOptions {
  // Invoked with overall upload progress as a fraction in [0, 1]
  // (total uploaded bytes across all parts divided by the file size).
  uploadingCallback: ProgressCallback;
  // Optional controller letting the caller cancel every in-flight part request.
  abortController?: AbortController;
  // Presumably used to resume a previously started upload — not exercised in
  // the visible code; TODO confirm semantics with the caller.
  continueUploadOptions?: {
    taskId: string;
    isPaused: boolean;
  };
  // Number of parts the file is split into (one pre-signed URL per part).
  parts: number;
}

/**
 * A single part scheduled on the multipart upload queue.
 */
export interface UploadTask {
  // Chunk content sent as the body of this part's upload request.
  contentToUpload: Blob;
  // Pre-signed URL this part is uploaded to.
  urlToUpload: string;
  // Zero-based part index; the PartNumber reported to the server is index + 1.
  index: number;
}
Loading

0 comments on commit 449b2b1

Please sign in to comment.