Skip to content

Commit

Permalink
Removed the need to extend abstract WorkerTaskWorker. Further aligned…
Browse files Browse the repository at this point in the history
… APIs
  • Loading branch information
kaisalmen committed Nov 3, 2023
1 parent af1ab0e commit 76dd1cb
Show file tree
Hide file tree
Showing 25 changed files with 313 additions and 225 deletions.
10 changes: 5 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
"dependencies": {
"lil-gui": "~0.19.0",
"three": "~0.158.0",
"wtd-core": "~2.4.0-next.2",
"wtd-three-ext": "~2.4.0-next.2",
"wtd-core": "~2.4.0-next.3",
"wtd-three-ext": "~2.4.0-next.3",
"wwobjloader2": "6.2.0-next.2"
},
"devDependencies": {
Expand Down
37 changes: 20 additions & 17 deletions packages/examples/src/com/WorkerCom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,42 @@ class HelloWorldStandardWorkerExample {
try {
const channel = new MessageChannel();

// no need to send an init message, just await all init
const promisesinit = [];
promisesinit.push(workerTaskCom1.initWorker());
promisesinit.push(workerTaskCom2.initWorker());
await Promise.all(promisesinit);

const t0 = performance.now();

const execCom1 = new WorkerTaskMessage();
const initCom1 = new WorkerTaskMessage();
const payload1 = new RawPayload();
payload1.message.raw = { port: channel.port1 };
execCom1.addPayload(payload1);
initCom1.addPayload(payload1);

const execCom2 = new WorkerTaskMessage();
const initCom2 = new WorkerTaskMessage();
const payload2 = new RawPayload();
payload2.message.raw = { port: channel.port2 };
execCom2.addPayload(payload2);
initCom2.addPayload(payload2);

const promisesinit = [];
promisesinit.push(workerTaskCom1.initWorker({
message: initCom1,
transferables: [channel.port1]
}));
promisesinit.push(workerTaskCom2.initWorker({
message: initCom2,
transferables: [channel.port2],
}));
await Promise.all(promisesinit);

const t0 = performance.now();

const onComplete = (message: WorkerTaskMessageType) => {
console.log('Received final command: ' + message.cmd);
const rawPayload = message.payloads[0] as RawPayload;
console.log(`Worker said onComplete: ${rawPayload.message.raw.hello}`);
console.log(`Worker said onComplete: ${rawPayload.message.raw.finished}`);
};

const promisesExec = [];
promisesExec.push(workerTaskCom1.executeWorker({
message: execCom1,
transferables: [channel.port1],
message: new WorkerTaskMessage(),
onComplete
}));
promisesExec.push(workerTaskCom2.executeWorker({
message: execCom2,
transferables: [channel.port2],
message: new WorkerTaskMessage(),
onComplete
}));

Expand Down
4 changes: 2 additions & 2 deletions packages/examples/src/helloWorld/HelloWorldWorkerTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ class HelloWorldWorkerTaskExample {
url: new URL(import.meta.env.DEV ? '../worker/HelloWorldWorker.ts' : '../worker/generated/HelloWorldWorker-es.js', import.meta.url)
}, true);

const initMessage = new WorkerTaskMessage();
const message = new WorkerTaskMessage();

try {
// init the worker task without any payload (worker init without function invocation on worker)
const resultInit = await workerTask.initWorker(initMessage);
const resultInit = await workerTask.initWorker({ message });
console.log(`initTaskType then: ${resultInit}`);

const t0 = performance.now();
Expand Down
20 changes: 16 additions & 4 deletions packages/examples/src/infinite/PotentiallyInfiniteExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ class PotentiallyInfiniteExample {
id: taskDescr.id,
name: taskDescr.name
});
awaiting.push(this.workerTaskDirector.initTaskType(taskDescr.name, initMessage));
awaiting.push(this.workerTaskDirector.initTaskType(taskDescr.name, {
message: initMessage
}));
}

taskDescr = this.taskInfiniteWorkerInternalGeometry;
Expand All @@ -315,7 +317,9 @@ class PotentiallyInfiniteExample {
id: taskDescr.id,
name: taskDescr.name
});
awaiting.push(this.workerTaskDirector.initTaskType(taskDescr.name, initMessage));
awaiting.push(this.workerTaskDirector.initTaskType(taskDescr.name, {
message: initMessage
}));
}

taskDescr = this.taskInfiniteWorkerExternalGeometry;
Expand All @@ -338,7 +342,11 @@ class PotentiallyInfiniteExample {

initMessage.addPayload(meshPayload);
const transferables = pack(initMessage.payloads, false);
awaiting.push(this.workerTaskDirector.initTaskType(taskDescr.name, initMessage, transferables));
awaiting.push(this.workerTaskDirector.initTaskType(taskDescr.name, {
message: initMessage,
transferables,
copyTransferables: true
}));
}

