From 0aec65c94ac1435a55c673b9899fafc30ab4a8f5 Mon Sep 17 00:00:00 2001
From: Xiaoxuan Meng
Date: Fri, 20 Dec 2024 00:33:01 -0800
Subject: [PATCH] feat: Adds shuffle compression support (#11914)

Summary:
Adds per-query shuffle compression support. Currently the compression kind is
configured in the partitioned output buffer manager, which forces all queries
to use the same compression kind and assumes all workers use the same
compression kind; this is inflexible and does not align with Presto Java. This
change removes the compression kind from the partitioned output buffer manager
and instead configures it through the query config. The shuffle operators also
report the compression kind they use. A followup is to integrate this with
Prestissimo by setting the LZ4 compression kind when the shuffle compression
session property is set. Note that Presto Java does not allow configuring
which compression kind to use. On Meta internal workloads, the LZ4 compression
kind can reduce e2e execution time by 20% while halving the shuffle data
volume.

Reviewed By: tanjialiang, oerling

Differential Revision: D67407045
---
 velox/core/QueryConfig.h | 10 +
 velox/docs/configs.rst | 10 +-
 velox/docs/monitoring/stats.rst | 5 +
 velox/exec/Exchange.cpp | 25 +-
 velox/exec/Exchange.h | 2 +-
 velox/exec/Merge.cpp | 30 +-
 velox/exec/Merge.h | 5 +
 velox/exec/MergeSource.cpp | 3 +-
 velox/exec/Operator.h | 5 +
 velox/exec/OutputBufferManager.h | 20 +-
 velox/exec/PartitionedOutput.cpp | 27 +-
 velox/exec/PartitionedOutput.h | 4 +-
 velox/exec/tests/MultiFragmentTest.cpp | 864 +++++++++++++++----------
 velox/serializers/RowSerializer.h | 29 +-
 velox/vector/VectorStream.h | 2 +-
 velox/vector/tests/VectorTest.cpp | 4 +-
 16 files changed, 665 insertions(+), 380 deletions(-)

diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index 90105af887e6..630bebdc8fc0 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -15,6 +15,7 @@
  */
 #pragma once

+#include "velox/common/compression/Compression.h"
 #include "velox/common/config/Config.h"
 #include "velox/vector/TypeAliases.h"

@@ -490,6 +491,11 @@ class QueryConfig {
   static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
       "table_scan_scale_up_memory_usage_ratio";

+  /// Specifies the shuffle compression kind, as defined by CompressionKind.
+  /// CompressionKind_NONE means no compression.
+  static constexpr const char* kShuffleCompressionKind =
+      "shuffle_compression_codec";
+
   bool selectiveNimbleReaderEnabled() const {
     return get<bool>(kSelectiveNimbleReaderEnabled, false);
   }
@@ -906,6 +912,10 @@ class QueryConfig {
     return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.7);
   }

+  std::string shuffleCompressionKind() const {
+    return get<std::string>(kShuffleCompressionKind, "none");
+  }
+
   template <typename T>
   T get(const std::string& key, const T& defaultValue) const {
     return config_->get<T>(key, defaultValue);
   }
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst
index 9fe2534b1262..b2f7563fed42 100644
--- a/velox/docs/configs.rst
+++ b/velox/docs/configs.rst
@@ -148,6 +148,12 @@ Generic Configuration
    - integer
    - 16
    - Byte length of the string prefix stored in the prefix-sort buffer. This doesn't include the null byte.
+   * - shuffle_compression_codec
+     - string
+     - none
+     - Specifies the compression algorithm type to compress the shuffle data to
+       trade CPU for network IO efficiency. The supported compression codecs
+       are: zlib, snappy, lzo, zstd, lz4 and gzip. none means no compression.
.. _expression-evaluation-conf:

@@ -355,8 +361,8 @@ Spilling
     - string
     - none
     - Specifies the compression algorithm type to compress the spilled data before write to disk to trade CPU for IO
-      efficiency. The supported compression codecs are: ZLIB, SNAPPY, LZO, ZSTD, LZ4 and GZIP.
-      NONE means no compression.
+      efficiency. The supported compression codecs are: zlib, snappy, lzo, zstd, lz4 and gzip.
+      none means no compression.
    * - spill_prefixsort_enabled
      - bool
      - false
diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst
index a6feb1d32cb0..c6f9f5f91156 100644
--- a/velox/docs/monitoring/stats.rst
+++ b/velox/docs/monitoring/stats.rst
@@ -205,6 +205,11 @@ These stats are reported by shuffle operators.
     - Indicates the vector serde kind used by an operator for shuffle with 1
       for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by
       Exchange, MergeExchange and PartitionedOutput operators for now.
+   * - shuffleCompressionKind
+     -
+     - Indicates the compression kind used by an operator for shuffle. The
+       reported value is the corresponding CompressionKind enum value, with 0
+       (CompressionKind_NONE) meaning no compression.

 PrefixSort
 ----------
diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp
index 8dcf41ccc7a8..c45e8c8b2af4 100644
--- a/velox/exec/Exchange.cpp
+++ b/velox/exec/Exchange.cpp
@@ -21,13 +21,14 @@ namespace facebook::velox::exec {
 namespace {
 std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
+    const core::QueryConfig& queryConfig,
     VectorSerde::Kind kind) {
   std::unique_ptr<VectorSerde::Options> options =
       kind == VectorSerde::Kind::kPresto
       ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
       : std::make_unique<VectorSerde::Options>();
   options->compressionKind =
-      OutputBufferManager::getInstance().lock()->compressionKind();
+      common::stringToCompressionKind(queryConfig.shuffleCompressionKind());
   return options;
 }
 } // namespace
@@ -46,8 +47,10 @@ Exchange::Exchange(
           operatorType),
       preferredOutputBatchBytes_{
           driverCtx->queryConfig().preferredOutputBatchBytes()},
-      serdeKind_(exchangeNode->serdeKind()),
-      options_{getVectorSerdeOptions(serdeKind_)},
+      serdeKind_{exchangeNode->serdeKind()},
+      serdeOptions_{getVectorSerdeOptions(
+          operatorCtx_->driverCtx()->queryConfig(),
+          serdeKind_)},
       processSplits_{operatorCtx_->driverCtx()->driverId == 0},
       exchangeClient_{std::move(exchangeClient)} {}
@@ -159,7 +162,7 @@ RowVectorPtr Exchange::getOutput() {
           outputType_,
           &result_,
           resultOffset,
-          options_.get());
+          serdeOptions_.get());
       resultOffset = result_->size();
     }
   }
@@ -186,7 +189,7 @@ RowVectorPtr Exchange::getOutput() {
         outputType_,
         &result_,
         resultOffset,
-        options_.get());
+        serdeOptions_.get());

     // We expect the row-wise deserialization to consume all the input into one
     // output vector.
     VELOX_CHECK(inputStream->atEnd());
@@ -212,9 +215,15 @@ void Exchange::close() {
     exchangeClient_->close();
   }
   exchangeClient_ = nullptr;
-  stats_.wlock()->addRuntimeStat(
-      Operator::kShuffleSerdeKind,
-      RuntimeCounter(static_cast<int64_t>(serdeKind_)));
+  {
+    auto lockedStats = stats_.wlock();
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleSerdeKind,
+        RuntimeCounter(static_cast<int64_t>(serdeKind_)));
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleCompressionKind,
+        RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
+  }
 }

 void Exchange::recordExchangeClientStats() {
diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h
index 45cdce8ceb59..3a1f32cac533 100644
--- a/velox/exec/Exchange.h
+++ b/velox/exec/Exchange.h
@@ -83,7 +83,7 @@ class Exchange : public SourceOperator {

   const VectorSerde::Kind serdeKind_;

-  const std::unique_ptr<VectorSerde::Options> options_;
+  const std::unique_ptr<VectorSerde::Options> serdeOptions_;

   /// True if this operator is responsible for fetching splits from the Task
   /// and passing these to ExchangeClient.
diff --git a/velox/exec/Merge.cpp b/velox/exec/Merge.cpp
index f25f327bc9d7..fa35981725b5 100644
--- a/velox/exec/Merge.cpp
+++ b/velox/exec/Merge.cpp
@@ -21,6 +21,19 @@ using facebook::velox::common::testutil::TestValue;

 namespace facebook::velox::exec {

+namespace {
+std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
+    const core::QueryConfig& queryConfig,
+    VectorSerde::Kind kind) {
+  std::unique_ptr<VectorSerde::Options> options =
+      kind == VectorSerde::Kind::kPresto
+      ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
+      : std::make_unique<VectorSerde::Options>();
+  options->compressionKind =
+      common::stringToCompressionKind(queryConfig.shuffleCompressionKind());
+  return options;
+}
+} // namespace

 Merge::Merge(
     int32_t operatorId,
@@ -305,7 +318,10 @@ MergeExchange::MergeExchange(
           mergeExchangeNode->sortingOrders(),
           mergeExchangeNode->id(),
           "MergeExchange"),
-      serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())) {}
+      serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())),
+      serdeOptions_(getVectorSerdeOptions(
+          driverCtx->queryConfig(),
+          mergeExchangeNode->serdeKind())) {}

 BlockingReason MergeExchange::addMergeSources(ContinueFuture* future) {
   if (operatorCtx_->driverCtx()->driverId != 0) {
@@ -370,8 +386,14 @@ void MergeExchange::close() {
     source->close();
   }
   Operator::close();
-  stats_.wlock()->addRuntimeStat(
-      Operator::kShuffleSerdeKind,
-      RuntimeCounter(static_cast<int64_t>(serde_->kind())));
+  {
+    auto lockedStats = stats_.wlock();
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleSerdeKind,
+        RuntimeCounter(static_cast<int64_t>(serde_->kind())));
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleCompressionKind,
+        RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
+  }
 }
 } // namespace facebook::velox::exec
diff --git a/velox/exec/Merge.h b/velox/exec/Merge.h
index e8aa3749dd4d..b5c1a75e66c4 100644
--- a/velox/exec/Merge.h
+++ b/velox/exec/Merge.h
@@ -195,6 +195,10 @@ class MergeExchange : public Merge {
     return serde_;
   }

+  VectorSerde::Options* serdeOptions() const {
+    return serdeOptions_.get();
+  }
+
   void close() override;

  protected:
@@ -202,6 +206,7 @@ class MergeExchange : public Merge {

  private:
   VectorSerde* const serde_;
+  const std::unique_ptr<VectorSerde::Options> serdeOptions_;
   bool noMoreSplits_ = false;
   // Task Ids from all the splits we took to process so far.
  std::vector<std::string> remoteSourceTaskIds_;
diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp
index a319f2535afa..786bd40045cc 100644
--- a/velox/exec/MergeSource.cpp
+++ b/velox/exec/MergeSource.cpp
@@ -168,7 +168,8 @@ class MergeExchangeSource : public MergeSource {
             mergeExchange_->pool(),
             mergeExchange_->outputType(),
             mergeExchange_->serde(),
-            &data);
+            &data,
+            mergeExchange_->serdeOptions());

         auto lockedStats = mergeExchange_->stats().wlock();
         lockedStats->addInputVector(data->estimateFlatSize(), data->size());
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index de06d78da65a..4a1b7f3f43ab 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -364,6 +364,11 @@ class Operator : public BaseRuntimeStatWriter {
   /// runtime stats value is the corresponding enum value.
   static inline const std::string kShuffleSerdeKind{"shuffleSerdeKind"};

+  /// The compression kind used by an operator for shuffle. The recorded
+  /// runtime stats value is the corresponding enum value.
+  static inline const std::string kShuffleCompressionKind{
+      "shuffleCompressionKind"};
+
   /// 'operatorId' is the initial index of the 'this' in the Driver's list of
   /// Operators. This is used as an index into OperatorStats arrays in the Task.
   /// 'planNodeId' is a query-level unique identifier of the PlanNode to which
diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h
index 847532cd554e..038d42cdce77 100644
--- a/velox/exec/OutputBufferManager.h
+++ b/velox/exec/OutputBufferManager.h
@@ -23,15 +23,11 @@ class OutputBufferManager {
  public:
   /// Options for shuffle. This is initialized once and affects both
   /// PartitionedOutput and Exchange. This can be used for controlling
-  /// compression, protocol version and other matters where shuffle sides should
+  /// protocol version and other matters where shuffle sides should
   /// agree.
-  struct Options {
-    common::CompressionKind compressionKind{
-        common::CompressionKind::CompressionKind_NONE};
-  };
+  struct Options {};

-  OutputBufferManager(Options options)
-      : compressionKind_(options.compressionKind) {}
+  explicit OutputBufferManager(Options /*unused*/) {}

   void initializeTask(
       std::shared_ptr<Task> task,
@@ -135,21 +131,11 @@ class OutputBufferManager {
   // Returns NULL if task not found.
   std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId);

-  void testingSetCompression(common::CompressionKind kind) {
-    *const_cast<common::CompressionKind*>(&compressionKind_) = kind;
-  }
-
-  common::CompressionKind compressionKind() const {
-    return compressionKind_;
-  }
-
  private:
   // Retrieves the set of buffers for a query.
   // Throws an exception if buffer doesn't exist.
   std::shared_ptr<OutputBuffer> getBuffer(const std::string& taskId);

-  const common::CompressionKind compressionKind_;
-
   folly::Synchronized<
       std::unordered_map<std::string, std::shared_ptr<OutputBuffer>>,
       std::mutex>
diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp
index 88663c9ad1e5..a08114210245 100644
--- a/velox/exec/PartitionedOutput.cpp
+++ b/velox/exec/PartitionedOutput.cpp
@@ -21,13 +21,14 @@ namespace facebook::velox::exec {
 namespace {
 std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
+    const core::QueryConfig& queryConfig,
     VectorSerde::Kind kind) {
   std::unique_ptr<VectorSerde::Options> options =
       kind == VectorSerde::Kind::kPresto
       ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
       : std::make_unique<VectorSerde::Options>();
   options->compressionKind =
-      OutputBufferManager::getInstance().lock()->compressionKind();
+      common::stringToCompressionKind(queryConfig.shuffleCompressionKind());
   options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
   return options;
 }
@@ -38,14 +39,14 @@ Destination::Destination(
     const std::string& taskId,
     int destination,
     VectorSerde* serde,
-    VectorSerde::Options* options,
+    VectorSerde::Options* serdeOptions,
     memory::MemoryPool* pool,
     bool eagerFlush,
     std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
     : taskId_(taskId),
       destination_(destination),
       serde_(serde),
-      options_(options),
+      serdeOptions_(serdeOptions),
       pool_(pool),
       eagerFlush_(eagerFlush),
       recordEnqueued_(std::move(recordEnqueued)) {
@@ -89,7 +90,7 @@ BlockingReason Destination::advance(
   if (current_ == nullptr) {
     current_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
     const auto rowType = asRowType(output->type());
-    current_->createStreamTree(rowType, rowsInCurrent_, options_);
+    current_->createStreamTree(rowType, rowsInCurrent_, serdeOptions_);
   }
   const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow);
@@ -200,7 +201,9 @@ PartitionedOutput::PartitionedOutput(
               .maxPartitionedOutputBufferSize()),
       eagerFlush_(eagerFlush),
       serde_(getNamedVectorSerde(planNode->serdeKind())),
-      options_(getVectorSerdeOptions(planNode->serdeKind())) {
+      serdeOptions_(getVectorSerdeOptions(
+          operatorCtx_->driverCtx()->queryConfig(),
+          planNode->serdeKind())) {
   if (!planNode->isPartitioned()) {
     VELOX_USER_CHECK_EQ(numDestinations_, 1);
   }
@@ -256,7 +259,7 @@ void PartitionedOutput::initializeDestinations() {
           taskId,
           i,
           serde_,
-          options_.get(),
+          serdeOptions_.get(),
           pool(),
           eagerFlush_,
           [&](uint64_t bytes, uint64_t rows) {
@@ -473,9 +476,15 @@ bool PartitionedOutput::isFinished() {

 void PartitionedOutput::close() {
   Operator::close();
-  stats_.wlock()->addRuntimeStat(
-      Operator::kShuffleSerdeKind,
-      RuntimeCounter(static_cast<int64_t>(serde_->kind())));
+  {
+    auto lockedStats = stats_.wlock();
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleSerdeKind,
+        RuntimeCounter(static_cast<int64_t>(serde_->kind())));
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleCompressionKind,
+        RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
+  }
   destinations_.clear();
 }
diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h
index 699af6d65590..5a1c44cf0b19 100644
--- a/velox/exec/PartitionedOutput.h
+++ b/velox/exec/PartitionedOutput.h
@@ -105,7 +105,7 @@ class Destination {
   const std::string taskId_;
   const int destination_;
   VectorSerde* const serde_;
-  VectorSerde::Options* const options_;
+  VectorSerde::Options* const serdeOptions_;
   memory::MemoryPool* const pool_;
   const bool eagerFlush_;
   const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
@@ -220,7 +220,7 @@ class PartitionedOutput : public Operator {
   const int64_t maxBufferedBytes_;
   const bool eagerFlush_;
   VectorSerde* const serde_;
-  const std::unique_ptr<VectorSerde::Options> options_;
+  const std::unique_ptr<VectorSerde::Options> serdeOptions_;

   BlockingReason blockingReason_{BlockingReason::kNotBlocked};
   ContinueFuture future_;
diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp
index 1a8b3812fc5a..439bade62a2f 100644
--- a/velox/exec/tests/MultiFragmentTest.cpp
+++ b/velox/exec/tests/MultiFragmentTest.cpp
@@ -37,16 +37,29 @@ using facebook::velox::test::BatchMaker;
 namespace facebook::velox::exec {
 namespace {

-class MultiFragmentTest
-    : public HiveConnectorTestBase,
-      public testing::WithParamInterface<VectorSerde::Kind> {
+struct TestParam {
+  VectorSerde::Kind serdeKind;
common::CompressionKind compressionKind; +}; + +class MultiFragmentTest : public HiveConnectorTestBase, + public testing::WithParamInterface { public: - static std::vector getTestParams() { - const std::vector kinds( - {VectorSerde::Kind::kPresto, - VectorSerde::Kind::kCompactRow, - VectorSerde::Kind::kUnsafeRow}); - return kinds; + static std::vector getTestParams() { + std::vector params; + params.emplace_back( + VectorSerde::Kind::kPresto, common::CompressionKind_NONE); + params.emplace_back( + VectorSerde::Kind::kCompactRow, common::CompressionKind_NONE); + params.emplace_back( + VectorSerde::Kind::kUnsafeRow, common::CompressionKind_NONE); + params.emplace_back( + VectorSerde::Kind::kPresto, common::CompressionKind_LZ4); + params.emplace_back( + VectorSerde::Kind::kCompactRow, common::CompressionKind_LZ4); + params.emplace_back( + VectorSerde::Kind::kUnsafeRow, common::CompressionKind_LZ4); + return params; } protected: @@ -54,6 +67,8 @@ class MultiFragmentTest HiveConnectorTestBase::SetUp(); exec::ExchangeSource::factories().clear(); exec::ExchangeSource::registerFactory(createLocalExchangeSource); + configSettings_[core::QueryConfig::kShuffleCompressionKind] = + common::compressionKindToString(GetParam().compressionKind); } void TearDown() override { @@ -137,26 +152,6 @@ class MultiFragmentTest task->noMoreSplits("0"); } - std::shared_ptr assertQuery( - const core::PlanNodePtr& plan, - const std::vector& remoteTaskIds, - const std::string& duckDbSql, - std::optional> sortingKeys = std::nullopt) { - std::vector> splits; - for (auto& taskId : remoteTaskIds) { - splits.push_back(std::make_shared(taskId)); - } - return OperatorTestBase::assertQuery(plan, splits, duckDbSql, sortingKeys); - } - - void assertQueryOrdered( - const core::PlanNodePtr& plan, - const std::vector& remoteTaskIds, - const std::string& duckDbSql, - const std::vector& sortingKeys) { - assertQuery(plan, remoteTaskIds, duckDbSql, sortingKeys); - } - void setupSources(int filePathCount, int rowsPerVector) { filePaths_ = makeFilePaths(filePathCount); vectors_ = makeVectors(filePaths_.size(), rowsPerVector); @@ -170,7 +165,8 @@ class MultiFragmentTest const std::string& taskId, int32_t destination, const RowVectorPtr& data) { - auto page = toSerializedPage(data, GetParam(), bufferManager_, pool()); + auto page = + toSerializedPage(data, GetParam().serdeKind, bufferManager_, pool()); const auto pageSize = page->size(); ContinueFuture unused; @@ -224,7 +220,8 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { .tableScan(rowType_) .project({"c0 % 10 AS c0", "c1"}) .partialAggregation({"c0"}, {"sum(c1)"}) - .partitionedOutput({"c0"}, 3, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 3, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -241,10 +238,10 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(partialAggPlan->outputType(), GetParam()) + .exchange(partialAggPlan->outputType(), GetParam().serdeKind) .capturePlanNodeId(exchangeNodeId) .finalAggregation({"c0"}, {"sum(a0)"}, {{BIGINT()}}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -255,11 +252,20 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { addRemoteSplits(task, {leafTaskId}); } - auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto 
op = PlanBuilder() .exchange(finalAggPlan->outputType(), GetParam().serdeKind) .planNode(); - assertQuery( - op, finalAggTaskIds, "SELECT c0 % 10, sum(c1) FROM tmp GROUP BY 1"); + std::vector<Split> finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c0 % 10, sum(c1) FROM tmp GROUP BY 1"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -293,16 +299,17 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { pools.swap(childPools); } if (i == 0) { - // For leaf task, it has total 21 memory pools: task pool + 4 plan node - // pools (TableScan, FilterProject, PartialAggregation, PartitionedOutput) - // + 16 operator pools (4 drivers * number of plan nodes) + 4 connector - // pools for TableScan. + // For leaf task, it has total 25 memory pools: task pool + 4 plan + // node pools (TableScan, FilterProject, PartialAggregation, + // PartitionedOutput) + // + 16 operator pools (4 drivers * number of plan nodes) + 4 + // connector pools for TableScan. ASSERT_EQ(numPools, 25); } else { // For root task, it has total 8 memory pools: task pool + 3 plan node - // pools (Exchange, Aggregation, PartitionedOutput) and 4 leaf pools: 3 - // operator pools (1 driver * number of plan nodes) + 1 exchange client - // pool. + // pools (Exchange, Aggregation, PartitionedOutput) and 4 leaf pools: + // 3 operator pools (1 driver * number of plan nodes) + 1 exchange + // client pool. ASSERT_EQ(numPools, 8); } } @@ -311,8 +318,10 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { leafPlanStats.at(partitionNodeId) .customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 4); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast<int64_t>(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast<int64_t>(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast<int64_t>(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast<int64_t>(GetParam().serdeKind)); for (const auto& finalTask : finalTasks) { auto finalPlanStats = toPlanStats(finalTask->taskStats()); finalPlanStats.at(exchangeNodeId) .customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 1); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast<int64_t>(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast<int64_t>(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast<int64_t>(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast<int64_t>(GetParam().serdeKind)); } } @@ -336,7 +347,8 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { .tableScan(rowType_) .project({"c0 % 10 AS c0", "c1 % 2 AS c1", "c2"}) .partialAggregation({"c0", "c1"}, {"sum(c2)"}) - .partitionedOutput({"c0", "c1"}, 3, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0", "c1"}, 3, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, partialAggPlan, 0); @@ -350,9 +362,9 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(partialAggPlan->outputType(), GetParam()) + .exchange(partialAggPlan->outputType(), GetParam().serdeKind) .finalAggregation({"c0", "c1"}, {"sum(a0)"}, {{BIGINT()}}) - 
.partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -362,13 +374,20 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { addRemoteSplits(task, {leafTaskId}); } - auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT c0 % 10, c1 % 2, sum(c2) FROM tmp GROUP BY 1, 2"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c0 % 10, c1 % 2, sum(c2) FROM tmp GROUP BY 1, 2"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -385,17 +404,23 @@ TEST_P(MultiFragmentTest, distributedTableScan) { PlanBuilder() .tableScan(rowType_) .project({"c0 % 10", "c1 % 2", "c2"}) - .partitionedOutput({}, 1, {"c2", "p1", "p0"}, GetParam()) + .partitionedOutput({}, 1, {"c2", "p1", "p0"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); addHiveSplits(leafTask, filePaths_); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); auto task = - assertQuery(op, {leafTaskId}, "SELECT c2, c1 % 2, c0 % 10 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c2, c1 % 2, c0 % 10 FROM tmp"); verifyExchangeStats(task, 1, 1); @@ -444,7 +469,7 @@ TEST_P(MultiFragmentTest, abortMergeExchange) { .tableScan(rowType_) .orderBy({"c0"}, true) .planNode()}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -465,9 +490,9 @@ TEST_P(MultiFragmentTest, abortMergeExchange) { core::PlanNodeId mergeExchangeId; auto finalSortPlan = PlanBuilder() - .mergeExchange(outputType, {"c0"}, GetParam()) + .mergeExchange(outputType, {"c0"}, GetParam().serdeKind) .capturePlanNodeId(mergeExchangeId) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto mergeTask = makeTask(finalSortTaskId, finalSortPlan, 0); tasks.push_back(mergeTask); @@ -517,7 +542,7 @@ TEST_P(MultiFragmentTest, mergeExchange) { .tableScan(rowType_) .orderBy({"c0"}, true) .planNode()}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -532,9 +557,9 @@ TEST_P(MultiFragmentTest, mergeExchange) { core::PlanNodeId mergeExchangeId; auto finalSortPlan = PlanBuilder() - .mergeExchange(outputType, {"c0"}, GetParam()) + .mergeExchange(outputType, {"c0"}, GetParam().serdeKind) .capturePlanNodeId(mergeExchangeId) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + 
.partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto mergeTask = makeTask(finalSortTaskId, finalSortPlan, 0); @@ -542,9 +567,15 @@ TEST_P(MultiFragmentTest, mergeExchange) { mergeTask->start(1); addRemoteSplits(mergeTask, partialSortTaskIds); - auto op = PlanBuilder().exchange(outputType, GetParam()).planNode(); - assertQueryOrdered( - op, {finalSortTaskId}, "SELECT * FROM tmp ORDER BY 1 NULLS LAST", {0}); + auto op = PlanBuilder().exchange(outputType, GetParam().serdeKind).planNode(); + + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(finalSortTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults( + "SELECT * FROM tmp ORDER BY 1 NULLS LAST", std::vector{0}); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -562,8 +593,10 @@ TEST_P(MultiFragmentTest, mergeExchange) { const auto serdeKindRuntimsStats = mergeExchangeStats.customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 1); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); } // Test reordering and dropping columns in PartitionedOutput operator. @@ -573,16 +606,23 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test dropping columns only { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, {leafTaskId}, "SELECT c0, c1 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c0, c1 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -593,14 +633,20 @@ TEST_P(MultiFragmentTest, partitionedOutput) { auto leafPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam()) + .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, {leafTaskId}, "SELECT c3, c0, c2 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c3, c0, c2 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -615,15 +661,20 @@ TEST_P(MultiFragmentTest, partitionedOutput) { {}, 1, {"c0", "c1", "c2", "c3", "c4", "c3", "c2", "c1", "c0"}, - 
GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery( - op, {leafTaskId}, "SELECT c0, c1, c2, c3, c4, c3, c2, c1, c0 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4, c3, c2, c1, c0 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -635,15 +686,16 @@ TEST_P(MultiFragmentTest, partitionedOutput) { auto leafPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({"c5"}, kFanout, {"c2", "c0", "c3"}, GetParam()) + .partitionedOutput( + {"c5"}, kFanout, {"c2", "c0", "c3"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); auto intermediatePlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) - .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam().serdeKind) .planNode(); std::vector intermediateTaskIds; for (auto i = 0; i < kFanout; ++i) { @@ -654,12 +706,22 @@ TEST_P(MultiFragmentTest, partitionedOutput) { addRemoteSplits(intermediateTask, {leafTaskId}); } - auto op = PlanBuilder() - .exchange(intermediatePlan->outputType(), GetParam()) - .planNode(); + auto op = + PlanBuilder() + .exchange(intermediatePlan->outputType(), GetParam().serdeKind) + .planNode(); + std::vector intermediateSplits; + for (auto intermediateTaskId : intermediateTaskIds) { + intermediateSplits.emplace_back(remoteSplit(intermediateTaskId)); + } auto task = - assertQuery(op, intermediateTaskIds, "SELECT c3, c0, c2 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(intermediateSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c3, c0, c2 FROM tmp"); verifyExchangeStats(task, kFanout, kFanout); @@ -669,30 +731,36 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test dropping all columns. 
{ auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .addNode( - [](std::string nodeId, - core::PlanNodePtr source) -> core::PlanNodePtr { - return core::PartitionedOutputNode::broadcast( - nodeId, 1, ROW({}), GetParam(), source); - }) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .addNode( + [](std::string nodeId, + core::PlanNodePtr source) -> core::PlanNodePtr { + return core::PartitionedOutputNode::broadcast( + nodeId, 1, ROW({}), GetParam().serdeKind, source); + }) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); leafTask->updateOutputBuffers(1, true); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); vector_size_t numRows = 0; for (const auto& vector : vectors_) { numRows += vector->size(); } - auto result = AssertQueryBuilder(op) - .split(remoteSplit(leafTaskId)) - .copyResults(pool()); + auto result = + AssertQueryBuilder(op) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .copyResults(pool()); ASSERT_EQ(*result->type(), *ROW({})); ASSERT_EQ(result->size(), numRows); @@ -702,10 +770,11 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test asynchronously deleting task buffer (due to abort from downstream). { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); // Delete the results asynchronously to simulate abort from downstream. @@ -731,21 +800,23 @@ TEST_P(MultiFragmentTest, noHashPartitionSkew) { auto producerPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({"c0"}, numPartitions, {"c0", "c1"}, GetParam()) + .partitionedOutput( + {"c0"}, numPartitions, {"c0", "c1"}, GetParam().serdeKind) .planNode(); auto producerTask = makeTask(producerTaskId, producerPlan, 0); producerTask->start(1); core::PlanNodeId partialAggregationNodeId; - auto consumerPlan = PlanBuilder() - .exchange(producerPlan->outputType(), GetParam()) - .localPartition({"c0"}) - .partialAggregation({"c0"}, {"count(1)"}) - .capturePlanNodeId(partialAggregationNodeId) - .localPartition({}) - .finalAggregation() - .singleAggregation({}, {"sum(1)"}) - .planNode(); + auto consumerPlan = + PlanBuilder() + .exchange(producerPlan->outputType(), GetParam().serdeKind) + .localPartition({"c0"}) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggregationNodeId) + .localPartition({}) + .finalAggregation() + .singleAggregation({}, {"sum(1)"}) + .planNode(); // This is computed based offline and shouldn't change across runs. 
const std::vector expectedValues{ @@ -756,11 +827,15 @@ TEST_P(MultiFragmentTest, noHashPartitionSkew) { const auto expectedResult = makeRowVector({makeFlatVector( std::vector{expectedValues[partition]})}); SCOPED_TRACE(fmt::format("partition {}", partition)); - auto consumerTask = test::AssertQueryBuilder(consumerPlan) - .split(remoteSplit(producerTaskId)) - .destination(partition) - .maxDrivers(numConsumerDriverThreads) - .assertResults(expectedResult); + auto consumerTask = + test::AssertQueryBuilder(consumerPlan) + .split(remoteSplit(producerTaskId)) + .destination(partition) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .maxDrivers(numConsumerDriverThreads) + .assertResults(expectedResult); // Verifies that each partial aggregation operator process a number of // inputs. @@ -815,21 +890,22 @@ TEST_P(MultiFragmentTest, noHivePartitionSkew) { std::vector{0}, std::vector{}), {"c0", "c1"}, - GetParam()) + GetParam().serdeKind) .planNode(); auto producerTask = makeTask(producerTaskId, producerPlan, 0); producerTask->start(1); core::PlanNodeId partialAggregationNodeId; - auto consumerPlan = PlanBuilder() - .exchange(producerPlan->outputType(), GetParam()) - .localPartition(numBuckets, {0}, {}) - .partialAggregation({"c0"}, {"count(1)"}) - .capturePlanNodeId(partialAggregationNodeId) - .localPartition({}) - .finalAggregation() - .singleAggregation({}, {"sum(1)"}) - .planNode(); + auto consumerPlan = + PlanBuilder() + .exchange(producerPlan->outputType(), GetParam().serdeKind) + .localPartition(numBuckets, {0}, {}) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggregationNodeId) + .localPartition({}) + .finalAggregation() + .singleAggregation({}, {"sum(1)"}) + .planNode(); const int numConsumerDriverThreads{4}; const auto runConsumer = [&](int partition) { @@ -837,11 +913,15 @@ TEST_P(MultiFragmentTest, noHivePartitionSkew) { const auto expectedResult = makeRowVector({makeFlatVector(std::vector{1'250})}); SCOPED_TRACE(fmt::format("partition {}", partition)); - auto consumerTask = test::AssertQueryBuilder(consumerPlan) - .split(remoteSplit(producerTaskId)) - .destination(partition) - .maxDrivers(numConsumerDriverThreads) - .assertResults(expectedResult); + auto consumerTask = + test::AssertQueryBuilder(consumerPlan) + .split(remoteSplit(producerTaskId)) + .destination(partition) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .maxDrivers(numConsumerDriverThreads) + .assertResults(expectedResult); // Verifies that each partial aggregation operator process a number of // inputs. 
@@ -880,18 +960,25 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { // Single Partition { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput( - {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput( + {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 4 << 20); leafTask->start(1); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); auto task = - assertQuery(op, {leafTaskId}, "SELECT c0, c1, c2, c3, c4 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId() << "state: " << leafTask->state(); auto taskStats = toPlanStats(leafTask->taskStats()); @@ -911,16 +998,16 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { false, std::make_shared(), {"c0", "c1", "c2", "c3", "c4"}, - GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); auto intermediatePlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .partitionedOutput( - {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam()) + {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam().serdeKind) .planNode(); std::vector intermediateTaskIds; for (auto i = 0; i < kFanout; ++i) { @@ -931,12 +1018,22 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { addRemoteSplits(intermediateTask, {leafTaskId}); } - auto op = PlanBuilder() - .exchange(intermediatePlan->outputType(), GetParam()) - .planNode(); + auto op = + PlanBuilder() + .exchange(intermediatePlan->outputType(), GetParam().serdeKind) + .planNode(); - auto task = assertQuery( - op, intermediateTaskIds, "SELECT c0, c1, c2, c3, c4 FROM tmp"); + std::vector intermediateSplits; + for (auto intermediateTaskId : intermediateTaskIds) { + intermediateSplits.emplace_back(remoteSplit(intermediateTaskId)); + } + auto task = + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(intermediateSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << "state: " << leafTask->state(); auto taskStats = toPlanStats(leafTask->taskStats()); @@ -951,11 +1048,11 @@ TEST_P(MultiFragmentTest, broadcast) { // Make leaf task: Values -> Repartitioning (broadcast) std::vector> tasks; auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = - PlanBuilder() - .values({data}) - .partitionedOutputBroadcast(/*outputLayout=*/{}, GetParam()) - .planNode(); + auto leafPlan = PlanBuilder() + .values({data}) + .partitionedOutputBroadcast( + /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); tasks.emplace_back(leafTask); leafTask->start(1); @@ -966,9 +1063,9 @@ TEST_P(MultiFragmentTest, broadcast) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - 
.exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .singleAggregation({}, {"count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -981,10 +1078,20 @@ TEST_P(MultiFragmentTest, broadcast) { leafTask->updateOutputBuffers(finalAggTaskIds.size(), true); // Collect results from multiple tasks. - auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, finalAggTaskIds, "SELECT UNNEST(array[1000, 1000, 1000])"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT UNNEST(array[1000, 1000, 1000])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1021,7 +1128,7 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { false, std::make_shared(), /*outputLayout=*/{}, - GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); @@ -1041,10 +1148,11 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { core::PlanNodePtr collectPlan; std::vector collectTaskIds; for (int i = 0; i < 2; i++) { - collectPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + collectPlan = + PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); collectTaskIds.push_back(makeTaskId("collect", i)); auto task = makeTask(collectTaskIds.back(), collectPlan, i); @@ -1052,10 +1160,20 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { } // Collect everything. 
- auto finalPlan = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto finalPlan = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(finalPlan, {collectTaskIds}, "SELECT * FROM tmp"); + std::vector collectTaskSplits; + for (auto collectTaskId : collectTaskIds) { + collectTaskSplits.emplace_back(remoteSplit(collectTaskId)); + } + test::AssertQueryBuilder(finalPlan, duckDbQueryRunner_) + .splits(std::move(collectTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT * FROM tmp"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1081,11 +1199,11 @@ TEST_P(MultiFragmentTest, constantKeys) { // Make leaf task: Values -> Repartitioning (3-way) auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = - PlanBuilder() - .values({data}) - .partitionedOutput({"c0", "123"}, 3, true, {"c0"}, GetParam()) - .planNode(); + auto leafPlan = PlanBuilder() + .values({data}) + .partitionedOutput( + {"c0", "123"}, 3, true, {"c0"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); addTask(leafTask, {}); @@ -1095,10 +1213,10 @@ TEST_P(MultiFragmentTest, constantKeys) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .project({"c0 is null AS co_is_null"}) .partialAggregation({}, {"count_if(co_is_null)", "count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -1109,15 +1227,22 @@ TEST_P(MultiFragmentTest, constantKeys) { // Collect results and verify number of nulls is 3 times larger than in the // original data. 
auto op = PlanBuilder() - .exchange(finalAggPlan->outputType(), GetParam()) + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) .finalAggregation( {}, {"sum(a0)", "sum(a1)"}, {{BIGINT()}, {BIGINT()}}) .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults( + "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1143,7 +1268,8 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { auto leafPlan = PlanBuilder() .values({data}) - .partitionedOutput({"c0"}, 3, true, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 3, true, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); addTask(leafTask, {}); @@ -1154,10 +1280,10 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .project({"c0 is null AS co_is_null"}) .partialAggregation({}, {"count_if(co_is_null)", "count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -1168,15 +1294,22 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { // Collect results and verify number of nulls is 3 times larger than in the // original data. auto op = PlanBuilder() - .exchange(finalAggPlan->outputType(), GetParam()) + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) .finalAggregation( {}, {"sum(a0)", "sum(a1)"}, {{BIGINT()}, {BIGINT()}}) .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults( + "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1197,7 +1330,7 @@ TEST_P(MultiFragmentTest, limit) { PlanBuilder() .tableScan(std::dynamic_pointer_cast(data->type())) .limit(0, 10, true) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); @@ -1207,25 +1340,20 @@ TEST_P(MultiFragmentTest, limit) { // Make final task: Exchange -> FinalLimit(10). 
auto plan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .localPartition(std::vector{}) .limit(0, 10, false) .planNode(); // Expect the task to produce results before receiving no-more-splits message. - bool splitAdded = false; - auto task = ::assertQuery( - plan, - [&](Task* task) { - if (splitAdded) { - return; - } - task->addSplit("0", remoteSplit(leafTaskId)); - splitAdded = true; - }, - "VALUES (null), (1), (2), (3), (4), (5), (6), (null), (8), (9)", - duckDbQueryRunner_); - + auto task = + test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults( + "VALUES (null), (1), (2), (3), (4), (5), (6), (null), (8), (9)"); ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -1239,10 +1367,11 @@ TEST_P(MultiFragmentTest, mergeExchangeOverEmptySources) { for (int i = 0; i < 2; ++i) { auto taskId = makeTaskId("leaf-", i); leafTaskIds.push_back(taskId); - auto plan = PlanBuilder() - .values({data}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + auto plan = + PlanBuilder() + .values({data}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto task = makeTask(taskId, plan, tasks.size()); tasks.push_back(task); @@ -1251,11 +1380,20 @@ TEST_P(MultiFragmentTest, mergeExchangeOverEmptySources) { auto exchangeTaskId = makeTaskId("exchange-", 0); auto plan = PlanBuilder() - .mergeExchange(rowType_, {"c0"}, GetParam()) + .mergeExchange(rowType_, {"c0"}, GetParam().serdeKind) .singleAggregation({"c0"}, {"count(1)"}) .planNode(); - assertQuery(plan, leafTaskIds, ""); + std::vector leafTaskSplits; + for (auto leafTaskId : leafTaskIds) { + leafTaskSplits.emplace_back(remoteSplit(leafTaskId)); + } + test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .splits(std::move(leafTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults(""); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1309,7 +1447,8 @@ TEST_P(MultiFragmentTest, earlyCompletion) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutput({"c0"}, 2, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 2, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto task = makeTask(leafTaskId, plan, tasks.size()); @@ -1329,7 +1468,7 @@ TEST_P(MultiFragmentTest, earlyCompletion) { } auto joinPlan = makeJoinOverExchangePlan( - asRowType(data->type()), buildData, GetParam()); + asRowType(data->type()), buildData, GetParam().serdeKind); joinOutputType = joinPlan->outputType(); @@ -1345,10 +1484,18 @@ TEST_P(MultiFragmentTest, earlyCompletion) { // Create output task. 
auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery( - outputPlan, joinTaskIds, "SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1373,7 +1520,8 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutputBroadcast(/*outputLayout=*/{}, GetParam()) + .partitionedOutputBroadcast( + /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, plan, tasks.size()); @@ -1393,7 +1541,7 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { } auto joinPlan = makeJoinOverExchangePlan( - asRowType(data->type()), buildData, GetParam()); + asRowType(data->type()), buildData, GetParam().serdeKind); joinOutputType = joinPlan->outputType(); @@ -1412,9 +1560,18 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { // Create output task. auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery(outputPlan, joinTaskIds, "SELECT UNNEST([10, 10, 10, 10])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([10, 10, 10, 10])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1438,7 +1595,8 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutput({"c0"}, 2, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 2, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto task = makeTask(leafTaskId, plan, tasks.size()); @@ -1460,14 +1618,15 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { auto planNodeIdGenerator = std::make_shared(); auto joinPlan = PlanBuilder(planNodeIdGenerator) - .mergeExchange(asRowType(data->type()), {"c0"}, GetParam()) + .mergeExchange( + asRowType(data->type()), {"c0"}, GetParam().serdeKind) .hashJoin( {"c0"}, {"u_c0"}, PlanBuilder(planNodeIdGenerator).values({buildData}).planNode(), "", {"c0"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); joinOutputType = joinPlan->outputType(); @@ -1484,10 +1643,18 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { // Create output task. 
auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery( - outputPlan, joinTaskIds, "SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + common::compressionKindToString(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1587,11 +1754,12 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { auto leafTaskId = makeTaskId("leaf", 0); core::PlanNodePtr leafPlan; - leafPlan = PlanBuilder() - .tableScan(rowType_) - .project({"c0 % 10 AS c0", "c1"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + leafPlan = + PlanBuilder() + .tableScan(rowType_) + .project({"c0 % 10 AS c0", "c1"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); @@ -1599,11 +1767,11 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { auto rootPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .addNode([&leafPlan](std::string id, core::PlanNodePtr node) { return std::make_shared(id, std::move(node)); }) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto rootTask = makeTask("root-task", rootPlan, 0); @@ -1623,17 +1791,18 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { TEST_P(MultiFragmentTest, cancelledExchange) { // Create a source fragment borrow the output type from it. - auto planFragment = exec::test::PlanBuilder() - .tableScan(rowType_) - .filter("c0 % 5 = 1") - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planFragment(); + auto planFragment = + exec::test::PlanBuilder() + .tableScan(rowType_) + .filter("c0 % 5 = 1") + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planFragment(); // Create task with exchange. 
   auto planFragmentWithExchange =
       exec::test::PlanBuilder()
-          .exchange(planFragment.planNode->outputType(), GetParam())
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .exchange(planFragment.planNode->outputType(), GetParam().serdeKind)
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planFragment();
   auto exchangeTask =
       makeTask("output.0.0.1", planFragmentWithExchange.planNode, 0);
@@ -1733,22 +1902,26 @@ TEST_P(MultiFragmentTest, customPlanNodeWithExchangeClient) {
   Operator::registerOperator(std::make_unique());
   auto leafTaskId = makeTaskId("leaf", 0);
   core::PlanNodeId partitionNodeId;
-  auto leafPlan = PlanBuilder()
-                      .values(vectors_)
-                      .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
-                      .capturePlanNodeId(partitionNodeId)
-                      .planNode();
+  auto leafPlan =
+      PlanBuilder()
+          .values(vectors_)
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
+          .capturePlanNodeId(partitionNodeId)
+          .planNode();
   auto leafTask = makeTask(leafTaskId, leafPlan, 0);
   leafTask->start(1);
 
   CursorParameters params;
+  params.queryConfigs.emplace(
+      core::QueryConfig::kShuffleCompressionKind,
+      common::compressionKindToString(GetParam().compressionKind));
   core::PlanNodeId testNodeId;
   params.maxDrivers = 1;
   params.planNode =
       PlanBuilder()
           .addNode([&leafPlan](std::string id, core::PlanNodePtr /* input */) {
             return std::make_shared(
-                id, leafPlan->outputType(), GetParam());
+                id, leafPlan->outputType(), GetParam().serdeKind);
           })
           .capturePlanNodeId(testNodeId)
           .planNode();
@@ -1772,8 +1945,10 @@
   const auto serdeKindRuntimsStats =
       planStats.at(partitionNodeId).customStats.at(Operator::kShuffleSerdeKind);
   ASSERT_EQ(serdeKindRuntimsStats.count, 1);
-  ASSERT_EQ(serdeKindRuntimsStats.min, static_cast<int64_t>(GetParam()));
-  ASSERT_EQ(serdeKindRuntimsStats.max, static_cast<int64_t>(GetParam()));
+  ASSERT_EQ(
+      serdeKindRuntimsStats.min, static_cast<int64_t>(GetParam().serdeKind));
+  ASSERT_EQ(
+      serdeKindRuntimsStats.max, static_cast<int64_t>(GetParam().serdeKind));
 }
 
 // This test is to reproduce the race condition between task terminate and no
@@ -1795,7 +1970,7 @@ DEBUG_ONLY_TEST_P(
       PlanBuilder()
           .tableScan(rowType_)
           .project({"c0 % 10 AS c0", "c1"})
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   auto leafTask = makeTask(leafTaskId, leafPlan, 0);
   leafTask->start(1);
@@ -1828,7 +2003,7 @@ DEBUG_ONLY_TEST_P(
         blockTerminate.await([&]() { return readyToTerminate.load(); });
       })));
   auto rootPlan = PlanBuilder()
-                      .exchange(leafPlan->outputType(), GetParam())
+                      .exchange(leafPlan->outputType(), GetParam().serdeKind)
                       .finalAggregation({"c0"}, {"count(c1)"}, {{BIGINT()}})
                       .planNode();
 
@@ -1854,10 +2029,11 @@ TEST_P(MultiFragmentTest, taskTerminateWithPendingOutputBuffers) {
   setupSources(8, 1000);
   auto taskId = makeTaskId("task", 0);
   core::PlanNodePtr leafPlan;
-  leafPlan = PlanBuilder()
-                 .tableScan(rowType_)
-                 .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
-                 .planNode();
+  leafPlan =
+      PlanBuilder()
+          .tableScan(rowType_)
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
+          .planNode();
 
   auto task = makeTask(taskId, leafPlan, 0);
   task->start(1);
@@ -1926,19 +2102,20 @@
       makeRowVector({"p_c0"}, {makeFlatVector<int32_t>({1, 2, 3})});
   auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
   core::PlanNodeId exchangeNodeId;
-  auto plan = PlanBuilder(planNodeIdGenerator)
-                  .values({probeData}, true)
-                  .hashJoin(
{"p_c0"}, - {"c0"}, - PlanBuilder(planNodeIdGenerator) - .exchange(rowType_, GetParam()) - .capturePlanNodeId(exchangeNodeId) - .planNode(), - "", - {"c0"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + auto plan = + PlanBuilder(planNodeIdGenerator) + .values({probeData}, true) + .hashJoin( + {"p_c0"}, + {"c0"}, + PlanBuilder(planNodeIdGenerator) + .exchange(rowType_, GetParam().serdeKind) + .capturePlanNodeId(exchangeNodeId) + .planNode(), + "", + {"c0"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto taskId = makeTaskId("final", 0); auto task = makeTask(taskId, plan, 0); task->start(2); @@ -2007,7 +2184,7 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, mergeWithEarlyTermination) { .tableScan(rowType_) .orderBy({"c0"}, true) .planNode()}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto partialSortTask = makeTask(sortTaskId, partialSortPlan, 1); @@ -2034,8 +2211,9 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, mergeWithEarlyTermination) { auto finalSortTaskId = makeTaskId("orderby", 1); auto finalSortPlan = PlanBuilder() - .mergeExchange(partialSortPlan->outputType(), {"c0"}, GetParam()) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .mergeExchange( + partialSortPlan->outputType(), {"c0"}, GetParam().serdeKind) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto finalSortTask = makeTask(finalSortTaskId, finalSortPlan, 0); finalSortTask->start(1); @@ -2168,6 +2346,11 @@ class DataFetcher { /// of individual pages. PartitionedOutput operator is expected to limit page /// sizes to no more than 1MB give and take 30%. TEST_P(MultiFragmentTest, maxBytes) { + if (GetParam().compressionKind != common::CompressionKind_NONE) { + // NOTE: different compression generates different serialized byte size so + // only test with no-compression to ease testing.s + return; + } std::string s(25, 'x'); // Keep the row count under 7000 to avoid hitting the row limit in the // operator instead. 
@@ -2178,11 +2361,12 @@ TEST_P(MultiFragmentTest, maxBytes) {
   });
 
   core::PlanNodeId outputNodeId;
-  auto plan = PlanBuilder()
-                  .values({data}, false, 100)
-                  .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
-                  .capturePlanNodeId(outputNodeId)
-                  .planNode();
+  auto plan =
+      PlanBuilder()
+          .values({data}, false, 100)
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
+          .capturePlanNodeId(outputNodeId)
+          .planNode();
 
   int32_t testIteration = 0;
   DataFetcher::Stats prevStats;
@@ -2258,7 +2442,7 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, exchangeStatsOnFailure) {
   auto producerPlan =
       PlanBuilder()
           .values({data}, false, 30)
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
 
   auto producerTaskId = makeTaskId("producer", 0);
@@ -2266,8 +2450,9 @@
   producerTask->start(1);
   producerTask->updateOutputBuffers(1, true);
 
-  auto plan =
-      PlanBuilder().exchange(producerPlan->outputType(), GetParam()).planNode();
+  auto plan = PlanBuilder()
+                  .exchange(producerPlan->outputType(), GetParam().serdeKind)
+                  .planNode();
 
   auto task = makeTask("t", plan, 0, noopConsumer());
   task->start(4);
@@ -2301,7 +2486,7 @@ TEST_P(MultiFragmentTest, earlyTaskFailure) {
                .tableScan(rowType_)
                .orderBy({"c0"}, true)
                .planNode()})
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   for (bool internalFailure : {false, true}) {
     SCOPED_TRACE(fmt::format("internalFailure: {}", internalFailure));
@@ -2312,10 +2497,11 @@
     auto outputType = partialSortPlan->outputType();
     auto finalSortTaskId = makeTaskId("finalSortBy", 0);
-    auto finalSortPlan = PlanBuilder()
-                             .mergeExchange(outputType, {"c0"}, GetParam())
-                             .partitionedOutput({}, 1)
-                             .planNode();
+    auto finalSortPlan =
+        PlanBuilder()
+            .mergeExchange(outputType, {"c0"}, GetParam().serdeKind)
+            .partitionedOutput({}, 1)
+            .planNode();
     auto finalSortTask = makeTask(finalSortTaskId, finalSortPlan, 0);
 
     if (internalFailure) {
@@ -2348,16 +2534,18 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
   auto data = makeRowVector({makeFlatVector<int32_t>({1, 2, 3})});
 
   const int32_t numPartitions = 100;
-  auto producerPlan =
-      test::PlanBuilder()
-          .values({data})
-          .partitionedOutput(
-              {"c0"}, numPartitions, /*outputLayout=*/{}, GetParam())
-          .planNode();
+  auto producerPlan = test::PlanBuilder()
+                          .values({data})
+                          .partitionedOutput(
+                              {"c0"},
+                              numPartitions,
+                              /*outputLayout=*/{},
+                              GetParam().serdeKind)
+                          .planNode();
   const auto producerTaskId = "local://t1";
 
   auto plan = test::PlanBuilder()
-                  .exchange(asRowType(data->type()), GetParam())
+                  .exchange(asRowType(data->type()), GetParam().serdeKind)
                   .planNode();
 
   auto expected = makeRowVector({
@@ -2401,12 +2589,12 @@
     ASSERT_EQ(numPages, stats.customStats.at("numReceivedPages").sum);
   };
 
-  if (GetParam() == VectorSerde::Kind::kPresto) {
+  if (GetParam().serdeKind == VectorSerde::Kind::kPresto) {
     test(1, 1'000);
     test(1'000, 56);
     test(10'000, 6);
     test(100'000, 1);
-  } else if (GetParam() == VectorSerde::Kind::kCompactRow) {
+  } else if (GetParam().serdeKind == VectorSerde::Kind::kCompactRow) {
     test(1, 1'000);
     test(1'000, 38);
     test(10'000, 4);
@@ -2420,24 +2608,17 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
 }
 
 TEST_P(MultiFragmentTest, compression) {
-  bufferManager_->testingSetCompression(
-      common::CompressionKind::CompressionKind_LZ4);
-  auto guard = folly::makeGuard([&]() {
-    bufferManager_->testingSetCompression(
-        common::CompressionKind::CompressionKind_NONE);
-  });
-
   constexpr int32_t kNumRepeats = 1'000'000;
   const auto data = makeRowVector({makeFlatVector<int32_t>({1, 2, 3})});
 
   const auto producerPlan =
       test::PlanBuilder()
           .values({data}, false, kNumRepeats)
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
 
   const auto plan =
       test::PlanBuilder()
-          .exchange(asRowType(data->type()), GetParam())
+          .exchange(asRowType(data->type()), GetParam().serdeKind)
           .singleAggregation({}, {"sum(c0)"})
           .planNode();
 
@@ -2451,30 +2632,60 @@
     auto producerTask = makeTask(producerTaskId, producerPlan);
     producerTask->start(1);
 
-    auto consumerTask = test::AssertQueryBuilder(plan)
-                            .split(remoteSplit(producerTaskId))
-                            .destination(0)
-                            .assertResults(expected);
+    auto consumerTask =
+        test::AssertQueryBuilder(plan)
+            .split(remoteSplit(producerTaskId))
+            .config(
+                core::QueryConfig::kShuffleCompressionKind,
+                common::compressionKindToString(GetParam().compressionKind))
+            .destination(0)
+            .assertResults(expected);
 
     auto consumerTaskStats = exec::toPlanStats(consumerTask->taskStats());
     const auto& consumerPlanStats = consumerTaskStats.at("0");
+    ASSERT_EQ(
+        consumerPlanStats.customStats.at(Operator::kShuffleCompressionKind).min,
+        static_cast<int64_t>(GetParam().compressionKind));
+    ASSERT_EQ(
+        consumerPlanStats.customStats.at(Operator::kShuffleCompressionKind).max,
+        static_cast<int64_t>(GetParam().compressionKind));
     ASSERT_EQ(data->size() * kNumRepeats, consumerPlanStats.outputRows);
 
     auto producerTaskStats = exec::toPlanStats(producerTask->taskStats());
    const auto& producerStats = producerTaskStats.at("1");
+    ASSERT_EQ(
+        producerStats.customStats.at(Operator::kShuffleCompressionKind).min,
+        static_cast<int64_t>(GetParam().compressionKind));
+    ASSERT_EQ(
+        producerStats.customStats.at(Operator::kShuffleCompressionKind).max,
+        static_cast<int64_t>(GetParam().compressionKind));
+    if (GetParam().compressionKind == common::CompressionKind_NONE) {
+      ASSERT_EQ(producerStats.customStats.at("compressedBytes").sum, 0);
+      ASSERT_EQ(producerStats.customStats.at("compressionInputBytes").sum, 0);
+      ASSERT_EQ(producerStats.customStats.at("compressionSkippedBytes").sum, 0);
+      return;
+    }
     // The data is extremely compressible, 1, 2, 3 repeated 1000000 times.
     if (!expectSkipCompression) {
-      EXPECT_LT(
+      ASSERT_LT(
           producerStats.customStats.at("compressedBytes").sum,
           producerStats.customStats.at("compressionInputBytes").sum);
-      EXPECT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
+      ASSERT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
     } else {
-      EXPECT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
+      ASSERT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
     }
   };
 
-  test("local://t1", 0.7, false);
-  test("local://t2", 0.0000001, true);
+  {
+    SCOPED_TRACE(
+        fmt::format("compression kind {}", GetParam().compressionKind));
+    {
+      SCOPED_TRACE("minCompressionRatio 0.7");
+      test("local://t1", 0.7, false);
+    }
+    {
+      SCOPED_TRACE("minCompressionRatio 0.0000001");
+      test("local://t2", 0.0000001, true);
+    }
+  }
 }
 
 TEST_P(MultiFragmentTest, scaledTableScan) {
@@ -2529,7 +2740,7 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
           .capturePlanNodeId(scanNodeId)
           .partialAggregation(
              {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"})
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
 
   const auto leafTaskId = "local://leaf-0";
@@ -2540,12 +2751,12 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
 
   const auto finalAggPlan =
       PlanBuilder()
-          .exchange(leafPlan->outputType(), GetParam())
+          .exchange(leafPlan->outputType(), GetParam().serdeKind)
           .finalAggregation(
               {"c5"},
               {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"},
               {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}})
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
 
   const auto finalAggTaskId = "local://final-agg-0";
@@ -2556,13 +2767,16 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
 
   const auto resultPlan =
       PlanBuilder()
-          .exchange(finalAggPlan->outputType(), GetParam())
+          .exchange(finalAggPlan->outputType(), GetParam().serdeKind)
           .planNode();
 
-  assertQuery(
-      resultPlan,
-      {finalAggTaskId},
-      "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
+  test::AssertQueryBuilder(resultPlan, duckDbQueryRunner_)
+      .split(remoteSplit(finalAggTaskId))
+      .config(
+          core::QueryConfig::kShuffleCompressionKind,
+          common::compressionKindToString(GetParam().compressionKind))
+      .assertResults(
+          "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
 
   ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
   ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get()))
diff --git a/velox/serializers/RowSerializer.h b/velox/serializers/RowSerializer.h
index ae062b658fd7..9c4f1a78abbb 100644
--- a/velox/serializers/RowSerializer.h
+++ b/velox/serializers/RowSerializer.h
@@ -24,13 +24,13 @@ namespace facebook::velox::serializer {
 using TRowSize = uint32_t;
 
 namespace detail {
-struct RowHeader {
+struct RowGroupHeader {
   int32_t uncompressedSize;
   int32_t compressedSize;
   bool compressed;
 
-  static RowHeader read(ByteInputStream* source) {
-    RowHeader header;
+  static RowGroupHeader read(ByteInputStream* source) {
+    RowGroupHeader header;
     header.uncompressedSize = source->read<int32_t>();
     header.compressedSize = source->read<int32_t>();
     header.compressed = source->read<char>();
@@ -48,6 +48,19 @@ struct RowHeader {
     out->write(reinterpret_cast<const char*>(&writeValue), sizeof(char));
   }
 
+  void write(char* out) {
+    ::memcpy(out, reinterpret_cast<const char*>(&uncompressedSize), sizeof(int32_t));
+    ::memcpy(
+        out + sizeof(int32_t),
+        reinterpret_cast<const char*>(&compressedSize),
+        sizeof(int32_t));
+    const char writeValue = compressed ? 1 : 0;
+    ::memcpy(
+        out + sizeof(int32_t) * 2,
+        reinterpret_cast<const char*>(&writeValue),
+        sizeof(char));
+  }
+
   static size_t size() {
     return sizeof(int32_t) * 2 + sizeof(char);
   }
@@ -143,13 +156,13 @@ class RowSerializer : public IterativeVectorSerializer {
   size_t maxSerializedSize() const override {
     const auto size = uncompressedSize();
     if (!needCompression()) {
-      return detail::RowHeader::size() + size;
+      return detail::RowGroupHeader::size() + size;
     }
     VELOX_CHECK_LE(
         size,
         codec_->maxUncompressedLength(),
        "UncompressedSize exceeds limit");
-    return detail::RowHeader::size() + codec_->maxCompressedLength(size);
+    return detail::RowGroupHeader::size() + codec_->maxCompressedLength(size);
   }
 
   /// The serialization format is | uncompressedSize | compressedSize |
@@ -178,7 +191,7 @@ class RowSerializer : public IterativeVectorSerializer {
       flushUncompressed(size, stream);
     } else {
       // Do the compression.
-      detail::RowHeader header = {size, compressedSize, true};
+      detail::RowGroupHeader header = {size, compressedSize, true};
       header.write(stream);
       for (auto range : *compressedBuffer) {
         stream->write(
@@ -257,7 +270,7 @@ class RowSerializer : public IterativeVectorSerializer {
   }
 
   void flushUncompressed(int32_t size, OutputStream* stream) {
-    detail::RowHeader header = {size, size, false};
+    detail::RowGroupHeader header = {size, size, false};
     header.write(stream);
     for (const auto& buffer : buffers_) {
       stream->write(buffer->template asMutable<char>(), buffer->size());
@@ -284,7 +297,7 @@ class RowDeserializer {
           : options->compressionKind;
     while (!source->atEnd()) {
       std::unique_ptr<folly::IOBuf> uncompressedBuf;
-      const auto header = detail::RowHeader::read(source);
+      const auto header = detail::RowGroupHeader::read(source);
       if (header.compressed) {
         VELOX_DCHECK_NE(
            compressionKind, common::CompressionKind::CompressionKind_NONE);
diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h
index 6094ef7830b8..f9d9b6790c50 100644
--- a/velox/vector/VectorStream.h
+++ b/velox/vector/VectorStream.h
@@ -419,7 +419,7 @@ class VectorStreamGroup : public StreamArena {
       RowTypePtr type,
       VectorSerde* serde,
       RowVectorPtr* result,
-      const VectorSerde::Options* options = nullptr);
+      const VectorSerde::Options* options);
 
   void clear() override {
     StreamArena::clear();
diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp
index add1aa8b03c1..bba945f84a4d 100644
--- a/velox/vector/tests/VectorTest.cpp
+++ b/velox/vector/tests/VectorTest.cpp
@@ -878,7 +878,7 @@ class VectorTest : public testing::Test, public velox::test::VectorTestBase {
 
     RowVectorPtr resultRow;
     VectorStreamGroup::read(
-        evenInput.get(), pool(), sourceRowType, nullptr, &resultRow);
+        evenInput.get(), pool(), sourceRowType, nullptr, &resultRow, nullptr);
     VectorPtr result = resultRow->childAt(0);
     switch (source->encoding()) {
       case VectorEncoding::Simple::FLAT:
@@ -908,7 +908,7 @@ class VectorTest : public testing::Test, public velox::test::VectorTestBase {
     auto oddInput = prepareInput(oddString);
 
     VectorStreamGroup::read(
-        oddInput.get(), pool(), sourceRowType, nullptr, &resultRow);
+        oddInput.get(), pool(), sourceRowType, nullptr, &resultRow, nullptr);
     result = resultRow->childAt(0);
     for (int32_t i = 0; i < oddIndices.size(); ++i) {
       EXPECT_TRUE(result->equalValueAt(source.get(), i, oddIndices[i].begin))
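Note on the wire format: the RowGroupHeader changes above fix the row-group framing as | uncompressedSize (int32) | compressedSize (int32) | compressed (1 byte) |, nine bytes in total. A self-contained sketch of that layout for reviewers follows; HeaderSketch is an illustrative stand-in, not part of the patch:

    #include <cstdint>
    #include <cstring>

    // Illustrative mirror of detail::RowGroupHeader: same 9-byte layout and
    // the same raw-memcpy serialization as the write(char*) overload above.
    struct HeaderSketch {
      int32_t uncompressedSize{0};
      int32_t compressedSize{0};
      bool compressed{false};

      static constexpr size_t kSize = sizeof(int32_t) * 2 + sizeof(char);

      void write(char* out) const {
        ::memcpy(out, &uncompressedSize, sizeof(int32_t));
        ::memcpy(out + sizeof(int32_t), &compressedSize, sizeof(int32_t));
        const char flag = compressed ? 1 : 0;
        ::memcpy(out + sizeof(int32_t) * 2, &flag, sizeof(char));
      }

      static HeaderSketch read(const char* in) {
        HeaderSketch header;
        ::memcpy(&header.uncompressedSize, in, sizeof(int32_t));
        ::memcpy(&header.compressedSize, in + sizeof(int32_t), sizeof(int32_t));
        char flag = 0;
        ::memcpy(&flag, in + sizeof(int32_t) * 2, sizeof(char));
        header.compressed = flag != 0;
        return header;
      }
    };

A write followed by a read round-trips all three fields; the uncompressed path stores the same size twice with the flag unset, which is exactly what flushUncompressed does above.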