Skip to content

Commit

Permalink
quick hack to integrate with watch.
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-hui committed Nov 1, 2024
1 parent 8ac835e commit 422723a
Show file tree
Hide file tree
Showing 10 changed files with 333 additions and 20 deletions.
36 changes: 34 additions & 2 deletions packages/firestore/src/api/pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import { firestoreClientExecutePipeline } from '../core/firestore_client';
import {
firestoreClientExecutePipeline,
firestoreClientListenPipeline
} from '../core/firestore_client';
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
import { PipelineResult } from '../lite-api/pipeline-result';
import { DocumentData, DocumentReference } from '../lite-api/reference';
import { Stage } from '../lite-api/stage';
import { AddFields, Stage } from '../lite-api/stage';
import { UserDataReader } from '../lite-api/user_data_reader';
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';
import { DocumentKey } from '../model/document_key';

import { ensureFirestoreConfigured, Firestore } from './database';
import { DocumentSnapshot, PipelineSnapshot } from './snapshot';
import { FirestoreError } from '../util/error';
import { Unsubscribe } from './reference_impl';
import { cast } from '../util/input_validation';

export class Pipeline<
AppModelType = DocumentData
Expand Down Expand Up @@ -94,4 +101,29 @@ export class Pipeline<
return docs;
});
}

/**
* @internal
* @private
*/
_onSnapshot(observer: {
next?: (snapshot: PipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}): Unsubscribe {
this.stages.push(
new AddFields(
this.selectablesToMap([
'__name__',
'__create_time__',
'__update_time__'
])
)
);

const client = ensureFirestoreConfigured(this.db);
firestoreClientListenPipeline(client, this, observer);

return () => {};
}
}
36 changes: 36 additions & 0 deletions packages/firestore/src/api/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import { Code, FirestoreError } from '../util/error';

import { Firestore } from './database';
import { SnapshotListenOptions } from './reference_impl';
import { Pipeline } from './pipeline';
import { PipelineResult } from '../lite-api/pipeline-result';

/**
* Converter used by `withConverter()` to transform user objects of type
Expand Down Expand Up @@ -790,3 +792,37 @@ export function snapshotEqual<AppModelType, DbModelType extends DocumentData>(

return false;
}

export class PipelineSnapshot<AppModelType = DocumentData> {
/**
* Metadata about this snapshot, concerning its source and if it has local
* modifications.
*/
readonly metadata: SnapshotMetadata;

/**
* The query on which you called `get` or `onSnapshot` in order to get this
* `QuerySnapshot`.
*/
readonly pipeline: Pipeline<AppModelType>;

/** @hideconstructor */
constructor(
readonly _firestore: Firestore,
readonly _userDataWriter: AbstractUserDataWriter,
pipeline: Pipeline<AppModelType>,
readonly _snapshot: ViewSnapshot
) {
this.metadata = new SnapshotMetadata(
_snapshot.hasPendingWrites,
_snapshot.fromCache
);
this.pipeline = pipeline;
}

/** An array of all the documents in the `QuerySnapshot`. */
get results(): Array<PipelineResult<AppModelType>> {
const result: Array<PipelineResult<AppModelType>> = [];
return result;
}
}
51 changes: 51 additions & 0 deletions packages/firestore/src/core/event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import { ObjectMap } from '../util/obj_map';
import { canonifyQuery, Query, queryEquals, stringifyQuery } from './query';
import { OnlineState } from './types';
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
import { Pipeline } from '../api/pipeline';
import { PipelineSnapshot } from '../api/snapshot';
import { PipelineResultView } from './sync_engine_impl';

/**
* Holds the listeners and the last received ViewSnapshot for a query being
Expand Down Expand Up @@ -64,6 +67,8 @@ export interface EventManager {
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
// TODO(pipeline): consolidate query and pipeline
onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;
terminate(): void;
}

Expand All @@ -85,6 +90,7 @@ export class EventManagerImpl implements EventManager {
) => Promise<ViewSnapshot>;
/** Callback invoked once all listeners to a Query are removed. */
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;

