diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/HierarchicalPartitionUtils.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/HierarchicalPartitionUtils.cs
new file mode 100644
index 0000000000..2b91c7cb40
--- /dev/null
+++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/HierarchicalPartitionUtils.cs
@@ -0,0 +1,100 @@
+// ------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// ------------------------------------------------------------
+
+namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition
+{
+ using System;
+ using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
+
+ internal static class HierarchicalPartitionUtils
+ {
+ ///
+ /// Updates the FeedRange to limit the scope of incoming feedRange to logical partition within a single physical partition.
+ /// Generally speaking, a subpartitioned container can experience split partition at any level of hierarchical partition key.
+ /// This could cause a situation where more than one physical partition contains the data for a partial partition key.
+ /// Currently, enumerator instantiation does not honor physical partition boundary and allocates entire epk range which could spans across multiple physical partitions to the enumerator.
+ /// Since such an epk range does not exist at the container level, Service generates a GoneException.
+ /// This method restrics the range of each enumerator by intersecting it with physical partition range.
+ ///
+ public static FeedRangeInternal LimitFeedRangeToSinglePartition(PartitionKey? partitionKey, FeedRangeInternal feedRange, ContainerQueryProperties containerQueryProperties)
+ {
+ // We sadly need to check the partition key, since a user can set a partition key in the request options with a different continuation token.
+ // In the future the partition filtering and continuation information needs to be a tightly bounded contract (like cross feed range state).
+ if (partitionKey.HasValue)
+ {
+ // ISSUE-HACK-adityasa-3/25/2024 - We should not update the original feed range inside this class.
+ // Instead we should guarantee that when enumerator is instantiated it is limited to a single physical partition.
+ // Ultimately we should remove enumerator's dependency on PartitionKey.
+ if ((containerQueryProperties.PartitionKeyDefinition.Paths.Count > 1) &&
+ (partitionKey.Value.InternalKey.Components.Count != containerQueryProperties.PartitionKeyDefinition.Paths.Count) &&
+ (feedRange is FeedRangeEpk feedRangeEpk))
+ {
+ if (containerQueryProperties.EffectiveRangesForPartitionKey == null ||
+ containerQueryProperties.EffectiveRangesForPartitionKey.Count == 0)
+ {
+ throw new InvalidOperationException(
+ "EffectiveRangesForPartitionKey should be populated when PK is specified in request options.");
+ }
+
+ foreach (Documents.Routing.Range epkForPartitionKey in containerQueryProperties.EffectiveRangesForPartitionKey)
+ {
+ if (Documents.Routing.Range.CheckOverlapping(
+ feedRangeEpk.Range,
+ epkForPartitionKey))
+ {
+ if (!feedRangeEpk.Range.Equals(epkForPartitionKey))
+ {
+ String overlappingMin;
+ bool minInclusive;
+ String overlappingMax;
+ bool maxInclusive;
+
+ if (Documents.Routing.Range.MinComparer.Instance.Compare(
+ epkForPartitionKey,
+ feedRangeEpk.Range) < 0)
+ {
+ overlappingMin = feedRangeEpk.Range.Min;
+ minInclusive = feedRangeEpk.Range.IsMinInclusive;
+ }
+ else
+ {
+ overlappingMin = epkForPartitionKey.Min;
+ minInclusive = epkForPartitionKey.IsMinInclusive;
+ }
+
+ if (Documents.Routing.Range.MaxComparer.Instance.Compare(
+ epkForPartitionKey,
+ feedRangeEpk.Range) > 0)
+ {
+ overlappingMax = feedRangeEpk.Range.Max;
+ maxInclusive = feedRangeEpk.Range.IsMaxInclusive;
+ }
+ else
+ {
+ overlappingMax = epkForPartitionKey.Max;
+ maxInclusive = epkForPartitionKey.IsMaxInclusive;
+ }
+
+ feedRange = new FeedRangeEpk(
+ new Documents.Routing.Range(
+ overlappingMin,
+ overlappingMax,
+ minInclusive,
+ maxInclusive));
+ }
+
+ break;
+ }
+ }
+ }
+ else
+ {
+ feedRange = new FeedRangePartitionKey(partitionKey.Value);
+ }
+ }
+
+ return feedRange;
+ }
+ }
+}
diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs
index c96942f080..e7121d5770 100644
--- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs
+++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs
@@ -43,6 +43,8 @@ private sealed class InitializationParameters
{
public IDocumentContainer DocumentContainer { get; }
+ public ContainerQueryProperties ContainerQueryProperties { get; }
+
public SqlQuerySpec SqlQuerySpec { get; }
public IReadOnlyList TargetRanges { get; }
@@ -52,11 +54,12 @@ private sealed class InitializationParameters
public IReadOnlyList OrderByColumns { get; }
public QueryExecutionOptions QueryPaginationOptions { get; }
-
+
public int MaxConcurrency { get; }
public InitializationParameters(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList targetRanges,
PartitionKey? partitionKey,
@@ -65,6 +68,7 @@ public InitializationParameters(
int maxConcurrency)
{
this.DocumentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
+ this.ContainerQueryProperties = containerQueryProperties;
this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
this.TargetRanges = targetRanges ?? throw new ArgumentNullException(nameof(targetRanges));
this.PartitionKey = partitionKey;
@@ -83,6 +87,7 @@ private enum ExecutionState
public static TryCatch MonadicCreate(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList targetRanges,
Cosmos.PartitionKey? partitionKey,
@@ -126,6 +131,7 @@ public static TryCatch MonadicCreate(
{
return StreamingOrderByCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer,
+ containerQueryProperties,
sqlQuerySpec,
targetRanges,
partitionKey,
@@ -141,6 +147,7 @@ public static TryCatch MonadicCreate(
return TryCatch.FromResult(NonStreamingOrderByPipelineStage.Create(
documentContainer,
+ containerQueryProperties,
rewrittenQueryForOrderBy,
targetRanges,
partitionKey,
@@ -151,6 +158,7 @@ public static TryCatch MonadicCreate(
private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens,
OrderByQueryPartitionRangePageAsyncEnumerator uninitializedEnumerator,
OrderByContinuationToken token,
@@ -193,6 +201,7 @@ private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync(
// We maintain the current enumerator's range and let the RequestInvokerHandler logic kick in
OrderByQueryPartitionRangePageAsyncEnumerator childPaginator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
+ containerQueryProperties,
uninitializedEnumerator.SqlQuerySpec,
new FeedRangeState(uninitializedEnumerator.FeedRangeState.FeedRange, uninitializedEnumerator.StartOfPageState),
partitionKey: null,
@@ -210,6 +219,7 @@ private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync(
OrderByQueryPartitionRangePageAsyncEnumerator childPaginator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
+ containerQueryProperties,
uninitializedEnumerator.SqlQuerySpec,
new FeedRangeState(childRange, uninitializedEnumerator.StartOfPageState),
partitionKey: null,
@@ -293,6 +303,7 @@ private static bool ContainsSupportedResumeTypes(IReadOnlyList orde
private sealed class StreamingOrderByCrossPartitionQueryPipelineStage : IQueryPipelineStage
{
private readonly IDocumentContainer documentContainer;
+ private readonly ContainerQueryProperties containerQueryProperties;
private readonly IReadOnlyList sortOrders;
private readonly PriorityQueue enumerators;
private readonly Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens;
@@ -315,6 +326,7 @@ private static class Expressions
private StreamingOrderByCrossPartitionQueryPipelineStage(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
IReadOnlyList sortOrders,
QueryExecutionOptions queryPaginationOptions,
int maxConcurrency,
@@ -322,6 +334,7 @@ private StreamingOrderByCrossPartitionQueryPipelineStage(
QueryState state)
{
this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
+ this.containerQueryProperties = containerQueryProperties;
this.sortOrders = sortOrders ?? throw new ArgumentNullException(nameof(sortOrders));
this.enumerators = new PriorityQueue(new OrderByEnumeratorComparer(this.sortOrders));
this.queryPaginationOptions = queryPaginationOptions ?? QueryExecutionOptions.Default;
@@ -332,6 +345,7 @@ private StreamingOrderByCrossPartitionQueryPipelineStage(
private StreamingOrderByCrossPartitionQueryPipelineStage(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
IReadOnlyList sortOrders,
PriorityQueue enumerators,
Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens,
@@ -339,6 +353,7 @@ private StreamingOrderByCrossPartitionQueryPipelineStage(
int maxConcurrency)
{
this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
+ this.containerQueryProperties = containerQueryProperties;
this.sortOrders = sortOrders ?? throw new ArgumentNullException(nameof(sortOrders));
this.enumerators = enumerators ?? throw new ArgumentNullException(nameof(enumerators));
this.uninitializedEnumeratorsAndTokens = uninitializedEnumeratorsAndTokens ?? throw new ArgumentNullException(nameof(uninitializedEnumeratorsAndTokens));
@@ -562,6 +577,7 @@ private async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync(
{
await OrderByCrossPartitionQueryPipelineStage.MoveNextAsync_InitializeAsync_HandleSplitAsync(
this.documentContainer,
+ this.containerQueryProperties,
this.uninitializedEnumeratorsAndTokens,
uninitializedEnumerator,
token,
@@ -775,6 +791,7 @@ public ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellatio
public static IQueryPipelineStage Create(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
IReadOnlyList sortOrders,
PriorityQueue enumerators,
Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens,
@@ -783,6 +800,7 @@ public static IQueryPipelineStage Create(
{
return new StreamingOrderByCrossPartitionQueryPipelineStage(
documentContainer,
+ containerQueryProperties,
sortOrders,
enumerators,
uninitializedEnumeratorsAndTokens,
@@ -792,6 +810,7 @@ public static IQueryPipelineStage Create(
public static TryCatch MonadicCreate(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList targetRanges,
Cosmos.PartitionKey? partitionKey,
@@ -815,6 +834,7 @@ public static TryCatch MonadicCreate(
enumeratorsAndTokens = targetRanges
.Select(range => (OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
+ containerQueryProperties,
rewrittenQueryForOrderBy,
new FeedRangeState(range, state: default),
partitionKey,
@@ -903,6 +923,7 @@ public static TryCatch MonadicCreate(
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
+ containerQueryProperties,
leftQuerySpec,
new FeedRangeState(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
@@ -930,6 +951,7 @@ public static TryCatch MonadicCreate(
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
+ containerQueryProperties,
targetQuerySpec,
new FeedRangeState(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
@@ -952,6 +974,7 @@ public static TryCatch MonadicCreate(
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
+ containerQueryProperties,
rightQuerySpec,
new FeedRangeState(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
@@ -991,6 +1014,7 @@ public static TryCatch MonadicCreate(
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
+ containerQueryProperties,
rewrittenQueryForOrderBy,
new FeedRangeState(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
@@ -1006,6 +1030,7 @@ public static TryCatch MonadicCreate(
StreamingOrderByCrossPartitionQueryPipelineStage stage = new StreamingOrderByCrossPartitionQueryPipelineStage(
documentContainer,
+ containerQueryProperties,
orderByColumns.Select(column => column.SortOrder).ToList(),
queryPaginationOptions,
maxConcurrency,
@@ -1740,6 +1765,7 @@ private async Task MoveNextAsync_InitializeAsync(ITrace
{
ITracingAsyncEnumerator> enumerator = await OrderByCrossPartitionRangePageEnumerator.CreateAsync(
this.parameters.DocumentContainer,
+ this.parameters.ContainerQueryProperties,
this.parameters.SqlQuerySpec,
this.parameters.TargetRanges,
this.parameters.PartitionKey,
@@ -1763,6 +1789,7 @@ private async Task MoveNextAsync_InitializeAsync(ITrace
public static IQueryPipelineStage Create(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList targetRanges,
Cosmos.PartitionKey? partitionKey,
@@ -1776,6 +1803,7 @@ public static IQueryPipelineStage Create(
InitializationParameters parameters = new InitializationParameters(
documentContainer,
+ containerQueryProperties,
sqlQuerySpec,
targetRanges,
partitionKey,
@@ -1793,20 +1821,25 @@ private sealed class OrderByCrossPartitionRangePageEnumerator : ITracingAsyncEnu
{
private readonly IDocumentContainer documentContainer;
+ private readonly ContainerQueryProperties containerQueryProperties;
+
private readonly Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens;
public TryCatch Current { get; private set; }
private OrderByCrossPartitionRangePageEnumerator(
IDocumentContainer documentContainer,
- Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens)
+ Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens,
+ ContainerQueryProperties containerQueryProperties)
{
this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
this.enumeratorsAndTokens = enumeratorsAndTokens ?? throw new ArgumentNullException(nameof(enumeratorsAndTokens));
+ this.containerQueryProperties = containerQueryProperties;
}
public static async Task>> CreateAsync(
IDocumentContainer documentContainer,
+ ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList targetRanges,
Cosmos.PartitionKey? partitionKey,
@@ -1821,6 +1854,7 @@ public static async Task>> Cr
{
OrderByQueryPartitionRangePageAsyncEnumerator enumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
+ containerQueryProperties,
sqlQuerySpec,
new FeedRangeState(range, state: null),
partitionKey,
@@ -1837,7 +1871,7 @@ await ParallelPrefetch.PrefetchInParallelAsync(
trace,
cancellationToken);
- return new OrderByCrossPartitionRangePageEnumerator(documentContainer, enumeratorsAndTokens);
+ return new OrderByCrossPartitionRangePageEnumerator(documentContainer, enumeratorsAndTokens, containerQueryProperties);
}
public async ValueTask DisposeAsync()
@@ -1894,6 +1928,7 @@ public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cance
{
await MoveNextAsync_InitializeAsync_HandleSplitAsync(
this.documentContainer,
+ this.containerQueryProperties,
this.enumeratorsAndTokens,
enumerator,
token,
diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByQueryPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByQueryPartitionRangePageAsyncEnumerator.cs
index e5c70a9ccd..9d49d9a78e 100644
--- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByQueryPartitionRangePageAsyncEnumerator.cs
+++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByQueryPartitionRangePageAsyncEnumerator.cs
@@ -10,7 +10,8 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
- using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
+ using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
+ using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Tracing;
internal sealed class OrderByQueryPartitionRangePageAsyncEnumerator : PartitionRangePageAsyncEnumerator, IPrefetcher
@@ -19,7 +20,8 @@ internal sealed class OrderByQueryPartitionRangePageAsyncEnumerator : PartitionR
private readonly BufferedPartitionRangePageAsyncEnumeratorBase bufferedEnumerator;
public static OrderByQueryPartitionRangePageAsyncEnumerator Create(
- IQueryDataSource queryDataSource,
+ IQueryDataSource queryDataSource,
+ ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
FeedRangeState feedRangeState,
PartitionKey? partitionKey,
@@ -28,7 +30,8 @@ public static OrderByQueryPartitionRangePageAsyncEnumerator Create(
PrefetchPolicy prefetchPolicy)
{
InnerEnumerator enumerator = new InnerEnumerator(
- queryDataSource,
+ queryDataSource,
+ containerQueryProperties,
sqlQuerySpec,
feedRangeState,
partitionKey,
@@ -105,10 +108,12 @@ public OrderByQueryPartitionRangePageAsyncEnumerator CloneAsFullyBufferedEnumera
private sealed class InnerEnumerator : PartitionRangePageAsyncEnumerator
{
- private readonly IQueryDataSource queryDataSource;
+ private readonly IQueryDataSource queryDataSource;
+ private readonly ContainerQueryProperties containerQueryProperties;
public InnerEnumerator(
- IQueryDataSource queryDataSource,
+ IQueryDataSource queryDataSource,
+ ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
FeedRangeState feedRangeState,
PartitionKey? partitionKey,
@@ -117,6 +122,7 @@ public InnerEnumerator(
: base(feedRangeState)
{
this.queryDataSource = queryDataSource ?? throw new ArgumentNullException(nameof(queryDataSource));
+ this.containerQueryProperties = containerQueryProperties;
this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
this.PartitionKey = partitionKey;
this.QueryPaginationOptions = queryPaginationOptions ?? QueryExecutionOptions.Default;
@@ -140,6 +146,7 @@ public InnerEnumerator CloneWithMaxPageSize()
return new InnerEnumerator(
this.queryDataSource,
+ this.containerQueryProperties,
this.SqlQuerySpec,
this.FeedRangeState,
this.PartitionKey,
@@ -151,9 +158,7 @@ public InnerEnumerator CloneWithMaxPageSize()
protected override async Task> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken)
{
- // Unfortunately we need to keep both the epk range and partition key for queries
- // Since the continuation token format uses epk range even though we only need the partition key to route the request.
- FeedRangeInternal feedRange = this.PartitionKey.HasValue ? new FeedRangePartitionKey(this.PartitionKey.Value) : this.FeedRangeState.FeedRange;
+ FeedRangeInternal feedRange = HierarchicalPartitionUtils.LimitFeedRangeToSinglePartition(this.PartitionKey, this.FeedRangeState.FeedRange, this.containerQueryProperties);
TryCatch monadicQueryPage = await this.queryDataSource
.MonadicQueryAsync(
diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/QueryPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/QueryPartitionRangePageAsyncEnumerator.cs
index bf10ea2af7..ebb57d0781 100644
--- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/QueryPartitionRangePageAsyncEnumerator.cs
+++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/QueryPartitionRangePageAsyncEnumerator.cs
@@ -46,7 +46,7 @@ protected override Task> GetNextPageAsync(ITrace trace, Canc
throw new ArgumentNullException(nameof(trace));
}
- FeedRangeInternal feedRange = this.LimitFeedRangeToSinglePartition();
+ FeedRangeInternal feedRange = HierarchicalPartitionUtils.LimitFeedRangeToSinglePartition(this.partitionKey, this.FeedRangeState.FeedRange, this.containerQueryProperties);
return this.queryDataSource.MonadicQueryAsync(
sqlQuerySpec: this.sqlQuerySpec,
feedRangeState: new FeedRangeState(feedRange, this.FeedRangeState.State),
@@ -54,95 +54,5 @@ protected override Task> GetNextPageAsync(ITrace trace, Canc
trace: trace,
cancellationToken);
}
-
- ///
- /// Updates the FeedRange to limit the scope of this enumerator to single physical partition.
- /// Generally speaking, a subpartitioned container can experience split partition at any level of hierarchical partition key.
- /// This could cause a situation where more than one physical partition contains the data for a partial partition key.
- /// Currently, enumerator instantiation does not honor physical partition boundary and allocates entire epk range which could spans across multiple physical partitions to the enumerator.
- /// Since such an epk range does not exist at the container level, Service generates a GoneException.
- /// This method restrics the range of each container by shrinking the ends of the range so that they do not span across physical partition.
- ///
- private FeedRangeInternal LimitFeedRangeToSinglePartition()
- {
- // We sadly need to check the partition key, since a user can set a partition key in the request options with a different continuation token.
- // In the future the partition filtering and continuation information needs to be a tightly bounded contract (like cross feed range state).
- FeedRangeInternal feedRange = this.FeedRangeState.FeedRange;
- if (this.partitionKey.HasValue)
- {
- // ISSUE-HACK-adityasa-3/25/2024 - We should not update the original feed range inside this class.
- // Instead we should guarantee that when enumerator is instantiated it is limited to a single physical partition.
- // Ultimately we should remove enumerator's dependency on PartitionKey.
- if ((this.containerQueryProperties.PartitionKeyDefinition.Paths.Count > 1) &&
- (this.partitionKey.Value.InternalKey.Components.Count != this.containerQueryProperties.PartitionKeyDefinition.Paths.Count) &&
- (feedRange is FeedRangeEpk feedRangeEpk))
- {
- if (this.containerQueryProperties.EffectiveRangesForPartitionKey == null ||
- this.containerQueryProperties.EffectiveRangesForPartitionKey.Count == 0)
- {
- throw new InvalidOperationException(
- "EffectiveRangesForPartitionKey should be populated when PK is specified in request options.");
- }
-
- foreach (Documents.Routing.Range epkForPartitionKey in
- this.containerQueryProperties.EffectiveRangesForPartitionKey)
- {
- if (Documents.Routing.Range.CheckOverlapping(
- feedRangeEpk.Range,
- epkForPartitionKey))
- {
- if (!feedRangeEpk.Range.Equals(epkForPartitionKey))
- {
- String overlappingMin;
- bool minInclusive;
- String overlappingMax;
- bool maxInclusive;
-
- if (Documents.Routing.Range.MinComparer.Instance.Compare(
- epkForPartitionKey,
- feedRangeEpk.Range) < 0)
- {
- overlappingMin = feedRangeEpk.Range.Min;
- minInclusive = feedRangeEpk.Range.IsMinInclusive;
- }
- else
- {
- overlappingMin = epkForPartitionKey.Min;
- minInclusive = epkForPartitionKey.IsMinInclusive;
- }
-
- if (Documents.Routing.Range.MaxComparer.Instance.Compare(
- epkForPartitionKey,
- feedRangeEpk.Range) > 0)
- {
- overlappingMax = feedRangeEpk.Range.Max;
- maxInclusive = feedRangeEpk.Range.IsMaxInclusive;
- }
- else
- {
- overlappingMax = epkForPartitionKey.Max;
- maxInclusive = epkForPartitionKey.IsMaxInclusive;
- }
-
- feedRange = new FeedRangeEpk(
- new Documents.Routing.Range(
- overlappingMin,
- overlappingMax,
- minInclusive,
- maxInclusive));
- }
-
- break;
- }
- }
- }
- else
- {
- feedRange = new FeedRangePartitionKey(this.partitionKey.Value);
- }
- }
-
- return feedRange;
- }
}
}
\ No newline at end of file
diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs
index 47208f6205..0741138f75 100644
--- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs
+++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs
@@ -79,7 +79,8 @@ public static TryCatch MonadicCreate(
queryPaginationOptions: queryPaginationOptions,
maxConcurrency: maxConcurrency,
nonStreamingOrderBy: queryInfo.HasNonStreamingOrderBy,
- continuationToken: continuationToken);
+ continuationToken: continuationToken,
+ containerQueryProperties: containerQueryProperties);
}
else
{
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs
index 872722e110..4fb4430153 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs
@@ -68,7 +68,8 @@ private static async Task CreateAndRunPipeline(IDocumentContainer documentContai
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: EndUserPageSize),
maxConcurrency: MaxConcurrency,
nonStreamingOrderBy: nonStreamingOrderBy,
- continuationToken: null);
+ continuationToken: null,
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
IQueryPipelineStage pipeline = pipelineStage.Result;
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs
index 1451862287..b00d515b65 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs
@@ -515,7 +515,7 @@ public virtual Task> MonadicQueryAsync(
}
List documents = new List();
- foreach (Record record in records.Where(r => IsRecordWithinFeedRange(r, feedRangeState.FeedRange, this.partitionKeyDefinition) && IsRecordWithinQueryPartition(r, this.queryRequestOptions, this.partitionKeyDefinition)))
+ foreach (Record record in records.Where(r => IsRecordWithinFeedRange(r, feedRangeState.FeedRange, this.partitionKeyDefinition)))
{
CosmosObject document = ConvertRecordToCosmosElement(record);
documents.Add(CosmosObject.Create(document));
@@ -719,26 +719,6 @@ public virtual Task> MonadicQueryAsync(
}
}
- private bool IsRecordWithinQueryPartition(Record record, QueryRequestOptions queryRequestOptions, PartitionKeyDefinition partitionKeyDefinition)
- {
- if(queryRequestOptions?.PartitionKey == null)
- {
- return true;
- }
-
- IList partitionKey = GetPartitionKeysFromObjectModel(queryRequestOptions.PartitionKey.Value);
- IList partitionKeyFromRecord = GetPartitionKeysFromPayload(record.Payload, partitionKeyDefinition);
- if (partitionKeyDefinition.Kind == PartitionKind.MultiHash)
- {
- PartitionKeyHash partitionKeyHash = GetHashFromPartitionKeys(partitionKey, partitionKeyDefinition);
- PartitionKeyHash partitionKeyFromRecordHash = GetHashFromPartitionKeys(partitionKeyFromRecord, partitionKeyDefinition);
-
- return partitionKeyHash.Equals(partitionKeyFromRecordHash) || partitionKeyFromRecordHash.Value.StartsWith(partitionKeyHash.Value);
- }
-
- return partitionKey.SequenceEqual(partitionKeyFromRecord);
- }
-
public Task> MonadicChangeFeedAsync(
FeedRangeState feedRangeState,
ChangeFeedExecutionOptions changeFeedPaginationOptions,
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs
index e2ec35831b..6c189e92e7 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs
@@ -58,7 +58,8 @@ protected override Task>> CreateEnum
IAsyncEnumerator> enumerator = new TracingAsyncEnumerator>(
OrderByQueryPartitionRangePageAsyncEnumerator.Create(
- queryDataSource: documentContainer,
+ queryDataSource: documentContainer,
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties(),
sqlQuerySpec: new Cosmos.Query.Core.SqlQuerySpec("SELECT * FROM c"),
feedRangeState: new FeedRangeState(ranges[0], state),
partitionKey: null,
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs
index 6d41312496..e579a86ee1 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs
@@ -331,7 +331,8 @@ private static async Task RunParityTests(
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: pageSize),
maxConcurrency: maxConcurrency,
nonStreamingOrderBy: nonStreamingOrderBy,
- continuationToken: null);
+ continuationToken: null,
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(pipelineStage.Succeeded);
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs
index d0ef64dc5e..de1b1b32cf 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs
@@ -79,7 +79,8 @@ public void MonadicCreate_NullContinuationToken()
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
- continuationToken: null);
+ continuationToken: null,
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
}
@@ -100,7 +101,8 @@ public void MonadicCreate_NonCosmosArrayContinuationToken()
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
- continuationToken: CosmosObject.Create(new Dictionary()));
+ continuationToken: CosmosObject.Create(new Dictionary()),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
}
@@ -122,7 +124,8 @@ public void MonadicCreate_EmptyArrayContinuationToken()
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
- continuationToken: CosmosArray.Create(new List()));
+ continuationToken: CosmosArray.Create(new List()),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
}
@@ -144,7 +147,8 @@ public void MonadicCreate_NonParallelContinuationToken()
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
- continuationToken: CosmosArray.Create(new List() { CosmosString.Create("asdf") }));
+ continuationToken: CosmosArray.Create(new List() { CosmosString.Create("asdf") }),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
}
@@ -185,7 +189,8 @@ public void MonadicCreate_SingleOrderByContinuationToken()
new List()
{
OrderByContinuationToken.ToCosmosElement(orderByContinuationToken)
- }));
+ }),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
}
@@ -230,7 +235,8 @@ public void MonadicCreate_SingleOrderByContinuationToken()
new List()
{
OrderByContinuationToken.ToCosmosElement(orderByContinuationToken)
- }));
+ }),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
}
}
@@ -291,7 +297,8 @@ public void MonadicCreate_MultipleOrderByContinuationToken()
{
OrderByContinuationToken.ToCosmosElement(orderByContinuationToken1),
OrderByContinuationToken.ToCosmosElement(orderByContinuationToken2)
- }));
+ }),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
}
}
@@ -333,7 +340,8 @@ public void MonadicCreate_OrderByWithResumeValues()
new List()
{
OrderByContinuationToken.ToCosmosElement(orderByContinuationToken)
- }));
+ }),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
}
@@ -375,7 +383,8 @@ public void MonadicCreate_OrderByWithResumeValues()
new List()
{
OrderByContinuationToken.ToCosmosElement(orderByContinuationToken)
- }));
+ }),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
}
}
@@ -427,7 +436,8 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 1),
maxConcurrency: 0,
nonStreamingOrderBy: false,
- continuationToken: CosmosElement.Parse(continuationToken));
+ continuationToken: CosmosElement.Parse(continuationToken),
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
@@ -465,7 +475,8 @@ FROM c
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: nonStreamingOrderBy,
- continuationToken: null);
+ continuationToken: null,
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
@@ -514,7 +525,8 @@ FROM c
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
- continuationToken: null);
+ continuationToken: null,
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
@@ -576,7 +588,8 @@ FROM c
queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: nonStreamingOrderBy,
- continuationToken: continuationToken);
+ continuationToken: continuationToken,
+ containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties());
monadicQueryPipelineStage.ThrowIfFailed();
IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result;
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPlanBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPlanBaselineTests.cs
index ef68af02bc..7ecce80a1e 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPlanBaselineTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPlanBaselineTests.cs
@@ -22,7 +22,7 @@
///
[TestClass]
public class QueryPlanBaselineTests : BaselineTests
- {
+ {
[TestMethod]
[Owner("brchon")]
public void Aggregates()