diff --git a/.github/workflows/release-package.yml b/.github/workflows/release-package.yml new file mode 100644 index 0000000..0cdbc00 --- /dev/null +++ b/.github/workflows/release-package.yml @@ -0,0 +1,33 @@ +name: Node.js Package + +on: + release: + types: [created] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v1 + with: + node-version: 16 + - run: npm ci + - run: npm test + + publish-gpr: + needs: build + runs-on: ubuntu-latest + permissions: + packages: write + contents: read + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v1 + with: + node-version: 16 + registry-url: https://npm.pkg.github.com/ + - run: npm ci + - run: npm publish + env: + NODE_AUTH_TOKEN: ${{secrets.GITHUB_TOKEN}} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..236ce59 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +node_modules +/build \ No newline at end of file diff --git a/.npmrc b/.npmrc new file mode 100644 index 0000000..53e524d --- /dev/null +++ b/.npmrc @@ -0,0 +1 @@ +@unlib-js:registry=https://npm.pkg.github.com \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..523ab77 --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +# `EventBarrier` + +`async-await`-style synchronization primitive in JavaScript world. + +This is like the `once` helper function in the built-in module `events` of Node.js but with more powerful utilities. +It is useful to wrap a legacy event-style object to avoid scattered local states. + +## Example + +```TypeScript +import EventBarrier from 'event-barrier' + +const eb = new EventBarrier + +async function foo() { + sendRequest('blah') + await eb.waitFor('response', 5000) +} + +// Somewhere else +eb.notify('response', res) +``` diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..3242ccb --- /dev/null +++ b/package-lock.json @@ -0,0 +1,51 @@ +{ + "name": "@unlib-js/event-barrier", + "version": "1.0.0", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "name": "@unlib-js/event-barrier", + "version": "1.0.0", + "hasInstallScript": true, + "license": "ISC", + "devDependencies": { + "@types/node": "^15.0.3", + "typescript": "^4.2.4" + } + }, + "node_modules/@types/node": { + "version": "15.0.3", + "resolved": "https://registry.npmjs.org/@types/node/-/node-15.0.3.tgz", + "integrity": "sha512-/WbxFeBU+0F79z9RdEOXH4CsDga+ibi5M8uEYr91u3CkT/pdWcV8MCook+4wDPnZBexRdwWS+PiVZ2xJviAzcQ==", + "dev": true + }, + "node_modules/typescript": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.2.4.tgz", + "integrity": "sha512-V+evlYHZnQkaz8TRBuxTA92yZBPotr5H+WhQ7bD3hZUndx5tGOa1fuCgeSjxAzM1RiN5IzvadIXTVefuuwZCRg==", + "dev": true, + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=4.2.0" + } + } + }, + "dependencies": { + "@types/node": { + "version": "15.0.3", + "resolved": "https://registry.npmjs.org/@types/node/-/node-15.0.3.tgz", + "integrity": "sha512-/WbxFeBU+0F79z9RdEOXH4CsDga+ibi5M8uEYr91u3CkT/pdWcV8MCook+4wDPnZBexRdwWS+PiVZ2xJviAzcQ==", + "dev": true + }, + "typescript": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.2.4.tgz", + "integrity": "sha512-V+evlYHZnQkaz8TRBuxTA92yZBPotr5H+WhQ7bD3hZUndx5tGOa1fuCgeSjxAzM1RiN5IzvadIXTVefuuwZCRg==", + "dev": true + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..e0fcd2e --- /dev/null +++ b/package.json @@ -0,0 +1,25 @@ +{ + "name": "@unlib-js/event-barrier", + "version": "1.0.0", + "description": "Event barrier that provides `notify` and `waitFor` primitives", + "main": "build/index.js", + "scripts": { + "test": "node build/test.js", + "build": "tsc", + "install": "tsc" + }, + "keywords": [ + "synchronize", + "barrier", + "condition" + ], + "author": "Untitled", + "license": "ISC", + "devDependencies": { + "@types/node": "^15.0.3", + "typescript": "^4.2.4" + }, + "repository": { + "url": "https://github.com/unlib-js/event-barrier.git" + } +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..32c7dd4 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,180 @@ +import { EventEmitter } from 'events' + + +export type ErrorType = Error | string | number + +type TimeoutHandle = ReturnType + +interface Waiter { + resolve: (value: any) => void + reject: (err: ErrorType) => void + tHandle?: TimeoutHandle + abortSignal?: { signal: AbortSignal, onAbort: (this: AbortSignal, ev: Event) => any } +} + +type EventWaiter = Waiter[] + +export class TimeoutError extends Error { + public readonly name: string = 'TimeoutError' + public readonly source: string = 'Sync.EventBarrier' + public readonly event?: string + constructor(msg?: string, event?: string) { + super(msg) + if (event) this.event = event + } +} + +export class AbortionError extends Error { + public readonly name: string = 'AbortionError' + public readonly source: string = 'Sync.EventBarrier' + public readonly event?: string + constructor(msg?: string, event?: string) { + super(msg) + if (event) this.event = event + } +} + +export class AbortedBySignalError extends AbortionError { + public readonly name: string = 'AbortedBySignalError' + public readonly source: string = 'Sync.EventBarrier' + public readonly event?: string + constructor(msg: string = 'Aborted by signal', event?: string) { + super(msg, event) + if (event) this.event = event + } +} + +/** + * Event barrier + * + * Note: do not reuse the instance to avoid event leak + */ +export class EventBarrier extends EventEmitter { + private waiters: Map< + /* event */ string, + /* resolvers, timeout handles, etc. */ EventWaiter + > = new Map + /** + * Internal emitter + */ + private emitter = new EventEmitter + + /** + * Notify all the waiters of the event + * @param event Event that just happened + * @param value Event payload value (as the second argument of + * `EventEmitter.prototype.emit`) + * @param count The number of waiters to be waked up. Leave `undefined` to + * wake up all of them + */ + public notify(event: string, value?: any, count?: number) { + this.emit(event, value) + this.emitter.emit(event, value, count) + return this + } + + /** + * Abort waiters that are waiting for an event, causing `waitFor` to reject + * @param event The event that the waiters are still waiting for + * @param err Error as rejection reason + */ + public abort(event: string, err?: ErrorType) { + const { emitter, waiters } = this + if (!err) err = new AbortionError(undefined, event) + const eventWaiter = waiters.get(event) + if (eventWaiter) { + for (const waiter of eventWaiter) { + EventBarrier.abortWaiter(waiter, err) + } + waiters.delete(event) + emitter.removeAllListeners(event) + } + return this + } + + /** + * Abort all waiters on all events + * @param err Error as rejection reason + */ + public abortAll(err?: ErrorType) { + this.waiters.forEach((_, key) => this.abort(key, err)) + return this + } + + /** + * Wait for specific event to happen + * @param event Event to wait for + * @param timeout If specified, a `TimeoutError` will be thrown after that + * amount of time (in milliseconds) + * @param signal If specified, an `AbortError` will be thrown if aborted + */ + public waitFor(event: string, timeout?: number, signal?: AbortSignal): Promise { + return new Promise((res, rej) => { + const eventWaiter = this.getOrCreateEventWaiter(event) + const { emitter } = this + let thisWaiter: Waiter = { resolve: res, reject: rej } + const onNotify = (value: any, count?: number) => { + if (count == undefined) count = eventWaiter.length + for (const waiter of eventWaiter.splice(0, count)) { + EventBarrier.resolveWaiter(waiter, value) + } + // Clean up waiter + if (eventWaiter.length == 0) { + this.waiters.delete(event) + emitter.removeAllListeners(event) + } + } + if (emitter.listeners(event).length == 0) emitter.on(event, onNotify) + const getOnAbort = (reason: TimeoutError | AbortedBySignalError) => { + return () => { + const waiterIndex = eventWaiter.indexOf(thisWaiter) + if (waiterIndex >= 0) eventWaiter.splice(waiterIndex, 1) + // Clean up waiter + if (eventWaiter.length == 0) { + this.waiters.delete(event) + emitter.removeAllListeners(event) + } + EventBarrier.abortWaiter(thisWaiter, reason) + } + } + if (signal) { + const onAbortBySignal = getOnAbort(new AbortedBySignalError(undefined, event)) + signal.addEventListener('abort', onAbortBySignal) + thisWaiter.abortSignal = { signal, onAbort: onAbortBySignal } + } + if (timeout != undefined) { + const t: ReturnType = setTimeout(getOnAbort(new TimeoutError(undefined, event)), timeout) + thisWaiter.tHandle = t + } + eventWaiter.push(thisWaiter) + }) + } + + private getOrCreateEventWaiter(event: string) { + const { waiters } = this + const waiter = waiters.get(event) + if (waiter) return waiter + else { + const newWaiter: EventWaiter = [] + waiters.set(event, newWaiter) + return newWaiter + } + } + + private static resolveWaiter({ resolve, tHandle, abortSignal }: Waiter, value: any) { + EventBarrier.cleanUp(tHandle, abortSignal) + resolve(value) + } + + private static abortWaiter({ reject, tHandle, abortSignal }: Waiter, reason: ErrorType) { + EventBarrier.cleanUp(tHandle, abortSignal) + reject(reason) + } + + private static cleanUp(tHandle?: TimeoutHandle, abortSignal?: { signal: AbortSignal, onAbort: (this: AbortSignal, ev: Event) => any }) { + if (tHandle) clearTimeout(tHandle) + if (abortSignal) abortSignal.signal.removeEventListener('abort', abortSignal.onAbort) + } +} + +export default EventBarrier diff --git a/src/test.ts b/src/test.ts new file mode 100644 index 0000000..3c6d7ce --- /dev/null +++ b/src/test.ts @@ -0,0 +1,110 @@ +import EventBarrier from '.' +import assert from 'assert' +import { setTimeout } from 'timers/promises' + + +async function test() { + const eb = new EventBarrier + await assert.doesNotReject(Promise.all([ + (async () => { + await setTimeout(100) + eb.notify('foo', 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })() + ])) + assert.strictEqual((eb as any).waiters.size, 0) + + await assert.doesNotReject(Promise.all([ + (async () => { + await setTimeout(100) + eb.notify('foo', 'bar', 2) + assert.strictEqual((eb as any).waiters.get('foo').length, 2) + await setTimeout(100) + eb.notify('foo', 'doo', 2) + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'doo') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'doo') + })() + ])) + assert.strictEqual((eb as any).waiters.size, 0) + + await assert.doesNotReject(Promise.all([ + (async () => { + await setTimeout(100) + eb.notify('foo', 'bar', 2) + await setTimeout(100) + eb.abort('foo') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + assert.rejects(eb.waitFor('foo'), { name: 'AbortionError' }), + assert.rejects(eb.waitFor('foo'), { name: 'AbortionError' }) + ])) + assert.strictEqual((eb as any).waiters.size, 0) + + await assert.doesNotReject(Promise.all([ + (async () => { + await setTimeout(100) + eb.notify('foo', 'bar', 2) + })(), + assert.rejects(eb.waitFor('foo', 10), { name: 'TimeoutError' }), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + assert.rejects(eb.waitFor('foo', 200), { name: 'TimeoutError' }), + assert.rejects(eb.waitFor('foo', 200), { name: 'TimeoutError' }) + ])) + assert.strictEqual((eb as any).waiters.size, 0) + + const ac = new AbortController + await assert.doesNotReject(Promise.all([ + (async () => { + await setTimeout(100) + eb.notify('foo', 'bar', 2) + ac.abort() + })(), + assert.rejects(eb.waitFor('foo', 50, ac.signal), { name: 'TimeoutError' }), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + (async () => { + assert.strictEqual(await eb.waitFor('foo'), 'bar') + })(), + assert.rejects(eb.waitFor('foo', undefined, ac.signal), { name: 'AbortedBySignalError' }), + assert.rejects(eb.waitFor('foo', undefined, ac.signal), { name: 'AbortedBySignalError' }), + assert.rejects(eb.waitFor('foo', 200, ac.signal), { name: 'AbortedBySignalError' }), + assert.rejects(eb.waitFor('foo', 200), { name: 'TimeoutError' }), + ])) + assert.strictEqual((eb as any).waiters.size, 0) + + console.log('Tests passed') +} + +test() diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..fe9b1d8 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,75 @@ +{ + "compilerOptions": { + /* Visit https://aka.ms/tsconfig.json to read more about this file */ + + /* Basic Options */ + // "incremental": true, /* Enable incremental compilation */ + "target": "esnext", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */ + "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */ + // "lib": [], /* Specify library files to be included in the compilation. */ + // "allowJs": true, /* Allow javascript files to be compiled. */ + // "checkJs": true, /* Report errors in .js files. */ + // "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', 'react', 'react-jsx' or 'react-jsxdev'. */ + "declaration": true, /* Generates corresponding '.d.ts' file. */ + "declarationMap": true, /* Generates a sourcemap for each corresponding '.d.ts' file. */ + "declarationDir": "build", + "sourceMap": true, /* Generates corresponding '.map' file. */ + // "outFile": "./", /* Concatenate and emit output to single file. */ + "outDir": "build", /* Redirect output structure to the directory. */ + // "rootDir": "./", /* Specify the root directory of input files. Use to control the output directory structure with --outDir. */ + // "composite": true, /* Enable project compilation */ + // "tsBuildInfoFile": "./", /* Specify file to store incremental compilation information */ + // "removeComments": true, /* Do not emit comments to output. */ + // "noEmit": true, /* Do not emit outputs. */ + // "importHelpers": true, /* Import emit helpers from 'tslib'. */ + // "downlevelIteration": true, /* Provide full support for iterables in 'for-of', spread, and destructuring when targeting 'ES5' or 'ES3'. */ + // "isolatedModules": true, /* Transpile each file as a separate module (similar to 'ts.transpileModule'). */ + + /* Strict Type-Checking Options */ + "strict": true, /* Enable all strict type-checking options. */ + "noImplicitAny": true, /* Raise error on expressions and declarations with an implied 'any' type. */ + "strictNullChecks": true, /* Enable strict null checks. */ + "strictFunctionTypes": true, /* Enable strict checking of function types. */ + // "strictBindCallApply": true, /* Enable strict 'bind', 'call', and 'apply' methods on functions. */ + "strictPropertyInitialization": true, /* Enable strict checking of property initialization in classes. */ + "noImplicitThis": true, /* Raise error on 'this' expressions with an implied 'any' type. */ + "alwaysStrict": true, /* Parse in strict mode and emit "use strict" for each source file. */ + + /* Additional Checks */ + // "noUnusedLocals": true, /* Report errors on unused locals. */ + // "noUnusedParameters": true, /* Report errors on unused parameters. */ + // "noImplicitReturns": true, /* Report error when not all code paths in function return a value. */ + // "noFallthroughCasesInSwitch": true, /* Report errors for fallthrough cases in switch statement. */ + // "noUncheckedIndexedAccess": true, /* Include 'undefined' in index signature results */ + // "noPropertyAccessFromIndexSignature": true, /* Require undeclared properties from index signatures to use element accesses. */ + + /* Module Resolution Options */ + // "moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */ + // "baseUrl": "./", /* Base directory to resolve non-absolute module names. */ + // "paths": {}, /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */ + // "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */ + // "typeRoots": [], /* List of folders to include type definitions from. */ + // "types": [], /* Type declaration files to be included in compilation. */ + // "allowSyntheticDefaultImports": true, /* Allow default imports from modules with no default export. This does not affect code emit, just typechecking. */ + "esModuleInterop": true, /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */ + // "preserveSymlinks": true, /* Do not resolve the real path of symlinks. */ + // "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */ + + /* Source Map Options */ + // "sourceRoot": "", /* Specify the location where debugger should locate TypeScript files instead of source locations. */ + // "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */ + // "inlineSourceMap": true, /* Emit a single file with source maps instead of having a separate file. */ + // "inlineSources": true, /* Emit the source alongside the sourcemaps within a single file; requires '--inlineSourceMap' or '--sourceMap' to be set. */ + + /* Experimental Options */ + // "experimentalDecorators": true, /* Enables experimental support for ES7 decorators. */ + // "emitDecoratorMetadata": true, /* Enables experimental support for emitting type metadata for decorators. */ + + /* Advanced Options */ + "skipLibCheck": true, /* Skip type checking of declaration files. */ + "forceConsistentCasingInFileNames": true /* Disallow inconsistently-cased references to the same file. */ + }, + "include": [ + "src/**/*.ts" + ] +}