Skip to content

Commit f7211d9

Browse files
committed
use the subsetDuduper for electric
1 parent 126f6b9 commit f7211d9

File tree

4 files changed

+373
-51
lines changed

4 files changed

+373
-51
lines changed

packages/electric-db-collection/src/electric.ts

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "@electric-sql/client"
77
import { Store } from "@tanstack/store"
88
import DebugModule from "debug"
9+
import { DeduplicatedLoadSubset } from "@tanstack/db"
910
import {
1011
ExpectedNumberInAwaitTxIdError,
1112
StreamAbortedError,
@@ -716,6 +717,21 @@ function createElectricSync<T extends Row<unknown>>(
716717
const newSnapshots: Array<PostgresSnapshot> = []
717718
let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode
718719

720+
// Create deduplicated loadSubset wrapper for non-eager modes
721+
// This prevents redundant snapshot requests when multiple concurrent
722+
// live queries request overlapping or subset predicates
723+
const loadSubsetDedupe =
724+
syncMode === `eager`
725+
? null
726+
: new DeduplicatedLoadSubset(async (opts: LoadSubsetOptions) => {
727+
// In progressive mode, stop requesting snapshots once full sync is complete
728+
if (syncMode === `progressive` && hasReceivedUpToDate) {
729+
return
730+
}
731+
const snapshotParams = compileSQL<T>(opts)
732+
await stream.requestSnapshot(snapshotParams)
733+
})
734+
719735
unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
720736
let hasUpToDate = false
721737
let hasSnapshotEnd = false
@@ -799,6 +815,10 @@ function createElectricSync<T extends Row<unknown>>(
799815

800816
truncate()
801817

818+
// Reset the loadSubset deduplication state since we're starting fresh
819+
// This ensures that previously loaded predicates don't prevent refetching after truncate
820+
loadSubsetDedupe?.reset()
821+
802822
// Reset flags so we continue accumulating changes until next up-to-date
803823
hasUpToDate = false
804824
hasSnapshotEnd = false
@@ -858,23 +878,10 @@ function createElectricSync<T extends Row<unknown>>(
858878
}
859879
})
860880

861-
// Only set onLoadSubset if the sync mode is not eager, this indicates to the sync
862-
// layer that it can load more data on demand via the requestSnapshot method when,
863-
// the syncMode = `on-demand` or `progressive`
864-
const loadSubset =
865-
syncMode === `eager`
866-
? undefined
867-
: async (opts: LoadSubsetOptions) => {
868-
// In progressive mode, stop requesting snapshots once full sync is complete
869-
if (syncMode === `progressive` && hasReceivedUpToDate) {
870-
return
871-
}
872-
const snapshotParams = compileSQL<T>(opts)
873-
await stream.requestSnapshot(snapshotParams)
874-
}
875-
881+
// Return the deduplicated loadSubset if available (on-demand or progressive mode)
882+
// The loadSubset method is auto-bound, so it can be safely returned directly
876883
return {
877-
loadSubset,
884+
loadSubset: loadSubsetDedupe?.loadSubset,
878885
cleanup: () => {
879886
// Unsubscribe from the stream
880887
unsubscribeStream()

0 commit comments

Comments
 (0)