From 69169317000c10c00381990dbbdbb6e5c555f32b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 30 Jul 2025 13:32:46 +0200 Subject: [PATCH 1/7] Support BSON streams --- CHANGELOG.md | 2 + .../src/appleMain/kotlin/com/powersync/SDK.kt | 8 - core/build.gradle.kts | 2 - .../com/powersync/sync/AbstractSyncTest.kt | 1 + .../com/powersync/testutils/TestUtils.kt | 1 + .../com/powersync/sync/RSocketSupport.kt | 139 -------------- .../kotlin/com/powersync/sync/SyncOptions.kt | 55 +----- .../kotlin/com/powersync/sync/SyncStream.kt | 169 ++++++++++++------ gradle/libs.versions.toml | 3 - 9 files changed, 121 insertions(+), 259 deletions(-) delete mode 100644 core/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f94c135..ef9fb1e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ * Update SQLite to 3.50.3. * Android: Ensure JNI libraries are 16KB-aligned. +* Support receiving binary sync lines over HTTP when the Rust client is enabled. +* Remove the experimental websocket transport mode. ## 1.3.0 diff --git a/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt b/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt index bf1c3105..350c593f 100644 --- a/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt +++ b/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt @@ -2,7 +2,6 @@ package com.powersync -import com.powersync.sync.ConnectionMethod import com.powersync.sync.SyncOptions /** @@ -25,16 +24,9 @@ public fun throwPowerSyncException(exception: PowerSyncException): Unit = throw @OptIn(ExperimentalPowerSyncAPI::class) public fun createSyncOptions( newClient: Boolean, - webSocket: Boolean, userAgent: String, ): SyncOptions = SyncOptions( newClientImplementation = newClient, - method = - if (webSocket) { - ConnectionMethod.WebSocket() - } else { - ConnectionMethod.Http - }, userAgent = userAgent, ) diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 92049353..994baa39 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -207,8 +207,6 @@ kotlin { implementation(libs.ktor.client.contentnegotiation) implementation(libs.ktor.serialization.json) implementation(libs.kotlinx.io) - implementation(libs.rsocket.core) - implementation(libs.rsocket.transport.websocket) implementation(libs.kotlinx.coroutines.core) implementation(libs.kotlinx.datetime) implementation(libs.stately.concurrency) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/AbstractSyncTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/AbstractSyncTest.kt index 6e54618a..f4f108d7 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/AbstractSyncTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/AbstractSyncTest.kt @@ -8,6 +8,7 @@ import com.powersync.ExperimentalPowerSyncAPI */ abstract class AbstractSyncTest( private val useNewSyncImplementation: Boolean, + protected val useBson: Boolean = false, ) { @OptIn(ExperimentalPowerSyncAPI::class) val options: SyncOptions get() { diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 6faea462..9da520eb 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.serializer expect val factory: DatabaseDriverFactory diff --git a/core/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt b/core/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt deleted file mode 100644 index 871d9fe1..00000000 --- a/core/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt +++ /dev/null @@ -1,139 +0,0 @@ -package com.powersync.sync - -import com.powersync.ExperimentalPowerSyncAPI -import com.powersync.connectors.PowerSyncCredentials -import com.powersync.utils.JsonUtil -import io.ktor.client.HttpClient -import io.ktor.client.plugins.websocket.webSocketSession -import io.ktor.http.URLBuilder -import io.ktor.http.URLProtocol -import io.ktor.http.takeFrom -import io.rsocket.kotlin.core.RSocketConnector -import io.rsocket.kotlin.payload.PayloadMimeType -import io.rsocket.kotlin.payload.buildPayload -import io.rsocket.kotlin.payload.data -import io.rsocket.kotlin.payload.metadata -import io.rsocket.kotlin.transport.RSocketClientTarget -import io.rsocket.kotlin.transport.RSocketConnection -import io.rsocket.kotlin.transport.RSocketTransportApi -import io.rsocket.kotlin.transport.ktor.websocket.internal.KtorWebSocketConnection -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.IO -import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.emitAll -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.flow.map -import kotlinx.io.readByteArray -import kotlinx.serialization.SerialName -import kotlinx.serialization.Serializable -import kotlinx.serialization.json.JsonObject -import kotlin.coroutines.CoroutineContext - -/** - * Connects to the RSocket endpoint for receiving sync lines. - * - * Note that we reconstruct the transport layer for RSocket by opening a WebSocket connection - * manually instead of using the high-level RSocket Ktor integration. - * The reason is that every request to the sync service needs its own metadata and data payload - * (e.g. to transmit the token), but the Ktor integration only supports setting a single payload for - * the entire client. - */ -@OptIn(RSocketTransportApi::class, ExperimentalPowerSyncAPI::class) -internal fun HttpClient.rSocketSyncStream( - userAgent: String, - options: ConnectionMethod.WebSocket, - req: JsonObject, - credentials: PowerSyncCredentials, -): Flow = - flow { - val flowContext = currentCoroutineContext() - - val websocketUri = - URLBuilder(credentials.endpointUri("sync/stream")).apply { - protocol = - when (protocolOrNull) { - URLProtocol.HTTP -> URLProtocol.WS - else -> URLProtocol.WSS - } - } - - // Note: We're using a custom connector here because we need to set options for each request - // without creating a new HTTP client each time. The recommended approach would be to add an - // RSocket extension to the HTTP client, but that only allows us to set the SETUP metadata for - // all connections (bad because we need a short-lived token in there). - // https://github.com/rsocket/rsocket-kotlin/issues/311 - val target = - object : RSocketClientTarget { - @RSocketTransportApi - override suspend fun connectClient(): RSocketConnection { - val ws = - webSocketSession { - url.takeFrom(websocketUri) - } - return KtorWebSocketConnection(ws) - } - - override val coroutineContext: CoroutineContext - get() = flowContext - } - - val connector = - RSocketConnector { - connectionConfig { - payloadMimeType = - PayloadMimeType( - metadata = "application/json", - data = "application/json", - ) - - setupPayload { - buildPayload { - data("{}") - metadata( - JsonUtil.json.encodeToString( - ConnectionSetupMetadata( - token = "Bearer ${credentials.token}", - userAgent = userAgent, - ), - ), - ) - } - } - - keepAlive = options.keepAlive.toRSocket() - } - } - - val rSocket = connector.connect(target) - val syncStream = - rSocket.requestStream( - buildPayload { - data(JsonUtil.json.encodeToString(req)) - metadata(JsonUtil.json.encodeToString(RequestStreamMetadata("/sync/stream"))) - }, - ) - - emitAll(syncStream.map { it.data.readByteArray() }.flowOn(Dispatchers.IO)) - } - -/** - * The metadata payload we need to use when connecting with RSocket. - * - * This corresponds to `RSocketContextMeta` on the sync service. - */ -@Serializable -private class ConnectionSetupMetadata( - val token: String, - @SerialName("user_agent") - val userAgent: String, -) - -/** - * The metadata payload we send for the `REQUEST_STREAM` frame. - */ -@Serializable -private class RequestStreamMetadata( - val path: String, -) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt index cf14d673..aa7afdf8 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -2,9 +2,7 @@ package com.powersync.sync import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase -import io.rsocket.kotlin.keepalive.KeepAlive -import kotlin.time.Duration -import kotlin.time.Duration.Companion.seconds + /** * Experimental options that can be passed to [PowerSyncDatabase.connect] to specify an experimental @@ -19,8 +17,6 @@ public class SyncOptions constructor( @property:ExperimentalPowerSyncAPI public val newClientImplementation: Boolean = false, - @property:ExperimentalPowerSyncAPI - public val method: ConnectionMethod = ConnectionMethod.Http, /** * The user agent to use for requests made to the PowerSync service. */ @@ -38,52 +34,3 @@ public class SyncOptions } } -/** - * The connection method to use when the SDK connects to the sync service. - */ -@ExperimentalPowerSyncAPI -public sealed interface ConnectionMethod { - /** - * Receive sync lines via streamed HTTP response from the sync service. - * - * This mode is less efficient than [WebSocket] because it doesn't support backpressure - * properly and uses JSON instead of the more efficient BSON representation for sync lines. - * - * This is currently the default, but this will be changed once [WebSocket] support is stable. - */ - @ExperimentalPowerSyncAPI - public data object Http : ConnectionMethod - - /** - * Receive binary sync lines via RSocket over a WebSocket connection. - * - * This connection mode is currently experimental and requires a recent sync service to work. - * WebSocket support is only available when enabling the [SyncOptions.newClientImplementation]. - */ - @ExperimentalPowerSyncAPI - public data class WebSocket( - val keepAlive: RSocketKeepAlive = RSocketKeepAlive.default, - ) : ConnectionMethod -} - -/** - * Keep-alive options for long-running RSocket streams: - * - * The client will ping the server every [interval], and assumes the connection to be closed if it - * hasn't received an acknowledgement in [maxLifetime]. - */ -@ExperimentalPowerSyncAPI -public data class RSocketKeepAlive( - val interval: Duration, - val maxLifetime: Duration, -) { - internal fun toRSocket(): KeepAlive = KeepAlive(interval, maxLifetime) - - internal companion object { - val default = - RSocketKeepAlive( - interval = 20.0.seconds, - maxLifetime = 30.0.seconds, - ) - } -} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 826cdd24..bf59e638 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -4,6 +4,7 @@ import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity import co.touchlab.stately.concurrency.AtomicReference import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.PowerSyncException import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketRequest import com.powersync.bucket.BucketStorage @@ -27,12 +28,19 @@ import io.ktor.client.request.get import io.ktor.client.request.headers import io.ktor.client.request.preparePost import io.ktor.client.request.setBody +import io.ktor.client.statement.HttpResponse import io.ktor.client.statement.bodyAsText import io.ktor.http.ContentType import io.ktor.http.HttpHeaders import io.ktor.http.HttpStatusCode +import io.ktor.http.append import io.ktor.http.contentType import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.core.readAvailable +import io.ktor.utils.io.readAvailable +import io.ktor.utils.io.readBuffer +import io.ktor.utils.io.readByteArray +import io.ktor.utils.io.readRemaining import io.ktor.utils.io.readUTF8Line import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred @@ -46,11 +54,16 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.datetime.Clock +import kotlinx.io.Buffer +import kotlinx.io.readByteArray +import kotlinx.io.readIntLe import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement @@ -210,62 +223,71 @@ internal class SyncStream( return body.data.writeCheckpoint } - private fun connectViaHttp(req: JsonElement): Flow = - flow { - val credentials = connector.getCredentialsCached() - require(credentials != null) { "Not logged in" } + private suspend fun connectToSyncEndpoint( + req: JsonElement, + supportBson: Boolean, + block: suspend (isBson: Boolean, response: HttpResponse) -> T + ): T { + val credentials = connector.getCredentialsCached() + require(credentials != null) { "Not logged in" } - val uri = credentials.endpointUri("sync/stream") + val uri = credentials.endpointUri("sync/stream") - val bodyJson = JsonUtil.json.encodeToString(req) + val bodyJson = JsonUtil.json.encodeToString(req) - val request = - httpClient.preparePost(uri) { - contentType(ContentType.Application.Json) - headers { - append(HttpHeaders.Authorization, "Token ${credentials.token}") + val request = + httpClient.preparePost(uri) { + contentType(ContentType.Application.Json) + headers { + append(HttpHeaders.Authorization, "Token ${credentials.token}") + if (supportBson) { + contentType(bsonStream.withParameter("q", "0.9")) + // Also indicate ndjson support as fallback + append(HttpHeaders.ContentType, ndjson.withParameter("q", "0.8")) + } else { + contentType(ndjson) } - timeout { socketTimeoutMillis = Long.MAX_VALUE } - setBody(bodyJson) - } - - request.execute { httpResponse -> - if (httpResponse.status.value == 401) { - connector.invalidateCredentials() } + timeout { socketTimeoutMillis = Long.MAX_VALUE } + setBody(bodyJson) + } - if (httpResponse.status != HttpStatusCode.OK) { - throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}") - } + return request.execute { httpResponse -> + val isBson = httpResponse.contentType() == bsonStream - status.update { copy(connected = true, connecting = false) } - val channel: ByteReadChannel = httpResponse.body() + if (httpResponse.status.value == 401) { + connector.invalidateCredentials() + } - while (!channel.isClosedForRead) { - val line = channel.readUTF8Line() - if (line != null) { - emit(line) - } - } + if (httpResponse.status != HttpStatusCode.OK) { + throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}") } + + status.update { copy(connected = true, connecting = false) } + block(isBson, httpResponse) } + } - private fun connectViaWebSocket( - req: JsonObject, - options: ConnectionMethod.WebSocket, - ): Flow = + private fun receiveTextLines(req: JsonElement): Flow = flow { - val credentials = requireNotNull(connector.getCredentialsCached()) { "Not logged in" } - - emitAll( - httpClient.rSocketSyncStream( - userAgent = this@SyncStream.options.userAgent, - options = options, - req = req, - credentials = credentials, - ), - ) + connectToSyncEndpoint(req, supportBson = false) { isBson, response -> + check(!isBson) + + emitAll(response.body().lines()) + } + } + + private fun receiveTextOrBinaryLines(req: JsonElement): Flow = flow { + connectToSyncEndpoint(req, supportBson = false) { isBson, response -> + val body = response.body() + + if (isBson) { + emitAll(body.bsonObjects().map { PowerSyncControlArguments.BinaryLine(it) }) + } else { + emitAll(body.lines().map { PowerSyncControlArguments.TextLine(it) }) + } } + } private suspend fun streamingSyncIteration() { coroutineScope { @@ -424,15 +446,8 @@ internal class SyncStream( } private suspend fun connect(start: Instruction.EstablishSyncStream) { - when (val method = options.method) { - ConnectionMethod.Http -> - connectViaHttp(start.request).collect { - controlInvocations.send(PowerSyncControlArguments.TextLine(it)) - } - is ConnectionMethod.WebSocket -> - connectViaWebSocket(start.request, method).collect { - controlInvocations.send(PowerSyncControlArguments.BinaryLine(it)) - } + receiveTextOrBinaryLines(start.request).collect { + controlInvocations.send(it) } } } @@ -471,7 +486,7 @@ internal class SyncStream( lateinit var receiveLines: Job receiveLines = scope.launch { - connectViaHttp(JsonUtil.json.encodeToJsonElement(req)).collect { value -> + receiveTextLines(JsonUtil.json.encodeToJsonElement(req)).collect { value -> val line = JsonUtil.json.decodeFromString(value) state = handleInstruction(line, value, state) @@ -688,10 +703,58 @@ internal class SyncStream( } internal companion object { + private val ndjson = ContentType("application", "x-ndjson") + private val bsonStream = ContentType("application", "vnd.powersync.bson-stream") + fun defaultHttpClient(config: HttpClientConfig<*>.() -> Unit) = HttpClient { config(this) } + + private fun ByteReadChannel.lines(): Flow = flow { + while (!isClosedForRead) { + val line = readUTF8Line() + if (line != null) { + emit(line) + } + } + } + + private fun ByteReadChannel.bsonObjects(): Flow = flow { + while (true) { + emit(readBsonObject() ?: break) + } + } + + private suspend fun ByteReadChannel.readBsonObject(): ByteArray? { + if (isClosedForRead || !awaitContent(4)) { + return null // eof at start of object + } + + return readBuffer(4).use { buffer -> + // 4 byte length prefix, see https://bsonspec.org/spec.html + val length = buffer.peek().readIntLe() + // length is the total size of the frame, including the 4 byte length header + var remaining = length - 4 + + while (remaining > 0) { + val bytesRead = readAvailable(1) { source -> + val available = source.readAtMostTo(buffer, remaining.toLong()) + available.toInt() + } + if (bytesRead == -1) { + // No bytes available, wait for more + if (isClosedForRead || !awaitContent(1)) { + throw PowerSyncException("Unexpected end of response in middle of BSON sync line", null) + } + } else { + remaining -= bytesRead + } + } + + buffer.readByteArray() + } + } } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f0551878..d7282066 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,7 +16,6 @@ coroutines = "1.8.1" kotlinx-datetime = "0.6.2" kotlinx-io = "0.5.4" ktor = "3.1.0" -rsocket = "0.20.0" uuid = "0.8.2" powersync-core = "0.4.2" sqlite-jdbc = "3.50.3.0" @@ -87,8 +86,6 @@ ktor-client-contentnegotiation = { module = "io.ktor:ktor-client-content-negotia ktor-client-mock = { module = "io.ktor:ktor-client-mock", version.ref = "ktor" } ktor-serialization-json = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" } kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" } -rsocket-core = { module = "io.rsocket.kotlin:rsocket-core", version.ref = "rsocket" } -rsocket-transport-websocket = { module = "io.rsocket.kotlin:rsocket-transport-ktor-websocket-internal", version.ref = "rsocket" } sqldelight-driver-native = { module = "app.cash.sqldelight:native-driver", version.ref = "sqlDelight" } sqliter = { module = "co.touchlab:sqliter-driver", version.ref = "sqliter" } From 756aecd8a042c0653687572e7149b074ddaec538 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 30 Jul 2025 13:33:41 +0200 Subject: [PATCH 2/7] Fix formatting --- .../com/powersync/testutils/TestUtils.kt | 1 - .../kotlin/com/powersync/sync/SyncOptions.kt | 2 - .../kotlin/com/powersync/sync/SyncStream.kt | 53 ++++++++++--------- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 9da520eb..6faea462 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -26,7 +26,6 @@ import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path import kotlinx.serialization.json.JsonElement -import kotlinx.serialization.serializer expect val factory: DatabaseDriverFactory diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt index aa7afdf8..035482a6 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -3,7 +3,6 @@ package com.powersync.sync import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase - /** * Experimental options that can be passed to [PowerSyncDatabase.connect] to specify an experimental * connection mechanism. @@ -33,4 +32,3 @@ public class SyncOptions public val defaults: SyncOptions = SyncOptions() } } - diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index bf59e638..aed7e3da 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -40,7 +40,6 @@ import io.ktor.utils.io.core.readAvailable import io.ktor.utils.io.readAvailable import io.ktor.utils.io.readBuffer import io.ktor.utils.io.readByteArray -import io.ktor.utils.io.readRemaining import io.ktor.utils.io.readUTF8Line import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred @@ -54,14 +53,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.datetime.Clock -import kotlinx.io.Buffer import kotlinx.io.readByteArray import kotlinx.io.readIntLe import kotlinx.serialization.json.JsonElement @@ -226,7 +223,7 @@ internal class SyncStream( private suspend fun connectToSyncEndpoint( req: JsonElement, supportBson: Boolean, - block: suspend (isBson: Boolean, response: HttpResponse) -> T + block: suspend (isBson: Boolean, response: HttpResponse) -> T, ): T { val credentials = connector.getCredentialsCached() require(credentials != null) { "Not logged in" } @@ -277,17 +274,18 @@ internal class SyncStream( } } - private fun receiveTextOrBinaryLines(req: JsonElement): Flow = flow { - connectToSyncEndpoint(req, supportBson = false) { isBson, response -> - val body = response.body() + private fun receiveTextOrBinaryLines(req: JsonElement): Flow = + flow { + connectToSyncEndpoint(req, supportBson = false) { isBson, response -> + val body = response.body() - if (isBson) { - emitAll(body.bsonObjects().map { PowerSyncControlArguments.BinaryLine(it) }) - } else { - emitAll(body.lines().map { PowerSyncControlArguments.TextLine(it) }) + if (isBson) { + emitAll(body.bsonObjects().map { PowerSyncControlArguments.BinaryLine(it) }) + } else { + emitAll(body.lines().map { PowerSyncControlArguments.TextLine(it) }) + } } } - } private suspend fun streamingSyncIteration() { coroutineScope { @@ -711,20 +709,22 @@ internal class SyncStream( config(this) } - private fun ByteReadChannel.lines(): Flow = flow { - while (!isClosedForRead) { - val line = readUTF8Line() - if (line != null) { - emit(line) + private fun ByteReadChannel.lines(): Flow = + flow { + while (!isClosedForRead) { + val line = readUTF8Line() + if (line != null) { + emit(line) + } } } - } - private fun ByteReadChannel.bsonObjects(): Flow = flow { - while (true) { - emit(readBsonObject() ?: break) + private fun ByteReadChannel.bsonObjects(): Flow = + flow { + while (true) { + emit(readBsonObject() ?: break) + } } - } private suspend fun ByteReadChannel.readBsonObject(): ByteArray? { if (isClosedForRead || !awaitContent(4)) { @@ -738,10 +738,11 @@ internal class SyncStream( var remaining = length - 4 while (remaining > 0) { - val bytesRead = readAvailable(1) { source -> - val available = source.readAtMostTo(buffer, remaining.toLong()) - available.toInt() - } + val bytesRead = + readAvailable(1) { source -> + val available = source.readAtMostTo(buffer, remaining.toLong()) + available.toInt() + } if (bytesRead == -1) { // No bytes available, wait for more if (isClosedForRead || !awaitContent(1)) { From 6340957f8d489be5b16fed2cfbd631965e276059 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 30 Jul 2025 15:23:08 +0200 Subject: [PATCH 3/7] Add bson test --- .../com/powersync/sync/SyncIntegrationTest.kt | 32 +++++++++++++++++++ .../com/powersync/testutils/TestUtils.kt | 7 ++-- .../kotlin/com/powersync/sync/SyncStream.kt | 2 -- .../powersync/testutils/MockSyncService.kt | 26 ++++++++++++--- 4 files changed, 57 insertions(+), 10 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 84743ac0..62f72bf2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -30,6 +30,7 @@ import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import io.kotest.matchers.string.shouldContain +import io.ktor.http.ContentType import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers @@ -859,4 +860,35 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { query.cancelAndIgnoreRemainingEvents() } } + + @OptIn(ExperimentalStdlibApi::class) + @Test + fun bson() = databaseTest { + // There's no up-to-date bson library for Kotlin multiplatform, so this test verifies BSON support with byte + // strings created with package:bson in Dart. + syncLinesContentType = ContentType("application", "vnd.powersync.bson-stream") + + turbineScope(timeout = 10.0.seconds) { + val query = + database + .watch("SELECT name FROM users", throttleMs = 0L) { + it.getString(0)!! + }.testIn(this) + query.awaitItem() shouldBe emptyList() + + database.connect(connector, options = options) + + // {checkpoint: {last_op_id: 1, write_checkpoint: null, buckets: [{bucket: a, checksum: 0, priority: 3, count: null}]}} + syncLines.send("8100000003636865636b706f696e740070000000026c6173745f6f705f6964000200000031000a77726974655f636865636b706f696e7400046275636b657473003e00000003300036000000026275636b65740002000000610010636865636b73756d0000000000107072696f7269747900030000000a636f756e740000000000".hexToByteArray()) + + // {data: {bucket: a, data: [{checksum: 0, data: {"name":"username"}, op: PUT, op_id: 1, object_id: u, object_type: users}]}} + syncLines.send("9e00000003646174610093000000026275636b6574000200000061000464617461007a0000000330007200000010636865636b73756d0000000000026461746100140000007b226e616d65223a22757365726e616d65227d00026f70000400000050555400026f705f696400020000003100026f626a6563745f696400020000007500026f626a6563745f74797065000600000075736572730000000000".hexToByteArray()) + + // {checkpoint_complete: {last_op_id: 1}} + syncLines.send("3100000003636865636b706f696e745f636f6d706c6574650017000000026c6173745f6f705f6964000200000031000000".hexToByteArray()) + + query.awaitItem() shouldBe listOf("username") + query.cancelAndIgnoreRemainingEvents() + } + } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 6faea462..fd2807ea 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -16,11 +16,11 @@ import com.powersync.createPowerSyncDatabaseImpl import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema import com.powersync.sync.LegacySyncImplementation -import com.powersync.sync.SyncLine import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig import io.ktor.client.engine.mock.toByteArray +import io.ktor.http.ContentType import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest @@ -84,8 +84,8 @@ internal class ActiveDatabaseTest( ), ) - @OptIn(LegacySyncImplementation::class) - var syncLines = Channel() + var syncLines = Channel() + var syncLinesContentType = ContentType("application", "x-ndjson") var requestedSyncStreams = mutableListOf() var checkpointResponse: () -> WriteCheckpointResponse = { WriteCheckpointResponse(WriteCheckpointData("1000")) @@ -124,6 +124,7 @@ internal class ActiveDatabaseTest( MockSyncService( lines = syncLines, generateCheckpoint = { checkpointResponse() }, + syncLinesContentType = { syncLinesContentType }, trackSyncRequest = { val parsed = JsonUtil.json.parseToJsonElement(it.body.toByteArray().decodeToString()) requestedSyncStreams.add(parsed) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index aed7e3da..165a6e33 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -36,10 +36,8 @@ import io.ktor.http.HttpStatusCode import io.ktor.http.append import io.ktor.http.contentType import io.ktor.utils.io.ByteReadChannel -import io.ktor.utils.io.core.readAvailable import io.ktor.utils.io.readAvailable import io.ktor.utils.io.readBuffer -import io.ktor.utils.io.readByteArray import io.ktor.utils.io.readUTF8Line import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index bc564e97..99865d9a 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -13,18 +13,20 @@ import io.ktor.client.engine.callContext import io.ktor.client.plugins.HttpTimeoutCapability import io.ktor.client.request.HttpRequestData import io.ktor.client.request.HttpResponseData +import io.ktor.http.ContentType +import io.ktor.http.HttpHeaders import io.ktor.http.HttpProtocolVersion import io.ktor.http.HttpStatusCode import io.ktor.http.headersOf import io.ktor.util.date.GMTDate import io.ktor.utils.io.InternalAPI import io.ktor.utils.io.awaitFreeSpace +import io.ktor.utils.io.writeByteArray import io.ktor.utils.io.writeStringUtf8 import io.ktor.utils.io.writer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.consume -import kotlinx.serialization.encodeToString /** * A mock HTTP engine providing sync lines read from a coroutines [ReceiveChannel]. @@ -35,9 +37,11 @@ import kotlinx.serialization.encodeToString */ @OptIn(LegacySyncImplementation::class) internal class MockSyncService( - private val lines: ReceiveChannel, + private val lines: ReceiveChannel, + private val syncLinesContentType: () -> ContentType, private val generateCheckpoint: () -> WriteCheckpointResponse, private val trackSyncRequest: suspend (HttpRequestData) -> Unit, + ) : HttpClientEngineBase("sync-service") { override val config: HttpClientEngineConfig get() = Config @@ -61,8 +65,20 @@ internal class MockSyncService( // Wait for a downstream listener being ready before requesting a sync line channel.awaitFreeSpace() val line = receive() - val serializedLine = JsonUtil.json.encodeToString(line) - channel.writeStringUtf8("$serializedLine\n") + when (line) { + is SyncLine -> { + val serializedLine = JsonUtil.json.encodeToString(line) + channel.writeStringUtf8("$serializedLine\n") + } + is ByteArray -> { + channel.writeByteArray(line) + } + is String -> { + channel.writeStringUtf8("$line\n") + } + else -> throw UnsupportedOperationException("Unknown sync line type") + } + channel.flush() } } @@ -71,7 +87,7 @@ internal class MockSyncService( HttpResponseData( HttpStatusCode.OK, GMTDate(), - headersOf(), + headersOf(HttpHeaders.ContentType, syncLinesContentType().toString()), HttpProtocolVersion.HTTP_1_1, job.channel, context, From 569a16d6c72990f1f75feb42f1f3642a3b674dbb Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 30 Jul 2025 15:50:20 +0200 Subject: [PATCH 4/7] Add unit tests for bson split --- .../com/powersync/sync/SyncIntegrationTest.kt | 53 +++++++------ .../kotlin/com/powersync/sync/SyncStream.kt | 11 ++- .../com/powersync/sync/SyncStreamTest.kt | 76 +++++++++++++++++++ .../powersync/testutils/MockSyncService.kt | 1 - .../kotlin/com/powersync/demos/Auth.kt | 2 - 5 files changed, 115 insertions(+), 28 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 62f72bf2..1807bd22 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -863,32 +863,41 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { @OptIn(ExperimentalStdlibApi::class) @Test - fun bson() = databaseTest { - // There's no up-to-date bson library for Kotlin multiplatform, so this test verifies BSON support with byte - // strings created with package:bson in Dart. - syncLinesContentType = ContentType("application", "vnd.powersync.bson-stream") - - turbineScope(timeout = 10.0.seconds) { - val query = - database - .watch("SELECT name FROM users", throttleMs = 0L) { - it.getString(0)!! - }.testIn(this) - query.awaitItem() shouldBe emptyList() + fun bson() = + databaseTest { + // There's no up-to-date bson library for Kotlin multiplatform, so this test verifies BSON support with byte + // strings created with package:bson in Dart. + syncLinesContentType = ContentType("application", "vnd.powersync.bson-stream") - database.connect(connector, options = options) + turbineScope(timeout = 10.0.seconds) { + val query = + database + .watch("SELECT name FROM users", throttleMs = 0L) { + it.getString(0)!! + }.testIn(this) + query.awaitItem() shouldBe emptyList() - // {checkpoint: {last_op_id: 1, write_checkpoint: null, buckets: [{bucket: a, checksum: 0, priority: 3, count: null}]}} - syncLines.send("8100000003636865636b706f696e740070000000026c6173745f6f705f6964000200000031000a77726974655f636865636b706f696e7400046275636b657473003e00000003300036000000026275636b65740002000000610010636865636b73756d0000000000107072696f7269747900030000000a636f756e740000000000".hexToByteArray()) + database.connect(connector, options = options) - // {data: {bucket: a, data: [{checksum: 0, data: {"name":"username"}, op: PUT, op_id: 1, object_id: u, object_type: users}]}} - syncLines.send("9e00000003646174610093000000026275636b6574000200000061000464617461007a0000000330007200000010636865636b73756d0000000000026461746100140000007b226e616d65223a22757365726e616d65227d00026f70000400000050555400026f705f696400020000003100026f626a6563745f696400020000007500026f626a6563745f74797065000600000075736572730000000000".hexToByteArray()) + // {checkpoint: {last_op_id: 1, write_checkpoint: null, buckets: [{bucket: a, checksum: 0, priority: 3, count: null}]}} + syncLines.send( + "8100000003636865636b706f696e740070000000026c6173745f6f705f6964000200000031000a77726974655f636865636b706f696e7400046275636b657473003e00000003300036000000026275636b65740002000000610010636865636b73756d0000000000107072696f7269747900030000000a636f756e740000000000" + .hexToByteArray(), + ) - // {checkpoint_complete: {last_op_id: 1}} - syncLines.send("3100000003636865636b706f696e745f636f6d706c6574650017000000026c6173745f6f705f6964000200000031000000".hexToByteArray()) + // {data: {bucket: a, data: [{checksum: 0, data: {"name":"username"}, op: PUT, op_id: 1, object_id: u, object_type: users}]}} + syncLines.send( + "9e00000003646174610093000000026275636b6574000200000061000464617461007a0000000330007200000010636865636b73756d0000000000026461746100140000007b226e616d65223a22757365726e616d65227d00026f70000400000050555400026f705f696400020000003100026f626a6563745f696400020000007500026f626a6563745f74797065000600000075736572730000000000" + .hexToByteArray(), + ) + + // {checkpoint_complete: {last_op_id: 1}} + syncLines.send( + "3100000003636865636b706f696e745f636f6d706c6574650017000000026c6173745f6f705f6964000200000031000000".hexToByteArray(), + ) - query.awaitItem() shouldBe listOf("username") - query.cancelAndIgnoreRemainingEvents() + query.awaitItem() shouldBe listOf("username") + query.cancelAndIgnoreRemainingEvents() + } } - } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 165a6e33..d4f0a7bc 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -707,7 +707,7 @@ internal class SyncStream( config(this) } - private fun ByteReadChannel.lines(): Flow = + fun ByteReadChannel.lines(): Flow = flow { while (!isClosedForRead) { val line = readUTF8Line() @@ -717,7 +717,7 @@ internal class SyncStream( } } - private fun ByteReadChannel.bsonObjects(): Flow = + fun ByteReadChannel.bsonObjects(): Flow = flow { while (true) { emit(readBsonObject() ?: break) @@ -725,13 +725,18 @@ internal class SyncStream( } private suspend fun ByteReadChannel.readBsonObject(): ByteArray? { - if (isClosedForRead || !awaitContent(4)) { + if (isClosedForRead || !awaitContent(1)) { return null // eof at start of object } return readBuffer(4).use { buffer -> // 4 byte length prefix, see https://bsonspec.org/spec.html val length = buffer.peek().readIntLe() + if (length < 5) { + // At the very least we need the 4 byte length and a zero terminator + throw PowerSyncException("Invalid BSON message, to small", null) + } + // length is the total size of the frame, including the 4 byte length header var remaining = length - 4 diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 1e903f8b..5c95935b 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -1,5 +1,6 @@ package com.powersync.sync +import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity @@ -12,12 +13,16 @@ import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.schema.Schema +import com.powersync.sync.SyncStream.Companion.bsonObjects import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock +import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.shouldBe import io.ktor.client.HttpClient import io.ktor.client.engine.mock.MockEngine +import io.ktor.utils.io.ByteChannel +import io.ktor.utils.io.writeByteArray import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest @@ -175,4 +180,75 @@ class SyncStreamTest { // Clean up job.cancel() } + + @Test + fun splitBsonObjects() = + runTest { + turbineScope { + val channel = ByteChannel() + val objects = channel.bsonObjects().testIn(this) + + channel.writeByteArray(byteArrayOf(5, 0, 0, 0, 1)) + channel.flush() + objects.awaitItem() shouldHaveSize 5 + + channel.writeByteArray(byteArrayOf(6, 0)) + channel.flush() + channel.writeByteArray(byteArrayOf(0, 0)) + channel.flush() + channel.writeByteArray(byteArrayOf(0, 0)) + channel.flush() + objects.awaitItem() shouldHaveSize 6 + + channel.close() + objects.awaitComplete() + } + } + + @Test + fun invalidBsonSize() = + runTest { + turbineScope { + val channel = ByteChannel() + val objects = channel.bsonObjects().testIn(this) + + channel.writeByteArray(byteArrayOf(3, 0, 0, 0)) + channel.flush() + + objects.awaitError() + objects.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun invalidEndInLength() = + runTest { + turbineScope { + val channel = ByteChannel() + val objects = channel.bsonObjects().testIn(this) + + channel.writeByteArray(byteArrayOf(5, 0)) + channel.flush() + channel.close() + + // Still two bytes missing for length + objects.awaitError() + } + } + + @Test + fun invalidEndInObject() = + runTest { + turbineScope { + val channel = ByteChannel() + val objects = channel.bsonObjects().testIn(this) + + channel.writeByteArray(byteArrayOf(6, 0, 0, 0)) + channel.flush() + channel.close() + + // Still two bytes missing for content + objects.awaitError() + } + } } diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 99865d9a..48cf0dfd 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -41,7 +41,6 @@ internal class MockSyncService( private val syncLinesContentType: () -> ContentType, private val generateCheckpoint: () -> WriteCheckpointResponse, private val trackSyncRequest: suspend (HttpRequestData) -> Unit, - ) : HttpClientEngineBase("sync-service") { override val config: HttpClientEngineConfig get() = Config diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/Auth.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/Auth.kt index a8ec2292..4eaa4e06 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/Auth.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/Auth.kt @@ -6,7 +6,6 @@ import co.touchlab.kermit.Logger import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import com.powersync.connector.supabase.SupabaseConnector -import com.powersync.sync.ConnectionMethod import com.powersync.sync.SyncOptions import io.github.jan.supabase.auth.status.RefreshFailureCause import io.github.jan.supabase.auth.status.SessionStatus @@ -50,7 +49,6 @@ internal class AuthViewModel( is SessionStatus.Authenticated -> { db.connect(supabase, options = SyncOptions( newClientImplementation = true, - method = ConnectionMethod.WebSocket(), )) } is SessionStatus.NotAuthenticated -> { From 3fe21d92b88fe542bdf2136b6bb55130f955c908 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 30 Jul 2025 17:32:35 +0200 Subject: [PATCH 5/7] Add socket timeout --- .../kotlin/com/powersync/sync/SyncStream.kt | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index d4f0a7bc..db034eb1 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -62,6 +62,8 @@ import kotlinx.io.readIntLe import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds @OptIn(ExperimentalPowerSyncAPI::class) internal class SyncStream( @@ -88,7 +90,10 @@ internal class SyncStream( private val httpClient: HttpClient = createClient { - install(HttpTimeout) + install(HttpTimeout) { + socketTimeoutMillis = SOCKET_TIMEOUT + } + install(ContentNegotiation) install(WebSockets) @@ -699,6 +704,10 @@ internal class SyncStream( } internal companion object { + // The sync service sends a token keepalive message roughly every 20 seconds. So if we don't receive a message + // in twice that time, assume the connection is broken. + private const val SOCKET_TIMEOUT: Long = 40_000 + private val ndjson = ContentType("application", "x-ndjson") private val bsonStream = ContentType("application", "vnd.powersync.bson-stream") From 075f9be1ed0a91fd28b0f441336dd050d5685c9c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 30 Jul 2025 17:49:31 +0200 Subject: [PATCH 6/7] Set timeout --- .../commonMain/kotlin/com/powersync/sync/SyncStream.kt | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index db034eb1..fd8ac1ad 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -22,8 +22,8 @@ import io.ktor.client.call.body import io.ktor.client.plugins.DefaultRequest import io.ktor.client.plugins.HttpTimeout import io.ktor.client.plugins.contentnegotiation.ContentNegotiation -import io.ktor.client.plugins.timeout import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.request.accept import io.ktor.client.request.get import io.ktor.client.request.headers import io.ktor.client.request.preparePost @@ -241,14 +241,13 @@ internal class SyncStream( headers { append(HttpHeaders.Authorization, "Token ${credentials.token}") if (supportBson) { - contentType(bsonStream.withParameter("q", "0.9")) + accept(bsonStream.withParameter("q", "0.9")) // Also indicate ndjson support as fallback - append(HttpHeaders.ContentType, ndjson.withParameter("q", "0.8")) + append(HttpHeaders.Accept, ndjson.withParameter("q", "0.8")) } else { - contentType(ndjson) + accept(ndjson) } } - timeout { socketTimeoutMillis = Long.MAX_VALUE } setBody(bodyJson) } From e3669dbbe9121ba035f26d439daae6e6c05bf1d5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 30 Jul 2025 17:58:36 +0200 Subject: [PATCH 7/7] Reformat --- core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index fd8ac1ad..581b147f 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -62,8 +62,6 @@ import kotlinx.io.readIntLe import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement -import kotlin.time.Duration -import kotlin.time.Duration.Companion.seconds @OptIn(ExperimentalPowerSyncAPI::class) internal class SyncStream(