diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 90105af887e6..e80385ece796 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/compression/Compression.h" #include "velox/common/config/Config.h" #include "velox/vector/TypeAliases.h" @@ -490,6 +491,11 @@ class QueryConfig { static constexpr const char* kTableScanScaleUpMemoryUsageRatio = "table_scan_scale_up_memory_usage_ratio"; + /// Specifies the shuffle compression kind which is defined by + /// CompressionKind. If it is CompressionKind_NONE, then no compression. + static constexpr const char* kShuffleCompressionKind = + "shuffle_compression_kind"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -906,6 +912,12 @@ class QueryConfig { return get(kTableScanScaleUpMemoryUsageRatio, 0.7); } + int32_t shuffleCompressionKind() const { + return get( + kShuffleCompressionKind, + static_cast(common::CompressionKind::CompressionKind_NONE)); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 9fe2534b1262..1fae4ece8f07 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -148,6 +148,11 @@ Generic Configuration - integer - 16 - Byte length of the string prefix stored in the prefix-sort buffer. This doesn't include the null byte. + * - shuffle_compression_kind + - integer + - 0 + - Specifies the shuffle compression kind which is defined by CompressionKind. If it is set to + CompressionKind_NONE (0), then no compression. .. _expression-evaluation-conf: diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index a6feb1d32cb0..c6f9f5f91156 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -205,6 +205,11 @@ These stats are reported by shuffle operators. - Indicates the vector serde kind used by an operator for shuffle with 1 for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by Exchange, MergeExchange and PartitionedOutput operators for now. + * - shuffleCompressionKind + - + - Indicates the compression kind used by an operator for shuffle. The + reported value is set to the corresponding CompressionKind enum with 0 + (CompressionKind_NONE) as no compression. PrefixSort ---------- diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index 8dcf41ccc7a8..f49459ae09b6 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -21,13 +21,14 @@ namespace facebook::velox::exec { namespace { std::unique_ptr getVectorSerdeOptions( + const core::QueryConfig& queryConfig, VectorSerde::Kind kind) { std::unique_ptr options = kind == VectorSerde::Kind::kPresto ? 
std::make_unique() : std::make_unique(); - options->compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + options->compressionKind = static_cast( + queryConfig.shuffleCompressionKind()); return options; } } // namespace @@ -46,8 +47,10 @@ Exchange::Exchange( operatorType), preferredOutputBatchBytes_{ driverCtx->queryConfig().preferredOutputBatchBytes()}, - serdeKind_(exchangeNode->serdeKind()), - options_{getVectorSerdeOptions(serdeKind_)}, + serdeKind_{exchangeNode->serdeKind()}, + serdeOptions_{getVectorSerdeOptions( + operatorCtx_->driverCtx()->queryConfig(), + serdeKind_)}, processSplits_{operatorCtx_->driverCtx()->driverId == 0}, exchangeClient_{std::move(exchangeClient)} {} @@ -159,7 +162,7 @@ RowVectorPtr Exchange::getOutput() { outputType_, &result_, resultOffset, - options_.get()); + serdeOptions_.get()); resultOffset = result_->size(); } } @@ -186,7 +189,7 @@ RowVectorPtr Exchange::getOutput() { outputType_, &result_, resultOffset, - options_.get()); + serdeOptions_.get()); // We expect the row-wise deserialization to consume all the input into one // output vector. VELOX_CHECK(inputStream->atEnd()); @@ -212,9 +215,15 @@ void Exchange::close() { exchangeClient_->close(); } exchangeClient_ = nullptr; - stats_.wlock()->addRuntimeStat( - Operator::kShuffleSerdeKind, - RuntimeCounter(static_cast(serdeKind_))); + { + auto lockedStats = stats_.wlock(); + lockedStats->addRuntimeStat( + Operator::kShuffleSerdeKind, + RuntimeCounter(static_cast(serdeKind_))); + lockedStats->addRuntimeStat( + Operator::kShuffleCompressionKind, + RuntimeCounter(static_cast(serdeOptions_->compressionKind))); + } } void Exchange::recordExchangeClientStats() { diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index 45cdce8ceb59..3a1f32cac533 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -83,7 +83,7 @@ class Exchange : public SourceOperator { const VectorSerde::Kind serdeKind_; - const std::unique_ptr options_; + const std::unique_ptr serdeOptions_; /// True if this operator is responsible for fetching splits from the Task /// and passing these to ExchangeClient. diff --git a/velox/exec/Merge.cpp b/velox/exec/Merge.cpp index f25f327bc9d7..ac2fcad22277 100644 --- a/velox/exec/Merge.cpp +++ b/velox/exec/Merge.cpp @@ -21,6 +21,19 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::exec { +namespace { +std::unique_ptr getVectorSerdeOptions( + const core::QueryConfig& queryConfig, + VectorSerde::Kind kind) { + std::unique_ptr options = + kind == VectorSerde::Kind::kPresto + ? 
std::make_unique() + : std::make_unique(); + options->compressionKind = static_cast( + queryConfig.shuffleCompressionKind()); + return options; +} +} // namespace Merge::Merge( int32_t operatorId, @@ -305,7 +318,10 @@ MergeExchange::MergeExchange( mergeExchangeNode->sortingOrders(), mergeExchangeNode->id(), "MergeExchange"), - serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())) {} + serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())), + serdeOptions_(getVectorSerdeOptions( + driverCtx->queryConfig(), + mergeExchangeNode->serdeKind())) {} BlockingReason MergeExchange::addMergeSources(ContinueFuture* future) { if (operatorCtx_->driverCtx()->driverId != 0) { @@ -370,8 +386,14 @@ void MergeExchange::close() { source->close(); } Operator::close(); - stats_.wlock()->addRuntimeStat( - Operator::kShuffleSerdeKind, - RuntimeCounter(static_cast(serde_->kind()))); + { + auto lockedStats = stats_.wlock(); + lockedStats->addRuntimeStat( + Operator::kShuffleSerdeKind, + RuntimeCounter(static_cast(serde_->kind()))); + lockedStats->addRuntimeStat( + Operator::kShuffleCompressionKind, + RuntimeCounter(static_cast(serdeOptions_->compressionKind))); + } } } // namespace facebook::velox::exec diff --git a/velox/exec/Merge.h b/velox/exec/Merge.h index e8aa3749dd4d..867c07dee44a 100644 --- a/velox/exec/Merge.h +++ b/velox/exec/Merge.h @@ -195,6 +195,10 @@ class MergeExchange : public Merge { return serde_; } + VectorSerde::Options* serdeOptions() { + return serdeOptions_.get(); + } + void close() override; protected: @@ -202,6 +206,7 @@ class MergeExchange : public Merge { private: VectorSerde* const serde_; + const std::unique_ptr serdeOptions_; bool noMoreSplits_ = false; // Task Ids from all the splits we took to process so far. std::vector remoteSourceTaskIds_; diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp index a319f2535afa..786bd40045cc 100644 --- a/velox/exec/MergeSource.cpp +++ b/velox/exec/MergeSource.cpp @@ -168,7 +168,8 @@ class MergeExchangeSource : public MergeSource { mergeExchange_->pool(), mergeExchange_->outputType(), mergeExchange_->serde(), - &data); + &data, + mergeExchange_->serdeOptions()); auto lockedStats = mergeExchange_->stats().wlock(); lockedStats->addInputVector(data->estimateFlatSize(), data->size()); diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index de06d78da65a..4a1b7f3f43ab 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -364,6 +364,11 @@ class Operator : public BaseRuntimeStatWriter { /// runtime stats value is the corresponding enum value. static inline const std::string kShuffleSerdeKind{"shuffleSerdeKind"}; + /// The compression kind used by an operator for shuffle. The recorded + /// runtime stats value is the corresponding enum value. + static inline const std::string kShuffleCompressionKind{ + "shuffleCompressionKind"}; + /// 'operatorId' is the initial index of the 'this' in the Driver's list of /// Operators. This is used as in index into OperatorStats arrays in the Task. /// 'planNodeId' is a query-level unique identifier of the PlanNode to which diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 847532cd554e..038d42cdce77 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -23,15 +23,11 @@ class OutputBufferManager { public: /// Options for shuffle. This is initialized once and affects both /// PartitionedOutput and Exchange. 
This can be used for controlling - /// compression, protocol version and other matters where shuffle sides should + /// protocol version and other matters where shuffle sides should /// agree. - struct Options { - common::CompressionKind compressionKind{ - common::CompressionKind::CompressionKind_NONE}; - }; + struct Options {}; - OutputBufferManager(Options options) - : compressionKind_(options.compressionKind) {} + explicit OutputBufferManager(Options /*unused*/) {} void initializeTask( std::shared_ptr task, @@ -135,21 +131,11 @@ class OutputBufferManager { // Returns NULL if task not found. std::shared_ptr getBufferIfExists(const std::string& taskId); - void testingSetCompression(common::CompressionKind kind) { - *const_cast(&compressionKind_) = kind; - } - - common::CompressionKind compressionKind() const { - return compressionKind_; - } - private: // Retrieves the set of buffers for a query. // Throws an exception if buffer doesn't exist. std::shared_ptr getBuffer(const std::string& taskId); - const common::CompressionKind compressionKind_; - folly::Synchronized< std::unordered_map>, std::mutex> diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 88663c9ad1e5..d06b9e2f3443 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -21,13 +21,14 @@ namespace facebook::velox::exec { namespace { std::unique_ptr getVectorSerdeOptions( + const core::QueryConfig& queryConfig, VectorSerde::Kind kind) { std::unique_ptr options = kind == VectorSerde::Kind::kPresto ? std::make_unique() : std::make_unique(); - options->compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); + options->compressionKind = static_cast( + queryConfig.shuffleCompressionKind()); options->minCompressionRatio = PartitionedOutput::minCompressionRatio(); return options; } @@ -38,14 +39,14 @@ Destination::Destination( const std::string& taskId, int destination, VectorSerde* serde, - VectorSerde::Options* options, + VectorSerde::Options* serdeOptions, memory::MemoryPool* pool, bool eagerFlush, std::function recordEnqueued) : taskId_(taskId), destination_(destination), serde_(serde), - options_(options), + serdeOptions_(serdeOptions), pool_(pool), eagerFlush_(eagerFlush), recordEnqueued_(std::move(recordEnqueued)) { @@ -89,7 +90,7 @@ BlockingReason Destination::advance( if (current_ == nullptr) { current_ = std::make_unique(pool_, serde_); const auto rowType = asRowType(output->type()); - current_->createStreamTree(rowType, rowsInCurrent_, options_); + current_->createStreamTree(rowType, rowsInCurrent_, serdeOptions_); } const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow); @@ -200,7 +201,9 @@ PartitionedOutput::PartitionedOutput( .maxPartitionedOutputBufferSize()), eagerFlush_(eagerFlush), serde_(getNamedVectorSerde(planNode->serdeKind())), - options_(getVectorSerdeOptions(planNode->serdeKind())) { + serdeOptions_(getVectorSerdeOptions( + operatorCtx_->driverCtx()->queryConfig(), + planNode->serdeKind())) { if (!planNode->isPartitioned()) { VELOX_USER_CHECK_EQ(numDestinations_, 1); } @@ -256,7 +259,7 @@ void PartitionedOutput::initializeDestinations() { taskId, i, serde_, - options_.get(), + serdeOptions_.get(), pool(), eagerFlush_, [&](uint64_t bytes, uint64_t rows) { @@ -473,9 +476,15 @@ bool PartitionedOutput::isFinished() { void PartitionedOutput::close() { Operator::close(); - stats_.wlock()->addRuntimeStat( - Operator::kShuffleSerdeKind, - RuntimeCounter(static_cast(serde_->kind()))); + { + auto lockedStats = 
stats_.wlock(); + lockedStats->addRuntimeStat( + Operator::kShuffleSerdeKind, + RuntimeCounter(static_cast(serde_->kind()))); + lockedStats->addRuntimeStat( + Operator::kShuffleCompressionKind, + RuntimeCounter(static_cast(serdeOptions_->compressionKind))); + } destinations_.clear(); } diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index 699af6d65590..5a1c44cf0b19 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -105,7 +105,7 @@ class Destination { const std::string taskId_; const int destination_; VectorSerde* const serde_; - VectorSerde::Options* const options_; + VectorSerde::Options* const serdeOptions_; memory::MemoryPool* const pool_; const bool eagerFlush_; const std::function recordEnqueued_; @@ -220,7 +220,7 @@ class PartitionedOutput : public Operator { const int64_t maxBufferedBytes_; const bool eagerFlush_; VectorSerde* const serde_; - const std::unique_ptr options_; + const std::unique_ptr serdeOptions_; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; ContinueFuture future_; diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 1a8b3812fc5a..2e275d3f896d 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -37,16 +37,29 @@ using facebook::velox::test::BatchMaker; namespace facebook::velox::exec { namespace { -class MultiFragmentTest - : public HiveConnectorTestBase, - public testing::WithParamInterface { +struct TestParam { + VectorSerde::Kind serdeKind; + common::CompressionKind compressionKind; +}; + +class MultiFragmentTest : public HiveConnectorTestBase, + public testing::WithParamInterface { public: - static std::vector getTestParams() { - const std::vector kinds( - {VectorSerde::Kind::kPresto, - VectorSerde::Kind::kCompactRow, - VectorSerde::Kind::kUnsafeRow}); - return kinds; + static std::vector getTestParams() { + std::vector params; + params.emplace_back( + VectorSerde::Kind::kPresto, common::CompressionKind_NONE); + params.emplace_back( + VectorSerde::Kind::kCompactRow, common::CompressionKind_NONE); + params.emplace_back( + VectorSerde::Kind::kUnsafeRow, common::CompressionKind_NONE); + params.emplace_back( + VectorSerde::Kind::kPresto, common::CompressionKind_LZ4); + params.emplace_back( + VectorSerde::Kind::kCompactRow, common::CompressionKind_LZ4); + params.emplace_back( + VectorSerde::Kind::kUnsafeRow, common::CompressionKind_LZ4); + return params; } protected: @@ -54,6 +67,8 @@ class MultiFragmentTest HiveConnectorTestBase::SetUp(); exec::ExchangeSource::factories().clear(); exec::ExchangeSource::registerFactory(createLocalExchangeSource); + configSettings_[core::QueryConfig::kShuffleCompressionKind] = + std::to_string(GetParam().compressionKind); } void TearDown() override { @@ -137,26 +152,6 @@ class MultiFragmentTest task->noMoreSplits("0"); } - std::shared_ptr assertQuery( - const core::PlanNodePtr& plan, - const std::vector& remoteTaskIds, - const std::string& duckDbSql, - std::optional> sortingKeys = std::nullopt) { - std::vector> splits; - for (auto& taskId : remoteTaskIds) { - splits.push_back(std::make_shared(taskId)); - } - return OperatorTestBase::assertQuery(plan, splits, duckDbSql, sortingKeys); - } - - void assertQueryOrdered( - const core::PlanNodePtr& plan, - const std::vector& remoteTaskIds, - const std::string& duckDbSql, - const std::vector& sortingKeys) { - assertQuery(plan, remoteTaskIds, duckDbSql, sortingKeys); - } - void setupSources(int filePathCount, int 
rowsPerVector) { filePaths_ = makeFilePaths(filePathCount); vectors_ = makeVectors(filePaths_.size(), rowsPerVector); @@ -170,7 +165,8 @@ class MultiFragmentTest const std::string& taskId, int32_t destination, const RowVectorPtr& data) { - auto page = toSerializedPage(data, GetParam(), bufferManager_, pool()); + auto page = + toSerializedPage(data, GetParam().serdeKind, bufferManager_, pool()); const auto pageSize = page->size(); ContinueFuture unused; @@ -224,7 +220,8 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { .tableScan(rowType_) .project({"c0 % 10 AS c0", "c1"}) .partialAggregation({"c0"}, {"sum(c1)"}) - .partitionedOutput({"c0"}, 3, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 3, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -241,10 +238,10 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(partialAggPlan->outputType(), GetParam()) + .exchange(partialAggPlan->outputType(), GetParam().serdeKind) .capturePlanNodeId(exchangeNodeId) .finalAggregation({"c0"}, {"sum(a0)"}, {{BIGINT()}}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -255,11 +252,20 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { addRemoteSplits(task, {leafTaskId}); } - auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery( - op, finalAggTaskIds, "SELECT c0 % 10, sum(c1) FROM tmp GROUP BY 1"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0 % 10, sum(c1) FROM tmp GROUP BY 1"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -293,16 +299,17 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { pools.swap(childPools); } if (i == 0) { - // For leaf task, it has total 21 memory pools: task pool + 4 plan node - // pools (TableScan, FilterProject, PartialAggregation, PartitionedOutput) - // + 16 operator pools (4 drivers * number of plan nodes) + 4 connector - // pools for TableScan. + // For leaf task, it has total 21 memory pools: task pool + 4 plan + // node pools (TableScan, FilterProject, PartialAggregation, + // PartitionedOutput) + // + 16 operator pools (4 drivers * number of plan nodes) + 4 + // connector pools for TableScan. ASSERT_EQ(numPools, 25); } else { // For root task, it has total 8 memory pools: task pool + 3 plan node - // pools (Exchange, Aggregation, PartitionedOutput) and 4 leaf pools: 3 - // operator pools (1 driver * number of plan nodes) + 1 exchange client - // pool. + // pools (Exchange, Aggregation, PartitionedOutput) and 4 leaf pools: + // 3 operator pools (1 driver * number of plan nodes) + 1 exchange + // client pool. 
ASSERT_EQ(numPools, 8); } } @@ -311,8 +318,10 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { leafPlanStats.at(partitionNodeId) .customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 4); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); for (const auto& finalTask : finalTasks) { auto finalPlanStats = toPlanStats(finalTask->taskStats()); @@ -320,8 +329,10 @@ TEST_P(MultiFragmentTest, aggregationSingleKey) { finalPlanStats.at(exchangeNodeId) .customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 1); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); } } @@ -336,7 +347,8 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { .tableScan(rowType_) .project({"c0 % 10 AS c0", "c1 % 2 AS c1", "c2"}) .partialAggregation({"c0", "c1"}, {"sum(c2)"}) - .partitionedOutput({"c0", "c1"}, 3, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0", "c1"}, 3, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, partialAggPlan, 0); @@ -350,9 +362,9 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(partialAggPlan->outputType(), GetParam()) + .exchange(partialAggPlan->outputType(), GetParam().serdeKind) .finalAggregation({"c0", "c1"}, {"sum(a0)"}, {{BIGINT()}}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -362,13 +374,20 @@ TEST_P(MultiFragmentTest, aggregationMultiKey) { addRemoteSplits(task, {leafTaskId}); } - auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT c0 % 10, c1 % 2, sum(c2) FROM tmp GROUP BY 1, 2"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0 % 10, c1 % 2, sum(c2) FROM tmp GROUP BY 1, 2"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -385,17 +404,22 @@ TEST_P(MultiFragmentTest, distributedTableScan) { PlanBuilder() .tableScan(rowType_) .project({"c0 % 10", "c1 % 2", "c2"}) - .partitionedOutput({}, 1, {"c2", "p1", "p0"}, GetParam()) + .partitionedOutput({}, 1, {"c2", "p1", "p0"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); addHiveSplits(leafTask, filePaths_); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); - auto task = - assertQuery(op, {leafTaskId}, "SELECT c2, c1 % 2, c0 % 10 FROM tmp"); + auto op = PlanBuilder() + 
.exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); + auto task = test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c2, c1 % 2, c0 % 10 FROM tmp"); verifyExchangeStats(task, 1, 1); @@ -444,7 +468,7 @@ TEST_P(MultiFragmentTest, abortMergeExchange) { .tableScan(rowType_) .orderBy({"c0"}, true) .planNode()}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -465,9 +489,9 @@ TEST_P(MultiFragmentTest, abortMergeExchange) { core::PlanNodeId mergeExchangeId; auto finalSortPlan = PlanBuilder() - .mergeExchange(outputType, {"c0"}, GetParam()) + .mergeExchange(outputType, {"c0"}, GetParam().serdeKind) .capturePlanNodeId(mergeExchangeId) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto mergeTask = makeTask(finalSortTaskId, finalSortPlan, 0); tasks.push_back(mergeTask); @@ -517,7 +541,7 @@ TEST_P(MultiFragmentTest, mergeExchange) { .tableScan(rowType_) .orderBy({"c0"}, true) .planNode()}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .capturePlanNodeId(partitionNodeId) .planNode(); @@ -532,9 +556,9 @@ TEST_P(MultiFragmentTest, mergeExchange) { core::PlanNodeId mergeExchangeId; auto finalSortPlan = PlanBuilder() - .mergeExchange(outputType, {"c0"}, GetParam()) + .mergeExchange(outputType, {"c0"}, GetParam().serdeKind) .capturePlanNodeId(mergeExchangeId) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto mergeTask = makeTask(finalSortTaskId, finalSortPlan, 0); @@ -542,9 +566,15 @@ TEST_P(MultiFragmentTest, mergeExchange) { mergeTask->start(1); addRemoteSplits(mergeTask, partialSortTaskIds); - auto op = PlanBuilder().exchange(outputType, GetParam()).planNode(); - assertQueryOrdered( - op, {finalSortTaskId}, "SELECT * FROM tmp ORDER BY 1 NULLS LAST", {0}); + auto op = PlanBuilder().exchange(outputType, GetParam().serdeKind).planNode(); + + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(finalSortTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults( + "SELECT * FROM tmp ORDER BY 1 NULLS LAST", std::vector{0}); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -562,8 +592,10 @@ TEST_P(MultiFragmentTest, mergeExchange) { const auto serdeKindRuntimsStats = mergeExchangeStats.customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 1); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); } // Test reordering and dropping columns in PartitionedOutput operator. 
@@ -573,16 +605,23 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test dropping columns only { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, {leafTaskId}, "SELECT c0, c1 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0, c1 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -593,14 +632,20 @@ TEST_P(MultiFragmentTest, partitionedOutput) { auto leafPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam()) + .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, {leafTaskId}, "SELECT c3, c0, c2 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c3, c0, c2 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -615,15 +660,20 @@ TEST_P(MultiFragmentTest, partitionedOutput) { {}, 1, {"c0", "c1", "c2", "c3", "c4", "c3", "c2", "c1", "c0"}, - GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery( - op, {leafTaskId}, "SELECT c0, c1, c2, c3, c4, c3, c2, c1, c0 FROM tmp"); + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4, c3, c2, c1, c0 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -635,15 +685,16 @@ TEST_P(MultiFragmentTest, partitionedOutput) { auto leafPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({"c5"}, kFanout, {"c2", "c0", "c3"}, GetParam()) + .partitionedOutput( + {"c5"}, kFanout, {"c2", "c0", "c3"}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); auto intermediatePlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) - .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .partitionedOutput({}, 1, {"c3", "c0", "c2"}, GetParam().serdeKind) .planNode(); std::vector intermediateTaskIds; for (auto i = 0; i < kFanout; ++i) { @@ -654,12 +705,21 @@ TEST_P(MultiFragmentTest, partitionedOutput) { 
addRemoteSplits(intermediateTask, {leafTaskId}); } - auto op = PlanBuilder() - .exchange(intermediatePlan->outputType(), GetParam()) - .planNode(); + auto op = + PlanBuilder() + .exchange(intermediatePlan->outputType(), GetParam().serdeKind) + .planNode(); - auto task = - assertQuery(op, intermediateTaskIds, "SELECT c3, c0, c2 FROM tmp"); + std::vector intermediateSplits; + for (auto intermediateTaskId : intermediateTaskIds) { + intermediateSplits.emplace_back(remoteSplit(intermediateTaskId)); + } + auto task = test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(intermediateSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c3, c0, c2 FROM tmp"); verifyExchangeStats(task, kFanout, kFanout); @@ -669,21 +729,23 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test dropping all columns. { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .addNode( - [](std::string nodeId, - core::PlanNodePtr source) -> core::PlanNodePtr { - return core::PartitionedOutputNode::broadcast( - nodeId, 1, ROW({}), GetParam(), source); - }) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .addNode( + [](std::string nodeId, + core::PlanNodePtr source) -> core::PlanNodePtr { + return core::PartitionedOutputNode::broadcast( + nodeId, 1, ROW({}), GetParam().serdeKind, source); + }) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); leafTask->updateOutputBuffers(1, true); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); vector_size_t numRows = 0; for (const auto& vector : vectors_) { @@ -692,6 +754,9 @@ TEST_P(MultiFragmentTest, partitionedOutput) { auto result = AssertQueryBuilder(op) .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) .copyResults(pool()); ASSERT_EQ(*result->type(), *ROW({})); ASSERT_EQ(result->size(), numRows); @@ -702,10 +767,11 @@ TEST_P(MultiFragmentTest, partitionedOutput) { // Test asynchronously deleting task buffer (due to abort from downstream). { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(4); // Delete the results asynchronously to simulate abort from downstream. 
@@ -731,21 +797,23 @@ TEST_P(MultiFragmentTest, noHashPartitionSkew) { auto producerPlan = PlanBuilder() .values(vectors_) - .partitionedOutput({"c0"}, numPartitions, {"c0", "c1"}, GetParam()) + .partitionedOutput( + {"c0"}, numPartitions, {"c0", "c1"}, GetParam().serdeKind) .planNode(); auto producerTask = makeTask(producerTaskId, producerPlan, 0); producerTask->start(1); core::PlanNodeId partialAggregationNodeId; - auto consumerPlan = PlanBuilder() - .exchange(producerPlan->outputType(), GetParam()) - .localPartition({"c0"}) - .partialAggregation({"c0"}, {"count(1)"}) - .capturePlanNodeId(partialAggregationNodeId) - .localPartition({}) - .finalAggregation() - .singleAggregation({}, {"sum(1)"}) - .planNode(); + auto consumerPlan = + PlanBuilder() + .exchange(producerPlan->outputType(), GetParam().serdeKind) + .localPartition({"c0"}) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggregationNodeId) + .localPartition({}) + .finalAggregation() + .singleAggregation({}, {"sum(1)"}) + .planNode(); // This is computed based offline and shouldn't change across runs. const std::vector expectedValues{ @@ -759,6 +827,9 @@ TEST_P(MultiFragmentTest, noHashPartitionSkew) { auto consumerTask = test::AssertQueryBuilder(consumerPlan) .split(remoteSplit(producerTaskId)) .destination(partition) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) .maxDrivers(numConsumerDriverThreads) .assertResults(expectedResult); @@ -815,21 +886,22 @@ TEST_P(MultiFragmentTest, noHivePartitionSkew) { std::vector{0}, std::vector{}), {"c0", "c1"}, - GetParam()) + GetParam().serdeKind) .planNode(); auto producerTask = makeTask(producerTaskId, producerPlan, 0); producerTask->start(1); core::PlanNodeId partialAggregationNodeId; - auto consumerPlan = PlanBuilder() - .exchange(producerPlan->outputType(), GetParam()) - .localPartition(numBuckets, {0}, {}) - .partialAggregation({"c0"}, {"count(1)"}) - .capturePlanNodeId(partialAggregationNodeId) - .localPartition({}) - .finalAggregation() - .singleAggregation({}, {"sum(1)"}) - .planNode(); + auto consumerPlan = + PlanBuilder() + .exchange(producerPlan->outputType(), GetParam().serdeKind) + .localPartition(numBuckets, {0}, {}) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggregationNodeId) + .localPartition({}) + .finalAggregation() + .singleAggregation({}, {"sum(1)"}) + .planNode(); const int numConsumerDriverThreads{4}; const auto runConsumer = [&](int partition) { @@ -840,6 +912,9 @@ TEST_P(MultiFragmentTest, noHivePartitionSkew) { auto consumerTask = test::AssertQueryBuilder(consumerPlan) .split(remoteSplit(producerTaskId)) .destination(partition) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) .maxDrivers(numConsumerDriverThreads) .assertResults(expectedResult); @@ -880,18 +955,24 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { // Single Partition { auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput( - {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam()) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput( + {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 4 << 20); leafTask->start(1); - auto op = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + 
.exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - auto task = - assertQuery(op, {leafTaskId}, "SELECT c0, c1, c2, c3, c4 FROM tmp"); + auto task = test::AssertQueryBuilder(op, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId() << "state: " << leafTask->state(); auto taskStats = toPlanStats(leafTask->taskStats()); @@ -911,16 +992,16 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { false, std::make_shared(), {"c0", "c1", "c2", "c3", "c4"}, - GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); auto intermediatePlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .partitionedOutput( - {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam()) + {}, 1, {"c0", "c1", "c2", "c3", "c4"}, GetParam().serdeKind) .planNode(); std::vector intermediateTaskIds; for (auto i = 0; i < kFanout; ++i) { @@ -931,12 +1012,21 @@ TEST_P(MultiFragmentTest, partitionedOutputWithLargeInput) { addRemoteSplits(intermediateTask, {leafTaskId}); } - auto op = PlanBuilder() - .exchange(intermediatePlan->outputType(), GetParam()) - .planNode(); + auto op = + PlanBuilder() + .exchange(intermediatePlan->outputType(), GetParam().serdeKind) + .planNode(); - auto task = assertQuery( - op, intermediateTaskIds, "SELECT c0, c1, c2, c3, c4 FROM tmp"); + std::vector intermediateSplits; + for (auto intermediateTaskId : intermediateTaskIds) { + intermediateSplits.emplace_back(remoteSplit(intermediateTaskId)); + } + auto task = test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(intermediateSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT c0, c1, c2, c3, c4 FROM tmp"); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << "state: " << leafTask->state(); auto taskStats = toPlanStats(leafTask->taskStats()); @@ -951,11 +1041,11 @@ TEST_P(MultiFragmentTest, broadcast) { // Make leaf task: Values -> Repartitioning (broadcast) std::vector> tasks; auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = - PlanBuilder() - .values({data}) - .partitionedOutputBroadcast(/*outputLayout=*/{}, GetParam()) - .planNode(); + auto leafPlan = PlanBuilder() + .values({data}) + .partitionedOutputBroadcast( + /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); tasks.emplace_back(leafTask); leafTask->start(1); @@ -966,9 +1056,9 @@ TEST_P(MultiFragmentTest, broadcast) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .singleAggregation({}, {"count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -981,10 +1071,20 @@ TEST_P(MultiFragmentTest, broadcast) { leafTask->updateOutputBuffers(finalAggTaskIds.size(), true); // Collect results from multiple tasks. 
- auto op = - PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(op, finalAggTaskIds, "SELECT UNNEST(array[1000, 1000, 1000])"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT UNNEST(array[1000, 1000, 1000])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1021,7 +1121,7 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { false, std::make_shared(), /*outputLayout=*/{}, - GetParam()) + GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); @@ -1041,10 +1141,11 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { core::PlanNodePtr collectPlan; std::vector collectTaskIds; for (int i = 0; i < 2; i++) { - collectPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + collectPlan = + PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); collectTaskIds.push_back(makeTaskId("collect", i)); auto task = makeTask(collectTaskIds.back(), collectPlan, i); @@ -1052,10 +1153,20 @@ TEST_P(MultiFragmentTest, roundRobinPartition) { } // Collect everything. - auto finalPlan = - PlanBuilder().exchange(leafPlan->outputType(), GetParam()).planNode(); + auto finalPlan = PlanBuilder() + .exchange(leafPlan->outputType(), GetParam().serdeKind) + .planNode(); - assertQuery(finalPlan, {collectTaskIds}, "SELECT * FROM tmp"); + std::vector collectTaskSplits; + for (auto collectTaskId : collectTaskIds) { + collectTaskSplits.emplace_back(remoteSplit(collectTaskId)); + } + test::AssertQueryBuilder(finalPlan, duckDbQueryRunner_) + .splits(std::move(collectTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT * FROM tmp"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1081,11 +1192,11 @@ TEST_P(MultiFragmentTest, constantKeys) { // Make leaf task: Values -> Repartitioning (3-way) auto leafTaskId = makeTaskId("leaf", 0); - auto leafPlan = - PlanBuilder() - .values({data}) - .partitionedOutput({"c0", "123"}, 3, true, {"c0"}, GetParam()) - .planNode(); + auto leafPlan = PlanBuilder() + .values({data}) + .partitionedOutput( + {"c0", "123"}, 3, true, {"c0"}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); addTask(leafTask, {}); @@ -1095,10 +1206,10 @@ TEST_P(MultiFragmentTest, constantKeys) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .project({"c0 is null AS co_is_null"}) .partialAggregation({}, {"count_if(co_is_null)", "count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -1109,15 +1220,22 @@ TEST_P(MultiFragmentTest, constantKeys) { // Collect 
results and verify number of nulls is 3 times larger than in the // original data. auto op = PlanBuilder() - .exchange(finalAggPlan->outputType(), GetParam()) + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) .finalAggregation( {}, {"sum(a0)", "sum(a1)"}, {{BIGINT()}, {BIGINT()}}) .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults( + "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1143,7 +1261,8 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { auto leafPlan = PlanBuilder() .values({data}) - .partitionedOutput({"c0"}, 3, true, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 3, true, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); addTask(leafTask, {}); @@ -1154,10 +1273,10 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { for (int i = 0; i < 3; i++) { finalAggPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .project({"c0 is null AS co_is_null"}) .partialAggregation({}, {"count_if(co_is_null)", "count(1)"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); finalAggTaskIds.push_back(makeTaskId("final-agg", i)); @@ -1168,15 +1287,22 @@ TEST_P(MultiFragmentTest, replicateNullsAndAny) { // Collect results and verify number of nulls is 3 times larger than in the // original data. auto op = PlanBuilder() - .exchange(finalAggPlan->outputType(), GetParam()) + .exchange(finalAggPlan->outputType(), GetParam().serdeKind) .finalAggregation( {}, {"sum(a0)", "sum(a1)"}, {{BIGINT()}, {BIGINT()}}) .planNode(); - assertQuery( - op, - finalAggTaskIds, - "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); + std::vector finalAggTaskSplits; + for (auto finalAggTaskId : finalAggTaskIds) { + finalAggTaskSplits.emplace_back(remoteSplit(finalAggTaskId)); + } + test::AssertQueryBuilder(op, duckDbQueryRunner_) + .splits(std::move(finalAggTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults( + "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1197,7 +1323,7 @@ TEST_P(MultiFragmentTest, limit) { PlanBuilder() .tableScan(std::dynamic_pointer_cast(data->type())) .limit(0, 10, true) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); @@ -1207,25 +1333,20 @@ TEST_P(MultiFragmentTest, limit) { // Make final task: Exchange -> FinalLimit(10). 
auto plan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .localPartition(std::vector{}) .limit(0, 10, false) .planNode(); // Expect the task to produce results before receiving no-more-splits message. - bool splitAdded = false; - auto task = ::assertQuery( - plan, - [&](Task* task) { - if (splitAdded) { - return; - } - task->addSplit("0", remoteSplit(leafTaskId)); - splitAdded = true; - }, - "VALUES (null), (1), (2), (3), (4), (5), (6), (null), (8), (9)", - duckDbQueryRunner_); - + auto task = + test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .split(remoteSplit(leafTaskId)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults( + "VALUES (null), (1), (2), (3), (4), (5), (6), (null), (8), (9)"); ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); } @@ -1239,10 +1360,11 @@ TEST_P(MultiFragmentTest, mergeExchangeOverEmptySources) { for (int i = 0; i < 2; ++i) { auto taskId = makeTaskId("leaf-", i); leafTaskIds.push_back(taskId); - auto plan = PlanBuilder() - .values({data}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + auto plan = + PlanBuilder() + .values({data}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto task = makeTask(taskId, plan, tasks.size()); tasks.push_back(task); @@ -1251,11 +1373,20 @@ TEST_P(MultiFragmentTest, mergeExchangeOverEmptySources) { auto exchangeTaskId = makeTaskId("exchange-", 0); auto plan = PlanBuilder() - .mergeExchange(rowType_, {"c0"}, GetParam()) + .mergeExchange(rowType_, {"c0"}, GetParam().serdeKind) .singleAggregation({"c0"}, {"count(1)"}) .planNode(); - assertQuery(plan, leafTaskIds, ""); + std::vector leafTaskSplits; + for (auto leafTaskId : leafTaskIds) { + leafTaskSplits.emplace_back(remoteSplit(leafTaskId)); + } + test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .splits(std::move(leafTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults(""); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1309,7 +1440,8 @@ TEST_P(MultiFragmentTest, earlyCompletion) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutput({"c0"}, 2, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 2, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto task = makeTask(leafTaskId, plan, tasks.size()); @@ -1329,7 +1461,7 @@ TEST_P(MultiFragmentTest, earlyCompletion) { } auto joinPlan = makeJoinOverExchangePlan( - asRowType(data->type()), buildData, GetParam()); + asRowType(data->type()), buildData, GetParam().serdeKind); joinOutputType = joinPlan->outputType(); @@ -1345,10 +1477,18 @@ TEST_P(MultiFragmentTest, earlyCompletion) { // Create output task. 
auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery( - outputPlan, joinTaskIds, "SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1373,7 +1513,8 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutputBroadcast(/*outputLayout=*/{}, GetParam()) + .partitionedOutputBroadcast( + /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, plan, tasks.size()); @@ -1393,7 +1534,7 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { } auto joinPlan = makeJoinOverExchangePlan( - asRowType(data->type()), buildData, GetParam()); + asRowType(data->type()), buildData, GetParam().serdeKind); joinOutputType = joinPlan->outputType(); @@ -1412,9 +1553,18 @@ TEST_P(MultiFragmentTest, earlyCompletionBroadcast) { // Create output task. auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery(outputPlan, joinTaskIds, "SELECT UNNEST([10, 10, 10, 10])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([10, 10, 10, 10])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1438,7 +1588,8 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { auto leafTaskId = makeTaskId("leaf", 0); auto plan = PlanBuilder() .values({data, data, data, data}) - .partitionedOutput({"c0"}, 2, /*outputLayout=*/{}, GetParam()) + .partitionedOutput( + {"c0"}, 2, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto task = makeTask(leafTaskId, plan, tasks.size()); @@ -1460,14 +1611,15 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { auto planNodeIdGenerator = std::make_shared(); auto joinPlan = PlanBuilder(planNodeIdGenerator) - .mergeExchange(asRowType(data->type()), {"c0"}, GetParam()) + .mergeExchange( + asRowType(data->type()), {"c0"}, GetParam().serdeKind) .hashJoin( {"c0"}, {"u_c0"}, PlanBuilder(planNodeIdGenerator).values({buildData}).planNode(), "", {"c0"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); joinOutputType = joinPlan->outputType(); @@ -1484,10 +1636,18 @@ TEST_P(MultiFragmentTest, earlyCompletionMerge) { // Create output task. 
auto outputPlan = - PlanBuilder().exchange(joinOutputType, GetParam()).planNode(); + PlanBuilder().exchange(joinOutputType, GetParam().serdeKind).planNode(); - assertQuery( - outputPlan, joinTaskIds, "SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); + std::vector joinTaskSplits; + for (auto joinTaskId : joinTaskIds) { + joinTaskSplits.emplace_back(remoteSplit(joinTaskId)); + } + test::AssertQueryBuilder(outputPlan, duckDbQueryRunner_) + .splits(std::move(joinTaskSplits)) + .config( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)) + .assertResults("SELECT UNNEST([3, 3, 3, 3, 4, 4, 4, 4])"); for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); @@ -1587,11 +1747,12 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { auto leafTaskId = makeTaskId("leaf", 0); core::PlanNodePtr leafPlan; - leafPlan = PlanBuilder() - .tableScan(rowType_) - .project({"c0 % 10 AS c0", "c1"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + leafPlan = + PlanBuilder() + .tableScan(rowType_) + .project({"c0 % 10 AS c0", "c1"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); @@ -1599,11 +1760,11 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { auto rootPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .addNode([&leafPlan](std::string id, core::PlanNodePtr node) { return std::make_shared(id, std::move(node)); }) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto rootTask = makeTask("root-task", rootPlan, 0); @@ -1623,17 +1784,18 @@ TEST_P(MultiFragmentTest, exchangeDestruction) { TEST_P(MultiFragmentTest, cancelledExchange) { // Create a source fragment borrow the output type from it. - auto planFragment = exec::test::PlanBuilder() - .tableScan(rowType_) - .filter("c0 % 5 = 1") - .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) - .planFragment(); + auto planFragment = + exec::test::PlanBuilder() + .tableScan(rowType_) + .filter("c0 % 5 = 1") + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam().serdeKind) + .planFragment(); // Create task with exchange. 
auto planFragmentWithExchange = exec::test::PlanBuilder() - .exchange(planFragment.planNode->outputType(), GetParam()) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .exchange(planFragment.planNode->outputType(), GetParam().serdeKind) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planFragment(); auto exchangeTask = makeTask("output.0.0.1", planFragmentWithExchange.planNode, 0); @@ -1733,22 +1895,26 @@ TEST_P(MultiFragmentTest, customPlanNodeWithExchangeClient) { Operator::registerOperator(std::make_unique()); auto leafTaskId = makeTaskId("leaf", 0); core::PlanNodeId partitionNodeId; - auto leafPlan = PlanBuilder() - .values(vectors_) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .capturePlanNodeId(partitionNodeId) - .planNode(); + auto leafPlan = + PlanBuilder() + .values(vectors_) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .capturePlanNodeId(partitionNodeId) + .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); CursorParameters params; + params.queryConfigs.emplace( + core::QueryConfig::kShuffleCompressionKind, + std::to_string(GetParam().compressionKind)); core::PlanNodeId testNodeId; params.maxDrivers = 1; params.planNode = PlanBuilder() .addNode([&leafPlan](std::string id, core::PlanNodePtr /* input */) { return std::make_shared( - id, leafPlan->outputType(), GetParam()); + id, leafPlan->outputType(), GetParam().serdeKind); }) .capturePlanNodeId(testNodeId) .planNode(); @@ -1772,8 +1938,10 @@ TEST_P(MultiFragmentTest, customPlanNodeWithExchangeClient) { const auto serdeKindRuntimsStats = planStats.at(partitionNodeId).customStats.at(Operator::kShuffleSerdeKind); ASSERT_EQ(serdeKindRuntimsStats.count, 1); - ASSERT_EQ(serdeKindRuntimsStats.min, static_cast(GetParam())); - ASSERT_EQ(serdeKindRuntimsStats.max, static_cast(GetParam())); + ASSERT_EQ( + serdeKindRuntimsStats.min, static_cast(GetParam().serdeKind)); + ASSERT_EQ( + serdeKindRuntimsStats.max, static_cast(GetParam().serdeKind)); } // This test is to reproduce the race condition between task terminate and no @@ -1795,7 +1963,7 @@ DEBUG_ONLY_TEST_P( PlanBuilder() .tableScan(rowType_) .project({"c0 % 10 AS c0", "c1"}) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) .planNode(); auto leafTask = makeTask(leafTaskId, leafPlan, 0); leafTask->start(1); @@ -1828,7 +1996,7 @@ DEBUG_ONLY_TEST_P( blockTerminate.await([&]() { return readyToTerminate.load(); }); }))); auto rootPlan = PlanBuilder() - .exchange(leafPlan->outputType(), GetParam()) + .exchange(leafPlan->outputType(), GetParam().serdeKind) .finalAggregation({"c0"}, {"count(c1)"}, {{BIGINT()}}) .planNode(); @@ -1854,10 +2022,11 @@ TEST_P(MultiFragmentTest, taskTerminateWithPendingOutputBuffers) { setupSources(8, 1000); auto taskId = makeTaskId("task", 0); core::PlanNodePtr leafPlan; - leafPlan = PlanBuilder() - .tableScan(rowType_) - .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) - .planNode(); + leafPlan = + PlanBuilder() + .tableScan(rowType_) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind) + .planNode(); auto task = makeTask(taskId, leafPlan, 0); task->start(1); @@ -1926,19 +2095,20 @@ DEBUG_ONLY_TEST_P( makeRowVector({"p_c0"}, {makeFlatVector({1, 2, 3})}); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId exchangeNodeId; - auto plan = PlanBuilder(planNodeIdGenerator) - .values({probeData}, true) - .hashJoin( - {"p_c0"}, - {"c0"}, 
-                      PlanBuilder(planNodeIdGenerator)
-                          .exchange(rowType_, GetParam())
-                          .capturePlanNodeId(exchangeNodeId)
-                          .planNode(),
-                      "",
-                      {"c0"})
-                  .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
-                  .planNode();
+  auto plan =
+      PlanBuilder(planNodeIdGenerator)
+          .values({probeData}, true)
+          .hashJoin(
+              {"p_c0"},
+              {"c0"},
+              PlanBuilder(planNodeIdGenerator)
+                  .exchange(rowType_, GetParam().serdeKind)
+                  .capturePlanNodeId(exchangeNodeId)
+                  .planNode(),
+              "",
+              {"c0"})
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
+          .planNode();
   auto taskId = makeTaskId("final", 0);
   auto task = makeTask(taskId, plan, 0);
   task->start(2);
@@ -2007,7 +2177,7 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, mergeWithEarlyTermination) {
               .tableScan(rowType_)
               .orderBy({"c0"}, true)
               .planNode()})
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   auto partialSortTask = makeTask(sortTaskId, partialSortPlan, 1);
@@ -2034,8 +2204,9 @@
   auto finalSortTaskId = makeTaskId("orderby", 1);
   auto finalSortPlan =
       PlanBuilder()
-          .mergeExchange(partialSortPlan->outputType(), {"c0"}, GetParam())
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .mergeExchange(
+              partialSortPlan->outputType(), {"c0"}, GetParam().serdeKind)
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   auto finalSortTask = makeTask(finalSortTaskId, finalSortPlan, 0);
   finalSortTask->start(1);
@@ -2168,6 +2339,11 @@ class DataFetcher {
 /// of individual pages. PartitionedOutput operator is expected to limit page
 /// sizes to no more than 1MB, give or take 30%.
 TEST_P(MultiFragmentTest, maxBytes) {
+  if (GetParam().compressionKind != common::CompressionKind_NONE) {
+    // NOTE: different compression kinds produce different serialized byte
+    // sizes, so test only with compression disabled to ease testing.
+    return;
+  }
   std::string s(25, 'x');
   // Keep the row count under 7000 to avoid hitting the row limit in the
   // operator instead.
@@ -2178,11 +2354,12 @@ TEST_P(MultiFragmentTest, maxBytes) {
   });
   core::PlanNodeId outputNodeId;
-  auto plan = PlanBuilder()
-                  .values({data}, false, 100)
-                  .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
-                  .capturePlanNodeId(outputNodeId)
-                  .planNode();
+  auto plan =
+      PlanBuilder()
+          .values({data}, false, 100)
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
+          .capturePlanNodeId(outputNodeId)
+          .planNode();
   int32_t testIteration = 0;
   DataFetcher::Stats prevStats;
@@ -2258,7 +2435,7 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, exchangeStatsOnFailure) {
   auto producerPlan =
       PlanBuilder()
           .values({data}, false, 30)
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   auto producerTaskId = makeTaskId("producer", 0);
@@ -2266,8 +2443,9 @@ DEBUG_ONLY_TEST_P(MultiFragmentTest, exchangeStatsOnFailure) {
   producerTask->start(1);
   producerTask->updateOutputBuffers(1, true);
-  auto plan =
-      PlanBuilder().exchange(producerPlan->outputType(), GetParam()).planNode();
+  auto plan = PlanBuilder()
+                  .exchange(producerPlan->outputType(), GetParam().serdeKind)
+                  .planNode();
   auto task = makeTask("t", plan, 0, noopConsumer());
   task->start(4);
@@ -2301,7 +2479,7 @@ TEST_P(MultiFragmentTest, earlyTaskFailure) {
               .tableScan(rowType_)
               .orderBy({"c0"}, true)
               .planNode()})
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   for (bool internalFailure : {false, true}) {
     SCOPED_TRACE(fmt::format("internalFailure: {}", internalFailure));
@@ -2312,10 +2490,11 @@ TEST_P(MultiFragmentTest, earlyTaskFailure) {
     auto outputType = partialSortPlan->outputType();
     auto finalSortTaskId = makeTaskId("finalSortBy", 0);
-    auto finalSortPlan = PlanBuilder()
-                             .mergeExchange(outputType, {"c0"}, GetParam())
-                             .partitionedOutput({}, 1)
-                             .planNode();
+    auto finalSortPlan =
+        PlanBuilder()
+            .mergeExchange(outputType, {"c0"}, GetParam().serdeKind)
+            .partitionedOutput({}, 1)
+            .planNode();
     auto finalSortTask = makeTask(finalSortTaskId, finalSortPlan, 0);
     if (internalFailure) {
@@ -2348,16 +2527,18 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
   auto data = makeRowVector({makeFlatVector({1, 2, 3})});
   const int32_t numPartitions = 100;
-  auto producerPlan =
-      test::PlanBuilder()
-          .values({data})
-          .partitionedOutput(
-              {"c0"}, numPartitions, /*outputLayout=*/{}, GetParam())
-          .planNode();
+  auto producerPlan = test::PlanBuilder()
+                          .values({data})
+                          .partitionedOutput(
+                              {"c0"},
+                              numPartitions,
+                              /*outputLayout=*/{},
+                              GetParam().serdeKind)
+                          .planNode();
   const auto producerTaskId = "local://t1";
   auto plan = test::PlanBuilder()
-                  .exchange(asRowType(data->type()), GetParam())
+                  .exchange(asRowType(data->type()), GetParam().serdeKind)
                   .planNode();
   auto expected = makeRowVector({
@@ -2401,12 +2582,12 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
     ASSERT_EQ(numPages, stats.customStats.at("numReceivedPages").sum);
   };
-  if (GetParam() == VectorSerde::Kind::kPresto) {
+  if (GetParam().serdeKind == VectorSerde::Kind::kPresto) {
     test(1, 1'000);
     test(1'000, 56);
     test(10'000, 6);
     test(100'000, 1);
-  } else if (GetParam() == VectorSerde::Kind::kCompactRow) {
+  } else if (GetParam().serdeKind == VectorSerde::Kind::kCompactRow) {
     test(1, 1'000);
     test(1'000, 38);
     test(10'000, 4);
@@ -2420,24 +2601,17 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
 }

 TEST_P(MultiFragmentTest, compression) {
-  bufferManager_->testingSetCompression(
-      common::CompressionKind::CompressionKind_LZ4);
-  auto guard = folly::makeGuard([&]() {
-    bufferManager_->testingSetCompression(
-        common::CompressionKind::CompressionKind_NONE);
-  });
-
   constexpr int32_t kNumRepeats = 1'000'000;
   const auto data = makeRowVector({makeFlatVector({1, 2, 3})});
   const auto producerPlan =
       test::PlanBuilder()
           .values({data}, false, kNumRepeats)
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   const auto plan = test::PlanBuilder()
-                        .exchange(asRowType(data->type()), GetParam())
+                        .exchange(asRowType(data->type()), GetParam().serdeKind)
                         .singleAggregation({}, {"sum(c0)"})
                         .planNode();
@@ -2453,28 +2627,57 @@
   auto consumerTask =
       test::AssertQueryBuilder(plan)
           .split(remoteSplit(producerTaskId))
+          .config(
+              core::QueryConfig::kShuffleCompressionKind,
+              std::to_string(GetParam().compressionKind))
           .destination(0)
           .assertResults(expected);
   auto consumerTaskStats = exec::toPlanStats(consumerTask->taskStats());
   const auto& consumerPlanStats = consumerTaskStats.at("0");
+  ASSERT_EQ(
+      consumerPlanStats.customStats.at(Operator::kShuffleCompressionKind).min,
+      static_cast<int64_t>(GetParam().compressionKind));
+  ASSERT_EQ(
+      consumerPlanStats.customStats.at(Operator::kShuffleCompressionKind).max,
+      static_cast<int64_t>(GetParam().compressionKind));
   ASSERT_EQ(data->size() * kNumRepeats, consumerPlanStats.outputRows);
   auto producerTaskStats = exec::toPlanStats(producerTask->taskStats());
   const auto& producerStats = producerTaskStats.at("1");
+  ASSERT_EQ(
+      producerStats.customStats.at(Operator::kShuffleCompressionKind).min,
+      static_cast<int64_t>(GetParam().compressionKind));
+  ASSERT_EQ(
+      producerStats.customStats.at(Operator::kShuffleCompressionKind).max,
+      static_cast<int64_t>(GetParam().compressionKind));
+  if (GetParam().compressionKind == common::CompressionKind_NONE) {
+    ASSERT_EQ(producerStats.customStats.at("compressedBytes").sum, 0);
+    ASSERT_EQ(producerStats.customStats.at("compressionInputBytes").sum, 0);
+    ASSERT_EQ(producerStats.customStats.at("compressionSkippedBytes").sum, 0);
+    return;
+  }
   // The data is extremely compressible, 1, 2, 3 repeated 1000000 times.
   if (!expectSkipCompression) {
-    EXPECT_LT(
+    ASSERT_LT(
         producerStats.customStats.at("compressedBytes").sum,
         producerStats.customStats.at("compressionInputBytes").sum);
-    EXPECT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
+    ASSERT_EQ(0, producerStats.customStats.at("compressionSkippedBytes").sum);
   } else {
-    EXPECT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
+    ASSERT_LT(0, producerStats.customStats.at("compressionSkippedBytes").sum);
   }
 };
-  test("local://t1", 0.7, false);
-  test("local://t2", 0.0000001, true);
+  {
+    SCOPED_TRACE(
+        fmt::format("compression kind {}", GetParam().compressionKind));
+    {
+      SCOPED_TRACE("minCompressionRatio 0.7");
+      test("local://t1", 0.7, false);
+    }
+    {
+      SCOPED_TRACE("minCompressionRatio 0.0000001");
+      test("local://t2", 0.0000001, true);
+    }
+  }
 }

 TEST_P(MultiFragmentTest, scaledTableScan) {
@@ -2529,7 +2732,7 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
           .capturePlanNodeId(scanNodeId)
           .partialAggregation(
               {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"})
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   const auto leafTaskId = "local://leaf-0";
@@ -2540,12 +2743,12 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
   const auto finalAggPlan =
       PlanBuilder()
-          .exchange(leafPlan->outputType(), GetParam())
+          .exchange(leafPlan->outputType(), GetParam().serdeKind)
           .finalAggregation(
               {"c5"},
               {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"},
               {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}})
-          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam().serdeKind)
           .planNode();
   const auto finalAggTaskId = "local://final-agg-0";
@@ -2556,13 +2759,16 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
   const auto resultPlan =
       PlanBuilder()
-          .exchange(finalAggPlan->outputType(), GetParam())
+          .exchange(finalAggPlan->outputType(), GetParam().serdeKind)
           .planNode();
-  assertQuery(
-      resultPlan,
-      {finalAggTaskId},
-      "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
+  test::AssertQueryBuilder(resultPlan, duckDbQueryRunner_)
+      .split(remoteSplit(finalAggTaskId))
+      .config(
+          core::QueryConfig::kShuffleCompressionKind,
+          std::to_string(GetParam().compressionKind))
+      .assertResults(
+          "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
   ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
   ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get()))
diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h
index 6094ef7830b8..f9d9b6790c50 100644
--- a/velox/vector/VectorStream.h
+++ b/velox/vector/VectorStream.h
@@ -419,7 +419,7 @@ class VectorStreamGroup : public StreamArena {
       RowTypePtr type,
       VectorSerde* serde,
       RowVectorPtr* result,
-      const VectorSerde::Options* options = nullptr);
+      const VectorSerde::Options* options);

   void clear() override {
     StreamArena::clear();
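Note: the tests above read GetParam().serdeKind and GetParam().compressionKind, so the
fixture's parameter type must bundle a serde kind with a compression kind. The actual
struct and its instantiation live earlier in MultiFragmentTest.cpp and are not part of
this excerpt; the following is only a minimal sketch of what the usage implies, and the
struct name and the exact parameter list are assumptions, not this patch's code.

// Sketch only: a parameter type matching GetParam().serdeKind /
// GetParam().compressionKind as used in the tests above.
struct TestParam {
  VectorSerde::Kind serdeKind;
  common::CompressionKind compressionKind;
};

// Hypothetical instantiation: each serde kind crossed with no compression
// and LZ4, the two kinds the compression test branches on.
VELOX_INSTANTIATE_TEST_SUITE_P(
    MultiFragmentTest,
    MultiFragmentTest,
    ::testing::Values(
        TestParam{VectorSerde::Kind::kPresto, common::CompressionKind_NONE},
        TestParam{VectorSerde::Kind::kPresto, common::CompressionKind_LZ4},
        TestParam{VectorSerde::Kind::kCompactRow, common::CompressionKind_NONE},
        TestParam{VectorSerde::Kind::kCompactRow, common::CompressionKind_LZ4},
        TestParam{VectorSerde::Kind::kUnsafeRow, common::CompressionKind_NONE},
        TestParam{VectorSerde::Kind::kUnsafeRow, common::CompressionKind_LZ4}));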
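Note: outside of tests, the new knob is an ordinary session property: the config value is
the integer value of the CompressionKind enum, and the shuffle operators read it from the
query config instead of from OutputBufferManager. A sketch of consumer-side usage, with
the function name and executor wiring illustrative only:

#include <folly/executors/CPUThreadPoolExecutor.h>

#include "velox/common/compression/Compression.h"
#include "velox/core/QueryCtx.h"

using namespace facebook::velox;

// Builds a QueryCtx whose shuffle exchanges compress data with LZ4. Tasks
// created under this context see shuffle_compression_kind = 2 (LZ4) in their
// query config.
std::shared_ptr<core::QueryCtx> makeLz4ShuffleQueryCtx(
    folly::Executor* executor) {
  std::unordered_map<std::string, std::string> session{
      {core::QueryConfig::kShuffleCompressionKind,
       std::to_string(common::CompressionKind::CompressionKind_LZ4)}};
  return core::QueryCtx::create(
      executor, core::QueryConfig(std::move(session)));
}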