From 2a2b5924dee11af8684b50bb48d8c973c8dcebb2 Mon Sep 17 00:00:00 2001 From: Wu-Hui Date: Mon, 21 Oct 2024 11:23:31 -0400 Subject: [PATCH] quick hack to integrate with watch. --- packages/firestore/src/api/pipeline.ts | 36 +++++- packages/firestore/src/api/snapshot.ts | 36 ++++++ packages/firestore/src/core/event_manager.ts | 51 +++++++++ .../firestore/src/core/firestore_client.ts | 35 +++++- packages/firestore/src/core/query.ts | 1 + .../firestore/src/core/sync_engine_impl.ts | 104 +++++++++++++++++- packages/firestore/src/lite-api/pipeline.ts | 17 ++- .../firestore/src/local/local_store_impl.ts | 29 ++++- packages/firestore/src/remote/remote_event.ts | 6 + packages/firestore/src/remote/watch_change.ts | 38 +++++-- 10 files changed, 333 insertions(+), 20 deletions(-) diff --git a/packages/firestore/src/api/pipeline.ts b/packages/firestore/src/api/pipeline.ts index 047731b40e5..7322489ed3c 100644 --- a/packages/firestore/src/api/pipeline.ts +++ b/packages/firestore/src/api/pipeline.ts @@ -1,13 +1,20 @@ -import { firestoreClientExecutePipeline } from '../core/firestore_client'; +import { + firestoreClientExecutePipeline, + firestoreClientListenPipeline +} from '../core/firestore_client'; import { Pipeline as LitePipeline } from '../lite-api/pipeline'; import { PipelineResult } from '../lite-api/pipeline-result'; import { DocumentData, DocumentReference } from '../lite-api/reference'; -import { Stage } from '../lite-api/stage'; +import { AddFields, Stage } from '../lite-api/stage'; import { UserDataReader } from '../lite-api/user_data_reader'; import { AbstractUserDataWriter } from '../lite-api/user_data_writer'; import { DocumentKey } from '../model/document_key'; import { ensureFirestoreConfigured, Firestore } from './database'; +import { DocumentSnapshot, PipelineSnapshot } from './snapshot'; +import { FirestoreError } from '../util/error'; +import { Unsubscribe } from './reference_impl'; +import { cast } from '../util/input_validation'; export class Pipeline< AppModelType = DocumentData @@ -94,4 +101,29 @@ export class Pipeline< return docs; }); } + + /** + * @internal + * @private + */ + _onSnapshot(observer: { + next?: (snapshot: PipelineSnapshot) => void; + error?: (error: FirestoreError) => void; + complete?: () => void; + }): Unsubscribe { + this.stages.push( + new AddFields( + this.selectablesToMap([ + '__name__', + '__create_time__', + '__update_time__' + ]) + ) + ); + + const client = ensureFirestoreConfigured(this.db); + firestoreClientListenPipeline(client, this, observer); + + return () => {}; + } } diff --git a/packages/firestore/src/api/snapshot.ts b/packages/firestore/src/api/snapshot.ts index 29e1616b61c..0fdb11dc0c0 100644 --- a/packages/firestore/src/api/snapshot.ts +++ b/packages/firestore/src/api/snapshot.ts @@ -40,6 +40,8 @@ import { Code, FirestoreError } from '../util/error'; import { Firestore } from './database'; import { SnapshotListenOptions } from './reference_impl'; +import { Pipeline } from './pipeline'; +import { PipelineResult } from '../lite-api/pipeline-result'; /** * Converter used by `withConverter()` to transform user objects of type @@ -790,3 +792,37 @@ export function snapshotEqual( return false; } + +export class PipelineSnapshot { + /** + * Metadata about this snapshot, concerning its source and if it has local + * modifications. + */ + readonly metadata: SnapshotMetadata; + + /** + * The query on which you called `get` or `onSnapshot` in order to get this + * `QuerySnapshot`. + */ + readonly pipeline: Pipeline; + + /** @hideconstructor */ + constructor( + readonly _firestore: Firestore, + readonly _userDataWriter: AbstractUserDataWriter, + pipeline: Pipeline, + readonly _snapshot: ViewSnapshot + ) { + this.metadata = new SnapshotMetadata( + _snapshot.hasPendingWrites, + _snapshot.fromCache + ); + this.pipeline = pipeline; + } + + /** An array of all the documents in the `QuerySnapshot`. */ + get results(): Array> { + const result: Array> = []; + return result; + } +} diff --git a/packages/firestore/src/core/event_manager.ts b/packages/firestore/src/core/event_manager.ts index 72d801f3934..5a6f5df0099 100644 --- a/packages/firestore/src/core/event_manager.ts +++ b/packages/firestore/src/core/event_manager.ts @@ -24,6 +24,9 @@ import { ObjectMap } from '../util/obj_map'; import { canonifyQuery, Query, queryEquals, stringifyQuery } from './query'; import { OnlineState } from './types'; import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot'; +import { Pipeline } from '../api/pipeline'; +import { PipelineSnapshot } from '../api/snapshot'; +import { PipelineResultView } from './sync_engine_impl'; /** * Holds the listeners and the last received ViewSnapshot for a query being @@ -64,6 +67,8 @@ export interface EventManager { onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise; onFirstRemoteStoreListen?: (query: Query) => Promise; onLastRemoteStoreUnlisten?: (query: Query) => Promise; + // TODO(pipeline): consolidate query and pipeline + onListenPipeline?: (pipeline: PipelineListener) => Promise; terminate(): void; } @@ -85,6 +90,7 @@ export class EventManagerImpl implements EventManager { ) => Promise; /** Callback invoked once all listeners to a Query are removed. */ onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise; + onListenPipeline?: (pipeline: PipelineListener) => Promise; /** * Callback invoked when a Query starts listening to the remote store, while @@ -123,6 +129,7 @@ function validateEventManager(eventManagerImpl: EventManagerImpl): void { !!eventManagerImpl.onLastRemoteStoreUnlisten, 'onLastRemoteStoreUnlisten not set' ); + debugAssert(!!eventManagerImpl.onListenPipeline, 'onListenPipeline not set'); } const enum ListenerSetupAction { @@ -213,6 +220,25 @@ export async function eventManagerListen( } } +export async function eventManagerListenPipeline( + eventManager: EventManager, + listener: PipelineListener +): Promise { + const eventManagerImpl = debugCast(eventManager, EventManagerImpl); + validateEventManager(eventManagerImpl); + + try { + await eventManagerImpl.onListenPipeline!(listener); + } catch (e) { + const firestoreError = wrapInUserErrorIfRecoverable( + e as Error, + `Initialization of query '${listener.pipeline}' failed` + ); + listener.onError(firestoreError); + return; + } +} + export async function eventManagerUnlisten( eventManager: EventManager, listener: QueryListener @@ -286,6 +312,13 @@ export function eventManagerOnWatchChange( } } +export function eventManagerOnPipelineWatchChange( + eventManager: EventManager, + viewSnaps: PipelineResultView[] +): void { + const eventManagerImpl = debugCast(eventManager, EventManagerImpl); +} + export function eventManagerOnWatchError( eventManager: EventManager, query: Query, @@ -567,3 +600,21 @@ export class QueryListener { return this.options.source !== ListenerDataSource.Cache; } } + +export class PipelineListener { + private snap: PipelineResultView | null = null; + + constructor( + readonly pipeline: Pipeline, + private queryObserver: Observer + ) {} + + onViewSnapshot(snap: PipelineResultView): boolean { + this.snap = snap; + return true; + } + + onError(error: FirestoreError): void { + this.queryObserver.error(error); + } +} diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index 57aa99869da..6b09a4c92c7 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -23,7 +23,8 @@ import { CredentialsProvider } from '../api/credentials'; import { User } from '../auth/user'; -import { Pipeline } from '../lite-api/pipeline'; +import { Pipeline as LitePipeline } from '../lite-api/pipeline'; +import { Pipeline } from '../api/pipeline'; import { LocalStore } from '../local/local_store'; import { localStoreConfigureFieldIndexes, @@ -79,9 +80,11 @@ import { addSnapshotsInSyncListener, EventManager, eventManagerListen, + eventManagerListenPipeline, eventManagerUnlisten, ListenOptions, Observer, + PipelineListener, QueryListener, removeSnapshotsInSyncListener } from './event_manager'; @@ -89,6 +92,7 @@ import { newQueryForPath, Query } from './query'; import { SyncEngine } from './sync_engine'; import { syncEngineListen, + syncEngineListenPipeline, syncEngineLoadBundle, syncEngineRegisterPendingWritesCallback, syncEngineUnlisten, @@ -101,6 +105,8 @@ import { TransactionOptions } from './transaction_options'; import { TransactionRunner } from './transaction_runner'; import { View } from './view'; import { ViewSnapshot } from './view_snapshot'; +import { Unsubscribe } from '../api/reference_impl'; +import { PipelineSnapshot } from '../api/snapshot'; const LOG_TAG = 'FirestoreClient'; export const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100; @@ -404,6 +410,10 @@ export async function getEventManager( null, onlineComponentProvider.syncEngine ); + eventManager.onListenPipeline = syncEngineListenPipeline.bind( + null, + onlineComponentProvider.syncEngine + ); return eventManager; } @@ -556,7 +566,7 @@ export function firestoreClientRunAggregateQuery( export function firestoreClientExecutePipeline( client: FirestoreClient, - pipeline: Pipeline + pipeline: LitePipeline ): Promise { const deferred = new Deferred(); @@ -571,6 +581,27 @@ export function firestoreClientExecutePipeline( return deferred.promise; } +export function firestoreClientListenPipeline( + client: FirestoreClient, + pipeline: Pipeline, + observer: { + next?: (snapshot: PipelineSnapshot) => void; + error?: (error: FirestoreError) => void; + complete?: () => void; + } +): Unsubscribe { + const wrappedObserver = new AsyncObserver(observer); + const listener = new PipelineListener(pipeline, wrappedObserver); + client.asyncQueue.enqueueAndForget(async () => { + const eventManager = await getEventManager(client); + return eventManagerListenPipeline(eventManager, listener); + }); + return () => { + wrappedObserver.mute(); + // TODO(pipeline): actually unlisten + }; +} + export function firestoreClientWrite( client: FirestoreClient, mutations: Mutation[] diff --git a/packages/firestore/src/core/query.ts b/packages/firestore/src/core/query.ts index b13296ad7ee..87e7e6ce5a6 100644 --- a/packages/firestore/src/core/query.ts +++ b/packages/firestore/src/core/query.ts @@ -35,6 +35,7 @@ import { Target, targetEquals } from './target'; +import { Pipeline } from '../api/pipeline'; export const enum LimitType { First = 'F', diff --git a/packages/firestore/src/core/sync_engine_impl.ts b/packages/firestore/src/core/sync_engine_impl.ts index f96cbea0f00..bf9fe49feac 100644 --- a/packages/firestore/src/core/sync_engine_impl.ts +++ b/packages/firestore/src/core/sync_engine_impl.ts @@ -45,7 +45,8 @@ import { TargetData, TargetPurpose } from '../local/target_data'; import { DocumentKeySet, documentKeySet, - DocumentMap + DocumentMap, + mutableDocumentMap } from '../model/collections'; import { MutableDocument } from '../model/document'; import { DocumentKey } from '../model/document_key'; @@ -81,8 +82,10 @@ import { import { EventManager, eventManagerOnOnlineStateChange, + eventManagerOnPipelineWatchChange, eventManagerOnWatchChange, - eventManagerOnWatchError + eventManagerOnWatchError, + PipelineListener } from './event_manager'; import { ListenSequence } from './listen_sequence'; import { @@ -115,6 +118,9 @@ import { ViewChange } from './view'; import { ViewSnapshot } from './view_snapshot'; +import { Pipeline } from '../api/pipeline'; +import { PipelineSnapshot } from '../api/snapshot'; +import { PipelineResult } from '../lite-api/pipeline-result'; const LOG_TAG = 'SyncEngine'; @@ -143,6 +149,56 @@ class QueryView { ) {} } +export class PipelineResultView { + private keyToIndexMap: Map; + constructor(public pipeline: Pipeline, public view: Array) { + this.keyToIndexMap = new Map(); + this.buildKeyToIndexMap(); + } + + private buildKeyToIndexMap(): void { + this.view.forEach((doc, index) => { + this.keyToIndexMap.set(doc.key, index); + }); + } + + addResult(key: DocumentKey, doc: MutableDocument) { + if (this.keyToIndexMap.has(key)) { + throw new Error(`Result with key ${key} already exists.`); + } + this.view.push(doc); + this.keyToIndexMap.set(key, this.view.length - 1); + } + + removeResult(key: DocumentKey) { + const index = this.keyToIndexMap.get(key); + if (index === undefined) { + return; // Result not found, nothing to remove + } + + // Remove from the array efficiently by swapping with the last element and popping + const lastIndex = this.view.length - 1; + if (index !== lastIndex) { + [this.view[index], this.view[lastIndex]] = [ + this.view[lastIndex], + this.view[index] + ]; + // Update the keyToIndexMap for the swapped element + this.keyToIndexMap.set(this.view[index].key, index); + } + this.view.pop(); + this.keyToIndexMap.delete(key); + } + + updateResult(key: DocumentKey, doc: MutableDocument) { + const index = this.keyToIndexMap.get(key); + if (index === undefined) { + throw new Error(`Result with key ${key} not found.`); + } + this.view[index] = doc; + } +} + /** Tracks a limbo resolution. */ class LimboResolution { constructor(public key: DocumentKey) {} @@ -208,6 +264,9 @@ class SyncEngineImpl implements SyncEngine { queryEquals ); queriesByTarget = new Map(); + // TODO(pipeline): below is a hack for the lack of canonical id for pipelines + pipelineByTarget = new Map(); + pipelineViewByTarget = new Map(); /** * The keys of documents that are in limbo for which we haven't yet started a * limbo resolution query. The strings in this set are the result of calling @@ -285,6 +344,24 @@ export function newSyncEngine( return syncEngine; } +export async function syncEngineListenPipeline( + syncEngine: SyncEngine, + pipeline: PipelineListener +): Promise { + const syncEngineImpl = ensureWatchCallbacks(syncEngine); + const targetData = await localStoreAllocateTarget( + syncEngineImpl.localStore, + pipeline.pipeline + ); + syncEngineImpl.pipelineByTarget.set(targetData.targetId, pipeline); + syncEngineImpl.pipelineViewByTarget.set( + targetData.targetId, + new PipelineResultView(pipeline.pipeline, []) + ); + + remoteStoreListen(syncEngineImpl.remoteStore, targetData); +} + /** * Initiates the new listen, resolves promise when listen enqueued to the * server. All the subsequent view snapshots or errors are sent to the @@ -708,6 +785,7 @@ export async function syncEngineRejectListen( primitiveComparator ), documentUpdates, + mutableDocumentMap(), resolvedLimboDocuments ); @@ -1079,11 +1157,31 @@ export async function syncEngineEmitNewSnapsAndNotifyLocalStore( const docChangesInAllViews: LocalViewChanges[] = []; const queriesProcessed: Array> = []; - if (syncEngineImpl.queryViewsByQuery.isEmpty()) { + if ( + syncEngineImpl.queryViewsByQuery.isEmpty() && + syncEngineImpl.pipelineViewByTarget.size === 0 + ) { // Return early since `onWatchChange()` might not have been assigned yet. return; } + syncEngineImpl.pipelineViewByTarget.forEach((results, targetId) => { + const change = remoteEvent?.targetChanges.get(targetId); + if (!!change) { + change.modifiedDocuments.forEach(key => { + results.updateResult(key, remoteEvent?.augmentedDocumentUpdates.get(key)!); + }); + change.addedDocuments.forEach(key => { + results.addResult(key, remoteEvent?.augmentedDocumentUpdates.get(key)!); + }); + change.removedDocuments.forEach(key => { + results.removeResult(key); + }); + + syncEngineImpl.pipelineByTarget.get(targetId)?.onViewSnapshot(results); + } + }); + syncEngineImpl.queryViewsByQuery.forEach((_, queryView) => { debugAssert( !!syncEngineImpl.applyDocChanges, diff --git a/packages/firestore/src/lite-api/pipeline.ts b/packages/firestore/src/lite-api/pipeline.ts index b1b172baab5..78faf5b2117 100644 --- a/packages/firestore/src/lite-api/pipeline.ts +++ b/packages/firestore/src/lite-api/pipeline.ts @@ -141,7 +141,7 @@ export class Pipeline { * @private */ protected documentReferenceFactory: (id: DocumentKey) => DocumentReference, - private stages: Stage[], + protected stages: Stage[], // TODO(pipeline) support converter //private converter: FirestorePipelineConverter = defaultPipelineConverter() private converter: unknown = {} @@ -234,7 +234,7 @@ export class Pipeline { ); } - private selectablesToMap( + protected selectablesToMap( selectables: Array ): Map { const result = new Map(); @@ -815,10 +815,21 @@ export class Pipeline { * @internal * @private */ - _toStructuredPipeline(jsonProtoSerializer: JsonProtoSerializer): StructuredPipeline { + _toStructuredPipeline( + jsonProtoSerializer: JsonProtoSerializer + ): StructuredPipeline { const stages: ProtoStage[] = this.stages.map(stage => stage._toProto(jsonProtoSerializer) ); return { pipeline: { stages } }; } + + /** + * @internal + * @private + */ + // TODO(pipeline): do better than this + _toCanonicalId(jsonProtoSerializer: JsonProtoSerializer): String { + return JSON.stringify(this._toStructuredPipeline(jsonProtoSerializer)); + } } diff --git a/packages/firestore/src/local/local_store_impl.ts b/packages/firestore/src/local/local_store_impl.ts index cfcabdc20cf..15db9406a81 100644 --- a/packages/firestore/src/local/local_store_impl.ts +++ b/packages/firestore/src/local/local_store_impl.ts @@ -24,7 +24,12 @@ import { queryToTarget } from '../core/query'; import { SnapshotVersion } from '../core/snapshot_version'; -import { canonifyTarget, Target, targetEquals } from '../core/target'; +import { + canonifyTarget, + Target, + targetEquals, + targetIsPipelineTarget +} from '../core/target'; import { BatchId, TargetId } from '../core/types'; import { Timestamp } from '../lite-api/timestamp'; import { @@ -90,6 +95,7 @@ import { ClientId } from './shared_client_state'; import { isIndexedDbTransactionError } from './simple_db'; import { TargetCache } from './target_cache'; import { TargetData, TargetPurpose } from './target_data'; +import { Pipeline } from '../api/pipeline'; export const LOG_TAG = 'LocalStore'; @@ -935,9 +941,28 @@ export function localStoreReadDocument( */ export function localStoreAllocateTarget( localStore: LocalStore, - target: Target + target: Target | Pipeline ): Promise { const localStoreImpl = debugCast(localStore, LocalStoreImpl); + if (targetIsPipelineTarget(target)) { + return localStoreImpl.persistence.runTransaction( + 'Allocate pipeline target', + 'readwrite', + txn => { + return localStoreImpl.targetCache + .allocateTargetId(txn) + .next(targetId => { + return new TargetData( + target, + targetId, + TargetPurpose.Listen, + txn.currentSequenceNumber + ); + }); + } + ); + } + return localStoreImpl.persistence .runTransaction('Allocate target', 'readwrite', txn => { let targetData: TargetData; diff --git a/packages/firestore/src/remote/remote_event.ts b/packages/firestore/src/remote/remote_event.ts index 49b2ef56a97..6af7861ee96 100644 --- a/packages/firestore/src/remote/remote_event.ts +++ b/packages/firestore/src/remote/remote_event.ts @@ -54,6 +54,11 @@ export class RemoteEvent { * doc's new values (if not deleted). */ readonly documentUpdates: MutableDocumentMap, + /** + * A set of which augmented documents (pipeline) have changed or been deleted, along with the + * doc's new values (if not deleted). + */ + readonly augmentedDocumentUpdates: MutableDocumentMap, /** * A set of which document updates are due only to limbo resolution targets. */ @@ -86,6 +91,7 @@ export class RemoteEvent { targetChanges, new SortedMap(primitiveComparator), mutableDocumentMap(), + mutableDocumentMap(), documentKeySet() ); } diff --git a/packages/firestore/src/remote/watch_change.ts b/packages/firestore/src/remote/watch_change.ts index c73f2302d19..dd595c9863d 100644 --- a/packages/firestore/src/remote/watch_change.ts +++ b/packages/firestore/src/remote/watch_change.ts @@ -292,6 +292,9 @@ export class WatchChangeAggregator { /** Keeps track of the documents to update since the last raised snapshot. */ private pendingDocumentUpdates = mutableDocumentMap(); + /** Keeps track of the augmented documents to update since the last raised snapshot. */ + private pendingAugmentedDocumentUpdates = mutableDocumentMap(); + /** A mapping of document keys to their set of target IDs. */ private pendingDocumentTargetMapping = documentTargetMap(); @@ -651,16 +654,21 @@ export class WatchChangeAggregator { this.pendingDocumentUpdates.forEach((_, doc) => doc.setReadTime(snapshotVersion) ); + this.pendingAugmentedDocumentUpdates.forEach((_, doc) => + doc.setReadTime(snapshotVersion) + ); const remoteEvent = new RemoteEvent( snapshotVersion, targetChanges, this.pendingTargetResets, this.pendingDocumentUpdates, + this.pendingAugmentedDocumentUpdates, resolvedLimboDocuments ); this.pendingDocumentUpdates = mutableDocumentMap(); + this.pendingAugmentedDocumentUpdates = mutableDocumentMap(); this.pendingDocumentTargetMapping = documentTargetMap(); this.pendingTargetResets = new SortedMap( primitiveComparator @@ -686,10 +694,17 @@ export class WatchChangeAggregator { const targetState = this.ensureTargetState(targetId); targetState.addDocumentChange(document.key, changeType); - this.pendingDocumentUpdates = this.pendingDocumentUpdates.insert( - document.key, - document - ); + if ( + targetIsPipelineTarget(this.targetDataForActiveTarget(targetId)!.target) + ) { + this.pendingAugmentedDocumentUpdates = + this.pendingAugmentedDocumentUpdates.insert(document.key, document); + } else { + this.pendingDocumentUpdates = this.pendingDocumentUpdates.insert( + document.key, + document + ); + } this.pendingDocumentTargetMapping = this.pendingDocumentTargetMapping.insert( @@ -731,10 +746,17 @@ export class WatchChangeAggregator { ); if (updatedDocument) { - this.pendingDocumentUpdates = this.pendingDocumentUpdates.insert( - key, - updatedDocument - ); + if ( + targetIsPipelineTarget(this.targetDataForActiveTarget(targetId)!.target) + ) { + this.pendingAugmentedDocumentUpdates = + this.pendingAugmentedDocumentUpdates.insert(key, updatedDocument); + } else { + this.pendingDocumentUpdates = this.pendingDocumentUpdates.insert( + key, + updatedDocument + ); + } } }