taskDescr = this.taskObjLoader2Worker;
Expand Down Expand Up @@ -382,7 +390,11 @@ class PotentiallyInfiniteExample {
initMessage.addPayload(dataPayload);

const transferables = pack(initMessage.payloads, false);
await this.workerTaskDirector.initTaskType(initMessage.name, initMessage, transferables);
await this.workerTaskDirector.initTaskType(initMessage.name, {
message: initMessage,
transferables,
copyTransferables: true
});
console.timeEnd('All tasks have been initialized');
this.executeWorkers();
}
Expand Down
10 changes: 8 additions & 2 deletions packages/examples/src/threejs/Threejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ class WorkerTaskDirectorExample {
url: new URL(import.meta.env.DEV ? '../worker/HelloWorldThreeWorker.ts' : '../worker/generated/HelloWorldThreeWorker-es.js', import.meta.url)
});
this.tasksToUse.push(helloWorldInitMessage.name);
awaiting.push(this.workerTaskDirector.initTaskType(helloWorldInitMessage.name, helloWorldInitMessage));
awaiting.push(this.workerTaskDirector.initTaskType(helloWorldInitMessage.name, {
message: helloWorldInitMessage
}));

const objLoaderInitMessage = new WorkerTaskMessage({
id: 0,
Expand Down Expand Up @@ -188,7 +190,11 @@ class WorkerTaskDirectorExample {
objLoaderInitMessage.addPayload(materialsPayload);

const transferables = pack(objLoaderInitMessage.payloads, false);
await this.workerTaskDirector.initTaskType(objLoaderInitMessage.name, objLoaderInitMessage, transferables);
await this.workerTaskDirector.initTaskType(objLoaderInitMessage.name, {
message: objLoaderInitMessage,
transferables,
copyTransferables: true
});
console.timeEnd('Init tasks');
await this.executeTasks();
}
Expand Down
10 changes: 8 additions & 2 deletions packages/examples/src/transferables/TransferablesTestbed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,16 @@ class TransferablesTestbed {
initMessage.addPayload(meshPayload);

const transferables = pack(initMessage.payloads, false);
return this.workerTaskDirector.initTaskType(initMessage.name, initMessage, transferables);
return this.workerTaskDirector.initTaskType(initMessage.name, {
message: initMessage,
transferables,
copyTransferables: true
});
}
else {
return this.workerTaskDirector.initTaskType(initMessage.name, initMessage);
return this.workerTaskDirector.initTaskType(initMessage.name, {
message: initMessage
});
}
}

Expand Down
57 changes: 41 additions & 16 deletions packages/examples/src/worker/Com1Worker.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,64 @@
import {
InterComPortHandler,
InterComWorker,
RawPayload,
WorkerTaskCommandRequest,
WorkerTaskCommandResponse,
WorkerTaskDefaultWorker,
WorkerTaskMessageType,
WorkerTaskWorker,
comRouting,
createFromExisting
} from 'wtd-core';

export class Com1Worker extends WorkerTaskDefaultWorker {
export class Com1Worker implements WorkerTaskWorker, InterComWorker {

interComIntermediate(message: WorkerTaskMessageType): void {
const rawPayload = message.payloads[0] as RawPayload;
console.log(`Worker2 said: ${rawPayload.message.raw.hello}`);
private icph = new InterComPortHandler();

// after receiving the message from Com2Worker, send execComplete to main
const execComplete = createFromExisting(message, WorkerTaskCommandResponse.EXECUTE_COMPLETE);
const payload = new RawPayload({ hello: 'Worker 1 finished!' });
execComplete.addPayload(payload);
init(message: WorkerTaskMessageType): void {
// register the default com-routing function for inter-worker communication
this.icph.registerPort('com2', message.payloads[0], message => comRouting(this, message));

// send initComplete to main
const initComplete = createFromExisting({} as WorkerTaskMessageType, WorkerTaskCommandResponse.INIT_COMPLETE);
const payload = new RawPayload({ hello: 'Worker 1 initComplete!' });
initComplete.addPayload(payload);

// no need to pack as there aren't any buffers used
this.postMessage(execComplete);
self.postMessage(initComplete);
}

execute(message: WorkerTaskMessageType) {
// register the default com-routing function for inter-worker communication
this.registerPort('com2', message.payloads[0]);

// send message with cmd 'interComIntermediate' to Com2Worker
const sendWorker2 = createFromExisting(message, WorkerTaskCommandRequest.INTERCOM_INTERMEDIATE);
const payload = new RawPayload({ hello: 'Hi Worker 2!' });
sendWorker2.addPayload(payload);

this.postMessageOnPort('com2', sendWorker2);
this.icph.postMessageOnPort('com2', sendWorker2);
}

interComIntermediate(message: WorkerTaskMessageType): void {
const rawPayload = message.payloads[0] as RawPayload;
console.log(`Worker 2 said: ${rawPayload.message.raw.hello}`);

// after receiving the message from Com2Worker, send interComIntermediateConfirm to worker 2
const intermediateConfirm = createFromExisting(message, WorkerTaskCommandResponse.INTERCOM_INTERMEDIATE_CONFIRM);
const payload = new RawPayload({ confirmed: 'Hi Worker 2. I confirm!' });
intermediateConfirm.addPayload(payload);

this.icph.postMessageOnPort('com2', intermediateConfirm);
}

interComIntermediateConfirm(message: WorkerTaskMessageType): void {
const rawPayload = message.payloads[0] as RawPayload;
console.log(`Worker 2 confirmed: ${rawPayload.message.raw.confirmed}`);

// after receiving the interComIntermediateConfirm from Com2Worker, send execComplete to main
const execComplete = createFromExisting(message, WorkerTaskCommandResponse.EXECUTE_COMPLETE);
const payload = new RawPayload({ finished: 'Hi Main. Worker 1 completed!' });
execComplete.addPayload(payload);
self.postMessage(execComplete);
}

}

const worker = new Com1Worker();
self.onmessage = message => worker.comRouting(message);
self.onmessage = message => comRouting(worker, message);
57 changes: 41 additions & 16 deletions packages/examples/src/worker/Com2Worker.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,64 @@
import {
InterComPortHandler,
InterComWorker,
RawPayload,
WorkerTaskCommandRequest,
WorkerTaskCommandResponse,
WorkerTaskDefaultWorker,
WorkerTaskMessageType,
WorkerTaskWorker,
comRouting,
createFromExisting
} from 'wtd-core';

export class Com2Worker extends WorkerTaskDefaultWorker {
export class Com2Worker implements WorkerTaskWorker, InterComWorker {

interComIntermediate(message: WorkerTaskMessageType): void {
const rawPayload = message.payloads[0] as RawPayload;
console.log(`Worker1 said: ${rawPayload.message.raw.hello}`);
private icph = new InterComPortHandler();

// after receiving the message from Com1Worker, send execComplete to main
const execComplete = createFromExisting(message, WorkerTaskCommandResponse.EXECUTE_COMPLETE);
const payload = new RawPayload({ hello: 'Worker 2 finished!' });
execComplete.addPayload(payload);
init(message: WorkerTaskMessageType): void {
// register the default com-routing function for inter-worker communication
this.icph.registerPort('com1', message.payloads[0], message => comRouting(this, message));

// send initComplete to main
const initComplete = createFromExisting({} as WorkerTaskMessageType, WorkerTaskCommandResponse.INIT_COMPLETE);
const payload = new RawPayload({ hello: 'Worker 2 initComplete!' });
initComplete.addPayload(payload);

// no need to pack as there aren't any buffers used
this.postMessage(execComplete);
self.postMessage(initComplete);
}

execute(message: WorkerTaskMessageType) {
// register the default com-routing function for inter-worker communication
this.registerPort('com1', message.payloads[0]);

// send message with cmd 'interComIntermediate' to Com1Worker
const sendWorker1 = createFromExisting(message, WorkerTaskCommandRequest.INTERCOM_INTERMEDIATE);
const payload = new RawPayload({ hello: 'Hi Worker 1!' });
sendWorker1.addPayload(payload);

this.postMessageOnPort('com1', sendWorker1);
this.icph.postMessageOnPort('com1', sendWorker1);
}

interComIntermediate(message: WorkerTaskMessageType): void {
const rawPayload = message.payloads[0] as RawPayload;
console.log(`Worker 1 said: ${rawPayload.message.raw.hello}`);

// after receiving the message from Com1Worker, send interComIntermediateConfirm to worker 2
const intermediateConfirm = createFromExisting(message, WorkerTaskCommandResponse.INTERCOM_INTERMEDIATE_CONFIRM);
const payload = new RawPayload({ confirmed: 'Hi Worker 1. I confirm!' });
intermediateConfirm.addPayload(payload);

this.icph.postMessageOnPort('com1', intermediateConfirm);
}

interComIntermediateConfirm(message: WorkerTaskMessageType): void {
const rawPayload = message.payloads[0] as RawPayload;
console.log(`Worker 1 confirmed: ${rawPayload.message.raw.confirmed}`);

// after receiving the interComIntermediateConfirm from Com1Worker, send execComplete to main
const execComplete = createFromExisting(message, WorkerTaskCommandResponse.EXECUTE_COMPLETE);
const payload = new RawPayload({ finished: 'Hi Main. Worker 2 completed!' });
execComplete.addPayload(payload);
self.postMessage(execComplete);
}

}

const worker = new Com2Worker();
self.onmessage = message => worker.comRouting(message);
self.onmessage = message => comRouting(worker, message);
Loading

0 comments on commit 76dd1cb

Please sign in to comment.