Skip to content

Commit

Permalink
[Improvement] Shared Sync Worker (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney authored Mar 11, 2024
1 parent 60f666b commit aede9e7
Show file tree
Hide file tree
Showing 27 changed files with 1,267 additions and 328 deletions.
5 changes: 5 additions & 0 deletions .changeset/few-beds-camp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-web': minor
---

Improved multiple tab syncing by unloading stream and sync bucket adapter functionality to shared webworker.
5 changes: 5 additions & 0 deletions .changeset/nasty-tigers-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-common': patch
---

Internally moved crud upload watching to `SqliteBucketStorageAdapter`. Added `dispose` methods for sync stream clients and better closing of clients.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnecto
import {
AbstractStreamingSyncImplementation,
DEFAULT_CRUD_UPLOAD_THROTTLE_MS,
StreamingSyncImplementationListener
StreamingSyncImplementationListener,
StreamingSyncImplementation
} from './sync/stream/AbstractStreamingSyncImplementation';
import { CrudBatch } from './sync/bucket/CrudBatch';
import { CrudTransaction } from './sync/bucket/CrudTransaction';
Expand Down Expand Up @@ -63,12 +64,25 @@ export interface PowerSyncDBListener extends StreamingSyncImplementationListener
initialized: () => void;
}

export interface PowerSyncCloseOptions {
/**
* Disconnect the sync stream client if connected.
* This is usually true, but can be false for Web when using
* multiple tabs and a shared sync provider.
*/
disconnect?: boolean;
}

const POWERSYNC_TABLE_MATCH = /(^ps_data__|^ps_data_local__)/;

const DEFAULT_DISCONNECT_CLEAR_OPTIONS: DisconnectAndClearOptions = {
clearLocal: true
};

export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = {
disconnect: true
};

export const DEFAULT_WATCH_THROTTLE_MS = 30;

export const DEFAULT_POWERSYNC_DB_OPTIONS = {
Expand Down Expand Up @@ -101,10 +115,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Current connection status.
*/
currentStatus?: SyncStatus;
syncStreamImplementation?: AbstractStreamingSyncImplementation;
syncStreamImplementation?: StreamingSyncImplementation;
sdkVersion: string;

private abortController: AbortController | null;
protected bucketStorageAdapter: BucketStorageAdapter;
private syncStatusListenerDisposer?: () => void;
protected _isReadyPromise: Promise<void>;
Expand All @@ -113,7 +126,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
constructor(protected options: PowerSyncDatabaseOptions) {
super();
this.bucketStorageAdapter = this.generateBucketStorageAdapter();
this.closed = true;
this.closed = false;
this.currentStatus = undefined;
this.options = { ...DEFAULT_POWERSYNC_DB_OPTIONS, ...options };
this._schema = options.schema;
Expand Down Expand Up @@ -189,7 +202,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Cannot be used while connected - this should only be called before {@link AbstractPowerSyncDatabase.connect}.
*/
async updateSchema(schema: Schema) {
if (this.abortController) {
if (this.syncStreamImplementation) {
throw new Error('Cannot update schema while connected');
}

Expand All @@ -207,19 +220,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.database.execute('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
}

/**
* Queues a CRUD upload when internal CRUD tables have been updated.
*/
protected async watchCrudUploads() {
for await (const event of this.onChange({
tables: [PSInternalTable.CRUD],
rawTableNames: true,
signal: this.abortController?.signal
})) {
this.syncStreamImplementation?.triggerCrudUpload();
}
}

/**
* Wait for initialization to complete.
* While initializing is automatic, this helps to catch and report initialization errors.
Expand All @@ -232,10 +232,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Connects to stream of events from the PowerSync instance.
*/
async connect(connector: PowerSyncBackendConnector) {
await this.waitForReady();

// close connection if one is open
await this.disconnect();

await this.waitForReady();
if (this.closed) {
throw new Error('Cannot connect using a closed client');
}

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector);
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
Expand All @@ -244,11 +249,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}
});

this.abortController = new AbortController();
// Begin network stream
await this.syncStreamImplementation.waitForReady();
this.syncStreamImplementation.triggerCrudUpload();
this.syncStreamImplementation.streamingSync(this.abortController.signal);
this.watchCrudUploads();
this.syncStreamImplementation.connect();
}

