Skip to content

Commit

Permalink
feat: Adds shuffle compression support (facebookincubator#11914)
Browse files Browse the repository at this point in the history
Summary:

Adds per-query shuffle compression support. Currently the compression kind is configured in the partition output
buffer manager, which forces all queries to use the same compression kind and assumes that all
workers are configured with the same compression kind. This is inflexible and does not align with Presto Java either. This change
removes the compression kind from the partition output buffer manager and instead configures it through the query config.
The shuffle operators also report the compression kind.

The follow-up is to integrate with Prestissimo by setting the LZ4 compression kind if the shuffle compression
session property is set. Note that Presto Java doesn't allow configuring which compression kind to use.

With Meta internal workloads, the LZ4 compression kind can reduce end-to-end (e2e) execution time by 20% while
cutting the shuffle data volume in half.

Reviewed By: tanjialiang, oerling

Differential Revision: D67407045
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 20, 2024
1 parent 1bf58f4 commit 0aec65c
Show file tree
Hide file tree
Showing 16 changed files with 665 additions and 380 deletions.
10 changes: 10 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include "velox/common/compression/Compression.h"
#include "velox/common/config/Config.h"
#include "velox/vector/TypeAliases.h"

Expand Down Expand Up @@ -490,6 +491,11 @@ class QueryConfig {
static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
"table_scan_scale_up_memory_usage_ratio";

/// Specifies the shuffle compression kind as a codec name string. The shuffle
/// operators convert it to a CompressionKind with
/// common::stringToCompressionKind(). If it maps to CompressionKind_NONE,
/// then no compression is applied. Read through shuffleCompressionKind(),
/// which defaults to "none" when the key is not set.
static constexpr const char* kShuffleCompressionKind =
"shuffle_compression_codec";

/// Returns the boolean value configured under kSelectiveNimbleReaderEnabled,
/// or false if the key is not set.
bool selectiveNimbleReaderEnabled() const {
return get<bool>(kSelectiveNimbleReaderEnabled, false);
}
Expand Down Expand Up @@ -906,6 +912,10 @@ class QueryConfig {
return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.7);
}

/// Returns the shuffle compression codec name configured under
/// kShuffleCompressionKind, or "none" if the key is not set. The result is a
/// name string, not a CompressionKind enum value; callers convert it (e.g.
/// with common::stringToCompressionKind()).
std::string shuffleCompressionKind() const {
return get<std::string>(kShuffleCompressionKind, "none");
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
10 changes: 8 additions & 2 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ Generic Configuration
- integer
- 16
- Byte length of the string prefix stored in the prefix-sort buffer. This doesn't include the null byte.
* - shuffle_compression_codec
- string
- none
- Specifies the compression algorithm type to compress the shuffle data to
trade CPU for network IO efficiency. The supported compression codecs
are: zlib, snappy, lzo, zstd, lz4 and gzip. none means no compression.

.. _expression-evaluation-conf:

Expand Down Expand Up @@ -355,8 +361,8 @@ Spilling
- string
- none
- Specifies the compression algorithm type to compress the spilled data before write to disk to trade CPU for IO
efficiency. The supported compression codecs are: ZLIB, SNAPPY, LZO, ZSTD, LZ4 and GZIP.
NONE means no compression.
efficiency. The supported compression codecs are: zlib, snappy, lzo, zstd, lz4 and gzip.
none means no compression.
* - spill_prefixsort_enabled
- bool
- false
Expand Down
5 changes: 5 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ These stats are reported by shuffle operators.
- Indicates the vector serde kind used by an operator for shuffle with 1
for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by Exchange,
MergeExchange and PartitionedOutput operators for now.
* - shuffleCompressionKind
-
- Indicates the compression kind used by an operator for shuffle. The
reported value is set to the corresponding CompressionKind enum with 0
(CompressionKind_NONE) as no compression.

PrefixSort
----------
Expand Down
25 changes: 17 additions & 8 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ namespace facebook::velox::exec {

namespace {
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
const core::QueryConfig& queryConfig,
VectorSerde::Kind kind) {
std::unique_ptr<VectorSerde::Options> options =
kind == VectorSerde::Kind::kPresto
? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
: std::make_unique<VectorSerde::Options>();
options->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
common::stringToCompressionKind(queryConfig.shuffleCompressionKind());
return options;
}
} // namespace
Expand All @@ -46,8 +47,10 @@ Exchange::Exchange(
operatorType),
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
serdeKind_(exchangeNode->serdeKind()),
options_{getVectorSerdeOptions(serdeKind_)},
serdeKind_{exchangeNode->serdeKind()},
serdeOptions_{getVectorSerdeOptions(
operatorCtx_->driverCtx()->queryConfig(),
serdeKind_)},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}

Expand Down Expand Up @@ -159,7 +162,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
options_.get());
serdeOptions_.get());
resultOffset = result_->size();
}
}
Expand All @@ -186,7 +189,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
options_.get());
serdeOptions_.get());
// We expect the row-wise deserialization to consume all the input into one
// output vector.
VELOX_CHECK(inputStream->atEnd());
Expand All @@ -212,9 +215,15 @@ void Exchange::close() {
exchangeClient_->close();
}
exchangeClient_ = nullptr;
stats_.wlock()->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serdeKind_)));
{
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serdeKind_)));
lockedStats->addRuntimeStat(
Operator::kShuffleCompressionKind,
RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
}
}

