Skip to content

Commit

Permalink
Adds shuffle compression support
Browse files Browse the repository at this point in the history
Summary:
Adds per-query shuffle compression support. Currently the compression kind is configured in the partition output
buffer manager, which forces all queries to use the same compression kind and assumes that all
workers use the same compression kind. This is inflexible and does not align with Presto Java. This change
removes the compression kind from the partition output buffer manager and instead configures it through the query config.
The shuffle operators also report the compression kind as a runtime stat.

The follow-up is to integrate with Prestissimo by setting the LZ4 compression kind when the shuffle compression
session property is set. Note that Presto Java does not allow configuring which compression kind to use. This will be tested in batch shadow.

Differential Revision: D67407045
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 19, 2024
1 parent 61b0b39 commit aeda1ba
Show file tree
Hide file tree
Showing 14 changed files with 621 additions and 356 deletions.
12 changes: 12 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include "velox/common/compression/Compression.h"
#include "velox/common/config/Config.h"
#include "velox/vector/TypeAliases.h"

Expand Down Expand Up @@ -490,6 +491,11 @@ class QueryConfig {
static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
"table_scan_scale_up_memory_usage_ratio";

/// Specifies the shuffle compression kind which is defined by
/// CompressionKind. If it is CompressionKind_NONE, then no compression.
static constexpr const char* kShuffleCompressionKind =
"shuffle_compression_kind";

/// Returns the value configured under 'kSelectiveNimbleReaderEnabled', using
/// false as the default passed to get().
bool selectiveNimbleReaderEnabled() const {
  constexpr bool kDefaultValue = false;
  return get<bool>(kSelectiveNimbleReaderEnabled, kDefaultValue);
}
Expand Down Expand Up @@ -906,6 +912,12 @@ class QueryConfig {
return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.7);
}

/// Returns the shuffle compression kind as the integer value of the
/// corresponding common::CompressionKind enumerator. CompressionKind_NONE is
/// used as the default value passed to get().
int32_t shuffleCompressionKind() const {
  // Cast the default to the same type requested from get<T>() so that no
  // implicit unsigned-to-signed conversion happens at the call site.
  return get<int32_t>(
      kShuffleCompressionKind,
      static_cast<int32_t>(common::CompressionKind::CompressionKind_NONE));
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
5 changes: 5 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ Generic Configuration
- integer
- 16
- Byte length of the string prefix stored in the prefix-sort buffer. This doesn't include the null byte.
* - shuffle_compression_kind
- integer
- 0
- Specifies the shuffle compression kind, as defined by CompressionKind. If it is set to
CompressionKind_NONE (0), no compression is applied.

.. _expression-evaluation-conf:

Expand Down
5 changes: 5 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ These stats are reported by shuffle operators.
- Indicates the vector serde kind used by an operator for shuffle with 1
for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by Exchange,
MergeExchange and PartitionedOutput operators for now.
* - shuffleCompressionKind
-
- Indicates the compression kind used by an operator for shuffle. The
reported value is set to the corresponding CompressionKind enum with 0
(CompressionKind_NONE) as no compression.

PrefixSort
----------
Expand Down
27 changes: 18 additions & 9 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ namespace facebook::velox::exec {

namespace {
// Builds the serde options for the Exchange operator. Creates PrestoOptions
// for the Presto serde kind and the generic VectorSerde::Options for every
// other kind, then sets the compression kind from
// 'queryConfig.shuffleCompressionKind()'.
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
    const core::QueryConfig& queryConfig,
    VectorSerde::Kind kind) {
  std::unique_ptr<VectorSerde::Options> options =
      kind == VectorSerde::Kind::kPresto
      ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
      : std::make_unique<VectorSerde::Options>();
  // The compression kind is configured per query; it must not be read from
  // the process-wide OutputBufferManager, whose value would be overwritten
  // here anyway.
  options->compressionKind = static_cast<common::CompressionKind>(
      queryConfig.shuffleCompressionKind());
  return options;
}
} // namespace
Expand All @@ -46,8 +47,10 @@ Exchange::Exchange(
operatorType),
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
serdeKind_(exchangeNode->serdeKind()),
options_{getVectorSerdeOptions(serdeKind_)},
serdeKind_{exchangeNode->serdeKind()},
serdeOptions_{getVectorSerdeOptions(
operatorCtx_->driverCtx()->queryConfig(),
serdeKind_)},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}

Expand Down Expand Up @@ -159,7 +162,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
options_.get());
serdeOptions_.get());
resultOffset = result_->size();
}
}
Expand All @@ -186,7 +189,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
options_.get());
serdeOptions_.get());
// We expect the row-wise deserialization to consume all the input into one
// output vector.
VELOX_CHECK(inputStream->atEnd());
Expand All @@ -212,9 +215,15 @@ void Exchange::close() {
exchangeClient_->close();
}
exchangeClient_ = nullptr;
stats_.wlock()->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serdeKind_)));
{
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serdeKind_)));
lockedStats->addRuntimeStat(
Operator::kShuffleCompressionKind,
RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
}
}

