diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index 758957ac2a7c..8dcf41ccc7a8 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -19,6 +19,38 @@ namespace facebook::velox::exec { +namespace { +std::unique_ptr getVectorSerdeOptions( + VectorSerde::Kind kind) { + std::unique_ptr options = + kind == VectorSerde::Kind::kPresto + ? std::make_unique() + : std::make_unique(); + options->compressionKind = + OutputBufferManager::getInstance().lock()->compressionKind(); + return options; +} +} // namespace + +Exchange::Exchange( + int32_t operatorId, + DriverCtx* driverCtx, + const std::shared_ptr& exchangeNode, + std::shared_ptr exchangeClient, + const std::string& operatorType) + : SourceOperator( + driverCtx, + exchangeNode->outputType(), + operatorId, + exchangeNode->id(), + operatorType), + preferredOutputBatchBytes_{ + driverCtx->queryConfig().preferredOutputBatchBytes()}, + serdeKind_(exchangeNode->serdeKind()), + options_{getVectorSerdeOptions(serdeKind_)}, + processSplits_{operatorCtx_->driverCtx()->driverId == 0}, + exchangeClient_{std::move(exchangeClient)} {} + void Exchange::addTaskIds(std::vector& taskIds) { std::shuffle(std::begin(taskIds), std::end(taskIds), rng_); for (const std::string& taskId : taskIds) { @@ -127,7 +159,7 @@ RowVectorPtr Exchange::getOutput() { outputType_, &result_, resultOffset, - &options_); + options_.get()); resultOffset = result_->size(); } } @@ -154,7 +186,7 @@ RowVectorPtr Exchange::getOutput() { outputType_, &result_, resultOffset, - &options_); + options_.get()); // We expect the row-wise deserialization to consume all the input into one // output vector. VELOX_CHECK(inputStream->atEnd()); diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index 0aa69fa9f950..45cdce8ceb59 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -42,21 +42,7 @@ class Exchange : public SourceOperator { DriverCtx* driverCtx, const std::shared_ptr& exchangeNode, std::shared_ptr exchangeClient, - const std::string& operatorType = "Exchange") - : SourceOperator( - driverCtx, - exchangeNode->outputType(), - operatorId, - exchangeNode->id(), - operatorType), - preferredOutputBatchBytes_{ - driverCtx->queryConfig().preferredOutputBatchBytes()}, - serdeKind_(exchangeNode->serdeKind()), - processSplits_{operatorCtx_->driverCtx()->driverId == 0}, - exchangeClient_{std::move(exchangeClient)} { - options_.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); - } + const std::string& operatorType = "Exchange"); ~Exchange() override { close(); @@ -76,16 +62,16 @@ class Exchange : public SourceOperator { private: // Invoked to create exchange client for remote tasks. // The function shuffles the source task ids first to randomize the source - // tasks we fetch data from. This helps to avoid different tasks fetching from - // the same source task in a distributed system. + // tasks we fetch data from. This helps to avoid different tasks fetching + // from the same source task in a distributed system. void addTaskIds(std::vector& taskIds); /// Fetches splits from the task until there are no more splits or task /// returns a future that will be complete when more splits arrive. Adds - /// splits to exchangeClient_. Returns true if received a future from the task - /// and sets the 'future' parameter. Returns false if fetched all splits or if - /// this operator is not the first operator in the pipeline and therefore is - /// not responsible for fetching splits and adding them to the + /// splits to exchangeClient_. Returns true if received a future from the + /// task and sets the 'future' parameter. Returns false if fetched all + /// splits or if this operator is not the first operator in the pipeline and + /// therefore is not responsible for fetching splits and adding them to the /// exchangeClient_. bool getSplits(ContinueFuture* future); @@ -97,16 +83,19 @@ class Exchange : public SourceOperator { const VectorSerde::Kind serdeKind_; - /// True if this operator is responsible for fetching splits from the Task and - /// passing these to ExchangeClient. + const std::unique_ptr options_; + + /// True if this operator is responsible for fetching splits from the Task + /// and passing these to ExchangeClient. const bool processSplits_; bool noMoreSplits_ = false; std::shared_ptr exchangeClient_; - /// A future received from Task::getSplitOrFuture(). It will be complete when - /// there are more splits available or no-more-splits signal has arrived. + /// A future received from Task::getSplitOrFuture(). It will be complete + /// when there are more splits available or no-more-splits signal has + /// arrived. ContinueFuture splitFuture_{ContinueFuture::makeEmpty()}; // Reusable result vector. @@ -115,7 +104,6 @@ class Exchange : public SourceOperator { std::vector> currentPages_; bool atEnd_{false}; std::default_random_engine rng_{std::random_device{}()}; - serializer::presto::PrestoVectorSerde::PrestoOptions options_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/OperatorTraceReader.h b/velox/exec/OperatorTraceReader.h index 60860e0e691e..6a522355e512 100644 --- a/velox/exec/OperatorTraceReader.h +++ b/velox/exec/OperatorTraceReader.h @@ -43,6 +43,7 @@ class OperatorTraceInputReader { const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{ true, common::CompressionKind_ZSTD, // TODO: Use trace config. + 0.8, /*_nullsFirst=*/true}; const std::shared_ptr fs_; const RowTypePtr dataType_; diff --git a/velox/exec/OperatorTraceWriter.h b/velox/exec/OperatorTraceWriter.h index 94634eaa8ecb..1b6f545920cf 100644 --- a/velox/exec/OperatorTraceWriter.h +++ b/velox/exec/OperatorTraceWriter.h @@ -58,6 +58,7 @@ class OperatorTraceInputWriter { const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = { true, common::CompressionKind::CompressionKind_ZSTD, + 0.8, /*nullsFirst=*/true}; const std::shared_ptr fs_; memory::MemoryPool* const pool_; diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index 67504891ec9e..88663c9ad1e5 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -19,7 +19,39 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { +namespace { +std::unique_ptr getVectorSerdeOptions( + VectorSerde::Kind kind) { + std::unique_ptr options = + kind == VectorSerde::Kind::kPresto + ? std::make_unique() + : std::make_unique(); + options->compressionKind = + OutputBufferManager::getInstance().lock()->compressionKind(); + options->minCompressionRatio = PartitionedOutput::minCompressionRatio(); + return options; +} +} // namespace + namespace detail { +Destination::Destination( + const std::string& taskId, + int destination, + VectorSerde* serde, + VectorSerde::Options* options, + memory::MemoryPool* pool, + bool eagerFlush, + std::function recordEnqueued) + : taskId_(taskId), + destination_(destination), + serde_(serde), + options_(options), + pool_(pool), + eagerFlush_(eagerFlush), + recordEnqueued_(std::move(recordEnqueued)) { + setTargetSizePct(); +} + BlockingReason Destination::advance( uint64_t maxBytes, const std::vector& sizes, @@ -57,15 +89,7 @@ BlockingReason Destination::advance( if (current_ == nullptr) { current_ = std::make_unique(pool_, serde_); const auto rowType = asRowType(output->type()); - if (serde_->kind() == VectorSerde::Kind::kPresto) { - serializer::presto::PrestoVectorSerde::PrestoOptions options; - options.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); - options.minCompressionRatio = PartitionedOutput::minCompressionRatio(); - current_->createStreamTree(rowType, rowsInCurrent_, &options); - } else { - current_->createStreamTree(rowType, rowsInCurrent_); - } + current_->createStreamTree(rowType, rowsInCurrent_, options_); } const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow); @@ -175,7 +199,8 @@ PartitionedOutput::PartitionedOutput( ->queryConfig() .maxPartitionedOutputBufferSize()), eagerFlush_(eagerFlush), - serde_(getNamedVectorSerde(planNode->serdeKind())) { + serde_(getNamedVectorSerde(planNode->serdeKind())), + options_(getVectorSerdeOptions(planNode->serdeKind())) { if (!planNode->isPartitioned()) { VELOX_USER_CHECK_EQ(numDestinations_, 1); } @@ -231,6 +256,7 @@ void PartitionedOutput::initializeDestinations() { taskId, i, serde_, + options_.get(), pool(), eagerFlush_, [&](uint64_t bytes, uint64_t rows) { diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index fecdaf57c9c5..699af6d65590 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -33,17 +33,10 @@ class Destination { const std::string& taskId, int destination, VectorSerde* serde, + VectorSerde::Options* options, memory::MemoryPool* pool, bool eagerFlush, - std::function recordEnqueued) - : taskId_(taskId), - destination_(destination), - serde_(serde), - pool_(pool), - eagerFlush_(eagerFlush), - recordEnqueued_(std::move(recordEnqueued)) { - setTargetSizePct(); - } + std::function recordEnqueued); /// Resets the destination before starting a new batch. void beginBatch() { @@ -112,6 +105,7 @@ class Destination { const std::string taskId_; const int destination_; VectorSerde* const serde_; + VectorSerde::Options* const options_; memory::MemoryPool* const pool_; const bool eagerFlush_; const std::function recordEnqueued_; @@ -226,6 +220,7 @@ class PartitionedOutput : public Operator { const int64_t maxBufferedBytes_; const bool eagerFlush_; VectorSerde* const serde_; + const std::unique_ptr options_; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; ContinueFuture future_; diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index 6b5e4f464b92..35786be53032 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -175,7 +175,10 @@ uint64_t SpillWriter::write( NanosecondTimer timer(&timeNs); if (batch_ == nullptr) { serializer::presto::PrestoVectorSerde::PrestoOptions options = { - kDefaultUseLosslessTimestamp, compressionKind_, true /*nullsFirst*/}; + kDefaultUseLosslessTimestamp, + compressionKind_, + 0.8, + /*nullsFirst=*/true}; batch_ = std::make_unique(pool_, serde_); batch_->createStreamTree( std::static_pointer_cast(rows->type()), @@ -300,6 +303,7 @@ SpillReadFile::SpillReadFile( readOptions_{ kDefaultUseLosslessTimestamp, compressionKind_, + 0.8, /*nullsFirst=*/true}, pool_(pool), serde_(getNamedVectorSerde(VectorSerde::Kind::kPresto)), diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 3baff771ea9d..1a8b3812fc5a 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -2408,22 +2408,18 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) { test(100'000, 1); } else if (GetParam() == VectorSerde::Kind::kCompactRow) { test(1, 1'000); - test(1'000, 28); - test(10'000, 3); + test(1'000, 38); + test(10'000, 4); test(100'000, 1); } else { test(1, 1'000); - test(1'000, 63); + test(1'000, 72); test(10'000, 7); test(100'000, 1); } } TEST_P(MultiFragmentTest, compression) { - // NOTE: only presto format supports compression for now - if (GetParam() != VectorSerde::Kind::kPresto) { - return; - } bufferManager_->testingSetCompression( common::CompressionKind::CompressionKind_LZ4); auto guard = folly::makeGuard([&]() { diff --git a/velox/serializers/CompactRowSerializer.cpp b/velox/serializers/CompactRowSerializer.cpp index 6bddcbbbf3dd..afe5c1248a0a 100644 --- a/velox/serializers/CompactRowSerializer.cpp +++ b/velox/serializers/CompactRowSerializer.cpp @@ -26,8 +26,10 @@ using TRowSize = uint32_t; class CompactRowVectorSerializer : public RowSerializer { public: - explicit CompactRowVectorSerializer(memory::MemoryPool* pool) - : RowSerializer(pool) {} + explicit CompactRowVectorSerializer( + memory::MemoryPool* pool, + const VectorSerde::Options* options) + : RowSerializer(pool, options) {} private: void serializeRanges( @@ -74,8 +76,9 @@ CompactRowVectorSerde::createIterativeSerializer( RowTypePtr /* type */, int32_t /* numRows */, StreamArena* streamArena, - const Options* /* options */) { - return std::make_unique(streamArena->pool()); + const Options* options) { + return std::make_unique( + streamArena->pool(), options); } void CompactRowVectorSerde::deserialize( @@ -83,11 +86,11 @@ void CompactRowVectorSerde::deserialize( velox::memory::MemoryPool* pool, RowTypePtr type, RowVectorPtr* result, - const Options* /* options */) { + const Options* options) { std::vector serializedRows; std::vector> serializedBuffers; RowDeserializer::deserialize( - source, serializedRows, serializedBuffers); + source, serializedRows, serializedBuffers, options); if (serializedRows.empty()) { *result = BaseVector::create(type, 0, pool); diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 7e09c038a4bd..891b6f6a9794 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4230,21 +4230,6 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } private: - struct CompressionStats { - // Number of times compression was not attempted. - int32_t numCompressionSkipped{0}; - - // uncompressed size for which compression was attempted. - int64_t compressionInputBytes{0}; - - // Compressed bytes. - int64_t compressedBytes{0}; - - // Bytes for which compression was not attempted because of past - // non-performance. - int64_t compressionSkippedBytes{0}; - }; - const SerdeOpts opts_; StreamArena* const streamArena_; const std::unique_ptr codec_; diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index b6b2385e3d72..f396dea5c1e0 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -54,9 +54,10 @@ class PrestoVectorSerde : public VectorSerde { PrestoOptions( bool _useLosslessTimestamp, common::CompressionKind _compressionKind, + float _minCompressionRatio = 0.8, bool _nullsFirst = false, bool _preserveEncodings = false) - : VectorSerde::Options(_compressionKind), + : VectorSerde::Options(_compressionKind, _minCompressionRatio), useLosslessTimestamp(_useLosslessTimestamp), nullsFirst(_nullsFirst), preserveEncodings(_preserveEncodings) {} @@ -74,11 +75,6 @@ class PrestoVectorSerde : public VectorSerde { /// structs. bool nullsFirst{false}; - /// Minimum achieved compression if compression is enabled. Compressing less - /// than this causes subsequent compression attempts to be skipped. The more - /// times compression misses the target the less frequently it is tried. - float minCompressionRatio{0.8}; - /// If true, the serializer will not employ any optimizations that can /// affect the encoding of the input vectors. This is only relevant when /// using BatchVectorSerializer. diff --git a/velox/serializers/RowSerializer.h b/velox/serializers/RowSerializer.h index 7fd0c3b12507..ae062b658fd7 100644 --- a/velox/serializers/RowSerializer.h +++ b/velox/serializers/RowSerializer.h @@ -23,10 +23,52 @@ namespace facebook::velox::serializer { using TRowSize = uint32_t; +namespace detail { +struct RowHeader { + int32_t uncompressedSize; + int32_t compressedSize; + bool compressed; + + static RowHeader read(ByteInputStream* source) { + RowHeader header; + header.uncompressedSize = source->read(); + header.compressedSize = source->read(); + header.compressed = source->read(); + + VELOX_CHECK_GE(header.uncompressedSize, 0); + VELOX_CHECK_GE(header.compressedSize, 0); + + return header; + } + + void write(OutputStream* out) { + out->write(reinterpret_cast(&uncompressedSize), sizeof(int32_t)); + out->write(reinterpret_cast(&compressedSize), sizeof(int32_t)); + const char writeValue = compressed ? 1 : 0; + out->write(reinterpret_cast(&writeValue), sizeof(char)); + } + + static size_t size() { + return sizeof(int32_t) * 2 + sizeof(char); + } + + std::string debugString() const { + return fmt::format( + "uncompressedSize: {}, compressedSize: {}, compressed: {}", + succinctBytes(uncompressedSize), + succinctBytes(compressedSize), + compressed); + } +}; +} // namespace detail + template class RowSerializer : public IterativeVectorSerializer { public: - explicit RowSerializer(memory::MemoryPool* pool) : pool_(pool) {} + RowSerializer(memory::MemoryPool* pool, const VectorSerde::Options* options) + : pool_(pool), + options_(options == nullptr ? VectorSerde::Options() : *options), + codec_(common::compressionKindToCodec(options_.compressionKind)) {} void append( const RowVectorPtr& vector, @@ -99,20 +141,69 @@ class RowSerializer : public IterativeVectorSerializer { } size_t maxSerializedSize() const override { - size_t totalSize = 0; - for (const auto& buffer : buffers_) { - totalSize += buffer->size(); + const auto size = uncompressedSize(); + if (!needCompression()) { + return detail::RowHeader::size() + size; } - return totalSize; + VELOX_CHECK_LE( + size, + codec_->maxUncompressedLength(), + "UncompressedSize exceeds limit"); + return detail::RowHeader::size() + codec_->maxCompressedLength(size); } + /// The serialization format is | uncompressedSize | compressedSize | + /// compressed | data. void flush(OutputStream* stream) override { - for (const auto& buffer : buffers_) { - stream->write(buffer->template asMutable(), buffer->size()); + constexpr int32_t kMaxCompressionAttemptsToSkip = 30; + const auto size = uncompressedSize(); + if (!needCompression()) { + flushUncompressed(size, stream); + } else if (numCompressionToSkip_ > 0) { + flushUncompressed(size, stream); + stats_.compressionSkippedBytes += size; + --numCompressionToSkip_; + ++stats_.numCompressionSkipped; + } else { + // Compress the buffer if satisfied condition. + const auto toCompress = toIOBuf(buffers_); + const auto compressedBuffer = codec_->compress(toCompress.get()); + const int32_t compressedSize = compressedBuffer->length(); + stats_.compressionInputBytes += size; + stats_.compressedBytes += compressedSize; + if (compressedSize > options_.minCompressionRatio * size) { + // Skip this compression. + numCompressionToSkip_ = std::min( + kMaxCompressionAttemptsToSkip, 1 + stats_.numCompressionSkipped); + flushUncompressed(size, stream); + } else { + // Do the compression. + detail::RowHeader header = {size, compressedSize, true}; + header.write(stream); + for (auto range : *compressedBuffer) { + stream->write( + reinterpret_cast(range.data()), range.size()); + } + } } + buffers_.clear(); } + std::unordered_map runtimeStats() override { + std::unordered_map map; + map.insert( + {{"compressedBytes", + RuntimeCounter(stats_.compressedBytes, RuntimeCounter::Unit::kBytes)}, + {"compressionInputBytes", + RuntimeCounter( + stats_.compressionInputBytes, RuntimeCounter::Unit::kBytes)}, + {"compressionSkippedBytes", + RuntimeCounter( + stats_.compressionSkippedBytes, RuntimeCounter::Unit::kBytes)}}); + return map; + } + void clear() override {} protected: @@ -137,6 +228,47 @@ class RowSerializer : public IterativeVectorSerializer { memory::MemoryPool* const pool_; std::vector buffers_; + + private: + std::unique_ptr toIOBuf(const std::vector& buffers) { + std::unique_ptr iobuf; + for (const auto& buffer : buffers) { + auto newBuf = + folly::IOBuf::wrapBuffer(buffer->asMutable(), buffer->size()); + if (iobuf) { + iobuf->prev()->appendChain(std::move(newBuf)); + } else { + iobuf = std::move(newBuf); + } + } + return iobuf; + } + + int32_t uncompressedSize() const { + int32_t totalSize = 0; + for (const auto& buffer : buffers_) { + totalSize += buffer->size(); + } + return totalSize; + } + + bool needCompression() const { + return codec_->type() != folly::io::CodecType::NO_COMPRESSION; + } + + void flushUncompressed(int32_t size, OutputStream* stream) { + detail::RowHeader header = {size, size, false}; + header.write(stream); + for (const auto& buffer : buffers_) { + stream->write(buffer->template asMutable(), buffer->size()); + } + } + + const VectorSerde::Options options_; + const std::unique_ptr codec_; + // Count of forthcoming compressions to skip. + int32_t numCompressionToSkip_{0}; + CompressionStats stats_; }; template @@ -145,25 +277,60 @@ class RowDeserializer { static void deserialize( ByteInputStream* source, std::vector& serializedRows, - std::vector>& serializedBuffers) { + std::vector>& serializedBuffers, + const VectorSerde::Options* options) { + const auto compressionKind = options == nullptr + ? VectorSerde::Options().compressionKind + : options->compressionKind; while (!source->atEnd()) { - // First read row size in big endian order. - const auto rowSize = folly::Endian::big(source->read()); - auto serializedBuffer = std::make_unique(); - serializedBuffer->reserve(rowSize); - - const auto row = source->nextView(rowSize); - serializedBuffer->append(row.data(), row.size()); - // If we couldn't read the entire row at once, we need to concatenate it - // in a different buffer. - if (serializedBuffer->size() < rowSize) { - concatenatePartialRow(source, rowSize, *serializedBuffer); + std::unique_ptr uncompressedBuf; + const auto header = detail::RowHeader::read(source); + if (header.compressed) { + VELOX_DCHECK_NE( + compressionKind, common::CompressionKind::CompressionKind_NONE); + auto compressBuf = folly::IOBuf::create(header.compressedSize); + source->readBytes(compressBuf->writableData(), header.compressedSize); + compressBuf->append(header.compressedSize); + + // Process chained uncompressed results IOBufs. + const auto codec = common::compressionKindToCodec(compressionKind); + uncompressedBuf = + codec->uncompress(compressBuf.get(), header.uncompressedSize); } - VELOX_CHECK_EQ(serializedBuffer->size(), rowSize); - serializedBuffers.emplace_back(std::move(serializedBuffer)); - serializedRows.push_back(std::string_view( - serializedBuffers.back()->data(), serializedBuffers.back()->size())); + std::unique_ptr uncompressedStream; + ByteInputStream* uncompressedSource{nullptr}; + if (uncompressedBuf == nullptr) { + uncompressedSource = source; + } else { + uncompressedStream = std::make_unique( + byteRangesFromIOBuf(uncompressedBuf.get())); + uncompressedSource = uncompressedStream.get(); + } + const std::streampos initialSize = uncompressedSource->tellp(); + while (uncompressedSource->tellp() - initialSize < + header.uncompressedSize) { + // First read row size in big endian order. + const auto rowSize = + folly::Endian::big(uncompressedSource->read()); + + auto serializedBuffer = std::make_unique(); + serializedBuffer->reserve(rowSize); + + const auto row = uncompressedSource->nextView(rowSize); + serializedBuffer->append(row.data(), row.size()); + // If we couldn't read the entire row at once, we need to concatenate it + // in a different buffer. + if (serializedBuffer->size() < rowSize) { + concatenatePartialRow(uncompressedSource, rowSize, *serializedBuffer); + } + + VELOX_CHECK_EQ(serializedBuffer->size(), rowSize); + serializedBuffers.emplace_back(std::move(serializedBuffer)); + serializedRows.push_back(std::string_view( + serializedBuffers.back()->data(), + serializedBuffers.back()->size())); + } } } diff --git a/velox/serializers/UnsafeRowSerializer.cpp b/velox/serializers/UnsafeRowSerializer.cpp index ee3a1c069bfe..5dda76c58042 100644 --- a/velox/serializers/UnsafeRowSerializer.cpp +++ b/velox/serializers/UnsafeRowSerializer.cpp @@ -33,9 +33,9 @@ UnsafeRowVectorSerde::createIterativeSerializer( RowTypePtr /* type */, int32_t /* numRows */, StreamArena* streamArena, - const Options* /* options */) { + const Options* options) { return std::make_unique>( - streamArena->pool()); + streamArena->pool(), options); } void UnsafeRowVectorSerde::deserialize( @@ -43,11 +43,11 @@ void UnsafeRowVectorSerde::deserialize( velox::memory::MemoryPool* pool, RowTypePtr type, RowVectorPtr* result, - const Options* /* options */) { + const Options* options) { std::vector> serializedRows; std::vector> serializedBuffers; RowDeserializer>::deserialize( - source, serializedRows, serializedBuffers); + source, serializedRows, serializedBuffers, options); if (serializedRows.empty()) { *result = BaseVector::create(type, 0, pool); diff --git a/velox/serializers/tests/CompactRowSerializerTest.cpp b/velox/serializers/tests/CompactRowSerializerTest.cpp index 1de38c781165..af3d6fe9ffad 100644 --- a/velox/serializers/tests/CompactRowSerializerTest.cpp +++ b/velox/serializers/tests/CompactRowSerializerTest.cpp @@ -23,9 +23,29 @@ namespace facebook::velox::serializer { namespace { +struct TestParam { + common::CompressionKind compressionKind; + bool appendRow; + + TestParam(common::CompressionKind _compressionKind, bool _appendRow) + : compressionKind(_compressionKind), appendRow(_appendRow) {} +}; + class CompactRowSerializerTest : public ::testing::Test, public velox::test::VectorTestBase, - public testing::WithParamInterface { + public testing::WithParamInterface { + public: + static std::vector getTestParams() { + static std::vector testParams = { + {common::CompressionKind::CompressionKind_NONE, false}, + {common::CompressionKind::CompressionKind_ZLIB, true}, + {common::CompressionKind::CompressionKind_SNAPPY, false}, + {common::CompressionKind::CompressionKind_ZSTD, true}, + {common::CompressionKind::CompressionKind_LZ4, false}, + {common::CompressionKind::CompressionKind_GZIP, true}}; + return testParams; + } + protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); @@ -41,6 +61,9 @@ class CompactRowSerializerTest : public ::testing::Test, ASSERT_EQ( getNamedVectorSerde(VectorSerde::Kind::kCompactRow)->kind(), VectorSerde::Kind::kCompactRow); + appendRow_ = GetParam().appendRow; + compressionKind_ = GetParam().compressionKind; + options_ = std::make_unique(compressionKind_, 0.8); } void TearDown() override { @@ -56,7 +79,7 @@ class CompactRowSerializerTest : public ::testing::Test, vector_size_t offset = 0; vector_size_t rangeSize = 1; std::unique_ptr compactRow; - if (GetParam()) { + if (appendRow_) { compactRow = std::make_unique(rowVector); } while (offset < numRows) { @@ -69,10 +92,10 @@ class CompactRowSerializerTest : public ::testing::Test, auto arena = std::make_unique(pool_.get()); auto rowType = asRowType(rowVector->type()); auto serializer = getVectorSerde()->createIterativeSerializer( - rowType, numRows, arena.get()); + rowType, numRows, arena.get(), options_.get()); Scratch scratch; - if (GetParam()) { + if (appendRow_) { std::vector serializedRowSizes(numRows); std::vector serializedRowSizesPtr(numRows); for (auto i = 0; i < numRows; ++i) { @@ -97,7 +120,11 @@ class CompactRowSerializerTest : public ::testing::Test, auto size = serializer->maxSerializedSize(); OStreamOutputStream out(output); serializer->flush(&out); - ASSERT_EQ(size, output->tellp()); + if (!needCompression()) { + ASSERT_EQ(size, output->tellp()); + } else { + ASSERT_GT(size, output->tellp()); + } } std::unique_ptr toByteStream( @@ -127,7 +154,7 @@ class CompactRowSerializerTest : public ::testing::Test, RowVectorPtr result; getVectorSerde()->deserialize( - byteStream.get(), pool_.get(), rowType, &result); + byteStream.get(), pool_.get(), rowType, &result, options_.get()); return result; } @@ -141,6 +168,15 @@ class CompactRowSerializerTest : public ::testing::Test, } std::shared_ptr pool_; + + private: + bool needCompression() { + return compressionKind_ != common::CompressionKind::CompressionKind_NONE; + } + + common::CompressionKind compressionKind_; + std::unique_ptr options_; + bool appendRow_; }; TEST_P(CompactRowSerializerTest, fuzz) { @@ -187,6 +223,6 @@ TEST_P(CompactRowSerializerTest, fuzz) { VELOX_INSTANTIATE_TEST_SUITE_P( CompactRowSerializerTest, CompactRowSerializerTest, - testing::Values(false, true)); + testing::ValuesIn(CompactRowSerializerTest::getTestParams())); } // namespace } // namespace facebook::velox::serializer diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 7dc4859abbf0..65bd73a555dd 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -120,7 +120,7 @@ class PrestoSerializerTest const bool preserveEncodings = serdeOptions == nullptr ? false : serdeOptions->preserveEncodings; serializer::presto::PrestoVectorSerde::PrestoOptions paramOptions{ - useLosslessTimestamp, kind, nullsFirst, preserveEncodings}; + useLosslessTimestamp, kind, 0.8, nullsFirst, preserveEncodings}; return paramOptions; } diff --git a/velox/serializers/tests/UnsafeRowSerializerTest.cpp b/velox/serializers/tests/UnsafeRowSerializerTest.cpp index b4e472b8e236..61b28acaaca2 100644 --- a/velox/serializers/tests/UnsafeRowSerializerTest.cpp +++ b/velox/serializers/tests/UnsafeRowSerializerTest.cpp @@ -23,9 +23,29 @@ using namespace facebook; using namespace facebook::velox; +struct TestParam { + common::CompressionKind compressionKind; + bool appendRow; + + TestParam(common::CompressionKind _compressionKind, bool _appendRow) + : compressionKind(_compressionKind), appendRow(_appendRow) {} +}; + class UnsafeRowSerializerTest : public ::testing::Test, public velox::test::VectorTestBase, - public testing::WithParamInterface { + public testing::WithParamInterface { + public: + static std::vector getTestParams() { + static std::vector testParams = { + {common::CompressionKind::CompressionKind_NONE, false}, + {common::CompressionKind::CompressionKind_ZLIB, true}, + {common::CompressionKind::CompressionKind_SNAPPY, false}, + {common::CompressionKind::CompressionKind_ZSTD, true}, + {common::CompressionKind::CompressionKind_LZ4, false}, + {common::CompressionKind::CompressionKind_GZIP, true}}; + return testParams; + } + protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); @@ -41,6 +61,9 @@ class UnsafeRowSerializerTest : public ::testing::Test, ASSERT_EQ( getNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)->kind(), VectorSerde::Kind::kUnsafeRow); + appendRow_ = GetParam().appendRow; + compressionKind_ = GetParam().compressionKind; + options_ = std::make_unique(compressionKind_, 0.8); } void TearDown() override { @@ -49,6 +72,7 @@ class UnsafeRowSerializerTest : public ::testing::Test, } void serialize(RowVectorPtr rowVector, std::ostream* output) { + const auto streamInitialSize = output->tellp(); const auto numRows = rowVector->size(); std::vector ranges(numRows); @@ -64,7 +88,7 @@ class UnsafeRowSerializerTest : public ::testing::Test, for (auto i = 0; i < numRows; ++i) { serializedRowSizesPtr[i] = &serializedRowSizes[i]; } - if (GetParam()) { + if (appendRow_) { unsafeRow = std::make_unique(rowVector); getVectorSerde()->estimateSerializedSize( unsafeRow.get(), rows, serializedRowSizesPtr.data()); @@ -73,9 +97,9 @@ class UnsafeRowSerializerTest : public ::testing::Test, auto arena = std::make_unique(pool_.get()); auto rowType = std::dynamic_pointer_cast(rowVector->type()); auto serializer = getVectorSerde()->createIterativeSerializer( - rowType, numRows, arena.get()); + rowType, numRows, arena.get(), options_.get()); - if (GetParam()) { + if (appendRow_) { serializer->append(*unsafeRow, rows, serializedRowSizes); } else { Scratch scratch; @@ -86,7 +110,11 @@ class UnsafeRowSerializerTest : public ::testing::Test, auto size = serializer->maxSerializedSize(); OStreamOutputStream out(output); serializer->flush(&out); - ASSERT_EQ(size, output->tellp()); + if (!needCompression()) { + ASSERT_EQ(size, output->tellp() - streamInitialSize); + } else { + ASSERT_GT(size, output->tellp() - streamInitialSize); + } } std::unique_ptr toByteStream( @@ -110,7 +138,7 @@ class UnsafeRowSerializerTest : public ::testing::Test, RowVectorPtr result; getVectorSerde()->deserialize( - byteStream.get(), pool_.get(), rowType, &result); + byteStream.get(), pool_.get(), rowType, &result, options_.get()); return result; } @@ -127,13 +155,36 @@ class UnsafeRowSerializerTest : public ::testing::Test, testSerialize(RowVectorPtr rowVector, int8_t* expectedData, size_t dataSize) { std::ostringstream out; serialize(rowVector, &out); - EXPECT_EQ(std::memcmp(expectedData, out.str().data(), dataSize), 0); + if (!needCompression()) { + // Check the data after header. + EXPECT_EQ( + std::memcmp(expectedData, out.str().data() + kHeaderSize, dataSize), + 0); + } } void testDeserialize( const std::vector& input, RowVectorPtr expectedVector) { - auto results = deserialize(asRowType(expectedVector->type()), input); + if (needCompression()) { + return; + } + // Construct the header to make deserialization work. + std::vector uncompressedInput = input; + char header[kHeaderSize] = {0}; + int32_t uncompressedSize = 0; + for (const auto& in : input) { + uncompressedSize += in.size(); + } + auto* headerPtr = reinterpret_cast(&header); + headerPtr[0] = uncompressedSize; + headerPtr[1] = uncompressedSize; + header[kHeaderSize - 1] = 0; + + uncompressedInput.insert( + uncompressedInput.begin(), std::string_view(header, kHeaderSize)); + auto results = + deserialize(asRowType(expectedVector->type()), uncompressedInput); test::assertEqualVectors(expectedVector, results); } @@ -144,7 +195,17 @@ class UnsafeRowSerializerTest : public ::testing::Test, expectedVector); } + bool needCompression() { + return compressionKind_ != common::CompressionKind::CompressionKind_NONE; + } + std::shared_ptr pool_; + + private: + static constexpr int32_t kHeaderSize = sizeof(int32_t) * 2 + sizeof(char); + common::CompressionKind compressionKind_; + std::unique_ptr options_; + bool appendRow_; }; // These expected binary buffers were samples taken using Spark's java code. @@ -290,6 +351,11 @@ TEST_P(UnsafeRowSerializerTest, splitRow) { } TEST_P(UnsafeRowSerializerTest, incompleteRow) { + // The test data is for non-compression, and we don't know the compressed size + // to construct header. If the row is incomplete, readBytes will fail. + if (needCompression()) { + return; + } int8_t data[20] = {0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 62, 28, -36, -33, 2, 0, 0, 0}; auto expected = @@ -317,7 +383,7 @@ TEST_P(UnsafeRowSerializerTest, incompleteRow) { buffers = {{rawData, 2}}; VELOX_ASSERT_RUNTIME_THROW( testDeserialize(buffers, expected), - "(1 vs. 1) Reading past end of BufferInputStream"); + "(2 vs. 2) Reading past end of BufferInputStream"); } TEST_P(UnsafeRowSerializerTest, types) { @@ -430,7 +496,20 @@ TEST_P(UnsafeRowSerializerTest, decimalVector) { testRoundTrip(rowVectorArray); } +TEST_P(UnsafeRowSerializerTest, multiPage) { + auto input = + makeRowVector({makeFlatVector(std::vector{12345678910, 123})}); + std::ostringstream out; + serialize(input, &out); + serialize(input, &out); + auto expected = makeRowVector({makeFlatVector( + std::vector{12345678910, 123, 12345678910, 123})}); + auto rowType = std::dynamic_pointer_cast(input->type()); + auto deserialized = deserialize(rowType, {out.str()}); + test::assertEqualVectors(deserialized, expected); +} + VELOX_INSTANTIATE_TEST_SUITE_P( UnsafeRowSerializerTest, UnsafeRowSerializerTest, - testing::Values(false, true)); + testing::ValuesIn(UnsafeRowSerializerTest::getTestParams())); diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index bb9b1a563bea..6094ef7830b8 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -38,6 +38,21 @@ class CompactRow; class UnsafeRowFast; }; // namespace row +struct CompressionStats { + // Number of times compression was not attempted. + int32_t numCompressionSkipped{0}; + + // Uncompressed size for which compression was attempted. + int64_t compressionInputBytes{0}; + + // Compressed bytes. + int64_t compressedBytes{0}; + + // Bytes for which compression was not attempted because of past + // non-performance. + int64_t compressionSkippedBytes{0}; +}; + /// Serializer that can iteratively build up a buffer of serialized rows from /// one or more RowVectors. /// @@ -171,13 +186,20 @@ class VectorSerde { struct Options { Options() = default; - explicit Options(common::CompressionKind _compressionKind) - : compressionKind(_compressionKind) {} + Options( + common::CompressionKind _compressionKind, + float _minCompressionRatio) + : compressionKind(_compressionKind), + minCompressionRatio(_minCompressionRatio) {} virtual ~Options() = default; common::CompressionKind compressionKind{ common::CompressionKind::CompressionKind_NONE}; + /// Minimum achieved compression if compression is enabled. Compressing less + /// than this causes subsequent compression attempts to be skipped. The more + /// times compression misses the target the less frequently it is tried. + float minCompressionRatio{0.8}; }; Kind kind() const {