Skip to content

Commit c568caf

Browse files
samwilliskevin-dp
andcommitted
Handle pushed down predicates in Electric collection
Co-authored-by: Kevin De Porre <[email protected]> Co-authored-by: Sam Willis <[email protected]>
1 parent 9ad1169 commit c568caf

File tree

6 files changed

+1464
-19
lines changed

6 files changed

+1464
-19
lines changed

.changeset/tender-carpets-cheat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/electric-db-collection": patch
3+
---
4+
5+
Handle predicates that are pushed down.

packages/electric-db-collection/src/electric.ts

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@ import {
1212
TimeoutWaitingForMatchError,
1313
TimeoutWaitingForTxIdError,
1414
} from "./errors"
15+
import { compileSQL } from "./sql-compiler"
1516
import type {
1617
BaseCollectionConfig,
1718
CollectionConfig,
1819
DeleteMutationFnParams,
1920
InsertMutationFnParams,
21+
LoadSubsetOptions,
2022
SyncConfig,
23+
SyncMode,
2124
UpdateMutationFnParams,
2225
UtilsRecord,
2326
} from "@tanstack/db"
@@ -72,6 +75,24 @@ type InferSchemaOutput<T> = T extends StandardSchemaV1
7275
: Record<string, unknown>
7376
: Record<string, unknown>
7477

78+
/**
79+
* The mode of sync to use for the collection.
80+
* @default `eager`
81+
* @description
82+
* - `eager`:
83+
* - syncs all data immediately on preload
84+
* - collection will be marked as ready once the sync is complete
85+
* - there is no incremental sync
86+
* - `on-demand`:
87+
* - syncs data in incremental snapshots when the collection is queried
88+
* - collection will be marked as ready immediately after the first snapshot is synced
89+
* - `progressive`:
90+
* - syncs all data for the collection in the background
91+
* - uses incremental snapshots during the initial sync to provide a fast path to the data required for queries
92+
* - collection will be marked as ready once the full sync is complete
93+
*/
94+
export type ElectricSyncMode = SyncMode | `progressive`
95+
7596
/**
7697
* Configuration interface for Electric collection options
7798
* @template T - The type of items in the collection
@@ -82,12 +103,13 @@ export interface ElectricCollectionConfig<
82103
TSchema extends StandardSchemaV1 = never,
83104
> extends Omit<
84105
BaseCollectionConfig<T, string | number, TSchema, UtilsRecord, any>,
85-
`onInsert` | `onUpdate` | `onDelete`
106+
`onInsert` | `onUpdate` | `onDelete` | `syncMode`
86107
> {
87108
/**
88109
* Configuration options for the ElectricSQL ShapeStream
89110
*/
90111
shapeOptions: ShapeStreamOptions<GetExtensions<T>>
112+
syncMode?: ElectricSyncMode
91113

