@@ -12,12 +12,15 @@ import {
1212 TimeoutWaitingForMatchError ,
1313 TimeoutWaitingForTxIdError ,
1414} from "./errors"
15+ import { compileSQL } from "./sql-compiler"
1516import type {
1617 BaseCollectionConfig ,
1718 CollectionConfig ,
1819 DeleteMutationFnParams ,
1920 InsertMutationFnParams ,
21+ LoadSubsetOptions ,
2022 SyncConfig ,
23+ SyncMode ,
2124 UpdateMutationFnParams ,
2225 UtilsRecord ,
2326} from "@tanstack/db"
@@ -72,6 +75,24 @@ type InferSchemaOutput<T> = T extends StandardSchemaV1
7275 : Record < string , unknown >
7376 : Record < string , unknown >
7477
78+ /**
79+ * The mode of sync to use for the collection.
80+ * @default `eager`
81+ * @description
82+ * - `eager`:
83+ * - syncs all data immediately on preload
84+ * - collection will be marked as ready once the sync is complete
85+ * - there is no incremental sync
86+ * - `on-demand`:
87+ * - syncs data in incremental snapshots when the collection is queried
88+ * - collection will be marked as ready immediately after the first snapshot is synced
89+ * - `progressive`:
90+ * - syncs all data for the collection in the background
91+ * - uses incremental snapshots during the initial sync to provide a fast path to the data required for queries
92+ * - collection will be marked as ready once the full sync is complete
93+ */
94+ export type ElectricSyncMode = SyncMode | `progressive`
95+
7596/**
7697 * Configuration interface for Electric collection options
7798 * @template T - The type of items in the collection
@@ -82,12 +103,13 @@ export interface ElectricCollectionConfig<
82103 TSchema extends StandardSchemaV1 = never ,
83104> extends Omit <
84105 BaseCollectionConfig < T , string | number , TSchema , UtilsRecord , any > ,
85- `onInsert` | `onUpdate` | `onDelete`
106+ `onInsert` | `onUpdate` | `onDelete` | `syncMode`
86107 > {
87108 /**
88109 * Configuration options for the ElectricSQL ShapeStream
89110 */
90111 shapeOptions : ShapeStreamOptions < GetExtensions < T > >
112+ syncMode ?: ElectricSyncMode
91113
92114 /**
93115 * Optional asynchronous handler function called before an insert operation
@@ -281,6 +303,9 @@ export function electricCollectionOptions(
281303} {
282304 const seenTxids = new Store < Set < Txid > > ( new Set ( [ ] ) )
283305 const seenSnapshots = new Store < Array < PostgresSnapshot > > ( [ ] )
306+ const internalSyncMode = config . syncMode ?? `eager`
307+ const finalSyncMode =
308+ internalSyncMode === `progressive` ? `on-demand` : internalSyncMode
284309 const pendingMatches = new Store <
285310 Map <
286311 string ,
@@ -331,6 +356,7 @@ export function electricCollectionOptions(
331356 const sync = createElectricSync < any > ( config . shapeOptions , {
332357 seenTxids,
333358 seenSnapshots,
359+ syncMode : internalSyncMode ,
334360 pendingMatches,
335361 currentBatchMessages,
336362 removePendingMatches,
@@ -550,6 +576,7 @@ export function electricCollectionOptions(
550576
551577 return {
552578 ...restConfig ,
579+ syncMode : finalSyncMode ,
553580 sync,
554581 onInsert : wrappedOnInsert ,
555582 onUpdate : wrappedOnUpdate ,
@@ -567,6 +594,7 @@ export function electricCollectionOptions(
567594function createElectricSync < T extends Row < unknown > > (
568595 shapeOptions : ShapeStreamOptions < GetExtensions < T > > ,
569596 options : {
597+ syncMode : ElectricSyncMode
570598 seenTxids : Store < Set < Txid > >
571599 seenSnapshots : Store < Array < PostgresSnapshot > >
572600 pendingMatches : Store <
@@ -590,6 +618,7 @@ function createElectricSync<T extends Row<unknown>>(
590618 const {
591619 seenTxids,
592620 seenSnapshots,
621+ syncMode,
593622 pendingMatches,
594623 currentBatchMessages,
595624 removePendingMatches,
@@ -653,6 +682,12 @@ function createElectricSync<T extends Row<unknown>>(
653682
654683 const stream = new ShapeStream ( {
655684 ...shapeOptions ,
685+ // In on-demand mode, we only want to sync changes, so we set the log to `changes_only`
686+ log : syncMode === `on-demand` ? `changes_only` : undefined ,
687+ // In on-demand mode, we only need the changes from the point of time the collection was created
688+ // so we default to `now` when there is no saved offset.
689+ offset :
690+ shapeOptions . offset ?? ( syncMode === `on-demand` ? `now` : undefined ) ,
656691 signal : abortController . signal ,
657692 onError : ( errorParams ) => {
658693 // Just immediately mark ready if there's an error to avoid blocking
@@ -679,9 +714,11 @@ function createElectricSync<T extends Row<unknown>>(
679714 let transactionStarted = false
680715 const newTxids = new Set < Txid > ( )
681716 const newSnapshots : Array < PostgresSnapshot > = [ ]
717+ let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode
682718
683719 unsubscribeStream = stream . subscribe ( ( messages : Array < Message < T > > ) => {
684720 let hasUpToDate = false
721+ let hasSnapshotEnd = false
685722
686723 for ( const message of messages ) {
687724 // Add message to current batch buffer (for race condition handling)
@@ -746,6 +783,7 @@ function createElectricSync<T extends Row<unknown>>(
746783 } )
747784 } else if ( isSnapshotEndMessage ( message ) ) {
748785 newSnapshots . push ( parseSnapshotMessage ( message ) )
786+ hasSnapshotEnd = true
749787 } else if ( isUpToDateMessage ( message ) ) {
750788 hasUpToDate = true
751789 } else if ( isMustRefetchMessage ( message ) ) {
@@ -761,12 +799,14 @@ function createElectricSync<T extends Row<unknown>>(
761799
762800 truncate ( )
763801
764- // Reset hasUpToDate so we continue accumulating changes until next up-to-date
802+ // Reset flags so we continue accumulating changes until next up-to-date
765803 hasUpToDate = false
804+ hasSnapshotEnd = false
805+ hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync
766806 }
767807 }
768808
769- if ( hasUpToDate ) {
809+ if ( hasUpToDate || hasSnapshotEnd ) {
770810 // Clear the current batch buffer since we're now up-to-date
771811 currentBatchMessages . setState ( ( ) => [ ] )
772812
@@ -776,8 +816,15 @@ function createElectricSync<T extends Row<unknown>>(
776816 transactionStarted = false
777817 }
778818
779- // Mark the collection as ready now that sync is up to date
780- markReady ( )
819+ if ( hasUpToDate || ( hasSnapshotEnd && syncMode === `on-demand` ) ) {
820+ // Mark the collection as ready now that sync is up to date
821+ markReady ( )
822+ }
823+
824+ // Track that we've received the first up-to-date for progressive mode
825+ if ( hasUpToDate ) {
826+ hasReceivedUpToDate = true
827+ }
781828
782829 // Always commit txids when we receive up-to-date, regardless of transaction state
783830 seenTxids . setState ( ( currentTxids ) => {
@@ -811,12 +858,29 @@ function createElectricSync<T extends Row<unknown>>(
811858 }
812859 } )
813860
814- // Return the unsubscribe function
815- return ( ) => {
816- // Unsubscribe from the stream
817- unsubscribeStream ( )
818- // Abort the abort controller to stop the stream
819- abortController . abort ( )
861+ // Only set onLoadSubset if the sync mode is not eager, this indicates to the sync
862+ // layer that it can load more data on demand via the requestSnapshot method when,
863+ // the syncMode = `on-demand` or `progressive`
864+ const loadSubset =
865+ syncMode === `eager`
866+ ? undefined
867+ : async ( opts : LoadSubsetOptions ) => {
868+ // In progressive mode, stop requesting snapshots once full sync is complete
869+ if ( syncMode === `progressive` && hasReceivedUpToDate ) {
870+ return
871+ }
872+ const snapshotParams = compileSQL < T > ( opts )
873+ await stream . requestSnapshot ( snapshotParams )
874+ }
875+
876+ return {
877+ loadSubset,
878+ cleanup : ( ) => {
879+ // Unsubscribe from the stream
880+ unsubscribeStream ( )
881+ // Abort the abort controller to stop the stream
882+ abortController . abort ( )
883+ } ,
820884 }
821885 } ,
822886 // Expose the getSyncMetadata function
0 commit comments