Skip to content

Commit

Permalink
Add support in AggregationFuzzer to compare against reference DB for …
Browse files Browse the repository at this point in the history
…non deterministic functions on unsorted inputs (#9009)

Summary:
**Why**
 Currently for aggregate functions with custom verifiers, we do not verify plans against the reference db. This is because for these functions the reference db results differ and require us to use custom verifiers - typically the ResultVerifier::compare api. The compare api requires VeloxVectors and previously the results of the reference query runner were in materialized row format and thus couldnt be called. Recently support was added to return the reference db results as velox vectors (#8880). With this we can now validate functions which require custom verifiers.

**What**
We add support in the AggregationFuzzer to use the new 'executeVector' api if supported by the refrence query runner, and use that to validate non deterministic functions that require a custom verifier.

**Results**
Currently with this change we have increased the number of plans being verified against reference db from 40 % to 80+%.

```
 Total masked aggregations: 78 (14.58%)
Total global aggregations: 41 (7.66%)
Total group-by aggregations: 392 (73.27%)
Total distinct aggregations: 49 (9.16%)
 Total aggregations over distinct inputs: 40 (7.48%)
Total window expressions: 53 (9.91%)
 Total functions tested: 38
 Total iterations requiring sorted inputs: 43 (8.04%)
Total iterations verified against reference DB: 448 (83.74%)
 Total functions not verified (verification skipped / not supported by reference DB / reference DB failed): 53 (9.91%) / 6 (1.12%) / 3 (0.56%)
 Total failed functions: 27 (5.05%)
```

Experimental runs :
https://github.com/facebookincubator/velox/actions/runs/8199583642/job/22443872635
 https://github.com/facebookincubator/velox/actions/runs/8199583642/job/22424917125

 **Notes**
 Certain functions have been skiplisted for following reasons:
 1. map_union_sum : This requires us to use the lates presto due to a bug on Presto end in version .284 not supporting reals.
 2. approx_set: Signatures supported by Velox are more than supported by Presto
 3. min_by, max_by: Requires custom verifiers (order by not supported by Presto)
 4. any_value: alias for arbitrary but this alias isnt present in Presto.

Pull Request resolved: #9009

Reviewed By: kagamiori

Differential Revision: D54911135

Pulled By: kgpai

fbshipit-source-id: e2fc1fc91a367eb1a3538aa87e755e31ae1018a6
  • Loading branch information
kgpai authored and facebook-github-bot committed Mar 15, 2024
1 parent 8f95208 commit 03d017f
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/experimental.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ jobs:
ls -lR $PRESTO_HOME/etc
/opt/start-prestojava.sh > /tmp/server.log 2>&1 &
# Sleep for 60 seconds to allow Presto server to start.
sleep 60
sleep 60
/opt/presto-cli --server 127.0.0.1:8080 --execute 'CREATE SCHEMA hive.tpch;'
mkdir -p /tmp/aggregate_fuzzer_repro/
rm -rfv /tmp/aggregate_fuzzer_repro/*
Expand Down
80 changes: 57 additions & 23 deletions velox/exec/fuzzer/AggregationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,27 @@ void AggregationFuzzer::Stats::print(size_t numIterations) const {
AggregationFuzzerBase::Stats::print(numIterations);
}

namespace {
// Merges a vector of RowVectors into one RowVector.
RowVectorPtr mergeRowVectors(
const std::vector<RowVectorPtr>& results,
velox::memory::MemoryPool* pool) {
auto totalCount = 0;
for (const auto& result : results) {
totalCount += result->size();
}
auto copy =
BaseVector::create<RowVector>(results[0]->type(), totalCount, pool);
auto copyCount = 0;
for (const auto& result : results) {
copy->copy(result.get(), copyCount, 0, result->size());
copyCount += result->size();
}
return copy;
}

} // namespace

bool AggregationFuzzer::compareEquivalentPlanResults(
const std::vector<PlanWithSplits>& plans,
bool customVerification,
Expand All @@ -1018,39 +1039,52 @@ bool AggregationFuzzer::compareEquivalentPlanResults(
++stats_.numFailed;
}

// TODO Use ResultVerifier::compare API to compare Velox results with
// reference DB results once reference query runner is updated to return
// results as Velox vectors.
std::optional<MaterializedRowMultiset> expectedResult;
// If Velox successfully executes a plan, we attempt to verify
// the plan against the reference DB as follows:
// 1) If deterministic function (i.e. customVerification is false)
// then try and have the reference DB execute the plan and assert
// results are equal.
// 2) If Non deterministic function, and if the reference query runner
// supports Velox vectors then we have the reference DB execute the plan
// and use ResultVerifier::compare api (if supported ) to validate the
// results.

if (resultOrError.result != nullptr) {
if (!customVerification) {
auto referenceResult = computeReferenceResults(firstPlan, input);
stats_.updateReferenceQueryStats(referenceResult.second);
expectedResult = referenceResult.first;
} else {
++stats_.numVerificationSkipped;
auto expectedResult = referenceResult.first;

for (auto& verifier : customVerifiers) {
if (verifier != nullptr && verifier->supportsVerify()) {
VELOX_CHECK(
verifier->verify(resultOrError.result),
"Aggregation results failed custom verification");
if (expectedResult) {
++stats_.numVerified;
VELOX_CHECK(
assertEqualResults(
expectedResult.value(),
firstPlan->outputType(),
{resultOrError.result}),
"Velox and reference DB results don't match");
LOG(INFO) << "Verified results against reference DB";
}
} else if (referenceQueryRunner_->supportsVeloxVectorResults()) {
if (isSupportedType(firstPlan->outputType()) &&
isSupportedType(input.front()->type())) {
auto referenceResult =
computeReferenceResultsAsVector(firstPlan, input);
stats_.updateReferenceQueryStats(referenceResult.second);

if (referenceResult.first) {
velox::test::ResultOrError expected;
expected.result =
mergeRowVectors(referenceResult.first.value(), pool_.get());

compare(
resultOrError, customVerification, customVerifiers, expected);
++stats_.numVerified;
}
}
}
}

if (expectedResult && resultOrError.result) {
++stats_.numVerified;
VELOX_CHECK(
assertEqualResults(
expectedResult.value(),
firstPlan->outputType(),
{resultOrError.result}),
"Velox and reference DB results don't match");
LOG(INFO) << "Verified results against reference DB";
}

testPlans(
plans,
customVerification,
Expand Down
56 changes: 53 additions & 3 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ DEFINE_bool(

namespace facebook::velox::exec::test {

bool AggregationFuzzerBase::isSupportedType(const TypePtr& type) const {
// Date / IntervalDayTime/ Unknown are not currently supported by DWRF.
if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) {
return false;
}

for (auto i = 0; i < type->size(); ++i) {
if (!isSupportedType(type->childAt(i))) {
return false;
}
}

return true;
}

bool AggregationFuzzerBase::addSignature(
const std::string& name,
const FunctionSignaturePtr& signature) {
Expand Down Expand Up @@ -445,15 +460,39 @@ AggregationFuzzerBase::computeReferenceResults(
referenceQueryRunner_->execute(
sql.value(), input, plan->outputType()),
ReferenceQueryErrorCode::kSuccess);
} catch (std::exception& e) {
// ++stats_.numReferenceQueryFailed;
} catch (...) {
LOG(WARNING) << "Query failed in the reference DB";
return std::make_pair(
std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail);
}
}

LOG(INFO) << "Query not supported by the reference DB";
return std::make_pair(
std::nullopt, ReferenceQueryErrorCode::kReferenceQueryUnsupported);
}

std::pair<
std::optional<std::vector<RowVectorPtr>>,
AggregationFuzzerBase::ReferenceQueryErrorCode>
AggregationFuzzerBase::computeReferenceResultsAsVector(
const core::PlanNodePtr& plan,
const std::vector<RowVectorPtr>& input) {
VELOX_CHECK(referenceQueryRunner_->supportsVeloxVectorResults());

if (auto sql = referenceQueryRunner_->toSql(plan)) {
try {
return std::make_pair(
referenceQueryRunner_->executeVector(
sql.value(), input, plan->outputType()),
ReferenceQueryErrorCode::kSuccess);
} catch (...) {
LOG(WARNING) << "Query failed in the reference DB";
return std::make_pair(
std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail);
}
} else {
LOG(INFO) << "Query not supported by the reference DB";
// ++stats_.numVerificationNotSupported;
}

return std::make_pair(
Expand All @@ -474,7 +513,15 @@ void AggregationFuzzerBase::testPlan(
injectSpill,
abandonPartial,
maxDrivers);
compare(actual, customVerification, customVerifiers, expected);
}

void AggregationFuzzerBase::compare(
const velox::test::ResultOrError& actual,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
const velox::test::ResultOrError& expected) {
// Compare results or exceptions (if any). Fail is anything is different.
if (FLAGS_enable_oom_injection) {
// If OOM injection is enabled and we've made it this far and the test
// is considered a success. We don't bother checking the results.
Expand All @@ -495,6 +542,9 @@ void AggregationFuzzerBase::testPlan(
return;
}

VELOX_CHECK_NOT_NULL(expected.result);
VELOX_CHECK_NOT_NULL(actual.result);

VELOX_CHECK_EQ(
expected.result->size(),
actual.result->size(),
Expand Down
21 changes: 21 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,27 @@ class AggregationFuzzerBase {
bool abandonPartial = false,
int32_t maxDrivers = 2);

// Will throw if referenceQueryRunner doesn't support
// returning results as a vector.
std::pair<
std::optional<std::vector<RowVectorPtr>>,
AggregationFuzzerBase::ReferenceQueryErrorCode>
computeReferenceResultsAsVector(
const core::PlanNodePtr& plan,
const std::vector<RowVectorPtr>& input);

void compare(
const velox::test::ResultOrError& actual,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
const velox::test::ResultOrError& expected);

/// Returns false if the type or its children are unsupported.
/// Currently returns false if type is Date,IntervalDayTime or Unknown.
/// @param type
/// @return bool
bool isSupportedType(const TypePtr& type) const;

// @param customVerification If false, results are compared as is. Otherwise,
// only row counts are compared.
// @param customVerifiers Custom verifier for each aggregate function. These
Expand Down
7 changes: 6 additions & 1 deletion velox/functions/prestosql/fuzzer/AggregationFuzzerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ int main(int argc, char** argv) {
"stddev_pop",
// Lambda functions are not supported yet.
"reduce_agg",
};
"max_data_size_for_stats",
"map_union_sum",
"approx_set",
"min_by",
"max_by",
"any_value"};

using facebook::velox::exec::test::ApproxDistinctResultVerifier;
using facebook::velox::exec::test::ApproxPercentileResultVerifier;
Expand Down

0 comments on commit 03d017f

Please sign in to comment.