92114
/**
93115
* Optional asynchronous handler function called before an insert operation
@@ -281,6 +303,8 @@ export function electricCollectionOptions(
281303
} {
282304
const seenTxids = new Store<Set<Txid>>(new Set([]))
283305
const seenSnapshots = new Store<Array<PostgresSnapshot>>([])
306+
const syncMode = config.syncMode ?? `eager`
307+
const finalSyncMode = syncMode === `progressive` ? `on-demand` : syncMode
284308
const pendingMatches = new Store<
285309
Map<
286310
string,
@@ -331,6 +355,7 @@ export function electricCollectionOptions(
331355
const sync = createElectricSync<any>(config.shapeOptions, {
332356
seenTxids,
333357
seenSnapshots,
358+
syncMode,
334359
pendingMatches,
335360
currentBatchMessages,
336361
removePendingMatches,
@@ -550,6 +575,7 @@ export function electricCollectionOptions(
550575

551576
return {
552577
...restConfig,
578+
syncMode: finalSyncMode,
553579
sync,
554580
onInsert: wrappedOnInsert,
555581
onUpdate: wrappedOnUpdate,
@@ -567,6 +593,7 @@ export function electricCollectionOptions(
567593
function createElectricSync<T extends Row<unknown>>(
568594
shapeOptions: ShapeStreamOptions<GetExtensions<T>>,
569595
options: {
596+
syncMode: ElectricSyncMode
570597
seenTxids: Store<Set<Txid>>
571598
seenSnapshots: Store<Array<PostgresSnapshot>>
572599
pendingMatches: Store<
@@ -590,6 +617,7 @@ function createElectricSync<T extends Row<unknown>>(
590617
const {
591618
seenTxids,
592619
seenSnapshots,
620+
syncMode,
593621
pendingMatches,
594622
currentBatchMessages,
595623
removePendingMatches,
@@ -653,6 +681,15 @@ function createElectricSync<T extends Row<unknown>>(
653681

654682
const stream = new ShapeStream({
655683
...shapeOptions,
684+
// In on-demand mode, we only want to sync changes, so we set the log to `changes_only`
685+
log: syncMode === `on-demand` ? `changes_only` : undefined,
686+
// In on-demand mode, we only need the changes from the point of time the collection was created
687+
// so we default to `now` when there is no saved offset.
688+
offset: shapeOptions.offset
689+
? shapeOptions.offset
690+
: syncMode === `on-demand`
691+
? `now`
692+
: undefined,
656693
signal: abortController.signal,
657694
onError: (errorParams) => {
658695
// Just immediately mark ready if there's an error to avoid blocking
@@ -679,9 +716,11 @@ function createElectricSync<T extends Row<unknown>>(
679716
let transactionStarted = false
680717
const newTxids = new Set<Txid>()
681718
const newSnapshots: Array<PostgresSnapshot> = []
719+
let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode
682720

683721
unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
684722
let hasUpToDate = false
723+
let hasSnapshotEnd = false
685724

686725
for (const message of messages) {
687726
// Add message to current batch buffer (for race condition handling)
@@ -746,6 +785,7 @@ function createElectricSync<T extends Row<unknown>>(
746785
})
747786
} else if (isSnapshotEndMessage(message)) {
748787
newSnapshots.push(parseSnapshotMessage(message))
788+
hasSnapshotEnd = true
749789
} else if (isUpToDateMessage(message)) {
750790
hasUpToDate = true
751791
} else if (isMustRefetchMessage(message)) {
@@ -761,12 +801,14 @@ function createElectricSync<T extends Row<unknown>>(
761801

762802
truncate()
763803

764-
// Reset hasUpToDate so we continue accumulating changes until next up-to-date
804+
// Reset flags so we continue accumulating changes until next up-to-date
765805
hasUpToDate = false
806+
hasSnapshotEnd = false
807+
hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync
766808
}
767809
}
768810

769-
if (hasUpToDate) {
811+
if (hasUpToDate || hasSnapshotEnd) {
770812
// Clear the current batch buffer since we're now up-to-date
771813
currentBatchMessages.setState(() => [])
772814

@@ -776,8 +818,15 @@ function createElectricSync<T extends Row<unknown>>(
776818
transactionStarted = false
777819
}
778820

779-
// Mark the collection as ready now that sync is up to date
780-
markReady()
821+
if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
822+
// Mark the collection as ready now that sync is up to date
823+
markReady()
824+
}
825+
826+
// Track that we've received the first up-to-date for progressive mode
827+
if (hasUpToDate) {
828+
hasReceivedUpToDate = true
829+
}
781830

782831
// Always commit txids when we receive up-to-date, regardless of transaction state
783832
seenTxids.setState((currentTxids) => {
@@ -811,12 +860,29 @@ function createElectricSync<T extends Row<unknown>>(
811860
}
812861
})
813862

814-
// Return the unsubscribe function
815-
return () => {
816-
// Unsubscribe from the stream
817-
unsubscribeStream()
818-
// Abort the abort controller to stop the stream
819-
abortController.abort()
863+
// Only set onLoadSubset if the sync mode is not eager, this indicates to the sync
864+
// layer that it can load more data on demand via the requestSnapshot method when,
865+
// the syncMode = `on-demand` or `progressive`
866+
const loadSubset =
867+
syncMode === `eager`
868+
? undefined
869+
: async (opts: LoadSubsetOptions) => {
870+
// In progressive mode, stop requesting snapshots once full sync is complete
871+
if (syncMode === `progressive` && hasReceivedUpToDate) {
872+
return
873+
}
874+
const snapshotParams = compileSQL<T>(opts)
875+
await stream.requestSnapshot(snapshotParams)
876+
}
877+
878+
return {
879+
loadSubset,
880+
cleanup: () => {
881+
// Unsubscribe from the stream
882+
unsubscribeStream()
883+
// Abort the abort controller to stop the stream
884+
abortController.abort()
885+
},
820886
}
821887
},
822888
// Expose the getSyncMetadata function
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
export function serialize(value: unknown): string {
2+
if (typeof value === `string`) {
3+
return `'${value}'`
4+
}
5+
6+
if (typeof value === `number`) {
7+
return value.toString()
8+
}
9+
10+
if (value === null || value === undefined) {
11+
return `NULL`
12+
}
13+
14+
if (typeof value === `boolean`) {
15+
return value ? `true` : `false`
16+
}
17+
18+
if (value instanceof Date) {
19+
return `'${value.toISOString()}'`
20+
}
21+
22+
if (Array.isArray(value)) {
23+
return `ARRAY[${value.map(serialize).join(`,`)}]`
24+
}
25+
26+
throw new Error(`Cannot serialize value: ${JSON.stringify(value)}`)
27+
}

0 commit comments

Comments
 (0)