Skip to content

Commit

Permalink
Addressed comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
adityasa committed Jul 16, 2024
1 parent 46992ea commit ba93330
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition
using System;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;

internal static class HierarchicalPartitionUtil
internal static class HierarchicalPartitionUtils
{
/// <summary>
/// Updates the FeedRange to limit the scope of incoming feedRange to logical partition within a single physical partition.
Expand Down Expand Up @@ -37,8 +37,7 @@ public static FeedRangeInternal LimitFeedRangeToSinglePartition(PartitionKey? pa
"EffectiveRangesForPartitionKey should be populated when PK is specified in request options.");
}

foreach (Documents.Routing.Range<String> epkForPartitionKey in
containerQueryProperties.EffectiveRangesForPartitionKey)
foreach (Documents.Routing.Range<String> epkForPartitionKey in containerQueryProperties.EffectiveRangesForPartitionKey)
{
if (Documents.Routing.Range<String>.CheckOverlapping(
feedRangeEpk.Range,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ private sealed class InitializationParameters
{
public IDocumentContainer DocumentContainer { get; }

public ContainerQueryProperties ContainerQueryProperties { get; }

public SqlQuerySpec SqlQuerySpec { get; }

public IReadOnlyList<FeedRangeEpk> TargetRanges { get; }
Expand All @@ -53,28 +55,26 @@ private sealed class InitializationParameters

public QueryExecutionOptions QueryPaginationOptions { get; }

public ContainerQueryProperties ContainerQueryProperties { get; }

public int MaxConcurrency { get; }

public InitializationParameters(
IDocumentContainer documentContainer,
ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList<FeedRangeEpk> targetRanges,
PartitionKey? partitionKey,
IReadOnlyList<OrderByColumn> orderByColumns,
QueryExecutionOptions queryPaginationOptions,
int maxConcurrency,
ContainerQueryProperties containerQueryProperties)
int maxConcurrency)
{
this.DocumentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
this.ContainerQueryProperties = containerQueryProperties;
this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
this.TargetRanges = targetRanges ?? throw new ArgumentNullException(nameof(targetRanges));
this.PartitionKey = partitionKey;
this.OrderByColumns = orderByColumns ?? throw new ArgumentNullException(nameof(orderByColumns));
this.QueryPaginationOptions = queryPaginationOptions ?? throw new ArgumentNullException(nameof(queryPaginationOptions));
this.MaxConcurrency = maxConcurrency;
this.ContainerQueryProperties = containerQueryProperties;
}
}

Expand All @@ -87,15 +87,15 @@ private enum ExecutionState

public static TryCatch<IQueryPipelineStage> MonadicCreate(
IDocumentContainer documentContainer,
ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList<FeedRangeEpk> targetRanges,
Cosmos.PartitionKey? partitionKey,
IReadOnlyList<OrderByColumn> orderByColumns,
QueryExecutionOptions queryPaginationOptions,
int maxConcurrency,
bool nonStreamingOrderBy,
CosmosElement continuationToken,
ContainerQueryProperties containerQueryProperties)
CosmosElement continuationToken)
{
if (documentContainer == null)
{
Expand Down Expand Up @@ -131,14 +131,14 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
{
return StreamingOrderByCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer,
containerQueryProperties,
sqlQuerySpec,
targetRanges,
partitionKey,
orderByColumns,
queryPaginationOptions,
maxConcurrency,
continuationToken,
containerQueryProperties);
continuationToken);
}

SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec(
Expand All @@ -147,13 +147,13 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(

return TryCatch<IQueryPipelineStage>.FromResult(NonStreamingOrderByPipelineStage.Create(
documentContainer,
containerQueryProperties,
rewrittenQueryForOrderBy,
targetRanges,
partitionKey,
orderByColumns,
queryPaginationOptions,
maxConcurrency,
containerQueryProperties));
maxConcurrency));
}

private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync(
Expand Down Expand Up @@ -201,13 +201,13 @@ private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync(
// We maintain the current enumerator's range and let the RequestInvokerHandler logic kick in
OrderByQueryPartitionRangePageAsyncEnumerator childPaginator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
uninitializedEnumerator.SqlQuerySpec,
new FeedRangeState<QueryState>(uninitializedEnumerator.FeedRangeState.FeedRange, uninitializedEnumerator.StartOfPageState),
partitionKey: null,
uninitializedEnumerator.QueryPaginationOptions,
uninitializedEnumerator.Filter,
PrefetchPolicy.PrefetchSinglePage,
containerQueryProperties);
PrefetchPolicy.PrefetchSinglePage);
uninitializedEnumeratorsAndTokens.Enqueue((childPaginator, token));
}
else
Expand All @@ -219,13 +219,13 @@ private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync(

OrderByQueryPartitionRangePageAsyncEnumerator childPaginator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
uninitializedEnumerator.SqlQuerySpec,
new FeedRangeState<QueryState>(childRange, uninitializedEnumerator.StartOfPageState),
partitionKey: null,
uninitializedEnumerator.QueryPaginationOptions,
uninitializedEnumerator.Filter,
PrefetchPolicy.PrefetchSinglePage,
containerQueryProperties);
PrefetchPolicy.PrefetchSinglePage);
uninitializedEnumeratorsAndTokens.Enqueue((childPaginator, token));
}
}
Expand Down Expand Up @@ -810,14 +810,14 @@ public static IQueryPipelineStage Create(

public static TryCatch<IQueryPipelineStage> MonadicCreate(
IDocumentContainer documentContainer,
ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList<FeedRangeEpk> targetRanges,
Cosmos.PartitionKey? partitionKey,
IReadOnlyList<OrderByColumn> orderByColumns,
QueryExecutionOptions queryPaginationOptions,
int maxConcurrency,
CosmosElement continuationToken,
ContainerQueryProperties containerQueryProperties)
CosmosElement continuationToken)
{
// TODO (brchon): For now we are not honoring non deterministic ORDER BY queries, since there is a bug in the continuation logic.
// We can turn it back on once the bug is fixed.
Expand All @@ -834,13 +834,13 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
enumeratorsAndTokens = targetRanges
.Select(range => (OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
rewrittenQueryForOrderBy,
new FeedRangeState<QueryState>(range, state: default),
partitionKey,
queryPaginationOptions,
TrueFilter,
PrefetchPolicy.PrefetchSinglePage,
containerQueryProperties),
PrefetchPolicy.PrefetchSinglePage),
(OrderByContinuationToken)null))
.ToList();
}
Expand Down Expand Up @@ -923,13 +923,13 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
leftQuerySpec,
new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
queryPaginationOptions,
filter: null,
PrefetchPolicy.PrefetchSinglePage,
containerQueryProperties);
PrefetchPolicy.PrefetchSinglePage);

