diff --git a/.changeset/few-beds-camp.md b/.changeset/few-beds-camp.md new file mode 100644 index 00000000..dd825366 --- /dev/null +++ b/.changeset/few-beds-camp.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/powersync-sdk-web': minor +--- + +Improved multiple tab syncing by unloading stream and sync bucket adapter functionality to shared webworker. diff --git a/.changeset/nasty-tigers-reflect.md b/.changeset/nasty-tigers-reflect.md new file mode 100644 index 00000000..6203bfa0 --- /dev/null +++ b/.changeset/nasty-tigers-reflect.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/powersync-sdk-common': patch +--- + +Internally moved crud upload watching to `SqliteBucketStorageAdapter`. Added `dispose` methods for sync stream clients and better closing of clients. diff --git a/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts b/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts index b1ac652c..0ddd0768 100644 --- a/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts @@ -9,7 +9,8 @@ import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnecto import { AbstractStreamingSyncImplementation, DEFAULT_CRUD_UPLOAD_THROTTLE_MS, - StreamingSyncImplementationListener + StreamingSyncImplementationListener, + StreamingSyncImplementation } from './sync/stream/AbstractStreamingSyncImplementation'; import { CrudBatch } from './sync/bucket/CrudBatch'; import { CrudTransaction } from './sync/bucket/CrudTransaction'; @@ -63,12 +64,25 @@ export interface PowerSyncDBListener extends StreamingSyncImplementationListener initialized: () => void; } +export interface PowerSyncCloseOptions { + /** + * Disconnect the sync stream client if connected. + * This is usually true, but can be false for Web when using + * multiple tabs and a shared sync provider. + */ + disconnect?: boolean; +} + const POWERSYNC_TABLE_MATCH = /(^ps_data__|^ps_data_local__)/; const DEFAULT_DISCONNECT_CLEAR_OPTIONS: DisconnectAndClearOptions = { clearLocal: true }; +export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = { + disconnect: true +}; + export const DEFAULT_WATCH_THROTTLE_MS = 30; export const DEFAULT_POWERSYNC_DB_OPTIONS = { @@ -101,10 +115,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver void; protected _isReadyPromise: Promise; @@ -113,7 +126,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { @@ -244,11 +249,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver void; +} + +export interface BucketStorageAdapter extends BaseObserver, Disposable { init(): Promise; saveSyncData(batch: SyncDataBatch): Promise; removeBuckets(buckets: string[]): Promise; diff --git a/packages/powersync-sdk-common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/powersync-sdk-common/src/client/sync/bucket/SqliteBucketStorage.ts index 44210014..d0f2dd1e 100644 --- a/packages/powersync-sdk-common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/powersync-sdk-common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -1,21 +1,30 @@ import { v4 as uuid } from 'uuid'; import { Mutex } from 'async-mutex'; -import { DBAdapter, Transaction } from '../../../db/DBAdapter'; -import { BucketState, BucketStorageAdapter, Checkpoint, SyncLocalDatabaseResult } from './BucketStorageAdapter'; +import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter'; +import { + BucketState, + BucketStorageAdapter, + BucketStorageListener, + Checkpoint, + PSInternalTable, + SyncLocalDatabaseResult +} from './BucketStorageAdapter'; import { OpTypeEnum } from './OpType'; import { CrudBatch } from './CrudBatch'; import { CrudEntry } from './CrudEntry'; import { SyncDataBatch } from './SyncDataBatch'; import Logger, { ILogger } from 'js-logger'; +import { BaseObserver } from '../../../utils/BaseObserver'; const COMPACT_OPERATION_INTERVAL = 1_000; -export class SqliteBucketStorage implements BucketStorageAdapter { +export class SqliteBucketStorage extends BaseObserver implements BucketStorageAdapter { static MAX_OP_ID = '9223372036854775807'; public tableNames: Set; private pendingBucketDeletes: boolean; private _hasCompletedSync: boolean; + private updateListener: () => void; /** * Count up, and do a compact on startup. @@ -27,9 +36,18 @@ export class SqliteBucketStorage implements BucketStorageAdapter { private mutex: Mutex, private logger: ILogger = Logger.get('SqliteBucketStorage') ) { + super(); this._hasCompletedSync = false; this.pendingBucketDeletes = true; this.tableNames = new Set(); + this.updateListener = db.registerListener({ + tablesUpdated: (update) => { + const tables = extractTableUpdates(update); + if (tables.includes(PSInternalTable.CRUD)) { + this.iterateListeners((l) => l.crudUpdate?.()); + } + } + }); } async init() { @@ -42,6 +60,10 @@ export class SqliteBucketStorage implements BucketStorageAdapter { } } + async dispose() { + this.updateListener?.(); + } + getMaxOpId() { return SqliteBucketStorage.MAX_OP_ID; } diff --git a/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 61afabb7..cee983e2 100644 --- a/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -16,7 +16,7 @@ import ndjsonStream from 'can-ndjson-stream'; import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter'; import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus'; import { SyncDataBucket } from '../bucket/SyncDataBucket'; -import { BaseObserver, BaseListener } from '../../../utils/BaseObserver'; +import { BaseObserver, BaseListener, Disposable } from '../../../utils/BaseObserver'; export enum LockType { CRUD = 'crud', @@ -49,6 +49,26 @@ export interface StreamingSyncImplementationListener extends BaseListener { statusChanged?: ((status: SyncStatus) => void) | undefined; } +export interface StreamingSyncImplementation extends BaseObserver, Disposable { + /** + * Connects to the sync service + */ + connect(): Promise; + /** + * Disconnects from the sync services. + * @throws if not connected or if abort is not controlled internally + */ + disconnect(): Promise; + getWriteCheckpoint: () => Promise; + hasCompletedSync: () => Promise; + isConnected: boolean; + lastSyncedAt?: Date; + syncStatus: SyncStatus; + triggerCrudUpload: () => void; + waitForReady(): Promise; + waitForStatus(status: SyncStatusOptions): Promise; +} + export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000; export const DEFAULT_STREAMING_SYNC_OPTIONS = { @@ -57,9 +77,14 @@ export const DEFAULT_STREAMING_SYNC_OPTIONS = { crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS }; -export abstract class AbstractStreamingSyncImplementation extends BaseObserver { +export abstract class AbstractStreamingSyncImplementation + extends BaseObserver + implements StreamingSyncImplementation +{ protected _lastSyncedAt: Date | null; protected options: AbstractStreamingSyncImplementationOptions; + protected abortController: AbortController | null; + protected crudUpdateListener?: () => void; syncStatus: SyncStatus; triggerCrudUpload: () => void; @@ -75,6 +100,7 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver { @@ -88,17 +114,51 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver { + return new Promise((resolve) => { + const l = this.registerListener({ + statusChanged: (updatedStatus) => { + /** + * Match only the partial status options provided in the + * matching status + */ + const matchPartialObject = (compA: object, compB: object) => { + return Object.entries(compA).every(([key, value]) => { + const comparisonBValue = compB[key]; + if (typeof value == 'object' && typeof comparisonBValue == 'object') { + return matchPartialObject(value, comparisonBValue); + } + return value == comparisonBValue; + }); + }; + + if (matchPartialObject(status, updatedStatus.toJSON())) { + resolve(); + l?.(); + } + } + }); + }); + } + get lastSyncedAt() { const lastSynced = this.syncStatus.lastSyncedAt; return lastSynced && new Date(lastSynced); } + get isConnected() { + return this.syncStatus.connected; + } + protected get logger() { return this.options.logger!; } - get isConnected() { - return this.syncStatus.connected; + async dispose() { + this.crudUpdateListener?.(); + this.crudUpdateListener = undefined; } abstract obtainLock(lockOptions: LockOptions): Promise; @@ -107,6 +167,11 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver { + const response = await this.options.remote.get('/write-checkpoint2.json'); + return response['data']['write_checkpoint'] as string; + } + protected async _uploadAllCrud(): Promise { return this.obtainLock({ type: LockType.CRUD, @@ -154,13 +219,38 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver { - const response = await this.options.remote.get('/write-checkpoint2.json'); - return response['data']['write_checkpoint'] as string; + connect() { + this.abortController = new AbortController(); + this.streamingSync(this.abortController.signal); + return this.waitForStatus({ connected: true }); } + async disconnect(): Promise { + if (!this.abortController) { + throw new Error('Disconnect not possible'); + } + this.abortController.abort('Disconnected'); + } + + /** + * @deprecated use [connect instead] + */ async streamingSync(signal?: AbortSignal): Promise { - signal?.addEventListener('abort', () => { + if (!signal) { + this.abortController = new AbortController(); + signal = this.abortController.signal; + } + + /** + * Listen for CRUD updates and trigger upstream uploads + */ + this.crudUpdateListener = this.options.adapter.registerListener({ + crudUpdate: () => this.triggerCrudUpload() + }); + + signal.addEventListener('abort', () => { + this.crudUpdateListener?.(); + this.crudUpdateListener = undefined; this.updateSyncStatus({ connected: false, dataFlow: { @@ -187,7 +277,7 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver void): Promise<{ retry?: boolean }> { + protected async streamingSyncIteration(signal: AbortSignal, progress?: () => void): Promise<{ retry?: boolean }> { return await this.obtainLock({ type: LockType.SYNC, signal, @@ -355,7 +445,10 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver { + protected async *streamingSyncRequest( + req: StreamingSyncRequest, + signal?: AbortSignal + ): AsyncGenerator { const body = await this.options.remote.postStreaming('/sync/stream', req, {}, signal); const stream = ndjsonStream(body); const reader = stream.getReader(); diff --git a/packages/powersync-sdk-common/src/db/DBAdapter.ts b/packages/powersync-sdk-common/src/db/DBAdapter.ts index c3568297..2d1835fb 100644 --- a/packages/powersync-sdk-common/src/db/DBAdapter.ts +++ b/packages/powersync-sdk-common/src/db/DBAdapter.ts @@ -108,3 +108,7 @@ export function isBatchedUpdateNotification( ): update is BatchedUpdateNotification { return 'tables' in update; } + +export function extractTableUpdates(update: BatchedUpdateNotification | UpdateNotification) { + return isBatchedUpdateNotification(update) ? update.tables : [update.table]; +} diff --git a/packages/powersync-sdk-common/src/db/crud/SyncStatus.ts b/packages/powersync-sdk-common/src/db/crud/SyncStatus.ts index f81d91b2..c0cb4c6e 100644 --- a/packages/powersync-sdk-common/src/db/crud/SyncStatus.ts +++ b/packages/powersync-sdk-common/src/db/crud/SyncStatus.ts @@ -56,4 +56,12 @@ export class SyncStatus { const dataFlow = this.dataFlowStatus; return `SyncStatus Promise; +} + export interface BaseObserverInterface { registerListener(listener: Partial): () => void; } diff --git a/packages/powersync-sdk-web/src/db/PowerSyncDatabase.ts b/packages/powersync-sdk-web/src/db/PowerSyncDatabase.ts index 4de83858..6b4ab0ea 100644 --- a/packages/powersync-sdk-web/src/db/PowerSyncDatabase.ts +++ b/packages/powersync-sdk-web/src/db/PowerSyncDatabase.ts @@ -4,7 +4,9 @@ import { PowerSyncBackendConnector, SqliteBucketStorage, BucketStorageAdapter, - PowerSyncDatabaseOptions + PowerSyncDatabaseOptions, + PowerSyncCloseOptions, + DEFAULT_POWERSYNC_CLOSE_OPTIONS } from '@journeyapps/powersync-sdk-common'; import { WebRemote } from './sync/WebRemote'; @@ -24,6 +26,12 @@ export interface WebPowerSyncFlags { * Open in SSR placeholder mode. DB operations and Sync operations will be a No-op */ ssrMode?: boolean; + /** + * Externally unload open PowerSync database instances when the window closes. + * Setting this to `true` requires calling `close` on all open PowerSyncDatabase + * instances before the window unloads + */ + externallyUnload?: boolean; } export interface WebPowerSyncDatabaseOptions extends PowerSyncDatabaseOptions { @@ -31,12 +39,46 @@ export interface WebPowerSyncDatabaseOptions extends PowerSyncDatabaseOptions { } export class PowerSyncDatabase extends AbstractPowerSyncDatabase { + protected unloadListener?: () => Promise; + constructor(protected options: WebPowerSyncDatabaseOptions) { super(options); + + const { flags } = this.options; + + if (flags?.enableMultiTabs && !flags.externallyUnload) { + this.unloadListener = () => this.close({ disconnect: false }); + window.addEventListener('unload', this.unloadListener); + } } async _initialize(): Promise {} + /** + * Closes the database connection. + * By default the sync stream client is only disconnected if + * multiple tabs are not enabled. + */ + close(options: PowerSyncCloseOptions = DEFAULT_POWERSYNC_CLOSE_OPTIONS): Promise { + if (this.unloadListener) { + window.removeEventListener('unload', this.unloadListener); + } + + return super.close({ + // Don't disconnect by default if multiple tabs are enabled + disconnect: options.disconnect ?? !this.options.flags?.enableMultiTabs + }); + } + + connect(connector: PowerSyncBackendConnector): Promise { + /** + * Using React strict mode might cause calls to connect to fire multiple times + * Connect is wrapped inside a lock in order to prevent race conditions internally between multiple + * connection attempts. + */ + return navigator.locks.request(`connection-lock-${this.options.database.name}`, () => super.connect(connector)); + } + protected generateBucketStorageAdapter(): BucketStorageAdapter { return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex); } diff --git a/packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts b/packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts index 2651018d..862c0d65 100644 --- a/packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts +++ b/packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts @@ -12,6 +12,7 @@ import { import * as Comlink from 'comlink'; import Logger, { ILogger } from 'js-logger'; import type { DBWorkerInterface, OpenDB } from '../../../worker/db/open-db'; +import { getWorkerDatabaseOpener } from '../../../worker/db/open-worker-database'; export type WASQLiteFlags = { enableMultiTabs?: boolean; @@ -19,6 +20,11 @@ export type WASQLiteFlags = { export interface WASQLiteDBAdapterOptions extends Omit { flags?: WASQLiteFlags; + /** + * Use an existing port to an initialized worker. + * A worker will be initialized if none is provided + */ + workerPort?: MessagePort; } /** @@ -54,29 +60,12 @@ export class WASQLiteDBAdapter extends BaseObserver implement if (!enableMultiTabs) { this.logger.warn('Multiple tabs are not enabled in this browser'); } - /** - * Webpack V5 can bundle the worker automatically if the full Worker constructor syntax is used - * https://webpack.js.org/guides/web-workers/ - * This enables multi tab support by default, but falls back if SharedWorker is not available - * (in the case of Android) - */ - const openDB = enableMultiTabs - ? Comlink.wrap( - new SharedWorker(new URL('../../../worker/db/SharedWASQLiteDB.worker.js', import.meta.url), { - /* @vite-ignore */ - name: `shared-DB-worker-${this.name}`, - type: 'module' - }).port - ) - : Comlink.wrap( - new Worker(new URL('../../../worker/db/WASQLiteDB.worker.js', import.meta.url), { - /* @vite-ignore */ - name: `DB-worker-${this.name}`, - type: 'module' - }) - ); - - this.workerMethods = await openDB(this.options.dbFilename); + + const dbOpener = this.options.workerPort + ? Comlink.wrap(this.options.workerPort) + : getWorkerDatabaseOpener(this.options.dbFilename, enableMultiTabs); + + this.workerMethods = await dbOpener(this.options.dbFilename); this.workerMethods.registerOnTableChange( Comlink.proxy((opType: number, tableName: string, rowId: number) => { @@ -104,10 +93,13 @@ export class WASQLiteDBAdapter extends BaseObserver implement }; }; + /** + * Attempts to close the connection. + * Shared workers might not actually close the connection if other + * tabs are still using it. + */ close() { - if (!this.flags.enableMultiTabs) { - this.workerMethods?.close?.(); - } + this.workerMethods?.close?.(); } async getAll(sql: string, parameters?: any[] | undefined): Promise { @@ -127,31 +119,12 @@ export class WASQLiteDBAdapter extends BaseObserver implement async readLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions | undefined): Promise { await this.initialized; - return new Promise((resolve, reject) => { - this.acquireLock(async () => { - try { - const res = await fn(this.generateDBHelpers({ execute: this._execute })); - resolve(res); - } catch (ex) { - reject(ex); - } - }); - }); + return this.acquireLock(async () => fn(this.generateDBHelpers({ execute: this._execute }))); } async writeLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions | undefined): Promise { await this.initialized; - return new Promise((resolve, reject) => { - // This implementation currently only uses a single connection. Locking is ensured by navigator locks - this.acquireLock(async () => { - try { - const res = await fn(this.generateDBHelpers({ execute: this._execute })); - resolve(res); - } catch (ex) { - reject(ex); - } - }); - }); + return this.acquireLock(async () => fn(this.generateDBHelpers({ execute: this._execute }))); } protected acquireLock(callback: () => Promise): Promise { diff --git a/packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts index 82616294..1e18193f 100644 --- a/packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -1,70 +1,183 @@ -import _ from 'lodash'; -import { v4 as uuid } from 'uuid'; import * as Comlink from 'comlink'; import { WebStreamingSyncImplementation, WebStreamingSyncImplementationOptions } from './WebStreamingSyncImplementation'; import { - SharedSyncImplementation, - SharedSyncMessage, - SharedSyncMessageType, - SharedSyncStatus + ManualSharedSyncPayload, + SharedSyncClientEvent, + SharedSyncImplementation } from '../../worker/sync/SharedSyncImplementation'; +import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider'; +import { PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@journeyapps/powersync-sdk-common'; +import { openWorkerDatabasePort } from '../../worker/db/open-worker-database'; + +/** + * The shared worker will trigger methods on this side of the message port + * via this client provider. + */ +class SharedSyncClientProvider extends AbstractSharedSyncClientProvider { + constructor( + protected options: WebStreamingSyncImplementationOptions, + public statusChanged: (status: SyncStatusOptions) => void + ) { + super(); + } + + async fetchCredentials(): Promise { + const credentials = await this.options.remote.getCredentials(); + /** + * The credentials need to be serializable. + * Users might extend [PowerSyncCredentials] to contain + * items which are not serializable. + * This returns only the essential fields. + */ + return { + endpoint: credentials.endpoint, + token: credentials.token, + expiresAt: credentials.expiresAt + }; + } + + uploadCrud(): Promise { + return this.options.uploadCrud(); + } + + get logger() { + return this.options.logger; + } + + trace(...x: any[]): void { + this.logger?.trace(...x); + } + debug(...x: any[]): void { + this.logger?.debug(...x); + } + info(...x: any[]): void { + this.logger?.info(...x); + } + log(...x: any[]): void { + this.logger?.log(...x); + } + warn(...x: any[]): void { + this.logger?.warn(...x); + } + error(...x: any[]): void { + this.logger?.error(...x); + } + time(label: string): void { + this.logger?.time(label); + } + timeEnd(label: string): void { + this.logger?.timeEnd(label); + } +} export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplementation { - protected stateManager: Comlink.Remote; + protected syncManager: Comlink.Remote; + protected clientProvider: SharedSyncClientProvider; + protected messagePort: MessagePort; - /** - * ID for the tab running this sync implementation - */ - protected syncTabId: string; + protected isInitialized: Promise; constructor(options: WebStreamingSyncImplementationOptions) { super(options); - this.syncTabId = uuid(); - const worker = new SharedWorker(new URL('../../worker/sync/SharedSyncImplementation.worker.js', import.meta.url), { - /* @vite-ignore */ - name: `shared-sync-${this.webOptions.identifier}`, - type: 'module' - }); - - const { port } = worker; - this.stateManager = Comlink.wrap(port); + /** + * Configure or connect to the shared sync worker. + * This worker will manage all syncing operations remotely. + */ + const syncWorker = new SharedWorker( + new URL('../../worker/sync/SharedSyncImplementation.worker.js', import.meta.url), + { + /* @vite-ignore */ + name: `shared-sync-${this.webOptions.identifier}`, + type: 'module' + } + ); + this.messagePort = syncWorker.port; + this.syncManager = Comlink.wrap(this.messagePort); + this.triggerCrudUpload = this.syncManager.triggerCrudUpload; - port.onmessage = (event: MessageEvent) => { - const { - data: { type, payload } - } = event; - if (type !== SharedSyncMessageType.UPDATE || payload.tabId == this.syncTabId) { - // Don't update from own updates - return; + /** + * Opens MessagePort to the existing shared DB worker. + * The sync worker cannot initiate connections directly to the + * DB worker, but a port to the DB worker can be transferred to the + * sync worker. + */ + const { crudUploadThrottleMs, identifier, retryDelayMs } = this.options; + const dbOpenerPort = openWorkerDatabasePort(this.options.identifier!, true) as MessagePort; + this.isInitialized = this.syncManager.init(Comlink.transfer(dbOpenerPort, [dbOpenerPort]), { + dbName: this.options.identifier!, + streamOptions: { + crudUploadThrottleMs, + identifier, + retryDelayMs } - // Don't broadcast this to the shared implementation - this.internalUpdateStatus(payload); - }; + }); - // Load the initial state - this.stateManager.getState().then((state) => { - this.internalUpdateStatus(state); + /** + * Pass along any sync status updates to this listener + */ + this.clientProvider = new SharedSyncClientProvider(this.options, (status) => { + this.iterateListeners((l) => this.updateSyncStatus(status)); }); + + /** + * The sync worker will call this client provider when it needs + * to fetch credentials or upload data. + * This performs bi-directional method calling. + */ + Comlink.expose(this.clientProvider, this.messagePort); } /** - * Triggers update of sync status without broadcasting to shared sync - * manager + * Starts the sync process, this effectively acts as a call to + * `connect` if not yet connected. */ - protected internalUpdateStatus(state: SharedSyncStatus) { - return super.updateSyncStatus(state); + async connect(): Promise { + await this.waitForReady(); + return this.syncManager.connect(); } - protected updateSyncStatus(state: SharedSyncStatus): void { - super.updateSyncStatus(state); - // Broadcast this update to shared sync manager - this.stateManager.updateState({ - ...state, - tabId: this.syncTabId - }); + async disconnect(): Promise { + await this.waitForReady(); + return this.syncManager.disconnect(); + } + + async getWriteCheckpoint(): Promise { + await this.waitForReady(); + return this.syncManager.getWriteCheckpoint(); + } + + async hasCompletedSync(): Promise { + return this.syncManager.hasCompletedSync(); + } + + async dispose(): Promise { + await this.waitForReady(); + // Signal the shared worker that this client is closing its connection to the worker + const closeMessagePayload: ManualSharedSyncPayload = { + event: SharedSyncClientEvent.CLOSE_CLIENT, + data: {} + }; + + (localStorage as any).setItem('posting close' + Math.random(), `$}`); + + this.messagePort.postMessage(closeMessagePayload); + + // Release the proxy + this.syncManager[Comlink.releaseProxy](); + } + + async waitForReady() { + return this.isInitialized; + } + + /** + * Used in tests to force a connection states + */ + private async _testUpdateStatus(status: SyncStatus) { + return (this.syncManager as any).syncStreamClient.updateSyncStatus(status.toJSON()); } } diff --git a/packages/powersync-sdk-web/src/index.ts b/packages/powersync-sdk-web/src/index.ts index 6495734a..66e33a09 100644 --- a/packages/powersync-sdk-web/src/index.ts +++ b/packages/powersync-sdk-web/src/index.ts @@ -2,5 +2,7 @@ export * from '@journeyapps/powersync-sdk-common'; export * from './db/PowerSyncDatabase'; export * from './db/sync/WebRemote'; export * from './db/sync/WebStreamingSyncImplementation'; +export * from './db/sync/SharedWebStreamingSyncImplementation'; export * from './db/adapters/wa-sqlite/WASQLiteDBAdapter'; export * from './db/adapters/wa-sqlite/WASQLitePowerSyncDatabaseOpenFactory'; +export * from './db/adapters/AbstractWebPowerSyncDatabaseOpenFactory'; diff --git a/packages/powersync-sdk-web/src/worker/db/SharedWASQLiteDB.worker.ts b/packages/powersync-sdk-web/src/worker/db/SharedWASQLiteDB.worker.ts index 48b85f51..3b49a30a 100644 --- a/packages/powersync-sdk-web/src/worker/db/SharedWASQLiteDB.worker.ts +++ b/packages/powersync-sdk-web/src/worker/db/SharedWASQLiteDB.worker.ts @@ -1,23 +1,58 @@ import '@journeyapps/wa-sqlite'; import * as Comlink from 'comlink'; +import { v4 as uuid } from 'uuid'; + import { DBWorkerInterface, _openDB } from './open-db'; +/** + * Keeps track of open DB connections and the clients which + * are using it. + */ +type SharedDBWorkerConnection = { + clientIds: Set; + db: DBWorkerInterface; +}; + const _self: SharedWorkerGlobalScope = self as any; -const DBMap = new Map>(); +const DBMap = new Map(); +const OPEN_DB_LOCK = 'open-wasqlite-db'; const openDB = async (dbFileName: string): Promise => { - if (!DBMap.has(dbFileName)) { - const openPromise = _openDB(dbFileName); - DBMap.set(dbFileName, openPromise); - openPromise.catch((error) => { - // Allow for retries if an error ocurred - console.error(error); - DBMap.delete(dbFileName); - }); - } - return Comlink.proxy(await DBMap.get(dbFileName)!); + // Prevent multiple simultaneous opens from causing race conditions + return navigator.locks.request(OPEN_DB_LOCK, async () => { + const clientId = uuid(); + + if (!DBMap.has(dbFileName)) { + const clientIds = new Set(); + const connection = await _openDB(dbFileName); + DBMap.set(dbFileName, { + clientIds, + db: connection + }); + } + + const dbEntry = DBMap.get(dbFileName)!; + dbEntry.clientIds.add(clientId); + const { db } = dbEntry; + + const wrappedConnection = { + ...db, + close: Comlink.proxy(() => { + const { clientIds } = dbEntry; + clientIds.delete(clientId); + if (clientIds.size == 0) { + console.debug(`Closing connection to ${dbFileName}.`); + DBMap.delete(dbFileName); + return db.close?.(); + } + console.debug(`Connection to ${dbFileName} not closed yet due to active clients.`); + }) + }; + + return Comlink.proxy(wrappedConnection); + }); }; _self.onconnect = function (event: MessageEvent) { @@ -26,9 +61,9 @@ _self.onconnect = function (event: MessageEvent) { Comlink.expose(openDB, port); }; -addEventListener('beforeunload', () => { - Array.from(DBMap.values()).forEach(async (dbPromise) => { - const db = await dbPromise; +addEventListener('unload', () => { + Array.from(DBMap.values()).forEach(async (dbConnection) => { + const db = await dbConnection.db; db.close?.(); }); }); diff --git a/packages/powersync-sdk-web/src/worker/db/open-worker-database.ts b/packages/powersync-sdk-web/src/worker/db/open-worker-database.ts new file mode 100644 index 00000000..c7a2a3f0 --- /dev/null +++ b/packages/powersync-sdk-web/src/worker/db/open-worker-database.ts @@ -0,0 +1,33 @@ +import * as Comlink from 'comlink'; +import { OpenDB } from './open-db'; + +/** + * Opens a shared or dedicated worker which exposes opening of database connections + */ +export function openWorkerDatabasePort(workerIdentifier: string, multipleTabs = true) { + /** + * Webpack V5 can bundle the worker automatically if the full Worker constructor syntax is used + * https://webpack.js.org/guides/web-workers/ + * This enables multi tab support by default, but falls back if SharedWorker is not available + * (in the case of Android) + */ + return multipleTabs + ? new SharedWorker(new URL('./SharedWASQLiteDB.worker.js', import.meta.url), { + /* @vite-ignore */ + name: `shared-DB-worker-${workerIdentifier}`, + type: 'module' + }).port + : new Worker(new URL('./WASQLiteDB.worker.js', import.meta.url), { + /* @vite-ignore */ + name: `DB-worker-${workerIdentifier}`, + type: 'module' + }); +} + +/** + * @returns A function which allows for opening database connections inside + * a worker. + */ +export function getWorkerDatabaseOpener(workerIdentifier: string, multipleTabs = true) { + return Comlink.wrap(openWorkerDatabasePort(workerIdentifier, multipleTabs)); +} diff --git a/packages/powersync-sdk-web/src/worker/sync/AbstractSharedSyncClientProvider.ts b/packages/powersync-sdk-web/src/worker/sync/AbstractSharedSyncClientProvider.ts new file mode 100644 index 00000000..9d6fc978 --- /dev/null +++ b/packages/powersync-sdk-web/src/worker/sync/AbstractSharedSyncClientProvider.ts @@ -0,0 +1,19 @@ +import { PowerSyncCredentials, SyncStatusOptions } from '@journeyapps/powersync-sdk-common'; + +/** + * The client side port should provide these methods. + */ +export abstract class AbstractSharedSyncClientProvider { + abstract fetchCredentials(): Promise; + abstract uploadCrud(): Promise; + abstract statusChanged(status: SyncStatusOptions): void; + + abstract trace(...x: any[]): void; + abstract debug(...x: any[]): void; + abstract info(...x: any[]): void; + abstract log(...x: any[]): void; + abstract warn(...x: any[]): void; + abstract error(...x: any[]): void; + abstract time(label: string): void; + abstract timeEnd(label: string): void; +} diff --git a/packages/powersync-sdk-web/src/worker/sync/BroadcastLogger.ts b/packages/powersync-sdk-web/src/worker/sync/BroadcastLogger.ts new file mode 100644 index 00000000..a681b25d --- /dev/null +++ b/packages/powersync-sdk-web/src/worker/sync/BroadcastLogger.ts @@ -0,0 +1,79 @@ +import Logger, { ILogLevel, ILogger } from 'js-logger'; +import { type WrappedSyncPort } from './SharedSyncImplementation'; + +/** + * Broadcasts logs to all clients + */ +export class BroadcastLogger implements ILogger { + TRACE: ILogLevel; + DEBUG: ILogLevel; + INFO: ILogLevel; + TIME: ILogLevel; + WARN: ILogLevel; + ERROR: ILogLevel; + OFF: ILogLevel; + + constructor(protected clients: WrappedSyncPort[]) { + this.TRACE = Logger.TRACE; + this.DEBUG = Logger.DEBUG; + this.INFO = Logger.INFO; + this.TIME = Logger.TIME; + this.WARN = Logger.WARN; + this.ERROR = Logger.ERROR; + this.OFF = Logger.OFF; + } + + trace(...x: any[]): void { + console.trace(...x); + this.clients.forEach((p) => p.clientProvider.trace(...x)); + } + + debug(...x: any[]): void { + console.debug(...x); + this.clients.forEach((p) => p.clientProvider.debug(...x)); + } + + info(...x: any[]): void { + console.info(...x); + this.clients.forEach((p) => p.clientProvider.info(...x)); + } + + log(...x: any[]): void { + console.log(...x); + this.clients.forEach((p) => p.clientProvider.log(...x)); + } + + warn(...x: any[]): void { + console.warn(...x); + this.clients.forEach((p) => p.clientProvider.warn(...x)); + } + + error(...x: any[]): void { + console.error(...x); + this.clients.forEach((p) => p.clientProvider.error(...x)); + } + + time(label: string): void { + console.time(label); + this.clients.forEach((p) => p.clientProvider.time(label)); + } + + timeEnd(label: string): void { + console.timeEnd(label); + this.clients.forEach((p) => p.clientProvider.timeEnd(label)); + } + + setLevel(level: ILogLevel): void { + // Levels are not adjustable on this level. + } + + getLevel(): ILogLevel { + // Levels are not adjustable on this level. + return Logger.INFO; + } + + enabledFor(level: ILogLevel): boolean { + // Levels are not adjustable on this level. + return true; + } +} diff --git a/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts b/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts index 81d960b4..d4ac15dc 100644 --- a/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts @@ -1,44 +1,281 @@ -import _ from 'lodash'; -import { BaseListener, BaseObserver, SyncStatusOptions } from '@journeyapps/powersync-sdk-common'; +import * as Comlink from 'comlink'; +import { ILogger } from 'js-logger'; +import { + AbstractStreamingSyncImplementation, + StreamingSyncImplementation, + AbstractStreamingSyncImplementationOptions, + BaseObserver, + LockOptions, + SqliteBucketStorage, + StreamingSyncImplementationListener, + SyncStatus, + SyncStatusOptions +} from '@journeyapps/powersync-sdk-common'; +import { WebStreamingSyncImplementation } from '../../db/sync/WebStreamingSyncImplementation'; +import { Mutex } from 'async-mutex'; +import { WebRemote } from '../../db/sync/WebRemote'; -export enum SharedSyncMessageType { - UPDATE = 'sync-status-update' +import { WASQLiteDBAdapter } from '../../db/adapters/wa-sqlite/WASQLiteDBAdapter'; +import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProvider'; +import { BroadcastLogger } from './BroadcastLogger'; + +/** + * Manual message events for shared sync clients + */ +export enum SharedSyncClientEvent { + /** + * This client requests the shared sync manager should + * close it's connection to the client. + */ + CLOSE_CLIENT = 'close-client' } -export type SharedSyncStatus = SyncStatusOptions & { - tabId?: string; +export type ManualSharedSyncPayload = { + event: SharedSyncClientEvent; + data: any; // TODO update in future }; -export type SharedSyncMessage = { - type: SharedSyncMessageType; - payload: SharedSyncStatus; +export type SharedSyncInitOptions = { + dbName: string; + streamOptions: Omit; }; -export interface SharedSyncImplementationListener extends BaseListener { - statusChanged: (status: SharedSyncStatus) => void; +export interface SharedSyncImplementationListener extends StreamingSyncImplementationListener { + initialized: () => void; } -export class SharedSyncImplementation extends BaseObserver { - protected status: SharedSyncStatus; +export type WrappedSyncPort = { + port: MessagePort; + clientProvider: Comlink.Remote; +}; + +export type RemoteOperationAbortController = { + controller: AbortController; + activePort: WrappedSyncPort; +}; + +/** + * Shared sync implementation which runs inside a shared webworker + */ +export class SharedSyncImplementation + extends BaseObserver + implements StreamingSyncImplementation +{ + protected ports: WrappedSyncPort[]; + protected syncStreamClient?: AbstractStreamingSyncImplementation; + + protected abortController?: AbortController; + protected isInitialized: Promise; + protected statusListener?: () => void; + + protected fetchCredentialsController?: RemoteOperationAbortController; + protected uploadDataController?: RemoteOperationAbortController; + + syncStatus: SyncStatus; + broadCastLogger: ILogger; constructor() { super(); - this.status = { - connected: false + this.ports = []; + + this.isInitialized = new Promise((resolve) => { + const callback = this.registerListener({ + initialized: () => { + resolve(); + callback?.(); + } + }); + }); + + this.syncStatus = new SyncStatus({}); + this.broadCastLogger = new BroadcastLogger(this.ports); + } + + async waitForStatus(status: SyncStatusOptions): Promise { + await this.waitForReady(); + return this.syncStreamClient!.waitForStatus(status); + } + + get lastSyncedAt(): Date | undefined { + return this.syncStreamClient?.lastSyncedAt; + } + + get isConnected(): boolean { + return this.syncStreamClient?.isConnected ?? false; + } + + async waitForReady() { + return this.isInitialized; + } + + /** + * Configures the DBAdapter connection and a streaming sync client. + */ + async init(dbWorkerPort: MessagePort, params: SharedSyncInitOptions) { + if (this.syncStreamClient) { + // Cannot modify already existing sync implementation + return; + } + + this.syncStreamClient = new WebStreamingSyncImplementation({ + adapter: new SqliteBucketStorage( + new WASQLiteDBAdapter({ + dbFilename: params.dbName, + workerPort: dbWorkerPort, + flags: { enableMultiTabs: true }, + logger: this.broadCastLogger + }), + new Mutex() + ), + remote: new WebRemote({ + fetchCredentials: async () => { + const lastPort = this.ports[this.ports.length - 1]; + return new Promise(async (resolve, reject) => { + const abortController = new AbortController(); + this.fetchCredentialsController = { + controller: abortController, + activePort: lastPort + }; + + abortController.signal.onabort = reject; + try { + resolve(await lastPort.clientProvider.fetchCredentials()); + } catch (ex) { + reject(ex); + } finally { + this.fetchCredentialsController = undefined; + } + }); + } + }), + uploadCrud: async () => { + const lastPort = this.ports[this.ports.length - 1]; + + return new Promise(async (resolve, reject) => { + const abortController = new AbortController(); + this.uploadDataController = { + controller: abortController, + activePort: lastPort + }; + + // Resolving will make it retry + abortController.signal.onabort = () => resolve(); + try { + resolve(await lastPort.clientProvider.uploadCrud()); + } catch (ex) { + reject(ex); + } finally { + this.uploadDataController = undefined; + } + }); + }, + ...params.streamOptions, + // Logger cannot be transferred just yet + logger: this.broadCastLogger + }); + + this.syncStreamClient.registerListener({ + statusChanged: (status) => { + this.updateAllStatuses(status.toJSON()); + } + }); + + this.iterateListeners((l) => l.initialized?.()); + } + + async dispose() { + await this.waitForReady(); + this.statusListener?.(); + return this.syncStreamClient?.dispose(); + } + + /** + * Connects to the PowerSync backend instance. + * Multiple tabs can safely call this in their initialization. + * The connection will simply be reconnected whenever a new tab + * connects. + */ + async connect() { + await this.waitForReady(); + this.disconnect(); + this.abortController = new AbortController(); + this.syncStreamClient?.streamingSync(this.abortController.signal); + } + + async disconnect() { + this.abortController?.abort('Disconnected'); + this.iterateListeners((l) => l.statusChanged?.(new SyncStatus({ connected: false }))); + } + + /** + * Adds a new client tab's message port to the list of connected ports + */ + addPort(port: MessagePort) { + const portProvider = { + port, + clientProvider: Comlink.wrap(port) }; + this.ports.push(portProvider); + + // Give the newly connected client the latest status + const status = this.syncStreamClient?.syncStatus; + if (status) { + portProvider.clientProvider.statusChanged(status.toJSON()); + } } /** - * Provides a method to get the current state - * This is needed for a new tab to initialize it's local state - * before relying on the next broadcast update. + * Removes a message port client from this manager's managed + * clients. */ - getState(): SharedSyncStatus { - return this.status; + removePort(port: MessagePort) { + const index = this.ports.findIndex((p) => p.port == port); + if (index < 0) { + console.warn(`Could not remove port ${port} since it is not present in active ports.`); + return; + } + + const trackedPort = this.ports[index]; + // Release proxy + trackedPort.clientProvider[Comlink.releaseProxy](); + this.ports.splice(index, 1); + + /** + * The port might currently be in use. Any active functions might + * not resolve. Abort them here. + */ + [this.fetchCredentialsController, this.uploadDataController].forEach((abortController) => { + if (abortController?.activePort.port == port) { + abortController!.controller.abort(); + } + }); + } + + triggerCrudUpload() { + this.waitForReady().then(() => this.syncStreamClient?.triggerCrudUpload()); + } + + async obtainLock(lockOptions: LockOptions): Promise { + await this.waitForReady(); + return this.syncStreamClient!.obtainLock(lockOptions); } - updateState(status: SharedSyncStatus) { - this.status = _.merge(this.status, status); - this.iterateListeners((cb) => cb.statusChanged?.(status)); + async hasCompletedSync(): Promise { + await this.waitForReady(); + return this.syncStreamClient!.hasCompletedSync(); + } + + async getWriteCheckpoint(): Promise { + await this.waitForReady(); + return this.syncStreamClient!.getWriteCheckpoint(); + } + + /** + * A method to update the all shared statuses for each + * client. + */ + private updateAllStatuses(status: SyncStatusOptions) { + this.syncStatus = new SyncStatus(status); + this.ports.forEach((p) => p.clientProvider.statusChanged(status)); } } diff --git a/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.worker.ts b/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.worker.ts index d807666a..1047cf86 100644 --- a/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.worker.ts +++ b/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.worker.ts @@ -1,21 +1,26 @@ import * as Comlink from 'comlink'; -import { SharedSyncImplementation, SharedSyncMessageType } from './SharedSyncImplementation'; +import { SharedSyncImplementation, SharedSyncClientEvent, ManualSharedSyncPayload } from './SharedSyncImplementation'; +import Logger from 'js-logger'; const _self: SharedWorkerGlobalScope = self as any; - +Logger.useDefaults(); const sharedSyncImplementation = new SharedSyncImplementation(); _self.onconnect = function (event: MessageEvent) { const port = event.ports[0]; - Comlink.expose(sharedSyncImplementation, port); - - sharedSyncImplementation.registerListener({ - statusChanged: (status) => { - port.postMessage({ - type: SharedSyncMessageType.UPDATE, - payload: status - }); + /** + * Adds an extra listener which can remove this port + * from the list of monitored ports. + */ + port.addEventListener('message', (event) => { + const payload = event.data as ManualSharedSyncPayload; + if (payload?.event == SharedSyncClientEvent.CLOSE_CLIENT) { + console.log('closing shared for port', port); + sharedSyncImplementation.removePort(port); } }); + + Comlink.expose(sharedSyncImplementation, port); + sharedSyncImplementation.addPort(port); }; diff --git a/packages/powersync-sdk-web/tests/bucket_storage.test.ts b/packages/powersync-sdk-web/tests/bucket_storage.test.ts index b63782ce..100a8a41 100644 --- a/packages/powersync-sdk-web/tests/bucket_storage.test.ts +++ b/packages/powersync-sdk-web/tests/bucket_storage.test.ts @@ -12,7 +12,7 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { AbstractPowerSyncDatabase, Checkpoint } from '@journeyapps/powersync-sdk-common'; import { WASQLitePowerSyncDatabaseOpenFactory } from '@journeyapps/powersync-sdk-web'; import { Mutex } from 'async-mutex'; -import { testSchema } from './test_schema'; +import { testSchema } from './utils/test-schema'; const putAsset1_1 = OplogEntry.fromRow({ op_id: '1', diff --git a/packages/powersync-sdk-web/tests/crud.test.ts b/packages/powersync-sdk-web/tests/crud.test.ts index cc249905..e6dbee26 100644 --- a/packages/powersync-sdk-web/tests/crud.test.ts +++ b/packages/powersync-sdk-web/tests/crud.test.ts @@ -10,7 +10,7 @@ import { } from '@journeyapps/powersync-sdk-common'; import { WASQLitePowerSyncDatabaseOpenFactory } from '@journeyapps/powersync-sdk-web'; import { v4 as uuid } from 'uuid'; -import { testSchema } from './test_schema'; +import { testSchema } from './utils/test-schema'; const testId = '2290de4f-0488-4e50-abed-f8e8eb1d0b42'; diff --git a/packages/powersync-sdk-web/tests/multiple_instances.test.ts b/packages/powersync-sdk-web/tests/multiple_instances.test.ts new file mode 100644 index 00000000..b8562864 --- /dev/null +++ b/packages/powersync-sdk-web/tests/multiple_instances.test.ts @@ -0,0 +1,216 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { AbstractPowerSyncDatabase, SqliteBucketStorage, SyncStatus } from '@journeyapps/powersync-sdk-common'; +import { + SharedWebStreamingSyncImplementation, + WASQLitePowerSyncDatabaseOpenFactory, + WebRemote, + WebStreamingSyncImplementationOptions +} from '@journeyapps/powersync-sdk-web'; +import { testSchema } from './utils/test-schema'; +import { TestConnector } from './utils/MockStreamOpenFactory'; +import { Mutex } from 'async-mutex'; + +describe('Multiple Instances', () => { + const dbFilename = 'test-multiple-instances.db'; + const factory = new WASQLitePowerSyncDatabaseOpenFactory({ + dbFilename, + schema: testSchema + }); + + let db: AbstractPowerSyncDatabase; + + beforeEach(() => { + db = factory.getInstance(); + }); + + afterEach(async () => { + await db.disconnectAndClear(); + await db.close(); + }); + + function createAsset(powersync: AbstractPowerSyncDatabase = db) { + return powersync.execute('INSERT INTO assets(id, description) VALUES(uuid(), ?)', ['test']); + } + + it('should share data between instances', async () => { + // Create an asset on the first connection + await createAsset(); + + // Create a new connection and verify it can read existing assets + const db2 = factory.getInstance(); + const assets = await db2.getAll('SELECT * FROM assets'); + expect(assets.length).equals(1); + + await db2.close(); + }); + + it('should maintain DB connections if instances call close', async () => { + /** + * The shared webworker should use the same DB connection for both instances. + * The shared connection should only be closed if all PowerSync clients + * close themselves. + */ + const db2 = factory.getInstance(); + await db2.close(); + + // Create an asset on the first connection + await createAsset(); + }); + + it('should watch table changes between instances', async () => { + const db2 = factory.getInstance(); + + const watchedPromise = new Promise(async (resolve) => { + const controller = new AbortController(); + for await (const result of db2.watch('SELECT * FROM assets')) { + resolve(); + controller.abort(); + } + }); + + await createAsset(); + + expect(watchedPromise).rejects; + }); + + it('should share sync updates', async () => { + // Generate the first streaming sync implementation + const connector1 = new TestConnector(); + + // They need to use the same identifier to use the same shared worker. + const identifier = 'streaming-sync-shared'; + const syncOptions1: WebStreamingSyncImplementationOptions = { + adapter: new SqliteBucketStorage(db.database, new Mutex()), + remote: new WebRemote(connector1), + uploadCrud: async () => { + await connector1.uploadData(db); + }, + identifier + }; + + const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1); + + // Generate the second streaming sync implementation + const connector2 = new TestConnector(); + const syncOptions2: WebStreamingSyncImplementationOptions = { + adapter: new SqliteBucketStorage(db.database, new Mutex()), + remote: new WebRemote(connector1), + uploadCrud: async () => { + await connector2.uploadData(db); + }, + identifier + }; + + const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2); + + const stream2UpdatedPromise = new Promise((resolve, reject) => { + const l = stream2.registerListener({ + statusChanged: (status) => { + if (status.connected) { + resolve(); + l(); + } + } + }); + }); + + // hack to set the status to a new one for tests + (stream1 as any)['_testUpdateStatus'](new SyncStatus({ connected: true })); + + await stream2UpdatedPromise; + expect(stream2.isConnected).true; + + await stream1.dispose(); + await stream2.dispose(); + }); + + it('should trigger uploads from last connected clients', async () => { + // Generate the first streaming sync implementation + const connector1 = new TestConnector(); + const spy1 = vi.spyOn(connector1, 'uploadData'); + + // They need to use the same identifier to use the same shared worker. + const identifier = dbFilename; + + // Resolves once the first connector has been called to upload data + let triggerUpload1: () => void; + const upload1TriggeredPromise = new Promise((resolve) => { + triggerUpload1 = resolve; + }); + + // Create the first streaming client + const syncOptions1: WebStreamingSyncImplementationOptions = { + adapter: new SqliteBucketStorage(db.database, new Mutex()), + remote: new WebRemote(connector1), + uploadCrud: async () => { + triggerUpload1(); + connector1.uploadData(db); + }, + identifier + }; + const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1); + + // Generate the second streaming sync implementation + const connector2 = new TestConnector(); + const spy2 = vi.spyOn(connector2, 'uploadData'); + let triggerUpload2: () => void; + const upload2TriggeredPromise = new Promise((resolve) => { + triggerUpload2 = resolve; + }); + const syncOptions2: WebStreamingSyncImplementationOptions = { + adapter: new SqliteBucketStorage(db.database, new Mutex()), + remote: new WebRemote(connector1), + uploadCrud: async () => { + triggerUpload2(); + connector2.uploadData(db); + }, + identifier + }; + const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2); + + // Waits for the stream to be marked as connected + const stream2UpdatedPromise = new Promise((resolve, reject) => { + const l = stream2.registerListener({ + statusChanged: (status) => { + if (status.connected) { + resolve(); + l(); + } + } + }); + }); + + // hack to set the status to connected for tests + (stream1 as any)['_testUpdateStatus'](new SyncStatus({ connected: true })); + + // The status in the second stream client should be updated + await stream2UpdatedPromise; + expect(stream2.isConnected).true; + + // Manual trigger since tests don't entirely configure watches for ps_crud + stream1.triggerCrudUpload(); + + // Create something with CRUD in it. + await db.execute('INSERT into customers (id, name, email) VALUES (uuid(), ?, ?)', [ + 'steven', + 'steven@journeyapps.com' + ]); + + // The second connector should be called to upload + await upload2TriggeredPromise; + + // It should call the latest connected client + expect(spy2).toHaveBeenCalledOnce(); + + // Close the second client, leaving only the first one + await stream2.dispose(); + + stream1.triggerCrudUpload(); + // It should now upload from the first client + await upload1TriggeredPromise; + + expect(spy1).toHaveBeenCalledOnce(); + + await stream1.dispose(); + }); +}); diff --git a/packages/powersync-sdk-web/tests/stream.test.ts b/packages/powersync-sdk-web/tests/stream.test.ts index 00925406..2e2173e5 100644 --- a/packages/powersync-sdk-web/tests/stream.test.ts +++ b/packages/powersync-sdk-web/tests/stream.test.ts @@ -1,169 +1,94 @@ +import _ from 'lodash'; +import Logger from 'js-logger'; import { beforeAll, describe, expect, it } from 'vitest'; +import { v4 as uuid } from 'uuid'; import { AbstractPowerSyncDatabase, - AbstractRemote, - AbstractStreamingSyncImplementation, Column, ColumnType, - PowerSyncBackendConnector, - PowerSyncCredentials, - PowerSyncDatabaseOptions, - RemoteConnector, Schema, + SyncStatusOptions, Table } from '@journeyapps/powersync-sdk-common'; -import { - PowerSyncDatabase, - WASQLitePowerSyncDatabaseOpenFactory, - WebPowerSyncDatabaseOptions, - WebStreamingSyncImplementation -} from '@journeyapps/powersync-sdk-web'; -import Logger from 'js-logger'; -import { WebPowerSyncOpenFactoryOptions } from 'src/db/adapters/AbstractWebPowerSyncDatabaseOpenFactory'; -import { v4 as uuid } from 'uuid'; - -class TestConnector implements PowerSyncBackendConnector { - async fetchCredentials(): Promise { - return { - endpoint: '', - token: '' - }; - } - async uploadData(database: AbstractPowerSyncDatabase): Promise { - return; - } -} - -class MockRemote extends AbstractRemote { - streamController: ReadableStreamDefaultController | null; - - constructor( - connector: RemoteConnector, - protected onStreamRequested: () => void - ) { - super(connector); - this.streamController = null; - } - - post(path: string, data: any, headers?: Record | undefined): Promise { - throw new Error('Method not implemented.'); - } - get(path: string, headers?: Record | undefined): Promise { - throw new Error('Method not implemented.'); - } - async postStreaming(): Promise { - return new Response(this.generateStream()).body; - } - - private generateStream() { - return new ReadableStream({ - start: (controller) => { - this.streamController = controller; - this.onStreamRequested(); +import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory'; + +export async function waitForConnectionStatus( + db: AbstractPowerSyncDatabase, + statusCheck: SyncStatusOptions = { connected: true } +) { + await new Promise((resolve) => { + if (db.connected) { + resolve(); + } + const l = db.registerListener({ + statusChanged: (status) => { + if (_.every(statusCheck, (value, key) => _.isEqual(status.toJSON()[key as keyof SyncStatusOptions], value))) { + resolve(); + l?.(); + } } }); - } + }); } -class MockedStreamPowerSync extends PowerSyncDatabase { - constructor( - options: WebPowerSyncDatabaseOptions, - protected remote: AbstractRemote - ) { - super(options); - } - - protected generateSyncStreamImplementation( - connector: PowerSyncBackendConnector - ): AbstractStreamingSyncImplementation { - return new WebStreamingSyncImplementation({ - adapter: this.bucketStorageAdapter, - remote: this.remote, - uploadCrud: async () => { - await this.waitForReady(); - await connector.uploadData(this); +export async function generateConnectedDatabase() { + /** + * Very basic implementation of a listener pattern. + * Required since we cannot extend multiple classes. + */ + const callbacks: Map void> = new Map(); + const remote = new MockRemote(new TestConnector(), () => callbacks.forEach((c) => c())); + + const factory = new MockStreamOpenFactory( + { + dbFilename: 'test-stream-connection.db', + flags: { + enableMultiTabs: false }, - identifier: this.options.database.name + schema: new Schema([ + new Table({ + name: 'users', + columns: [new Column({ name: 'name', type: ColumnType.TEXT })] + }) + ]) + }, + remote + ); + const powersync = factory.getInstance(); + + const waitForStream = () => + new Promise((resolve) => { + const id = uuid(); + callbacks.set(id, () => { + resolve(); + callbacks.delete(id); + }); }); - } -} - -class MockOpenFactory extends WASQLitePowerSyncDatabaseOpenFactory { - constructor( - options: WebPowerSyncOpenFactoryOptions, - protected remote: AbstractRemote - ) { - super(options); - } - generateInstance(options: PowerSyncDatabaseOptions): AbstractPowerSyncDatabase { - return new MockedStreamPowerSync( - { - ...options - }, - this.remote - ); - } -} - -describe('Stream test', () => { - beforeAll(() => Logger.useDefaults()); - it('PowerSync reconnect', async () => { - /** - * Very basic implementation of a listener pattern. - * Required since we cannot extend multiple classes. - */ - const callbacks: Map void> = new Map(); - const remote = new MockRemote(new TestConnector(), () => callbacks.forEach((c) => c())); - - const powersync = new MockOpenFactory( - { - dbFilename: 'test-stream-connection.db', - flags: { - enableMultiTabs: false - }, - schema: new Schema([ - new Table({ - name: 'users', - columns: [new Column({ name: 'name', type: ColumnType.TEXT })] - }) - ]) - }, - remote - ).getInstance(); + const streamOpened = waitForStream(); - const waitForStream = () => - new Promise((resolve) => { - const id = uuid(); - callbacks.set(id, () => { - resolve(); - callbacks.delete(id); - }); - }); + powersync.connect(new TestConnector()); - const streamOpened = waitForStream(); + await streamOpened; - powersync.connect(new TestConnector()); + remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n')); - await streamOpened; + // Wait for connected to be true + await waitForConnectionStatus(powersync); - remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n')); + return { + factory, + powersync, + remote, + waitForStream + }; +} - // Wait for connected to be true - await new Promise((resolve) => { - if (powersync.connected) { - resolve(); - } - const l = powersync.registerListener({ - statusChanged: (status) => { - if (status.connected) { - resolve(); - l?.(); - } - } - }); - }); +describe('Stream test', () => { + beforeAll(() => Logger.useDefaults()); + it('PowerSync reconnect', async () => { + const { powersync, waitForStream, remote } = await generateConnectedDatabase(); expect(powersync.connected).true; // Close the stream diff --git a/packages/powersync-sdk-web/tests/utils/MockStreamOpenFactory.ts b/packages/powersync-sdk-web/tests/utils/MockStreamOpenFactory.ts new file mode 100644 index 00000000..72c5abc2 --- /dev/null +++ b/packages/powersync-sdk-web/tests/utils/MockStreamOpenFactory.ts @@ -0,0 +1,99 @@ +import { + PowerSyncBackendConnector, + PowerSyncCredentials, + AbstractPowerSyncDatabase, + AbstractRemote, + RemoteConnector, + AbstractStreamingSyncImplementation, + PowerSyncDatabaseOptions +} from '@journeyapps/powersync-sdk-common'; +import { + PowerSyncDatabase, + WebPowerSyncDatabaseOptions, + WebStreamingSyncImplementation, + WASQLitePowerSyncDatabaseOpenFactory, + WebPowerSyncOpenFactoryOptions +} from '@journeyapps/powersync-sdk-web'; + +export class TestConnector implements PowerSyncBackendConnector { + async fetchCredentials(): Promise { + return { + endpoint: '', + token: '' + }; + } + async uploadData(database: AbstractPowerSyncDatabase): Promise { + return; + } +} + +export class MockRemote extends AbstractRemote { + streamController: ReadableStreamDefaultController | null; + + constructor( + connector: RemoteConnector, + protected onStreamRequested: () => void + ) { + super(connector); + this.streamController = null; + } + + post(path: string, data: any, headers?: Record | undefined): Promise { + throw new Error('Method not implemented.'); + } + get(path: string, headers?: Record | undefined): Promise { + throw new Error('Method not implemented.'); + } + async postStreaming(): Promise { + return new Response(this.generateStream()).body; + } + + private generateStream() { + return new ReadableStream({ + start: (controller) => { + this.streamController = controller; + this.onStreamRequested(); + } + }); + } +} + +export class MockedStreamPowerSync extends PowerSyncDatabase { + constructor( + options: WebPowerSyncDatabaseOptions, + protected remote: AbstractRemote + ) { + super(options); + } + + protected generateSyncStreamImplementation( + connector: PowerSyncBackendConnector + ): AbstractStreamingSyncImplementation { + return new WebStreamingSyncImplementation({ + adapter: this.bucketStorageAdapter, + remote: this.remote, + uploadCrud: async () => { + await this.waitForReady(); + await connector.uploadData(this); + }, + identifier: this.options.database.name + }); + } +} + +export class MockStreamOpenFactory extends WASQLitePowerSyncDatabaseOpenFactory { + constructor( + options: WebPowerSyncOpenFactoryOptions, + protected remote: AbstractRemote + ) { + super(options); + } + generateInstance(options: PowerSyncDatabaseOptions): AbstractPowerSyncDatabase { + return new MockedStreamPowerSync( + { + ...options + }, + this.remote + ); + } +} diff --git a/packages/powersync-sdk-web/tests/test_schema.ts b/packages/powersync-sdk-web/tests/utils/test-schema.ts similarity index 100% rename from packages/powersync-sdk-web/tests/test_schema.ts rename to packages/powersync-sdk-web/tests/utils/test-schema.ts diff --git a/packages/powersync-sdk-web/tests/watch.test.ts b/packages/powersync-sdk-web/tests/watch.test.ts index a58117d9..15eabd82 100644 --- a/packages/powersync-sdk-web/tests/watch.test.ts +++ b/packages/powersync-sdk-web/tests/watch.test.ts @@ -1,9 +1,9 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -vi.useRealTimers(); import { v4 as uuid } from 'uuid'; import { AbstractPowerSyncDatabase } from '@journeyapps/powersync-sdk-common'; import { WASQLitePowerSyncDatabaseOpenFactory } from '@journeyapps/powersync-sdk-web'; -import { testSchema } from './test_schema'; +import { testSchema } from './utils/test-schema'; +vi.useRealTimers(); /** * There seems to be an issue with Vitest browser mode's setTimeout and diff --git a/packages/powersync-sdk-web/tsconfig.json b/packages/powersync-sdk-web/tsconfig.json index 40e6c558..57861c75 100644 --- a/packages/powersync-sdk-web/tsconfig.json +++ b/packages/powersync-sdk-web/tsconfig.json @@ -18,5 +18,10 @@ "target": "es6" /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */, "strictNullChecks": true }, + "references": [ + { + "path": "../powersync-sdk-common" + } + ], "include": ["src/**/*", "tests/**/*"] }