diff --git a/packages/powersync_core/lib/src/sync/stream_utils.dart b/packages/powersync_core/lib/src/sync/stream_utils.dart index a4689cf9..0c28eebf 100644 --- a/packages/powersync_core/lib/src/sync/stream_utils.dart +++ b/packages/powersync_core/lib/src/sync/stream_utils.dart @@ -1,6 +1,12 @@ import 'dart:async'; import 'dart:convert' as convert; +import 'dart:math'; +import 'dart:typed_data'; + +import 'package:typed_data/typed_buffers.dart'; + +import '../exceptions.dart'; /// Inject a broadcast stream into a normal stream. Stream addBroadcast(Stream a, Stream broadcast) { @@ -75,6 +81,11 @@ extension ByteStreamToLines on Stream> { final textInput = transform(convert.utf8.decoder); return textInput.transform(const convert.LineSplitter()); } + + /// Splits this stream into BSON documents without parsing them. + Stream get bsonDocuments { + return Stream.eventTransformed(this, _BsonSplittingSink.new); + } } extension StreamToJson on Stream { @@ -99,3 +110,91 @@ Future cancelAll(List> subscriptions) async { final futures = subscriptions.map((sub) => sub.cancel()); await Future.wait(futures); } + +/// An [EventSink] that takes raw bytes as inputs, buffers them internally by +/// reading a 4-byte length prefix for each message and then emits them as +/// chunks. +final class _BsonSplittingSink implements EventSink> { + final EventSink _downstream; + + final length = ByteData(4); + int remainingBytes = 4; + + Uint8Buffer? pendingBuffer; + + _BsonSplittingSink(this._downstream); + + @override + void add(List data) { + var i = 0; + while (i < data.length) { + final availableInData = data.length - i; + + if (pendingBuffer case final pending?) { + // We're in the middle of reading a document + final bytesToRead = min(availableInData, remainingBytes); + pending.addAll(data, i, i + bytesToRead); + i += bytesToRead; + remainingBytes -= bytesToRead; + assert(remainingBytes >= 0); + + if (remainingBytes == 0) { + _downstream.add(pending.buffer + .asUint8List(pending.offsetInBytes, pending.lengthInBytes)); + + // Prepare reading another document, starting with its length + pendingBuffer = null; + remainingBytes = 4; + } + } else { + final bytesToRead = min(availableInData, remainingBytes); + final lengthAsUint8List = length.buffer.asUint8List(); + + lengthAsUint8List.setRange( + 4 - remainingBytes, + 4 - remainingBytes + bytesToRead, + data, + i, + ); + i += bytesToRead; + remainingBytes -= bytesToRead; + assert(remainingBytes >= 0); + + if (remainingBytes == 0) { + // Transition from reading length header to reading document. + // Subtracting 4 because the length of the header is included in the + // length. + remainingBytes = length.getInt32(0, Endian.little) - 4; + if (remainingBytes < 5) { + _downstream.addError( + PowerSyncProtocolException( + 'Invalid length for bson: $remainingBytes'), + StackTrace.current, + ); + } + + pendingBuffer = Uint8Buffer()..addAll(lengthAsUint8List); + } + } + } + + assert(i == data.length); + } + + @override + void addError(Object error, [StackTrace? stackTrace]) { + _downstream.addError(error, stackTrace); + } + + @override + void close() { + if (pendingBuffer != null || remainingBytes != 4) { + _downstream.addError( + PowerSyncProtocolException('Pending data when stream was closed'), + StackTrace.current, + ); + } + + _downstream.close(); + } +} diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index bbdcb85a..88125ce3 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -506,7 +506,11 @@ class StreamingSyncImplementation implements StreamingSync { } } - Future _postStreamRequest(Object? data) async { + Future _postStreamRequest( + Object? data, bool acceptBson) async { + const ndJson = 'application/x-ndjson'; + const bson = 'application/vnd.powersync.bson-stream'; + final credentials = await connector.getCredentialsCached(); if (credentials == null) { throw CredentialsException('Not logged in'); @@ -516,6 +520,8 @@ class StreamingSyncImplementation implements StreamingSync { final request = http.Request('POST', uri); request.headers['Content-Type'] = 'application/json'; request.headers['Authorization'] = "Token ${credentials.token}"; + request.headers['Accept'] = + acceptBson ? '$bson;q=0.9,$ndJson;q=0.8' : ndJson; request.headers.addAll(_userAgentHeaders); request.body = convert.jsonEncode(data); @@ -535,18 +541,13 @@ class StreamingSyncImplementation implements StreamingSync { return res; } - Stream _rawStreamingSyncRequest(Object? data) async* { - final response = await _postStreamRequest(data); - if (response != null) { - yield* response.stream.lines; - } - } - Stream _streamingSyncRequest(StreamingSyncRequest data) { - return _rawStreamingSyncRequest(data) - .parseJson - .cast>() - .transform(StreamingSyncLine.reader); + return Stream.fromFuture(_postStreamRequest(data, false)) + .asyncExpand((response) { + return response?.stream.lines.parseJson + .cast>() + .transform(StreamingSyncLine.reader); + }); } /// Delays the standard `retryDelay` Duration, but exits early if @@ -614,7 +615,17 @@ final class _ActiveRustStreamingIteration { } Stream _receiveLines(Object? data) { - return sync._rawStreamingSyncRequest(data).map(ReceivedLine.new); + return Stream.fromFuture(sync._postStreamRequest(data, true)) + .asyncExpand((response) { + if (response == null) { + return null; + } else { + final contentType = response.headers['content-type']; + final isBson = contentType == 'application/vnd.powersync.bson-stream'; + + return isBson ? response.stream.bsonDocuments : response.stream.lines; + } + }).map(ReceivedLine.new); } Future _handleLines(EstablishSyncStream request) async { diff --git a/packages/powersync_core/pubspec.yaml b/packages/powersync_core/pubspec.yaml index f82d1c9a..266f9f46 100644 --- a/packages/powersync_core/pubspec.yaml +++ b/packages/powersync_core/pubspec.yaml @@ -28,6 +28,7 @@ dependencies: pub_semver: ^2.0.0 pubspec_parse: ^1.3.0 path: ^1.8.0 + typed_data: ^1.4.0 dev_dependencies: lints: ^5.1.1 @@ -38,6 +39,7 @@ dev_dependencies: shelf_static: ^1.1.2 stream_channel: ^2.1.2 fake_async: ^1.3.3 + bson: ^5.0.7 platforms: android: diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 1819795d..57e36b0c 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -19,20 +19,33 @@ void main() { _declareTests( 'dart sync client', SyncOptions( - // ignore: deprecated_member_use_from_same_package - syncImplementation: SyncClientImplementation.dart, - retryDelay: Duration(milliseconds: 200)), + // ignore: deprecated_member_use_from_same_package + syncImplementation: SyncClientImplementation.dart, + retryDelay: Duration(milliseconds: 200), + ), + false, ); - _declareTests( - 'rust sync client', - SyncOptions( - syncImplementation: SyncClientImplementation.rust, - retryDelay: Duration(milliseconds: 200)), - ); + group('rust sync client', () { + _declareTests( + 'json', + SyncOptions( + syncImplementation: SyncClientImplementation.rust, + retryDelay: Duration(milliseconds: 200)), + false, + ); + + _declareTests( + 'bson', + SyncOptions( + syncImplementation: SyncClientImplementation.rust, + retryDelay: Duration(milliseconds: 200)), + true, + ); + }); } -void _declareTests(String name, SyncOptions options) { +void _declareTests(String name, SyncOptions options, bool bson) { final ignoredLogger = Logger.detached('powersync.test')..level = Level.OFF; group(name, () { @@ -74,7 +87,7 @@ void _declareTests(String name, SyncOptions options) { setUp(() async { logger = Logger.detached('powersync.active')..level = Level.ALL; credentialsCallbackCount = 0; - syncService = MockSyncService(); + syncService = MockSyncService(useBson: bson); factory = await testUtils.testFactory(); (raw, database) = await factory.openInMemoryDatabase(); diff --git a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart index 6bea9454..19d7e137 100644 --- a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart +++ b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart @@ -1,12 +1,17 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:typed_data'; +import 'package:bson/bson.dart'; import 'package:shelf/shelf.dart'; import 'package:shelf_router/shelf_router.dart'; final class MockSyncService { + final bool useBson; + // Use a queued stream to make tests easier. - StreamController controller = StreamController(); + StreamController controller = + StreamController(); Completer _listener = Completer(); final router = Router(); @@ -16,13 +21,29 @@ final class MockSyncService { }; }; - MockSyncService() { + MockSyncService({this.useBson = false}) { router ..post('/sync/stream', (Request request) async { + if (useBson && + !request.headers['Accept']! + .contains('application/vnd.powersync.bson-stream')) { + throw "Want to serve bson, but client doesn't accept it"; + } + _listener.complete(request); // Respond immediately with a stream - return Response.ok(controller.stream.transform(utf8.encoder), headers: { - 'Content-Type': 'application/x-ndjson', + final bytes = controller.stream.map((line) { + return switch (line) { + final String line => utf8.encode(line), + final Uint8List line => line, + _ => throw ArgumentError.value(line, 'line', 'Unexpected type'), + }; + }); + + return Response.ok(bytes, headers: { + 'Content-Type': useBson + ? 'application/vnd.powersync.bson-stream' + : 'application/x-ndjson', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', }, context: { @@ -39,12 +60,19 @@ final class MockSyncService { Future get waitForListener => _listener.future; // Queue events which will be sent to connected clients. - void addRawEvent(String data) { + void addRawEvent(Object data) { controller.add(data); } void addLine(Object? message) { - addRawEvent('${json.encode(message)}\n'); + if (useBson) { + // Going through a JSON roundtrip ensures that the message can be + // serialized with the BSON package. + final cleanedMessage = json.decode(json.encode(message)); + addRawEvent(BsonCodec.serialize(cleanedMessage).byteList); + } else { + addRawEvent('${json.encode(message)}\n'); + } } void addKeepAlive([int tokenExpiresIn = 3600]) {