void Exchange::recordExchangeClientStats() {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Exchange : public SourceOperator {

const VectorSerde::Kind serdeKind_;

const std::unique_ptr<VectorSerde::Options> options_;
const std::unique_ptr<VectorSerde::Options> serdeOptions_;

/// True if this operator is responsible for fetching splits from the Task
/// and passing these to ExchangeClient.
Expand Down
30 changes: 26 additions & 4 deletions velox/exec/Merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@
using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::exec {
namespace {
// Creates the serde options for the given serde 'kind': Presto-specific
// options for VectorSerde::Kind::kPresto, generic options otherwise. The
// compression kind is taken from the shuffle compression setting in
// 'queryConfig'.
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
    const core::QueryConfig& queryConfig,
    VectorSerde::Kind kind) {
  std::unique_ptr<VectorSerde::Options> serdeOptions;
  if (kind == VectorSerde::Kind::kPresto) {
    serdeOptions = std::make_unique<
        serializer::presto::PrestoVectorSerde::PrestoOptions>();
  } else {
    serdeOptions = std::make_unique<VectorSerde::Options>();
  }

  // The codec is configured per query as a name string; translate it to the
  // enum the serde expects.
  const auto codecName = queryConfig.shuffleCompressionKind();
  serdeOptions->compressionKind = common::stringToCompressionKind(codecName);
  return serdeOptions;
}
} // namespace

Merge::Merge(
int32_t operatorId,
Expand Down Expand Up @@ -305,7 +318,10 @@ MergeExchange::MergeExchange(
mergeExchangeNode->sortingOrders(),
mergeExchangeNode->id(),
"MergeExchange"),
serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())) {}
serde_(getNamedVectorSerde(mergeExchangeNode->serdeKind())),
serdeOptions_(getVectorSerdeOptions(
driverCtx->queryConfig(),
mergeExchangeNode->serdeKind())) {}

BlockingReason MergeExchange::addMergeSources(ContinueFuture* future) {
if (operatorCtx_->driverCtx()->driverId != 0) {
Expand Down Expand Up @@ -370,8 +386,14 @@ void MergeExchange::close() {
source->close();
}
Operator::close();
stats_.wlock()->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serde_->kind())));
{
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serde_->kind())));
lockedStats->addRuntimeStat(
Operator::kShuffleCompressionKind,
RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
}
}
} // namespace facebook::velox::exec
5 changes: 5 additions & 0 deletions velox/exec/Merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,18 @@ class MergeExchange : public Merge {
return serde_;
}

/// Returns a non-owning pointer to the serde options held by this operator
/// in 'serdeOptions_'. Ownership stays with the operator.
VectorSerde::Options* serdeOptions() const {
return serdeOptions_.get();
}

void close() override;

protected:
BlockingReason addMergeSources(ContinueFuture* future) override;

private:
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> serdeOptions_;
bool noMoreSplits_ = false;
// Task Ids from all the splits we took to process so far.
std::vector<std::string> remoteSourceTaskIds_;
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/MergeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ class MergeExchangeSource : public MergeSource {
mergeExchange_->pool(),
mergeExchange_->outputType(),
mergeExchange_->serde(),
&data);
&data,
mergeExchange_->serdeOptions());

