Skip to content

Commit 25b9c55

Browse files
authored
Rust client: Request sync lines as BSON (#312)
1 parent 599189c commit 25b9c55

File tree

5 files changed

+183
-30
lines changed

5 files changed

+183
-30
lines changed

packages/powersync_core/lib/src/sync/stream_utils.dart

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import 'dart:async';
22

33
import 'dart:convert' as convert;
4+
import 'dart:math';
5+
import 'dart:typed_data';
6+
7+
import 'package:typed_data/typed_buffers.dart';
8+
9+
import '../exceptions.dart';
410

511
/// Inject a broadcast stream into a normal stream.
612
Stream<T> addBroadcast<T>(Stream<T> a, Stream<T> broadcast) {
@@ -75,6 +81,11 @@ extension ByteStreamToLines on Stream<List<int>> {
7581
final textInput = transform(convert.utf8.decoder);
7682
return textInput.transform(const convert.LineSplitter());
7783
}
84+
85+
/// Splits this stream into BSON documents without parsing them.
86+
Stream<Uint8List> get bsonDocuments {
87+
return Stream.eventTransformed(this, _BsonSplittingSink.new);
88+
}
7889
}
7990

8091
extension StreamToJson on Stream<String> {
@@ -99,3 +110,91 @@ Future<void> cancelAll(List<StreamSubscription<void>> subscriptions) async {
99110
final futures = subscriptions.map((sub) => sub.cancel());
100111
await Future.wait(futures);
101112
}
113+
114+
/// An [EventSink] that takes raw bytes as inputs, buffers them internally by
115+
/// reading a 4-byte length prefix for each message and then emits them as
116+
/// chunks.
117+
final class _BsonSplittingSink implements EventSink<List<int>> {
118+
final EventSink<Uint8List> _downstream;
119+
120+
final length = ByteData(4);
121+
int remainingBytes = 4;
122+
123+
Uint8Buffer? pendingBuffer;
124+
125+
_BsonSplittingSink(this._downstream);
126+
127+
@override
128+
void add(List<int> data) {
129+
var i = 0;
130+
while (i < data.length) {
131+
final availableInData = data.length - i;
132+
133+
if (pendingBuffer case final pending?) {
134+
// We're in the middle of reading a document
135+
final bytesToRead = min(availableInData, remainingBytes);
136+
pending.addAll(data, i, i + bytesToRead);
137+
i += bytesToRead;
138+
remainingBytes -= bytesToRead;
139+
assert(remainingBytes >= 0);
140+
141+
if (remainingBytes == 0) {
142+
_downstream.add(pending.buffer
143+
.asUint8List(pending.offsetInBytes, pending.lengthInBytes));
144+
145+
// Prepare reading another document, starting with its length
146+
pendingBuffer = null;
147+
remainingBytes = 4;
148+
}
149+
} else {
150+
final bytesToRead = min(availableInData, remainingBytes);
151+
final lengthAsUint8List = length.buffer.asUint8List();
152+
153+
lengthAsUint8List.setRange(
154+
4 - remainingBytes,
155+
4 - remainingBytes + bytesToRead,
156+
data,
157+
i,
158+
);
159+
i += bytesToRead;
160+
remainingBytes -= bytesToRead;
161+
assert(remainingBytes >= 0);
162+
163+
if (remainingBytes == 0) {
164+
// Transition from reading length header to reading document.
165+
// Subtracting 4 because the length of the header is included in the
166+
// length.
167+
remainingBytes = length.getInt32(0, Endian.little) - 4;
168+
if (remainingBytes < 5) {
169+
_downstream.addError(
170+
PowerSyncProtocolException(
171+
'Invalid length for bson: $remainingBytes'),
172+
StackTrace.current,
173+
);
174+
}
175+
176+
pendingBuffer = Uint8Buffer()..addAll(lengthAsUint8List);
177+
}
178+
}
179+
}
180+
181+
assert(i == data.length);
182+
}
183+
184+
@override
185+
void addError(Object error, [StackTrace? stackTrace]) {
186+
_downstream.addError(error, stackTrace);
187+
}
188+
189+
@override
190+
void close() {
191+
if (pendingBuffer != null || remainingBytes != 4) {
192+
_downstream.addError(
193+
PowerSyncProtocolException('Pending data when stream was closed'),
194+
StackTrace.current,
195+
);
196+
}
197+
198+
_downstream.close();
199+
}
200+
}

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,11 @@ class StreamingSyncImplementation implements StreamingSync {
506506
}
507507
}
508508

509-
Future<http.StreamedResponse?> _postStreamRequest(Object? data) async {
509+
Future<http.StreamedResponse?> _postStreamRequest(
510+
Object? data, bool acceptBson) async {
511+
const ndJson = 'application/x-ndjson';
512+
const bson = 'application/vnd.powersync.bson-stream';
513+
510514
final credentials = await connector.getCredentialsCached();
511515
if (credentials == null) {
512516
throw CredentialsException('Not logged in');
@@ -516,6 +520,8 @@ class StreamingSyncImplementation implements StreamingSync {
516520
final request = http.Request('POST', uri);
517521
request.headers['Content-Type'] = 'application/json';
518522
request.headers['Authorization'] = "Token ${credentials.token}";
523+
request.headers['Accept'] =
524+
acceptBson ? '$bson;q=0.9,$ndJson;q=0.8' : ndJson;
519525
request.headers.addAll(_userAgentHeaders);
520526

521527
request.body = convert.jsonEncode(data);
@@ -535,18 +541,13 @@ class StreamingSyncImplementation implements StreamingSync {
535541
return res;
536542
}
537543

538-
Stream<String> _rawStreamingSyncRequest(Object? data) async* {
539-
final response = await _postStreamRequest(data);
540-
if (response != null) {
541-
yield* response.stream.lines;
542-
}
543-
}
544-
545544
Stream<StreamingSyncLine> _streamingSyncRequest(StreamingSyncRequest data) {
546-
return _rawStreamingSyncRequest(data)
547-
.parseJson
548-
.cast<Map<String, dynamic>>()
549-
.transform(StreamingSyncLine.reader);
545+
return Stream.fromFuture(_postStreamRequest(data, false))
546+
.asyncExpand((response) {
547+
return response?.stream.lines.parseJson
548+
.cast<Map<String, dynamic>>()
549+
.transform(StreamingSyncLine.reader);
550+
});
550551
}
551552

552553
/// Delays the standard `retryDelay` Duration, but exits early if
@@ -614,7 +615,17 @@ final class _ActiveRustStreamingIteration {
614615
}
615616

616617
Stream<ReceivedLine> _receiveLines(Object? data) {
617-
return sync._rawStreamingSyncRequest(data).map(ReceivedLine.new);
618+
return Stream.fromFuture(sync._postStreamRequest(data, true))
619+
.asyncExpand<Object /* Uint8List | String */ >((response) {
620+
if (response == null) {
621+
return null;
622+
} else {
623+
final contentType = response.headers['content-type'];
624+
final isBson = contentType == 'application/vnd.powersync.bson-stream';
625+
626+
return isBson ? response.stream.bsonDocuments : response.stream.lines;
627+
}
628+
}).map(ReceivedLine.new);
618629
}
619630

620631
Future<void> _handleLines(EstablishSyncStream request) async {

packages/powersync_core/pubspec.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ dependencies:
2828
pub_semver: ^2.0.0
2929
pubspec_parse: ^1.3.0
3030
path: ^1.8.0
31+
typed_data: ^1.4.0
3132

3233
dev_dependencies:
3334
lints: ^5.1.1
@@ -38,6 +39,7 @@ dev_dependencies:
3839
shelf_static: ^1.1.2
3940
stream_channel: ^2.1.2
4041
fake_async: ^1.3.3
42+
bson: ^5.0.7
4143

4244
platforms:
4345
android:

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,33 @@ void main() {
1919
_declareTests(
2020
'dart sync client',
2121
SyncOptions(
22-
// ignore: deprecated_member_use_from_same_package
23-
syncImplementation: SyncClientImplementation.dart,
24-
retryDelay: Duration(milliseconds: 200)),
22+
// ignore: deprecated_member_use_from_same_package
23+
syncImplementation: SyncClientImplementation.dart,
24+
retryDelay: Duration(milliseconds: 200),
25+
),
26+
false,
2527
);
2628

27-
_declareTests(
28-
'rust sync client',
29-
SyncOptions(
30-
syncImplementation: SyncClientImplementation.rust,
31-
retryDelay: Duration(milliseconds: 200)),
32-
);
29+
group('rust sync client', () {
30+
_declareTests(
31+
'json',
32+
SyncOptions(
33+
syncImplementation: SyncClientImplementation.rust,
34+
retryDelay: Duration(milliseconds: 200)),
35+
false,
36+
);
37+
38+
_declareTests(
39+
'bson',
40+
SyncOptions(
41+
syncImplementation: SyncClientImplementation.rust,
42+
retryDelay: Duration(milliseconds: 200)),
43+
true,
44+
);
45+
});
3346
}
3447

35-
void _declareTests(String name, SyncOptions options) {
48+
void _declareTests(String name, SyncOptions options, bool bson) {
3649
final ignoredLogger = Logger.detached('powersync.test')..level = Level.OFF;
3750

3851
group(name, () {
@@ -74,7 +87,7 @@ void _declareTests(String name, SyncOptions options) {
7487
setUp(() async {
7588
logger = Logger.detached('powersync.active')..level = Level.ALL;
7689
credentialsCallbackCount = 0;
77-
syncService = MockSyncService();
90+
syncService = MockSyncService(useBson: bson);
7891

7992
factory = await testUtils.testFactory();
8093
(raw, database) = await factory.openInMemoryDatabase();

packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
import 'dart:async';
22
import 'dart:convert';
3+
import 'dart:typed_data';
34

5+
import 'package:bson/bson.dart';
46
import 'package:shelf/shelf.dart';
57
import 'package:shelf_router/shelf_router.dart';
68

79
final class MockSyncService {
10+
final bool useBson;
11+
812
// Use a queued stream to make tests easier.
9-
StreamController<String> controller = StreamController();
13+
StreamController<Object /* String | Uint8List */ > controller =
14+
StreamController();
1015
Completer<Request> _listener = Completer();
1116

1217
final router = Router();
@@ -16,13 +21,29 @@ final class MockSyncService {
1621
};
1722
};
1823

19-
MockSyncService() {
24+
MockSyncService({this.useBson = false}) {
2025
router
2126
..post('/sync/stream', (Request request) async {
27+
if (useBson &&
28+
!request.headers['Accept']!
29+
.contains('application/vnd.powersync.bson-stream')) {
30+
throw "Want to serve bson, but client doesn't accept it";
31+
}
32+
2233
_listener.complete(request);
2334
// Respond immediately with a stream
24-
return Response.ok(controller.stream.transform(utf8.encoder), headers: {
25-
'Content-Type': 'application/x-ndjson',
35+
final bytes = controller.stream.map((line) {
36+
return switch (line) {
37+
final String line => utf8.encode(line),
38+
final Uint8List line => line,
39+
_ => throw ArgumentError.value(line, 'line', 'Unexpected type'),
40+
};
41+
});
42+
43+
return Response.ok(bytes, headers: {
44+
'Content-Type': useBson
45+
? 'application/vnd.powersync.bson-stream'
46+
: 'application/x-ndjson',
2647
'Cache-Control': 'no-cache',
2748
'Connection': 'keep-alive',
2849
}, context: {
@@ -39,12 +60,19 @@ final class MockSyncService {
3960
Future<Request> get waitForListener => _listener.future;
4061

4162
// Queue events which will be sent to connected clients.
42-
void addRawEvent(String data) {
63+
void addRawEvent(Object data) {
4364
controller.add(data);
4465
}
4566

4667
void addLine(Object? message) {
47-
addRawEvent('${json.encode(message)}\n');
68+
if (useBson) {
69+
// Going through a JSON roundtrip ensures that the message can be
70+
// serialized with the BSON package.
71+
final cleanedMessage = json.decode(json.encode(message));
72+
addRawEvent(BsonCodec.serialize(cleanedMessage).byteList);
73+
} else {
74+
addRawEvent('${json.encode(message)}\n');
75+
}
4876
}
4977

5078
void addKeepAlive([int tokenExpiresIn = 3600]) {

0 commit comments

Comments
 (0)