From 05245bbc7ef6167c45752e1d34251c0d6983bcb1 Mon Sep 17 00:00:00 2001 From: Jason Miller Date: Wed, 16 Sep 2020 17:20:21 -0400 Subject: [PATCH 1/4] Add worker-task-queue package --- .gitignore | 7 +- package.json | 4 +- packages/worker-task-queue/README.md | 36 ++ packages/worker-task-queue/package.json | 31 ++ packages/worker-task-queue/src/index.ts | 419 ++++++++++++++++++++ packages/worker-task-queue/src/processor.ts | 284 +++++++++++++ packages/worker-task-queue/src/util.ts | 50 +++ src/index.js | 12 +- 8 files changed, 832 insertions(+), 11 deletions(-) create mode 100644 packages/worker-task-queue/README.md create mode 100644 packages/worker-task-queue/package.json create mode 100644 packages/worker-task-queue/src/index.ts create mode 100644 packages/worker-task-queue/src/processor.ts create mode 100644 packages/worker-task-queue/src/util.ts diff --git a/.gitignore b/.gitignore index d855d5e..ce1c80f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ -/package-lock.json -/node_modules -/dist +package-lock.json +node_modules +dist /polyfill +/packages/worker-task-queue/processor diff --git a/package.json b/package.json index 77858ad..02ad98a 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,9 @@ "umd:main": "dist/task-worklet.js", "module": "dist/task-worklet.module.js", "scripts": { - "build": "microbundle -f umd,es && microbundle -f iife src/polyfill.mjs -o polyfill/index.js", + "build": "npm run -s build:main && npm run -s build:wtq", + "build:main": "microbundle -f umd,es && microbundle -f iife src/polyfill.js -o polyfill/index.js", + "build:wtq": "cd packages/worker-task-queue && npm run -s build", "test": "eslint \"{src,test}/**/*.test.{mjs,js}\" && karmatic --no-headless" }, "files": [ diff --git a/packages/worker-task-queue/README.md b/packages/worker-task-queue/README.md new file mode 100644 index 0000000..64d3a39 --- /dev/null +++ b/packages/worker-task-queue/README.md @@ -0,0 +1,36 @@ +# `worker-task-queue` + +This is a standalone implementation of the cooperative multithreading model from [Task Worklet](https://github.com/developit/task-worklet) as a zero-dependency library for Web and Node. + +```js +import WorkerTaskQueue from 'worker-task-queue'; + +// Set up the worker pool +const queue = new WorkerTaskQueue({ + // URL/path for our worker script: + workerUrl: '/path/to/worker.js', + // max pool size: + size: 4 +}); + +function demo(image) { + // allocates a thread in the pool doesn't have one free: + const cropped = postTask('crop', image, { box: [10, 20, 30, 40] }); + + // subsequent tasks run on the same thread to eliminate data transfer: + let large = postTask('resize', cropped, { width: 1000, height: 1000 }); + large = postTask('compress', large, quality); + + // ... except when they get automatically parallelized by moving the input to a second thread: + let thumb = postTask('resize', cropped, { width: 200, height: 200 }); + thumb = postTask('compress', thumb, quality); + + // At this point we've only transferred one image to a background thread, + // and transferred another image between two threads. + + // Only the final results are transferred back here, and only when we ask for them: + showPreview(await large.result, await thumb.result); +} + +demo(); +``` diff --git a/packages/worker-task-queue/package.json b/packages/worker-task-queue/package.json new file mode 100644 index 0000000..35fe99e --- /dev/null +++ b/packages/worker-task-queue/package.json @@ -0,0 +1,31 @@ +{ + "name": "worker-task-queue", + "version": "0.1.0", + "description": "Streamlined processing of tasks in a shared threadpool.", + "module": "dist/worker-task-queue.module.js", + "main": "dist/worker-task-queue.js", + "umd:main": "dist/worker-task-queue.umd.js", + "scripts": { + "build": "microbundle src/index.ts -f es,cjs,umd && microbundle -f es,cjs,umd src/processor.ts -o processor/index.js" + }, + "files": [ + "dist", + "processor", + "src" + ], + "repository": "developit/task-worklet", + "keywords": [ + "tasks", + "task worklet", + "worker task queue", + "task queue", + "parallelization", + "off-main-thread", + "OMT", + "threads", + "multithreading" + ], + "author": "Jason Miller ", + "license": "Apache-2.0", + "homepage": "https://github.com/developit/task-worklet/packages/worker-task-queue" +} diff --git a/packages/worker-task-queue/src/index.ts b/packages/worker-task-queue/src/index.ts new file mode 100644 index 0000000..0699b89 --- /dev/null +++ b/packages/worker-task-queue/src/index.ts @@ -0,0 +1,419 @@ +/** + * Copyright 2018 Google Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +interface TaskQueueOptions { + size?: number; + worker?: WorkerOptions; +} + +type TaskId = number; + +type WorkerId = number; + +const enum Status { + RESOLVE, + REJECT +} + +const enum Options { + LAZY, + EAGER +} + +type TaskResultDesc = [TaskId, Status, any?]; + +// interface WorkerTaskQueue { +// postTask(taskName: string, ...args: any[]): Task; +// } + +export default class WorkerTaskQueue { + /** @private */ + $$pool!: TaskQueuePool; + + constructor(workerUrl: string, options?: TaskQueueOptions) { + const size = (options && Number(options.size)) || 1; + const pool = new TaskQueuePool(size, workerUrl, options && options.worker); + prop(this, '$$pool', pool); + } + + postTask(taskName: string, ...args: any[]) { + const task = new Task(); + task.id = ++COUNT; + this.$$pool.exec(task, taskName, args); + prop(task, '$$queue', this); + return task; + } +} + +// export default WorkerTaskQueue; + +// All IDs are generated by incrementing a shared counter +let COUNT = 0; + +// used to verify that a task was serialized by TaskQueuePool +const SPECIAL = '$' + Math.random().toString(36).substring(2); + +type Walker = (value: Task, i: string | number, parent: object) => void; + +function walkTaskArgs(obj: object, walker: Walker) { + for (let i in obj) { + const value = obj[i]; + if (typeof value === 'object' && value) { + if (value instanceof Task) { + walker(value, i, obj); + } else { + walkTaskArgs(value, walker); + } + } + } +} + +function prop(obj: object, key: string, value: any) { + Object.defineProperty(obj, key, { value }); +} + +interface TaskWorker extends Worker { + id: WorkerId; + pending: number; + ready: Promise; + call(method: string, args: any[]): Promise; +} + +type PromiseController = { + [Status.RESOLVE]: (value: T) => void; + [Status.REJECT]: (error: E) => void; +}; + +interface ResultController extends PromiseController { + /** is the task waiting to be sent to a worker? */ + pending: boolean; + /** has the task been cancelled? */ + cancelled: boolean; + /* has the task been marked as completed by its worker? */ + completed: boolean; + /* has the task result been obtained from the worker? */ + fulfilled: boolean; + /* has the task result been requested from the worker? */ + requested: boolean; + result: Promise; +} + +class TaskQueuePool { + poolSize: number; + workerUrl: string; + workerOptions?: WorkerOptions; + workers: TaskWorker[]; + tasks: Record>; + results: Record; + workerTaskAssignments: Record; + + constructor( + poolSize: number, + workerUrl: string, + workerOptions?: WorkerOptions + ) { + this.poolSize = poolSize; + this.workerUrl = workerUrl; + this.workerOptions = workerOptions; + this.workers = []; + this.tasks = {}; + this.results = {}; + this.workerTaskAssignments = {}; + } + + exec(task: Task, taskName: string, args: any[]) { + const worker = this.getTaskWorker(taskName, args) || this.getNextWorker(); + this.workerTaskAssignments[task.id] = worker.id; + this.tasks[task.id] = task; + task.state = 'scheduled'; + worker.pending++; + + let resolve, reject; + const result = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + const resultController: ResultController = { + pending: true, + cancelled: false, + completed: false, + fulfilled: false, + requested: false, + result, + [Status.RESOLVE]: resolve, + [Status.REJECT]: reject + }; + + this.results[task.id] = resultController; + + const tasksToResolveIndices = []; + const tasksToResolve = []; + const tasks = []; + + // @TODO it would be better to serialize tasks to their $$taskIdentifier String representation here. + // However doing so cannot mutate args in-place, as it would reveal the identifier secret. + walkTaskArgs(args, (value) => { + if (this.getWorkerForTask(value.id) !== worker) { + const resultController = this.results[value.id]; + console.warn( + `Task#${value.id} passed to ${taskName}[${ + task.id + }] was invoked in a different context. The result will be ${ + resultController.fulfilled ? '' : 'materialized & ' + }transferred.` + ); + tasksToResolveIndices.push(tasks.length); + tasksToResolve.push(resultController.result); + } + tasks.push(value); + }); + + // also wait for the worker to be loaded (async module resolution, etc) + tasksToResolve.push(worker.ready); + + Promise.all(tasksToResolve) + .then((taskValues) => { + resultController.pending = false; + if (resultController.cancelled) return; + + for (let i = tasks.length; i--; ) { + const task = tasks[i]; + task.$$taskIdentifier = SPECIAL + ':' + task.id; + } + + for (let i = tasksToResolveIndices.length; i--; ) { + const task = tasks[tasksToResolveIndices[i]]; + task.$$taskResult = taskValues[i]; + } + + let options: Options = Options.LAZY; + // if we need a result right away, mark the task as requiring a return + // value. This handles common cases like `await q.postTask().result`. + if (resultController.requested) { + options |= Options.EAGER; + } + worker.call('task', [task.id, options, taskName].concat(args)); + }) + .then(() => { + for (let i = 0; i < tasks.length; i++) { + delete tasks[i].$$taskIdentifier; + delete tasks[i].$$taskResult; + } + }); + } + + /** + * Cancel a task by its ID. + * Cancellation is not guaranteed, since task may already have started executing. + * Cancelling an already-completed task returns `false`. + */ + cancel(taskId: number): boolean | void { + const task = this.tasks[taskId]; + const resultController = this.results[taskId]; + if (resultController.completed || task.state === 'completed') { + return false; + } + + task.state = 'cancelled'; + resultController.cancelled = true; + if (!resultController.pending) { + const workerId = this.workerTaskAssignments[taskId]; + const worker = this.getWorker(workerId); + worker.call('cancel', [taskId]); + } + } + + /** + * Returns a Promise that will be fulfilled with the Task's value/error. + */ + getResult(taskId: number) { + const resultController = this.results[taskId]; + if (!resultController) { + // this should never happen! + throw Error(`Unknown result for Task: ${taskId}`); + } + if (resultController.pending === true) { + resultController.requested = true; + } else if ( + resultController.fulfilled === false && + resultController.requested === false + ) { + resultController.requested = true; + const workerId = this.workerTaskAssignments[taskId]; + const worker = this.getWorker(workerId); + worker.call('getresult', [taskId]); + } + return resultController.result; + } + + freeWorkerTask(worker: TaskWorker) { + if (--worker.pending === 0) { + // @todo: the worker now has no pending tasks. + // Should we reallocate any pending idempotent tasks from other workers in the pool? + // This may be impossible since tasks are scheduled by we don't know + // their instantaneous queuing status at any given point in time. + } + } + + statusReceived(worker: TaskWorker, statuses: TaskResultDesc[]) { + for (let i = 0; i < statuses.length; i++) { + const status = statuses[i]; + const id = status[0]; + const task = this.tasks[id]; + const resultController = this.results[id]; + + if (task.state === 'scheduled') { + const workerId = this.workerTaskAssignments[id]; + const worker = this.getWorker(workerId); + this.freeWorkerTask(worker); + } + + // current only a fulfillment triggers status updates, so we assume an update fulfills its task: + task.state = 'completed'; + resultController.completed = true; + // @TODO: we're dropping the tasks resolved/rejected state on the floor here? + + // [id,status,data] denotes a task with an eager return value (forced/numbers/booleans): + if (status.length === 3) { + task.state = 'fulfilled'; + // resolve/reject the status + resultController.fulfilled = true; + const op = status[1]; + resultController[op](status[2]); + } + } + } + + addWorker() { + const worker = new Worker(this.workerUrl, this.workerOptions) as TaskWorker; + worker.id = ++COUNT; + worker.pending = 0; + const callbacks: Record = {}; + worker.onmessage = (e) => { + const [type, id, data] = e.data; + const got = `${type}Received`; + if (this[got]) return this[got](worker, data); + callbacks[id][type](data); + delete callbacks[id]; + }; + // Nodejs compat + // if (worker.addListener) worker.addListener('message', worker.onmessage); + let q = []; + const resolved = Promise.resolve(); + function process() { + worker.postMessage(q); + q = []; + } + worker.call = (method, params) => + new Promise(function () { + const id = ++COUNT; + callbacks[id] = (arguments as unknown) as PromiseController; + if (q.push([method, id].concat(params)) === 1) { + resolved.then(process); + } + }); + this.workers.push(worker); + worker.ready = worker.call('init', [SPECIAL]); + return worker; + } + + getWorker(id: WorkerId) { + for (let i = 0; i < this.workers.length; i++) { + const worker = this.workers[i]; + if (worker.id == id) return worker; + } + } + + getWorkerForTask(taskId: TaskId) { + const id = this.workerTaskAssignments[taskId]; + for (let i = 0; i < this.workers.length; i++) { + const worker = this.workers[i]; + if (worker.id == id) return worker; + } + } + + getTaskDependencies(args: any[]) { + const tasks: Task[] = []; + walkTaskArgs(args, (value) => { + tasks.push(value); + }); + return tasks; + } + + getTaskWorker(taskName: string, args: any[]) { + const tasks = this.getTaskDependencies(args); + const usage = {}; + let highest = 0; + let best: WorkerId; + + for (let i = 0; i < tasks.length; i++) { + const workerId = this.workerTaskAssignments[tasks[i].id]; + let c = (usage[workerId] = (usage[workerId] || 0) + 1); + if (c > highest) { + highest = c; + best = workerId; + } + } + + if (best != null) return this.getWorker(best); + } + + getNextWorker() { + const size = this.workers.length; + if (size === 0) return this.addWorker(); + let best = this.workers[0]; + for (let i = 1; i < size; i++) { + const worker = this.workers[i]; + if (worker.pending < best.pending) { + best = worker; + } + } + if (best.pending && size < this.poolSize) { + return this.addWorker(); + } + return best; + } +} + +class Task { + id!: number; + state!: string; + result!: T; + + private $$taskIdentifier!: string; + private $$result!: Promise; + private $$queue!: WorkerTaskQueue; + + cancel() { + this.$$queue.$$pool.cancel(this.id); + } +} +Object.defineProperties(Task.prototype, { + $$taskIdentifier: { + enumerable: true + }, + $$result: {}, + $$queue: {}, + state: { + value: 'pending' + }, + result: { + get() { + let c = this.$$result; + if (!c) + prop(this, '$$result', (c = this.$$queue.$$pool.getResult(this.id))); + return c; + } + } +}); diff --git a/packages/worker-task-queue/src/processor.ts b/packages/worker-task-queue/src/processor.ts new file mode 100644 index 0000000..f28bd3f --- /dev/null +++ b/packages/worker-task-queue/src/processor.ts @@ -0,0 +1,284 @@ +/** + * Copyright 2018 Google Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { collectTransferrables, walkReduce, Key } from './util'; + +// promise status resolvers +const enum Status { + RESOLVE, + REJECT +} + +// strings less than this length can be inlined into high priority result listings +const SMALL_STRING_MAX = 512; + +// for task options bitmask +const RETURN_RESULT = 1; + +// sentinel key passed from coordinating thread for Task identification +let SPECIAL: string; + +const resolved = Promise.resolve(); + +export interface Task { + id: number; + $$taskIdentifier: string; + $$taskResult?: T; +} + +export interface Result extends Promise { + state?: Status; + fulfilled?: boolean; + value?: T; + error?: E; +} + +export interface Processor { + new (): Processor; + process(...args: any[]): T; +} + +export function registerTask(name: string, processor: Processor) { + tasks[name] = processor; + descs[name] = Object.assign({}, processor); +} + +type OptionsMask = 0 | 1 | 2; +// Holds incoming flat task queue jobs +type TaskDesc = [number, OptionsMask, string, ...any[]]; +// Holds outgoing serializable task results +type TaskResultDesc = [number, OptionsMask, Status, any]; + +const queue: TaskDesc[] = []; + +const results: Record = {}; + +const tasks: Record = {}; + +const descs: Record> = {}; + +const instances: Record> = {}; + +const cancellations: Record = {}; + +const gotResults: TaskResultDesc[] = []; + +const api = { + init(ident: string) { + SPECIAL = ident; + return Promise.all(Object.values(tasks)).then(() => descs); + }, + task(id: string) { + const data = [].slice.call(arguments); + if (id in cancellations) { + console.log('Skipping cancelled task: ' + id); + return; + } + if (queue.push(data) === 1) next(); + }, + getresult(id: number) { + // @todo: could this set task options and flushResultStatuses()? + for (let i = gotResults.length; i--; ) { + if (gotResults[i][0] === id) { + gotResults.splice(i, 1); + break; + } + } + + if (!(id in results)) throw Error(`Result ${id} not found.`); + + const result = results[id]; + gotResults.push([id, RETURN_RESULT, result.state, result.value]); + flushResultStatuses(); + }, + cancel(id: number) { + cancellations[id] = true; + } +}; + +addEventListener('message', (e) => { + let index = -1; + function next() { + if (++index === e.data.length) return; + const item = e.data[index]; + resolved + .then(() => api[item[0]].apply(null, item.slice(2))) + .then( + (ret) => { + if (ret !== undefined) postMessage([0, item[1], ret]); + next(); + }, + (err) => { + postMessage([1, item[1], '' + err]); + } + ); + } + next(); +}); + +function isTaskValue(value) { + if (typeof value !== 'object' || !value) return false; + if (!('$$taskIdentifier' in value)) return false; + const sentinel = SPECIAL + ':'; + return value.$$taskIdentifier === sentinel + value.id; +} + +function walkTaskValues( + obj: T, + action: (value: any, i?: Key, parent?: object) => void +) { + walkReduce(obj, (acc, value, i, obj) => { + if (isTaskValue(value)) { + action(value, i, obj); + } + }); +} + +function countPendingTasks(task: Task) { + if ('$$taskResult' in task) return; + const result = results[task.id]; + if (result == null || !result.fulfilled) pendingTasks++; +} + +function replaceTaskIdWithResult(task: Task, property: Key, obj: any) { + let value: T; + if ('$$taskResult' in task) { + value = task.$$taskResult; + } else { + const result = results[task.id]; + value = result.error || result.value; + } + obj[property] = value; +} + +let pendingTasks = 0; +let flushTimer: ReturnType; + +function next() { + clearTimeout(flushTimer); + if (queue.length === 0) { + flushTimer = setTimeout(flushResultStatuses, 50); + return; + } + + let taskDesc: TaskDesc; + for (let i = 0; i < queue.length; i++) { + pendingTasks = 0; + walkTaskValues(queue[i], countPendingTasks); + if (pendingTasks === 0) { + taskDesc = queue[i]; + queue.splice(i, 1); + break; + } + } + + // queue has tasks, but all are pending + if (taskDesc == null) { + console.error( + `Queue deadlocked: all ${queue.length} tasks have unresolved dependencies.` + ); + // this is dead time, flush any pending results + flushResultStatuses(); + return; + } + + const [id, options, name, ...args] = taskDesc; + + walkTaskValues(args, replaceTaskIdWithResult); + + delete cancellations[id]; + const processor = tasks[name]; + const result: Result = resolved + .then(() => { + if (typeof processor !== 'function') + throw Error(`Unknown task processor "${name}".`); + const instance = instances[name] || (instances[name] = new processor()); + return instance.process(...args); + }) + .then( + (value) => { + result.state = Status.RESOLVE; + result.fulfilled = true; + result.value = value; + gotResults.push([id, options, Status.RESOLVE, value]); + next(); + }, + (err) => { + result.state = Status.REJECT; + result.fulfilled = true; + result.error = err; + gotResults.push([id, options, Status.REJECT, '' + err]); + next(); + } + ); + results[id] = result; +} + +function isInlineReturn(data) { + const type = typeof data; + return ( + data == null || + type === 'boolean' || + type === 'number' || + (type === 'string' && data.length < SMALL_STRING_MAX) + ); +} + +function flushResultStatuses() { + clearTimeout(flushTimer); + if (gotResults.length === 0) return; + + let statuses = []; + const returnStatuses = []; + const transferrables = []; + let priorityResultCount = 0; + let resultCount = 0; + for (let i = 0; i < gotResults.length; i++) { + if (gotResults[i] == null) continue; + resultCount++; + + const [id, options, state, data] = gotResults[i]; + let status = [id, state]; + // if requested, we'll return the result along with the status: + let returnResult = options & RETURN_RESULT; + // if there are any priority returns in the queue, drop low-priority returns as we switch modes: + if (returnResult) priorityResultCount++; + + // preemptively pass nearly-free result types to the coordinating thread. + const transferrablesBefore = transferrables.length; + if (data) { + walkReduce(data, collectTransferrables, transferrables); + } + const hasTransferrables = transferrables.length > transferrablesBefore; + + if (returnResult || hasTransferrables || isInlineReturn(data)) { + status.push(data); + returnStatuses.push(status); + gotResults[i] = null; + } + statuses.push(status); + } + + if (priorityResultCount !== 0) statuses = returnStatuses; + + // low-priority/normal return clears the entire queue + if (resultCount === 0 || statuses.length === resultCount) { + gotResults.length = 0; + } else { + flushTimer = setTimeout(flushResultStatuses, 50); + } + + if (statuses.length !== 0) { + postMessage(['status', 0, statuses], transferrables); + } +} diff --git a/packages/worker-task-queue/src/util.ts b/packages/worker-task-queue/src/util.ts new file mode 100644 index 0000000..cdaa1e5 --- /dev/null +++ b/packages/worker-task-queue/src/util.ts @@ -0,0 +1,50 @@ +/** + * Copyright 2018 Google Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export type Key = string | number; + +type Reducer = ( + accumulator: Partial | void, + obj: any, + index: Key, + parent: any +) => Partial | void; + +export function walkReduce( + obj: any, + reducer: Reducer, + accumulator?: Partial | void, + index?: Key, + parent?: any +) { + let result = reducer(accumulator, obj, index, parent); + if (result === undefined) result = accumulator; + if (typeof obj === 'object' && obj) { + for (let i in obj) { + walkReduce(obj[i], reducer, result, i, obj); + } + } + return result; +} + +type Transferrables = ArrayBuffer | MessagePort | ImageBitmap; + +export function collectTransferrables(xfer: Transferrables[], value: T) { + if ( + value instanceof ArrayBuffer || + value instanceof MessagePort || + value instanceof ImageBitmap + ) { + xfer.push(value); + } +} diff --git a/src/index.js b/src/index.js index 93dac23..c7b643c 100644 --- a/src/index.js +++ b/src/index.js @@ -1,5 +1,3 @@ -/* eslint-disable spaced-comment */ - /** * Copyright 2018 Google Inc. All Rights Reserved. * Licensed under the Apache License, Version 2.0 (the "License"); @@ -79,7 +77,7 @@ const workerUrl = URL.createObjectURL( const f = reducer(accumulator, obj, index, parent); if (f !== undefined) accumulator = f; if (typeof obj === 'object' && obj) { - for (let i in obj) { + for (const i in obj) { walkReduce(obj[i], reducer, accumulator, i, obj); } } @@ -201,9 +199,9 @@ const workerUrl = URL.createObjectURL( if (gotResults[i] == null) continue; resultCount++; const [id, options, state, data] = gotResults[i]; - let status = [id, state]; + const status = [id, state]; // if requested, we'll return the result along with the status: - let returnResult = options & RETURN_RESULT; + const returnResult = options & RETURN_RESULT; // if there are any priority returns in the queue, drop low-priority returns as we switch modes: if (returnResult) priorityResultCount++; @@ -327,7 +325,7 @@ let COUNT = 0; const SPECIAL = '$' + Math.random().toString(36).substring(2); function walkTaskArgs(obj, walker) { - for (let i in obj) { + for (const i in obj) { const value = obj[i]; if (typeof value === 'object' && value) { if (value instanceof Task) { @@ -565,7 +563,7 @@ class TaskQueuePool { for (const task of tasks) { const workerId = this.workerTaskAssignments[task.id]; - let c = (usage[workerId] = (usage[workerId] || 0) + 1); + const c = (usage[workerId] = (usage[workerId] || 0) + 1); if (c > highest) { highest = c; best = workerId; From e09cbe834e92bcdb5a862ccf422316f9740de204 Mon Sep 17 00:00:00 2001 From: Jason Miller Date: Wed, 16 Sep 2020 17:24:23 -0400 Subject: [PATCH 2/4] readme fix --- packages/worker-task-queue/README.md | 29 ++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/packages/worker-task-queue/README.md b/packages/worker-task-queue/README.md index 64d3a39..d572b26 100644 --- a/packages/worker-task-queue/README.md +++ b/packages/worker-task-queue/README.md @@ -2,6 +2,8 @@ This is a standalone implementation of the cooperative multithreading model from [Task Worklet](https://github.com/developit/task-worklet) as a zero-dependency library for Web and Node. +**main thread code:** + ```js import WorkerTaskQueue from 'worker-task-queue'; @@ -34,3 +36,30 @@ function demo(image) { demo(); ``` + +**worker code:** + +```js +import { registerTask } from 'worker-task-queue/processor'; + +registerTask('crop', class { + process(image, { box }) { + // complicated stuff in here + return image; + } +}); + +registerTask('resize', class { + process(image, { width, height }) { + // complicated stuff in here + return image; + } +}); + +registerTask('compress', class { + process(image, quality) { + // complicated stuff in here + return image; + } +}); +``` From e69621a923ed4a44f87e9656ce5b0bb3bd45aca5 Mon Sep 17 00:00:00 2001 From: Jason Miller Date: Mon, 16 Aug 2021 18:00:38 -0400 Subject: [PATCH 3/4] Make it actually work! --- package.json | 5 ++ packages/worker-task-queue/package.json | 45 +++++++++++-- packages/worker-task-queue/src/index.ts | 23 ++++++- packages/worker-task-queue/src/processor.ts | 19 +++--- packages/worker-task-queue/src/util.ts | 9 ++- packages/worker-task-queue/test/index.test.js | 63 +++++++++++++++++++ packages/worker-task-queue/test/worker.js | 10 +++ 7 files changed, 158 insertions(+), 16 deletions(-) create mode 100644 packages/worker-task-queue/test/index.test.js create mode 100644 packages/worker-task-queue/test/worker.js diff --git a/package.json b/package.json index 6cabfdc..7c052f5 100644 --- a/package.json +++ b/package.json @@ -52,10 +52,15 @@ "license": "Apache-2.0", "homepage": "https://github.com/developit/task-worklet", "devDependencies": { + "@babel/preset-env": "^7.15.0", + "@babel/preset-typescript": "^7.15.0", + "@babel/register": "^7.15.3", + "@types/jest": "^27.0.1", "eslint": "^7.9.0", "eslint-config-google": "^0.14.0", "eslint-config-prettier": "^6.11.0", "file-loader": "^6.1.0", + "jest": "^27.0.6", "karmatic": "^2.1.0", "microbundle": "^0.12.3", "prettier": "^2.1.2", diff --git a/packages/worker-task-queue/package.json b/packages/worker-task-queue/package.json index 35fe99e..be6d13e 100644 --- a/packages/worker-task-queue/package.json +++ b/packages/worker-task-queue/package.json @@ -2,17 +2,48 @@ "name": "worker-task-queue", "version": "0.1.0", "description": "Streamlined processing of tasks in a shared threadpool.", - "module": "dist/worker-task-queue.module.js", - "main": "dist/worker-task-queue.js", - "umd:main": "dist/worker-task-queue.umd.js", + "exports": { + "./processor": { + "module": "./processor/index.mjs", + "import": "./processor/index.mjs", + "default": "./processor/index.js" + }, + ".": { + "module": "./dist/worker-task-queue.mjs", + "import": "./dist/worker-task-queue.mjs", + "default": "./dist/worker-task-queue.js" + } + }, + "module": "./dist/worker-task-queue.mjs", + "main": "./dist/worker-task-queue.js", + "umd:main": "./dist/worker-task-queue.umd.js", "scripts": { - "build": "microbundle src/index.ts -f es,cjs,umd && microbundle -f es,cjs,umd src/processor.ts -o processor/index.js" + "build": "microbundle src/index.ts -f es,cjs,umd && microbundle -f es,cjs,umd src/processor.ts -o processor/index.js", + "test": "../../node_modules/.bin/jest" }, "files": [ "dist", "processor", "src" ], + "babel": { + "env": { + "test": { + "presets": [ + [ + "@babel/preset-env", + { + "targets": { + "node": "current" + }, + "modules": "commonjs" + } + ], + "@babel/preset-typescript" + ] + } + } + }, "repository": "developit/task-worklet", "keywords": [ "tasks", @@ -27,5 +58,9 @@ ], "author": "Jason Miller ", "license": "Apache-2.0", - "homepage": "https://github.com/developit/task-worklet/packages/worker-task-queue" + "homepage": "https://github.com/developit/task-worklet/packages/worker-task-queue", + "devDependencies": { + "microbundle": "^0.13.3", + "web-worker": "^1.0.0" + } } diff --git a/packages/worker-task-queue/src/index.ts b/packages/worker-task-queue/src/index.ts index 0699b89..dab102a 100644 --- a/packages/worker-task-queue/src/index.ts +++ b/packages/worker-task-queue/src/index.ts @@ -53,6 +53,10 @@ export default class WorkerTaskQueue { prop(task, '$$queue', this); return task; } + + destroy() { + this.$$pool.destroy(); + } } // export default WorkerTaskQueue; @@ -131,6 +135,15 @@ class TaskQueuePool { this.workerTaskAssignments = {}; } + destroy() { + const tasks = this.tasks; + for (let id in tasks) { + this.cancel(id as any as number); + } + let worker; + while (worker = this.workers.pop()) worker.terminate(); + } + exec(task: Task, taskName: string, args: any[]) { const worker = this.getTaskWorker(taskName, args) || this.getNextWorker(); this.workerTaskAssignments[task.id] = worker.id; @@ -401,11 +414,17 @@ class Task { } Object.defineProperties(Task.prototype, { $$taskIdentifier: { + writable: true, enumerable: true }, - $$result: {}, - $$queue: {}, + $$result: { + writable: true, + }, + $$queue: { + writable: true, + }, state: { + writable: true, value: 'pending' }, result: { diff --git a/packages/worker-task-queue/src/processor.ts b/packages/worker-task-queue/src/processor.ts index f28bd3f..9d19ec4 100644 --- a/packages/worker-task-queue/src/processor.ts +++ b/packages/worker-task-queue/src/processor.ts @@ -43,13 +43,17 @@ export interface Result extends Promise { error?: E; } -export interface Processor { +export interface ProcessorConstructor { new (): Processor; - process(...args: any[]): T; } -export function registerTask(name: string, processor: Processor) { +export interface Processor { + process(...args: any[]): any; +} + +export function registerTask(name: string, processor: ProcessorConstructor) { tasks[name] = processor; + // @ts-ignore-next we're copying properties off the class here. descs[name] = Object.assign({}, processor); } @@ -63,11 +67,11 @@ const queue: TaskDesc[] = []; const results: Record = {}; -const tasks: Record = {}; +const tasks: Record = {}; -const descs: Record> = {}; +const descs: Record> = {}; -const instances: Record> = {}; +const instances: Record> = {}; const cancellations: Record = {}; @@ -167,7 +171,8 @@ let flushTimer: ReturnType; function next() { clearTimeout(flushTimer); if (queue.length === 0) { - flushTimer = setTimeout(flushResultStatuses, 50); + // flushTimer = setTimeout(flushResultStatuses, 50); + flushResultStatuses(); return; } diff --git a/packages/worker-task-queue/src/util.ts b/packages/worker-task-queue/src/util.ts index cdaa1e5..bfa615f 100644 --- a/packages/worker-task-queue/src/util.ts +++ b/packages/worker-task-queue/src/util.ts @@ -39,11 +39,16 @@ export function walkReduce( type Transferrables = ArrayBuffer | MessagePort | ImageBitmap; +// This is done to support Node, which lacks ImageBitmap and sometimes MessagePort: +function Never() {} +const MessagePortConstructor = (typeof MessagePort === 'undefined' ? Never : MessagePort) as any as typeof MessagePort; +const ImageBitmapConstructor = (typeof ImageBitmap === 'undefined' ? Never : ImageBitmap) as any as typeof ImageBitmap; + export function collectTransferrables(xfer: Transferrables[], value: T) { if ( value instanceof ArrayBuffer || - value instanceof MessagePort || - value instanceof ImageBitmap + value instanceof MessagePortConstructor || + value instanceof ImageBitmapConstructor ) { xfer.push(value); } diff --git a/packages/worker-task-queue/test/index.test.js b/packages/worker-task-queue/test/index.test.js new file mode 100644 index 0000000..f95b13b --- /dev/null +++ b/packages/worker-task-queue/test/index.test.js @@ -0,0 +1,63 @@ +/** + * Copyright 2018 Google Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import WorkerTaskQueue from '../src/index'; +import Worker from 'web-worker'; +global.Worker = Worker; + +const tick = () => new Promise((r) => process.nextTick(r)); + +describe('TaskQueue', () => { + it('should pass smoketest', async () => { + expect(typeof WorkerTaskQueue).toBe('function'); + }); + + describe('task execution and chaining', () => { + let queue; + + it('should be instantiable via relative path', async () => { + queue = new WorkerTaskQueue(require.resolve('./worker.js'), { size: 4 }); + }); + + it('should execute a single task', async () => { + const sum1 = queue.postTask('add', 1, 2); + + await tick(); + expect(sum1.state).toBe('scheduled'); + + const result = await sum1.result; + expect(sum1.state).toBe('fulfilled'); + expect(result).toBe(3); + }); + + it('should execute tasks with task dependencies', async () => { + const sum1 = queue.postTask('add', 1, 2); + const sum2 = queue.postTask('add', sum1, 2); + + await tick(); + expect(sum1.state).toBe('scheduled'); + expect(sum2.state).toBe('scheduled'); + + const result = await sum2.result; + expect(sum1.state).toBe('fulfilled'); + expect(sum2.state).toBe('fulfilled'); + expect(result).toBe(5); + }); + + it('should terminate workers when destroy() is called', () => { + expect(() => { + queue.destroy(); + }).not.toThrow(); + }); + }); +}); diff --git a/packages/worker-task-queue/test/worker.js b/packages/worker-task-queue/test/worker.js new file mode 100644 index 0000000..bf7fb4a --- /dev/null +++ b/packages/worker-task-queue/test/worker.js @@ -0,0 +1,10 @@ +const { registerTask } = require('worker-task-queue/processor'); + +registerTask( + 'add', + class { + process(a, b) { + return a + b; + } + } +); From b6bee9540407b46972958319797129e557774f5d Mon Sep 17 00:00:00 2001 From: Jason Miller Date: Tue, 17 Aug 2021 16:23:27 -0400 Subject: [PATCH 4/4] fix unwriteable internal fields --- packages/worker-task-queue/package.json | 2 +- packages/worker-task-queue/src/index.ts | 34 +++++++++++++++---------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/packages/worker-task-queue/package.json b/packages/worker-task-queue/package.json index be6d13e..c88d1ff 100644 --- a/packages/worker-task-queue/package.json +++ b/packages/worker-task-queue/package.json @@ -1,6 +1,6 @@ { "name": "worker-task-queue", - "version": "0.1.0", + "version": "0.2.1", "description": "Streamlined processing of tasks in a shared threadpool.", "exports": { "./processor": { diff --git a/packages/worker-task-queue/src/index.ts b/packages/worker-task-queue/src/index.ts index dab102a..348a3af 100644 --- a/packages/worker-task-queue/src/index.ts +++ b/packages/worker-task-queue/src/index.ts @@ -11,9 +11,8 @@ * limitations under the License. */ -interface TaskQueueOptions { +interface TaskQueueOptions extends WorkerOptions { size?: number; - worker?: WorkerOptions; } type TaskId = number; @@ -36,13 +35,14 @@ type TaskResultDesc = [TaskId, Status, any?]; // postTask(taskName: string, ...args: any[]): Task; // } -export default class WorkerTaskQueue { - /** @private */ - $$pool!: TaskQueuePool; +interface WorkerTaskQueue { + $$pool: TaskQueuePool; +} +class WorkerTaskQueue { constructor(workerUrl: string, options?: TaskQueueOptions) { const size = (options && Number(options.size)) || 1; - const pool = new TaskQueuePool(size, workerUrl, options && options.worker); + const pool = new TaskQueuePool(size, workerUrl, options); prop(this, '$$pool', pool); } @@ -59,6 +59,8 @@ export default class WorkerTaskQueue { } } +export default WorkerTaskQueue; + // export default WorkerTaskQueue; // All IDs are generated by incrementing a shared counter @@ -112,7 +114,7 @@ interface ResultController extends PromiseController { result: Promise; } -class TaskQueuePool { +interface TaskQueuePool { poolSize: number; workerUrl: string; workerOptions?: WorkerOptions; @@ -120,7 +122,9 @@ class TaskQueuePool { tasks: Record>; results: Record; workerTaskAssignments: Record; +} +class TaskQueuePool { constructor( poolSize: number, workerUrl: string, @@ -399,15 +403,17 @@ class TaskQueuePool { } } -class Task { - id!: number; - state!: string; - result!: T; +interface Task { + id: number; + state: string; + result: T; - private $$taskIdentifier!: string; - private $$result!: Promise; - private $$queue!: WorkerTaskQueue; + $$taskIdentifier: string; + $$result: Promise; + $$queue: WorkerTaskQueue; +} +class Task { cancel() { this.$$queue.$$pool.cancel(this.id); }