Skip to content

Commit cb47bea

Browse files
kevin-dpsamwillis
andauthored
Handle pushed down predicates in Electric collection (#618)
* Handle pushed down predicates in Electric collection Co-authored-by: Kevin De Porre <[email protected]> Co-authored-by: Sam Willis <[email protected]> * use the subsetDuduper for electric * Leave fixme * fix DeduplicatedLoadSubset call * fix tests --------- Co-authored-by: Sam Willis <[email protected]>
1 parent 0c6fa6b commit cb47bea

File tree

8 files changed

+1793
-21
lines changed

8 files changed

+1793
-21
lines changed

.changeset/tender-carpets-cheat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/electric-db-collection": patch
3+
---
4+
5+
Handle predicates that are pushed down.

packages/electric-db-collection/src/electric.ts

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,22 @@ import {
66
} from "@electric-sql/client"
77
import { Store } from "@tanstack/store"
88
import DebugModule from "debug"
9+
import { DeduplicatedLoadSubset } from "@tanstack/db"
910
import {
1011
ExpectedNumberInAwaitTxIdError,
1112
StreamAbortedError,
1213
TimeoutWaitingForMatchError,
1314
TimeoutWaitingForTxIdError,
1415
} from "./errors"
16+
import { compileSQL } from "./sql-compiler"
1517
import type {
1618
BaseCollectionConfig,
1719
CollectionConfig,
1820
DeleteMutationFnParams,
1921
InsertMutationFnParams,
22+
LoadSubsetOptions,
2023
SyncConfig,
24+
SyncMode,
2125
UpdateMutationFnParams,
2226
UtilsRecord,
2327
} from "@tanstack/db"
@@ -72,6 +76,24 @@ type InferSchemaOutput<T> = T extends StandardSchemaV1
7276
: Record<string, unknown>
7377
: Record<string, unknown>
7478

79+
/**
80+
* The mode of sync to use for the collection.
81+
* @default `eager`
82+
* @description
83+
* - `eager`:
84+
* - syncs all data immediately on preload
85+
* - collection will be marked as ready once the sync is complete
86+
* - there is no incremental sync
87+
* - `on-demand`:
88+
* - syncs data in incremental snapshots when the collection is queried
89+
* - collection will be marked as ready immediately after the first snapshot is synced
90+
* - `progressive`:
91+
* - syncs all data for the collection in the background
92+
* - uses incremental snapshots during the initial sync to provide a fast path to the data required for queries
93+
* - collection will be marked as ready once the full sync is complete
94+
*/
95+
export type ElectricSyncMode = SyncMode | `progressive`
96+
7597
/**
7698
* Configuration interface for Electric collection options
7799
* @template T - The type of items in the collection
@@ -82,12 +104,13 @@ export interface ElectricCollectionConfig<
82104
TSchema extends StandardSchemaV1 = never,
83105
> extends Omit<
84106
BaseCollectionConfig<T, string | number, TSchema, UtilsRecord, any>,
85-
`onInsert` | `onUpdate` | `onDelete`
107+
`onInsert` | `onUpdate` | `onDelete` | `syncMode`
86108
> {
87109
/**
88110
* Configuration options for the ElectricSQL ShapeStream
89111
*/
90112
shapeOptions: ShapeStreamOptions<GetExtensions<T>>
113+
syncMode?: ElectricSyncMode
91114

92115
/**
93116
* Optional asynchronous handler function called before an insert operation
@@ -281,6 +304,9 @@ export function electricCollectionOptions(
281304
} {
282305
const seenTxids = new Store<Set<Txid>>(new Set([]))
283306
const seenSnapshots = new Store<Array<PostgresSnapshot>>([])
307+
const internalSyncMode = config.syncMode ?? `eager`
308+
const finalSyncMode =
309+
internalSyncMode === `progressive` ? `on-demand` : internalSyncMode
284310
const pendingMatches = new Store<
285311
Map<
286312
string,
@@ -331,6 +357,7 @@ export function electricCollectionOptions(
331357
const sync = createElectricSync<any>(config.shapeOptions, {
332358
seenTxids,
333359
seenSnapshots,
360+
syncMode: internalSyncMode,
334361
pendingMatches,
335362
currentBatchMessages,
336363
removePendingMatches,
@@ -550,6 +577,7 @@ export function electricCollectionOptions(
550577

551578
return {
552579
...restConfig,
580+
syncMode: finalSyncMode,
553581
sync,
554582
onInsert: wrappedOnInsert,
555583
onUpdate: wrappedOnUpdate,
@@ -567,6 +595,7 @@ export function electricCollectionOptions(
567595
function createElectricSync<T extends Row<unknown>>(
568596
shapeOptions: ShapeStreamOptions<GetExtensions<T>>,
569597
options: {
598+
syncMode: ElectricSyncMode
570599
seenTxids: Store<Set<Txid>>
571600
seenSnapshots: Store<Array<PostgresSnapshot>>
572601
pendingMatches: Store<
@@ -590,6 +619,7 @@ function createElectricSync<T extends Row<unknown>>(
590619
const {
591620
seenTxids,
592621
seenSnapshots,
622+
syncMode,
593623
pendingMatches,
594624
currentBatchMessages,
595625
removePendingMatches,
@@ -653,6 +683,12 @@ function createElectricSync<T extends Row<unknown>>(
653683

654684
const stream = new ShapeStream({
655685
...shapeOptions,
686+
// In on-demand mode, we only want to sync changes, so we set the log to `changes_only`
687+
log: syncMode === `on-demand` ? `changes_only` : undefined,
688+
// In on-demand mode, we only need the changes from the point of time the collection was created
689+
// so we default to `now` when there is no saved offset.
690+
offset:
691+
shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined),
656692
signal: abortController.signal,
657693
onError: (errorParams) => {
658694
// Just immediately mark ready if there's an error to avoid blocking
@@ -679,9 +715,28 @@ function createElectricSync<T extends Row<unknown>>(
679715
let transactionStarted = false
680716
const newTxids = new Set<Txid>()
681717
const newSnapshots: Array<PostgresSnapshot> = []
718+
let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode
719+
720+
// Create deduplicated loadSubset wrapper for non-eager modes
721+
// This prevents redundant snapshot requests when multiple concurrent
722+
// live queries request overlapping or subset predicates
723+
const loadSubsetDedupe =
724+
syncMode === `eager`
725+
? null
726+
: new DeduplicatedLoadSubset({
727+
loadSubset: async (opts: LoadSubsetOptions) => {
728+
// In progressive mode, stop requesting snapshots once full sync is complete
729+
if (syncMode === `progressive` && hasReceivedUpToDate) {
730+
return
731+
}
732+
const snapshotParams = compileSQL<T>(opts)
733+
await stream.requestSnapshot(snapshotParams)
734+
},
735+
})
682736

683737
unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
684738
let hasUpToDate = false
739+
let hasSnapshotEnd = false
685740

686741
for (const message of messages) {
687742
// Add message to current batch buffer (for race condition handling)
@@ -746,6 +801,7 @@ function createElectricSync<T extends Row<unknown>>(
746801
})
747802
} else if (isSnapshotEndMessage(message)) {
748803
newSnapshots.push(parseSnapshotMessage(message))
804+
hasSnapshotEnd = true
749805
} else if (isUpToDateMessage(message)) {
750806
hasUpToDate = true
751807
} else if (isMustRefetchMessage(message)) {
@@ -761,12 +817,18 @@ function createElectricSync<T extends Row<unknown>>(
761817

762818
truncate()
763819

764-
// Reset hasUpToDate so we continue accumulating changes until next up-to-date
820+
// Reset the loadSubset deduplication state since we're starting fresh
821+
// This ensures that previously loaded predicates don't prevent refetching after truncate
822+
loadSubsetDedupe?.reset()
823+
824+
// Reset flags so we continue accumulating changes until next up-to-date
765825
hasUpToDate = false
826+
hasSnapshotEnd = false
827+
hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync
766828
}
767829
}
768830

769-
if (hasUpToDate) {
831+
if (hasUpToDate || hasSnapshotEnd) {
770832
// Clear the current batch buffer since we're now up-to-date
771833
currentBatchMessages.setState(() => [])
772834

@@ -776,8 +838,15 @@ function createElectricSync<T extends Row<unknown>>(
776838
transactionStarted = false
777839
}
778840

779-
// Mark the collection as ready now that sync is up to date
780-
markReady()
841+
if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
842+
// Mark the collection as ready now that sync is up to date
843+
markReady()
844+
}
845+
846+
// Track that we've received the first up-to-date for progressive mode
847+
if (hasUpToDate) {
848+
hasReceivedUpToDate = true
849+
}
781850

782851
// Always commit txids when we receive up-to-date, regardless of transaction state
783852
seenTxids.setState((currentTxids) => {
@@ -811,12 +880,16 @@ function createElectricSync<T extends Row<unknown>>(
811880
}
812881
})
813882

814-
// Return the unsubscribe function
815-
return () => {
816-
// Unsubscribe from the stream
817-
unsubscribeStream()
818-
// Abort the abort controller to stop the stream
819-
abortController.abort()
883+
// Return the deduplicated loadSubset if available (on-demand or progressive mode)
884+
// The loadSubset method is auto-bound, so it can be safely returned directly
885+
return {
886+
loadSubset: loadSubsetDedupe?.loadSubset,
887+
cleanup: () => {
888+
// Unsubscribe from the stream
889+
unsubscribeStream()
890+
// Abort the abort controller to stop the stream
891+
abortController.abort()
892+
},
820893
}
821894
},
822895
// Expose the getSyncMetadata function
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
export function serialize(value: unknown): string {
2+
if (typeof value === `string`) {
3+
return `'${value}'`
4+
}
5+
6+
if (typeof value === `number`) {
7+
return value.toString()
8+
}
9+
10+
if (value === null || value === undefined) {
11+
return `NULL`
12+
}
13+
14+
if (typeof value === `boolean`) {
15+
return value ? `true` : `false`
16+
}
17+
18+
if (value instanceof Date) {
19+
return `'${value.toISOString()}'`
20+
}
21+
22+
if (Array.isArray(value)) {
23+
return `ARRAY[${value.map(serialize).join(`,`)}]`
24+
}
25+
26+
throw new Error(`Cannot serialize value: ${JSON.stringify(value)}`)
27+
}

0 commit comments

Comments
 (0)