Skip to content

Fix connect() resolving early with Rust client #643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/new-readers-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Rust sync client: Fix `connect()` resolving before a connection is made.
Original file line number Diff line number Diff line change
Expand Up @@ -342,17 +342,18 @@ export abstract class AbstractStreamingSyncImplementation
let checkedCrudItem: CrudEntry | undefined;

while (true) {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});
try {
/**
* This is the first item in the FIFO CRUD queue.
*/
const nextCrudItem = await this.options.adapter.nextCrudItem();
if (nextCrudItem) {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});

if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
// This will force a higher log level than exceptions which are caught here.
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
Expand Down Expand Up @@ -410,23 +411,15 @@ The next upload iteration will be delayed.`);
this.abortController = controller;
this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options);

// Return a promise that resolves when the connection status is updated
// Return a promise that resolves when the connection status is updated to indicate that we're connected.
return new Promise<void>((resolve) => {
const disposer = this.registerListener({
statusUpdated: (update) => {
// This is triggered as soon as a connection is read from
if (typeof update.connected == 'undefined') {
// only concern with connection updates
return;
}

if (update.connected == false) {
/**
* This function does not reject if initial connect attempt failed.
* Connected can be false if the connection attempt was aborted or if the initial connection
* attempt failed.
*/
statusChanged: (status) => {
if (status.dataFlowStatus.downloadError != null) {
this.logger.warn('Initial connect attempt did not successfully connect to server');
} else if (status.connecting) {
// Still connecting.
return;
}

disposer();
Expand Down Expand Up @@ -889,6 +882,10 @@ The next upload iteration will be delayed.`);
);
}

// The rust client will set connected: true after the first sync line because that's when it gets invoked, but
// we're already connected here and can report that.
syncImplementation.updateSyncStatus({ connected: true });

try {
while (!controlInvocations.closed) {
const line = await controlInvocations.read();
Expand Down
43 changes: 43 additions & 0 deletions packages/node/tests/sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,49 @@ function defineSyncTests(impl: SyncClientImplementation) {
expect(Math.abs(lastSyncedAt - now)).toBeLessThan(5000);
});

mockSyncServiceTest('connect() waits for connection', async ({ syncService }) => {
const database = await syncService.createDatabase();
let connectCompleted = false;
database.connect(new TestConnector(), options).then(() => {
connectCompleted = true;
});
expect(connectCompleted).toBeFalsy();

await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
// We want connected: true once we have a connection
await vi.waitFor(() => connectCompleted);
expect(database.currentStatus.dataFlowStatus.downloading).toBeFalsy();

syncService.pushLine({
checkpoint: {
last_op_id: '10',
buckets: [bucket('a', 10)]
}
});

await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy());
});

mockSyncServiceTest('does not set uploading status without local writes', async ({ syncService }) => {
const database = await syncService.createDatabase();
database.registerListener({
statusChanged(status) {
expect(status.dataFlowStatus.uploading).toBeFalsy();
}
});

database.connect(new TestConnector(), options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
checkpoint: {
last_op_id: '10',
buckets: [bucket('a', 10)]
}
});
await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy());
});

describe('reports progress', () => {
let lastOpId = 0;

Expand Down