auto lockedStats = mergeExchange_->stats().wlock();
lockedStats->addInputVector(data->estimateFlatSize(), data->size());
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ class Operator : public BaseRuntimeStatWriter {
/// runtime stats value is the corresponding enum value.
static inline const std::string kShuffleSerdeKind{"shuffleSerdeKind"};

/// The compression kind used by an operator for shuffle. The recorded
/// runtime stats value is the corresponding enum value.
static inline const std::string kShuffleCompressionKind{
"shuffleCompressionKind"};

/// 'operatorId' is the initial index of the 'this' in the Driver's list of
/// Operators. This is used as an index into OperatorStats arrays in the Task.
/// 'planNodeId' is a query-level unique identifier of the PlanNode to which
Expand Down
20 changes: 3 additions & 17 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,11 @@ class OutputBufferManager {
public:
/// Options for shuffle. This is initialized once and affects both
/// PartitionedOutput and Exchange. This can be used for controlling
/// compression, protocol version and other matters where shuffle sides should
/// protocol version and other matters where shuffle sides should
/// agree.
struct Options {
common::CompressionKind compressionKind{
common::CompressionKind::CompressionKind_NONE};
};
struct Options {};

OutputBufferManager(Options options)
: compressionKind_(options.compressionKind) {}
explicit OutputBufferManager(Options /*unused*/) {}

void initializeTask(
std::shared_ptr<Task> task,
Expand Down Expand Up @@ -135,21 +131,11 @@ class OutputBufferManager {
// Returns NULL if task not found.
std::shared_ptr<OutputBuffer> getBufferIfExists(const std::string& taskId);

void testingSetCompression(common::CompressionKind kind) {
*const_cast<common::CompressionKind*>(&compressionKind_) = kind;
}

common::CompressionKind compressionKind() const {
return compressionKind_;
}

private:
// Retrieves the set of buffers for a query.
// Throws an exception if buffer doesn't exist.
std::shared_ptr<OutputBuffer> getBuffer(const std::string& taskId);

const common::CompressionKind compressionKind_;

folly::Synchronized<
std::unordered_map<std::string, std::shared_ptr<OutputBuffer>>,
std::mutex>
Expand Down
27 changes: 18 additions & 9 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
namespace facebook::velox::exec {
namespace {
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
const core::QueryConfig& queryConfig,
VectorSerde::Kind kind) {
std::unique_ptr<VectorSerde::Options> options =
kind == VectorSerde::Kind::kPresto
? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
: std::make_unique<VectorSerde::Options>();
options->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
common::stringToCompressionKind(queryConfig.shuffleCompressionKind());
options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
return options;
}
Expand All @@ -38,14 +39,14 @@ Destination::Destination(
const std::string& taskId,
int destination,
VectorSerde* serde,
VectorSerde::Options* options,
VectorSerde::Options* serdeOptions,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
options_(options),
serdeOptions_(serdeOptions),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
Expand Down Expand Up @@ -89,7 +90,7 @@ BlockingReason Destination::advance(
if (current_ == nullptr) {
current_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
const auto rowType = asRowType(output->type());
current_->createStreamTree(rowType, rowsInCurrent_, options_);
current_->createStreamTree(rowType, rowsInCurrent_, serdeOptions_);
}

const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow);
Expand Down Expand Up @@ -200,7 +201,9 @@ PartitionedOutput::PartitionedOutput(
.maxPartitionedOutputBufferSize()),
eagerFlush_(eagerFlush),
serde_(getNamedVectorSerde(planNode->serdeKind())),
options_(getVectorSerdeOptions(planNode->serdeKind())) {
serdeOptions_(getVectorSerdeOptions(
operatorCtx_->driverCtx()->queryConfig(),
planNode->serdeKind())) {
if (!planNode->isPartitioned()) {
VELOX_USER_CHECK_EQ(numDestinations_, 1);
}
Expand Down Expand Up @@ -256,7 +259,7 @@ void PartitionedOutput::initializeDestinations() {
taskId,
i,
serde_,
options_.get(),
serdeOptions_.get(),
pool(),
eagerFlush_,
[&](uint64_t bytes, uint64_t rows) {
Expand Down Expand Up @@ -473,9 +476,15 @@ bool PartitionedOutput::isFinished() {

void PartitionedOutput::close() {
Operator::close();
stats_.wlock()->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serde_->kind())));
{
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat(
Operator::kShuffleSerdeKind,
RuntimeCounter(static_cast<int64_t>(serde_->kind())));
lockedStats->addRuntimeStat(
Operator::kShuffleCompressionKind,
RuntimeCounter(static_cast<int64_t>(serdeOptions_->compressionKind)));
}
destinations_.clear();
}

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class Destination {
const std::string taskId_;
const int destination_;
VectorSerde* const serde_;
VectorSerde::Options* const options_;
VectorSerde::Options* const serdeOptions_;
memory::MemoryPool* const pool_;
const bool eagerFlush_;
const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
Expand Down Expand Up @@ -220,7 +220,7 @@ class PartitionedOutput : public Operator {
const int64_t maxBufferedBytes_;
const bool eagerFlush_;
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> options_;
const std::unique_ptr<VectorSerde::Options> serdeOptions_;

BlockingReason blockingReason_{BlockingReason::kNotBlocked};
ContinueFuture future_;
Expand Down
Loading

0 comments on commit 0aec65c

Please sign in to comment.