Skip to content

Commit

Permalink
[Fix] Bucket storage bugs (#61)
Browse files Browse the repository at this point in the history
Co-authored-by: Steven Ontong <[email protected]>
  • Loading branch information
Manrich121 and stevensJourney authored Feb 12, 2024
1 parent 457d890 commit 1229e52
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 40 deletions.
5 changes: 5 additions & 0 deletions .changeset/bright-nails-marry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-common': patch
---

Fixed minor bugs in BucketStorage adapter.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* The underlying database.
*
*
* For the most part, behavior is the same whether querying on the underlying database, or on {@link AbstractPowerSyncDatabase}.
*/
protected get database() {
Expand Down Expand Up @@ -179,7 +179,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Replace the schema with a new version. This is for advanced use cases - typically the schema should just be specified once in the constructor.
*
*
* Cannot be used while connected - this should only be called before {@link AbstractPowerSyncDatabase.connect}.
*/
async updateSchema(schema: Schema) {
Expand Down Expand Up @@ -247,7 +247,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Close the sync connection.
*
*
* Use {@link connect} to connect again.
*/
async disconnect() {
Expand All @@ -261,7 +261,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Use this when logging out.
* The database can still be queried after this is called, but the tables
* would be empty.
*
*
* To preserve data in local-only tables, set clearLocal to false.
*/
async disconnectAndClear(options = DEFAULT_DISCONNECT_CLEAR_OPTIONS) {
Expand Down Expand Up @@ -363,20 +363,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

const last = all[all.length - 1];
return new CrudBatch(all, haveMore, async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [last.clientId]);
if (writeCheckpoint != null && (await tx.execute(`SELECT 1 FROM ${PSInternalTable.CRUD} LIMIT 1`)) == null) {
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
writeCheckpoint
]);
} else {
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
this.bucketStorageAdapter.getMaxOpId()
]);
}
});
});
return new CrudBatch(all, haveMore, async (writeCheckpoint?: string) =>
this.handleCrudCheckpoint(last.clientId, writeCheckpoint)
);
}

/**
Expand All @@ -403,7 +392,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}
const txId = first.tx_id;

let all: CrudEntry[] = [];
let all: CrudEntry[];
if (!txId) {
all = [CrudEntry.fromRow(first)];
} else {
Expand All @@ -418,28 +407,30 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

return new CrudTransaction(
all,
async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [last.clientId]);
if (writeCheckpoint) {
const check = await tx.execute(`SELECT 1 FROM ${PSInternalTable.CRUD} LIMIT 1`);
if (!check.rows?.length) {
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
writeCheckpoint
]);
}
} else {
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
this.bucketStorageAdapter.getMaxOpId()
]);
}
});
},
async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint),
txId
);
});
}

private async handleCrudCheckpoint(lastClientId: number, writeCheckpoint?: string) {
return this.writeTransaction(async (tx) => {
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [lastClientId]);
if (writeCheckpoint) {
const check = await tx.execute(`SELECT 1 FROM ${PSInternalTable.CRUD} LIMIT 1`);
if (!check.rows?.length) {
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
writeCheckpoint
]);
}
} else {
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
this.bucketStorageAdapter.getMaxOpId()
]);
}
});
}

/**
* Execute a statement and optionally return results.
*/
Expand Down Expand Up @@ -557,7 +548,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}
for await (const event of this.onChange({
...(options ?? {}),
tables: resolvedTables
tables: _.uniq(resolvedTables)
})) {
yield await this.executeReadOnly(sql, parameters);
}
Expand All @@ -579,7 +570,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

return new EventIterator<WatchOnChangeEvent>((eventOptions) => {
const flushTableUpdates = _.throttle(
async () => {
() => {
const intersection = _.intersection(watchedTables, throttledTableUpdates);
if (intersection.length) {
eventOptions.push({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ export class SqliteBucketStorage implements BucketStorageAdapter {
]);
this.logger.debug('saveSyncData', JSON.stringify(result));
count += b.data.length;
this.compactCounter += count;
}
this.compactCounter += count;
});
}

Expand Down Expand Up @@ -293,7 +293,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter {
* When the objects are successfully sent to the server, call .complete()
*/
async getCrudBatch(limit: number = 100): Promise<CrudBatch | null> {
if (!this.hasCrud()) {
if (!await this.hasCrud()) {
return null;
}

Expand Down

0 comments on commit 1229e52

Please sign in to comment.