Skip to content

Commit

Permalink
Enable Spark query runner in aggregate fuzzer test
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Dec 12, 2024
1 parent b44ffc9 commit dc4e10b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 75 deletions.
53 changes: 0 additions & 53 deletions .github/workflows/experimental.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,6 @@ jobs:
name: aggregation
path: velox/_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test

- name: Upload spark aggregation fuzzer
uses: actions/upload-artifact@v3
with:
name: spark_aggregation_fuzzer
path: velox/_build/debug/velox/functions/sparksql/fuzzer/spark_aggregation_fuzzer_test

- name: Upload aggregation fuzzer
uses: actions/upload-artifact@v3
with:
name: aggregation
path: velox/_build/debug/velox/functions/prestosql/fuzzer/velox_aggregation_fuzzer_test

- name: Upload join fuzzer
uses: actions/upload-artifact@v3
with:
Expand Down Expand Up @@ -180,47 +168,6 @@ jobs:
/tmp/aggregate_fuzzer_repro
/tmp/server.log
linux-spark-fuzzer-run:
runs-on: ubuntu-latest
needs: compile
timeout-minutes: 120
steps:

- name: "Checkout Repo"
uses: actions/checkout@v3
with:
ref: "${{ inputs.ref || 'main' }}"

- name: "Install dependencies"
run: source ./scripts/setup-ubuntu.sh && install_apt_deps

- name: Download spark aggregation fuzzer
uses: actions/download-artifact@v3
with:
name: spark_aggregation_fuzzer

- name: "Run Spark Aggregate Fuzzer"
run: |
mkdir -p /tmp/spark_aggregate_fuzzer_repro/
chmod -R 777 /tmp/spark_aggregate_fuzzer_repro
chmod +x spark_aggregation_fuzzer_test
./spark_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec 1800 \
--logtostderr=1 \
--minloglevel=0 \
--repro_persist_path=/tmp/spark_aggregate_fuzzer_repro \
--enable_sorted_aggregations=true \
&& echo -e "\n\nSpark Aggregation Fuzzer run finished successfully."
- name: Archive Spark aggregate production artifacts
if: always()
uses: actions/upload-artifact@v3
with:
name: spark-agg-fuzzer-failure-artifacts
path: |
/tmp/spark_aggregate_fuzzer_repro
linux-join-fuzzer-run:
runs-on: ubuntu-latest
needs: compile
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ jobs:
spark-aggregate-fuzzer-run:
name: Spark Aggregate Fuzzer
runs-on: ubuntu-latest
container: ghcr.io/facebookincubator/velox-dev:centos9
container: ghcr.io/facebookincubator/velox-dev:spark-server
needs: compile
timeout-minutes: 60
steps:
Expand All @@ -469,12 +469,16 @@ jobs:

- name: Run Spark Aggregate Fuzzer
run: |
bash /opt/start-spark.sh
# Sleep for 60 seconds to allow Spark server to start.
sleep 60
mkdir -p /tmp/spark_aggregate_fuzzer_repro/logs/
chmod -R 777 /tmp/spark_aggregate_fuzzer_repro
chmod +x spark_aggregation_fuzzer_test
./spark_aggregation_fuzzer_test \
--seed ${RANDOM} \
--duration_sec $DURATION \
--enable_sorted_aggregations=false \
--minloglevel=0 \
--stderrthreshold=2 \
--log_dir=/tmp/spark_aggregate_fuzzer_repro/logs \
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ void setupMemory(
void registerHiveConnector(
const std::unordered_map<std::string, std::string>& hiveConfigs) {
auto configs = hiveConfigs;
// Make sure not to run out of open file descriptors.
configs[connector::hive::HiveConfig::kNumCacheFileHandles] = "1000";
if (!connector::hasConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
connector::registerConnectorFactory(
Expand Down
39 changes: 18 additions & 21 deletions velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
#include <gtest/gtest.h>
#include <unordered_set>

#include "velox/dwio/parquet/RegisterParquetWriter.h"
#include "velox/exec/fuzzer/AggregationFuzzerOptions.h"
#include "velox/exec/fuzzer/AggregationFuzzerRunner.h"
#include "velox/exec/fuzzer/DuckQueryRunner.h"
#include "velox/exec/fuzzer/TransformResultVerifier.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/sparksql/aggregates/Register.h"
#include "velox/functions/sparksql/fuzzer/SparkQueryRunner.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"
Expand All @@ -45,6 +46,7 @@ DEFINE_string(
int main(int argc, char** argv) {
facebook::velox::functions::aggregate::sparksql::registerAggregateFunctions(
"", false);
facebook::velox::parquet::registerParquetWriterFactory();

::testing::InitGoogleTest(&argc, argv);

Expand All @@ -71,10 +73,13 @@ int main(int argc, char** argv) {
}
facebook::velox::memory::MemoryManager::initialize({});

// TODO: List of the functions that at some point crash or fail and need to
// be fixed before we can enable them. A constant argument to bloom_filter_agg
// causes the fuzzer test to fail.
std::unordered_set<std::string> skipFunctions = {"bloom_filter_agg"};
// Spark does not provide user-accessible aggregate functions with the
// following names.
std::unordered_set<std::string> skipFunctions = {
"bloom_filter_agg",
"first_ignore_null",
"last_ignore_null",
"regr_replacement"};

using facebook::velox::exec::test::TransformResultVerifier;

Expand Down Expand Up @@ -113,21 +118,9 @@ int main(int argc, char** argv) {
size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed;
std::shared_ptr<facebook::velox::memory::MemoryPool> rootPool{
facebook::velox::memory::memoryManager()->addRootPool()};
auto duckQueryRunner =
std::make_unique<facebook::velox::exec::test::DuckQueryRunner>(
rootPool.get());
duckQueryRunner->disableAggregateFunctions(
{// https://github.com/facebookincubator/velox/issues/7677
"max_by",
"min_by",
// The skewness functions of Velox and DuckDB use different
// algorithms.
// https://github.com/facebookincubator/velox/issues/4845
"skewness",
// Spark's kurtosis uses Pearson's formula for calculating the kurtosis
// coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation
// formula. The results from the two methods are completely different.
"kurtosis"});
auto sparkQueryRunner = std::make_unique<
facebook::velox::functions::sparksql::fuzzer::SparkQueryRunner>(
rootPool.get(), "localhost:15002", "fuzzer", "aggregate");

using Runner = facebook::velox::exec::test::AggregationFuzzerRunner;
using Options = facebook::velox::exec::test::AggregationFuzzerOptions;
Expand All @@ -137,5 +130,9 @@ int main(int argc, char** argv) {
options.skipFunctions = skipFunctions;
options.customVerificationFunctions = customVerificationFunctions;
options.orderableGroupKeys = true;
return Runner::run(initialSeed, std::move(duckQueryRunner), options);
options.timestampPrecision =
facebook::velox::VectorFuzzer::Options::TimestampPrecision::kMicroSeconds;
options.hiveConfigs = {
{facebook::velox::connector::hive::HiveConfig::kReadTimestampUnit, "6"}};
return Runner::run(initialSeed, std::move(sparkQueryRunner), options);
}

0 comments on commit dc4e10b

Please sign in to comment.