diff --git a/package.json b/package.json index de77368..342b110 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "zeed", "type": "module", - "version": "0.22.8", + "version": "0.22.9", "description": "🌱 Simple foundation library", "author": { "name": "Dirk Holtwick", @@ -70,7 +70,7 @@ }, "devDependencies": { "@antfu/eslint-config": "<2.22", - "@antfu/ni": "^0.21.12", + "@antfu/ni": "<0.22.0", "@types/node": "^20.14.10", "@vitejs/plugin-vue": "^5.0.5", "@vitest/browser": "^2.0.2", diff --git a/src/common/exec/promise.ts b/src/common/exec/promise.ts index d464482..a9fe1f6 100644 --- a/src/common/exec/promise.ts +++ b/src/common/exec/promise.ts @@ -98,7 +98,10 @@ export async function tryTimeout( }) } -/** Wait for `event` on `obj` to emit. Resolve with result or reject on `timeout` */ +/** + * @deprecated use emitter.waitOn + * Wait for `event` on `obj` to emit. Resolve with result or reject on `timeout` + */ export function waitOn( obj: any, event: string, diff --git a/src/common/msg/emitter.spec.ts b/src/common/msg/emitter.spec.ts index f8e302c..8a120a5 100644 --- a/src/common/msg/emitter.spec.ts +++ b/src/common/msg/emitter.spec.ts @@ -1,11 +1,8 @@ import { vi } from 'vitest' -import { detect } from '../platform' import { sleep, waitOn } from '../exec/promise' import { getSecureRandomIfPossible } from '../data/math' import { Emitter, getGlobalEmitter } from './emitter' -const platform = detect() - declare global { interface ZeedGlobalEmitter { a: (n: number) => void @@ -20,7 +17,8 @@ interface LazyEvent { obj: any } -export function lazyListener( +/** @deprecated use waitOn */ +function lazyListener( emitter: any, listenerKey?: string, ): (key?: string, skipUnmatched?: boolean) => Promise { @@ -182,16 +180,34 @@ describe('emitter', () => { const v = await waitOn(e1, 'f') expect(v).toBe(1) - if (platform.test) { - await expect(waitOn(e1, 'x', 10)).rejects.toThrow( - 'Did not response in time', - ) - // } else { - // // https://jasmine.github.io/api/3.5/global - // await expectAsync(on(e1, "x", 10)).toBeRejectedWithError( - // "Did not response in time" - // ) - } + await expect(waitOn(e1, 'x', 10)).rejects.toThrow( + 'Did not response in time', + ) + // } else { + // // https://jasmine.github.io/api/3.5/global + // await expectAsync(on(e1, "x", 10)).toBeRejectedWithError( + // "Did not response in time" + // ) + }) + + it('should wait on integrated', async () => { + expect.assertions(2) + + const e1 = new Emitter<{ + f: (v: number) => void + x: () => void + }>() + + queueMicrotask(() => { + void e1.emit('f', 1) + }) + + const v = await e1.waitOn('f', 100) + expect(v).toBe(1) + + await expect(e1.waitOn('x', 10)).rejects.toThrow( + 'Did not response in time', + ) }) it('should work lazy', async () => { diff --git a/src/common/msg/emitter.ts b/src/common/msg/emitter.ts index ec9d70d..cfe98f6 100644 --- a/src/common/msg/emitter.ts +++ b/src/common/msg/emitter.ts @@ -1,3 +1,4 @@ +import type { ArgumentsType } from 'vitest' import type { DisposerFunction } from '../dispose-types' import { getGlobalContext } from '../global' import { DefaultLogger } from '../log' @@ -146,6 +147,27 @@ export class Emitter< this.subscribers = {} return this } + + /// + + waitOn[0]>( + event: U, + timeoutMS = 1000, + ): Promise { + return new Promise((resolve, reject) => { + let timer: any + + const dispose = this.once(event, ((value): void => { + clearTimeout(timer) + resolve(value) + }) as LocalListener[U]) + + timer = setTimeout(() => { + dispose() + reject(new Error('Did not response in time')) + }, timeoutMS) + }) + } } declare global { diff --git a/src/common/msg/rpc.spec.ts b/src/common/msg/rpc.spec.ts index 4fc7f93..c4f8e0f 100644 --- a/src/common/msg/rpc.spec.ts +++ b/src/common/msg/rpc.spec.ts @@ -1,4 +1,3 @@ -import { MessageChannel } from 'node:worker_threads' import { decodeJson, encodeJson } from '../bin' import { cloneObject } from '../data' import { useStringHashPool } from '../data/string-hash-pool' @@ -36,7 +35,7 @@ describe('rpc async', () => { it('basic', async () => { const log: any[] = [] - const channel = new MessageChannel() + const [c1, c2] = createLocalChannelPair() const serialize = (data: any) => { log.push(cloneObject(data)) @@ -45,8 +44,8 @@ describe('rpc async', () => { const deserialize = (data: any) => decodeJson(data) const bob = useRPC(Bob, { - post: data => channel.port1.postMessage(data), - on: data => channel.port1.on('message', data), + post: data => c1.postMessage(data), + on: data => c1.on('message', e => data(e.data)), serialize, deserialize, stringHashPool: useStringHashPool(), @@ -55,9 +54,9 @@ describe('rpc async', () => { const alice = useRPC(Alice, { // mark bob's `bump` as an event without response eventNames: ['bump'], - post: data => channel.port2.postMessage(data), + post: data => c2.postMessage(data), - on: data => channel.port2.on('message', data), + on: data => c2.on('message', e => data(e.data)), serialize, deserialize, stringHashPool: useStringHashPool(), @@ -74,8 +73,8 @@ describe('rpc async', () => { await new Promise(resolve => setTimeout(resolve, 100)) expect(Bob.getCount()).toBe(1) - channel.port1.close() - channel.port2.close() + c1.close() + c2.close() expect(log).toMatchInlineSnapshot(` Array [ @@ -110,13 +109,14 @@ describe('rpc async', () => { }) it('hub', async () => { - const channel = new MessageChannel() + const [c1, c2] = createLocalChannelPair() + const serialize = (data: any) => encodeJson(data) const deserialize = (data: any) => decodeJson(data) const bobHub = useRPCHub({ - post: data => channel.port1.postMessage(data), - on: data => channel.port1.on('message', data), + post: data => c1.postMessage(data), + on: data => c1.on('message', e => data(e.data)), serialize, deserialize, stringHashPool: useStringHashPool(), @@ -127,9 +127,9 @@ describe('rpc async', () => { const alice = useRPC(Alice, { // mark bob's `bump` as an event without response eventNames: ['bump'], - post: data => channel.port2.postMessage(data), + post: data => c2.postMessage(data), - on: data => channel.port2.on('message', data), + on: data => c2.on('message', e => data(e.data)), serialize, deserialize, stringHashPool: useStringHashPool(), @@ -146,8 +146,8 @@ describe('rpc async', () => { await new Promise(resolve => setTimeout(resolve, 100)) expect(Bob.getCount()).toBe(1) - channel.port1.close() - channel.port2.close() + c1.close() + c2.close() }) it('timeout async', async (done) => {