Skip to content

Rust client: Request sync lines as BSON #312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions packages/powersync_core/lib/src/sync/stream_utils.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import 'dart:async';

import 'dart:convert' as convert;
import 'dart:math';
import 'dart:typed_data';

import 'package:typed_data/typed_buffers.dart';

import '../exceptions.dart';

/// Inject a broadcast stream into a normal stream.
Stream<T> addBroadcast<T>(Stream<T> a, Stream<T> broadcast) {
Expand Down Expand Up @@ -75,6 +81,11 @@ extension ByteStreamToLines on Stream<List<int>> {
final textInput = transform(convert.utf8.decoder);
return textInput.transform(const convert.LineSplitter());
}

/// Splits this stream into BSON documents without parsing them.
Stream<Uint8List> get bsonDocuments {
return Stream.eventTransformed(this, _BsonSplittingSink.new);
}
}

extension StreamToJson on Stream<String> {
Expand All @@ -99,3 +110,91 @@ Future<void> cancelAll(List<StreamSubscription<void>> subscriptions) async {
final futures = subscriptions.map((sub) => sub.cancel());
await Future.wait(futures);
}

/// An [EventSink] that takes raw bytes as inputs, buffers them internally by
/// reading a 4-byte length prefix for each message and then emits them as
/// chunks.
final class _BsonSplittingSink implements EventSink<List<int>> {
final EventSink<Uint8List> _downstream;

final length = ByteData(4);
int remainingBytes = 4;

Uint8Buffer? pendingBuffer;

_BsonSplittingSink(this._downstream);

@override
void add(List<int> data) {
var i = 0;
while (i < data.length) {
final availableInData = data.length - i;

if (pendingBuffer case final pending?) {
// We're in the middle of reading a document
final bytesToRead = min(availableInData, remainingBytes);
pending.addAll(data, i, i + bytesToRead);
i += bytesToRead;
remainingBytes -= bytesToRead;
assert(remainingBytes >= 0);

if (remainingBytes == 0) {
_downstream.add(pending.buffer
.asUint8List(pending.offsetInBytes, pending.lengthInBytes));

// Prepare reading another document, starting with its length
pendingBuffer = null;
remainingBytes = 4;
}
} else {
final bytesToRead = min(availableInData, remainingBytes);
final lengthAsUint8List = length.buffer.asUint8List();

lengthAsUint8List.setRange(
4 - remainingBytes,
4 - remainingBytes + bytesToRead,
data,
i,
);
i += bytesToRead;
remainingBytes -= bytesToRead;
assert(remainingBytes >= 0);

if (remainingBytes == 0) {
// Transition from reading length header to reading document.
// Subtracting 4 because the length of the header is included in the
// length.
remainingBytes = length.getInt32(0, Endian.little) - 4;
if (remainingBytes < 5) {
_downstream.addError(
PowerSyncProtocolException(
'Invalid length for bson: $remainingBytes'),
StackTrace.current,
);
}

pendingBuffer = Uint8Buffer()..addAll(lengthAsUint8List);
}
}
}

assert(i == data.length);
}

@override
void addError(Object error, [StackTrace? stackTrace]) {
_downstream.addError(error, stackTrace);
}

@override
void close() {
if (pendingBuffer != null || remainingBytes != 4) {
_downstream.addError(
PowerSyncProtocolException('Pending data when stream was closed'),
StackTrace.current,
);
}

_downstream.close();
}
}
37 changes: 24 additions & 13 deletions packages/powersync_core/lib/src/sync/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,11 @@ class StreamingSyncImplementation implements StreamingSync {
}
}

