From 03d017fa3b62522c9c98ed72ca81126b3bfd680e Mon Sep 17 00:00:00 2001 From: Krishna Pai Date: Thu, 14 Mar 2024 17:02:40 -0700 Subject: [PATCH] Add support in AggregationFuzzer to compare against reference DB for non deterministic functions on unsorted inputs (#9009) Summary: **Why** Currently for aggregate functions with custom verifiers, we do not verify plans against the reference db. This is because for these functions the reference db results differ and require us to use custom verifiers - typically the ResultVerifier::compare api. The compare api requires VeloxVectors and previously the results of the reference query runner were in materialized row format and thus couldnt be called. Recently support was added to return the reference db results as velox vectors (https://github.com/facebookincubator/velox/issues/8880). With this we can now validate functions which require custom verifiers. **What** We add support in the AggregationFuzzer to use the new 'executeVector' api if supported by the refrence query runner, and use that to validate non deterministic functions that require a custom verifier. **Results** Currently with this change we have increased the number of plans being verified against reference db from 40 % to 80+%. ``` Total masked aggregations: 78 (14.58%) Total global aggregations: 41 (7.66%) Total group-by aggregations: 392 (73.27%) Total distinct aggregations: 49 (9.16%) Total aggregations over distinct inputs: 40 (7.48%) Total window expressions: 53 (9.91%) Total functions tested: 38 Total iterations requiring sorted inputs: 43 (8.04%) Total iterations verified against reference DB: 448 (83.74%) Total functions not verified (verification skipped / not supported by reference DB / reference DB failed): 53 (9.91%) / 6 (1.12%) / 3 (0.56%) Total failed functions: 27 (5.05%) ``` Experimental runs : https://github.com/facebookincubator/velox/actions/runs/8199583642/job/22443872635 https://github.com/facebookincubator/velox/actions/runs/8199583642/job/22424917125 **Notes** Certain functions have been skiplisted for following reasons: 1. map_union_sum : This requires us to use the lates presto due to a bug on Presto end in version .284 not supporting reals. 2. approx_set: Signatures supported by Velox are more than supported by Presto 3. min_by, max_by: Requires custom verifiers (order by not supported by Presto) 4. any_value: alias for arbitrary but this alias isnt present in Presto. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9009 Reviewed By: kagamiori Differential Revision: D54911135 Pulled By: kgpai fbshipit-source-id: e2fc1fc91a367eb1a3538aa87e755e31ae1018a6 --- .github/workflows/experimental.yml | 2 +- velox/exec/fuzzer/AggregationFuzzer.cpp | 80 +++++++++++++------ velox/exec/fuzzer/AggregationFuzzerBase.cpp | 56 ++++++++++++- velox/exec/fuzzer/AggregationFuzzerBase.h | 21 +++++ .../fuzzer/AggregationFuzzerTest.cpp | 7 +- 5 files changed, 138 insertions(+), 28 deletions(-) diff --git a/.github/workflows/experimental.yml b/.github/workflows/experimental.yml index bdd3495bb0ba..b56473fa5e97 100644 --- a/.github/workflows/experimental.yml +++ b/.github/workflows/experimental.yml @@ -147,7 +147,7 @@ jobs: ls -lR $PRESTO_HOME/etc /opt/start-prestojava.sh > /tmp/server.log 2>&1 & # Sleep for 60 seconds to allow Presto server to start. - sleep 60 + sleep 60 /opt/presto-cli --server 127.0.0.1:8080 --execute 'CREATE SCHEMA hive.tpch;' mkdir -p /tmp/aggregate_fuzzer_repro/ rm -rfv /tmp/aggregate_fuzzer_repro/* diff --git a/velox/exec/fuzzer/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp index ce851a3a15fd..e97cc48fd191 100644 --- a/velox/exec/fuzzer/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -1004,6 +1004,27 @@ void AggregationFuzzer::Stats::print(size_t numIterations) const { AggregationFuzzerBase::Stats::print(numIterations); } +namespace { +// Merges a vector of RowVectors into one RowVector. +RowVectorPtr mergeRowVectors( + const std::vector& results, + velox::memory::MemoryPool* pool) { + auto totalCount = 0; + for (const auto& result : results) { + totalCount += result->size(); + } + auto copy = + BaseVector::create(results[0]->type(), totalCount, pool); + auto copyCount = 0; + for (const auto& result : results) { + copy->copy(result.get(), copyCount, 0, result->size()); + copyCount += result->size(); + } + return copy; +} + +} // namespace + bool AggregationFuzzer::compareEquivalentPlanResults( const std::vector& plans, bool customVerification, @@ -1018,39 +1039,52 @@ bool AggregationFuzzer::compareEquivalentPlanResults( ++stats_.numFailed; } - // TODO Use ResultVerifier::compare API to compare Velox results with - // reference DB results once reference query runner is updated to return - // results as Velox vectors. - std::optional expectedResult; + // If Velox successfully executes a plan, we attempt to verify + // the plan against the reference DB as follows: + // 1) If deterministic function (i.e. customVerification is false) + // then try and have the reference DB execute the plan and assert + // results are equal. + // 2) If Non deterministic function, and if the reference query runner + // supports Velox vectors then we have the reference DB execute the plan + // and use ResultVerifier::compare api (if supported ) to validate the + // results. + if (resultOrError.result != nullptr) { if (!customVerification) { auto referenceResult = computeReferenceResults(firstPlan, input); stats_.updateReferenceQueryStats(referenceResult.second); - expectedResult = referenceResult.first; - } else { - ++stats_.numVerificationSkipped; + auto expectedResult = referenceResult.first; - for (auto& verifier : customVerifiers) { - if (verifier != nullptr && verifier->supportsVerify()) { - VELOX_CHECK( - verifier->verify(resultOrError.result), - "Aggregation results failed custom verification"); + if (expectedResult) { + ++stats_.numVerified; + VELOX_CHECK( + assertEqualResults( + expectedResult.value(), + firstPlan->outputType(), + {resultOrError.result}), + "Velox and reference DB results don't match"); + LOG(INFO) << "Verified results against reference DB"; + } + } else if (referenceQueryRunner_->supportsVeloxVectorResults()) { + if (isSupportedType(firstPlan->outputType()) && + isSupportedType(input.front()->type())) { + auto referenceResult = + computeReferenceResultsAsVector(firstPlan, input); + stats_.updateReferenceQueryStats(referenceResult.second); + + if (referenceResult.first) { + velox::test::ResultOrError expected; + expected.result = + mergeRowVectors(referenceResult.first.value(), pool_.get()); + + compare( + resultOrError, customVerification, customVerifiers, expected); + ++stats_.numVerified; } } } } - if (expectedResult && resultOrError.result) { - ++stats_.numVerified; - VELOX_CHECK( - assertEqualResults( - expectedResult.value(), - firstPlan->outputType(), - {resultOrError.result}), - "Velox and reference DB results don't match"); - LOG(INFO) << "Verified results against reference DB"; - } - testPlans( plans, customVerification, diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index 0d28f6177deb..82866a680de6 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -87,6 +87,21 @@ DEFINE_bool( namespace facebook::velox::exec::test { +bool AggregationFuzzerBase::isSupportedType(const TypePtr& type) const { + // Date / IntervalDayTime/ Unknown are not currently supported by DWRF. + if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) { + return false; + } + + for (auto i = 0; i < type->size(); ++i) { + if (!isSupportedType(type->childAt(i))) { + return false; + } + } + + return true; +} + bool AggregationFuzzerBase::addSignature( const std::string& name, const FunctionSignaturePtr& signature) { @@ -445,15 +460,39 @@ AggregationFuzzerBase::computeReferenceResults( referenceQueryRunner_->execute( sql.value(), input, plan->outputType()), ReferenceQueryErrorCode::kSuccess); - } catch (std::exception& e) { - // ++stats_.numReferenceQueryFailed; + } catch (...) { + LOG(WARNING) << "Query failed in the reference DB"; + return std::make_pair( + std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail); + } + } + + LOG(INFO) << "Query not supported by the reference DB"; + return std::make_pair( + std::nullopt, ReferenceQueryErrorCode::kReferenceQueryUnsupported); +} + +std::pair< + std::optional>, + AggregationFuzzerBase::ReferenceQueryErrorCode> +AggregationFuzzerBase::computeReferenceResultsAsVector( + const core::PlanNodePtr& plan, + const std::vector& input) { + VELOX_CHECK(referenceQueryRunner_->supportsVeloxVectorResults()); + + if (auto sql = referenceQueryRunner_->toSql(plan)) { + try { + return std::make_pair( + referenceQueryRunner_->executeVector( + sql.value(), input, plan->outputType()), + ReferenceQueryErrorCode::kSuccess); + } catch (...) { LOG(WARNING) << "Query failed in the reference DB"; return std::make_pair( std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail); } } else { LOG(INFO) << "Query not supported by the reference DB"; - // ++stats_.numVerificationNotSupported; } return std::make_pair( @@ -474,7 +513,15 @@ void AggregationFuzzerBase::testPlan( injectSpill, abandonPartial, maxDrivers); + compare(actual, customVerification, customVerifiers, expected); +} +void AggregationFuzzerBase::compare( + const velox::test::ResultOrError& actual, + bool customVerification, + const std::vector>& customVerifiers, + const velox::test::ResultOrError& expected) { + // Compare results or exceptions (if any). Fail is anything is different. if (FLAGS_enable_oom_injection) { // If OOM injection is enabled and we've made it this far and the test // is considered a success. We don't bother checking the results. @@ -495,6 +542,9 @@ void AggregationFuzzerBase::testPlan( return; } + VELOX_CHECK_NOT_NULL(expected.result); + VELOX_CHECK_NOT_NULL(actual.result); + VELOX_CHECK_EQ( expected.result->size(), actual.result->size(), diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index 4b689853182f..fec35c5e2da7 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -224,6 +224,27 @@ class AggregationFuzzerBase { bool abandonPartial = false, int32_t maxDrivers = 2); + // Will throw if referenceQueryRunner doesn't support + // returning results as a vector. + std::pair< + std::optional>, + AggregationFuzzerBase::ReferenceQueryErrorCode> + computeReferenceResultsAsVector( + const core::PlanNodePtr& plan, + const std::vector& input); + + void compare( + const velox::test::ResultOrError& actual, + bool customVerification, + const std::vector>& customVerifiers, + const velox::test::ResultOrError& expected); + + /// Returns false if the type or its children are unsupported. + /// Currently returns false if type is Date,IntervalDayTime or Unknown. + /// @param type + /// @return bool + bool isSupportedType(const TypePtr& type) const; + // @param customVerification If false, results are compared as is. Otherwise, // only row counts are compared. // @param customVerifiers Custom verifier for each aggregate function. These diff --git a/velox/functions/prestosql/fuzzer/AggregationFuzzerTest.cpp b/velox/functions/prestosql/fuzzer/AggregationFuzzerTest.cpp index 733ab5237feb..c9d6bf137e11 100644 --- a/velox/functions/prestosql/fuzzer/AggregationFuzzerTest.cpp +++ b/velox/functions/prestosql/fuzzer/AggregationFuzzerTest.cpp @@ -106,7 +106,12 @@ int main(int argc, char** argv) { "stddev_pop", // Lambda functions are not supported yet. "reduce_agg", - }; + "max_data_size_for_stats", + "map_union_sum", + "approx_set", + "min_by", + "max_by", + "any_value"}; using facebook::velox::exec::test::ApproxDistinctResultVerifier; using facebook::velox::exec::test::ApproxPercentileResultVerifier;