Skip to content

Commit

Permalink
feat: Support compress UnsafeRow and CompactRow (facebookincubator#11497
Browse files Browse the repository at this point in the history
)

Summary:
Convert the buffers to `folly::IOBuf`, then compress it, and record some stats, skip compression when compressedSize/uncompressedSize exceeds minCompressionRatio with default value 0.8.
Serialization format is:
 | uncompressedSize | compressedSize | compressed | serializedData for Iterator[Row] |
Test the RowVector with all types and size is 500, the test output is as following:

row kind | compression kind | uncompressedSize | compressedSize | compression ratio
-- | -- | -- | -- | --
UnsafeRow    | zlib | 519344 | 227749 | 44%
UnsafeRow    | snappy | 307936 | 115012 | 37%
UnsafeRow    | zstd | 689544 | 273433 | 40%
UnsafeRow    | lz4 | 622688 | 205956 | 33%
UnsafeRow    | gzip | 759608 | 213922 | 28%
CompactRow | zlib | 263474 | 129241 | 49%
CompactRow | snappy | 388313 | 78297 | 20%
CompactRow | zstd | 224144 | 92744 | 41%
CompactRow | lz4 | 110043 | 61615 | 56%
CompactRow | gzip | 224631 | 93989 | 42%

Pull Request resolved: facebookincubator#11497

Test Plan:
With ZSTD compression, the shuffle volume has been reduced by half from 59.25 TB
(20241218_182541_00001_4w82c) down to 40TB (20241219_001045_00001_6rkjb) for row-wise shuffle
which is only one stage of the query
The followup is to integrate with Prestissimo which only support LZ4
and change from process wide to per-query and test with columnar shuffle

Reviewed By: miaoever

Differential Revision: D67380519

Pulled By: xiaoxmeng

fbshipit-source-id: a173422b87b54ff77597c23482595e3558f85387
  • Loading branch information
jinchengchenghh authored and facebook-github-bot committed Dec 19, 2024
1 parent 67e858c commit 0ee82f3
Show file tree
Hide file tree
Showing 17 changed files with 460 additions and 129 deletions.
36 changes: 34 additions & 2 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,38 @@

namespace facebook::velox::exec {

namespace {
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
VectorSerde::Kind kind) {
std::unique_ptr<VectorSerde::Options> options =
kind == VectorSerde::Kind::kPresto
? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
: std::make_unique<VectorSerde::Options>();
options->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
return options;
}
} // namespace

Exchange::Exchange(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::ExchangeNode>& exchangeNode,
std::shared_ptr<ExchangeClient> exchangeClient,
const std::string& operatorType)
: SourceOperator(
driverCtx,
exchangeNode->outputType(),
operatorId,
exchangeNode->id(),
operatorType),
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
serdeKind_(exchangeNode->serdeKind()),
options_{getVectorSerdeOptions(serdeKind_)},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}

void Exchange::addTaskIds(std::vector<std::string>& taskIds) {
std::shuffle(std::begin(taskIds), std::end(taskIds), rng_);
for (const std::string& taskId : taskIds) {
Expand Down Expand Up @@ -127,7 +159,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
&options_);
options_.get());
resultOffset = result_->size();
}
}
Expand All @@ -154,7 +186,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
&options_);
options_.get());
// We expect the row-wise deserialization to consume all the input into one
// output vector.
VELOX_CHECK(inputStream->atEnd());
Expand Down
40 changes: 14 additions & 26 deletions velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,7 @@ class Exchange : public SourceOperator {
DriverCtx* driverCtx,
const std::shared_ptr<const core::ExchangeNode>& exchangeNode,
std::shared_ptr<ExchangeClient> exchangeClient,
const std::string& operatorType = "Exchange")
: SourceOperator(
driverCtx,
exchangeNode->outputType(),
operatorId,
exchangeNode->id(),
operatorType),
preferredOutputBatchBytes_{
driverCtx->queryConfig().preferredOutputBatchBytes()},
serdeKind_(exchangeNode->serdeKind()),
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
}
const std::string& operatorType = "Exchange");

