@@ -21,6 +21,7 @@ import com.google.firebase.firestore.core.Canonicalizable
2121import com.google.firebase.firestore.model.Document
2222import com.google.firebase.firestore.model.DocumentKey
2323import com.google.firebase.firestore.model.MutableDocument
24+ import com.google.firebase.firestore.model.ResourcePath
2425import com.google.firebase.firestore.model.Values
2526import com.google.firebase.firestore.pipeline.AddFieldsStage
2627import com.google.firebase.firestore.pipeline.AggregateFunction
@@ -43,7 +44,6 @@ import com.google.firebase.firestore.pipeline.InternalOptions
4344import com.google.firebase.firestore.pipeline.LimitStage
4445import com.google.firebase.firestore.pipeline.OffsetStage
4546import com.google.firebase.firestore.pipeline.Ordering
46- import com.google.firebase.firestore.pipeline.PipelineOptions
4747import com.google.firebase.firestore.pipeline.RawStage
4848import com.google.firebase.firestore.pipeline.RemoveFieldsStage
4949import com.google.firebase.firestore.pipeline.ReplaceStage
@@ -55,17 +55,28 @@ import com.google.firebase.firestore.pipeline.Stage
5555import com.google.firebase.firestore.pipeline.UnionStage
5656import com.google.firebase.firestore.pipeline.UnnestStage
5757import com.google.firebase.firestore.pipeline.WhereStage
58+ import com.google.firebase.firestore.remote.RemoteSerializer
5859import com.google.firebase.firestore.util.Assert.fail
5960import com.google.firestore.v1.ExecutePipelineRequest
6061import com.google.firestore.v1.StructuredPipeline
6162import com.google.firestore.v1.Value
6263
63- open class AbstractPipeline
64+ class Pipeline
6465internal constructor (
65- internal val firestore: FirebaseFirestore ,
66- internal val userDataReader: UserDataReader ,
67- internal val stages: List <Stage <* >>
66+ private val firestore: FirebaseFirestore ,
67+ private val userDataReader: UserDataReader ,
68+ private val stages: List <Stage <* >>
6869) {
70+ internal constructor (
71+ firestore: FirebaseFirestore ,
72+ userDataReader: UserDataReader ,
73+ stage: Stage <* >
74+ ) : this (firestore, userDataReader, listOf (stage))
75+
76+ private fun append (stage : Stage <* >): Pipeline {
77+ return Pipeline (firestore, userDataReader, stages.plus(stage))
78+ }
79+
6980 private fun toStructuredPipelineProto (options : InternalOptions ? ): StructuredPipeline {
7081 val builder = StructuredPipeline .newBuilder()
7182 builder.pipeline = toPipelineProto()
@@ -79,17 +90,17 @@ internal constructor(
7990 .build()
8091
8192 private fun toExecutePipelineRequest (options : InternalOptions ? ): ExecutePipelineRequest {
82- val database = firestore.databaseId
93+ val database = firestore!! .databaseId
8394 val builder = ExecutePipelineRequest .newBuilder()
8495 builder.database = " projects/${database.projectId} /databases/${database.databaseId} "
8596 builder.structuredPipeline = toStructuredPipelineProto(options)
8697 return builder.build()
8798 }
8899
89- protected fun execute (options : InternalOptions ? ): Task <PipelineSnapshot > {
100+ fun execute (options : InternalOptions ? ): Task <PipelineSnapshot > {
90101 val request = toExecutePipelineRequest(options)
91102 val observerTask = ObserverSnapshotTask ()
92- firestore.callClient { call -> call!! .executePipeline(request, observerTask) }
103+ firestore? .callClient { call -> call!! .executePipeline(request, observerTask) }
93104 return observerTask.task
94105 }
95106
@@ -106,7 +117,7 @@ internal constructor(
106117 ) {
107118 results.add(
108119 PipelineResult (
109- firestore,
120+ firestore!! ,
110121 userDataWriter,
111122 if (key == null ) null else DocumentReference (key, firestore),
112123 data,
@@ -127,28 +138,9 @@ internal constructor(
127138 val task: Task <PipelineSnapshot >
128139 get() = taskCompletionSource.task
129140 }
130- }
131-
132- class Pipeline
133- private constructor (
134- firestore: FirebaseFirestore ,
135- userDataReader: UserDataReader ,
136- stages: List <Stage <* >>
137- ) : AbstractPipeline (firestore, userDataReader, stages) {
138- internal constructor (
139- firestore: FirebaseFirestore ,
140- userDataReader: UserDataReader ,
141- stage: Stage <* >
142- ) : this (firestore, userDataReader, listOf (stage))
143-
144- private fun append (stage : Stage <* >): Pipeline {
145- return Pipeline (firestore, userDataReader, stages.plus(stage))
146- }
147141
148142 fun execute (): Task <PipelineSnapshot > = execute(null )
149143
150- fun execute (options : PipelineOptions ): Task <PipelineSnapshot > = execute(options.options)
151-
152144 internal fun documentReference (key : DocumentKey ): DocumentReference {
153145 return DocumentReference (key, firestore)
154146 }
@@ -627,7 +619,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
627619 * @param path A path to a collection that will be the source of this pipeline.
628620 * @return A new [Pipeline] object with documents from target collection.
629621 */
630- fun collection (path : String ): Pipeline = collection(CollectionSource .of (path))
622+ fun collection (path : String ): Pipeline = collection(firestore.collection (path))
631623
632624 /* *
633625 * Set the pipeline's source to the collection specified by the given [CollectionReference].
@@ -637,7 +629,8 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
637629 * @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or
638630 * database than the pipeline.
639631 */
640- fun collection (ref : CollectionReference ): Pipeline = collection(CollectionSource .of(ref))
632+ fun collection (ref : CollectionReference ): Pipeline =
633+ collection(CollectionSource .of(ref, firestore.databaseId))
641634
642635 /* *
643636 * Set the pipeline's source to the collection specified by CollectionSource.
@@ -648,7 +641,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
648641 * or database than the pipeline.
649642 */
650643 fun collection (stage : CollectionSource ): Pipeline {
651- if (stage.firestore != null && stage.firestore. databaseId != firestore.databaseId) {
644+ if (stage.serializer. databaseId() != firestore.databaseId) {
652645 throw IllegalArgumentException (" Provided collection is from a different Firestore instance." )
653646 }
654647 return Pipeline (firestore, firestore.userDataReader, stage)
@@ -661,9 +654,9 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
661654 * @return A new [Pipeline] object with documents from target collection group.
662655 */
663656 fun collectionGroup (collectionId : String ): Pipeline =
664- pipeline (CollectionGroupSource .of((collectionId)))
657+ collectionGroup (CollectionGroupSource .of((collectionId)))
665658
666- fun pipeline (stage : CollectionGroupSource ): Pipeline =
659+ internal fun collectionGroup (stage : CollectionGroupSource ): Pipeline =
667660 Pipeline (firestore, firestore.userDataReader, stage)
668661
669662 /* *
@@ -706,20 +699,34 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
706699 return Pipeline (
707700 firestore,
708701 firestore.userDataReader,
709- DocumentsSource (documents.map { docRef -> " / " + docRef. path }.toTypedArray())
702+ DocumentsSource (documents.map { ResourcePath .fromString(it. path) }.toTypedArray())
710703 )
711704 }
712705}
713706
714707class RealtimePipelineSource internal constructor(private val firestore : FirebaseFirestore ) {
708+ /* *
709+ * Convert the given Query into an equivalent Pipeline.
710+ *
711+ * @param query A Query to be converted into a Pipeline.
712+ * @return A new [Pipeline] object that is equivalent to [query]
713+ * @throws [IllegalArgumentException] Thrown if the [query] provided targets a different project
714+ * or database than the pipeline.
715+ */
716+ fun convertFrom (query : Query ): RealtimePipeline {
717+ if (query.firestore.databaseId != firestore.databaseId) {
718+ throw IllegalArgumentException (" Provided query is from a different Firestore instance." )
719+ }
720+ return query.query.toRealtimePipeline(firestore, firestore.userDataReader)
721+ }
715722
716723 /* *
717724 * Set the pipeline's source to the collection specified by the given path.
718725 *
719726 * @param path A path to a collection that will be the source of this pipeline.
720727 * @return A new [RealtimePipeline] object with documents from target collection.
721728 */
722- fun collection (path : String ): RealtimePipeline = collection(CollectionSource .of (path))
729+ fun collection (path : String ): RealtimePipeline = collection(firestore.collection (path))
723730
724731 /* *
725732 * Set the pipeline's source to the collection specified by the given [CollectionReference].
@@ -729,7 +736,8 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
729736 * @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or
730737 * database than the pipeline.
731738 */
732- fun collection (ref : CollectionReference ): RealtimePipeline = collection(CollectionSource .of(ref))
739+ fun collection (ref : CollectionReference ): RealtimePipeline =
740+ collection(CollectionSource .of(ref, firestore.databaseId))
733741
734742 /* *
735743 * Set the pipeline's source to the collection specified by CollectionSource.
@@ -740,10 +748,10 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
740748 * or database than the pipeline.
741749 */
742750 fun collection (stage : CollectionSource ): RealtimePipeline {
743- if (stage.firestore != null && stage.firestore. databaseId != firestore.databaseId) {
751+ if (stage.serializer. databaseId() != firestore.databaseId) {
744752 throw IllegalArgumentException (" Provided collection is from a different Firestore instance." )
745753 }
746- return RealtimePipeline (firestore, firestore.userDataReader, stage)
754+ return RealtimePipeline (RemoteSerializer ( firestore.databaseId) , firestore.userDataReader, stage)
747755 }
748756
749757 /* *
@@ -753,26 +761,26 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
753761 * @return A new [RealtimePipeline] object with documents from target collection group.
754762 */
755763 fun collectionGroup (collectionId : String ): RealtimePipeline =
756- pipeline (CollectionGroupSource .of((collectionId)))
764+ collectionGroup (CollectionGroupSource .of((collectionId)))
757765
758- fun pipeline (stage : CollectionGroupSource ): RealtimePipeline =
759- RealtimePipeline (firestore, firestore.userDataReader, stage)
766+ fun collectionGroup (stage : CollectionGroupSource ): RealtimePipeline =
767+ RealtimePipeline (RemoteSerializer ( firestore.databaseId) , firestore.userDataReader, stage)
760768}
761769
762770class RealtimePipeline
763771internal constructor (
764- firestore : FirebaseFirestore ,
765- userDataReader: UserDataReader ,
766- stages: List <Stage <* >>
767- ) : AbstractPipeline (firestore, userDataReader, stages), Canonicalizable {
772+ internal val serializer : RemoteSerializer ,
773+ internal val userDataReader: UserDataReader ,
774+ internal val stages: List <Stage <* >>
775+ ) : Canonicalizable {
768776 internal constructor (
769- firestore : FirebaseFirestore ,
777+ serializer : RemoteSerializer ,
770778 userDataReader: UserDataReader ,
771779 stage: Stage <* >
772- ) : this (firestore , userDataReader, listOf (stage))
780+ ) : this (serializer , userDataReader, listOf (stage))
773781
774782 private fun with (stages : List <Stage <* >>): RealtimePipeline =
775- RealtimePipeline (firestore , userDataReader, stages)
783+ RealtimePipeline (serializer , userDataReader, stages)
776784
777785 private fun append (stage : Stage <* >): RealtimePipeline = with (stages.plus(stage))
778786
@@ -820,14 +828,17 @@ internal constructor(
820828 return rewrittenStages.joinToString(" |" ) { stage -> (stage as Canonicalizable ).canonicalId() }
821829 }
822830
831+ override fun toString (): String = canonicalId()
832+
823833 override fun equals (other : Any? ): Boolean {
824834 if (this == = other) return true
825835 if (other !is RealtimePipeline ) return false
826- return stages == other.stages
836+ if (serializer.databaseId() != other.serializer.databaseId()) return false
837+ return rewrittenStages == other.rewrittenStages
827838 }
828839
829840 override fun hashCode (): Int {
830- return stages.hashCode()
841+ return serializer.databaseId().hashCode() * 31 + stages.hashCode()
831842 }
832843
833844 internal fun evaluate (inputs : List <MutableDocument >): List <MutableDocument > {
@@ -883,6 +894,15 @@ internal constructor(
883894 internal fun comparator (): Comparator <Document > =
884895 getLastEffectiveSortStage().comparator(evaluateContext())
885896
897+ internal fun toStructurePipelineProto (): StructuredPipeline {
898+ val builder = StructuredPipeline .newBuilder()
899+ builder.pipeline =
900+ com.google.firestore.v1.Pipeline .newBuilder()
901+ .addAllStages(rewrittenStages.map { it.toProtoStage(userDataReader) })
902+ .build()
903+ return builder.build()
904+ }
905+
886906 private fun getLastEffectiveSortStage (): SortStage {
887907 for (stage in rewrittenStages.asReversed()) {
888908 if (stage is SortStage ) {
0 commit comments