diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 2bc906c..2e1174c 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -459,6 +459,24 @@ impl StorageAdapter { self.delete_subscription.exec()?; Ok(()) } + + pub fn local_state(&self) -> Result, PowerSyncError> { + let stmt = self + .db + .prepare_v2("SELECT target_op FROM ps_buckets WHERE name = ?")?; + stmt.bind_text(1, "$local", sqlite_nostd::Destructor::STATIC)?; + + Ok(if stmt.step()? == ResultCode::ROW { + let target_op = stmt.column_int64(0); + Some(LocalState { target_op }) + } else { + None + }) + } +} + +pub struct LocalState { + pub target_op: i64, } pub struct BucketInfo { diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 7b7c6e0..ef48a93 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -448,6 +448,12 @@ impl StreamingSyncIteration { |s| s.start_tracking_checkpoint(progress, subscription_state), &mut event.instructions, ); + + // Technically, we could still try to apply a pending checkpoint after receiving a + // new one. However, sync_local assumes it's only called in a state where there's no + // pending checkpoint, so we'd have to take the oplog state at the time we've + // originally received the validated-but-not-applied checkpoint. This is likely not + // something worth doing. self.validated_but_not_applied = None; *target = updated_target; } @@ -515,28 +521,7 @@ impl StreamingSyncIteration { SyncEvent::BinaryLine { data } => bson::from_bytes(data) .map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?, SyncEvent::UploadFinished => { - if let Some(checkpoint) = self.validated_but_not_applied.take() { - let result = self.sync_local(&checkpoint, None)?; - - match result { - SyncLocalResult::ChangesApplied => { - event.instructions.push(Instruction::LogLine { - severity: LogSeverity::DEBUG, - line: "Applied pending checkpoint after completed upload" - .into(), - }); - - self.handle_checkpoint_applied(event, self.adapter.now()?); - } - _ => { - event.instructions.push(Instruction::LogLine { - severity: LogSeverity::WARNING, - line: "Could not apply pending checkpoint even after completed upload" - .into(), - }); - } - } - } + self.try_applying_write_after_completed_upload(event)?; continue; } @@ -608,6 +593,43 @@ impl StreamingSyncIteration { Ok(progress) } + fn try_applying_write_after_completed_upload( + &mut self, + event: &mut ActiveEvent, + ) -> Result<(), PowerSyncError> { + let Some(checkpoint) = self.validated_but_not_applied.take() else { + return Ok(()); + }; + + let target_write = self.adapter.local_state()?.map(|e| e.target_op); + if checkpoint.write_checkpoint < target_write { + // Note: None < Some(x). The pending checkpoint does not contain the write + // checkpoint created during the upload, so we don't have to try applying it, it's + // guaranteed to be outdated. + return Ok(()); + } + + let result = self.sync_local(&checkpoint, None)?; + match result { + SyncLocalResult::ChangesApplied => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::DEBUG, + line: "Applied pending checkpoint after completed upload".into(), + }); + + self.handle_checkpoint_applied(event, self.adapter.now()?); + } + _ => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::WARNING, + line: "Could not apply pending checkpoint even after completed upload".into(), + }); + } + } + + Ok(()) + } + /// Reconciles local stream subscriptions with service-side state received in a checkpoint. /// /// This involves: diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index ed77bf6..17cc085 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -102,17 +102,10 @@ impl<'a> SyncOperation<'a> { if needs_check { // language=SQLite let statement = self.db.prepare_v2( - "\ - SELECT group_concat(name) - FROM ps_buckets - WHERE target_op > last_op AND name = '$local'", + "SELECT 1 FROM ps_buckets WHERE target_op > last_op AND name = '$local'", )?; - if statement.step()? != ResultCode::ROW { - return Err(PowerSyncError::unknown_internal()); - } - - if statement.column_type(0)? == ColumnType::Text { + if statement.step()? == ResultCode::ROW { return Ok(false); } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 0cfdfe1..60c6c35 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -121,8 +121,14 @@ void _syncTests({ } List pushCheckpoint( - {int lastOpId = 1, List buckets = const []}) { - return syncLine(checkpoint(lastOpId: lastOpId, buckets: buckets)); + {int lastOpId = 1, + List buckets = const [], + String? writeCheckpoint}) { + return syncLine(checkpoint( + lastOpId: lastOpId, + buckets: buckets, + writeCheckpoint: writeCheckpoint, + )); } List pushCheckpointComplete({int? priority, String lastOpId = '1'}) { @@ -676,6 +682,123 @@ void _syncTests({ }); }); + group('applies pending changes', () { + test('write checkpoint before upload complete', () { + // local write while offline + db.execute("insert into items (id, col) values ('local', 'data');"); + invokeControl('start', null); + + // Start upload process. Assume data has been uploaded and a write + // checkpoint has been requested, but not received yet. + db.execute('DELETE FROM ps_crud'); + pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1'); + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect(pushCheckpointComplete(), [ + containsPair('LogLine', { + 'severity': 'INFO', + 'line': contains('Will retry at completed upload') + }) + ]); + + // Now complete the upload process. + db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'"); + invokeControl('completed_upload', null); + + // This should apply the pending write checkpoint. + expect(fetchRows(), [ + {'id': 'row-0', 'col': 'hi'} + ]); + }); + + test('write checkpoint with synced data', () { + // local write while offline + db.execute("insert into items (id, col) values ('local', 'data');"); + invokeControl('start', null); + + // Complete upload process + db.execute('DELETE FROM ps_crud'); + db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'"); + expect(invokeControl('completed_upload', null), isEmpty); + + // Sync afterwards containing data and write checkpoint. + pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1'); + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + pushCheckpointComplete(); + expect(fetchRows(), [ + {'id': 'row-0', 'col': 'hi'} + ]); + }); + + test('write checkpoint after synced data', () { + // local write while offline + db.execute("insert into items (id, col) values ('local', 'data');"); + invokeControl('start', null); + + // Upload changes, assume that triggered a checkpoint. + db.execute('DELETE FROM ps_crud'); + pushCheckpoint(buckets: priorityBuckets); + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect(pushCheckpointComplete(), [ + containsPair('LogLine', { + 'severity': 'INFO', + 'line': contains('Will retry at completed upload') + }) + ]); + + // Now the upload is complete and requests a write checkpoint + db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'"); + expect(invokeControl('completed_upload', null), isEmpty); + + // Which triggers a new iteration + pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1'); + expect( + pushCheckpointComplete(), + contains(containsPair('LogLine', { + 'severity': 'DEBUG', + 'line': contains('Validated and applied checkpoint') + }))); + + expect(fetchRows(), [ + {'id': 'row-0', 'col': 'hi'} + ]); + }); + + test('second local write', () { + // first local write while offline + db.execute("insert into items (id, col) values ('local', 'data');"); + invokeControl('start', null); + + // Upload changes, assume that triggered a checkpoint. + db.execute('DELETE FROM ps_crud'); + pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1'); + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect(pushCheckpointComplete(), [ + containsPair('LogLine', { + 'severity': 'INFO', + 'line': contains('Will retry at completed upload') + }) + ]); + + // Second local write during sync + db.execute("insert into items (id, col) values ('local2', 'data2');"); + + // Now the upload is complete and requests a write checkpoint + db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'"); + expect(invokeControl('completed_upload', null), [ + containsPair('LogLine', { + 'severity': 'WARNING', + 'line': + 'Could not apply pending checkpoint even after completed upload' + }) + ]); + + expect(fetchRows(), [ + {'id': 'local', 'col': 'data'}, + {'id': 'local2', 'col': 'data2'}, + ]); + }); + }); + group('errors', () { syncTest('diff without prior checkpoint', (_) { invokeControl('start', null); diff --git a/dart/test/utils/test_utils.dart b/dart/test/utils/test_utils.dart index d806709..abd3e85 100644 --- a/dart/test/utils/test_utils.dart +++ b/dart/test/utils/test_utils.dart @@ -11,7 +11,7 @@ Object checkpoint({ return { 'checkpoint': { 'last_op_id': '$lastOpId', - 'write_checkpoint': null, + 'write_checkpoint': writeCheckpoint, 'buckets': buckets, 'streams': streams, }