void Exchange::recordExchangeClientStats() {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Exchange : public SourceOperator {

const VectorSerde::Kind serdeKind_;

const std::unique_ptr<VectorSerde::Options> options_;
const std::unique_ptr<VectorSerde::Options> serdeOptions_;

/// True if this operator is responsible for fetching splits from the Task
/// and passing these to ExchangeClient.
Expand Down
30 changes: 26 additions & 4 deletions velox/exec/Merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@
using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::exec {
namespace {
// Creates the serde options for MergeExchange: PrestoOptions when 'kind' is
// the Presto serde kind, plain VectorSerde::Options otherwise. The compression
// kind is taken from 'queryConfig.shuffleCompressionKind()'.
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
    const core::QueryConfig& queryConfig,
    VectorSerde::Kind kind) {
  std::unique_ptr<VectorSerde::Options> serdeOptions;
  if (kind == VectorSerde::Kind::kPresto) {
    serdeOptions = std::make_unique<
        serializer::presto::PrestoVectorSerde::PrestoOptions>();
  } else {
    serdeOptions = std::make_unique<VectorSerde::Options>();
  }
  // The query config stores the kind as an integer; convert it back to the
  // enum expected by the serde options.
  const auto configuredKind = queryConfig.shuffleCompressionKind();
  serdeOptions->compressionKind =
      static_cast<common::CompressionKind>(configuredKind);
  return serdeOptions;
}
} // namespace

Merge::Merge(
int32_t operatorId,
Expand Down Expand Up @@ -305,7 +318,10 @@ MergeExchange::MergeExchange(
mergeExchangeNode->sortingOrders(),
mergeExchangeNode->id(),
"MergeExchange"),
serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())) {}
serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())),
serdeOptions_(getVectorSerdeOptions(
driverCtx->queryConfig(),
mergeExchangeNode->serdeKind())) {}

BlockingReason MergeExchange::addMergeSources(ContinueFuture* future) {
if (operatorCtx_->driverCtx()->driverId != 0) {
Expand Down Expand Up @@ -370,8 +386,14 @@ void MergeExchange::close() {
source->close();
}
Operator::close();
stats_.wlock()->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serde_->kind())));
{
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serde_->kind())));
lockedStats->addRuntimeStat(
Operator::kShuffleCompressionKind,
RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
}
}
} // namespace facebook::velox::exec
5 changes: 5 additions & 0 deletions velox/exec/Merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,18 @@ class MergeExchange : public Merge {
return serde_;
}

/// Returns a non-owning pointer to the serde options held in
/// 'serdeOptions_'; ownership stays with this operator.
VectorSerde::Options* serdeOptions() {
return serdeOptions_.get();
}

void close() override;

protected:
BlockingReason addMergeSources(ContinueFuture* future) override;

private:
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> serdeOptions_;
bool noMoreSplits_ = false;
// Task Ids from all the splits we took to process so far.
std::vector<std::string> remoteSourceTaskIds_;
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/MergeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ class MergeExchangeSource : public MergeSource {
mergeExchange_->pool(),
mergeExchange_->outputType(),
mergeExchange_->serde(),
&data);
&data,
mergeExchange_->serdeOptions());