/**
* Callback invoked when a Query starts listening to the remote store, while
Expand Down Expand Up @@ -123,6 +129,7 @@ function validateEventManager(eventManagerImpl: EventManagerImpl): void {
!!eventManagerImpl.onLastRemoteStoreUnlisten,
'onLastRemoteStoreUnlisten not set'
);
debugAssert(!!eventManagerImpl.onListenPipeline, 'onListenPipeline not set');
}

const enum ListenerSetupAction {
Expand Down Expand Up @@ -213,6 +220,25 @@ export async function eventManagerListen(
}
}

export async function eventManagerListenPipeline(
eventManager: EventManager,
listener: PipelineListener
): Promise<void> {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
validateEventManager(eventManagerImpl);

try {
await eventManagerImpl.onListenPipeline!(listener);
} catch (e) {
const firestoreError = wrapInUserErrorIfRecoverable(
e as Error,
`Initialization of query '${listener.pipeline}' failed`
);
listener.onError(firestoreError);
return;
}
}

export async function eventManagerUnlisten(
eventManager: EventManager,
listener: QueryListener
Expand Down Expand Up @@ -286,6 +312,13 @@ export function eventManagerOnWatchChange(
}
}

export function eventManagerOnPipelineWatchChange(
eventManager: EventManager,
viewSnaps: PipelineResultView[]
): void {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
}

export function eventManagerOnWatchError(
eventManager: EventManager,
query: Query,
Expand Down Expand Up @@ -567,3 +600,21 @@ export class QueryListener {
return this.options.source !== ListenerDataSource.Cache;
}
}

export class PipelineListener {
private snap: PipelineResultView | null = null;

constructor(
readonly pipeline: Pipeline,
private queryObserver: Observer<PipelineSnapshot>
) {}

onViewSnapshot(snap: PipelineResultView): boolean {
this.snap = snap;
return true;
}

onError(error: FirestoreError): void {
this.queryObserver.error(error);
}
}
35 changes: 33 additions & 2 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import {
CredentialsProvider
} from '../api/credentials';
import { User } from '../auth/user';
import { Pipeline } from '../lite-api/pipeline';
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
import { Pipeline } from '../api/pipeline';
import { LocalStore } from '../local/local_store';
import {
localStoreConfigureFieldIndexes,
Expand Down Expand Up @@ -79,16 +80,19 @@ import {
addSnapshotsInSyncListener,
EventManager,
eventManagerListen,
eventManagerListenPipeline,
eventManagerUnlisten,
ListenOptions,
Observer,
PipelineListener,
QueryListener,
removeSnapshotsInSyncListener
} from './event_manager';
import { newQueryForPath, Query } from './query';
import { SyncEngine } from './sync_engine';
import {
syncEngineListen,
syncEngineListenPipeline,
syncEngineLoadBundle,
syncEngineRegisterPendingWritesCallback,
syncEngineUnlisten,
Expand All @@ -101,6 +105,8 @@ import { TransactionOptions } from './transaction_options';
import { TransactionRunner } from './transaction_runner';
import { View } from './view';
import { ViewSnapshot } from './view_snapshot';
import { Unsubscribe } from '../api/reference_impl';
import { PipelineSnapshot } from '../api/snapshot';

const LOG_TAG = 'FirestoreClient';
export const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
Expand Down Expand Up @@ -404,6 +410,10 @@ export async function getEventManager(
null,
onlineComponentProvider.syncEngine
);
eventManager.onListenPipeline = syncEngineListenPipeline.bind(
null,
onlineComponentProvider.syncEngine
);
return eventManager;
}

Expand Down Expand Up @@ -556,7 +566,7 @@ export function firestoreClientRunAggregateQuery(

export function firestoreClientExecutePipeline(
client: FirestoreClient,
pipeline: Pipeline
pipeline: LitePipeline
): Promise<PipelineStreamElement[]> {
const deferred = new Deferred<PipelineStreamElement[]>();

Expand All @@ -571,6 +581,27 @@ export function firestoreClientExecutePipeline(
return deferred.promise;
}

export function firestoreClientListenPipeline(
client: FirestoreClient,
pipeline: Pipeline,
observer: {
next?: (snapshot: PipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe {
const wrappedObserver = new AsyncObserver(observer);
const listener = new PipelineListener(pipeline, wrappedObserver);
client.asyncQueue.enqueueAndForget(async () => {
const eventManager = await getEventManager(client);
return eventManagerListenPipeline(eventManager, listener);
});
return () => {
wrappedObserver.mute();
// TODO(pipeline): actually unlisten
};
}

export function firestoreClientWrite(
client: FirestoreClient,
mutations: Mutation[]
Expand Down
1 change: 1 addition & 0 deletions packages/firestore/src/core/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
Target,
targetEquals
} from './target';
import { Pipeline } from '../api/pipeline';

export const enum LimitType {
First = 'F',
Expand Down
Loading

0 comments on commit 422723a

Please sign in to comment.