Skip to content

Commit

Permalink
Merge pull request #24 from penwern/fix-multipart-checksums
Browse files Browse the repository at this point in the history
Worker pool / task queue strategy
  • Loading branch information
Sunday-Crunk authored Nov 20, 2024
2 parents ff6b993 + 2744d82 commit 2ee6a9d
Showing 1 changed file with 59 additions and 43 deletions.
102 changes: 59 additions & 43 deletions src/js/core/WorkerManager.js
Original file line number Diff line number Diff line change
@@ -1,68 +1,84 @@
class CurateWorkerManager {
constructor() {
constructor(poolSize = 5) {
this.poolSize = poolSize;
this.workers = new Map(); // Map of active workers
this.taskQueue = [];
this.isProcessing = false;
this.worker = null;
this.currentTasks = new Map(); // Track current task for each worker
}

initWorker() {
if (this.worker) {
this.worker.terminate();
}
const worker = new Worker("/workers/hashWorker.js");
const workerId = crypto.randomUUID();

// Load the worker from jsDelivr
const workerUrl = "/workers/hashWorker.js";
this.worker = new Worker(workerUrl);
console.log("Worker initialized: ", this.worker);
this.setupWorkerHandlers();
}
setupWorkerHandlers() {
this.worker.onmessage = (event) => {
if (event.data.status === "complete" && this.currentResolve) {
this.currentResolve({
file: this.currentFile,
hash: event.data.hash,
name: this.currentFile.name,
});
worker.onmessage = (event) => {
if (event.data.status === "complete") {
const currentTask = this.currentTasks.get(workerId);
if (currentTask) {
currentTask.resolve({
file: currentTask.file,
hash: event.data.hash,
name: currentTask.file.name,
});
this.currentTasks.delete(workerId);
}
this.processNextTask(workerId, worker);
}
this.processNextTask();
};

this.worker.onerror = (event) => {
if (this.currentReject) {
this.currentReject("Worker error: " + event.message);
worker.onerror = (event) => {
const currentTask = this.currentTasks.get(workerId);
if (currentTask) {
currentTask.reject("Worker error: " + event.message);
this.currentTasks.delete(workerId);
}
this.processNextTask();
this.processNextTask(workerId, worker);
};

this.workers.set(workerId, worker);
return workerId;
}

generateChecksum(file) {
return new Promise((resolve, reject) => {
this.taskQueue.push({ file, resolve, reject });
if (!this.isProcessing) {
this.processNextTask();
}
const task = { file, resolve, reject };
this.taskQueue.push(task);
this.ensureWorkers();
});
}

processNextTask() {
ensureWorkers() {
// Create workers up to pool size if there are pending tasks
if (this.taskQueue.length > 0) {
if (!this.worker) {
this.initWorker();
while (this.workers.size < this.poolSize) {
const workerId = this.initWorker();
this.processNextTask(workerId, this.workers.get(workerId));
}
}
}

processNextTask(workerId, worker) {
if (this.taskQueue.length > 0) {
const task = this.taskQueue.shift();
this.currentResolve = task.resolve;
this.currentReject = task.reject;
this.currentFile = task.file;
this.isProcessing = true;
this.worker.postMessage({ file: task.file, msg: "begin hash" });
} else {
this.isProcessing = false;
if (this.worker) {
this.worker.terminate();
this.worker = null;
}
this.currentTasks.set(workerId, task);
worker.postMessage({ file: task.file, msg: "begin hash" });
} else if (this.currentTasks.size === 0) {
// No more tasks in queue and no running tasks - cleanup workers
this.cleanupWorkers();
}
}

cleanupWorkers() {
for (const [workerId, worker] of this.workers) {
worker.terminate();
}
this.workers.clear();
}

// Optional: Method to manually cleanup if needed
terminate() {
this.cleanupWorkers();
this.taskQueue = [];
this.currentTasks.clear();
}
}

Expand Down

0 comments on commit 2ee6a9d

Please sign in to comment.