~Exchange() override {
close();
Expand All @@ -76,16 +62,16 @@ class Exchange : public SourceOperator {
private:
// Invoked to create exchange client for remote tasks.
// The function shuffles the source task ids first to randomize the source
// tasks we fetch data from. This helps to avoid different tasks fetching from
// the same source task in a distributed system.
// tasks we fetch data from. This helps to avoid different tasks fetching
// from the same source task in a distributed system.
void addTaskIds(std::vector<std::string>& taskIds);

/// Fetches splits from the task until there are no more splits or task
/// returns a future that will be complete when more splits arrive. Adds
/// splits to exchangeClient_. Returns true if received a future from the task
/// and sets the 'future' parameter. Returns false if fetched all splits or if
/// this operator is not the first operator in the pipeline and therefore is
/// not responsible for fetching splits and adding them to the
/// splits to exchangeClient_. Returns true if received a future from the
/// task and sets the 'future' parameter. Returns false if fetched all
/// splits or if this operator is not the first operator in the pipeline and
/// therefore is not responsible for fetching splits and adding them to the
/// exchangeClient_.
bool getSplits(ContinueFuture* future);

Expand All @@ -97,16 +83,19 @@ class Exchange : public SourceOperator {

const VectorSerde::Kind serdeKind_;

/// True if this operator is responsible for fetching splits from the Task and
/// passing these to ExchangeClient.
const std::unique_ptr<VectorSerde::Options> options_;

/// True if this operator is responsible for fetching splits from the Task
/// and passing these to ExchangeClient.
const bool processSplits_;

bool noMoreSplits_ = false;

std::shared_ptr<ExchangeClient> exchangeClient_;

/// A future received from Task::getSplitOrFuture(). It will be complete when
/// there are more splits available or no-more-splits signal has arrived.
/// A future received from Task::getSplitOrFuture(). It will be complete
/// when there are more splits available or no-more-splits signal has
/// arrived.
ContinueFuture splitFuture_{ContinueFuture::makeEmpty()};

// Reusable result vector.
Expand All @@ -115,7 +104,6 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
serializer::presto::PrestoVectorSerde::PrestoOptions options_;
};

} // namespace facebook::velox::exec
1 change: 1 addition & 0 deletions velox/exec/OperatorTraceReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class OperatorTraceInputReader {
const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{
true,
common::CompressionKind_ZSTD, // TODO: Use trace config.
0.8,
/*_nullsFirst=*/true};
const std::shared_ptr<filesystems::FileSystem> fs_;
const RowTypePtr dataType_;
Expand Down
1 change: 1 addition & 0 deletions velox/exec/OperatorTraceWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class OperatorTraceInputWriter {
const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = {
true,
common::CompressionKind::CompressionKind_ZSTD,
0.8,
/*nullsFirst=*/true};
const std::shared_ptr<filesystems::FileSystem> fs_;
memory::MemoryPool* const pool_;
Expand Down
46 changes: 36 additions & 10 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,39 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
namespace {
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
VectorSerde::Kind kind) {
std::unique_ptr<VectorSerde::Options> options =
kind == VectorSerde::Kind::kPresto
? std::make_unique<serializer::presto::PrestoVectorSerde::PrestoOptions>()
: std::make_unique<VectorSerde::Options>();
options->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
options->minCompressionRatio = PartitionedOutput::minCompressionRatio();
return options;
}
} // namespace

namespace detail {
Destination::Destination(
const std::string& taskId,
int destination,
VectorSerde* serde,
VectorSerde::Options* options,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
options_(options),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
setTargetSizePct();
}

BlockingReason Destination::advance(
uint64_t maxBytes,
const std::vector<vector_size_t>& sizes,
Expand Down Expand Up @@ -57,15 +89,7 @@ BlockingReason Destination::advance(
if (current_ == nullptr) {
current_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
const auto rowType = asRowType(output->type());
if (serde_->kind() == VectorSerde::Kind::kPresto) {
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
} else {
current_->createStreamTree(rowType, rowsInCurrent_);
}
current_->createStreamTree(rowType, rowsInCurrent_, options_);
}

const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow);
Expand Down Expand Up @@ -175,7 +199,8 @@ PartitionedOutput::PartitionedOutput(
->queryConfig()
.maxPartitionedOutputBufferSize()),
eagerFlush_(eagerFlush),
serde_(getNamedVectorSerde(planNode->serdeKind())) {
serde_(getNamedVectorSerde(planNode->serdeKind())),
options_(getVectorSerdeOptions(planNode->serdeKind())) {
if (!planNode->isPartitioned()) {
VELOX_USER_CHECK_EQ(numDestinations_, 1);
}
Expand Down Expand Up @@ -231,6 +256,7 @@ void PartitionedOutput::initializeDestinations() {
taskId,
i,
serde_,
options_.get(),
pool(),
eagerFlush_,
[&](uint64_t bytes, uint64_t rows) {
Expand Down
13 changes: 4 additions & 9 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,10 @@ class Destination {
const std::string& taskId,
int destination,
VectorSerde* serde,
VectorSerde::Options* options,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
setTargetSizePct();
}
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued);

/// Resets the destination before starting a new batch.
void beginBatch() {
Expand Down Expand Up @@ -112,6 +105,7 @@ class Destination {
const std::string taskId_;
const int destination_;
VectorSerde* const serde_;
VectorSerde::Options* const options_;
memory::MemoryPool* const pool_;
const bool eagerFlush_;
const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
Expand Down Expand Up @@ -226,6 +220,7 @@ class PartitionedOutput : public Operator {
const int64_t maxBufferedBytes_;
const bool eagerFlush_;
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> options_;

BlockingReason blockingReason_{BlockingReason::kNotBlocked};
ContinueFuture future_;
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/SpillFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ uint64_t SpillWriter::write(
NanosecondTimer timer(&timeNs);
if (batch_ == nullptr) {
serializer::presto::PrestoVectorSerde::PrestoOptions options = {
kDefaultUseLosslessTimestamp, compressionKind_, true /*nullsFirst*/};
kDefaultUseLosslessTimestamp,
compressionKind_,
0.8,
/*nullsFirst=*/true};
batch_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
batch_->createStreamTree(
std::static_pointer_cast<const RowType>(rows->type()),
Expand Down Expand Up @@ -300,6 +303,7 @@ SpillReadFile::SpillReadFile(
readOptions_{
kDefaultUseLosslessTimestamp,
compressionKind_,
0.8,
/*nullsFirst=*/true},
pool_(pool),
serde_(getNamedVectorSerde(VectorSerde::Kind::kPresto)),
Expand Down
10 changes: 3 additions & 7 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2408,22 +2408,18 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
test(100'000, 1);
} else if (GetParam() == VectorSerde::Kind::kCompactRow) {
test(1, 1'000);
test(1'000, 28);
test(10'000, 3);
test(1'000, 38);
test(10'000, 4);
test(100'000, 1);
} else {
test(1, 1'000);
test(1'000, 63);
test(1'000, 72);
test(10'000, 7);
test(100'000, 1);
}
}

TEST_P(MultiFragmentTest, compression) {
// NOTE: only presto format supports compression for now
if (GetParam() != VectorSerde::Kind::kPresto) {
return;
}
bufferManager_->testingSetCompression(
common::CompressionKind::CompressionKind_LZ4);
auto guard = folly::makeGuard([&]() {
Expand Down
15 changes: 9 additions & 6 deletions velox/serializers/CompactRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ using TRowSize = uint32_t;

class CompactRowVectorSerializer : public RowSerializer<row::CompactRow> {
public:
explicit CompactRowVectorSerializer(memory::MemoryPool* pool)
: RowSerializer<row::CompactRow>(pool) {}
explicit CompactRowVectorSerializer(
memory::MemoryPool* pool,
const VectorSerde::Options* options)
: RowSerializer<row::CompactRow>(pool, options) {}

private:
void serializeRanges(
Expand Down Expand Up @@ -74,20 +76,21 @@ CompactRowVectorSerde::createIterativeSerializer(
RowTypePtr /* type */,
int32_t /* numRows */,
StreamArena* streamArena,
const Options* /* options */) {
return std::make_unique<CompactRowVectorSerializer>(streamArena->pool());
const Options* options) {
return std::make_unique<CompactRowVectorSerializer>(
streamArena->pool(), options);
}

void CompactRowVectorSerde::deserialize(
ByteInputStream* source,
velox::memory::MemoryPool* pool,
RowTypePtr type,
RowVectorPtr* result,
const Options* /* options */) {
const Options* options) {
std::vector<std::string_view> serializedRows;
std::vector<std::unique_ptr<std::string>> serializedBuffers;
RowDeserializer<std::string_view>::deserialize(
source, serializedRows, serializedBuffers);
source, serializedRows, serializedBuffers, options);

if (serializedRows.empty()) {
*result = BaseVector::create<RowVector>(type, 0, pool);
Expand Down
15 changes: 0 additions & 15 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4230,21 +4230,6 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
}

private:
struct CompressionStats {
// Number of times compression was not attempted.
int32_t numCompressionSkipped{0};

// uncompressed size for which compression was attempted.
int64_t compressionInputBytes{0};

// Compressed bytes.
int64_t compressedBytes{0};

// Bytes for which compression was not attempted because of past
// non-performance.
int64_t compressionSkippedBytes{0};
};

const SerdeOpts opts_;
StreamArena* const streamArena_;
const std::unique_ptr<folly::io::Codec> codec_;
Expand Down
8 changes: 2 additions & 6 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ class PrestoVectorSerde : public VectorSerde {
PrestoOptions(
bool _useLosslessTimestamp,
common::CompressionKind _compressionKind,
float _minCompressionRatio = 0.8,
bool _nullsFirst = false,
bool _preserveEncodings = false)
: VectorSerde::Options(_compressionKind),
: VectorSerde::Options(_compressionKind, _minCompressionRatio),
useLosslessTimestamp(_useLosslessTimestamp),
nullsFirst(_nullsFirst),
preserveEncodings(_preserveEncodings) {}
Expand All @@ -74,11 +75,6 @@ class PrestoVectorSerde : public VectorSerde {
/// structs.
bool nullsFirst{false};

/// Minimum achieved compression if compression is enabled. Compressing less
/// than this causes subsequent compression attempts to be skipped. The more
/// times compression misses the target the less frequently it is tried.
float minCompressionRatio{0.8};

/// If true, the serializer will not employ any optimizations that can
/// affect the encoding of the input vectors. This is only relevant when
/// using BatchVectorSerializer.
Expand Down
Loading

0 comments on commit 0ee82f3

Please sign in to comment.