Finish implementation of chunked uploads

Alvinn8 committed Dec 24, 2024
1 parent eb06858 commit 3fe8480
Showing 5 changed files with 102 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/common/ui/FileFormats.ts
@@ -29,6 +29,7 @@ export function getIconFor(fileName: string): string {
case ".yaml":
case ".toml":
case ".json":
case ".json5":
case ".cfg":
case ".conf":
case ".ini":
2 changes: 1 addition & 1 deletion src/common/version.ts
@@ -1,2 +1,2 @@
const VERSION = "September 16";
const VERSION = "December 25";
export default VERSION;
6 changes: 4 additions & 2 deletions src/protocol/packets.ts
@@ -88,16 +88,18 @@ export interface ChunkedUploadData {
end: number;
}
/**
* success: The chunk was uploaded successfully, proceed to next chunk.
* success: The chunk has been scheduled for upload, proceed to next chunk.
* end: The upload has been completed.
* desync: This chunk did not arrive in the correct order.
* 404: No upload was found for the provided uploadId.
* malsized: The start and end byte offsets did not align with the size of the chunk.
* hijack: The upload was started by a different connection than this one.
* error: A previous chunk failed to upload to the server.
*/
export type ChunkedUploadStatus = "success" | "end" | "desync" | "404" | "malsized" | "hijack";
export type ChunkedUploadStatus = "success" | "end" | "desync" | "404" | "malsized" | "hijack" | "error";
export interface ChunkedUploadResponse {
status: ChunkedUploadStatus;
error?: string;
}

export const packetMap = new Map<number, Packet<any, any>>();
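To make the status contract concrete, here is a minimal sketch of how a sender could branch on a ChunkedUploadResponse. The helper name is hypothetical and not part of this commit; the actual handling lives in the WebsocketFTPConnection.ts changes below.

// Hypothetical helper illustrating the status contract above.
function shouldSendNextChunk(response: ChunkedUploadResponse): boolean {
    switch (response.status) {
        case "success":
            return true; // chunk accepted and queued, send the next one
        case "end":
            return false; // final chunk received, upload is complete
        case "error":
            // a previously queued chunk failed server-side; error may say why
            console.error(response.error);
            return false;
        case "desync":   // chunk arrived out of order
        case "404":      // no upload known for this uploadId
        case "malsized": // start/end offsets do not match the payload size
        case "hijack":   // upload belongs to a different connection
            return false;
    }
}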
44 changes: 39 additions & 5 deletions src/server/index.ts
@@ -12,9 +12,6 @@ const PROTOCOL_VERSION = 1;

type ProtocolType = "json";

const LARGE_FILE_THRESHOLD = 10E6; // 10 MB
const CHUNK_SIZE = 2 * 1024 * 1024; // 2 MB

interface LargeDownload {
id: string;
path: string;
@@ -35,6 +32,9 @@ interface ChunkedUpload {
stream: PassThrough;
uploadPromise: Promise<ftp.FTPResponse>;
offset: number;
pendingChunks: number;
maxPendingChunks: number;
error: Error | null;
}

const largeDownloads: LargeDownload[] = [];
@@ -67,6 +67,10 @@ function showErrorToUser(error: any): string | null {
return null;
}

function sleep(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}

const httpServer = createServer(function (req, res) {
const headers = {
"Access-Control-Allow-Origin": "*",
@@ -394,7 +398,10 @@ handler(Packets.ChunkedUploadStart, async (packet, data, connection) => {
connection,
stream,
uploadPromise,
offset: 0
offset: 0,
pendingChunks: 0,
maxPendingChunks: 2,
error: null
});

return { uploadId };
@@ -418,6 +425,13 @@ handler(Packets.ChunkedUpload, async (packet, data, connection) => {
}
}

if (chunkedUpload.error) {
return {
status: "error" as Status,
error: showErrorToUser(chunkedUpload.error)
};
}

const buffer = Buffer.from(data.data, "base64");
if (buffer.byteLength !== data.end - data.start) {
return { status: "malsized" as Status };
@@ -427,7 +441,27 @@ handler(Packets.ChunkedUpload, async (packet, data, connection) => {
return { status: "desync" as Status };
}

chunkedUpload.stream.write(buffer);
chunkedUpload.pendingChunks++;
chunkedUpload.stream.write(buffer, (err) => {
if (err) {
chunkedUpload.error = err;
}
chunkedUpload.pendingChunks--;
});

connection.log("pendingChunks = " + chunkedUpload.pendingChunks);


let sleepCount = 0;
while (chunkedUpload.pendingChunks >= chunkedUpload.maxPendingChunks && sleepCount++ < 1000) {
// We have a few chunks in the queue now. We can wait a little.
await sleep(50);
}

if (sleepCount > 1) {
connection.log("slept, pendingChunks = " + chunkedUpload.pendingChunks);
}

chunkedUpload.offset += buffer.length;

if (chunkedUpload.offset === chunkedUpload.size) {
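Taken together, the server side is a pipe-through: every chunk received over the websocket is written into a PassThrough stream that the FTP client concurrently drains as one continuous upload. A condensed sketch of that shape, assuming a basic-ftp style client as this file's ftp types suggest (startChunkedUpload is an illustrative name, not a function in this commit):

import { PassThrough } from "stream";

// Sketch: the websocket handler writes chunks into the PassThrough while
// uploadFrom() consumes it; the promise resolves once the stream is ended.
function startChunkedUpload(client: ftp.Client, path: string) {
    const stream = new PassThrough();
    const uploadPromise = client.uploadFrom(stream, path);
    return { stream, uploadPromise, offset: 0, pendingChunks: 0, maxPendingChunks: 2, error: null };
}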
69 changes: 57 additions & 12 deletions src/web/WebsocketFTPConnection.ts
@@ -15,7 +15,15 @@ export const PROTOCOL_TYPE = "json";
export const PROTOCOL_VERSION = 1;

const LARGE_FILE_THRESHOLD = 10E6; // 10 MB
const CHUNK_SIZE = 2 * 1024 * 1024; // 2 MB

const CHUNK_SIZES = [
512 * 1024, // 512 kB
2 * 1024 * 1024, // 2 MB
8 * 1024 * 1024, // 8 MB
];
const DEFAULT_CHUNK_SIZE = CHUNK_SIZES[0];
const DECREASE_CHUNK_SIZE_THRESHOLD = 5000; // 5 s
const INCREASE_CHUNK_SIZE_THRESHOLD = 500; // 500 ms

/**
* The URL to the websocket to connect to.
@@ -65,7 +73,7 @@ function progressTracker(type: "download" | "upload", path: string) {
return (event: ProgressEvent) => {
const op: LargeFileOperationInterface = {
type,
fileName: path.substring(path.lastIndexOf('/') + 1),
fileName: filename(path),
hasProgress: false,
loaded: 0,
total: 0
@@ -211,11 +219,10 @@ export default class WebsocketFTPConnection implements FTPConnection {

private async keepAlive(xhr: XMLHttpRequest) {
// It is very important that the connection to the FTP server isn't closed
// while we are uploading or downloading large files. Since the websocket
// while we are downloading large files over HTTP. Since the websocket
// connection isn't used for this process, we need to ensure the connection
// isn't closed, because closing the connection causes the FTP connection
// to close.
// TODO update comment
// We therefore send ping messages during the upload or download.
const intervalId = setInterval(() => {
if (xhr.readyState === XMLHttpRequest.DONE) {
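The rest of the method is collapsed in this diff. As a sketch, the keep-alive described above amounts to something like the following; the Ping packet name and the interval length are assumptions, not taken from this commit.

// Hypothetical reconstruction of the collapsed keep-alive loop.
const intervalId = setInterval(() => {
    if (xhr.readyState === XMLHttpRequest.DONE) {
        clearInterval(intervalId); // transfer finished, stop pinging
        return;
    }
    this.send(Packets.Ping, {}); // assumed ping packet, keeps the FTP connection alive
}, 10_000); // assumed interval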
@@ -283,38 +290,76 @@ export default class WebsocketFTPConnection implements FTPConnection {
path,
size: blob.size
});
const chunkCount = Math.ceil(blob.size / CHUNK_SIZE);
let chunkSize = DEFAULT_CHUNK_SIZE;

let lastStatus = "";
for (let i = 0; i < chunkCount; i++) {
const start = i * CHUNK_SIZE;
const end = Math.min(start + CHUNK_SIZE, blob.size);
let offset = 0;
while (offset < blob.size) {
const start = offset;
const end = Math.min(start + chunkSize, blob.size);
const chunk = blob.slice(start, end);
const base64 = await blobToBase64(chunk);

console.log(`Uploading chunk ${i}/${chunkCount - 1}, bytes ${start} to ${end}.`);
const { status } = await this.send(Packets.ChunkedUpload, {
console.log(`Uploading chunk with bytes ${start} to ${end}.`);
const startTime = performance.now();
const response = await this.send(Packets.ChunkedUpload, {
uploadId,
data: base64,
start,
end
});
const status = response.status;
offset += chunk.size;
lastStatus = status;
if (status !== "success" && status !== "end") {
// TODO maybe error here straight away?
// currently we just let the file size check verify stuff.
console.log("Status: " + status);
if (status === "error") {
console.error(response.error);
}
break;
}
// TODO report progress

// Progress bar
largeFileOperationStore.setValue({
type: "upload",
fileName: filename(path),
hasProgress: true,
loaded: end,
total: blob.size
});

// Adjust chunk size if applicable
const endTime = performance.now();
const ms = endTime - startTime;
const chunkSizeIndex = CHUNK_SIZES.indexOf(chunkSize);
if (chunkSizeIndex > 0 && ms > DECREASE_CHUNK_SIZE_THRESHOLD) {
// The chunk size can become smaller, and the decrease threshold was reached.
chunkSize = CHUNK_SIZES[chunkSizeIndex - 1];
console.log(`Chunk took ${Math.round(ms)} ms, decreasing chunk size.`);
} else if (chunkSizeIndex < CHUNK_SIZES.length - 1 && ms < INCREASE_CHUNK_SIZE_THRESHOLD) {
// The chunk size can become bigger, and the increase threshold was reached.
chunkSize = CHUNK_SIZES[chunkSizeIndex + 1];
console.log(`Chunk took ${Math.round(ms)} ms, increasing chunk size.`);
} else {
console.log(`Chunk took ${Math.round(ms)} ms, chunk size is good.`);
}
}

// Verify file size
const listResult = await this.send(Packets.List, {
path: dirname(path)
});
largeFileOperationStore.setValue(null);
const fileName = filename(path);
const fileInfo = listResult.files.find(f => f.name === fileName);
if (!fileInfo || fileInfo.size !== blob.size) {
if (!fileInfo) {
await (new Promise<void>(resolve => {
Dialog.message("Upload failed", "Unknown error", () => resolve());
}));
throw new Error("Chunked upload failed.");
} else if (fileInfo.size !== blob.size) {
await (new Promise<void>(resolve => {
Dialog.message(
"Partial upload failed",
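The chunk-size adaptation above walks the CHUNK_SIZES ladder one step per chunk: a chunk slower than 5 s steps down, one faster than 500 ms steps up, and anything in between keeps the current size. Restated as a pure function (an illustrative refactor, not code from this commit):

// Illustrative restatement of the chunk-size adjustment above.
function nextChunkSize(current: number, elapsedMs: number): number {
    const i = CHUNK_SIZES.indexOf(current);
    if (i > 0 && elapsedMs > DECREASE_CHUNK_SIZE_THRESHOLD) {
        return CHUNK_SIZES[i - 1]; // too slow, step down the ladder
    }
    if (i < CHUNK_SIZES.length - 1 && elapsedMs < INCREASE_CHUNK_SIZE_THRESHOLD) {
        return CHUNK_SIZES[i + 1]; // fast enough, step up the ladder
    }
    return current; // within both thresholds, keep the current size
}

// For example: a 512 kB chunk finishing in 300 ms steps up to 2 MB,
// while an 8 MB chunk taking 6 s steps back down to 2 MB.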
