diff --git a/CHANGELOG.md b/CHANGELOG.md index 14118952..bfc7f2bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.4.0 (unreleased) + +* Add `getNextCrudTransactionBatch` method to `PowerSyncDatabase` which allows for fetching a batch of CRUD operations to upload. + This is useful for uploading multiple transactions in a single batch. + ## 1.3.0 * Support tables created outside of PowerSync with the `RawTable` API. @@ -304,4 +309,4 @@ params = params * Replaced default Logger with [Kermit Logger](https://kermit.touchlab.co/) which allows users to more easily use and/or change Logger settings * Add `retryDelay` and `crudThrottle` options when setting up database connection -* Changed `_viewNameOverride` to `viewNameOverride` +* Changed `_viewNameOverride` to `viewNameOverride` \ No newline at end of file diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 3e5f5f19..73054ff2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.withContext import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull +import kotlin.time.TimeSource @OptIn(ExperimentalKermitApi::class) class DatabaseTest { @@ -459,4 +460,357 @@ class DatabaseTest { database.getCrudBatch() shouldBe null } + + @Test + fun testCrudTransactionBatch() = + databaseTest { + // Create a single insert (transaction 1) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("a", "a@example.org"), + ) + + // Create a transaction with 2 inserts (transaction 2) + database.writeTransaction { + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("b", "b@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("c", "c@example.org"), + ) + } + + // Create another single insert (transaction 3) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("d", "d@example.org"), + ) + + // Create another transaction with 3 inserts (transaction 4) + database.writeTransaction { + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("e", "e@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("f", "f@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("g", "g@example.org"), + ) + } + + // Test with item limit of 3 - should get first transaction (1 item) + second transaction (2 items) + var batch = database.getNextCrudTransactionBatch(limit = 3) ?: error("Batch should not be null") + batch.hasMore shouldBe true + batch.crud shouldHaveSize 3 // 1 entry from transaction 1 + 2 entries from transaction 2 + batch.complete(null) + + // Test with item limit of 2 - should get third transaction (1 item) only + batch = database.getNextCrudTransactionBatch(limit = 2) ?: error("Batch should not be null") + batch.hasMore shouldBe true + batch.crud shouldHaveSize 1 // 1 entry from transaction 3 + batch.complete(null) + + // Test with no limit - should get remaining transactions + batch = database.getNextCrudTransactionBatch() ?: error("Batch should not be null") + batch.hasMore shouldBe false + batch.crud shouldHaveSize 3 // 3 entries from transaction 4 + batch.complete(null) + + // Should be no more transactions + database.getNextCrudTransactionBatch() shouldBe null + } + + @Test + fun testCrudTransactionBatchWithNullTxId() = + databaseTest { + // Create operations without transactions (NULL tx_id) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("a", "a@example.org"), + ) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("b", "b@example.org"), + ) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("c", "c@example.org"), + ) + + // Each NULL tx_id operation should be treated as its own transaction + var batch = database.getNextCrudTransactionBatch(limit = 2) ?: error("Batch should not be null") + batch.hasMore shouldBe true + batch.crud shouldHaveSize 2 // 2 individual transactions + batch.complete(null) + + // Get the remaining transaction + batch = database.getNextCrudTransactionBatch() ?: error("Batch should not be null") + batch.hasMore shouldBe false + batch.crud shouldHaveSize 1 // 1 remaining transaction + batch.complete(null) + + database.getNextCrudTransactionBatch() shouldBe null + } + + @Test + fun testCrudTransactionBatchLargeTransaction() = + databaseTest { + // Create a large transaction with many operations + database.writeTransaction { + repeat(10) { i -> + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("user$i", "user$i@example.org"), + ) + } + } + + // Add a single operation + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("single", "single@example.org"), + ) + + // Should get entire large transaction (10 operations) - at least one transaction rule + var batch = database.getNextCrudTransactionBatch(limit = 5) ?: error("Batch should not be null") + batch.hasMore shouldBe true + batch.crud shouldHaveSize 10 + batch.complete(null) + + // Should get the single operation + batch = database.getNextCrudTransactionBatch() ?: error("Batch should not be null") + batch.hasMore shouldBe false + batch.crud shouldHaveSize 1 + batch.complete(null) + + database.getNextCrudTransactionBatch() shouldBe null + } + + @Test + fun testCrudTransactionBatchOrdering() = + databaseTest { + // Create operations in a specific order to test ordering + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("first", "first@example.org"), + ) + + database.writeTransaction { + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("second_a", "second_a@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("second_b", "second_b@example.org"), + ) + } + + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("third", "third@example.org"), + ) + + // Operations should be processed in order + val batch = database.getNextCrudTransactionBatch() ?: error("Batch should not be null") + batch.hasMore shouldBe false + batch.crud shouldHaveSize 4 + + // Verify order by checking operation data + val operations = batch.crud + operations[0].opData!!["name"] shouldBe "first" + operations[1].opData!!["name"] shouldBe "second_a" + operations[2].opData!!["name"] shouldBe "second_b" + operations[3].opData!!["name"] shouldBe "third" + + batch.complete(null) + database.getNextCrudTransactionBatch() shouldBe null + } + + @Test + fun testCrudTransactionBatchEmptyDatabase() = + databaseTest { + val batch = database.getNextCrudTransactionBatch() + batch shouldBe null + } + + @Test + fun testCrudTransactionBatchZerolimit() = + databaseTest { + // Create some operations + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("a", "a@example.org"), + ) + + // Item limit of 0 should return null even if operations exist + val batch = database.getNextCrudTransactionBatch(limit = 0) + batch shouldBe null + } + + @Test + fun testCrudTransactionBatchGroupsByTransaction() = + databaseTest { + // Create a transaction with 3 operations + database.writeTransaction { + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("tx1_op1", "tx1_op1@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("tx1_op2", "tx1_op2@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("tx1_op3", "tx1_op3@example.org"), + ) + } + + // Create a single operation (NULL tx_id) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("single", "single@example.org"), + ) + + // Request with no limit - should get all 4 operations (3 from tx + 1 single) + val batch = database.getNextCrudTransactionBatch() ?: error("Batch should not be null") + batch.hasMore shouldBe false + batch.crud shouldHaveSize 4 + batch.complete(null) + + database.getNextCrudTransactionBatch() shouldBe null + } + + @Test + fun testCrudTransactionBatchWithlimit() = + databaseTest { + // Create a transaction with 5 operations + database.writeTransaction { + repeat(5) { i -> + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("user$i", "user$i@example.org"), + ) + } + } + + // Create another transaction with 3 operations + database.writeTransaction { + repeat(3) { i -> + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("user2_$i", "user2_$i@example.org"), + ) + } + } + + // Add a single operation (NULL tx_id) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("single", "single@example.org"), + ) + + // Test with item limit of 6 - should get first transaction (5 items) only + var batch = database.getNextCrudTransactionBatch(limit = 6) ?: error("Batch should not be null") + batch.hasMore shouldBe true + batch.crud shouldHaveSize 5 + batch.complete(null) + + // Test with item limit of 4 - should get second transaction (3 items) + single operation (1 item) + batch = database.getNextCrudTransactionBatch(limit = 4) ?: error("Batch should not be null") + batch.hasMore shouldBe false + batch.crud shouldHaveSize 4 + batch.complete(null) + + database.getNextCrudTransactionBatch() shouldBe null + } + + @Test + fun testCrudTransactionBatchlimitReturnsAtLeastOneTransaction() = + databaseTest { + // Create a transaction with 10 operations + database.writeTransaction { + repeat(10) { i -> + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("user$i", "user$i@example.org"), + ) + } + } + + // With item limit of 5, should return entire transaction (10 items) - at least one transaction rule, even if it exceeds the limit + var batch = database.getNextCrudTransactionBatch(limit = 5) ?: error("Batch should not be null") + batch.hasMore shouldBe false + batch.crud shouldHaveSize 10 + batch.complete(null) + + database.getNextCrudTransactionBatch() shouldBe null + } + + @Test + fun testCrudTransactionBatchPerformanceBenchmark() = + databaseTest { + // Create a large number of transactions with varying sizes + val totalOperations = 100_000 + val transactionSizes = listOf(1, 3, 5, 10, 20, 50, 100, 200, 500, 1000) + var operationCount = 0 + + while (operationCount < totalOperations) { + val size = transactionSizes.random() + val remainingOps = totalOperations - operationCount + val actualSize = minOf(size, remainingOps) + + if (actualSize == 1) { + // Single operation (NULL tx_id) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("user$operationCount", "user$operationCount@example.org"), + ) + } else { + // Transaction with multiple operations + database.writeTransaction { + repeat(actualSize) { i -> + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("user${operationCount + i}", "user${operationCount + i}@example.org"), + ) + } + } + } + operationCount += actualSize + } + + val startTime = + kotlin.time.TimeSource.Monotonic + .markNow() + var totalBatches = 0 + var totalOperationsProcessed = 0 + + while (true) { + val batch = database.getNextCrudTransactionBatch(limit = 100) + if (batch == null) break + + totalBatches++ + totalOperationsProcessed += batch.crud.size + batch.complete(null) + } + + val elapsedTime = startTime.elapsedNow() + + totalOperationsProcessed shouldBe totalOperations + + println("Benchmark Results:") + println("Total operations: $totalOperations") + println("Total batches: $totalBatches") + println("Average operations per batch: ${totalOperationsProcessed.toDouble() / totalBatches}") + println("Processing time: ${elapsedTime.inWholeMilliseconds}ms") + println("Operations per second: ${(totalOperations * 1000.0 / elapsedTime.inWholeMilliseconds).toInt()}") + } } diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index d587932d..18f3f22a 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -134,6 +134,27 @@ public interface PowerSyncDatabase : Queries { @Throws(PowerSyncException::class, CancellationException::class) public suspend fun getNextCrudTransaction(): CrudTransaction? + /** + * Get a batch of crud data from multiple transactions to upload. + * + * Returns null if there is no data to upload. + * + * Use this from the [PowerSyncBackendConnector.uploadData]` callback. + * + * Once the data have been successfully uploaded, call [CrudBatch.complete] before + * requesting the next batch. + * + * Unlike [getCrudBatch], this groups data by transaction, allowing developers to + * upload multiple complete transactions in a single batch operation. + * + * @param limit The maximum number of crud items to include in the batch. + * If null, no item limit is applied. Returns the maximum number of complete + * transactions that fit within the item limit, but always returns at least + * one complete transaction even if its number of operations exceeds the limit. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun getNextCrudTransactionBatch(limit: Int? = null): CrudBatch? + /** * Convenience method to get the current version of PowerSync. */ diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 51880ee1..36b72ca0 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -309,6 +309,87 @@ internal class PowerSyncDatabaseImpl( } } + override suspend fun getNextCrudTransactionBatch(limit: Int?): CrudBatch? { + waitReady() + + if (limit == 0) { + return null + } + + return internalDb.readTransaction { transaction -> + val result = + transaction.getAll( + """ + WITH all_operations AS ( + -- Compute transaction group once and reuse throughout + SELECT + id, + tx_id, + data, + COALESCE(tx_id, id) as transaction_group + FROM ps_crud + ), + transaction_groups AS ( + SELECT + transaction_group, + MIN(id) as first_operation_id, + COUNT(*) as operation_count + FROM all_operations + GROUP BY transaction_group + ), + transaction_with_running_totals AS ( + SELECT + transaction_group, + first_operation_id, + operation_count, + ROW_NUMBER() OVER (ORDER BY first_operation_id) as transaction_rank, + SUM(operation_count) OVER (ORDER BY first_operation_id ROWS UNBOUNDED PRECEDING) as running_total + FROM transaction_groups + ), + selected_transactions AS ( + SELECT transaction_group + FROM transaction_with_running_totals + WHERE ? IS NULL OR running_total <= ? OR transaction_rank = 1 + ) + SELECT ao.id, ao.tx_id, ao.data + FROM all_operations ao + INNER JOIN selected_transactions st ON ao.transaction_group = st.transaction_group + ORDER BY ao.id + """, + listOf(limit?.toLong(), limit?.toLong()), + ) { cursor -> + CrudEntry.fromRow( + CrudRow( + id = cursor.getString("id"), + data = cursor.getString("data"), + txId = cursor.getLongOptional("tx_id")?.toInt(), + ), + ) + } + + if (result.isEmpty()) { + return@readTransaction null + } + + val maxOperationId = result.maxOfOrNull { it.clientId } ?: 0 + + val hasMore = + transaction.get( + "SELECT EXISTS(SELECT 1 FROM ps_crud WHERE id > ? LIMIT 1)", + listOf(maxOperationId.toLong()), + ) { it.getLong(0)!! } > 0 + + return@readTransaction CrudBatch( + crud = result, + hasMore = hasMore, + complete = { writeCheckpoint -> + logger.i { "[CrudTransactionBatch::complete] Completing batch with checkpoint $writeCheckpoint" } + handleWriteCheckpoint(maxOperationId, writeCheckpoint) + }, + ) + } + } + override suspend fun getPowerSyncVersion(): String { // The initialization sets powerSyncVersion. waitReady() diff --git a/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq b/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq index da30fd6b..38019fe6 100644 --- a/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq +++ b/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq @@ -28,6 +28,9 @@ DELETE FROM ps_crud WHERE id <= ?; -- we can define interal tables as part of the dialect. CREATE TABLE IF NOT EXISTS ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER); +CREATE INDEX IF NOT EXISTS idx_ps_crud_tx_id ON ps_crud(tx_id); +CREATE INDEX IF NOT EXISTS idx_ps_crud_tx_id_id ON ps_crud(tx_id, id); + CREATE TABLE ps_buckets( name TEXT PRIMARY KEY, last_applied_op INTEGER NOT NULL DEFAULT 0,