enumeratorsAndTokens.Add((remoteEnumerator, token));
}
Expand All @@ -951,13 +951,13 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(

OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
targetQuerySpec,
new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
queryPaginationOptions,
filter: null,
PrefetchPolicy.PrefetchSinglePage,
containerQueryProperties);
PrefetchPolicy.PrefetchSinglePage);

enumeratorsAndTokens.Add((remoteEnumerator, token));
}
Expand All @@ -974,13 +974,13 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
rightQuerySpec,
new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
queryPaginationOptions,
filter: null,
PrefetchPolicy.PrefetchSinglePage,
containerQueryProperties);
PrefetchPolicy.PrefetchSinglePage);

enumeratorsAndTokens.Add((remoteEnumerator, token));
}
Expand Down Expand Up @@ -1014,13 +1014,13 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
rewrittenQueryForOrderBy,
new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
queryPaginationOptions,
filter,
PrefetchPolicy.PrefetchSinglePage,
containerQueryProperties);
PrefetchPolicy.PrefetchSinglePage);

enumeratorsAndTokens.Add((remoteEnumerator, token));
}
Expand Down Expand Up @@ -1765,12 +1765,12 @@ private async Task<BufferedOrderByResults> MoveNextAsync_InitializeAsync(ITrace
{
ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>> enumerator = await OrderByCrossPartitionRangePageEnumerator.CreateAsync(
this.parameters.DocumentContainer,
this.parameters.ContainerQueryProperties,
this.parameters.SqlQuerySpec,
this.parameters.TargetRanges,
this.parameters.PartitionKey,
this.parameters.QueryPaginationOptions,
this.parameters.MaxConcurrency,
this.parameters.ContainerQueryProperties,
trace,
cancellationToken);

Expand All @@ -1789,27 +1789,27 @@ private async Task<BufferedOrderByResults> MoveNextAsync_InitializeAsync(ITrace

public static IQueryPipelineStage Create(
IDocumentContainer documentContainer,
ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList<FeedRangeEpk> targetRanges,
Cosmos.PartitionKey? partitionKey,
IReadOnlyList<OrderByColumn> orderByColumns,
QueryExecutionOptions queryPaginationOptions,
int maxConcurrency,
ContainerQueryProperties containerQueryProperties)
int maxConcurrency)
{
int pageSize = queryPaginationOptions.PageSizeLimit.GetValueOrDefault(MaximumPageSize) > 0 ?
Math.Min(MaximumPageSize, queryPaginationOptions.PageSizeLimit.Value) :
MaximumPageSize;

InitializationParameters parameters = new InitializationParameters(
documentContainer,
containerQueryProperties,
sqlQuerySpec,
targetRanges,
partitionKey,
orderByColumns,
queryPaginationOptions,
maxConcurrency,
containerQueryProperties);
maxConcurrency);

return new NonStreamingOrderByPipelineStage(
parameters,
Expand Down Expand Up @@ -1839,12 +1839,12 @@ private OrderByCrossPartitionRangePageEnumerator(

public static async Task<ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>>> CreateAsync(
IDocumentContainer documentContainer,
ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList<FeedRangeEpk> targetRanges,
Cosmos.PartitionKey? partitionKey,
QueryExecutionOptions queryPaginationOptions,
int maxConcurrency,
ContainerQueryProperties containerQueryProperties,
ITrace trace,
CancellationToken cancellationToken)
{
Expand All @@ -1854,13 +1854,13 @@ public static async Task<ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>>> Cr
{
OrderByQueryPartitionRangePageAsyncEnumerator enumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
sqlQuerySpec,
new FeedRangeState<QueryState>(range, state: null),
partitionKey,
queryPaginationOptions,
filter: null,
PrefetchPolicy.PrefetchAll,
containerQueryProperties);
PrefetchPolicy.PrefetchAll);

enumeratorsAndTokens.Enqueue(new (enumerator, null));
}
Expand Down
Loading

0 comments on commit ba93330

Please sign in to comment.