diff --git a/.changeset/new-readers-dance.md b/.changeset/new-readers-dance.md new file mode 100644 index 00000000..3f686c30 --- /dev/null +++ b/.changeset/new-readers-dance.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Rust sync client: Fix `connect()` resolving before a connection is made. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 85d493ef..4f6cb107 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -342,17 +342,18 @@ export abstract class AbstractStreamingSyncImplementation let checkedCrudItem: CrudEntry | undefined; while (true) { - this.updateSyncStatus({ - dataFlow: { - uploading: true - } - }); try { /** * This is the first item in the FIFO CRUD queue. */ const nextCrudItem = await this.options.adapter.nextCrudItem(); if (nextCrudItem) { + this.updateSyncStatus({ + dataFlow: { + uploading: true + } + }); + if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue. @@ -410,23 +411,15 @@ The next upload iteration will be delayed.`); this.abortController = controller; this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options); - // Return a promise that resolves when the connection status is updated + // Return a promise that resolves when the connection status is updated to indicate that we're connected. return new Promise((resolve) => { const disposer = this.registerListener({ - statusUpdated: (update) => { - // This is triggered as soon as a connection is read from - if (typeof update.connected == 'undefined') { - // only concern with connection updates - return; - } - - if (update.connected == false) { - /** - * This function does not reject if initial connect attempt failed. - * Connected can be false if the connection attempt was aborted or if the initial connection - * attempt failed. - */ + statusChanged: (status) => { + if (status.dataFlowStatus.downloadError != null) { this.logger.warn('Initial connect attempt did not successfully connect to server'); + } else if (status.connecting) { + // Still connecting. + return; } disposer(); @@ -889,6 +882,10 @@ The next upload iteration will be delayed.`); ); } + // The rust client will set connected: true after the first sync line because that's when it gets invoked, but + // we're already connected here and can report that. + syncImplementation.updateSyncStatus({ connected: true }); + try { while (!controlInvocations.closed) { const line = await controlInvocations.read(); diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index e01f121d..f8fe6ac8 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -135,6 +135,49 @@ function defineSyncTests(impl: SyncClientImplementation) { expect(Math.abs(lastSyncedAt - now)).toBeLessThan(5000); }); + mockSyncServiceTest('connect() waits for connection', async ({ syncService }) => { + const database = await syncService.createDatabase(); + let connectCompleted = false; + database.connect(new TestConnector(), options).then(() => { + connectCompleted = true; + }); + expect(connectCompleted).toBeFalsy(); + + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + // We want connected: true once we have a connection + await vi.waitFor(() => connectCompleted); + expect(database.currentStatus.dataFlowStatus.downloading).toBeFalsy(); + + syncService.pushLine({ + checkpoint: { + last_op_id: '10', + buckets: [bucket('a', 10)] + } + }); + + await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy()); + }); + + mockSyncServiceTest('does not set uploading status without local writes', async ({ syncService }) => { + const database = await syncService.createDatabase(); + database.registerListener({ + statusChanged(status) { + expect(status.dataFlowStatus.uploading).toBeFalsy(); + } + }); + + database.connect(new TestConnector(), options); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + syncService.pushLine({ + checkpoint: { + last_op_id: '10', + buckets: [bucket('a', 10)] + } + }); + await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy()); + }); + describe('reports progress', () => { let lastOpId = 0;