Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package com.google.firebase.firestore
import com.google.android.gms.tasks.Task
import com.google.android.gms.tasks.TaskCompletionSource
import com.google.firebase.Timestamp
import com.google.firebase.firestore.core.Canonicalizable
import com.google.firebase.firestore.model.Document
import com.google.firebase.firestore.model.DocumentKey
import com.google.firebase.firestore.model.MutableDocument
import com.google.firebase.firestore.model.Values
import com.google.firebase.firestore.pipeline.AddFieldsStage
import com.google.firebase.firestore.pipeline.AggregateFunction
Expand All @@ -29,6 +32,7 @@ import com.google.firebase.firestore.pipeline.CollectionSource
import com.google.firebase.firestore.pipeline.DatabaseSource
import com.google.firebase.firestore.pipeline.DistinctStage
import com.google.firebase.firestore.pipeline.DocumentsSource
import com.google.firebase.firestore.pipeline.EvaluationContext
import com.google.firebase.firestore.pipeline.Expr
import com.google.firebase.firestore.pipeline.Expr.Companion.field
import com.google.firebase.firestore.pipeline.ExprWithAlias
Expand All @@ -51,6 +55,7 @@ import com.google.firebase.firestore.pipeline.Stage
import com.google.firebase.firestore.pipeline.UnionStage
import com.google.firebase.firestore.pipeline.UnnestStage
import com.google.firebase.firestore.pipeline.WhereStage
import com.google.firebase.firestore.util.Assert.fail
import com.google.firestore.v1.ExecutePipelineRequest
import com.google.firestore.v1.StructuredPipeline
import com.google.firestore.v1.Value
Expand Down Expand Up @@ -759,7 +764,7 @@ internal constructor(
firestore: FirebaseFirestore,
userDataReader: UserDataReader,
stages: List<Stage<*>>
) : AbstractPipeline(firestore, userDataReader, stages) {
) : AbstractPipeline(firestore, userDataReader, stages), Canonicalizable {
internal constructor(
firestore: FirebaseFirestore,
userDataReader: UserDataReader,
Expand All @@ -786,31 +791,107 @@ internal constructor(

fun where(condition: BooleanExpr): RealtimePipeline = append(WhereStage(condition))

internal fun rewriteStages(): RealtimePipeline {
internal val rewrittenStages: List<Stage<*>> by lazy {
var hasOrder = false
return with(
buildList {
for (stage in stages) when (stage) {
// Stages whose semantics depend on ordering
is LimitStage,
is OffsetStage -> {
if (!hasOrder) {
hasOrder = true
add(SortStage.BY_DOCUMENT_ID)
}
add(stage)
}
is SortStage -> {
buildList {
for (stage in stages) when (stage) {
// Stages whose semantics depend on ordering
is LimitStage,
is OffsetStage -> {
if (!hasOrder) {
hasOrder = true
add(stage.withStableOrdering())
add(SortStage.BY_DOCUMENT_ID)
}
else -> add(stage)
add(stage)
}
if (!hasOrder) {
add(SortStage.BY_DOCUMENT_ID)
is SortStage -> {
hasOrder = true
add(stage.withStableOrdering())
}
else -> add(stage)
}
)
if (!hasOrder) {
add(SortStage.BY_DOCUMENT_ID)
}
}
}

override fun canonicalId(): String {
return rewrittenStages.joinToString("|") { stage -> (stage as Canonicalizable).canonicalId() }
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is RealtimePipeline) return false
return stages == other.stages
}

override fun hashCode(): Int {
return stages.hashCode()
}

internal fun evaluate(inputs: List<MutableDocument>): List<MutableDocument> {
val context = EvaluationContext(this)
return rewrittenStages.fold(inputs) { documents, stage -> stage.evaluate(context, documents) }
}

internal fun matchesAllDocuments(): Boolean {
for (stage in rewrittenStages) {
// Check for LimitStage
if (stage.name == "limit") {
return false
}
Comment on lines +841 to +843
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using stage.name == "limit" to check for the LimitStage is a bit brittle. It's better to use a type check with is LimitStage for improved type safety and maintainability. This avoids potential issues if the stage name is ever changed and makes the code's intent clearer.

A similar issue exists in the hasLimit() function on line 865.

Suggested change
if (stage.name == "limit") {
return false
}
if (stage is LimitStage) {
return false
}


// Check for Where stage
if (stage is WhereStage) {
// Check if it's the special 'exists(__name__)' case
val funcExpr = stage.condition as? FunctionExpr
if (funcExpr?.name == "exists" && funcExpr.params.size == 1) {
val fieldExpr = funcExpr.params[0] as? Field
if (fieldExpr?.fieldPath?.isKeyField == true) {
continue // This specific 'exists(__name__)' filter doesn't count
}
}
return false
}
// TODO(pipeline) : Add checks for other filtering stages like Aggregate,
// Distinct, FindNearest once they are implemented.
}
return true
}

internal fun hasLimit(): Boolean {
for (stage in rewrittenStages) {
if (stage.name == "limit") {
return true
}
// TODO(pipeline): need to check for other stages that could have a limit,
// like findNearest
}
return false
}

internal fun matches(doc: Document): Boolean {
val result = evaluate(listOf(doc as MutableDocument))
return result.isNotEmpty()
}

private fun evaluateContext(): EvaluationContext {
return EvaluationContext(this)
}

internal fun comparator(): Comparator<Document> =
getLastEffectiveSortStage().comparator(evaluateContext())
Comment on lines +879 to +884
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The evaluateContext() function is a private helper that is only used once. It can be inlined at its call site in comparator() to simplify the code and improve readability.

  internal fun comparator(): Comparator<Document> =
    getLastEffectiveSortStage().comparator(EvaluationContext(this))


private fun getLastEffectiveSortStage(): SortStage {
for (stage in rewrittenStages.asReversed()) {
if (stage is SortStage) {
return stage
}
// TODO(pipeline): Consider stages that might invalidate ordering later,
// like fineNearest
}
throw fail("RealtimePipeline must contain at least one Sort stage (ensured by RewriteStages).")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.firebase.firestore.core

/** An internal interface for classes that can be canonicalized to a string representation. */
internal interface Canonicalizable {
fun canonicalId(): String
}
Loading
Loading