From b095d873a5a7d29adeb7cf015aaef4189dea8a13 Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Tue, 17 Dec 2024 15:08:02 -0800 Subject: [PATCH] misc: Add support for TimestampWithTimeZone in AggregationFuzzer (#11897) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11897 This change adds support for TimestampWithTimeZone in AggregationFuzzer as partition keys (it cannot be a sorting key as it does not support +/-) with PrestoQueryRunner. The primary challenge is that PrestoQueryRunner does not support TimestampWithTimeZone as an input (it's not supported in DWRF). To address this the general strategy is when we see a TimestampWithTimeZone in the input types we convert it to a BIGINT millisUtc and VARCHAR time_zone and then call from_unixtime as part of an initial projection in the query to produce TimestampWithTimeZone values. More concretely I added a general purpose utility library PrestoQueryRunnerIntermediateTypeTransforms to handle converting Vectors of intermediate only types (custom types not supported in the input) to Vectors of types supported in the input, and to generate expressions to do the conversion back to the intermediate only type as part of the query. This is supported when the types are nested in complex types as well. I added a function inputProjections to ReferenceQueryRunner which takes in the input batch and does this conversion and generates the expressions. The rest of this change is just plumbing these projections into all plans throughout AggregationFuzzer, as well as the plans generated in the ResultVerifiers. If this approach is acceptable, it's straight forward to then support custom types like TimestampWithTimeZone as arguments to aggregates in the fuzzer, as well as add support in other fuzzers like JoinFuzzer. Differential Revision: D67346942 --- velox/exec/fuzzer/AggregationFuzzer.cpp | 98 ++++- velox/exec/fuzzer/AggregationFuzzerBase.cpp | 16 +- velox/exec/fuzzer/AggregationFuzzerBase.h | 8 + velox/exec/fuzzer/CMakeLists.txt | 9 +- velox/exec/fuzzer/PrestoQueryRunner.cpp | 78 +++- velox/exec/fuzzer/PrestoQueryRunner.h | 6 + ...oQueryRunnerIntermediateTypeTransforms.cpp | 408 ++++++++++++++++++ ...stoQueryRunnerIntermediateTypeTransforms.h | 39 ++ velox/exec/fuzzer/ReferenceQueryRunner.h | 23 + velox/exec/fuzzer/ResultVerifier.h | 2 + velox/exec/fuzzer/TransformResultVerifier.h | 1 + velox/exec/fuzzer/WindowFuzzer.cpp | 18 +- velox/exec/fuzzer/WindowFuzzer.h | 2 + velox/exec/tests/PrestoQueryRunnerTest.cpp | 12 +- .../fuzzer/ApproxDistinctResultVerifier.h | 4 + .../fuzzer/ApproxPercentileResultVerifier.h | 16 +- .../fuzzer/ArbitraryResultVerifier.h | 2 + .../fuzzer/MinMaxByResultVerifier.cpp | 5 +- .../prestosql/fuzzer/MinMaxByResultVerifier.h | 1 + 19 files changed, 700 insertions(+), 48 deletions(-) create mode 100644 velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.cpp create mode 100644 velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.h diff --git a/velox/exec/fuzzer/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp index d778f39df475..005a90290ad6 100644 --- a/velox/exec/fuzzer/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -97,6 +97,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { const std::vector& sortingKeys, const std::string& aggregate, const std::vector& input, + const std::vector& projections, bool customVerification, bool enableWindowVerification); @@ -106,6 +107,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { const std::vector& aggregates, const std::vector& masks, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier); @@ -115,6 +117,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { const std::string& aggregate, const std::vector& masks, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier); @@ -126,6 +129,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { const std::vector& plans, bool customVerification, const std::vector& input, + const std::vector& projections, const std::shared_ptr& customVerifier, int32_t maxDrivers = 2, bool testWithSpilling = true); @@ -136,6 +140,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { const std::string& aggregate, const std::vector& masks, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier); @@ -349,10 +354,13 @@ void AggregationFuzzer::go() { auto groupingKeys = generateKeys("g", names, types); auto input = generateInputData(names, types, std::nullopt); + auto [convertedInput, projections] = + referenceQueryRunner_->inputProjections(input); - logVectors(input); + logVectors(convertedInput); - verifyAggregation(groupingKeys, {}, {}, input, false, {}); + verifyAggregation( + groupingKeys, {}, {}, convertedInput, projections, false, {}); } else { // Pick a random signature. auto signatureWithStats = pickSignature(); @@ -386,14 +394,17 @@ void AggregationFuzzer::go() { auto sortingKeys = generateSortingKeys("s", argNames, argTypes); auto input = generateInputDataWithRowNumber( argNames, argTypes, partitionKeys, signature); + auto [convertedInput, projections] = + referenceQueryRunner_->inputProjections(input); - logVectors(input); + logVectors(convertedInput); bool failed = verifyWindow( partitionKeys, sortingKeys, call, - input, + convertedInput, + projections, customVerification, FLAGS_enable_window_reference_verification); if (failed) { @@ -437,8 +448,10 @@ void AggregationFuzzer::go() { } auto input = generateInputData(argNames, argTypes, signature); + auto [convertedInput, projections] = + referenceQueryRunner_->inputProjections(input); - logVectors(input); + logVectors(convertedInput); std::shared_ptr customVerifier; if (customVerification) { @@ -451,7 +464,8 @@ void AggregationFuzzer::go() { groupingKeys, call, masks, - input, + convertedInput, + projections, customVerification, customVerifier); if (failed) { @@ -463,7 +477,8 @@ void AggregationFuzzer::go() { groupingKeys, call, masks, - input, + convertedInput, + projections, customVerification, customVerifier); if (failed) { @@ -474,7 +489,8 @@ void AggregationFuzzer::go() { groupingKeys, {call}, masks, - input, + convertedInput, + projections, customVerification, customVerifier); if (failed) { @@ -507,10 +523,12 @@ void makeAlternativePlansWithValues( const std::vector& aggregates, const std::vector& masks, const std::vector& inputVectors, + const std::vector& projections, std::vector& plans) { // Partial -> final aggregation plan. plans.push_back(PlanBuilder() .values(inputVectors) + .projectExpressions(projections) .partialAggregation(groupingKeys, aggregates, masks) .finalAggregation() .planNode()); @@ -518,6 +536,7 @@ void makeAlternativePlansWithValues( // Partial -> intermediate -> final aggregation plan. plans.push_back(PlanBuilder() .values(inputVectors) + .projectExpressions(projections) .partialAggregation(groupingKeys, aggregates, masks) .intermediateAggregation() .finalAggregation() @@ -535,6 +554,7 @@ void makeAlternativePlansWithValues( for (const auto& sourceInput : sourceInputs) { sources.push_back(PlanBuilder(planNodeIdGenerator) .values({sourceInput}) + .projectExpressions(projections) .partialAggregation(groupingKeys, aggregates, masks) .planNode()); } @@ -557,6 +577,7 @@ void makeAlternativePlansWithTableScan( const std::vector& aggregates, const std::vector& masks, const RowTypePtr& inputRowType, + const std::vector& projections, std::vector& plans) { // There is a known issue where LocalPartition will send DictionaryVectors // with the same underlying base Vector to multiple threads. This triggers @@ -567,6 +588,7 @@ void makeAlternativePlansWithTableScan( // Partial -> final aggregation plan. plans.push_back(PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .partialAggregation(groupingKeys, aggregates, masks) .localPartition(groupingKeys) .finalAggregation() @@ -575,6 +597,7 @@ void makeAlternativePlansWithTableScan( // Partial -> intermediate -> final aggregation plan. plans.push_back(PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .partialAggregation(groupingKeys, aggregates, masks) .localPartition(groupingKeys) .intermediateAggregation() @@ -588,10 +611,12 @@ void makeStreamingPlansWithValues( const std::vector& aggregates, const std::vector& masks, const std::vector& inputVectors, + const std::vector& projections, std::vector& plans) { // Single aggregation. plans.push_back(PlanBuilder() .values(inputVectors) + .projectExpressions(projections) .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, @@ -605,6 +630,7 @@ void makeStreamingPlansWithValues( plans.push_back( PlanBuilder() .values(inputVectors) + .projectExpressions(projections) .orderBy(groupingKeys, false) .partialStreamingAggregation(groupingKeys, aggregates, masks) .finalAggregation() @@ -614,6 +640,7 @@ void makeStreamingPlansWithValues( plans.push_back( PlanBuilder() .values(inputVectors) + .projectExpressions(projections) .orderBy(groupingKeys, false) .partialStreamingAggregation(groupingKeys, aggregates, masks) .intermediateAggregation() @@ -633,6 +660,7 @@ void makeStreamingPlansWithValues( sources.push_back( PlanBuilder(planNodeIdGenerator) .values({sourceInput}) + .projectExpressions(projections) .orderBy(groupingKeys, false) .partialStreamingAggregation(groupingKeys, aggregates, masks) .planNode()); @@ -648,10 +676,12 @@ void makeStreamingPlansWithTableScan( const std::vector& aggregates, const std::vector& masks, const RowTypePtr& inputRowType, + const std::vector& projections, std::vector& plans) { // Single aggregation. plans.push_back(PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, @@ -665,6 +695,7 @@ void makeStreamingPlansWithTableScan( plans.push_back( PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .orderBy(groupingKeys, false) .partialStreamingAggregation(groupingKeys, aggregates, masks) .finalAggregation() @@ -674,6 +705,7 @@ void makeStreamingPlansWithTableScan( plans.push_back( PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .orderBy(groupingKeys, false) .partialStreamingAggregation(groupingKeys, aggregates, masks) .intermediateAggregation() @@ -684,6 +716,7 @@ void makeStreamingPlansWithTableScan( plans.push_back( PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .orderBy(groupingKeys, true) .partialStreamingAggregation(groupingKeys, aggregates, masks) .localMerge(groupingKeys) @@ -696,6 +729,7 @@ bool AggregationFuzzer::verifyWindow( const std::vector& sortingKeys, const std::string& aggregate, const std::vector& input, + const std::vector& projections, bool customVerification, bool enableWindowVerification) { std::stringstream frame; @@ -708,6 +742,7 @@ bool AggregationFuzzer::verifyWindow( auto plan = PlanBuilder() .values(input) + .projectExpressions(projections) .window({fmt::format("{} over ({})", aggregate, frame.str())}) .planNode(); if (persistAndRunOnce_) { @@ -753,10 +788,12 @@ bool AggregationFuzzer::verifyAggregation( const std::vector& aggregates, const std::vector& masks, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier) { auto firstPlan = PlanBuilder() .values(input) + .projectExpressions(projections) .singleAggregation(groupingKeys, aggregates, masks) .planNode(); @@ -766,6 +803,7 @@ bool AggregationFuzzer::verifyAggregation( customVerifier->initialize( input, + projections, groupingKeys, aggregationNode->aggregates()[0], aggregationNode->aggregateNames()[0]); @@ -791,12 +829,22 @@ bool AggregationFuzzer::verifyAggregation( std::vector tableScanPlans; makeAlternativePlansWithTableScan( - groupingKeys, aggregates, masks, inputRowType, tableScanPlans); + groupingKeys, + aggregates, + masks, + inputRowType, + projections, + tableScanPlans); if (!groupingKeys.empty()) { // Use OrderBy + StreamingAggregation on original input. makeStreamingPlansWithTableScan( - groupingKeys, aggregates, masks, inputRowType, tableScanPlans); + groupingKeys, + aggregates, + masks, + inputRowType, + projections, + tableScanPlans); } for (const auto& plan : tableScanPlans) { @@ -805,7 +853,7 @@ bool AggregationFuzzer::verifyAggregation( } else { std::vector valuesPlans; makeAlternativePlansWithValues( - groupingKeys, aggregates, masks, input, valuesPlans); + groupingKeys, aggregates, masks, input, projections, valuesPlans); // Evaluate same plans on flat inputs. std::vector flatInput; @@ -817,16 +865,16 @@ bool AggregationFuzzer::verifyAggregation( } makeAlternativePlansWithValues( - groupingKeys, aggregates, masks, flatInput, valuesPlans); + groupingKeys, aggregates, masks, flatInput, projections, valuesPlans); if (!groupingKeys.empty()) { // Use OrderBy + StreamingAggregation on original input. makeStreamingPlansWithValues( - groupingKeys, aggregates, masks, input, valuesPlans); + groupingKeys, aggregates, masks, input, projections, valuesPlans); // Use OrderBy + StreamingAggregation on flattened input. makeStreamingPlansWithValues( - groupingKeys, aggregates, masks, flatInput, valuesPlans); + groupingKeys, aggregates, masks, flatInput, projections, valuesPlans); } for (const auto& plan : valuesPlans) { @@ -839,7 +887,7 @@ bool AggregationFuzzer::verifyAggregation( } return compareEquivalentPlanResults( - plans, customVerification, input, customVerifier); + plans, customVerification, input, projections, customVerifier); } bool AggregationFuzzer::verifySortedAggregation( @@ -847,10 +895,12 @@ bool AggregationFuzzer::verifySortedAggregation( const std::string& aggregate, const std::vector& masks, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier) { auto firstPlan = PlanBuilder() .values(input) + .projectExpressions(projections) .singleAggregation(groupingKeys, {aggregate}, masks) .planNode(); @@ -865,6 +915,7 @@ bool AggregationFuzzer::verifySortedAggregation( customVerifier->initialize( input, + projections, groupingKeys, aggregateFunctionCall, aggregationNode->aggregateNames()[0]); @@ -887,6 +938,7 @@ bool AggregationFuzzer::verifySortedAggregation( plans.push_back( {PlanBuilder() .values(input) + .projectExpressions(projections) .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, @@ -907,6 +959,7 @@ bool AggregationFuzzer::verifySortedAggregation( plans.push_back( {PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .singleAggregation(groupingKeys, {aggregate}, masks) .planNode(), splits}); @@ -915,6 +968,7 @@ bool AggregationFuzzer::verifySortedAggregation( plans.push_back( {PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, @@ -939,12 +993,13 @@ bool AggregationFuzzer::verifySortedAggregation( // results of the aggregation with the reference DB) // keep the custom verifier enabled. return compareEquivalentPlanResults( - plans, customVerification, input, customVerifier, 1); + plans, customVerification, input, projections, customVerifier, 1); } else { // If custom verification is not enabled or the custom verifier is used for // compare and the aggregation is order sensitive (the result shoudl be // deterministic if the input is sorted), then compare the results directly. - return compareEquivalentPlanResults(plans, false, input, nullptr, 1); + return compareEquivalentPlanResults( + plans, false, input, projections, nullptr, 1); } } @@ -1077,6 +1132,7 @@ bool AggregationFuzzer::compareEquivalentPlanResults( const std::vector& plans, bool customVerification, const std::vector& input, + const std::vector& projections, const std::shared_ptr& customVerifier, int32_t maxDrivers, bool testWithSpilling) { @@ -1156,11 +1212,13 @@ bool AggregationFuzzer::verifyDistinctAggregation( const std::string& aggregate, const std::vector& masks, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier) { const auto firstPlan = PlanBuilder() .values(input) + .projectExpressions(projections) .singleAggregation(groupingKeys, {aggregate}, masks) .planNode(); @@ -1171,6 +1229,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( customVerifier->initialize( input, + projections, groupingKeys, aggregationNode->aggregates()[0], aggregationNode->aggregateNames()[0]); @@ -1191,6 +1250,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( plans.push_back( {PlanBuilder() .values(input) + .projectExpressions(projections) .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, @@ -1213,6 +1273,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( plans.push_back( {PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .singleAggregation(groupingKeys, {aggregate}, masks) .planNode(), splits}); @@ -1221,6 +1282,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( plans.push_back( {PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .orderBy(groupingKeys, false) .streamingAggregation( groupingKeys, @@ -1240,7 +1302,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( // Distinct aggregation must run single-threaded or data must be partitioned // on group-by keys among threads. return compareEquivalentPlanResults( - plans, customVerification, input, customVerifier, 1, false); + plans, customVerification, input, projections, customVerifier, 1, false); } } // namespace diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index b6fe99a14491..fd0a35fb11a6 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -210,17 +210,6 @@ std::vector AggregationFuzzerBase::generateKeys( const std::string& prefix, std::vector& names, std::vector& types) { - static const std::vector kNonFloatingPointTypes{ - BOOLEAN(), - TINYINT(), - SMALLINT(), - INTEGER(), - BIGINT(), - VARCHAR(), - VARBINARY(), - TIMESTAMP(), - }; - auto numKeys = boost::random::uniform_int_distribution(1, 5)(rng_); std::vector keys; for (auto i = 0; i < numKeys; ++i) { @@ -228,10 +217,9 @@ std::vector AggregationFuzzerBase::generateKeys( // Pick random, possibly complex, type. if (orderableGroupKeys_) { - types.push_back( - vectorFuzzer_.randOrderableType(kNonFloatingPointTypes, 2)); + types.push_back(vectorFuzzer_.randOrderableType(supportedKeyTypes_, 2)); } else { - types.push_back(vectorFuzzer_.randType(kNonFloatingPointTypes, 2)); + types.push_back(vectorFuzzer_.randType(supportedKeyTypes_, 2)); } names.push_back(keys.back()); } diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index 74e5c63b5d72..1e1680a44af7 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -83,6 +83,13 @@ class AggregationFuzzerBase { dwrf::registerDwrfReaderFactory(); dwrf::registerDwrfWriterFactory(); referenceQueryRunner_->registerCustomVectorFuzzers(vectorFuzzer_); + + for (const auto& type : referenceQueryRunner_->supportedScalarTypes()) { + if (!type->isReal() && !type->isDouble()) { + supportedKeyTypes_.push_back(type); + } + } + seed(initialSeed); } @@ -276,6 +283,7 @@ class AggregationFuzzerBase { std::shared_ptr writerPool_{ rootPool_->addAggregateChild("aggregationFuzzerWriter")}; VectorFuzzer vectorFuzzer_; + std::vector supportedKeyTypes_; }; // Returns true if the elapsed time is greater than or equal to diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt index 986c52e58e68..660d37c4c5fa 100644 --- a/velox/exec/fuzzer/CMakeLists.txt +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_fuzzer_util DuckQueryRunner.cpp PrestoQueryRunner.cpp - FuzzerUtil.cpp ToSQLUtil.cpp) +add_library( + velox_fuzzer_util + DuckQueryRunner.cpp + PrestoQueryRunner.cpp + FuzzerUtil.cpp + ToSQLUtil.cpp + PrestoQueryRunnerIntermediateTypeTransforms.cpp) target_link_libraries( velox_fuzzer_util diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index a1265195d381..e98b60368509 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -28,6 +28,7 @@ #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/fuzzer/FuzzerUtil.h" +#include "velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.h" #include "velox/exec/fuzzer/ToSQLUtil.h" #include "velox/exec/tests/utils/QueryAssertions.h" #include "velox/functions/prestosql/types/IPAddressType.h" @@ -205,6 +206,11 @@ std::optional PrestoQueryRunner::toSql( return toSql(valuesNode); } + if (const auto tableScanNode = + std::dynamic_pointer_cast(plan)) { + return toSql(tableScanNode); + } + VELOX_NYI(); } @@ -251,6 +257,7 @@ const std::vector& PrestoQueryRunner::supportedScalarTypes() const { VARCHAR(), VARBINARY(), TIMESTAMP(), + TIMESTAMP_WITH_TIME_ZONE(), }; return kScalarTypes; } @@ -262,6 +269,62 @@ void PrestoQueryRunner::registerCustomVectorFuzzers( std::make_unique()); } +std::pair, std::vector> +PrestoQueryRunner::inputProjections( + const std::vector& input) const { + if (input.empty()) { + return {input, {}}; + } + + std::vector projections; + std::vector names = input[0]->type()->asRow().names(); + std::vector> children(input.size()); + for (int childIndex = 0; childIndex < input[0]->childrenSize(); + childIndex++) { + const auto& childType = input[0]->childAt(childIndex)->type(); + // If it's an intermediate only type, transform the input and add + // expressions to reverse the transformation. Otherwise the input is + // unchanged and the projection is just an identity mapping. + if (isIntermediateOnlyType(childType)) { + for (int batchIndex = 0; batchIndex < input.size(); batchIndex++) { + children[batchIndex].push_back(transformIntermediateOnlyType( + input[batchIndex]->childAt(childIndex))); + } + projections.push_back(getIntermediateOnlyTypeProjectionExpr( + childType, + std::make_shared( + names[childIndex], names[childIndex]), + names[childIndex])); + } else { + for (int batchIndex = 0; batchIndex < input.size(); batchIndex++) { + children[batchIndex].push_back(input[batchIndex]->childAt(childIndex)); + } + + projections.push_back(std::make_shared( + names[childIndex], names[childIndex])); + } + } + + std::vector types; + for (const auto& child : children[0]) { + types.push_back(child->type()); + } + + auto rowType = ROW(std::move(names), std::move(types)); + + std::vector output; + for (int batchIndex = 0; batchIndex < input.size(); batchIndex++) { + output.push_back(std::make_shared( + input[batchIndex]->pool(), + rowType, + nullptr, + input[batchIndex]->size(), + std::move(children[batchIndex]))); + } + + return std::make_pair(output, projections); +} + const std::unordered_map& PrestoQueryRunner::aggregationFunctionDataSpecs() const { // For some functions, velox supports NaN, Infinity better than presto query @@ -324,7 +387,12 @@ std::optional PrestoQueryRunner::toSql( } } - sql << " FROM tmp"; + auto sourceSql = toSql(aggregationNode->sources()[0]); + if (!sourceSql.has_value()) { + return std::nullopt; + } + + sql << " FROM (" << sourceSql.value() << ")"; if (!groupingKeys.empty()) { sql << " GROUP BY " << folly::join(", ", groupingKeys); @@ -711,6 +779,14 @@ std::optional PrestoQueryRunner::toSql( return "tmp"; } +std::optional PrestoQueryRunner::toSql( + const std::shared_ptr& tableScanNode) { + if (!isSupportedDwrfType(tableScanNode->outputType())) { + return std::nullopt; + } + return "tmp"; +} + std::multiset> PrestoQueryRunner::execute( const std::string& sql, const std::vector& input, diff --git a/velox/exec/fuzzer/PrestoQueryRunner.h b/velox/exec/fuzzer/PrestoQueryRunner.h index 43326cc7239e..8b1e45ad215b 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.h +++ b/velox/exec/fuzzer/PrestoQueryRunner.h @@ -55,6 +55,9 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { void registerCustomVectorFuzzers(VectorFuzzer& vectorFuzzer) const override; + std::pair, std::vector> + inputProjections(const std::vector& input) const override; + const std::unordered_map& aggregationFunctionDataSpecs() const override; @@ -147,6 +150,9 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& valuesNode); + std::optional toSql( + const std::shared_ptr& tableScanNode); + std::string startQuery( const std::string& sql, const std::string& sessionProperty = ""); diff --git a/velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.cpp b/velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.cpp new file mode 100644 index 000000000000..51300f253094 --- /dev/null +++ b/velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.cpp @@ -0,0 +1,408 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.h" +#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" +#include "velox/parse/Expressions.h" +#include "velox/type/tz/TimeZoneMap.h" + +namespace facebook::velox::exec::test { +namespace { +// Defines a transform for an intermediate type or a complex type that can +// contain an intermediate type. +class IntermediateTypeTransform { + public: + virtual ~IntermediateTypeTransform() = default; + + virtual VectorPtr transform(const VectorPtr& vector) const = 0; + virtual core::ExprPtr projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const = 0; +}; + +class ArrayTransform : public IntermediateTypeTransform { + public: + VectorPtr transform(const VectorPtr& vector) const override; + core::ExprPtr projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const override; +}; + +class MapTransform : public IntermediateTypeTransform { + public: + VectorPtr transform(const VectorPtr& vector) const override; + core::ExprPtr projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const override; +}; + +class RowTransform : public IntermediateTypeTransform { + public: + VectorPtr transform(const VectorPtr& vector) const override; + core::ExprPtr projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const override; +}; + +class TimestampWithTimeZoneTransform : public IntermediateTypeTransform { + private: + static constexpr const char* kMillisColumnName = "millis"; + static constexpr const char* kTimeZoneColumnName = "time_zone"; + + public: + VectorPtr transform(const VectorPtr& vector) const override; + core::ExprPtr projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const override; +}; + +// Convert a Vector of TimestampWithTimeZone to a RowVector of BIGINT and +// VARCHAR (millisUtc and time_zone). +VectorPtr TimestampWithTimeZoneTransform::transform( + const VectorPtr& vector) const { + VELOX_CHECK(isTimestampWithTimeZoneType(vector->type())); + DecodedVector decoded(*vector); + const auto* base = decoded.base()->as>(); + + VectorPtr millisVector; + VectorPtr timeZoneVector; + + if (base->isFlatEncoding()) { + auto millisVectorFlat = BaseVector::create>( + DOUBLE(), base->size(), base->pool()); + auto timeZoneVectorFlat = BaseVector::create>( + VARCHAR(), base->size(), base->pool()); + millisVector = millisVectorFlat; + timeZoneVector = timeZoneVectorFlat; + + for (int i = 0; i < base->size(); ++i) { + if (base->isNullAt(i)) { + millisVectorFlat->setNull(i, true); + timeZoneVectorFlat->setNull(i, true); + } else { + millisVectorFlat->set(i, (double)unpackMillisUtc(base->valueAt(i))); + std::string tzName = + tz::getTimeZoneName(unpackZoneKeyId(base->valueAt(i))); + timeZoneVectorFlat->set(i, StringView(tzName)); + } + } + } else { + VELOX_CHECK(base->isConstantEncoding()); + if (base->isNullAt(0)) { + millisVector = + BaseVector::createNullConstant(DOUBLE(), base->size(), base->pool()); + timeZoneVector = + BaseVector::createNullConstant(VARCHAR(), base->size(), base->pool()); + } else { + millisVector = BaseVector::createConstant( + DOUBLE(), + (double)unpackMillisUtc(base->valueAt(0)), + base->size(), + base->pool()); + std::string tzName = + tz::getTimeZoneName(unpackZoneKeyId(base->valueAt(0))); + timeZoneVector = BaseVector::createConstant( + VARCHAR(), StringView(tzName), base->size(), base->pool()); + } + } + + VectorPtr row = std::make_shared( + base->pool(), + ROW({kMillisColumnName, kTimeZoneColumnName}, {DOUBLE(), VARCHAR()}), + nullptr, + base->size(), + std::vector{millisVector, timeZoneVector}); + + if (!decoded.isIdentityMapping()) { + row = decoded.wrap(row, *vector, vector->size()); + } + + return row; +} + +// Applies from_unixtime to a RowVector of BIGINT and VARCHAR (millisUtc and +// time_zone) to produce values of type TimestampWithTimeZone. +core::ExprPtr TimestampWithTimeZoneTransform::projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const { + VELOX_CHECK(isTimestampWithTimeZoneType(type)); + + return std::make_shared( + "from_unixtime", + std::vector{ + std::make_shared( + kMillisColumnName, + kMillisColumnName, + std::vector{inputExpr}), + std::make_shared( + kTimeZoneColumnName, + kTimeZoneColumnName, + std::vector{inputExpr})}, + columnAlias); +} + +// Converts an ArrayVector so that any intermediate only types in the elements +// are transformed. +VectorPtr ArrayTransform::transform(const VectorPtr& vector) const { + VELOX_CHECK(vector->type()->isArray()); + DecodedVector decoded(*vector); + const auto* base = decoded.base()->as(); + + VectorPtr elementsVector = transformIntermediateOnlyType(base->elements()); + + VectorPtr array = std::make_shared( + base->pool(), + ARRAY(elementsVector->type()), + base->nulls(), + base->size(), + base->offsets(), + base->sizes(), + elementsVector); + + if (!decoded.isIdentityMapping()) { + array = decoded.wrap(array, *vector, vector->size()); + } + + return array; +} + +// Applies a lambda transform to the elements of an array to convert input types +// to intermediate only types where necessary. +core::ExprPtr ArrayTransform::projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const { + VELOX_CHECK(type->isArray()); + + return std::make_shared( + "transform", + std::vector{ + inputExpr, + std::make_shared( + std::vector{"x"}, + getIntermediateOnlyTypeProjectionExpr( + type->asArray().elementType(), + std::make_shared("x", "x"), + "x"))}, + columnAlias); +} + +// Converts an MapVector so that any intermediate only types in the keys and +// values are transformed. +VectorPtr MapTransform::transform(const VectorPtr& vector) const { + VELOX_CHECK(vector->type()->isMap()); + DecodedVector decoded(*vector); + const auto* base = decoded.base()->as(); + + VectorPtr keysVector = base->mapKeys(); + VectorPtr valuesVector = base->mapValues(); + const auto& keysType = keysVector->type(); + const auto& valuesType = valuesVector->type(); + + if (isIntermediateOnlyType(keysType)) { + keysVector = transformIntermediateOnlyType(keysVector); + } + + if (isIntermediateOnlyType(valuesType)) { + valuesVector = transformIntermediateOnlyType(valuesVector); + } + + VectorPtr map = std::make_shared( + base->pool(), + MAP(keysVector->type(), valuesVector->type()), + base->nulls(), + base->size(), + base->offsets(), + base->sizes(), + keysVector, + valuesVector); + + if (!decoded.isIdentityMapping()) { + map = decoded.wrap(map, *vector, vector->size()); + } + + return map; +} + +// Applies a lambda transform to the keys and values of a map to convert input +// types to intermediate only types where necessary. +core::ExprPtr MapTransform::projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const { + VELOX_CHECK(type->isMap()); + const auto& mapType = type->asMap(); + const auto& keysType = mapType.keyType(); + const auto& valuesType = mapType.valueType(); + + core::ExprPtr expr = inputExpr; + + if (isIntermediateOnlyType(keysType)) { + expr = std::make_shared( + "transform_keys", + std::vector{ + expr, + std::make_shared( + std::vector{"k", "v"}, + getIntermediateOnlyTypeProjectionExpr( + keysType, + std::make_shared("k", "k"), + "k"))}, + columnAlias); + } + + if (isIntermediateOnlyType(valuesType)) { + expr = std::make_shared( + "transform_values", + std::vector{ + expr, + std::make_shared( + std::vector{"k", "v"}, + getIntermediateOnlyTypeProjectionExpr( + valuesType, + std::make_shared("v", "v"), + "v"))}, + columnAlias); + } + + return expr; +} + +// Converts an RowVector so that any intermediate only types in the children are +// transformed. +VectorPtr RowTransform::transform(const VectorPtr& vector) const { + VELOX_CHECK(vector->type()->isRow()); + DecodedVector decoded(*vector); + const auto* base = decoded.base()->as(); + + std::vector children; + std::vector childrenTypes; + std::vector childrenNames = base->type()->asRow().names(); + for (const auto& child : base->children()) { + if (isIntermediateOnlyType(child->type())) { + children.push_back(transformIntermediateOnlyType(child)); + childrenTypes.push_back(children.back()->type()); + } else { + children.push_back(child); + childrenTypes.push_back(child->type()); + } + } + + VectorPtr row = std::make_shared( + base->pool(), + ROW(std::move(childrenNames), std::move(childrenTypes)), + base->nulls(), + base->size(), + std::move(children)); + + if (!decoded.isIdentityMapping()) { + row = decoded.wrap(row, *vector, vector->size()); + } + + return row; +} + +// Applies transforms to the children of a row to convert input types to +// intermediate only types where necessary, and reconstructs the row via +// row_constructor. +core::ExprPtr RowTransform::projectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) const { + VELOX_CHECK(type->isRow()); + const auto& rowType = type->asRow(); + + std::vector children; + for (int i = 0; i < rowType.size(); i++) { + if (isIntermediateOnlyType(rowType.childAt(i))) { + children.push_back(getIntermediateOnlyTypeProjectionExpr( + rowType.childAt(i), + std::make_shared( + rowType.nameOf(i), + rowType.nameOf(i), + std::vector{inputExpr}), + rowType.nameOf(i))); + } else { + children.push_back(std::make_shared( + rowType.nameOf(i), + rowType.nameOf(i), + std::vector{inputExpr})); + } + } + + return std::make_shared( + "row_constructor", std::move(children), columnAlias); +} + +const ArrayTransform kArrayTransform; +const MapTransform kMapTransform; +const RowTransform kRowTransform; +const TimestampWithTimeZoneTransform kTimestampWithTimeZoneTransform; +} // namespace + +bool isIntermediateOnlyType(const TypePtr& type) { + if (isTimestampWithTimeZoneType(type)) { + return true; + } + + for (auto i = 0; i < type->size(); ++i) { + if (isIntermediateOnlyType(type->childAt(i))) { + return true; + } + } + + return false; +} + +VectorPtr transformIntermediateOnlyType(const VectorPtr& vector) { + const auto& type = vector->type(); + if (type->isArray()) { + return kArrayTransform.transform(vector); + } else if (type->isMap()) { + return kMapTransform.transform(vector); + } else if (type->isRow()) { + return kRowTransform.transform(vector); + } else if (isTimestampWithTimeZoneType(type)) { + return kTimestampWithTimeZoneTransform.transform(vector); + } + + VELOX_UNREACHABLE(); +} + +core::ExprPtr getIntermediateOnlyTypeProjectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias) { + if (type->isArray()) { + return kArrayTransform.projectionExpr(type, inputExpr, columnAlias); + } else if (type->isMap()) { + return kMapTransform.projectionExpr(type, inputExpr, columnAlias); + } else if (type->isRow()) { + return kRowTransform.projectionExpr(type, inputExpr, columnAlias); + } else if (isTimestampWithTimeZoneType(type)) { + return kTimestampWithTimeZoneTransform.projectionExpr( + type, inputExpr, columnAlias); + } + + VELOX_UNREACHABLE(); +} +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.h b/velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.h new file mode 100644 index 000000000000..f66a403e60bf --- /dev/null +++ b/velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/parse/IExpr.h" +#include "velox/vector/BaseVector.h" + +namespace facebook::velox::exec::test { +/// Returns true if this types is an intermediate only type or contains an +/// intermediate only type. +bool isIntermediateOnlyType(const TypePtr& type); + +/// Converts a Vector of an intermediate only type, or containing one, to a +/// Vector of value(s) that can be input to a projection to produce those values +/// of that type but are of types supported as input. Preserves nulls and +/// encodings. +VectorPtr transformIntermediateOnlyType(const VectorPtr& vector); +/// Converts an expression that takes in a value of an intermediate only type so +/// that it applies a transformation to convert valid input typess into values +/// of the intermediate only type. +core::ExprPtr getIntermediateOnlyTypeProjectionExpr( + const TypePtr& type, + const core::ExprPtr& inputExpr, + const std::string& columnAlias); +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/ReferenceQueryRunner.h b/velox/exec/fuzzer/ReferenceQueryRunner.h index d71b7f84fe0c..ffa826a43f7a 100644 --- a/velox/exec/fuzzer/ReferenceQueryRunner.h +++ b/velox/exec/fuzzer/ReferenceQueryRunner.h @@ -18,6 +18,8 @@ #include #include "velox/core/PlanNode.h" #include "velox/expression/FunctionSignature.h" +#include "velox/parse/Expressions.h" +#include "velox/parse/IExpr.h" #include "velox/vector/fuzzer/VectorFuzzer.h" namespace facebook::velox::exec::test { @@ -52,6 +54,27 @@ class ReferenceQueryRunner { return; } + /// Given a vector of batches, returns a pair of updated input batches and + /// expressions to be run as a projection. This is used to convert + /// intermediate only types (types not supported in the input) in input to + /// types allowed in the input, and the projections will handle converting + /// those values back into intermediate only types. + virtual std::pair, std::vector> + inputProjections(const std::vector& input) const { + if (input.empty()) { + return {input, {}}; + } + + std::vector projections; + + for (const auto& name : input[0]->type()->asRow().names()) { + projections.push_back( + std::make_shared(name, name)); + } + + return std::make_pair(input, projections); + } + virtual const std::unordered_map& aggregationFunctionDataSpecs() const = 0; diff --git a/velox/exec/fuzzer/ResultVerifier.h b/velox/exec/fuzzer/ResultVerifier.h index bda5ccecc7f1..57e5bdb2ec5d 100644 --- a/velox/exec/fuzzer/ResultVerifier.h +++ b/velox/exec/fuzzer/ResultVerifier.h @@ -51,6 +51,7 @@ class ResultVerifier { /// re-use its results for multiple 'verify' calls. virtual void initialize( const std::vector& input, + const std::vector& projections, const std::vector& groupingKeys, const core::AggregationNode::Aggregate& aggregate, const std::string& aggregateName) = 0; @@ -61,6 +62,7 @@ class ResultVerifier { /// that will store the window function results. virtual void initializeWindow( const std::vector& /*input*/, + const std::vector& /* projections */, const std::vector& /*partitionByKeys*/, const std::vector& /*sortingKeysAndOrders*/, const core::WindowNode::Function& /*function*/, diff --git a/velox/exec/fuzzer/TransformResultVerifier.h b/velox/exec/fuzzer/TransformResultVerifier.h index 15776275103b..a31c1ccb5718 100644 --- a/velox/exec/fuzzer/TransformResultVerifier.h +++ b/velox/exec/fuzzer/TransformResultVerifier.h @@ -52,6 +52,7 @@ class TransformResultVerifier : public ResultVerifier { void initialize( const std::vector& /*input*/, + const std::vector& /*projections*/, const std::vector& groupingKeys, const core::AggregationNode::Aggregate& /*aggregate*/, const std::string& aggregateName) override { diff --git a/velox/exec/fuzzer/WindowFuzzer.cpp b/velox/exec/fuzzer/WindowFuzzer.cpp index b4d8a578f3a8..f5f258a01149 100644 --- a/velox/exec/fuzzer/WindowFuzzer.cpp +++ b/velox/exec/fuzzer/WindowFuzzer.cpp @@ -509,7 +509,10 @@ void WindowFuzzer::go() { } } - logVectors(input); + auto [convertedInput, projections] = + referenceQueryRunner_->inputProjections(input); + + logVectors(convertedInput); // For kRange frames with constant k, velox expects the frame bounds to be // columns containing precomputed offset values. Presto frame clause uses @@ -523,7 +526,8 @@ void WindowFuzzer::go() { sortingKeysAndOrders, frameClause, call, - input, + convertedInput, + projections, customVerification, customVerifier, FLAGS_enable_window_reference_verification, @@ -561,6 +565,7 @@ void WindowFuzzer::testAlternativePlans( const std::string& frame, const std::string& functionCall, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier, const velox::fuzzer::ResultOrError& expected) { @@ -580,6 +585,7 @@ void WindowFuzzer::testAlternativePlans( plans.push_back( {PlanBuilder() .values(input) + .projectExpressions(projections) .orderBy(allKeys, false) .streamingWindow( {fmt::format("{} over ({})", functionCall, frame)}) @@ -602,6 +608,7 @@ void WindowFuzzer::testAlternativePlans( plans.push_back( {PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .localPartition(partitionKeys) .window({fmt::format("{} over ({})", functionCall, frame)}) .planNode(), @@ -612,6 +619,7 @@ void WindowFuzzer::testAlternativePlans( plans.push_back( {PlanBuilder() .tableScan(inputRowType) + .projectExpressions(projections) .orderBy(allKeys, false) .streamingWindow( {fmt::format("{} over ({})", functionCall, frame)}) @@ -631,6 +639,7 @@ void initializeVerifier( const core::PlanNodePtr& plan, const std::shared_ptr& customVerifier, const std::vector& input, + const std::vector& projections, const std::vector& partitionKeys, const std::vector& sortingKeysAndOrders, const std::string& frame) { @@ -638,6 +647,7 @@ void initializeVerifier( std::dynamic_pointer_cast(plan); customVerifier->initializeWindow( input, + projections, partitionKeys, sortingKeysAndOrders, windowNode->windowFunctions()[0], @@ -652,6 +662,7 @@ bool WindowFuzzer::verifyWindow( const std::string& frameClause, const std::string& functionCall, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier, bool enableWindowVerification, @@ -666,6 +677,7 @@ bool WindowFuzzer::verifyWindow( auto frame = getFrame(partitionKeys, sortingKeysAndOrders, frameClause); auto plan = PlanBuilder() .values(input) + .projectExpressions(projections) .window({fmt::format("{} over ({})", functionCall, frame)}) .capturePlanNodeId(windowNodeId) .planNode(); @@ -722,6 +734,7 @@ bool WindowFuzzer::verifyWindow( plan, customVerifier, input, + projections, partitionKeys, sortingKeysAndOrders, frame); @@ -735,6 +748,7 @@ bool WindowFuzzer::verifyWindow( frame, functionCall, input, + projections, customVerification, customVerifier, resultOrError); diff --git a/velox/exec/fuzzer/WindowFuzzer.h b/velox/exec/fuzzer/WindowFuzzer.h index 4e9b88057b35..ac52866daf6e 100644 --- a/velox/exec/fuzzer/WindowFuzzer.h +++ b/velox/exec/fuzzer/WindowFuzzer.h @@ -198,6 +198,7 @@ class WindowFuzzer : public AggregationFuzzerBase { const std::string& frameClause, const std::string& functionCall, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier, bool enableWindowVerification, @@ -209,6 +210,7 @@ class WindowFuzzer : public AggregationFuzzerBase { const std::string& frame, const std::string& functionCall, const std::vector& input, + const std::vector& projections, bool customVerification, const std::shared_ptr& customVerifier, const velox::fuzzer::ResultOrError& expected); diff --git a/velox/exec/tests/PrestoQueryRunnerTest.cpp b/velox/exec/tests/PrestoQueryRunnerTest.cpp index 25b231dc6c7c..b6b18f06c9cc 100644 --- a/velox/exec/tests/PrestoQueryRunnerTest.cpp +++ b/velox/exec/tests/PrestoQueryRunnerTest.cpp @@ -136,7 +136,7 @@ TEST_F(PrestoQueryRunnerTest, sortedAggregation) { ASSERT_TRUE(sql.has_value()); ASSERT_EQ( - "SELECT multimap_agg(c0, c1 ORDER BY c0 ASC NULLS LAST) as a0 FROM tmp", + "SELECT multimap_agg(c0, c1 ORDER BY c0 ASC NULLS LAST) as a0 FROM (tmp)", sql.value()); // Plans with multiple order by's in the aggregate. @@ -152,7 +152,7 @@ TEST_F(PrestoQueryRunnerTest, sortedAggregation) { sql = queryRunner->toSql(plan); ASSERT_TRUE(sql.has_value()); ASSERT_EQ( - "SELECT multimap_agg(c0, c1 ORDER BY c1 ASC NULLS FIRST, c0 DESC NULLS LAST, c2 ASC NULLS LAST) as a0 FROM tmp", + "SELECT multimap_agg(c0, c1 ORDER BY c1 ASC NULLS FIRST, c0 DESC NULLS LAST, c2 ASC NULLS LAST) as a0 FROM (tmp)", sql.value()); } @@ -174,7 +174,7 @@ TEST_F(PrestoQueryRunnerTest, distinctAggregation) { auto sql = queryRunner->toSql(plan); ASSERT_TRUE(sql.has_value()); - ASSERT_EQ("SELECT array_agg(distinct c0) as a0 FROM tmp", sql.value()); + ASSERT_EQ("SELECT array_agg(distinct c0) as a0 FROM (tmp)", sql.value()); } TEST_F(PrestoQueryRunnerTest, toSql) { @@ -234,7 +234,7 @@ TEST_F(PrestoQueryRunnerTest, toSql) { .planNode(); EXPECT_EQ( queryRunner->toSql(plan), - "SELECT c1, avg(c0) as a0 FROM tmp GROUP BY c1"); + "SELECT c1, avg(c0) as a0 FROM (tmp) GROUP BY c1"); plan = PlanBuilder() .tableScan("tmp", dataType) @@ -243,7 +243,7 @@ TEST_F(PrestoQueryRunnerTest, toSql) { .planNode(); EXPECT_EQ( queryRunner->toSql(plan), - "SELECT (a0 + c1) as p0 FROM (SELECT c1, sum(c0) as a0 FROM tmp GROUP BY c1)"); + "SELECT (a0 + c1) as p0 FROM (SELECT c1, sum(c0) as a0 FROM (tmp) GROUP BY c1)"); plan = PlanBuilder() .tableScan("tmp", dataType) @@ -251,7 +251,7 @@ TEST_F(PrestoQueryRunnerTest, toSql) { .planNode(); EXPECT_EQ( queryRunner->toSql(plan), - "SELECT avg(c0) filter (where c2) as a0, avg(c1) as a1 FROM tmp"); + "SELECT avg(c0) filter (where c2) as a0, avg(c1) as a1 FROM (tmp)"); } } diff --git a/velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h b/velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h index 9704995521e7..fec4411378ce 100644 --- a/velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h +++ b/velox/functions/prestosql/fuzzer/ApproxDistinctResultVerifier.h @@ -44,12 +44,14 @@ class ApproxDistinctResultVerifier : public ResultVerifier { // Compute count(distinct x) over 'input'. void initialize( const std::vector& input, + const std::vector& projections, const std::vector& groupingKeys, const core::AggregationNode::Aggregate& aggregate, const std::string& aggregateName) override { auto plan = PlanBuilder() .values(input) + .projectExpressions(projections) .singleAggregation(groupingKeys, {makeCountDistinctCall(aggregate)}) .planNode(); @@ -63,6 +65,7 @@ class ApproxDistinctResultVerifier : public ResultVerifier { // Compute count_distinct(x) over 'input' over 'frame'. void initializeWindow( const std::vector& input, + const std::vector& projections, const std::vector& partitionByKeys, const std::vector& /*sortingKeysAndOrders*/, const core::WindowNode::Function& function, @@ -70,6 +73,7 @@ class ApproxDistinctResultVerifier : public ResultVerifier { const std::string& windowName) override { auto plan = PlanBuilder() .values(input) + .projectExpressions(projections) .window({makeCountDistinctWindowCall(function, frame)}) .planNode(); diff --git a/velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h b/velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h index 63a3becf3095..71b2b44d7d8d 100644 --- a/velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h +++ b/velox/functions/prestosql/fuzzer/ApproxPercentileResultVerifier.h @@ -41,6 +41,7 @@ class ApproxPercentileResultVerifier : public ResultVerifier { // Compute the range of percentiles represented by each of the input values. void initialize( const std::vector& input, + const std::vector& projections, const std::vector& groupingKeys, const core::AggregationNode::Aggregate& aggregate, const std::string& aggregateName) override { @@ -60,13 +61,14 @@ class ApproxPercentileResultVerifier : public ResultVerifier { extractPercentileAndAccuracy(aggregate.call, input); // Compute percentiles for all values. - allRanges_ = - computePercentiles(input, valueField, weightField, aggregate.mask); + allRanges_ = computePercentiles( + input, projections, valueField, weightField, aggregate.mask); VELOX_CHECK_LE(allRanges_->size(), numInputs); } void initializeWindow( const std::vector& input, + const std::vector& projections, const std::vector& partitionByKeys, const std::vector& sortingKeysAndOrders, const core::WindowNode::Function& function, @@ -85,6 +87,7 @@ class ApproxPercentileResultVerifier : public ResultVerifier { allRanges_ = computePercentilesForWindow( input, + projections, valueField, weightField, sortingKeysAndOrders, @@ -238,6 +241,7 @@ class ApproxPercentileResultVerifier : public ResultVerifier { // followed by min_pct and max_pct columns. RowVectorPtr computePercentiles( const std::vector& input, + const std::vector& inputProjections, const std::string& valueField, const std::optional& weightField, const core::FieldAccessTypedExprPtr& mask) { @@ -251,7 +255,7 @@ class ApproxPercentileResultVerifier : public ResultVerifier { weightField.has_value() ? weightField.value() : "1::bigint")); PlanBuilder planBuilder; - planBuilder.values(input); + planBuilder.values(input).projectExpressions(inputProjections); if (mask != nullptr) { planBuilder.filter(mask->name()); @@ -422,6 +426,7 @@ class ApproxPercentileResultVerifier : public ResultVerifier { // row_num RowVectorPtr computePercentilesForWindow( const std::vector& input, + const std::vector& inputProjections, const std::string& valueField, const std::optional& weightField, const std::vector& sortingKeysAndOrders, @@ -453,7 +458,10 @@ class ApproxPercentileResultVerifier : public ResultVerifier { } PlanBuilder planBuilder; - planBuilder.values(input).project(projections).filter("w > 0"); + planBuilder.values(input) + .projectExpressions(inputProjections) + .project(projections) + .filter("w > 0"); auto partitionByKeysWithRowNumber = getPartitionByClause(append(groupingKeys_, {"row_number"})); diff --git a/velox/functions/prestosql/fuzzer/ArbitraryResultVerifier.h b/velox/functions/prestosql/fuzzer/ArbitraryResultVerifier.h index 99161c336f74..337be08243b2 100644 --- a/velox/functions/prestosql/fuzzer/ArbitraryResultVerifier.h +++ b/velox/functions/prestosql/fuzzer/ArbitraryResultVerifier.h @@ -38,6 +38,7 @@ class ArbitraryResultVerifier : public ResultVerifier { void initialize( const std::vector& input, + const std::vector& projections, const std::vector& groupingKeys, const core::AggregationNode::Aggregate& aggregate, const std::string& aggregateName) override { @@ -58,6 +59,7 @@ class ArbitraryResultVerifier : public ResultVerifier { auto plan = PlanBuilder(planNodeIdGenerator, input[0]->pool()) .values(input) + .projectExpressions(projections) .singleAggregation(groupingKeys, {makeArrayAggCall(aggregate)}) .project(projectColumns) .planNode(); diff --git a/velox/functions/prestosql/fuzzer/MinMaxByResultVerifier.cpp b/velox/functions/prestosql/fuzzer/MinMaxByResultVerifier.cpp index a6f1a1fd23f4..9fd077901fee 100644 --- a/velox/functions/prestosql/fuzzer/MinMaxByResultVerifier.cpp +++ b/velox/functions/prestosql/fuzzer/MinMaxByResultVerifier.cpp @@ -20,6 +20,7 @@ namespace facebook::velox::exec::test { void MinMaxByResultVerifier::initialize( const std::vector& input, + const std::vector& projections, const std::vector& groupingKeys, const core::AggregationNode::Aggregate& aggregate, const std::string& aggregateName) { @@ -95,7 +96,9 @@ void MinMaxByResultVerifier::initialize( // GROUP BY // b auto planNodeIdGenerator = std::make_shared(); - auto plan = PlanBuilder(planNodeIdGenerator, input[0]->pool()).values(input); + auto plan = PlanBuilder(planNodeIdGenerator, input[0]->pool()) + .values(input) + .projectExpressions(projections); // Filter out masked rows first so that groups with all rows filtered out // won't take a row_number during the filtering later. if (aggregate.mask != nullptr) { diff --git a/velox/functions/prestosql/fuzzer/MinMaxByResultVerifier.h b/velox/functions/prestosql/fuzzer/MinMaxByResultVerifier.h index a43aa2da5128..7d8dc5f787e6 100644 --- a/velox/functions/prestosql/fuzzer/MinMaxByResultVerifier.h +++ b/velox/functions/prestosql/fuzzer/MinMaxByResultVerifier.h @@ -58,6 +58,7 @@ class MinMaxByResultVerifier : public ResultVerifier { void initialize( const std::vector& input, + const std::vector& projections, const std::vector& groupingKeys, const core::AggregationNode::Aggregate& aggregate, const std::string& aggregateName) override;