Skip to content

Commit

Permalink
Enable Spark query runner in aggregation fuzzer test
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Jun 20, 2024
1 parent 1a50a8a commit a356700
Show file tree
Hide file tree
Showing 23 changed files with 4,747 additions and 25 deletions.
1 change: 1 addition & 0 deletions .github/workflows/linux-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ jobs:
source scripts/setup-centos9.sh
install_cuda ${CUDA_VERSION}
fi
source scripts/setup-centos8.sh && install_grpc
- uses: assignUser/stash/restore@v1
with:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ jobs:
env:
EXTRA_CMAKE_FLAGS: "-DVELOX_ENABLE_ARROW=ON -DVELOX_BUILD_PYTHON_PACKAGE=ON ${{ inputs.extraCMakeFlags }}"
run: |
source scripts/setup-centos8.sh && install_grpc
EXTRA_CMAKE_FLAGS="-DPYTHON_EXECUTABLE=$(which python3) $EXTRA_CMAKE_FLAGS"
make debug
Expand Down
712 changes: 712 additions & 0 deletions CMake/Findgrpc.cmake

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ option(VELOX_ENABLE_S3 "Build S3 Connector" OFF)
option(VELOX_ENABLE_GCS "Build GCS Connector" OFF)
option(VELOX_ENABLE_ABFS "Build Abfs Connector" OFF)
option(VELOX_ENABLE_HDFS "Build Hdfs Connector" OFF)
option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_PARQUET "Enable Parquet support" ON)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON)
Expand Down
20 changes: 20 additions & 0 deletions scripts/setup-centos8.sh
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,26 @@ function install_cuda {
yum install -y cuda-nvcc-$(echo $1 | tr '.' '-') cuda-cudart-devel-$(echo $1 | tr '.' '-')
}

function install_grpc {
git clone https://github.com/grpc/grpc.git --branch v1.50.0 --single-branch
(
cd grpc
git submodule update --init
mkdir -p cmake/build
cd cmake/build
cmake ../.. -DgRPC_INSTALL=ON \
-DCMAKE_BUILD_TYPE=Release \
-DgRPC_ABSL_PROVIDER=module \
-DgRPC_CARES_PROVIDER=module \
-DgRPC_PROTOBUF_PROVIDER=module \
-DgRPC_RE2_PROVIDER=package \
-DgRPC_SSL_PROVIDER=package \
-DgRPC_ZLIB_PROVIDER=package
make "-j$(nproc)"
$SUDO make install
)
}

