From a88eda94b4876eb8c1e25dc308ba9851eee0c150 Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Wed, 4 Sep 2024 15:26:46 +0200 Subject: [PATCH 01/11] fix: handle outdated message in channel queue --- src/index.ts | 64 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/src/index.ts b/src/index.ts index df193d1a5..be7f90186 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,7 @@ import path from 'node:path' import { fileURLToPath, pathToFileURL } from 'node:url' import { MessageChannel, + MessagePort, type TransferListItem, Worker, parentPort, @@ -522,36 +523,57 @@ function startWorkerThread>( let nextID = 0 - const syncFn = (...args: Parameters): R => { - const id = nextID++ - - const msg: MainToWorkerMessage> = { id, args } - - worker.postMessage(msg) - - const status = Atomics.wait(sharedBufferView!, 0, 0, timeout) - - // Reset SharedArrayBuffer for next call + const receiveMessageWithId = ( + port: MessagePort, + expectedId: number, + waitingTimeout?: number, + ): WorkerToMainMessage => { + const start = Date.now() + const status = Atomics.wait(sharedBufferView!, 0, 0, waitingTimeout) Atomics.store(sharedBufferView!, 0, 0) - /* istanbul ignore if */ if (!['ok', 'not-equal'].includes(status)) { throw new Error('Internal error: Atomics.wait() failed: ' + status) } - const { - id: id2, - result, - error, - properties, - } = (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage }) - .message + const { id, ...message } = ( + receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage } + ).message - /* istanbul ignore if */ - if (id !== id2) { - throw new Error(`Internal error: Expected id ${id} but got id ${id2}`) + if (id < expectedId) { + const waitingTime = Date.now() - start + console.log( + `Expected id ${expectedId} but got id ${id}, waiting again...`, + ) + return receiveMessageWithId( + port, + expectedId, + waitingTimeout ? waitingTimeout - waitingTime : undefined, + ) + } + + if (expectedId !== id) { + throw new Error( + `Internal error: Expected id ${expectedId} but got id ${id}`, + ) } + return { id, ...message } + } + + const syncFn = (...args: Parameters): R => { + const id = nextID++ + + const msg: MainToWorkerMessage> = { id, args } + + worker.postMessage(msg) + + const { result, error, properties } = receiveMessageWithId( + mainPort, + id, + timeout, + ) + if (error) { throw Object.assign(error as object, properties) } From ac6630cafe4b89909961604d70713b2c20475896 Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Thu, 5 Sep 2024 09:02:54 +0200 Subject: [PATCH 02/11] chore: improve log about receiving old message --- src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index be7f90186..7c0d36f4b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -543,7 +543,7 @@ function startWorkerThread>( if (id < expectedId) { const waitingTime = Date.now() - start console.log( - `Expected id ${expectedId} but got id ${id}, waiting again...`, + `Received old message ${id}, keep waiting for ${expectedId}...`, ) return receiveMessageWithId( port, From fede1c0c68b5f7618d2c924b496f8857cac51913 Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Mon, 9 Sep 2024 08:07:32 +0200 Subject: [PATCH 03/11] fix: notify worker to not send response when main errors out --- src/index.ts | 18 +++++++++++++++--- src/types.ts | 5 +++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/index.ts b/src/index.ts index 7c0d36f4b..513a1798d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -20,6 +20,7 @@ import type { AnyAsyncFn, AnyFn, GlobalShim, + MainToWorkerAbortMessage, MainToWorkerMessage, Syncify, ValueOf, @@ -533,6 +534,8 @@ function startWorkerThread>( Atomics.store(sharedBufferView!, 0, 0) if (!['ok', 'not-equal'].includes(status)) { + const abortMsg: MainToWorkerAbortMessage = { id: expectedId, abort: true } + port.postMessage(abortMsg) throw new Error('Internal error: Atomics.wait() failed: ' + status) } @@ -542,9 +545,6 @@ function startWorkerThread>( if (id < expectedId) { const waitingTime = Date.now() - start - console.log( - `Received old message ${id}, keep waiting for ${expectedId}...`, - ) return receiveMessageWithId( port, expectedId, @@ -609,12 +609,24 @@ export function runAsWorker< ({ id, args }: MainToWorkerMessage>) => { // eslint-disable-next-line @typescript-eslint/no-floating-promises ;(async () => { + let isAborted = false + const handleAbortMessage = (msg: MainToWorkerAbortMessage) => { + if (msg.id === id && msg.abort) { + isAborted = true + } + } + workerPort.on('message', handleAbortMessage) let msg: WorkerToMainMessage try { msg = { id, result: await fn(...args) } } catch (error: unknown) { msg = { id, error, properties: extractProperties(error) } } + workerPort.off('message', handleAbortMessage) + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (isAborted) { + return + } workerPort.postMessage(msg) Atomics.add(sharedBufferView, 0, 1) Atomics.notify(sharedBufferView, 0) diff --git a/src/types.ts b/src/types.ts index b3fd7516a..05d047928 100644 --- a/src/types.ts +++ b/src/types.ts @@ -25,6 +25,11 @@ export interface MainToWorkerMessage { args: T } +export interface MainToWorkerAbortMessage { + id: number + abort: boolean +} + export interface WorkerData { sharedBuffer: SharedArrayBuffer workerPort: MessagePort From a1e3ceaf2d3432324920de0832a77162a71d2d50 Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Mon, 30 Sep 2024 12:42:28 +0200 Subject: [PATCH 04/11] chore: test for executions after timeout, use command pattern for aborting in worker --- src/index.ts | 11 +++++++---- src/types.ts | 4 ++-- test/fn.spec.ts | 20 ++++++++++++++++++++ 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/index.ts b/src/index.ts index 513a1798d..9fc11e8aa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -20,7 +20,7 @@ import type { AnyAsyncFn, AnyFn, GlobalShim, - MainToWorkerAbortMessage, + MainToWorkerCommandMessage, MainToWorkerMessage, Syncify, ValueOf, @@ -534,7 +534,10 @@ function startWorkerThread>( Atomics.store(sharedBufferView!, 0, 0) if (!['ok', 'not-equal'].includes(status)) { - const abortMsg: MainToWorkerAbortMessage = { id: expectedId, abort: true } + const abortMsg: MainToWorkerCommandMessage = { + id: expectedId, + cmd: 'abort', + } port.postMessage(abortMsg) throw new Error('Internal error: Atomics.wait() failed: ' + status) } @@ -610,8 +613,8 @@ export function runAsWorker< // eslint-disable-next-line @typescript-eslint/no-floating-promises ;(async () => { let isAborted = false - const handleAbortMessage = (msg: MainToWorkerAbortMessage) => { - if (msg.id === id && msg.abort) { + const handleAbortMessage = (msg: MainToWorkerCommandMessage) => { + if (msg.id === id && msg.cmd === 'abort') { isAborted = true } } diff --git a/src/types.ts b/src/types.ts index 05d047928..5f7556093 100644 --- a/src/types.ts +++ b/src/types.ts @@ -25,9 +25,9 @@ export interface MainToWorkerMessage { args: T } -export interface MainToWorkerAbortMessage { +export interface MainToWorkerCommandMessage { id: number - abort: boolean + cmd: string } export interface WorkerData { diff --git a/test/fn.spec.ts b/test/fn.spec.ts index 3e68e5316..ee9983546 100644 --- a/test/fn.spec.ts +++ b/test/fn.spec.ts @@ -104,6 +104,26 @@ test('timeout', async () => { ) }) +test('subsequent executions after timeout', async () => { + const SYNCKIT_TIMEOUT = 30 + process.env.SYNCKIT_TIMEOUT = SYNCKIT_TIMEOUT.toString() + + const { createSyncFn } = await import('synckit') + const syncFn = createSyncFn(workerCjsPath) + + // start an execution in worker that will definitely time out + expect(() => syncFn(1, SYNCKIT_TIMEOUT * 2)).toThrow( + 'Internal error: Atomics.wait() failed: timed-out', + ) + + // wait for timed out execution to finish inside worker + await new Promise(resolve => setTimeout(resolve, SYNCKIT_TIMEOUT * 2)) + + // subsequent executions should work correctly + expect(syncFn(2, 1)).toBe(2) + expect(syncFn(3, 1)).toBe(3) +}) + test('globalShims env', async () => { process.env.SYNCKIT_GLOBAL_SHIMS = '1' From f9621094057ba2a1c93c028ea7187d0342f77834 Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Mon, 30 Sep 2024 13:41:06 +0200 Subject: [PATCH 05/11] chore: increase duration of long-running task in test --- test/fn.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/fn.spec.ts b/test/fn.spec.ts index ee9983546..30d20ca43 100644 --- a/test/fn.spec.ts +++ b/test/fn.spec.ts @@ -112,12 +112,12 @@ test('subsequent executions after timeout', async () => { const syncFn = createSyncFn(workerCjsPath) // start an execution in worker that will definitely time out - expect(() => syncFn(1, SYNCKIT_TIMEOUT * 2)).toThrow( + expect(() => syncFn(1, SYNCKIT_TIMEOUT * 3)).toThrow( 'Internal error: Atomics.wait() failed: timed-out', ) // wait for timed out execution to finish inside worker - await new Promise(resolve => setTimeout(resolve, SYNCKIT_TIMEOUT * 2)) + await new Promise(resolve => setTimeout(resolve, SYNCKIT_TIMEOUT * 3)) // subsequent executions should work correctly expect(syncFn(2, 1)).toBe(2) From 2dabea11bf95fef9b0d8d2ffc63410feb5d03124 Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Mon, 30 Sep 2024 13:56:49 +0200 Subject: [PATCH 06/11] chore: don't overwrite module-scoped constant --- test/fn.spec.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/fn.spec.ts b/test/fn.spec.ts index 30d20ca43..d0f139deb 100644 --- a/test/fn.spec.ts +++ b/test/fn.spec.ts @@ -105,19 +105,17 @@ test('timeout', async () => { }) test('subsequent executions after timeout', async () => { - const SYNCKIT_TIMEOUT = 30 - process.env.SYNCKIT_TIMEOUT = SYNCKIT_TIMEOUT.toString() + const executionTimeout = 30 + process.env.SYNCKIT_TIMEOUT = executionTimeout.toString() const { createSyncFn } = await import('synckit') const syncFn = createSyncFn(workerCjsPath) // start an execution in worker that will definitely time out - expect(() => syncFn(1, SYNCKIT_TIMEOUT * 3)).toThrow( - 'Internal error: Atomics.wait() failed: timed-out', - ) + expect(() => syncFn(1, executionTimeout * 3)).toThrow() // wait for timed out execution to finish inside worker - await new Promise(resolve => setTimeout(resolve, SYNCKIT_TIMEOUT * 3)) + await new Promise(resolve => setTimeout(resolve, executionTimeout * 3)) // subsequent executions should work correctly expect(syncFn(2, 1)).toBe(2) From e1dd83bcb697c09747a576a3752fa18cd8fae4c4 Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Mon, 30 Sep 2024 14:05:51 +0200 Subject: [PATCH 07/11] chore: further increase long running task duration --- test/fn.spec.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/fn.spec.ts b/test/fn.spec.ts index d0f139deb..1203fde8d 100644 --- a/test/fn.spec.ts +++ b/test/fn.spec.ts @@ -106,16 +106,17 @@ test('timeout', async () => { test('subsequent executions after timeout', async () => { const executionTimeout = 30 + const longRunningTaskDuration = executionTimeout * 10 process.env.SYNCKIT_TIMEOUT = executionTimeout.toString() const { createSyncFn } = await import('synckit') const syncFn = createSyncFn(workerCjsPath) // start an execution in worker that will definitely time out - expect(() => syncFn(1, executionTimeout * 3)).toThrow() + expect(() => syncFn(1, longRunningTaskDuration)).toThrow() // wait for timed out execution to finish inside worker - await new Promise(resolve => setTimeout(resolve, executionTimeout * 3)) + await new Promise(resolve => setTimeout(resolve, longRunningTaskDuration)) // subsequent executions should work correctly expect(syncFn(2, 1)).toBe(2) From 7fe6942e1fe5802f03a0563744f958f48bda8ed4 Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Wed, 2 Oct 2024 14:30:17 +0200 Subject: [PATCH 08/11] chore: tests for outdated and unexpected messages from worker --- test/fn.spec.ts | 98 ++++++++++++++++++++++++++++++++++++++++++++++++- test/helpers.ts | 25 +++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/test/fn.spec.ts b/test/fn.spec.ts index 1203fde8d..97d38d529 100644 --- a/test/fn.spec.ts +++ b/test/fn.spec.ts @@ -3,7 +3,12 @@ import path from 'node:path' import { jest } from '@jest/globals' -import { _dirname, testIf, tsUseEsmSupported } from './helpers.js' +import { + _dirname, + setupReceiveMessageOnPortMock, + testIf, + tsUseEsmSupported, +} from './helpers.js' import type { AsyncWorkerFn } from './types.js' import { createSyncFn } from 'synckit' @@ -12,6 +17,7 @@ const { SYNCKIT_TIMEOUT } = process.env beforeEach(() => { jest.resetModules() + jest.restoreAllMocks() delete process.env.SYNCKIT_GLOBAL_SHIMS @@ -123,6 +129,96 @@ test('subsequent executions after timeout', async () => { expect(syncFn(3, 1)).toBe(3) }) +test('handling of outdated message from worker', async () => { + const synckitTimeout = 60 + process.env.SYNCKIT_TIMEOUT = synckitTimeout.toString() + const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock() + + jest.spyOn(Atomics, 'wait').mockReturnValue('ok') + + receiveMessageOnPortMock + .mockReturnValueOnce({ message: { id: -1 } }) + .mockReturnValueOnce({ message: { id: 0, result: 1 } }) + + const { createSyncFn } = await import('synckit') + const syncFn = createSyncFn(workerCjsPath) + expect(syncFn(1)).toBe(1) + expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2) +}) + +test('propagation of undefined timeout', async () => { + const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock() + + jest.spyOn(Atomics, 'wait').mockReturnValue('ok') + + receiveMessageOnPortMock + .mockReturnValueOnce({ message: { id: -1 } }) + .mockReturnValueOnce({ message: { id: 0, result: 1 } }) + + const { createSyncFn } = await import('synckit') + const syncFn = createSyncFn(workerCjsPath) + expect(syncFn(1)).toBe(1) + expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2) + + const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] = ( + Atomics.wait as unknown as jest.SpiedFunction + ).mock.calls + const [, , , firstAtomicsWaitCallTimeout] = firstAtomicsWaitArgs + const [, , , secondAtomicsWaitCallTimeout] = secondAtomicsWaitArgs + + expect(typeof firstAtomicsWaitCallTimeout).toBe('undefined') + expect(typeof secondAtomicsWaitCallTimeout).toBe('undefined') +}) + +test('reduction of waiting time', async () => { + const synckitTimeout = 60 + process.env.SYNCKIT_TIMEOUT = synckitTimeout.toString() + const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock() + + jest.spyOn(Atomics, 'wait').mockImplementation(() => { + const start = Date.now() + // simulate waiting 10ms for worker to respond + while (Date.now() - start < 10) { + continue + } + + return 'ok' + }) + + receiveMessageOnPortMock + .mockReturnValueOnce({ message: { id: -1 } }) + .mockReturnValueOnce({ message: { id: 0, result: 1 } }) + + const { createSyncFn } = await import('synckit') + const syncFn = createSyncFn(workerCjsPath) + expect(syncFn(1)).toBe(1) + expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2) + + const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] = ( + Atomics.wait as unknown as jest.SpiedFunction + ).mock.calls + const [, , , firstAtomicsWaitCallTimeout] = firstAtomicsWaitArgs + const [, , , secondAtomicsWaitCallTimeout] = secondAtomicsWaitArgs + + expect(typeof firstAtomicsWaitCallTimeout).toBe('number') + expect(firstAtomicsWaitCallTimeout).toBe(synckitTimeout) + expect(typeof secondAtomicsWaitCallTimeout).toBe('number') + expect(secondAtomicsWaitCallTimeout).toBeLessThan(synckitTimeout) +}) + +test('unexpected message from worker', async () => { + jest.spyOn(Atomics, 'wait').mockReturnValue('ok') + + const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock() + receiveMessageOnPortMock.mockReturnValueOnce({ message: { id: 100 } }) + + const { createSyncFn } = await import('synckit') + const syncFn = createSyncFn(workerCjsPath) + expect(() => syncFn(1)).toThrow( + 'Internal error: Expected id 0 but got id 100', + ) +}) + test('globalShims env', async () => { process.env.SYNCKIT_GLOBAL_SHIMS = '1' diff --git a/test/helpers.ts b/test/helpers.ts index 086922403..7a9d7f14e 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -1,5 +1,8 @@ import path from 'node:path' import { fileURLToPath } from 'node:url' +import WorkerThreads from 'node:worker_threads' + +import { jest } from '@jest/globals' import { MTS_SUPPORTED_NODE_VERSION } from 'synckit' @@ -13,3 +16,25 @@ export const tsUseEsmSupported = nodeVersion >= MTS_SUPPORTED_NODE_VERSION && nodeVersion <= 18.18 export const testIf = (condition: boolean) => (condition ? it : it.skip) + +type ReceiveMessageOnPortMock = jest.Mock< + typeof WorkerThreads.receiveMessageOnPort +> +export const setupReceiveMessageOnPortMock = + async (): Promise => { + jest.unstable_mockModule('node:worker_threads', () => { + return { + ...WorkerThreads, + receiveMessageOnPort: jest.fn(WorkerThreads.receiveMessageOnPort), + __esModule: true, + } + }) + + const { receiveMessageOnPort: receiveMessageOnPortMock } = (await import( + 'node:worker_threads' + )) as unknown as { + receiveMessageOnPort: ReceiveMessageOnPortMock + } + + return receiveMessageOnPortMock + } From 5cedc7b92679543c15e9069049cd3c5ad27095ac Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Wed, 2 Oct 2024 14:33:03 +0200 Subject: [PATCH 09/11] chore: remove unnecessary __esModule flag in mock --- test/helpers.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/helpers.ts b/test/helpers.ts index 7a9d7f14e..b1a224bf5 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -26,7 +26,6 @@ export const setupReceiveMessageOnPortMock = return { ...WorkerThreads, receiveMessageOnPort: jest.fn(WorkerThreads.receiveMessageOnPort), - __esModule: true, } }) From 40fb269cdea134762f8cf6e8daf868f4533b2fcd Mon Sep 17 00:00:00 2001 From: "jakub.jedlikowski" Date: Wed, 2 Oct 2024 14:40:11 +0200 Subject: [PATCH 10/11] chore: improve timeout args tests --- test/fn.spec.ts | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/test/fn.spec.ts b/test/fn.spec.ts index 97d38d529..e1c6bd948 100644 --- a/test/fn.spec.ts +++ b/test/fn.spec.ts @@ -130,8 +130,8 @@ test('subsequent executions after timeout', async () => { }) test('handling of outdated message from worker', async () => { - const synckitTimeout = 60 - process.env.SYNCKIT_TIMEOUT = synckitTimeout.toString() + const executionTimeout = 60 + process.env.SYNCKIT_TIMEOUT = executionTimeout.toString() const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock() jest.spyOn(Atomics, 'wait').mockReturnValue('ok') @@ -147,9 +147,10 @@ test('handling of outdated message from worker', async () => { }) test('propagation of undefined timeout', async () => { + delete process.env.SYNCKIT_TIMEOUT const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock() - jest.spyOn(Atomics, 'wait').mockReturnValue('ok') + const atomicsWaitSpy = jest.spyOn(Atomics, 'wait').mockReturnValue('ok') receiveMessageOnPortMock .mockReturnValueOnce({ message: { id: -1 } }) @@ -160,9 +161,8 @@ test('propagation of undefined timeout', async () => { expect(syncFn(1)).toBe(1) expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2) - const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] = ( - Atomics.wait as unknown as jest.SpiedFunction - ).mock.calls + const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] = + atomicsWaitSpy.mock.calls const [, , , firstAtomicsWaitCallTimeout] = firstAtomicsWaitArgs const [, , , secondAtomicsWaitCallTimeout] = secondAtomicsWaitArgs @@ -175,7 +175,7 @@ test('reduction of waiting time', async () => { process.env.SYNCKIT_TIMEOUT = synckitTimeout.toString() const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock() - jest.spyOn(Atomics, 'wait').mockImplementation(() => { + const atomicsWaitSpy = jest.spyOn(Atomics, 'wait').mockImplementation(() => { const start = Date.now() // simulate waiting 10ms for worker to respond while (Date.now() - start < 10) { @@ -194,9 +194,8 @@ test('reduction of waiting time', async () => { expect(syncFn(1)).toBe(1) expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2) - const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] = ( - Atomics.wait as unknown as jest.SpiedFunction - ).mock.calls + const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] = + atomicsWaitSpy.mock.calls const [, , , firstAtomicsWaitCallTimeout] = firstAtomicsWaitArgs const [, , , secondAtomicsWaitCallTimeout] = secondAtomicsWaitArgs From 44881dfcd8bf7933b696e8342e5e64eb8bd37488 Mon Sep 17 00:00:00 2001 From: JounQin Date: Mon, 7 Oct 2024 15:36:55 +0800 Subject: [PATCH 11/11] Create violet-laws-compare.md --- .changeset/violet-laws-compare.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/violet-laws-compare.md diff --git a/.changeset/violet-laws-compare.md b/.changeset/violet-laws-compare.md new file mode 100644 index 000000000..cbf4399e0 --- /dev/null +++ b/.changeset/violet-laws-compare.md @@ -0,0 +1,5 @@ +--- +"synckit": patch +--- + +fix: handle outdated message in channel queue