auto lockedStats = mergeExchange_->stats().wlock();
lockedStats->addInputVector(data->estimateFlatSize(), data->size());
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ class Operator : public BaseRuntimeStatWriter {
/// runtime stats value is the corresponding enum value.
static inline const std::string kShuffleSerdeKind{"shuffleSerdeKind"};

/// The compression kind used by an operator for shuffle. The recorded
/// runtime stats value is the corresponding enum value.
static inline const std::string kShuffleCompressionKind{
"shuffleCompressionKind"};

/// 'operatorId' is the initial index of the 'this' in the Driver's list of
/// Operators. This is used as in index into OperatorStats arrays in the Task.
/// 'planNodeId' is a query-level unique identifier of the PlanNode to which
Expand Down
20 changes: 3 additions & 17 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,11 @@ class OutputBufferManager {
public:
/// Options for shuffle. This is initialized once and affects both
/// PartitionedOutput and Exchange. This can be used for controlling
/// compression, protocol version and other matters where shuffle sides should
/// protocol version and other matters where shuffle sides should
/// agree.
struct Options {
common::CompressionKind compressionKind{
common::CompressionKind::CompressionKind_NONE};
};
struct Options {};

OutputBufferManager(Options options)
: compressionKind_(options.compressionKind) {}
explicit OutputBufferManager(Options /*unused*/) {}

void initializeTask(
std::shared_ptr<Task> task,
Expand Down Expand Up @@ -135,21 +131,11 @@ class OutputBufferManager {
// Returns NULL if task not found.
std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId);

/// Test-only hook that overwrites 'compressionKind_' with 'kind'.
/// NOTE(review): 'compressionKind_' is declared const, so the write through
/// const_cast below modifies a const object, which is undefined behavior in
/// C++. Consider making the member non-const if this hook is kept.
void testingSetCompression(common::CompressionKind kind) {
*const_cast<common::CompressionKind*>(&compressionKind_) = kind;
}

/// Returns the compression kind stored in 'compressionKind_'.
common::CompressionKind compressionKind() const {
return compressionKind_;
}

private:
// Retrieves the set of buffers for a query.
// Throws an exception if buffer doesn't exist.
std::shared_ptr<OutputBuffer> getBuffer(const std::string& taskId);

const common::CompressionKind compressionKind_;

folly::Synchronized<
std::unordered_map<std::string, std::shared_ptr<OutputBuffer>>,
std::mutex>
Expand Down
29 changes: 19 additions & 10 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
namespace facebook::velox::exec {
namespace {
// Builds the serde options for the PartitionedOutput operator. Creates
// PrestoOptions for the Presto serde kind and the generic
// VectorSerde::Options for every other kind, then sets the compression kind
// from 'queryConfig.shuffleCompressionKind()' and the minimum compression
// ratio from PartitionedOutput::minCompressionRatio().
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
    const core::QueryConfig& queryConfig,
    VectorSerde::Kind kind) {
  std::unique_ptr<VectorSerde::Options> options =
      kind == VectorSerde::Kind::kPresto
      ? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
      : std::make_unique<VectorSerde::Options>();
  // The compression kind is configured per query; it must not be read from
  // the process-wide OutputBufferManager, whose value would be overwritten
  // here anyway.
  options->compressionKind = static_cast<common::CompressionKind>(
      queryConfig.shuffleCompressionKind());
  options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
  return options;
}
Expand All @@ -38,14 +39,14 @@ Destination::Destination(
const std::string& taskId,
int destination,
VectorSerde* serde,
VectorSerde::Options* options,
VectorSerde::Options* serdeOptions,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
options_(options),
serdeOptions_(serdeOptions),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
Expand Down Expand Up @@ -89,7 +90,7 @@ BlockingReason Destination::advance(
if (current_ == nullptr) {
current_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
const auto rowType = asRowType(output->type());
current_->createStreamTree(rowType, rowsInCurrent_, options_);
current_->createStreamTree(rowType, rowsInCurrent_, serdeOptions_);
}

const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow);
Expand Down Expand Up @@ -200,7 +201,9 @@ PartitionedOutput::PartitionedOutput(
.maxPartitionedOutputBufferSize()),
eagerFlush_(eagerFlush),
serde_(getNamedVectorSerde(planNode->serdeKind())),
options_(getVectorSerdeOptions(planNode->serdeKind())) {
serdeOptions_(getVectorSerdeOptions(
operatorCtx_->driverCtx()->queryConfig(),
planNode->serdeKind())) {
if (!planNode->isPartitioned()) {
VELOX_USER_CHECK_EQ(numDestinations_, 1);
}
Expand Down Expand Up @@ -256,7 +259,7 @@ void PartitionedOutput::initializeDestinations() {
taskId,
i,
serde_,
options_.get(),
serdeOptions_.get(),
pool(),
eagerFlush_,
[&](uint64_t bytes, uint64_t rows) {
Expand Down Expand Up @@ -473,9 +476,15 @@ bool PartitionedOutput::isFinished() {

void PartitionedOutput::close() {
Operator::close();
stats_.wlock()->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serde_->kind())));
{
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serde_->kind())));
lockedStats->addRuntimeStat(
Operator::kShuffleCompressionKind,
RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
}
destinations_.clear();
}

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class Destination {
const std::string taskId_;
const int destination_;
VectorSerde* const serde_;
VectorSerde::Options* const options_;
VectorSerde::Options* const serdeOptions_;
memory::MemoryPool* const pool_;
const bool eagerFlush_;
const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
Expand Down Expand Up @@ -220,7 +220,7 @@ class PartitionedOutput : public Operator {
const int64_t maxBufferedBytes_;
const bool eagerFlush_;
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> options_;
const std::unique_ptr<VectorSerde::Options> serdeOptions_;

BlockingReason blockingReason_{BlockingReason::kNotBlocked};
ContinueFuture future_;
Expand Down
Loading

0 comments on commit aeda1ba

Please sign in to comment.