function install_velox_deps {
run_and_time install_velox_deps_from_dnf
run_and_time install_conda
Expand Down
21 changes: 21 additions & 0 deletions scripts/setup-ubuntu.sh
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,26 @@ function install_cuda {
$SUDO apt install -y cuda-nvcc-$(echo $1 | tr '.' '-') cuda-cudart-dev-$(echo $1 | tr '.' '-')
}

function install_grpc {
git clone https://github.com/grpc/grpc.git --branch v1.50.0 --single-branch
(
cd grpc
git submodule update --init
mkdir -p cmake/build
cd cmake/build
cmake ../.. -DgRPC_INSTALL=ON \
-DCMAKE_BUILD_TYPE=Release \
-DgRPC_ABSL_PROVIDER=module \
-DgRPC_CARES_PROVIDER=module \
-DgRPC_PROTOBUF_PROVIDER=module \
-DgRPC_RE2_PROVIDER=package \
-DgRPC_SSL_PROVIDER=package \
-DgRPC_ZLIB_PROVIDER=package
make "-j$(nproc)"
$SUDO make install
)
}

function install_velox_deps {
run_and_time install_velox_deps_from_apt
run_and_time install_fmt
Expand All @@ -184,6 +204,7 @@ function install_velox_deps {
function install_apt_deps {
install_build_prerequisites
install_velox_deps_from_apt
run_and_time install_grpc
}

(return 2> /dev/null) && return # If script was sourced, don't run commands.
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ class RowReaderOptions {
uint64_t skipRows_ = 0;
std::shared_ptr<UnitLoaderFactory> unitLoaderFactory_;

TimestampPrecision timestampPrecision_ = TimestampPrecision::kMilliseconds;
// After https://github.com/facebookincubator/velox/pull/4680, the precision could be controlled via HiveConfig.
TimestampPrecision timestampPrecision_ = TimestampPrecision::kMicroseconds;

public:
RowReaderOptions() noexcept
Expand Down
15 changes: 12 additions & 3 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,12 @@ AggregationFuzzerBase::computeReferenceResults(
referenceQueryRunner_->execute(
sql.value(), input, plan->outputType()),
ReferenceQueryErrorCode::kSuccess);
} catch (VeloxRuntimeError& e) {
LOG(WARNING) << "Query failed in the reference DB: " << e.message()
<< ", query: " << sql.value() << ", seed: " << currentSeed_;
} catch (...) {
LOG(WARNING) << "Query failed in the reference DB";
LOG(WARNING) << "Query failed in the reference DB, query: "
<< sql.value() << ", seed: " << currentSeed_;
return std::make_pair(
std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail);
}
Expand All @@ -507,8 +511,12 @@ AggregationFuzzerBase::computeReferenceResultsAsVector(
referenceQueryRunner_->executeVector(
sql.value(), input, plan->outputType()),
ReferenceQueryErrorCode::kSuccess);
} catch (VeloxRuntimeError& e) {
LOG(WARNING) << "Query failed in the reference DB: " << e.message()
<< ", query: " << sql.value() << ", seed: " << currentSeed_;
} catch (...) {
LOG(WARNING) << "Query failed in the reference DB";
LOG(WARNING) << "Query failed in the reference DB, query: "
<< sql.value() << ", seed: " << currentSeed_;
return std::make_pair(
std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail);
}
Expand Down Expand Up @@ -560,7 +568,8 @@ void AggregationFuzzerBase::compare(
if (!customVerification) {
VELOX_CHECK(
assertEqualResults({expected.result}, {actual.result}),
"Logically equivalent plans produced different results");
"Logically equivalent plans produced different results, seed: {}",
currentSeed_);
return;
}

Expand Down
5 changes: 4 additions & 1 deletion velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,14 @@ class AggregationFuzzerBase {
referenceQueryRunner_{std::move(referenceQueryRunner)},
vectorFuzzer_{getFuzzerOptions(timestampPrecision), pool_.get()} {
filesystems::registerLocalFileSystem();
// Make sure not to run out of open file descriptors.
const std::unordered_map<std::string, std::string> hiveConfig = {
{connector::hive::HiveConfig::kNumCacheFileHandles, "1000"}};
auto hiveConnector =
connector::getConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(
kHiveConnectorId, std::make_shared<core::MemConfig>());
kHiveConnectorId, std::make_shared<core::MemConfig>(hiveConfig));
connector::registerConnector(hiveConnector);

seed(initialSeed);
Expand Down
153 changes: 152 additions & 1 deletion velox/functions/sparksql/fuzzer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,166 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Set up Spark connect
file(
GLOB PROTO_FILES
RELATIVE ${PROJECT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/proto/spark/connect/*.proto)
foreach(PROTO ${PROTO_FILES})
get_filename_component(PROTO_DIR ${PROTO} DIRECTORY)
get_filename_component(PROTO_NAME ${PROTO} NAME_WE)
set(PROTO_OUTPUT_DIR ${PROJECT_BINARY_DIR}/${PROTO_DIR})
list(APPEND PROTO_SRCS "${PROTO_OUTPUT_DIR}/${PROTO_NAME}.pb.cc")
list(APPEND PROTO_HDRS "${PROTO_OUTPUT_DIR}/${PROTO_NAME}.pb.h")
list(APPEND GRPC_SRCS "${PROTO_OUTPUT_DIR}/${PROTO_NAME}.grpc.pb.cc")
list(APPEND GRPC_HDRS "${PROTO_OUTPUT_DIR}/${PROTO_NAME}.grpc.pb.h")
list(APPEND PROTO_FILES_FULL
"${PROJECT_SOURCE_DIR}/${PROTO_DIR}/${PROTO_NAME}.proto")
endforeach()

set(PROTO_OUTPUT_FILES ${PROTO_HDRS} ${PROTO_SRCS})
set_source_files_properties(${PROTO_OUTPUT_FILES} PROPERTIES GENERATED TRUE)

set(GRPC_OUTPUT_FILES ${GRPC_HDRS} ${GRPC_SRCS})
set_source_files_properties(${GRPC_OUTPUT_FILES} PROPERTIES GENERATED TRUE)

# Generate Spark connect hearders and sources
add_custom_command(
OUTPUT ${PROTO_OUTPUT_FILES}
COMMAND
${Protobuf_PROTOC_EXECUTABLE} --proto_path ${CMAKE_SOURCE_DIR}/ --proto_path
${Protobuf_INCLUDE_DIRS} --cpp_out ${CMAKE_BINARY_DIR} ${PROTO_FILES_FULL}
DEPENDS ${Protobuf_PROTOC_EXECUTABLE}
COMMENT "Running PROTO compiler"
VERBATIM)
add_custom_target(spark_connect_proto ALL DEPENDS ${PROTO_OUTPUT_FILES})
# add_dependencies(spark_connect_proto protobuf::libprotobuf)

find_program(GRPC_CPP_PLUGIN grpc_cpp_plugin)
# Generate grpc headers and sources
add_custom_command(
OUTPUT ${GRPC_OUTPUT_FILES}
COMMAND
${Protobuf_PROTOC_EXECUTABLE} --proto_path ${CMAKE_SOURCE_DIR}/ --proto_path
${Protobuf_INCLUDE_DIRS} --grpc_out=${CMAKE_BINARY_DIR}
--plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN} ${PROTO_FILES_FULL}
DEPENDS ${Protobuf_PROTOC_EXECUTABLE}
COMMENT "Running gRPC C++ protocol buffer compiler"
VERBATIM)
add_custom_target(spark_connect_grpc_proto ALL DEPENDS ${GRPC_OUTPUT_FILES})

set(SRCS ${PROTO_SRCS} ${GRPC_SRCS} SparkQueryRunner.cpp)

add_library(velox_spark_query_runner ${SRCS})
target_include_directories(velox_spark_query_runner
PUBLIC ${PROJECT_BINARY_DIR}/)

include(${PROJECT_SOURCE_DIR}/CMake/Findgrpc.cmake)
target_link_libraries(
velox_spark_query_runner
velox_fuzzer_util
velox_exec_test_lib
velox_arrow_bridge
grpc::grpc++
grpc::grpc
gpr::gpr
protobuf::libprotobuf
z::z
re2::re2
address_sorting::address_sorting
cares::cares
boringssl::ssl
boringssl::crypto
pthread
arrow
lz4::lz4
zstd::zstd
Snappy::snappy)

add_executable(spark_aggregation_fuzzer_test SparkAggregationFuzzerTest.cpp)

target_link_libraries(
spark_aggregation_fuzzer_test
velox_aggregation_fuzzer
velox_aggregation_fuzzer_base
velox_functions_spark_aggregates
velox_spark_query_runner
velox_fuzzer_util
velox_window
velox_vector_test_lib
gtest
gtest_main)
gtest_main
grpc::grpc++
grpc::grpc
gpr::gpr
protobuf::libprotobuf
z::z
re2::re2
address_sorting::address_sorting
cares::cares
upb::upb
boringssl::ssl
boringssl::crypto
absl::absl_statusor
absl::absl_status
absl::absl_cordz_info
absl::absl_cord
absl::absl_symbolize
absl::absl_demangle_internal
absl::absl_flags_parse
absl::absl_flags
absl::absl_cord_internal
absl::absl_bad_optional_access
absl::absl_throw_delegate
absl::absl_bad_variant_access
absl::absl_random_internal_pool_urbg
absl::absl_random_internal_randen
absl::absl_random_internal_seed_material
absl::absl_random_seed_gen_exception
absl::absl_random_internal_randen_hwaes
absl::absl_random_internal_randen_slow
absl::absl_graphcycles_internal
absl::absl_base
absl::absl_malloc_internal
absl::absl_examine_stack
absl::absl_debugging_internal
absl::absl_exponential_biased
absl::absl_cordz_handle
absl::absl_log_severity
absl::absl_stacktrace
absl::absl_time
absl::absl_cordz_functions
absl::absl_raw_logging_internal
absl::absl_str_format_internal
absl::absl_strerror
absl::absl_strings
absl::absl_strings_internal
absl::absl_raw_hash_set
absl::absl_hashtablez_sampler
absl::absl_hash
absl::absl_city
absl::absl_int128
absl::absl_low_level_hash
absl::absl_random_internal_randen_hwaes_impl
absl::absl_random_internal_platform
absl::absl_flags_internal
absl::absl_flags_usage_internal
absl::absl_flags_reflection
absl::absl_flags_commandlineflag
absl::absl_flags_private_handle_accessor
absl::absl_flags_commandlineflag_internal
absl::absl_flags_config
absl::absl_flags_program_name
absl::absl_flags_marshalling
absl::absl_synchronization
absl::absl_time_zone
absl::absl_spinlock_wait
pthread
arrow
lz4::lz4
zstd::zstd
Snappy::snappy)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
31 changes: 13 additions & 18 deletions velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

#include "velox/exec/fuzzer/AggregationFuzzerOptions.h"
#include "velox/exec/fuzzer/AggregationFuzzerRunner.h"
#include "velox/exec/fuzzer/DuckQueryRunner.h"
#include "velox/exec/fuzzer/TransformResultVerifier.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/sparksql/aggregates/Register.h"
#include "velox/functions/sparksql/fuzzer/SparkQueryRunner.h"

DEFINE_int64(
seed,
Expand Down Expand Up @@ -56,7 +56,11 @@ int main(int argc, char** argv) {
// TODO: List of the functions that at some point crash or fail and need to
// be fixed before we can enable. Constant argument of bloom_filter_agg cause
// fuzzer test fail.
std::unordered_set<std::string> skipFunctions = {"bloom_filter_agg"};
std::unordered_set<std::string> skipFunctions = {
"bloom_filter_agg",
"first_ignore_null",
"last_ignore_null",
"regr_replacement"};

using facebook::velox::exec::test::TransformResultVerifier;

Expand Down Expand Up @@ -85,21 +89,10 @@ int main(int argc, char** argv) {
};

size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed;
auto duckQueryRunner =
std::make_unique<facebook::velox::exec::test::DuckQueryRunner>();
duckQueryRunner->disableAggregateFunctions({
// https://github.com/facebookincubator/velox/issues/7677
"max_by",
"min_by",
// The skewness functions of Velox and DuckDB use different
// algorithms.
// https://github.com/facebookincubator/velox/issues/4845
"skewness",
// Spark's kurtosis uses Pearson's formula for calculating the kurtosis
// coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation
// formula. The results from the two methods are completely different.
"kurtosis",
});

auto sparkQueryRunner = std::make_unique<
facebook::velox::functions::sparksql::fuzzer::SparkQueryRunner>(
"localhost:15002");

using Runner = facebook::velox::exec::test::AggregationFuzzerRunner;
using Options = facebook::velox::exec::test::AggregationFuzzerOptions;
Expand All @@ -108,5 +101,7 @@ int main(int argc, char** argv) {
options.onlyFunctions = FLAGS_only;
options.skipFunctions = skipFunctions;
options.customVerificationFunctions = customVerificationFunctions;
return Runner::run(initialSeed, std::move(duckQueryRunner), options);
options.timestampPrecision =
facebook::velox::VectorFuzzer::Options::TimestampPrecision::kMicroSeconds;
return Runner::run(initialSeed, std::move(sparkQueryRunner), options);
}
Loading

0 comments on commit a356700

Please sign in to comment.