From aeda1bafbc4d3413570a0d0a8d4cb2bb8f0b6af5 Mon Sep 17 00:00:00 2001
From: Xiaoxuan Meng
Date: Thu, 19 Dec 2024 10:43:46 -0800
Subject: [PATCH] Adds shuffle compression support

Summary:
Adds per-query shuffle compression support. Currently the compression kind is
configured in the partitioned output buffer manager, which forces all queries
to use the same compression kind and assumes all workers are configured with
the same one. This is inflexible and does not align with Presto Java. This
change removes the compression kind from the partitioned output buffer manager
and instead configures it through the query config. The shuffle operators also
report the compression kind in their runtime stats.

A followup is to integrate this with Prestissimo by setting the LZ4
compression kind when the shuffle compression session property is set. Note
that Presto Java does not allow configuring which compression kind to use.
Will test in batch shadow.

Differential Revision: D67407045
---
 velox/core/QueryConfig.h               |  12 +
 velox/docs/configs.rst                 |   5 +
 velox/docs/monitoring/stats.rst        |   5 +
 velox/exec/Exchange.cpp                |  27 +-
 velox/exec/Exchange.h                  |   2 +-
 velox/exec/Merge.cpp                   |  30 +-
 velox/exec/Merge.h                     |   5 +
 velox/exec/MergeSource.cpp             |   3 +-
 velox/exec/Operator.h                  |   5 +
 velox/exec/OutputBufferManager.h       |  20 +-
 velox/exec/PartitionedOutput.cpp       |  29 +-
 velox/exec/PartitionedOutput.h         |   4 +-
 velox/exec/tests/MultiFragmentTest.cpp | 828 +++++++++++++++----------
 velox/vector/VectorStream.h            |   2 +-
 14 files changed, 621 insertions(+), 356 deletions(-)

diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index 90105af887e64..e80385ece7968 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -15,6 +15,7 @@
  */
 #pragma once
 
+#include "velox/common/compression/Compression.h"
 #include "velox/common/config/Config.h"
 #include "velox/vector/TypeAliases.h"
 
@@ -490,6 +491,11 @@ class QueryConfig {
   static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
       "table_scan_scale_up_memory_usage_ratio";
 
+  /// Specifies the shuffle compression kind, defined by the CompressionKind
+  /// enum. CompressionKind_NONE means no compression.
+  static constexpr const char* kShuffleCompressionKind =
+      "shuffle_compression_kind";
+
   bool selectiveNimbleReaderEnabled() const {
     return get<bool>(kSelectiveNimbleReaderEnabled, false);
   }
@@ -906,6 +912,12 @@ class QueryConfig {
     return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.7);
   }
 
+  int32_t shuffleCompressionKind() const {
+    return get<int32_t>(
+        kShuffleCompressionKind,
+        static_cast<int32_t>(common::CompressionKind::CompressionKind_NONE));
+  }
+
   template <typename T>
   T get(const std::string& key, const T& defaultValue) const {
     return config_->get<T>(key, defaultValue);
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst
index 9fe2534b12620..1fae4ece8f078 100644
--- a/velox/docs/configs.rst
+++ b/velox/docs/configs.rst
@@ -148,6 +148,11 @@ Generic Configuration
      - integer
      - 16
      - Byte length of the string prefix stored in the prefix-sort buffer. This doesn't include the null byte.
+   * - shuffle_compression_kind
+     - integer
+     - 0
+     - Specifies the shuffle compression kind, defined by the CompressionKind enum. If it is set to
+       CompressionKind_NONE (0), no compression is applied.
 
 .. _expression-evaluation-conf:

diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst
index a6feb1d32cb08..c6f9f5f91156f 100644
--- a/velox/docs/monitoring/stats.rst
+++ b/velox/docs/monitoring/stats.rst
@@ -205,6 +205,11 @@ These stats are reported by shuffle operators.
     - Indicates the vector serde kind used by an operator for shuffle with 1
       for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by
       Exchange, MergeExchange and PartitionedOutput operators for now.
+   * - shuffleCompressionKind
+     -
+     - Indicates the compression kind used by an operator for shuffle. The
+       reported value is the corresponding CompressionKind enum value, with 0
+       (CompressionKind_NONE) meaning no compression.
 
 PrefixSort
 ----------

diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp
index 8dcf41ccc7a80..f49459ae09b66 100644
--- a/velox/exec/Exchange.cpp
+++ b/velox/exec/Exchange.cpp
@@ -21,13 +21,14 @@ namespace facebook::velox::exec {
 namespace {
 std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
+    const core::QueryConfig& queryConfig,
     VectorSerde::Kind kind) {
   std::unique_ptr<VectorSerde::Options> options =
       kind == VectorSerde::Kind::kPresto
       ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
       : std::make_unique<VectorSerde::Options>();
-  options->compressionKind =
-      OutputBufferManager::getInstance().lock()->compressionKind();
+  options->compressionKind = static_cast<common::CompressionKind>(
+      queryConfig.shuffleCompressionKind());
   return options;
 }
 } // namespace
@@ -46,8 +47,10 @@ Exchange::Exchange(
           operatorType),
       preferredOutputBatchBytes_{
           driverCtx->queryConfig().preferredOutputBatchBytes()},
-      serdeKind_(exchangeNode->serdeKind()),
-      options_{getVectorSerdeOptions(serdeKind_)},
+      serdeKind_{exchangeNode->serdeKind()},
+      serdeOptions_{getVectorSerdeOptions(
+          operatorCtx_->driverCtx()->queryConfig(),
+          serdeKind_)},
       processSplits_{operatorCtx_->driverCtx()->driverId == 0},
       exchangeClient_{std::move(exchangeClient)} {}
 
@@ -159,7 +162,7 @@ RowVectorPtr Exchange::getOutput() {
           outputType_,
           &result_,
           resultOffset,
-          options_.get());
+          serdeOptions_.get());
       resultOffset = result_->size();
     }
   }
@@ -186,7 +189,7 @@ RowVectorPtr Exchange::getOutput() {
       outputType_,
       &result_,
       resultOffset,
-      options_.get());
+      serdeOptions_.get());
   // We expect the row-wise deserialization to consume all the input into one
   // output vector.
   VELOX_CHECK(inputStream->atEnd());
@@ -212,9 +215,15 @@ void Exchange::close() {
     exchangeClient_->close();
   }
   exchangeClient_ = nullptr;
-  stats_.wlock()->addRuntimeStat(
-      Operator::kShuffleSerdeKind,
-      RuntimeCounter(static_cast<int64_t>(serdeKind_)));
+  {
+    auto lockedStats = stats_.wlock();
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleSerdeKind,
+        RuntimeCounter(static_cast<int64_t>(serdeKind_)));
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleCompressionKind,
+        RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
+  }
 }
 
 void Exchange::recordExchangeClientStats() {
diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h
index 45cdce8ceb591..3a1f32cac5339 100644
--- a/velox/exec/Exchange.h
+++ b/velox/exec/Exchange.h
@@ -83,7 +83,7 @@ class Exchange : public SourceOperator {
 
   const VectorSerde::Kind serdeKind_;
 
-  const std::unique_ptr<VectorSerde::Options> options_;
+  const std::unique_ptr<VectorSerde::Options> serdeOptions_;
 
   /// True if this operator is responsible for fetching splits from the Task
   /// and passing these to ExchangeClient.
diff --git a/velox/exec/Merge.cpp b/velox/exec/Merge.cpp
index f25f327bc9d70..ac2fcad222770 100644
--- a/velox/exec/Merge.cpp
+++ b/velox/exec/Merge.cpp
@@ -21,6 +21,19 @@ using facebook::velox::common::testutil::TestValue;
 
 namespace facebook::velox::exec {
 
+namespace {
+std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
+    const core::QueryConfig& queryConfig,
+    VectorSerde::Kind kind) {
+  std::unique_ptr<VectorSerde::Options> options =
+      kind == VectorSerde::Kind::kPresto
+      ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
+      : std::make_unique<VectorSerde::Options>();
+  options->compressionKind = static_cast<common::CompressionKind>(
+      queryConfig.shuffleCompressionKind());
+  return options;
+}
+} // namespace
 
 Merge::Merge(
     int32_t operatorId,
@@ -305,7 +318,10 @@ MergeExchange::MergeExchange(
           mergeExchangeNode->sortingOrders(),
           mergeExchangeNode->id(),
           "MergeExchange"),
-      serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())) {}
+      serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())),
+      serdeOptions_(getVectorSerdeOptions(
+          driverCtx->queryConfig(),
+          mergeExchangeNode->serdeKind())) {}
 
 BlockingReason MergeExchange::addMergeSources(ContinueFuture* future) {
   if (operatorCtx_->driverCtx()->driverId != 0) {
@@ -370,8 +386,14 @@ void MergeExchange::close() {
     source->close();
   }
   Operator::close();
-  stats_.wlock()->addRuntimeStat(
-      Operator::kShuffleSerdeKind,
-      RuntimeCounter(static_cast<int64_t>(serde_->kind())));
+  {
+    auto lockedStats = stats_.wlock();
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleSerdeKind,
+        RuntimeCounter(static_cast<int64_t>(serde_->kind())));
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleCompressionKind,
+        RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
+  }
 }
 
 } // namespace facebook::velox::exec
diff --git a/velox/exec/Merge.h b/velox/exec/Merge.h
index e8aa3749dd4d7..867c07dee44a5 100644
--- a/velox/exec/Merge.h
+++ b/velox/exec/Merge.h
@@ -195,6 +195,10 @@ class MergeExchange : public Merge {
     return serde_;
   }
 
+  VectorSerde::Options* serdeOptions() {
+    return serdeOptions_.get();
+  }
+
   void close() override;
 
  protected:
@@ -202,6 +206,7 @@ class MergeExchange : public Merge {
 
  private:
   VectorSerde* const serde_;
+  const std::unique_ptr<VectorSerde::Options> serdeOptions_;
   bool noMoreSplits_ = false;
   // Task Ids from all the splits we took to process so far.
   std::vector<std::string> remoteSourceTaskIds_;
diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp
index a319f2535afa4..786bd40045cca 100644
--- a/velox/exec/MergeSource.cpp
+++ b/velox/exec/MergeSource.cpp
@@ -168,7 +168,8 @@ class MergeExchangeSource : public MergeSource {
             mergeExchange_->pool(),
             mergeExchange_->outputType(),
             mergeExchange_->serde(),
-            &data);
+            &data,
+            mergeExchange_->serdeOptions());
 
         auto lockedStats = mergeExchange_->stats().wlock();
         lockedStats->addInputVector(data->estimateFlatSize(), data->size());
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index de06d78da65a3..4a1b7f3f43ab8 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -364,6 +364,11 @@ class Operator : public BaseRuntimeStatWriter {
   /// runtime stats value is the corresponding enum value.
   static inline const std::string kShuffleSerdeKind{"shuffleSerdeKind"};
 
+  /// The compression kind used by an operator for shuffle. The recorded
+  /// runtime stats value is the corresponding enum value.
+  static inline const std::string kShuffleCompressionKind{
+      "shuffleCompressionKind"};
+
   /// 'operatorId' is the initial index of the 'this' in the Driver's list of
   /// Operators. This is used as in index into OperatorStats arrays in the Task.
   /// 'planNodeId' is a query-level unique identifier of the PlanNode to which
diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h
index 847532cd554e9..038d42cdce77c 100644
--- a/velox/exec/OutputBufferManager.h
+++ b/velox/exec/OutputBufferManager.h
@@ -23,15 +23,11 @@ class OutputBufferManager {
  public:
   /// Options for shuffle. This is initialized once and affects both
   /// PartitionedOutput and Exchange. This can be used for controlling
-  /// compression, protocol version and other matters where shuffle sides should
+  /// protocol version and other matters where shuffle sides should
   /// agree.
-  struct Options {
-    common::CompressionKind compressionKind{
-        common::CompressionKind::CompressionKind_NONE};
-  };
+  struct Options {};
 
-  OutputBufferManager(Options options)
-      : compressionKind_(options.compressionKind) {}
+  explicit OutputBufferManager(Options /*unused*/) {}
 
   void initializeTask(
       std::shared_ptr<Task> task,
@@ -135,21 +131,11 @@ class OutputBufferManager {
   // Returns NULL if task not found.
   std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId);
 
-  void testingSetCompression(common::CompressionKind kind) {
-    *const_cast<common::CompressionKind*>(&compressionKind_) = kind;
-  }
-
-  common::CompressionKind compressionKind() const {
-    return compressionKind_;
-  }
-
  private:
   // Retrieves the set of buffers for a query.
   // Throws an exception if buffer doesn't exist.
   std::shared_ptr<OutputBuffer> getBuffer(const std::string& taskId);
 
-  const common::CompressionKind compressionKind_;
-
   folly::Synchronized<
       std::unordered_map<std::string, std::shared_ptr<OutputBuffer>>,
       std::mutex>
diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp
index 88663c9ad1e5b..d06b9e2f34432 100644
--- a/velox/exec/PartitionedOutput.cpp
+++ b/velox/exec/PartitionedOutput.cpp
@@ -21,13 +21,14 @@ namespace facebook::velox::exec {
 namespace {
 std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
+    const core::QueryConfig& queryConfig,
     VectorSerde::Kind kind) {
   std::unique_ptr<VectorSerde::Options> options =
       kind == VectorSerde::Kind::kPresto
       ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
       : std::make_unique<VectorSerde::Options>();
-  options->compressionKind =
-      OutputBufferManager::getInstance().lock()->compressionKind();
+  options->compressionKind = static_cast<common::CompressionKind>(
+      queryConfig.shuffleCompressionKind());
   options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
   return options;
 }
@@ -38,14 +39,14 @@ Destination::Destination(
     const std::string& taskId,
     int destination,
     VectorSerde* serde,
-    VectorSerde::Options* options,
+    VectorSerde::Options* serdeOptions,
     memory::MemoryPool* pool,
     bool eagerFlush,
     std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
     : taskId_(taskId),
       destination_(destination),
       serde_(serde),
-      options_(options),
+      serdeOptions_(serdeOptions),
       pool_(pool),
       eagerFlush_(eagerFlush),
       recordEnqueued_(std::move(recordEnqueued)) {
@@ -89,7 +90,7 @@ BlockingReason Destination::advance(
   if (current_ == nullptr) {
     current_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
     const auto rowType = asRowType(output->type());
-    current_->createStreamTree(rowType, rowsInCurrent_, options_);
+    current_->createStreamTree(rowType, rowsInCurrent_, serdeOptions_);
   }
 
   const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow);
@@ -200,7 +201,9 @@ PartitionedOutput::PartitionedOutput(
               .maxPartitionedOutputBufferSize()),
       eagerFlush_(eagerFlush),
       serde_(getNamedVectorSerde(planNode->serdeKind())),
-      options_(getVectorSerdeOptions(planNode->serdeKind())) {
+      serdeOptions_(getVectorSerdeOptions(
+          operatorCtx_->driverCtx()->queryConfig(),
+          planNode->serdeKind())) {
   if (!planNode->isPartitioned()) {
     VELOX_USER_CHECK_EQ(numDestinations_, 1);
   }
@@ -256,7 +259,7 @@ void PartitionedOutput::initializeDestinations() {
           taskId,
           i,
           serde_,
-          options_.get(),
+          serdeOptions_.get(),
          pool(),
          eagerFlush_,
          [&](uint64_t bytes, uint64_t rows) {
@@ -473,9 +476,15 @@ bool PartitionedOutput::isFinished() {
 
 void PartitionedOutput::close() {
   Operator::close();
-  stats_.wlock()->addRuntimeStat(
-      Operator::kShuffleSerdeKind,
-      RuntimeCounter(static_cast<int64_t>(serde_->kind())));
+  {
+    auto lockedStats = stats_.wlock();
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleSerdeKind,
+        RuntimeCounter(static_cast<int64_t>(serde_->kind())));
+    lockedStats->addRuntimeStat(
+        Operator::kShuffleCompressionKind,
+        RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
+  }
 
   destinations_.clear();
 }
diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h
index 699af6d655906..5a1c44cf0b196 100644
--- a/velox/exec/PartitionedOutput.h
+++ b/velox/exec/PartitionedOutput.h
@@ -105,7 +105,7 @@ class Destination {
   const std::string taskId_;
   const int destination_;
   VectorSerde* const serde_;
-  VectorSerde::Options* const options_;
+  VectorSerde::Options* const serdeOptions_;
   memory::MemoryPool* const pool_;
   const bool eagerFlush_;
   const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
@@ -220,7 +220,7 @@ class PartitionedOutput : public Operator {
   const int64_t maxBufferedBytes_;
   const bool eagerFlush_;
   VectorSerde* const serde_;
-  const std::unique_ptr<VectorSerde::Options> options_;
+  const std::unique_ptr<VectorSerde::Options> serdeOptions_;
 
   BlockingReason blockingReason_{BlockingReason::kNotBlocked};
   ContinueFuture future_;
diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp
index 1a8b3812fc5a7..2e275d3f896dd 100644
--- a/velox/exec/tests/MultiFragmentTest.cpp
+++ b/velox/exec/tests/MultiFragmentTest.cpp
@@ -37,16 +37,29 @@ using facebook::velox::test::BatchMaker;
 namespace facebook::velox::exec {
 namespace {
 
-class MultiFragmentTest
-    : public HiveConnectorTestBase,
-      public testing::WithParamInterface<VectorSerde::Kind> {
+struct TestParam {
+  VectorSerde::Kind serdeKind;
+  common::CompressionKind compressionKind;
+};
+
+class MultiFragmentTest : public HiveConnectorTestBase,
+                          public testing::WithParamInterface<TestParam> {
  public:
-  static std::vector<VectorSerde::Kind> getTestParams() {
-    const std::vector<VectorSerde::Kind> kinds(
-        {VectorSerde::Kind::kPresto,
-         VectorSerde::Kind::kCompactRow,
-         VectorSerde::Kind::kUnsafeRow});
-    return kinds;
+  static std::vector<TestParam> getTestParams() {
+    std::vector<TestParam> params;
+    params.emplace_back(
+        VectorSerde::Kind::kPresto, common::CompressionKind_NONE);
+    params.emplace_back(
+        VectorSerde::Kind::kCompactRow, common::CompressionKind_NONE);
+    params.emplace_back(
+        VectorSerde::Kind::kUnsafeRow, common::CompressionKind_NONE);
+    params.emplace_back(
+        VectorSerde::Kind::kPresto, common::CompressionKind_LZ4);
+    params.emplace_back(
+        VectorSerde::Kind::kCompactRow, common::CompressionKind_LZ4);
+    params.emplace_back(
+        VectorSerde::Kind::kUnsafeRow, common::CompressionKind_LZ4);
+    return params;
   }
 
  protected:
@@ -54,6 +67,8 @@ class MultiFragmentTest
     HiveConnectorTestBase::SetUp();
     exec::ExchangeSource::factories().clear();
     exec::ExchangeSource::registerFactory(createLocalExchangeSource);
+    configSettings_[core::QueryConfig::kShuffleCompressionKind] =
+        std::to_string(GetParam().compressionKind);
   }
 
   void TearDown() override {
@@ -137,26 +152,6 @@ class MultiFragmentTest
     task->noMoreSplits("0");
   }
 
-  std::shared_ptr<Task> assertQuery(
-      const core::PlanNodePtr& plan,
-      const std::vector<std::string>& remoteTaskIds,
-      const std::string& duckDbSql,
-      std::optional<std::vector<uint32_t>> sortingKeys = std::nullopt) {
-    std::vector<std::shared_ptr<connector::ConnectorSplit>> splits;
-    for (auto& taskId : remoteTaskIds) {
-      splits.push_back(std::make_shared<RemoteConnectorSplit>(taskId));
-    }
-    return OperatorTestBase::assertQuery(plan, splits, duckDbSql, sortingKeys);
-  }
-
-  void assertQueryOrdered(
-      const core::PlanNodePtr& plan,
-      const std::vector<std::string>& remoteTaskIds,
-      const std::string& duckDbSql,
-      const std::vector<uint32_t>& sortingKeys) {
-    assertQuery(plan, remoteTaskIds, duckDbSql, sortingKeys);
-  }
-
   void setupSources(int filePathCount, int
rowsPerVector) { filePaths_ = makeFilePaths(filePathCount); vectors_ = makeVectors(filePaths_.size(), rowsPerVector); @@ -170,7 +165,8 @@ class MultiFragmentTest const std::string& taskId, int32_t destination, const RowVectorPtr& data) { - auto page = toSerializedPage(data, GetParam(), bufferManager_, pool()); + auto page = + toSerializedPage(data, GetParam().serdeKind, bufferManager_, pool()); const auto pageSize = page->size(); ContinueFuture unused; @@ -224,7 +220,8 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { .tableScan(rowType_) .project({"c0 % 10 AS c0", "c1"}) .partialAggregation({"c0"}, {"sum(c1)"}) - .partitionedOutput({"c0"}, 3, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 3, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -241,10 +238,10 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(partialAggPlan->outputType(), GetParam()) + .exchange(partialAggPlan->outputType(), GetParam().serdeKind) .capturePlanNodeId(exchangeNodeId) .finalAggregation({"c0"}, {"sum(a0)"}, {{BIGINT()}}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -255,11 +252,20 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { addRemoteSplits(task, {leafTaskId}); } - auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery( - op, finalAggTaskIds, "SELECT c0 % 10, sum(c1) FROM tmp GROUP BY 1"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0 % 10, sum(c1) FROM tmp GROUP BY 1"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -293,16 +299,17 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { pools.swap(childPools); } if (i == 0) { - // For leaf task, it has total 21 memory pools: task pool + 4 plan node - // pools (TableScan, FilterProject, PartialAggregation, PartitionedOutput) - // + 16 operator pools (4 drivers * number of plan nodes) + 4 connector - // pools for TableScan. + // For leaf task, it has total 21 memory pools: task pool + 4 plan + // node pools (TableScan, FilterProject, PartialAggregation, + // PartitionedOutput) + // + 16 operator pools (4 drivers * number of plan nodes) + 4 + // connector pools for TableScan. ASSERT_EQ(numPools, 25); } else { // For root task, it has total 8 memory pools: task pool + 3 plan node - // pools (Exchange, Aggregation, PartitionedOutput) and 4 leaf pools: 3 - // operator pools (1 driver * number of plan nodes) + 1 exchange client - // pool. + // pools (Exchange, Aggregation, PartitionedOutput) and 4 leaf pools: + // 3 operator pools (1 driver * number of plan nodes) + 1 exchange + // client pool. 
ASSERT_EQ(numPools, 8); } } @@ -311,8 +318,10 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { leafPlanStats.at(partitionNodeId) .customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 4); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); for (const auto& finalTask : finalTasks) { auto finalPlanStats = toPlanStats(finalTask->taskStats()); @@ -320,8 +329,10 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { finalPlanStats.at(exchangeNodeId) .customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 1); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); } } @@ -336,7 +347,8 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { .tableScan(rowType_) .project({"c0 % 10 AS c0", "c1 % 2 AS c1", "c2"}) .partialAggregation({"c0", "c1"}, {"sum(c2)"}) - .partitionedOutput({"c0", "c1"}, 3, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0", "c1"}, 3, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, partialAggPlan, 0); @@ -350,9 +362,9 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(partialAggPlan->outputType(), GetParam()) + .exchange(partialAggPlan->outputType(), GetParam().serdeKind) .finalAggregation({"c0", "c1"}, {"sum(a0)"}, {{BIGINT()}}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -362,13 +374,20 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { addRemoteSplits(task, {leafTaskId}); } - auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT c0 % 10, c1 % 2, sum(c2) FROM tmp GROUP BY 1, 2"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0 % 10, c1 % 2, sum(c2) FROM tmp GROUP BY 1, 2"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -385,17 +404,22 @@ TEST_P(MultiFragmentTest, distributedTableScan) { PlanBuilder() .tableScan(rowType_) .project({"c0 % 10", "c1 % 2", "c2"}) - .partitionedOutput({}, 1, {"c2", "p1", "p0"}, GetParam()) + .partitionedOutput({}, 1, {"c2", "p1", "p0"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); addHiveSplits(leafTask, filePaths_); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); - auto task = - assertQuery(op, {leafTaskId}, "SELECT c2, c1 % 2, c0 % 10 FROM tmp"); + auto op = PlanBuilder() + 
.exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); + auto task = test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c2, c1 % 2, c0 % 10 FROM tmp"); verifyExchangeStats(task, 1, 1); @@ -444,7 +468,7 @@ TEST_P(MultiFragmentTest, abortMergeExchange) { .tableScan(rowType_) .orderBy({"c0"}, true) .planNode()}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -465,9 +489,9 @@ TEST_P(MultiFragmentTest, abortMergeExchange) { core::PlanNodeId mergeExchangeId; auto finalSortPlan = PlanBuilder() - .mergeExchange(outputType, {"c0"}, GetParam()) + .mergeExchange(outputType, {"c0"}, GetParam().serdeKind) .capturePlanNodeId(mergeExchangeId) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto mergeTask = makeTask(finalSortTaskId, finalSortPlan, 0); tasks.push_back(mergeTask); @@ -517,7 +541,7 @@ TEST_P(MultiFragmentTest, mergeExchange) { .tableScan(rowType_) .orderBy({"c0"}, true) .planNode()}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -532,9 +556,9 @@ TEST_P(MultiFragmentTest, mergeExchange) { core::PlanNodeId mergeExchangeId; auto finalSortPlan = PlanBuilder() - .mergeExchange(outputType, {"c0"}, GetParam()) + .mergeExchange(outputType, {"c0"}, GetParam().serdeKind) .capturePlanNodeId(mergeExchangeId) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto mergeTask = makeTask(finalSortTaskId, finalSortPlan, 0); @@ -542,9 +566,15 @@ TEST_P(MultiFragmentTest, mergeExchange) { mergeTask->start(1); addRemoteSplits(mergeTask, partialSortTaskIds); - auto op = PlanBuilder().exchange(outputType, GetParam()).planNode(); - assertQueryOrdered( - op, {finalSortTaskId}, "SELECT * FROM tmp ORDER BY 1 NULLS LAST", {0}); + auto op = PlanBuilder().exchange(outputType, GetParam().serdeKind).planNode(); + + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(finalSortTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults( + "SELECT * FROM tmp ORDER BY 1 NULLS LAST", std::vector{0}); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -562,8 +592,10 @@ TEST_P(MultiFragmentTest, mergeExchange) { const auto serdeKindRuntimsStats = mergeExchangeStats.customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 1); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); } // Test reordering and dropping columns in PartitionedOutput operator. 
@@ -573,16 +605,23 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test dropping columns only { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, {leafTaskId}, "SELECT c0, c1 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0, c1 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -593,14 +632,20 @@ TEST_P(MultiFragmentTest, partitionedOutput) { auto leafPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam()) + .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, {leafTaskId}, "SELECT c3, c0, c2 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c3, c0, c2 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -615,15 +660,20 @@ TEST_P(MultiFragmentTest, partitionedOutput) { {}, 1, {"c0", "c1", "c2", "c3", "c4", "c3", "c2", "c1", "c0"}, - GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery( - op, {leafTaskId}, "SELECT c0, c1, c2, c3, c4, c3, c2, c1, c0 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4, c3, c2, c1, c0 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -635,15 +685,16 @@ TEST_P(MultiFragmentTest, partitionedOutput) { auto leafPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({"c5"}, kFanout, {"c2", "c0", "c3"}, GetParam()) + .partitionedOutput( + {"c5"}, kFanout, {"c2", "c0", "c3"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); auto intermediatePlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) - .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam().serdeKind) .planNode(); std::vector intermediateTaskIds; for (auto i = 0; i < kFanout; ++i) { @@ -654,12 +705,21 @@ TEST_P(MultiFragmentTest, partitionedOutput) { 
addRemoteSplits(intermediateTask, {leafTaskId}); } - auto op = PlanBuilder() - .exchange(intermediatePlan->outputType(), GetParam()) - .planNode(); + auto op = + PlanBuilder() + .exchange(intermediatePlan->outputType(), GetParam().serdeKind) + .planNode(); - auto task = - assertQuery(op, intermediateTaskIds, "SELECT c3, c0, c2 FROM tmp"); + std::vector intermediateSplits; + for (auto intermediateTaskId : intermediateTaskIds) { + intermediateSplits.emplace_back(remoteSplit(intermediateTaskId)); + } + auto task = test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(intermediateSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c3, c0, c2 FROM tmp"); verifyExchangeStats(task, kFanout, kFanout); @@ -669,21 +729,23 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test dropping all columns. { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .addNode( - [](std::string nodeId, - core::PlanNodePtr source) -> core::PlanNodePtr { - return core::PartitionedOutputNode::broadcast( - nodeId, 1, ROW({}), GetParam(), source); - }) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .addNode( + [](std::string nodeId, + core::PlanNodePtr source) -> core::PlanNodePtr { + return core::PartitionedOutputNode::broadcast( + nodeId, 1, ROW({}), GetParam().serdeKind, source); + }) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); leafTask->updateOutputBuffers(1, true); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); vector_size_t numRows = 0; for (const auto& vector : vectors_) { @@ -692,6 +754,9 @@ TEST_P(MultiFragmentTest, partitionedOutput) { auto result = AssertQueryBuilder(op) .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) .copyResults(pool()); ASSERT_EQ(*result->type(), *ROW({})); ASSERT_EQ(result->size(), numRows); @@ -702,10 +767,11 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test asynchronously deleting task buffer (due to abort from downstream). { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); // Delete the results asynchronously to simulate abort from downstream. 
@@ -731,21 +797,23 @@ TEST_P(MultiFragmentTest, noHashPartitionSkew) { auto producerPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({"c0"}, numPartitions, {"c0", "c1"}, GetParam()) + .partitionedOutput( + {"c0"}, numPartitions, {"c0", "c1"}, GetParam().serdeKind) .planNode(); auto producerTask = makeTask(producerTaskId, producerPlan, 0); producerTask->start(1); core::PlanNodeId partialAggregationNodeId; - auto consumerPlan = PlanBuilder() - .exchange(producerPlan->outputType(), GetParam()) - .localPartition({"c0"}) - .partialAggregation({"c0"}, {"count(1)"}) - .capturePlanNodeId(partialAggregationNodeId) - .localPartition({}) - .finalAggregation() - .singleAggregation({}, {"sum(1)"}) - .planNode(); + auto consumerPlan = + PlanBuilder() + .exchange(producerPlan->outputType(), GetParam().serdeKind) + .localPartition({"c0"}) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggregationNodeId) + .localPartition({}) + .finalAggregation() + .singleAggregation({}, {"sum(1)"}) + .planNode(); // This is computed based offline and shouldn't change across runs. const std::vector expectedValues{ @@ -759,6 +827,9 @@ TEST_P(MultiFragmentTest, noHashPartitionSkew) { auto consumerTask = test::AssertQueryBuilder(consumerPlan) .split(remoteSplit(producerTaskId)) .destination(partition) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) .maxDrivers(numConsumerDriverThreads) .assertResults(expectedResult); @@ -815,21 +886,22 @@ TEST_P(MultiFragmentTest, noHivePartitionSkew) { std::vector{0}, std::vector{}), {"c0", "c1"}, - GetParam()) + GetParam().serdeKind) .planNode(); auto producerTask = makeTask(producerTaskId, producerPlan, 0); producerTask->start(1); core::PlanNodeId partialAggregationNodeId; - auto consumerPlan = PlanBuilder() - .exchange(producerPlan->outputType(), GetParam()) - .localPartition(numBuckets, {0}, {}) - .partialAggregation({"c0"}, {"count(1)"}) - .capturePlanNodeId(partialAggregationNodeId) - .localPartition({}) - .finalAggregation() - .singleAggregation({}, {"sum(1)"}) - .planNode(); + auto consumerPlan = + PlanBuilder() + .exchange(producerPlan->outputType(), GetParam().serdeKind) + .localPartition(numBuckets, {0}, {}) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggregationNodeId) + .localPartition({}) + .finalAggregation() + .singleAggregation({}, {"sum(1)"}) + .planNode(); const int numConsumerDriverThreads{4}; const auto runConsumer = [&](int partition) { @@ -840,6 +912,9 @@ TEST_P(MultiFragmentTest, noHivePartitionSkew) { auto consumerTask = test::AssertQueryBuilder(consumerPlan) .split(remoteSplit(producerTaskId)) .destination(partition) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) .maxDrivers(numConsumerDriverThreads) .assertResults(expectedResult); @@ -880,18 +955,24 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { // Single Partition { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput( - {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput( + {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 4 << 20); leafTask->start(1); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + 
.exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - auto task = - assertQuery(op, {leafTaskId}, "SELECT c0, c1, c2, c3, c4 FROM tmp"); + auto task = test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId() << "state: " << leafTask->state(); auto taskStats = toPlanStats(leafTask->taskStats()); @@ -911,16 +992,16 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { false, std::make_shared(), {"c0", "c1", "c2", "c3", "c4"}, - GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); auto intermediatePlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .partitionedOutput( - {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam()) + {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam().serdeKind) .planNode(); std::vector intermediateTaskIds; for (auto i = 0; i < kFanout; ++i) { @@ -931,12 +1012,21 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { addRemoteSplits(intermediateTask, {leafTaskId}); } - auto op = PlanBuilder() - .exchange(intermediatePlan->outputType(), GetParam()) - .planNode(); + auto op = + PlanBuilder() + .exchange(intermediatePlan->outputType(), GetParam().serdeKind) + .planNode(); - auto task = assertQuery( - op, intermediateTaskIds, "SELECT c0, c1, c2, c3, c4 FROM tmp"); + std::vector intermediateSplits; + for (auto intermediateTaskId : intermediateTaskIds) { + intermediateSplits.emplace_back(remoteSplit(intermediateTaskId)); + } + auto task = test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(intermediateSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << "state: " << leafTask->state(); auto taskStats = toPlanStats(leafTask->taskStats()); @@ -951,11 +1041,11 @@ TEST_P(MultiFragmentTest, broadcast) { // Make leaf task: Values -> Repartitioning (broadcast) std::vector> tasks; auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = - PlanBuilder() - .values({data}) - .partitionedOutputBroadcast(/*outputLayout=*/{}, GetParam()) - .planNode(); + auto leafPlan = PlanBuilder() + .values({data}) + .partitionedOutputBroadcast( + /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); tasks.emplace_back(leafTask); leafTask->start(1); @@ -966,9 +1056,9 @@ TEST_P(MultiFragmentTest, broadcast) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .singleAggregation({}, {"count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -981,10 +1071,20 @@ TEST_P(MultiFragmentTest, broadcast) { leafTask->updateOutputBuffers(finalAggTaskIds.size(), true); // Collect results from multiple tasks. 
- auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, finalAggTaskIds, "SELECT UNNEST(array[1000, 1000, 1000])"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT UNNEST(array[1000, 1000, 1000])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1021,7 +1121,7 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { false, std::make_shared(), /*outputLayout=*/{}, - GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); @@ -1041,10 +1141,11 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { core::PlanNodePtr collectPlan; std::vector collectTaskIds; for (int i = 0; i < 2; i++) { - collectPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + collectPlan = + PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); collectTaskIds.push_back(makeTaskId("collect", i)); auto task = makeTask(collectTaskIds.back(), collectPlan, i); @@ -1052,10 +1153,20 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { } // Collect everything. - auto finalPlan = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto finalPlan = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(finalPlan, {collectTaskIds}, "SELECT * FROM tmp"); + std::vector collectTaskSplits; + for (auto collectTaskId : collectTaskIds) { + collectTaskSplits.emplace_back(remoteSplit(collectTaskId)); + } + test::AssertQueryBuilder(finalPlan, duckDbQueryRunner_) + .splits(std::move(collectTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT * FROM tmp"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1081,11 +1192,11 @@ TEST_P(MultiFragmentTest, constantKeys) { // Make leaf task: Values -> Repartitioning (3-way) auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = - PlanBuilder() - .values({data}) - .partitionedOutput({"c0", "123"}, 3, true, {"c0"}, GetParam()) - .planNode(); + auto leafPlan = PlanBuilder() + .values({data}) + .partitionedOutput( + {"c0", "123"}, 3, true, {"c0"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); addTask(leafTask, {}); @@ -1095,10 +1206,10 @@ TEST_P(MultiFragmentTest, constantKeys) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .project({"c0 is null AS co_is_null"}) .partialAggregation({}, {"count_if(co_is_null)", "count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -1109,15 +1220,22 @@ TEST_P(MultiFragmentTest, constantKeys) { // Collect 
results and verify number of nulls is 3 times larger than in the // original data. auto op = PlanBuilder() - .exchange(finalAggPlan->outputType(), GetParam()) + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) .finalAggregation( {}, {"sum(a0)", "sum(a1)"}, {{BIGINT()}, {BIGINT()}}) .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults( + "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1143,7 +1261,8 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { auto leafPlan = PlanBuilder() .values({data}) - .partitionedOutput({"c0"}, 3, true, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 3, true, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); addTask(leafTask, {}); @@ -1154,10 +1273,10 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .project({"c0 is null AS co_is_null"}) .partialAggregation({}, {"count_if(co_is_null)", "count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -1168,15 +1287,22 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { // Collect results and verify number of nulls is 3 times larger than in the // original data. auto op = PlanBuilder() - .exchange(finalAggPlan->outputType(), GetParam()) + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) .finalAggregation( {}, {"sum(a0)", "sum(a1)"}, {{BIGINT()}, {BIGINT()}}) .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults( + "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1197,7 +1323,7 @@ TEST_P(MultiFragmentTest, limit) { PlanBuilder() .tableScan(std::dynamic_pointer_cast(data->type())) .limit(0, 10, true) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); @@ -1207,25 +1333,20 @@ TEST_P(MultiFragmentTest, limit) { // Make final task: Exchange -> FinalLimit(10). 
auto plan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .localPartition(std::vector{}) .limit(0, 10, false) .planNode(); // Expect the task to produce results before receiving no-more-splits message. - bool splitAdded = false; - auto task = ::assertQuery( - plan, - [&](Task* task) { - if (splitAdded) { - return; - } - task->addSplit("0", remoteSplit(leafTaskId)); - splitAdded = true; - }, - "VALUES (null), (1), (2), (3), (4), (5), (6), (null), (8), (9)", - duckDbQueryRunner_); - + auto task = + test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults( + "VALUES (null), (1), (2), (3), (4), (5), (6), (null), (8), (9)"); ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -1239,10 +1360,11 @@ TEST_P(MultiFragmentTest, mergeExchangeOverEmptySources) { for (int i = 0; i < 2; ++i) { auto taskId = makeTaskId("leaf-", i); leafTaskIds.push_back(taskId); - auto plan = PlanBuilder() - .values({data}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + auto plan = + PlanBuilder() + .values({data}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto task = makeTask(taskId, plan, tasks.size()); tasks.push_back(task); @@ -1251,11 +1373,20 @@ TEST_P(MultiFragmentTest, mergeExchangeOverEmptySources) { auto exchangeTaskId = makeTaskId("exchange-", 0); auto plan = PlanBuilder() - .mergeExchange(rowType_, {"c0"}, GetParam()) + .mergeExchange(rowType_, {"c0"}, GetParam().serdeKind) .singleAggregation({"c0"}, {"count(1)"}) .planNode(); - assertQuery(plan, leafTaskIds, ""); + std::vector leafTaskSplits; + for (auto leafTaskId : leafTaskIds) { + leafTaskSplits.emplace_back(remoteSplit(leafTaskId)); + } + test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .splits(std::move(leafTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults(""); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1309,7 +1440,8 @@ TEST_P(MultiFragmentTest, earlyCompletion) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutput({"c0"}, 2, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 2, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto task = makeTask(leafTaskId, plan, tasks.size()); @@ -1329,7 +1461,7 @@ TEST_P(MultiFragmentTest, earlyCompletion) { } auto joinPlan = makeJoinOverExchangePlan( - asRowType(data->type()), buildData, GetParam()); + asRowType(data->type()), buildData, GetParam().serdeKind); joinOutputType = joinPlan->outputType(); @@ -1345,10 +1477,18 @@ TEST_P(MultiFragmentTest, earlyCompletion) { // Create output task. 
auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery( - outputPlan, joinTaskIds, "SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1373,7 +1513,8 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutputBroadcast(/*outputLayout=*/{}, GetParam()) + .partitionedOutputBroadcast( + /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, plan, tasks.size()); @@ -1393,7 +1534,7 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { } auto joinPlan = makeJoinOverExchangePlan( - asRowType(data->type()), buildData, GetParam()); + asRowType(data->type()), buildData, GetParam().serdeKind); joinOutputType = joinPlan->outputType(); @@ -1412,9 +1553,18 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { // Create output task. auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery(outputPlan, joinTaskIds, "SELECT UNNEST([10, 10, 10, 10])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([10, 10, 10, 10])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1438,7 +1588,8 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutput({"c0"}, 2, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 2, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto task = makeTask(leafTaskId, plan, tasks.size()); @@ -1460,14 +1611,15 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { auto planNodeIdGenerator = std::make_shared(); auto joinPlan = PlanBuilder(planNodeIdGenerator) - .mergeExchange(asRowType(data->type()), {"c0"}, GetParam()) + .mergeExchange( + asRowType(data->type()), {"c0"}, GetParam().serdeKind) .hashJoin( {"c0"}, {"u_c0"}, PlanBuilder(planNodeIdGenerator).values({buildData}).planNode(), "", {"c0"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); joinOutputType = joinPlan->outputType(); @@ -1484,10 +1636,18 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { // Create output task. 
auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery( - outputPlan, joinTaskIds, "SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1587,11 +1747,12 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { auto leafTaskId = makeTaskId("leaf", 0); core::PlanNodePtr leafPlan; - leafPlan = PlanBuilder() - .tableScan(rowType_) - .project({"c0 % 10 AS c0", "c1"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + leafPlan = + PlanBuilder() + .tableScan(rowType_) + .project({"c0 % 10 AS c0", "c1"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); @@ -1599,11 +1760,11 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { auto rootPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .addNode([&leafPlan](std::string id, core::PlanNodePtr node) { return std::make_shared(id, std::move(node)); }) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto rootTask = makeTask("root-task", rootPlan, 0); @@ -1623,17 +1784,18 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { TEST_P(MultiFragmentTest, cancelledExchange) { // Create a source fragment borrow the output type from it. - auto planFragment = exec::test::PlanBuilder() - .tableScan(rowType_) - .filter("c0 % 5 = 1") - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planFragment(); + auto planFragment = + exec::test::PlanBuilder() + .tableScan(rowType_) + .filter("c0 % 5 = 1") + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planFragment(); // Create task with exchange. 
auto planFragmentWithExchange = exec::test::PlanBuilder() - .exchange(planFragment.planNode->outputType(), GetParam()) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .exchange(planFragment.planNode->outputType(), GetParam().serdeKind) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planFragment(); auto exchangeTask = makeTask("output.0.0.1", planFragmentWithExchange.planNode, 0); @@ -1733,22 +1895,26 @@ TEST_P(MultiFragmentTest, customPlanNodeWithExchangeClient) { Operator::registerOperator(std::make_unique()); auto leafTaskId = makeTaskId("leaf", 0); core::PlanNodeId partitionNodeId; - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .capturePlanNodeId(partitionNodeId) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .capturePlanNodeId(partitionNodeId) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); CursorParameters params; + params.queryConfigs.emplace( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)); core::PlanNodeId testNodeId; params.maxDrivers = 1; params.planNode = PlanBuilder() .addNode([&leafPlan](std::string id, core::PlanNodePtr /* input */) { return std::make_shared( - id, leafPlan->outputType(), GetParam()); + id, leafPlan->outputType(), GetParam().serdeKind); }) .capturePlanNodeId(testNodeId) .planNode(); @@ -1772,8 +1938,10 @@ TEST_P(MultiFragmentTest, customPlanNodeWithExchangeClient) { const auto serdeKindRuntimsStats = planStats.at(partitionNodeId).customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 1); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); } // This test is to reproduce the race condition between task terminate and no @@ -1795,7 +1963,7 @@ DEBUG_ONLY_TEST_P( PlanBuilder() .tableScan(rowType_) .project({"c0 % 10 AS c0", "c1"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); @@ -1828,7 +1996,7 @@ DEBUG_ONLY_TEST_P( blockTerminate.await([&]() { return readyToTerminate.load(); }); }))); auto rootPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .finalAggregation({"c0"}, {"count(c1)"}, {{BIGINT()}}) .planNode(); @@ -1854,10 +2022,11 @@ TEST_P(MultiFragmentTest, taskTerminateWithPendingOutputBuffers) { setupSources(8, 1000); auto taskId = makeTaskId("task", 0); core::PlanNodePtr leafPlan; - leafPlan = PlanBuilder() - .tableScan(rowType_) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + leafPlan = + PlanBuilder() + .tableScan(rowType_) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto task = makeTask(taskId, leafPlan, 0); task->start(1); @@ -1926,19 +2095,20 @@ DEBUG_ONLY_TEST_P( makeRowVector({"p_c0"}, {makeFlatVector({1, 2, 3})}); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId exchangeNodeId; - auto plan = PlanBuilder(planNodeIdGenerator) - .values({probeData}, true) - .hashJoin( - {"p_c0"}, - {"c0"}, 
- PlanBuilder(planNodeIdGenerator)
- .exchange(rowType_, GetParam())
- .capturePlanNodeId(exchangeNodeId)
- .planNode(),
- "",
- {"c0"})
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
- .planNode();
+ auto plan =
+ PlanBuilder(planNodeIdGenerator)
+ .values({probeData}, true)
+ .hashJoin(
+ {"p_c0"},
+ {"c0"},
+ PlanBuilder(planNodeIdGenerator)
+ .exchange(rowType_, GetParam().serdeKind)
+ .capturePlanNodeId(exchangeNodeId)
+ .planNode(),
+ "",
+ {"c0"})
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
+ .planNode();
 auto taskId = makeTaskId("final", 0);
 auto task = makeTask(taskId, plan, 0);
 task->start(2);
@@ -2007,7 +2177,7 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, mergeWithEarlyTermination) {
 .tableScan(rowType_)
 .orderBy({"c0"}, true)
 .planNode()})
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
 .planNode();
 auto partialSortTask = makeTask(sortTaskId, partialSortPlan, 1);
@@ -2034,8 +2204,9 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, mergeWithEarlyTermination) {
 auto finalSortTaskId = makeTaskId("orderby", 1);
 auto finalSortPlan =
 PlanBuilder()
- .mergeExchange(partialSortPlan->outputType(), {"c0"}, GetParam())
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+ .mergeExchange(
+ partialSortPlan->outputType(), {"c0"}, GetParam().serdeKind)
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
 .planNode();
 auto finalSortTask = makeTask(finalSortTaskId, finalSortPlan, 0);
 finalSortTask->start(1);
@@ -2168,6 +2339,11 @@ class DataFetcher {
 /// of individual pages. PartitionedOutput operator is expected to limit page
 /// sizes to no more than 1MB give and take 30%.
 TEST_P(MultiFragmentTest, maxBytes) {
+ if (GetParam().compressionKind != common::CompressionKind_NONE) {
+ // NOTE: different compression kinds generate different serialized byte
+ // sizes, so only test with no compression to ease testing.
+ return;
+ }
 std::string s(25, 'x');
 // Keep the row count under 7000 to avoid hitting the row limit in the
 // operator instead.
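Note on the test parameterization: the GetParam().serdeKind and GetParam().compressionKind accesses throughout these hunks read the fixture's parameter type, which is defined earlier in MultiFragmentTest.cpp and is not included in this excerpt. A minimal sketch of such a parameter struct follows; the name TestParam and the field layout are assumptions for illustration, not the patch's actual definition:

struct TestParam {
  // Serde used for shuffle: Presto, CompactRow or UnsafeRow.
  VectorSerde::Kind serdeKind;
  // Per-query shuffle compression kind under test.
  common::CompressionKind compressionKind;
};

Each test instance then pairs one serde kind with one compression kind, so every parameterized test below runs across both dimensions.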
@@ -2178,11 +2354,12 @@ TEST_P(MultiFragmentTest, maxBytes) {
 });
 core::PlanNodeId outputNodeId;
- auto plan = PlanBuilder()
- .values({data}, false, 100)
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
- .capturePlanNodeId(outputNodeId)
- .planNode();
+ auto plan =
+ PlanBuilder()
+ .values({data}, false, 100)
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
+ .capturePlanNodeId(outputNodeId)
+ .planNode();
 int32_t testIteration = 0;
 DataFetcher::Stats prevStats;
@@ -2258,7 +2435,7 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, exchangeStatsOnFailure) {
 auto producerPlan =
 PlanBuilder()
 .values({data}, false, 30)
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
 .planNode();
 auto producerTaskId = makeTaskId("producer", 0);
@@ -2266,8 +2443,9 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, exchangeStatsOnFailure) {
 producerTask->start(1);
 producerTask->updateOutputBuffers(1, true);
- auto plan =
- PlanBuilder().exchange(producerPlan->outputType(), GetParam()).planNode();
+ auto plan = PlanBuilder()
+ .exchange(producerPlan->outputType(), GetParam().serdeKind)
+ .planNode();
 auto task = makeTask("t", plan, 0, noopConsumer());
 task->start(4);
@@ -2301,7 +2479,7 @@ TEST_P(MultiFragmentTest, earlyTaskFailure) {
 .tableScan(rowType_)
 .orderBy({"c0"}, true)
 .planNode()})
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
 .planNode();
 for (bool internalFailure : {false, true}) {
 SCOPED_TRACE(fmt::format("internalFailure: {}", internalFailure));
@@ -2312,10 +2490,11 @@ TEST_P(MultiFragmentTest, earlyTaskFailure) {
 auto outputType = partialSortPlan->outputType();
 auto finalSortTaskId = makeTaskId("finalSortBy", 0);
- auto finalSortPlan = PlanBuilder()
- .mergeExchange(outputType, {"c0"}, GetParam())
- .partitionedOutput({}, 1)
- .planNode();
+ auto finalSortPlan =
+ PlanBuilder()
+ .mergeExchange(outputType, {"c0"}, GetParam().serdeKind)
+ .partitionedOutput({}, 1)
+ .planNode();
 auto finalSortTask = makeTask(finalSortTaskId, finalSortPlan, 0);
 if (internalFailure) {
@@ -2348,16 +2527,18 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
 auto data = makeRowVector({makeFlatVector({1, 2, 3})});
 const int32_t numPartitions = 100;
- auto producerPlan =
- test::PlanBuilder()
- .values({data})
- .partitionedOutput(
- {"c0"}, numPartitions, /*outputLayout=*/{}, GetParam())
- .planNode();
+ auto producerPlan = test::PlanBuilder()
+ .values({data})
+ .partitionedOutput(
+ {"c0"},
+ numPartitions,
+ /*outputLayout=*/{},
+ GetParam().serdeKind)
+ .planNode();
 const auto producerTaskId = "local://t1";
 auto plan = test::PlanBuilder()
- .exchange(asRowType(data->type()), GetParam())
+ .exchange(asRowType(data->type()), GetParam().serdeKind)
 .planNode();
 auto expected = makeRowVector({
@@ -2401,12 +2582,12 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
 ASSERT_EQ(numPages, stats.customStats.at("numReceivedPages").sum);
 };
- if (GetParam() == VectorSerde::Kind::kPresto) {
+ if (GetParam().serdeKind == VectorSerde::Kind::kPresto) {
 test(1, 1'000);
 test(1'000, 56);
 test(10'000, 6);
 test(100'000, 1);
- } else if (GetParam() == VectorSerde::Kind::kCompactRow) {
+ } else if (GetParam().serdeKind == VectorSerde::Kind::kCompactRow) {
 test(1, 1'000);
 test(1'000, 38);
 test(10'000, 4);
@@ -2420,24 +2601,17 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
 }
 TEST_P(MultiFragmentTest, compression) {
- bufferManager_->testingSetCompression(
- common::CompressionKind::CompressionKind_LZ4);
- auto guard = folly::makeGuard([&]() {
- bufferManager_->testingSetCompression(
- common::CompressionKind::CompressionKind_NONE);
- });
-
 constexpr int32_t kNumRepeats = 1'000'000;
 const auto data = makeRowVector({makeFlatVector({1, 2, 3})});
 const auto producerPlan = test::PlanBuilder()
 .values({data}, false, kNumRepeats)
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
 .planNode();
 const auto plan = test::PlanBuilder()
- .exchange(asRowType(data->type()), GetParam())
+ .exchange(asRowType(data->type()), GetParam().serdeKind)
 .singleAggregation({}, {"sum(c0)"})
 .planNode();
@@ -2453,28 +2627,57 @@
 auto consumerTask = test::AssertQueryBuilder(plan)
 .split(remoteSplit(producerTaskId))
+ .config(
+ core::QueryConfig::kShuffleCompressionKind,
+ std::to_string(GetParam().compressionKind))
 .destination(0)
 .assertResults(expected);
 auto consumerTaskStats = exec::toPlanStats(consumerTask->taskStats());
 const auto& consumerPlanStats = consumerTaskStats.at("0");
+ ASSERT_EQ(
+ consumerPlanStats.customStats.at(Operator::kShuffleCompressionKind).min,
+ static_cast(GetParam().compressionKind));
+ ASSERT_EQ(
+ consumerPlanStats.customStats.at(Operator::kShuffleCompressionKind).max,
+ static_cast(GetParam().compressionKind));
 ASSERT_EQ(data->size() * kNumRepeats, consumerPlanStats.outputRows);
 auto producerTaskStats = exec::toPlanStats(producerTask->taskStats());
 const auto& producerStats = producerTaskStats.at("1");
+ ASSERT_EQ(
+ producerStats.customStats.at(Operator::kShuffleCompressionKind).min,
+ static_cast(GetParam().compressionKind));
+ ASSERT_EQ(
+ producerStats.customStats.at(Operator::kShuffleCompressionKind).max,
+ static_cast(GetParam().compressionKind));
+ if (GetParam().compressionKind == common::CompressionKind_NONE) {
+ ASSERT_EQ(producerStats.customStats.at("compressedBytes").sum, 0);
+ ASSERT_EQ(producerStats.customStats.at("compressionInputBytes").sum, 0);
+ ASSERT_EQ(producerStats.customStats.at("compressionSkippedBytes").sum, 0);
+ return;
+ }
 // The data is extremely compressible, 1, 2, 3 repeated 1000000 times.
 if (!expectSkipCompression) {
- EXPECT_LT(
+ ASSERT_LT(
 producerStats.customStats.at("compressedBytes").sum,
 producerStats.customStats.at("compressionInputBytes").sum);
- EXPECT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
+ ASSERT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
 } else {
- EXPECT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
+ ASSERT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
 }
 };
- test("local://t1", 0.7, false);
- test("local://t2", 0.0000001, true);
+ {
+ SCOPED_TRACE(
+ fmt::format("compression kind {}", GetParam().compressionKind));
+ {
+ SCOPED_TRACE("minCompressionRatio 0.7");
+ test("local://t1", 0.7, false);
+ }
+ {
+ SCOPED_TRACE("minCompressionRatio 0.0000001");
+ test("local://t2", 0.0000001, true);
+ }
+ }
 }
 TEST_P(MultiFragmentTest, scaledTableScan) {
@@ -2529,7 +2732,7 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
 .capturePlanNodeId(scanNodeId)
 .partialAggregation(
 {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"})
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
 .planNode();
 const auto leafTaskId = "local://leaf-0";
@@ -2540,12 +2743,12 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
 const auto finalAggPlan =
 PlanBuilder()
- .exchange(leafPlan->outputType(), GetParam())
+ .exchange(leafPlan->outputType(), GetParam().serdeKind)
 .finalAggregation(
 {"c5"},
 {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"},
 {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}})
- .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+ .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
 .planNode();
 const auto finalAggTaskId = "local://final-agg-0";
@@ -2556,13 +2759,16 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
 const auto resultPlan =
 PlanBuilder()
- .exchange(finalAggPlan->outputType(), GetParam())
+ .exchange(finalAggPlan->outputType(), GetParam().serdeKind)
 .planNode();
- assertQuery(
- resultPlan,
- {finalAggTaskId},
- "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
+ test::AssertQueryBuilder(resultPlan, duckDbQueryRunner_)
+ .split(remoteSplit(finalAggTaskId))
+ .config(
+ core::QueryConfig::kShuffleCompressionKind,
+ std::to_string(GetParam().compressionKind))
+ .assertResults(
+ "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
 ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
 ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get()))
diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h
index 6094ef7830b84..f9d9b6790c50f 100644
--- a/velox/vector/VectorStream.h
+++ b/velox/vector/VectorStream.h
@@ -419,7 +419,7 @@ class VectorStreamGroup : public StreamArena {
 RowTypePtr type,
 VectorSerde* serde,
 RowVectorPtr* result,
- const VectorSerde::Options* options = nullptr);
+ const VectorSerde::Options* options);
 void clear() override {
 StreamArena::clear();
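For reference, the per-query configuration pattern exercised by the compression test above, pulled out as a standalone sketch. AssertQueryBuilder, remoteSplit, and the config key come from the tests in this patch; plan, producerTaskId, and expected are placeholders assumed to be set up as in those tests, and LZ4 stands in for any supported CompressionKind:

  test::AssertQueryBuilder(plan)
      .split(remoteSplit(producerTaskId))
      .config(
          core::QueryConfig::kShuffleCompressionKind,
          // The config value is the integer value of the CompressionKind
          // enum, matching how the tests above serialize it.
          std::to_string(static_cast<int32_t>(
              common::CompressionKind::CompressionKind_LZ4)))
      .assertResults(expected);

Because the kind is read from the query config rather than from the output buffer manager, two concurrent queries on the same worker can shuffle with different compression kinds.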