/**
Expand All @@ -257,9 +260,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Use {@link connect} to connect again.
*/
async disconnect() {
this.abortController?.abort();
await this.syncStreamImplementation?.disconnect();
this.syncStatusListenerDisposer?.();
this.abortController = null;
await this.syncStreamImplementation?.dispose();
this.syncStreamImplementation = undefined;
}

/**
Expand Down Expand Up @@ -308,11 +312,17 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Once close is called, this connection cannot be used again - a new one
* must be constructed.
*/
async close() {
async close(options: PowerSyncCloseOptions = DEFAULT_POWERSYNC_CLOSE_OPTIONS) {
await this.waitForReady();

await this.disconnect();
const { disconnect } = options;
if (disconnect) {
await this.disconnect();
}

await this.syncStreamImplementation?.dispose();
this.database.close();
this.closed = true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { OpId } from './CrudEntry';
import { CrudBatch } from './CrudBatch';
import { SyncDataBatch } from './SyncDataBatch';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';

export interface Checkpoint {
last_op_id: OpId;
Expand Down Expand Up @@ -44,7 +45,11 @@ export enum PSInternalTable {
OPLOG = 'ps_oplog'
}

export interface BucketStorageAdapter {
export interface BucketStorageListener extends BaseListener {
crudUpdate: () => void;
}

export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener>, Disposable {
init(): Promise<void>;
saveSyncData(batch: SyncDataBatch): Promise<void>;
removeBuckets(buckets: string[]): Promise<void>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
import { v4 as uuid } from 'uuid';
import { Mutex } from 'async-mutex';
import { DBAdapter, Transaction } from '../../../db/DBAdapter';
import { BucketState, BucketStorageAdapter, Checkpoint, SyncLocalDatabaseResult } from './BucketStorageAdapter';
import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter';
import {
BucketState,
BucketStorageAdapter,
BucketStorageListener,
Checkpoint,
PSInternalTable,
SyncLocalDatabaseResult
} from './BucketStorageAdapter';
import { OpTypeEnum } from './OpType';
import { CrudBatch } from './CrudBatch';
import { CrudEntry } from './CrudEntry';
import { SyncDataBatch } from './SyncDataBatch';
import Logger, { ILogger } from 'js-logger';
import { BaseObserver } from '../../../utils/BaseObserver';

const COMPACT_OPERATION_INTERVAL = 1_000;

export class SqliteBucketStorage implements BucketStorageAdapter {
export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> implements BucketStorageAdapter {
static MAX_OP_ID = '9223372036854775807';

public tableNames: Set<string>;
private pendingBucketDeletes: boolean;
private _hasCompletedSync: boolean;
private updateListener: () => void;

/**
* Count up, and do a compact on startup.
Expand All @@ -27,9 +36,18 @@ export class SqliteBucketStorage implements BucketStorageAdapter {
private mutex: Mutex,
private logger: ILogger = Logger.get('SqliteBucketStorage')
) {
super();
this._hasCompletedSync = false;
this.pendingBucketDeletes = true;
this.tableNames = new Set();
this.updateListener = db.registerListener({
tablesUpdated: (update) => {
const tables = extractTableUpdates(update);
if (tables.includes(PSInternalTable.CRUD)) {
this.iterateListeners((l) => l.crudUpdate?.());
}
}
});
}

async init() {
Expand All @@ -42,6 +60,10 @@ export class SqliteBucketStorage implements BucketStorageAdapter {
}
}

async dispose() {
this.updateListener?.();
}

getMaxOpId() {
return SqliteBucketStorage.MAX_OP_ID;
}
Expand Down
Loading

0 comments on commit aede9e7

Please sign in to comment.