Future<http.StreamedResponse?> _postStreamRequest(Object? data) async {
Future<http.StreamedResponse?> _postStreamRequest(
Object? data, bool acceptBson) async {
const ndJson = 'application/x-ndjson';
const bson = 'application/vnd.powersync.bson-stream';

final credentials = await connector.getCredentialsCached();
if (credentials == null) {
throw CredentialsException('Not logged in');
Expand All @@ -516,6 +520,8 @@ class StreamingSyncImplementation implements StreamingSync {
final request = http.Request('POST', uri);
request.headers['Content-Type'] = 'application/json';
request.headers['Authorization'] = "Token ${credentials.token}";
request.headers['Accept'] =
acceptBson ? '$bson;q=0.9,$ndJson;q=0.8' : ndJson;
request.headers.addAll(_userAgentHeaders);

request.body = convert.jsonEncode(data);
Expand All @@ -535,18 +541,13 @@ class StreamingSyncImplementation implements StreamingSync {
return res;
}

Stream<String> _rawStreamingSyncRequest(Object? data) async* {
final response = await _postStreamRequest(data);
if (response != null) {
yield* response.stream.lines;
}
}

Stream<StreamingSyncLine> _streamingSyncRequest(StreamingSyncRequest data) {
return _rawStreamingSyncRequest(data)
.parseJson
.cast<Map<String, dynamic>>()
.transform(StreamingSyncLine.reader);
return Stream.fromFuture(_postStreamRequest(data, false))
.asyncExpand((response) {
return response?.stream.lines.parseJson
.cast<Map<String, dynamic>>()
.transform(StreamingSyncLine.reader);
});
}

/// Delays the standard `retryDelay` Duration, but exits early if
Expand Down Expand Up @@ -614,7 +615,17 @@ final class _ActiveRustStreamingIteration {
}

Stream<ReceivedLine> _receiveLines(Object? data) {
return sync._rawStreamingSyncRequest(data).map(ReceivedLine.new);
return Stream.fromFuture(sync._postStreamRequest(data, true))
.asyncExpand<Object /* Uint8List | String */ >((response) {
if (response == null) {
return null;
} else {
final contentType = response.headers['content-type'];
final isBson = contentType == 'application/vnd.powersync.bson-stream';

return isBson ? response.stream.bsonDocuments : response.stream.lines;
}
}).map(ReceivedLine.new);
}

Future<void> _handleLines(EstablishSyncStream request) async {
Expand Down
2 changes: 2 additions & 0 deletions packages/powersync_core/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies:
pub_semver: ^2.0.0
pubspec_parse: ^1.3.0
path: ^1.8.0
typed_data: ^1.4.0

dev_dependencies:
lints: ^5.1.1
Expand All @@ -38,6 +39,7 @@ dev_dependencies:
shelf_static: ^1.1.2
stream_channel: ^2.1.2
fake_async: ^1.3.3
bson: ^5.0.7

platforms:
android:
Expand Down
35 changes: 24 additions & 11 deletions packages/powersync_core/test/in_memory_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,33 @@ void main() {
_declareTests(
'dart sync client',
SyncOptions(
// ignore: deprecated_member_use_from_same_package
syncImplementation: SyncClientImplementation.dart,
retryDelay: Duration(milliseconds: 200)),
// ignore: deprecated_member_use_from_same_package
syncImplementation: SyncClientImplementation.dart,
retryDelay: Duration(milliseconds: 200),
),
false,
);

_declareTests(
'rust sync client',
SyncOptions(
syncImplementation: SyncClientImplementation.rust,
retryDelay: Duration(milliseconds: 200)),
);
group('rust sync client', () {
_declareTests(
'json',
SyncOptions(
syncImplementation: SyncClientImplementation.rust,
retryDelay: Duration(milliseconds: 200)),
false,
);

_declareTests(
'bson',
SyncOptions(
syncImplementation: SyncClientImplementation.rust,
retryDelay: Duration(milliseconds: 200)),
true,
);
});
}

void _declareTests(String name, SyncOptions options) {
void _declareTests(String name, SyncOptions options, bool bson) {
final ignoredLogger = Logger.detached('powersync.test')..level = Level.OFF;

group(name, () {
Expand Down Expand Up @@ -74,7 +87,7 @@ void _declareTests(String name, SyncOptions options) {
setUp(() async {
logger = Logger.detached('powersync.active')..level = Level.ALL;
credentialsCallbackCount = 0;
syncService = MockSyncService();
syncService = MockSyncService(useBson: bson);

factory = await testUtils.testFactory();
(raw, database) = await factory.openInMemoryDatabase();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:bson/bson.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf_router/shelf_router.dart';

final class MockSyncService {
final bool useBson;

// Use a queued stream to make tests easier.
StreamController<String> controller = StreamController();
StreamController<Object /* String | Uint8List */ > controller =
StreamController();
Completer<Request> _listener = Completer();

final router = Router();
Expand All @@ -16,13 +21,29 @@ final class MockSyncService {
};
};

MockSyncService() {
MockSyncService({this.useBson = false}) {
router
..post('/sync/stream', (Request request) async {
if (useBson &&
!request.headers['Accept']!
.contains('application/vnd.powersync.bson-stream')) {
throw "Want to serve bson, but client doesn't accept it";
}

_listener.complete(request);
// Respond immediately with a stream
return Response.ok(controller.stream.transform(utf8.encoder), headers: {
'Content-Type': 'application/x-ndjson',
final bytes = controller.stream.map((line) {
return switch (line) {
final String line => utf8.encode(line),
final Uint8List line => line,
_ => throw ArgumentError.value(line, 'line', 'Unexpected type'),
};
});

return Response.ok(bytes, headers: {
'Content-Type': useBson
? 'application/vnd.powersync.bson-stream'
: 'application/x-ndjson',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}, context: {
Expand All @@ -39,12 +60,19 @@ final class MockSyncService {
Future<Request> get waitForListener => _listener.future;

// Queue events which will be sent to connected clients.
void addRawEvent(String data) {
void addRawEvent(Object data) {
controller.add(data);
}

void addLine(Object? message) {
addRawEvent('${json.encode(message)}\n');
if (useBson) {
// Going through a JSON roundtrip ensures that the message can be
// serialized with the BSON package.
final cleanedMessage = json.decode(json.encode(message));
addRawEvent(BsonCodec.serialize(cleanedMessage).byteList);
} else {
addRawEvent('${json.encode(message)}\n');
}
}

void addKeepAlive([int tokenExpiresIn = 3600]) {
Expand Down