From 2d75461055b66986fa568a26b7730bd253fab08f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 24 Jun 2025 15:24:49 -0400 Subject: [PATCH 1/6] Fix connect() resolving early with Rust client --- .changeset/new-readers-dance.md | 5 +++++ .../AbstractStreamingSyncImplementation.ts | 7 ++++++- packages/node/tests/sync.test.ts | 21 +++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 .changeset/new-readers-dance.md diff --git a/.changeset/new-readers-dance.md b/.changeset/new-readers-dance.md new file mode 100644 index 00000000..3f686c30 --- /dev/null +++ b/.changeset/new-readers-dance.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Rust sync client: Fix `connect()` resolving before a connection is made. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 85d493ef..f78aa949 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -945,11 +945,16 @@ The next upload iteration will be delayed.`); } const info = instruction.UpdateSyncStatus.status; + const lastStatus = syncImplementation.syncStatus; const coreCompleteSync = info.priority_status.find((s) => s.priority == FULL_SYNC_PRIORITY); const completeSync = coreCompleteSync != null ? coreStatusToJs(coreCompleteSync) : null; syncImplementation.updateSyncStatus({ - connected: info.connected, + // The first update to the connected field should only happen when it has really changed - there's a + // statusUpdated listener in connect() that will only make the promise complete once connected has changed. + // We only want to apply that workaround while we're connecting because it's only relevant in the initial + // connect call. Afterwards, we want to forward the sync status unchanged. + connected: lastStatus.connected == info.connected && info.connecting ? undefined : info.connected, connecting: info.connecting, dataFlow: { downloading: info.downloading != null, diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index e01f121d..8f7d8fa3 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -135,6 +135,27 @@ function defineSyncTests(impl: SyncClientImplementation) { expect(Math.abs(lastSyncedAt - now)).toBeLessThan(5000); }); + mockSyncServiceTest('connect() waits for connection', async ({ syncService }) => { + const database = await syncService.createDatabase(); + let connectCompleted = false; + database.connect(new TestConnector(), options).then(() => { + connectCompleted = true; + }); + expect(connectCompleted).toBeFalsy(); + + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + // Opening the socket is not enough, we set connected: true when the first line is received. + expect(connectCompleted).toBeFalsy(); + + syncService.pushLine({ + checkpoint: { + last_op_id: '10', + buckets: [bucket('a', 10)] + } + }); + await vi.waitFor(() => connectCompleted); + }); + describe('reports progress', () => { let lastOpId = 0; From 8f007df9a5aea93184164c16fb9250677b879b41 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 16:54:25 -0600 Subject: [PATCH 2/6] Update listener instead --- .../AbstractStreamingSyncImplementation.ts | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index f78aa949..03d976fd 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -410,23 +410,15 @@ The next upload iteration will be delayed.`); this.abortController = controller; this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options); - // Return a promise that resolves when the connection status is updated + // Return a promise that resolves when the connection status is updated to indicate that we're connected. return new Promise((resolve) => { const disposer = this.registerListener({ - statusUpdated: (update) => { - // This is triggered as soon as a connection is read from - if (typeof update.connected == 'undefined') { - // only concern with connection updates - return; - } - - if (update.connected == false) { - /** - * This function does not reject if initial connect attempt failed. - * Connected can be false if the connection attempt was aborted or if the initial connection - * attempt failed. - */ + statusChanged(status) { + if (status.dataFlowStatus.downloadError != null) { this.logger.warn('Initial connect attempt did not successfully connect to server'); + } else if (status.connecting) { + // Still connecting. + return; } disposer(); @@ -945,16 +937,11 @@ The next upload iteration will be delayed.`); } const info = instruction.UpdateSyncStatus.status; - const lastStatus = syncImplementation.syncStatus; const coreCompleteSync = info.priority_status.find((s) => s.priority == FULL_SYNC_PRIORITY); const completeSync = coreCompleteSync != null ? coreStatusToJs(coreCompleteSync) : null; syncImplementation.updateSyncStatus({ - // The first update to the connected field should only happen when it has really changed - there's a - // statusUpdated listener in connect() that will only make the promise complete once connected has changed. - // We only want to apply that workaround while we're connecting because it's only relevant in the initial - // connect call. Afterwards, we want to forward the sync status unchanged. - connected: lastStatus.connected == info.connected && info.connecting ? undefined : info.connected, + connected: info.connected, connecting: info.connecting, dataFlow: { downloading: info.downloading != null, From cc726e93ad70daccbba5b6f78fa6970c98842bc1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 17:06:34 -0600 Subject: [PATCH 3/6] Set connected once we have a connection --- .../sync/stream/AbstractStreamingSyncImplementation.ts | 4 ++++ packages/node/tests/sync.test.ts | 8 +++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 03d976fd..6b7d4097 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -881,6 +881,10 @@ The next upload iteration will be delayed.`); ); } + // The rust client will set connected: true after the first sync line because that's when it gets invoked, but + // we're already connected here and can report that. + syncImplementation.updateSyncStatus({ connected: true }); + try { while (!controlInvocations.closed) { const line = await controlInvocations.read(); diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index 8f7d8fa3..2f73bba8 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -144,8 +144,9 @@ function defineSyncTests(impl: SyncClientImplementation) { expect(connectCompleted).toBeFalsy(); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); - // Opening the socket is not enough, we set connected: true when the first line is received. - expect(connectCompleted).toBeFalsy(); + // We want connected: true once we have a connection + await vi.waitFor(() => connectCompleted); + expect(database.currentStatus.dataFlowStatus.downloading).toBeFalsy(); syncService.pushLine({ checkpoint: { @@ -153,7 +154,8 @@ function defineSyncTests(impl: SyncClientImplementation) { buckets: [bucket('a', 10)] } }); - await vi.waitFor(() => connectCompleted); + + await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy()); }); describe('reports progress', () => { From 985baa3f6c0a7925e2294c63b158d485c5f99fb4 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 17:14:09 -0600 Subject: [PATCH 4/6] Only set uploading when there are things to upload --- .../stream/AbstractStreamingSyncImplementation.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 6b7d4097..03a332d7 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -342,17 +342,18 @@ export abstract class AbstractStreamingSyncImplementation let checkedCrudItem: CrudEntry | undefined; while (true) { - this.updateSyncStatus({ - dataFlow: { - uploading: true - } - }); try { /** * This is the first item in the FIFO CRUD queue. */ const nextCrudItem = await this.options.adapter.nextCrudItem(); if (nextCrudItem) { + this.updateSyncStatus({ + dataFlow: { + uploading: true + } + }); + if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue. From c635c9f077f9e404e249ca68968a238eb8c4d402 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 17:27:25 -0600 Subject: [PATCH 5/6] Don't set uploading unecessarily --- packages/node/tests/sync.test.ts | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index 2f73bba8..f8fe6ac8 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -158,6 +158,26 @@ function defineSyncTests(impl: SyncClientImplementation) { await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy()); }); + mockSyncServiceTest('does not set uploading status without local writes', async ({ syncService }) => { + const database = await syncService.createDatabase(); + database.registerListener({ + statusChanged(status) { + expect(status.dataFlowStatus.uploading).toBeFalsy(); + } + }); + + database.connect(new TestConnector(), options); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + syncService.pushLine({ + checkpoint: { + last_op_id: '10', + buckets: [bucket('a', 10)] + } + }); + await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy()); + }); + describe('reports progress', () => { let lastOpId = 0; From 5c57ea8b2be116c39c79089b32c64a37d608a6c8 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 2 Jul 2025 16:03:38 -0600 Subject: [PATCH 6/6] Fix crash on logger invocation --- .../client/sync/stream/AbstractStreamingSyncImplementation.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 03a332d7..4f6cb107 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -414,7 +414,7 @@ The next upload iteration will be delayed.`); // Return a promise that resolves when the connection status is updated to indicate that we're connected. return new Promise((resolve) => { const disposer = this.registerListener({ - statusChanged(status) { + statusChanged: (status) => { if (status.dataFlowStatus.downloadError != null) { this.logger.warn('Initial connect attempt did not successfully connect to server'); } else if (status.connecting) {