Skip to content

Commit

Permalink
[Internal] Query: Removes Ode changes which rejected certain types of…
Browse files Browse the repository at this point in the history
… queries which were previously supported on 3.36.0 (#4146)

* Revert "[Internal] Query: Fixes minor issues with TestQueryValidityCheckWithODEAsync (#4105)"

This reverts commit 101b9b1.

* Revert "[Internal] Query: Adds check to detect unsupported queries for Optimistic Direct Execution code path (#4090)"

This reverts commit f312f6a.
  • Loading branch information
akotalwar authored Oct 22, 2023
1 parent df83991 commit 3f48041
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using global::Azure;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
Expand All @@ -33,12 +33,10 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
internal static class CosmosQueryExecutionContextFactory
{
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private const string QueryInspectionPattern = @"\s+(GROUP\s+BY\s+|COUNT\s*\(|MIN\s*\(|MAX\s*\(|AVG\s*\(|SUM\s*\(|DISTINCT\s+)";
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
private const string Passthrough = "Passthrough";
private const string Specialized = "Specialized";
private const int PageSizeFactorForTop = 5;
private static readonly Regex QueryInspectionRegex = new Regex(QueryInspectionPattern, RegexOptions.IgnoreCase | RegexOptions.Compiled);

public static IQueryPipelineStage Create(
DocumentContainer documentContainer,
Expand Down Expand Up @@ -149,14 +147,14 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy

if (targetRange != null)
{
return await TryCreateSinglePartitionExecutionContextAsync(
return await TryCreateExecutionContextAsync(
documentContainer,
partitionedQueryExecutionInfo: null,
cosmosQueryContext,
containerQueryProperties,
inputParameters,
targetRange,
createQueryPipelineTrace,
trace,
cancellationToken);
}

Expand Down Expand Up @@ -299,7 +297,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione

if (targetRange != null)
{
tryCreatePipelineStage = await TryCreateSinglePartitionExecutionContextAsync(
tryCreatePipelineStage = await TryCreateExecutionContextAsync(
documentContainer,
partitionedQueryExecutionInfo,
cosmosQueryContext,
Expand Down Expand Up @@ -330,7 +328,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
return tryCreatePipelineStage;
}

private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSinglePartitionExecutionContextAsync(
private static async Task<TryCatch<IQueryPipelineStage>> TryCreateExecutionContextAsync(
DocumentContainer documentContainer,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
CosmosQueryContext cosmosQueryContext,
Expand All @@ -340,17 +338,6 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSinglePartitio
ITrace trace,
CancellationToken cancellationToken)
{
// Retrieve the query plan in a subset of cases to ensure the query is valid before creating the Ode pipeline
if (partitionedQueryExecutionInfo == null && QueryInspectionRegex.IsMatch(inputParameters.SqlQuerySpec.QueryText))
{
partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoAsync(
cosmosQueryContext,
inputParameters,
containerQueryProperties,
trace,
cancellationToken);
}

// Test code added to confirm the correct pipeline is being utilized
SetTestInjectionPipelineType(inputParameters, OptimisticDirectExecution);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public void PositiveOptimisticDirectExecutionOutput()
partitionKeyPath: @"/pk",
partitionKeyValue: null),
};

this.ExecuteTestSuite(testVariations);
}

Expand Down Expand Up @@ -380,11 +379,11 @@ public async Task TestPipelineForContinuationTokenOnSinglePartitionAsync()
{
int numItems = 100;
OptimisticDirectExecutionTestInput input = CreateInput(
description: @"Single Partition Key and Value Field",
description: @"Single Partition Key and Value Field",
query: "SELECT * FROM c",
expectedOptimisticDirectExecution: true,
partitionKeyPath: @"/pk",
partitionKeyValue: "a");
expectedOptimisticDirectExecution: true,
partitionKeyPath: @"/pk",
partitionKeyValue: "a");

int result = await this.GetPipelineAndDrainAsync(
input,
Expand All @@ -395,63 +394,6 @@ public async Task TestPipelineForContinuationTokenOnSinglePartitionAsync()
Assert.AreEqual(numItems, result);
}

// test checks that the Ode code path ensures that a query is valid before sending it to the backend
// these queries with previous ODE implementation would have succeeded. However, with the new query validity check, they should all throw an exception
[TestMethod]
public async Task TestQueryValidityCheckWithODEAsync()
{
const string UnsupportedSelectStarInGroupBy = "'SELECT *' is not allowed with GROUP BY";
const string UnsupportedCompositeAggregate = "Compositions of aggregates and other expressions are not allowed.";
const string UnsupportedNestedAggregateExpression = "Cannot perform an aggregate function on an expression containing an aggregate or a subquery.";
const string UnsupportedSelectLisWithAggregateOrGroupByExpression = "invalid in the select list because it is not contained in either an aggregate function or the GROUP BY clause";

List<(string Query, string ExpectedMessage)> testVariations = new List<(string Query, string ExpectedMessage)>
{
("SELECT COUNT (1) + 5 FROM c", UnsupportedCompositeAggregate),
("SELECT MIN(c.price) + 10 FROM c", UnsupportedCompositeAggregate),
("SELECT MAX(c.price) - 4 FROM c", UnsupportedCompositeAggregate),
("SELECT SUM (c.price) + 20 FROM c",UnsupportedCompositeAggregate),
("SELECT AVG(c.price) * 50 FROM c", UnsupportedCompositeAggregate),
("SELECT * from c GROUP BY c.name", UnsupportedSelectStarInGroupBy),
("SELECT SUM(c.sales) AS totalSales, AVG(SUM(c.salesAmount)) AS averageTotalSales\n\n\nFROM c", UnsupportedNestedAggregateExpression),
("SELECT c.category, c.price, COUNT(c) FROM c GROUP BY c.category\r\n", UnsupportedSelectLisWithAggregateOrGroupByExpression)
};

List<(string, string)> testVariationsWithCaseSensitivity = new List<(string, string)>();
foreach ((string Query, string ExpectedMessage) testCase in testVariations)
{
testVariationsWithCaseSensitivity.Add((testCase.Query, testCase.ExpectedMessage));
testVariationsWithCaseSensitivity.Add((testCase.Query.ToLower(), testCase.ExpectedMessage));
testVariationsWithCaseSensitivity.Add((testCase.Query.ToUpper(), testCase.ExpectedMessage));
}

foreach ((string Query, string ExpectedMessage) testCase in testVariationsWithCaseSensitivity)
{
OptimisticDirectExecutionTestInput input = CreateInput(
description: @"Unsupported queries in CosmosDB that were previously supported by Ode pipeline and returning wrong results",
query: testCase.Query,
expectedOptimisticDirectExecution: true,
partitionKeyPath: @"/pk",
partitionKeyValue: "a");

try
{
int result = await this.GetPipelineAndDrainAsync(
input,
numItems: 100,
isMultiPartition: false,
expectedContinuationTokenCount: 0,
requiresDist: true);
Assert.Fail("Invalid query being executed did not result in an exception");
}
catch (Exception ex)
{
Assert.IsTrue(ex.InnerException.Message.Contains(testCase.ExpectedMessage));
continue;
}
}
}

// test to check if pipeline handles a 410 exception properly and returns all the documents.
[TestMethod]
public async Task TestPipelineForGoneExceptionOnSingleAndMultiplePartitionAsync()
Expand Down Expand Up @@ -628,7 +570,7 @@ private async Task<int> GetPipelineAndDrainAsync(OptimisticDirectExecutionTestIn
return documents.Count;
}

internal static TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryExecutionInfo(string querySpecJsonString, PartitionKeyDefinition pkDefinition)
internal static PartitionedQueryExecutionInfo GetPartitionedQueryExecutionInfo(string querySpecJsonString, PartitionKeyDefinition pkDefinition)
{
TryCatch<PartitionedQueryExecutionInfo> tryGetQueryPlan = QueryPartitionProviderTestInstance.Object.TryGetPartitionedQueryExecutionInfo(
querySpecJsonString: querySpecJsonString,
Expand All @@ -641,7 +583,7 @@ internal static TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryEx
useSystemPrefix: false,
geospatialType: Cosmos.GeospatialType.Geography);

return tryGetQueryPlan;
return tryGetQueryPlan.Result;
}

private static async Task<IQueryPipelineStage> GetOdePipelineAsync(OptimisticDirectExecutionTestInput input, DocumentContainer documentContainer, QueryRequestOptions queryRequestOptions)
Expand Down Expand Up @@ -789,6 +731,7 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect
using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(new SqlQuerySpec(input.Query), Documents.ResourceType.Document));
string sqlQuerySpecJsonString = streamReader.ReadToEnd();

PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = GetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, input.PartitionKeyDefinition);
CosmosQueryExecutionContextFactory.InputParameters inputParameters = new CosmosQueryExecutionContextFactory.InputParameters(
sqlQuerySpec: new SqlQuerySpec(input.Query),
initialUserContinuationToken: input.ContinuationToken,
Expand All @@ -797,8 +740,8 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect
maxItemCount: queryRequestOptions.MaxItemCount,
maxBufferedItemCount: queryRequestOptions.MaxBufferedItemCount,
partitionKey: input.PartitionKeyValue,
properties: new Dictionary<string, object>() { { "x-ms-query-partitionkey-definition", input.PartitionKeyDefinition } },
partitionedQueryExecutionInfo: null,
properties: queryRequestOptions.Properties,
partitionedQueryExecutionInfo: partitionedQueryExecutionInfo,
executionEnvironment: null,
returnResultsInDeterministicOrder: null,
forcePassthrough: false,
Expand Down Expand Up @@ -1100,8 +1043,7 @@ public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartit
using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, Documents.ResourceType.Document));
string sqlQuerySpecJsonString = streamReader.ReadToEnd();

TryCatch<PartitionedQueryExecutionInfo> queryPlan = OptimisticDirectExecutionQueryBaselineTests.TryGetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, partitionKeyDefinition);
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlan.Succeeded ? queryPlan.Result : throw queryPlan.Exception;
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = OptimisticDirectExecutionQueryBaselineTests.GetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, partitionKeyDefinition);
return TryCatch<PartitionedQueryExecutionInfo>.FromResult(partitionedQueryExecutionInfo);
}
}
Expand Down

0 comments on commit 3f48041

Please sign in to comment.