From f3120d62ae1a4e3865607bbee28c5bfb2f3871ef Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Sun, 10 Dec 2023 17:31:46 -0800 Subject: [PATCH] Add serialization API for set of rows Top level rows are more efficiently serialized in row sets rather than arrays or ranges. Arrays of ranges are still useful for repeated nested content. The row set path can uses SIMD to gather nulls and extract idices of non-null values for serialization. A Scratch objett is added to signatures to pass reusable scratch memory also for top level calls to range serializing serializatin functions. This can remove malloc use for temporary vectors. The API is tested standalone but is not connected to running code, so this diff does not affect running systens. --- velox/serializers/CompactRowSerializer.cpp | 6 +- velox/serializers/CompactRowSerializer.h | 3 +- velox/serializers/PrestoSerializer.cpp | 1087 ++++++++++++++++- velox/serializers/PrestoSerializer.h | 11 +- velox/serializers/UnsafeRowSerializer.cpp | 6 +- velox/serializers/UnsafeRowSerializer.h | 3 +- velox/serializers/tests/CMakeLists.txt | 12 + .../tests/CompactRowSerializerTest.cpp | 3 +- .../tests/PrestoSerializerTest.cpp | 80 +- .../serializers/tests/SerializerBenchmark.cpp | 151 +++ .../tests/UnsafeRowSerializerTest.cpp | 3 +- velox/vector/VectorStream.cpp | 33 +- velox/vector/VectorStream.h | 81 +- 13 files changed, 1414 insertions(+), 65 deletions(-) create mode 100644 velox/serializers/tests/SerializerBenchmark.cpp diff --git a/velox/serializers/CompactRowSerializer.cpp b/velox/serializers/CompactRowSerializer.cpp index db70b3882ddde..a47be6c7e742c 100644 --- a/velox/serializers/CompactRowSerializer.cpp +++ b/velox/serializers/CompactRowSerializer.cpp @@ -22,7 +22,8 @@ namespace facebook::velox::serializer { void CompactRowVectorSerde::estimateSerializedSize( VectorPtr /* vector */, const folly::Range& /* ranges */, - vector_size_t** /* sizes */) { + vector_size_t** /* sizes */, + Scratch& /*scratch*/) { VELOX_UNSUPPORTED(); } @@ -36,7 +37,8 @@ class CompactRowVectorSerializer : public VectorSerializer { void append( const RowVectorPtr& vector, - const folly::Range& ranges) override { + const folly::Range& ranges, + Scratch& scratch) override { size_t totalSize = 0; row::CompactRow row(vector); if (auto fixedRowSize = diff --git a/velox/serializers/CompactRowSerializer.h b/velox/serializers/CompactRowSerializer.h index 48737d55c3d95..10c5a928cb491 100644 --- a/velox/serializers/CompactRowSerializer.h +++ b/velox/serializers/CompactRowSerializer.h @@ -28,7 +28,8 @@ class CompactRowVectorSerde : public VectorSerde { void estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) override; + vector_size_t** sizes, + Scratch& scratch) override; // This method is not used in production code. It is only used to // support round-trip tests for deserialization. diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 53747c6f4d4a5..b9c61c65e3192 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1077,6 +1077,11 @@ class CountingOutputStream : public OutputStream { std::streampos pos_{0}; }; +raw_vector& threadTempNulls() { + thread_local raw_vector temp; + return temp; +} + // Appendable container for serialized values. To append a value at a // time, call appendNull or appendNonNull first. Then call // appendLength if the type has a length. A null value has a length of @@ -1094,7 +1099,8 @@ class VectorStream { useLosslessTimestamp_(useLosslessTimestamp), nulls_(streamArena, true, true), lengths_(streamArena), - values_(streamArena) { + values_(streamArena), + isLongDecimal_(type_->isLongDecimal()) { if (initialNumRows == 0) { initializeHeader(typeToEncodingName(type), *streamArena); return; @@ -1198,6 +1204,72 @@ class VectorStream { lengths_.appendOne(totalLength_); } + void appendNulls( + const uint64_t* nulls, + int32_t begin, + int32_t end, + int32_t numNonNull) { + VELOX_DCHECK_EQ(numNonNull, bits::countBits(nulls, begin, end)); + const auto numRows = end - begin; + const auto numNulls = numRows - numNonNull; + if (numNulls == 0 && nullCount_ == 0) { + nonNullCount_ += numNonNull; + return; + } + if (UNLIKELY(numNulls > 0 && nonNullCount_ > 0 && nullCount_ == 0)) { + // There were only non-nulls up until now. Add the bits for them. + nulls_.appendBool(false, nonNullCount_); + } + nullCount_ += numNulls; + nonNullCount_ += numNonNull; + if (LIKELY(end <= 64)) { + uint64_t inverted = ~nulls[0]; + nulls_.appendBitsFresh(&inverted, begin, end); + return; + } + const int32_t firstWord = begin >> 6; + const int32_t firstBit = begin & 63; + const auto numWords = bits::nwords(numRows + firstBit); + // The polarity of nulls is reverse in wire format. Make an inverted copy. + uint64_t smallNulls[16]; + uint64_t* invertedNulls = smallNulls; + if (numWords > sizeof(smallNulls) / sizeof(smallNulls[0])) { + auto& tempNulls = threadTempNulls(); + tempNulls.resize(numWords + 1); + invertedNulls = tempNulls.data(); + } + for (auto i = 0; i < numWords; ++i) { + invertedNulls[i] = ~nulls[i + firstWord]; + } + nulls_.appendBitsFresh(invertedNulls, firstBit, firstBit + numRows); + } + + // Appends a zero length for each null bit and a length from lengthFunc(row) + // for non-nulls in rows. + template + void appendLengths( + const uint64_t* nulls, + folly::Range rows, + int32_t numNonNull, + LengthFunc lengthFunc) { + const auto numRows = rows.size(); + if (!nulls) { + appendNonNull(numRows); + for (auto i = 0; i < numRows; ++i) { + appendLength(lengthFunc(rows[i])); + } + } else { + appendNulls(nulls, 0, numRows, numNonNull); + for (auto i = 0; i < numRows; ++i) { + if (bits::isBitSet(nulls, i)) { + appendLength(lengthFunc(rows[i])); + } else { + appendLength(0); + } + } + } + } + template void append(folly::Range values) { values_.append(values); @@ -1212,6 +1284,14 @@ class VectorStream { return children_[index].get(); } + auto& values() { + return values_; + } + + auto& nulls() { + return nulls_; + } + // Returns the size to flush to OutputStream before calling `flush`. size_t serializedSize() { CountingOutputStream out; @@ -1310,6 +1390,10 @@ class VectorStream { } } + bool isLongDecimal() const { + return isLongDecimal_; + } + private: const TypePtr type_; const std::optional encoding_; @@ -1326,6 +1410,7 @@ class VectorStream { ByteOutputStream lengths_; ByteOutputStream values_; std::vector> children_; + const bool isLongDecimal_; }; template <> @@ -1374,7 +1459,7 @@ template <> void VectorStream::append(folly::Range values) { for (auto& value : values) { int128_t val = value; - if (type_->isLongDecimal()) { + if (isLongDecimal_) { val = toJavaDecimalValue(value); } values_.append(folly::Range(&val, 1)); @@ -1464,6 +1549,12 @@ void serializeColumn( const folly::Range& ranges, VectorStream* stream); +void serializeColumn( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch); + void serializeWrapped( const BaseVector* vector, const folly::Range& ranges, @@ -1710,6 +1801,605 @@ void serializeColumn( } } +// Returns ranges for the non-null rows of an array or map. 'rows' gives the +// rows. nulls is the nulls of the array/map or nullptr if no nulls. 'offsets' +// and 'sizes' are the offsets and sizes of the array/map.Returns the number of +// index ranges. Obtains the ranges from 'rangesHolder'. If 'sizesPtr' is +// non-null, gets returns the sizes for the inner ranges in 'sizesHolder'. If +// 'stream' is non-null, writes the lengths and nulls for the array/map into +// 'stream'. +int32_t rowsToRanges( + folly::Range rows, + const uint64_t* rawNulls, + const vector_size_t* offsets, + const vector_size_t* sizes, + vector_size_t** sizesPtr, + ScratchPtr& rangesHolder, + ScratchPtr* sizesHolder, + VectorStream* stream, + Scratch& scratch) { + auto numRows = rows.size(); + auto* innerRows = rows.data(); + auto* nonNullRows = innerRows; + int32_t numInner = rows.size(); + ScratchPtr nonNullHolder(scratch); + ScratchPtr innerRowsHolder(scratch); + if (rawNulls) { + ScratchPtr nullsHolder(scratch); + auto* nulls = nullsHolder.get(bits::nwords(rows.size())); + simd::gatherBits(rawNulls, rows, nulls); + auto* mutableNonNullRows = nonNullHolder.get(numRows); + auto* mutableInnerRows = innerRowsHolder.get(numRows); + numInner = simd::indicesOfSetBits(nulls, 0, numRows, mutableNonNullRows); + if (stream) { + stream->appendLengths( + nulls, rows, numInner, [&](auto row) { return sizes[row]; }); + } + simd::transpose( + rows.data(), + folly::Range(mutableNonNullRows, numInner), + mutableInnerRows); + nonNullRows = mutableNonNullRows; + innerRows = mutableInnerRows; + } else if (stream) { + stream->appendNonNull(rows.size()); + for (auto i = 0; i < rows.size(); ++i) { + stream->appendLength(sizes[rows[i]]); + } + } + vector_size_t** sizesOut = nullptr; + if (sizesPtr) { + sizesOut = sizesHolder->get(numInner); + } + auto ranges = rangesHolder.get(numInner); + int32_t fill = 0; + for (auto i = 0; i < numInner; ++i) { + if (sizes[innerRows[i]] == 0) { + continue; + } + if (sizesOut) { + sizesOut[fill] = sizesPtr[rawNulls ? nonNullRows[i] : i]; + } + ranges[fill].begin = offsets[innerRows[i]]; + ranges[fill].size = sizes[innerRows[i]]; + ++fill; + } + return fill; +} + +template +void copyWords( + T* destination, + const int32_t* indices, + int32_t numIndices, + const T* values, + bool isLongDecimal = false) { + if (std::is_same_v && isLongDecimal) { + for (auto i = 0; i < numIndices; ++i) { + reinterpret_cast(destination)[i] = toJavaDecimalValue( + reinterpret_cast(values)[indices[i]]); + } + return; + } + for (auto i = 0; i < numIndices; ++i) { + destination[i] = values[indices[i]]; + } +} + +template +void copyWordsWithRows( + T* destination, + const int32_t* rows, + const int32_t* indices, + int32_t numIndices, + const T* values, + bool isLongDecimal = false) { + if (!indices) { + copyWords(destination, rows, numIndices, values, isLongDecimal); + return; + } + if (std::is_same_v && isLongDecimal) { + for (auto i = 0; i < numIndices; ++i) { + reinterpret_cast(destination)[i] = toJavaDecimalValue( + reinterpret_cast(values)[rows[indices[i]]]); + } + return; + } + for (auto i = 0; i < numIndices; ++i) { + destination[i] = values[rows[indices[i]]]; + } +} + +template +void appendNonNull( + VectorStream* stream, + const uint64_t* nulls, + folly::Range rows, + const T* values, + Scratch& scratch) { + auto numRows = rows.size(); + ScratchPtr nonNullHolder(scratch); + const int32_t* nonNullIndices; + int32_t numNonNull; + if (LIKELY(numRows <= 8)) { + // Short batches need extra optimization. The set bits are prematerialized. + uint8_t nullsByte = *reinterpret_cast(nulls); + numNonNull = __builtin_popcount(nullsByte); + nonNullIndices = + numNonNull == numRows ? nullptr : simd::byteSetBits(nullsByte); + } else { + auto mutableIndices = nonNullHolder.get(numRows); + // Convert null flags to indices. This is much faster than checking bits one + // by one, several bits per clock specially if mostly null or non-null. Even + // worst case of half nulls is more than one row per clock. + numNonNull = simd::indicesOfSetBits(nulls, 0, numRows, mutableIndices); + nonNullIndices = numNonNull == numRows ? nullptr : mutableIndices; + } + stream->appendNulls(nulls, 0, rows.size(), numNonNull); + ByteOutputStream& out = stream->values(); + + if constexpr (sizeof(T) == 8) { + AppendWindow window(out, scratch); + int64_t* output = window.get(numNonNull); + copyWordsWithRows( + output, + rows.data(), + nonNullIndices, + numNonNull, + reinterpret_cast(values)); + } else if constexpr (sizeof(T) == 4) { + AppendWindow window(out, scratch); + int32_t* output = window.get(numNonNull); + copyWordsWithRows( + output, + rows.data(), + nonNullIndices, + numNonNull, + reinterpret_cast(values)); + } else { + AppendWindow window(out, scratch); + T* output = window.get(numNonNull); + copyWordsWithRows( + output, + rows.data(), + nonNullIndices, + numNonNull, + values, + stream->isLongDecimal()); + } +} + +void appendStrings( + const uint64_t* nulls, + folly::Range rows, + const StringView* views, + VectorStream* stream, + Scratch& scratch) { + if (!nulls) { + stream->appendLengths(nullptr, rows, rows.size(), [&](auto row) { + return views[row].size(); + }); + for (auto i = 0; i < rows.size(); ++i) { + auto& view = views[rows[i]]; + stream->values().appendStringView( + std::string_view(view.data(), view.size())); + } + return; + } + ScratchPtr nonNullHolder(scratch); + auto nonNull = nonNullHolder.get(rows.size()); + auto numNonNull = simd::indicesOfSetBits(nulls, 0, rows.size(), nonNull); + stream->appendLengths( + nulls, rows, numNonNull, [&](auto row) { return views[row].size(); }); + for (auto i = 0; i < numNonNull; ++i) { + auto& view = views[rows[nonNull[i]]]; + stream->values().appendStringView( + std::string_view(view.data(), view.size())); + } +} + +void appendTimestamps( + const uint64_t* nulls, + folly::Range rows, + const Timestamp* timestamps, + VectorStream* stream, + Scratch& scratch) { + if (!nulls) { + stream->appendNonNull(rows.size()); + for (auto i = 0; i < rows.size(); ++i) { + stream->appendOne(timestamps[rows[i]]); + } + return; + } + ScratchPtr nonNullHolder(scratch); + auto nonNullRows = nonNullHolder.get(rows.size()); + auto numNonNull = simd::indicesOfSetBits(nulls, 0, rows.size(), nonNullRows); + stream->appendNulls(nulls, 0, rows.size(), numNonNull); + for (auto i = 0; i < numNonNull; ++i) { + stream->appendOne(timestamps[rows[nonNullRows[i]]]); + } +} + +template +void serializeFlatVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + using T = typename TypeTraits::NativeType; + auto flatVector = vector->asUnchecked>(); + auto rawValues = flatVector->rawValues(); + if (!flatVector->mayHaveNulls()) { + if (std::is_same_v) { + appendTimestamps( + nullptr, + rows, + reinterpret_cast(rawValues), + stream, + scratch); + return; + } + + if (std::is_same_v) { + appendStrings( + nullptr, + rows, + reinterpret_cast(rawValues), + stream, + scratch); + return; + } + + stream->appendNonNull(rows.size()); + AppendWindow window(stream->values(), scratch); + T* output = window.get(rows.size()); + copyWords( + output, rows.data(), rows.size(), rawValues, stream->isLongDecimal()); + return; + } + ScratchPtr nullsHolder(scratch); + uint64_t* nulls = nullsHolder.get(bits::nwords(rows.size())); + simd::gatherBits(vector->rawNulls(), rows, nulls); + if (std::is_same_v) { + appendTimestamps( + nulls, + rows, + reinterpret_cast(rawValues), + stream, + scratch); + return; + } + if (std::is_same_v) { + appendStrings( + nulls, + rows, + reinterpret_cast(rawValues), + stream, + scratch); + return; + } + appendNonNull(stream, nulls, rows, rawValues, scratch); +} + +uint64_t bitsToBytesMap[256]; + +uint64_t bitsToBytes(uint8_t byte) { + return bitsToBytesMap[byte]; +} + +template <> +void serializeFlatVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto flatVector = reinterpret_cast*>(vector); + auto rawValues = flatVector->rawValues(); + ScratchPtr bitsHolder(scratch); + uint64_t* valueBits; + int32_t numValueBits; + if (!flatVector->mayHaveNulls()) { + stream->appendNonNull(rows.size()); + valueBits = bitsHolder.get(bits::nwords(rows.size())); + simd::gatherBits( + reinterpret_cast(rawValues), rows, valueBits); + numValueBits = rows.size(); + } else { + uint64_t* nulls = bitsHolder.get(bits::nwords(rows.size())); + simd::gatherBits(vector->rawNulls(), rows, nulls); + ScratchPtr nonNullsHolder(scratch); + auto nonNulls = nonNullsHolder.get(rows.size()); + numValueBits = simd::indicesOfSetBits(nulls, 0, rows.size(), nonNulls); + stream->appendNulls(nulls, 0, rows.size(), numValueBits); + valueBits = nulls; + simd::transpose( + rows.data(), + folly::Range(nonNulls, numValueBits), + nonNulls); + simd::gatherBits( + reinterpret_cast(rawValues), + folly::Range(nonNulls, numValueBits), + valueBits); + } + // 'valueBits' contains the non-null bools to be appended to the + // stream. The wire format has a byte for each bit. Every full byte + // is appended as a word. The partial byte is translated to a word + // and its low bytes are appended. + AppendWindow window(stream->values(), scratch); + uint8_t* output = window.get(numValueBits); + const auto numBytes = bits::nbytes(numValueBits); + for (auto i = 0; i < numBytes; ++i) { + uint64_t word = bitsToBytes(reinterpret_cast(valueBits)[i]); + if (i < numBytes - 1) { + reinterpret_cast(output)[i] = word; + } else { + memcpy(output + i * 8, &word, numValueBits - i * 8); + } + } +} + +void serializeWrapped( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + ScratchPtr innerRowsHolder(scratch); + const int32_t numRows = rows.size(); + int32_t numInner = 0; + auto innerRows = innerRowsHolder.get(numRows); + const BaseVector* wrapped; + if (vector->encoding() == VectorEncoding::Simple::DICTIONARY && + !vector->rawNulls()) { + // Dictionary with no nulls. + auto* indices = vector->wrapInfo()->as(); + wrapped = vector->valueVector().get(); + simd::transpose(indices, rows, innerRows); + numInner = numRows; + } else { + wrapped = vector->wrappedVector(); + for (int32_t i = 0; i < rows.size(); ++i) { + if (vector->isNullAt(rows[i])) { + if (numInner > 0) { + serializeColumn( + wrapped, + folly::Range(innerRows, numInner), + stream, + scratch); + numInner = 0; + } + stream->appendNull(); + continue; + } + innerRows[numInner++] = vector->wrappedIndex(rows[i]); + } + } + if (numInner > 0) { + serializeColumn( + wrapped, + folly::Range(innerRows, numInner), + stream, + scratch); + } +} + +template <> +void serializeFlatVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + VELOX_CHECK_NOT_NULL(vector->rawNulls()); + for (auto i = 0; i < rows.size(); ++i) { + VELOX_DCHECK(vector->isNullAt(rows[i])); + stream->appendNull(); + } +} + +template <> +void serializeFlatVector( + const BaseVector* vector, + const folly::Range& ranges, + VectorStream* stream, + Scratch& scratch) { + VELOX_UNSUPPORTED(); +} + +void serializeTimestampWithTimeZone( + const RowVector* rowVector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto timestamps = rowVector->childAt(0)->as>(); + auto timezones = rowVector->childAt(1)->as>(); + for (auto i : rows) { + if (rowVector->isNullAt(i)) { + stream->appendNull(); + } else { + stream->appendNonNull(); + stream->appendOne(packTimestampWithTimeZone( + timestamps->valueAt(i), timezones->valueAt(i))); + } + } +} + +void serializeRowVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto rowVector = reinterpret_cast(vector); + vector_size_t* childRows; + int32_t numChildRows = 0; + if (isTimestampWithTimeZoneType(vector->type())) { + serializeTimestampWithTimeZone(rowVector, rows, stream, scratch); + return; + } + ScratchPtr nullsHolder(scratch); + ScratchPtr innerRowsHolder(scratch); + auto innerRows = rows.data(); + auto numInnerRows = rows.size(); + if (auto rawNulls = vector->rawNulls()) { + auto nulls = nullsHolder.get(bits::nwords(rows.size())); + simd::gatherBits(rawNulls, rows, nulls); + auto* mutableInnerRows = innerRowsHolder.get(rows.size()); + numInnerRows = + simd::indicesOfSetBits(nulls, 0, rows.size(), mutableInnerRows); + stream->appendLengths(nulls, rows, numInnerRows, [](int32_t) { return 1; }); + simd::transpose( + rows.data(), + folly::Range(mutableInnerRows, numInnerRows), + mutableInnerRows); + innerRows = mutableInnerRows; + } else { + stream->appendLengths( + nullptr, rows, rows.size(), [](int32_t) { return 1; }); + } + for (int32_t i = 0; i < rowVector->childrenSize(); ++i) { + serializeColumn( + rowVector->childAt(i).get(), + folly::Range(innerRows, numInnerRows), + stream->childAt(i), + scratch); + } +} + +void serializeArrayVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto arrayVector = reinterpret_cast(vector); + + ScratchPtr rangesHolder(scratch); + int32_t numRanges = rowsToRanges( + rows, + arrayVector->rawNulls(), + arrayVector->rawOffsets(), + arrayVector->rawSizes(), + nullptr, + rangesHolder, + nullptr, + stream, + scratch); + if (numRanges == 0) { + return; + } + serializeColumn( + arrayVector->elements().get(), + folly::Range(rangesHolder.get(), numRanges), + stream->childAt(0)); +} + +void serializeMapVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto mapVector = reinterpret_cast(vector); + + ScratchPtr rangesHolder(scratch); + int32_t numRanges = rowsToRanges( + rows, + mapVector->rawNulls(), + mapVector->rawOffsets(), + mapVector->rawSizes(), + nullptr, + rangesHolder, + nullptr, + stream, + scratch); + if (numRanges == 0) { + return; + } + serializeColumn( + mapVector->mapKeys().get(), + folly::Range(rangesHolder.get(), numRanges), + stream->childAt(0)); + serializeColumn( + mapVector->mapValues().get(), + folly::Range(rangesHolder.get(), numRanges), + stream->childAt(1)); +} + +template +void serializeConstantVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + using T = typename KindToFlatVector::WrapperType; + auto constVector = dynamic_cast*>(vector); + if (constVector->valueVector()) { + serializeWrapped(constVector, rows, stream, scratch); + return; + } + const auto numRows = rows.size(); + if (vector->isNullAt(0)) { + for (int32_t i = 0; i < numRows; ++i) { + stream->appendNull(); + } + return; + } + + T value = constVector->valueAtFast(0); + for (int32_t i = 0; i < numRows; ++i) { + stream->appendNonNull(); + stream->appendOne(value); + } +} + +template +void serializeBiasVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + VELOX_UNSUPPORTED(); +} + +void serializeColumn( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + switch (vector->encoding()) { + case VectorEncoding::Simple::FLAT: + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + serializeFlatVector, + vector->typeKind(), + vector, + rows, + stream, + scratch); + break; + case VectorEncoding::Simple::CONSTANT: + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + serializeConstantVector, + vector->typeKind(), + vector, + rows, + stream, + scratch); + break; + case VectorEncoding::Simple::BIASED: + VELOX_UNSUPPORTED(); + case VectorEncoding::Simple::ROW: + serializeRowVector(vector, rows, stream, scratch); + break; + case VectorEncoding::Simple::ARRAY: + serializeArrayVector(vector, rows, stream, scratch); + break; + case VectorEncoding::Simple::MAP: + serializeMapVector(vector, rows, stream, scratch); + break; + case VectorEncoding::Simple::LAZY: + serializeColumn(vector->loadedVector(), rows, stream, scratch); + break; + default: + serializeWrapped(vector, rows, stream, scratch); + } +} + template void serializeConstantColumn( const BaseVector* vector, @@ -1800,19 +2490,11 @@ void estimateFlatSerializedSize( vector_size_t** sizes) { auto valueSize = vector->type()->cppSizeInBytes(); if (vector->mayHaveNulls()) { + auto rawNulls = vector->rawNulls(); for (int32_t i = 0; i < ranges.size(); ++i) { auto end = ranges[i].begin + ranges[i].size; - int32_t numNulls = 0; - int32_t bytes = 0; - auto rawNulls = vector->rawNulls(); - for (int32_t offset = ranges[i].begin; offset < end; ++offset) { - if (bits::isBitNull(rawNulls, offset)) { - ++numNulls; - } else { - bytes += valueSize; - } - } - *(sizes[i]) += bytes + bits::nbytes(numNulls); + auto numValues = bits::countBits(rawNulls, ranges[i].begin, end); + *(sizes[i]) += numValues * valueSize + bits::nbytes(ranges[i].size); } } else { for (int32_t i = 0; i < ranges.size(); ++i) { @@ -1868,16 +2550,8 @@ void estimateBiasedSerializedSize( auto rawNulls = vector->rawNulls(); for (int32_t i = 0; i < ranges.size(); ++i) { auto end = ranges[i].begin + ranges[i].size; - int32_t numNulls = 0; - int32_t bytes = 0; - for (int32_t offset = ranges[i].begin; offset < end; ++offset) { - if (bits::isBitNull(rawNulls, offset)) { - ++numNulls; - } else { - bytes += valueSize; - } - } - *(sizes[i]) += bytes + bits::nbytes(numNulls); + int32_t numValues = bits::countBits(rawNulls, ranges[i].begin, end); + *(sizes[i]) += numValues * valueSize + bits::nbytes(ranges[i].size); } } else { for (int32_t i = 0; i < ranges.size(); ++i) { @@ -1889,12 +2563,14 @@ void estimateBiasedSerializedSize( void estimateSerializedSizeInt( const BaseVector* vector, const folly::Range& ranges, - vector_size_t** sizes); + vector_size_t** sizes, + Scratch& scratch); void estimateWrapperSerializedSize( const folly::Range& ranges, vector_size_t** sizes, - const BaseVector* wrapper) { + const BaseVector* wrapper, + Scratch& scratch) { std::vector newRanges; std::vector newSizes; const BaseVector* wrapped = wrapper->wrappedVector(); @@ -1911,19 +2587,20 @@ void estimateWrapperSerializedSize( } *sizes[i] += bits::nbytes(numNulls); } - estimateSerializedSizeInt(wrapped, newRanges, newSizes.data()); + estimateSerializedSizeInt(wrapped, newRanges, newSizes.data(), scratch); } template void estimateConstantSerializedSize( const BaseVector* vector, const folly::Range& ranges, - vector_size_t** sizes) { + vector_size_t** sizes, + Scratch& scratch) { VELOX_CHECK(vector->encoding() == VectorEncoding::Simple::CONSTANT); using T = typename KindToFlatVector::WrapperType; auto constantVector = vector->as>(); if (constantVector->valueVector()) { - estimateWrapperSerializedSize(ranges, sizes, vector); + estimateWrapperSerializedSize(ranges, sizes, vector, scratch); return; } int32_t elementSize = sizeof(T); @@ -1942,7 +2619,8 @@ void estimateConstantSerializedSize( void estimateSerializedSizeInt( const BaseVector* vector, const folly::Range& ranges, - vector_size_t** sizes) { + vector_size_t** sizes, + Scratch& scratch) { switch (vector->encoding()) { case VectorEncoding::Simple::FLAT: VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( @@ -1958,11 +2636,12 @@ void estimateSerializedSizeInt( vector->typeKind(), vector, ranges, - sizes); + sizes, + scratch); break; case VectorEncoding::Simple::DICTIONARY: case VectorEncoding::Simple::SEQUENCE: - estimateWrapperSerializedSize(ranges, sizes, vector); + estimateWrapperSerializedSize(ranges, sizes, vector, scratch); break; case VectorEncoding::Simple::BIASED: estimateBiasedSerializedSize(vector, ranges, sizes); @@ -1982,13 +2661,14 @@ void estimateSerializedSizeInt( } } auto rowVector = vector->as(); - auto children = rowVector->children(); + auto& children = rowVector->children(); for (auto& child : children) { if (child) { estimateSerializedSizeInt( child.get(), folly::Range(childRanges.data(), childRanges.size()), - childSizes.data()); + childSizes.data(), + scratch); } } break; @@ -2006,9 +2686,12 @@ void estimateSerializedSizeInt( &childRanges, &childSizes); estimateSerializedSizeInt( - mapVector->mapKeys().get(), childRanges, childSizes.data()); + mapVector->mapKeys().get(), childRanges, childSizes.data(), scratch); estimateSerializedSizeInt( - mapVector->mapValues().get(), childRanges, childSizes.data()); + mapVector->mapValues().get(), + childRanges, + childSizes.data(), + scratch); break; } case VectorEncoding::Simple::ARRAY: { @@ -2024,11 +2707,303 @@ void estimateSerializedSizeInt( &childRanges, &childSizes); estimateSerializedSizeInt( - arrayVector->elements().get(), childRanges, childSizes.data()); + arrayVector->elements().get(), + childRanges, + childSizes.data(), + scratch); break; } case VectorEncoding::Simple::LAZY: - estimateSerializedSizeInt(vector->loadedVector(), ranges, sizes); + estimateSerializedSizeInt(vector->loadedVector(), ranges, sizes, scratch); + break; + default: + VELOX_CHECK(false, "Unsupported vector encoding {}", vector->encoding()); + } +} + +void estimateSerializedSizeInt( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch); + +template +void estimateFlatSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + const auto valueSize = vector->type()->cppSizeInBytes(); + const auto numRows = rows.size(); + if (vector->mayHaveNulls()) { + auto rawNulls = vector->rawNulls(); + ScratchPtr nullsHolder(scratch); + ScratchPtr nonNullsHolder(scratch); + auto nulls = nullsHolder.get(bits::nwords(numRows)); + simd::gatherBits(rawNulls, rows, nulls); + auto nonNulls = nonNullsHolder.get(numRows); + const auto numNonNull = simd::indicesOfSetBits(nulls, 0, numRows, nonNulls); + for (int32_t i = 0; i < numNonNull; ++i) { + *sizes[nonNulls[i]] += valueSize; + } + } else { + VELOX_UNREACHABLE("Non null fixed width case handled before this"); + } +} + +void estimateFlatSerializedSizeVarcharOrVarbinary( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + const auto numRows = rows.size(); + auto strings = static_cast*>(vector); + auto rawNulls = strings->rawNulls(); + auto rawValues = strings->rawValues(); + if (!rawNulls) { + for (auto i = 0; i < rows.size(); ++i) { + *sizes[i] += rawValues[rows[i]].size(); + } + } else { + ScratchPtr nullsHolder(scratch); + ScratchPtr nonNullsHolder(scratch); + auto nulls = nullsHolder.get(bits::nwords(numRows)); + simd::gatherBits(rawNulls, rows, nulls); + auto* nonNulls = nonNullsHolder.get(numRows); + auto numNonNull = simd::indicesOfSetBits(nulls, 0, numRows, nonNulls); + + for (int32_t i = 0; i < numNonNull; ++i) { + *sizes[nonNulls[i]] += rawValues[rows[nonNulls[i]]].size(); + } + } +} + +template <> +void estimateFlatSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + estimateFlatSerializedSizeVarcharOrVarbinary(vector, rows, sizes, scratch); +} + +template <> +void estimateFlatSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + estimateFlatSerializedSizeVarcharOrVarbinary(vector, rows, sizes, scratch); +} + +void estimateBiasedSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + auto valueSize = vector->type()->cppSizeInBytes(); + VELOX_UNSUPPORTED(); +} + +void estimateWrapperSerializedSize( + const folly::Range& rows, + vector_size_t** sizes, + const BaseVector* wrapper, + Scratch& scratch) { + ScratchPtr innerRowsHolder(scratch); + ScratchPtr innerSizesHolder(scratch); + const int32_t numRows = rows.size(); + int32_t numInner = 0; + auto innerRows = innerRowsHolder.get(numRows); + vector_size_t** innerSizes = innerSizesHolder.get(numRows); + const BaseVector* wrapped; + if (wrapper->encoding() == VectorEncoding::Simple::DICTIONARY && + !wrapper->rawNulls()) { + // Dictionary with no nulls. + auto* indices = wrapper->wrapInfo()->as(); + wrapped = wrapper->valueVector().get(); + simd::transpose(indices, rows, innerRows); + } else { + wrapped = wrapper->wrappedVector(); + for (int32_t i = 0; i < rows.size(); ++i) { + if (!wrapper->isNullAt(rows[i])) { + innerRows[numInner] = wrapper->wrappedIndex(rows[i]); + innerSizes[numInner] = sizes[i]; + ++numInner; + } + } + } + if (numInner == 0) { + return; + } + estimateSerializedSizeInt( + wrapped, + folly::Range(innerRows, numInner), + innerSizes, + scratch); +} + +template +void estimateConstantSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_CHECK(vector->encoding() == VectorEncoding::Simple::CONSTANT); + using T = typename KindToFlatVector::WrapperType; + auto constantVector = vector->as>(); + int32_t elementSize = sizeof(T); + if (constantVector->isNullAt(0)) { + elementSize = 1; + } else if (vector->valueVector()) { + auto values = constantVector->wrappedVector(); + vector_size_t* sizePtr = &elementSize; + vector_size_t singleRow = constantVector->wrappedIndex(0); + estimateSerializedSizeInt( + values, + folly::Range(&singleRow, 1), + &sizePtr, + scratch); + } else if (std::is_same_v) { + auto value = constantVector->valueAt(0); + auto string = reinterpret_cast(&value); + elementSize = string->size(); + } + for (int32_t i = 0; i < rows.size(); ++i) { + *sizes[i] += elementSize; + } +} +void estimateSerializedSizeInt( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + const auto numRows = rows.size(); + if (vector->type()->isFixedWidth() && !vector->mayHaveNullsRecursive()) { + const auto elementSize = vector->type()->cppSizeInBytes(); + for (auto i = 0; i < numRows; ++i) { + *sizes[i] += elementSize; + } + return; + } + switch (vector->encoding()) { + case VectorEncoding::Simple::FLAT: { + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + estimateFlatSerializedSize, + vector->typeKind(), + vector, + rows, + sizes, + scratch); + break; + } + case VectorEncoding::Simple::CONSTANT: + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + estimateConstantSerializedSize, + vector->typeKind(), + vector, + rows, + sizes, + scratch); + break; + case VectorEncoding::Simple::DICTIONARY: + case VectorEncoding::Simple::SEQUENCE: + estimateWrapperSerializedSize(rows, sizes, vector, scratch); + break; + case VectorEncoding::Simple::BIASED: + estimateBiasedSerializedSize(vector, rows, sizes, scratch); + break; + case VectorEncoding::Simple::ROW: { + ScratchPtr innerRowsHolder(scratch); + ScratchPtr innerSizesHolder(scratch); + ScratchPtr nullsHolder(scratch); + auto* innerRows = rows.data(); + auto* innerSizes = sizes; + const auto numRows = rows.size(); + int32_t numInner = numRows; + if (vector->mayHaveNulls()) { + auto nulls = nullsHolder.get(bits::nwords(numRows)); + simd::gatherBits(vector->rawNulls(), rows, nulls); + auto mutableInnerRows = innerRowsHolder.get(numRows); + numInner = simd::indicesOfSetBits(nulls, 0, numRows, mutableInnerRows); + innerSizes = innerSizesHolder.get(numInner); + for (auto i = 0; i < numInner; ++i) { + innerSizes[i] = sizes[mutableInnerRows[i]]; + } + simd::transpose( + rows.data(), + folly::Range(mutableInnerRows, numInner), + mutableInnerRows); + innerRows = mutableInnerRows; + } + auto rowVector = vector->as(); + auto& children = rowVector->children(); + for (auto& child : children) { + if (child) { + estimateSerializedSizeInt( + child.get(), + folly::Range(innerRows, numInner), + innerSizes, + scratch); + } + } + break; + } + case VectorEncoding::Simple::MAP: { + auto mapVector = vector->asUnchecked(); + ScratchPtr rangeHolder(scratch); + ScratchPtr sizesHolder(scratch); + const auto numRanges = rowsToRanges( + rows, + mapVector->rawNulls(), + mapVector->rawOffsets(), + mapVector->rawSizes(), + sizes, + rangeHolder, + &sizesHolder, + nullptr, + scratch); + if (numRanges == 0) { + return; + } + estimateSerializedSizeInt( + mapVector->mapKeys().get(), + folly::Range(rangeHolder.get(), numRanges), + sizesHolder.get(), + scratch); + estimateSerializedSizeInt( + mapVector->mapValues().get(), + folly::Range(rangeHolder.get(), numRanges), + sizesHolder.get(), + scratch); + break; + } + case VectorEncoding::Simple::ARRAY: { + auto arrayVector = vector->as(); + ScratchPtr rangeHolder(scratch); + ScratchPtr sizesHolder(scratch); + const auto numRanges = rowsToRanges( + rows, + arrayVector->rawNulls(), + arrayVector->rawOffsets(), + arrayVector->rawSizes(), + sizes, + rangeHolder, + &sizesHolder, + nullptr, + scratch); + if (numRanges == 0) { + return; + } + estimateSerializedSizeInt( + arrayVector->elements().get(), + folly::Range(rangeHolder.get(), numRanges), + sizesHolder.get(), + scratch); + break; + } + case VectorEncoding::Simple::LAZY: + estimateSerializedSizeInt(vector->loadedVector(), rows, sizes, scratch); break; default: VELOX_CHECK(false, "Unsupported vector encoding {}", vector->encoding()); @@ -2061,7 +3036,8 @@ class PrestoVectorSerializer : public VectorSerializer { void append( const RowVectorPtr& vector, - const folly::Range& ranges) override { + const folly::Range& ranges, + Scratch& scratch) override { auto newRows = rangesTotalSize(ranges); if (newRows > 0) { numRows_ += newRows; @@ -2071,6 +3047,20 @@ class PrestoVectorSerializer : public VectorSerializer { } } + void append( + const RowVectorPtr& vector, + const folly::Range& rows, + Scratch& scratch) override { + auto newRows = rows.size(); + if (newRows > 0) { + numRows_ += newRows; + for (int32_t i = 0; i < vector->childrenSize(); ++i) { + serializeColumn( + vector->childAt(i).get(), rows, streams_[i].get(), scratch); + } + } + } + void appendEncoded( const RowVectorPtr& vector, const folly::Range& ranges) { @@ -2254,8 +3244,17 @@ class PrestoVectorSerializer : public VectorSerializer { void PrestoVectorSerde::estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) { - estimateSerializedSizeInt(vector->loadedVector(), ranges, sizes); + vector_size_t** sizes, + Scratch& scratch) { + estimateSerializedSizeInt(vector->loadedVector(), ranges, sizes, scratch); +} + +void PrestoVectorSerde::estimateSerializedSize( + VectorPtr vector, + const folly::Range rows, + vector_size_t** sizes, + Scratch& scratch) { + estimateSerializedSizeInt(vector->loadedVector(), rows, sizes, scratch); } std::unique_ptr PrestoVectorSerde::createSerializer( @@ -2387,6 +3386,14 @@ void testingScatterStructNulls( // static void PrestoVectorSerde::registerVectorSerde() { + auto toByte = [](int32_t number, int32_t bit) { + return static_cast(bits::isBitSet(&number, bit)) << (bit * 8); + }; + for (auto i = 0; i < 256; ++i) { + bitsToBytesMap[i] = toByte(i, 0) | toByte(i, 1) | toByte(i, 2) | + toByte(i, 3) | toByte(i, 4) | toByte(i, 5) | toByte(i, 6) | + toByte(i, 7); + } velox::registerVectorSerde(std::make_unique()); } diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index da20b21cc008b..29eb40b69d853 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -43,10 +43,19 @@ class PrestoVectorSerde : public VectorSerde { std::vector encodings; }; + /// Adds the serialized sizes of the rows of 'vector' in 'ranges[i]' to + /// '*sizes[i]'. void estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) override; + vector_size_t** sizes, + Scratch& scratch) override; + + void estimateSerializedSize( + VectorPtr vector, + const folly::Range rows, + vector_size_t** sizes, + Scratch& scratch) override; std::unique_ptr createSerializer( RowTypePtr type, diff --git a/velox/serializers/UnsafeRowSerializer.cpp b/velox/serializers/UnsafeRowSerializer.cpp index e94cbb44ec8d0..342311c5d9264 100644 --- a/velox/serializers/UnsafeRowSerializer.cpp +++ b/velox/serializers/UnsafeRowSerializer.cpp @@ -23,7 +23,8 @@ namespace facebook::velox::serializer::spark { void UnsafeRowVectorSerde::estimateSerializedSize( VectorPtr /* vector */, const folly::Range& /* ranges */, - vector_size_t** /* sizes */) { + vector_size_t** /* sizes */, + Scratch& /*scratch*/) { VELOX_UNSUPPORTED(); } @@ -37,7 +38,8 @@ class UnsafeRowVectorSerializer : public VectorSerializer { void append( const RowVectorPtr& vector, - const folly::Range& ranges) override { + const folly::Range& ranges, + Scratch& /*scratch*/) override { size_t totalSize = 0; row::UnsafeRowFast unsafeRow(vector); if (auto fixedRowSize = diff --git a/velox/serializers/UnsafeRowSerializer.h b/velox/serializers/UnsafeRowSerializer.h index 995e58498e5b6..1bf41c4f0cae7 100644 --- a/velox/serializers/UnsafeRowSerializer.h +++ b/velox/serializers/UnsafeRowSerializer.h @@ -26,7 +26,8 @@ class UnsafeRowVectorSerde : public VectorSerde { void estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) override; + vector_size_t** sizes, + Scratch& scratch) override; // This method is not used in production code. It is only used to // support round-trip tests for deserialization. diff --git a/velox/serializers/tests/CMakeLists.txt b/velox/serializers/tests/CMakeLists.txt index e6ddedd5d9b92..0d2a86bee18a5 100644 --- a/velox/serializers/tests/CMakeLists.txt +++ b/velox/serializers/tests/CMakeLists.txt @@ -28,3 +28,15 @@ target_link_libraries( gtest_main gflags::gflags glog::glog) + +add_executable(velox_serializer_benchmark SerializerBenchmark.cpp) + +target_link_libraries( + velox_serializer_benchmark + velox_presto_serializer + velox_vector_test_lib + velox_vector_fuzzer + gtest + gtest_main + gflags::gflags + glog::glog) diff --git a/velox/serializers/tests/CompactRowSerializerTest.cpp b/velox/serializers/tests/CompactRowSerializerTest.cpp index 4e8524fdbe3e0..55a6a24199e73 100644 --- a/velox/serializers/tests/CompactRowSerializerTest.cpp +++ b/velox/serializers/tests/CompactRowSerializerTest.cpp @@ -41,7 +41,8 @@ class CompactRowSerializerTest : public ::testing::Test, auto rowType = asRowType(rowVector->type()); auto serializer = serde_->createSerializer(rowType, numRows, arena.get()); - serializer->append(rowVector, folly::Range(rows.data(), numRows)); + Scratch scratch; + serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch); auto size = serializer->maxSerializedSize(); OStreamOutputStream out(output); serializer->flush(&out); diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 0165c273706c9..f211c021a6d4e 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -18,6 +18,8 @@ #include #include #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/memory/ByteStream.h" +#include "velox/common/time/Timer.h" #include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorTestBase.h" @@ -38,6 +40,7 @@ class PrestoSerializerTest } void sanityCheckEstimateSerializedSize(const RowVectorPtr& rowVector) { + Scratch scratch; const auto numRows = rowVector->size(); std::vector rows(numRows); @@ -51,7 +54,10 @@ class PrestoSerializerTest rawRowSizes[i] = &rowSizes[i]; } serde_->estimateSerializedSize( - rowVector, folly::Range(rows.data(), numRows), rawRowSizes.data()); + rowVector, + folly::Range(rows.data(), numRows), + rawRowSizes.data(), + scratch); } serializer::presto::PrestoVectorSerde::PrestoOptions getParamSerdeOptions( @@ -68,8 +74,9 @@ class PrestoSerializerTest void serialize( const RowVectorPtr& rowVector, std::ostream* output, - const serializer::presto::PrestoVectorSerde::PrestoOptions* - serdeOptions) { + const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions, + std::optional> indexRanges = std::nullopt, + std::optional> rows = std::nullopt) { auto streamInitialSize = output->tellp(); sanityCheckEstimateSerializedSize(rowVector); @@ -79,9 +86,34 @@ class PrestoSerializerTest auto paramOptions = getParamSerdeOptions(serdeOptions); auto serializer = serde_->createSerializer(rowType, numRows, arena.get(), ¶mOptions); - - serializer->append(rowVector); + vector_size_t sizeEstimate = 0; + + Scratch scratch; + if (indexRanges.has_value()) { + raw_vector sizes(indexRanges.value().size()); + std::fill(sizes.begin(), sizes.end(), &sizeEstimate); + serde_->estimateSerializedSize( + rowVector, indexRanges.value(), sizes.data(), scratch); + serializer->append(rowVector, indexRanges.value(), scratch); + } else if (rows.has_value()) { + raw_vector sizes(rows.value().size()); + std::fill(sizes.begin(), sizes.end(), &sizeEstimate); + serde_->estimateSerializedSize( + rowVector, rows.value(), sizes.data(), scratch); + serializer->append(rowVector, rows.value(), scratch); + } else { + vector_size_t* sizes = &sizeEstimate; + IndexRange range{0, rowVector->size()}; + serde_->estimateSerializedSize( + rowVector, + folly::Range(&range, 1), + &sizes, + scratch); + serializer->append(rowVector); + } auto size = serializer->maxSerializedSize(); + LOG(INFO) << "Size=" << size << " estimate=" << sizeEstimate << " " + << (100 * sizeEstimate) / size << "%"; facebook::velox::serializer::presto::PrestoOutputStreamListener listener; OStreamOutputStream out(output, &listener); serializer->flush(&out); @@ -164,6 +196,35 @@ class PrestoSerializerTest } assertEqualVectors(result, rowVector); + + // Serialize the vector with even and odd rows in different partitions. + auto even = + makeIndices(rowVector->size() / 2, [&](auto row) { return row * 2; }); + auto odd = makeIndices( + (rowVector->size() - 1) / 2, [&](auto row) { return (row * 2) + 1; }); + testSerializeRows(rowVector, even, serdeOptions); + testSerializeRows(rowVector, odd, serdeOptions); + } + + void testSerializeRows( + const RowVectorPtr& rowVector, + BufferPtr indices, + const serializer::presto::PrestoVectorSerde::PrestoOptions* + serdeOptions) { + std::ostringstream out; + auto rows = folly::Range( + indices->as(), indices->size() / sizeof(vector_size_t)); + serialize(rowVector, &out, serdeOptions, std::nullopt, rows); + + auto rowType = asRowType(rowVector->type()); + auto deserialized = deserialize(rowType, out.str(), serdeOptions); + assertEqualVectors( + deserialized, + BaseVector::wrapInDictionary( + BufferPtr(nullptr), + indices, + indices->size() / sizeof(vector_size_t), + rowVector)); } void serializeEncoded( @@ -522,12 +583,19 @@ TEST_P(PrestoSerializerTest, roundTrip) { VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; opts.nullRatio = 0.1; VectorFuzzer fuzzer(opts, pool_.get()); + VectorFuzzer::Options nonNullOpts; + nonNullOpts.timestampPrecision = + VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; + nonNullOpts.nullRatio = 0; + VectorFuzzer nonNullFuzzer(nonNullOpts, pool_.get()); const size_t numRounds = 20; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); - auto inputRowVector = fuzzer.fuzzInputRow(rowType); + + auto inputRowVector = (i % 2 == 0) ? fuzzer.fuzzInputRow(rowType) + : nonNullFuzzer.fuzzInputRow(rowType); testRoundTrip(inputRowVector); } } diff --git a/velox/serializers/tests/SerializerBenchmark.cpp b/velox/serializers/tests/SerializerBenchmark.cpp new file mode 100644 index 0000000000000..29d8290b05cea --- /dev/null +++ b/velox/serializers/tests/SerializerBenchmark.cpp @@ -0,0 +1,151 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/memory/ByteStream.h" +#include "velox/common/time/Timer.h" +#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::test; + +class SerializerBenchmark : public ::testing::Test, public VectorTestBase { + protected: + void SetUp() override { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + serde_ = std::make_unique(); + } + + std::unique_ptr serde_; +}; + +struct SerializeCase { + int32_t nullPct; + int32_t numSelected; + int32_t bits; + int32_t vectorSize; + uint64_t irTime{0}; + uint64_t rrTime{0}; + + std::string toString() { + return fmt::format( + "{} of {} {} bit {}%null: {} ir / {} rr", + numSelected, + vectorSize, + bits, + nullPct, + irTime, + rrTime); + } +}; + +TEST_F(SerializerBenchmark, timeFlat) { + // Serialize different fractions of a 10K vector of int32_t and int64_t with + // IndexRange and row range variants with and without nulls. + constexpr int32_t kPad = 8; + std::vector numSelectedValues = {3, 30, 300, 10000}; + std::vector> indexRanges; + std::vector> rowSets; + std::vector nullPctValues = {0, 1, 10, 90}; + std::vector bitsValues = {32, 64}; + const int32_t vectorSize = 10000; + for (auto numSelected : numSelectedValues) { + std::vector ir; + std::vector rr; + int32_t step = vectorSize / numSelected; + for (auto r = 0; r < vectorSize; r += step) { + ir.push_back(IndexRange{r, 1}); + rr.push_back(r); + } + std::cout << rr.size(); + indexRanges.push_back(std::move(ir)); + rr.resize(rr.size() + kPad, 999999999); + rowSets.push_back(std::move(rr)); + } + VectorMaker vm(pool_.get()); + std::vector v32s; + std::vector v64s; + for (auto nullPct : nullPctValues) { + auto v32 = vm.flatVector( + vectorSize, + [&](auto row) { return row; }, + [&](auto row) { return nullPct == 0 ? false : row % 100 < nullPct; }); + auto v64 = vm.flatVector( + vectorSize, + [&](auto row) { return row; }, + [&](auto row) { return nullPct == 0 ? false : row % 100 < nullPct; }); + v32s.push_back(std::move(v32)); + v64s.push_back(std::move(v64)); + } + std::vector cases; + Scratch scratch; + + auto runCase = [&](int32_t nullIdx, int32_t selIdx, int32_t bits) { + SerializeCase item; + item.vectorSize = vectorSize; + item.nullPct = nullPctValues[nullIdx]; + item.numSelected = numSelectedValues[selIdx]; + item.bits = bits; + int32_t numRepeat = 100 * vectorSize / indexRanges[selIdx].size(); + + VectorPtr vector = bits == 32 ? v32s[nullIdx] : v64s[nullIdx]; + auto rowType = ROW({vector->type()}); + auto rowVector = vm.rowVector({vector}); + { + MicrosecondTimer t(&item.irTime); + auto group = std::make_unique(pool_.get()); + group->createStreamTree(rowType, rowSets[selIdx].size() - kPad); + for (auto repeat = 0; repeat < numRepeat; ++repeat) { + group->append( + rowVector, + folly::Range( + indexRanges[selIdx].data(), indexRanges[selIdx].size()), + scratch); + } + } + + { + MicrosecondTimer t(&item.rrTime); + auto group = std::make_unique(pool_.get()); + group->createStreamTree(rowType, rowSets[selIdx].size()); + + for (auto repeat = 0; repeat < numRepeat; ++repeat) { + group->append( + rowVector, + folly::Range(rowSets[selIdx].data(), rowSets[selIdx].size() - kPad), + scratch); + } + } + return item; + }; + + for (auto bits : bitsValues) { + for (auto nullIdx = 0; nullIdx < nullPctValues.size(); ++nullIdx) { + for (auto selIdx = 0; selIdx < numSelectedValues.size(); ++selIdx) { + int32_t numRepeat = 10 / numSelectedValues[selIdx]; + cases.push_back(runCase(nullIdx, selIdx, bits)); + } + } + } + for (auto& item : cases) { + std::cout << item.toString() << std::endl; + } +} diff --git a/velox/serializers/tests/UnsafeRowSerializerTest.cpp b/velox/serializers/tests/UnsafeRowSerializerTest.cpp index d686a90b67d76..28338f9b29397 100644 --- a/velox/serializers/tests/UnsafeRowSerializerTest.cpp +++ b/velox/serializers/tests/UnsafeRowSerializerTest.cpp @@ -41,7 +41,8 @@ class UnsafeRowSerializerTest : public ::testing::Test, auto rowType = std::dynamic_pointer_cast(rowVector->type()); auto serializer = serde_->createSerializer(rowType, numRows, arena.get()); - serializer->append(rowVector, folly::Range(rows.data(), numRows)); + Scratch scratch; + serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch); auto size = serializer->maxSerializedSize(); OStreamOutputStream out(output); serializer->flush(&out); diff --git a/velox/vector/VectorStream.cpp b/velox/vector/VectorStream.cpp index 2d907dc1b35ce..fec8eb7a9fd13 100644 --- a/velox/vector/VectorStream.cpp +++ b/velox/vector/VectorStream.cpp @@ -15,12 +15,14 @@ */ #include "velox/vector/VectorStream.h" #include +#include "velox/common/base/RawVector.h" namespace facebook::velox { void VectorSerializer::append(const RowVectorPtr& vector) { const IndexRange allRows{0, vector->size()}; - append(vector, folly::Range(&allRows, 1)); + Scratch scratch; + append(vector, folly::Range(&allRows, 1), scratch); } namespace { @@ -102,8 +104,16 @@ void VectorStreamGroup::createStreamTree( void VectorStreamGroup::append( const RowVectorPtr& vector, - const folly::Range& ranges) { - serializer_->append(vector, ranges); + const folly::Range& ranges, + Scratch& scratch) { + serializer_->append(vector, ranges, scratch); +} + +void VectorStreamGroup::append( + const RowVectorPtr& vector, + const folly::Range& rows, + Scratch& scratch) { + serializer_->append(vector, rows, scratch); } void VectorStreamGroup::append(const RowVectorPtr& vector) { @@ -118,8 +128,18 @@ void VectorStreamGroup::flush(OutputStream* out) { void VectorStreamGroup::estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) { - getVectorSerde()->estimateSerializedSize(vector, ranges, sizes); + vector_size_t** sizes, + Scratch& scratch) { + getVectorSerde()->estimateSerializedSize(vector, ranges, sizes, scratch); +} + +// static +void VectorStreamGroup::estimateSerializedSize( + VectorPtr vector, + folly::Range rows, + vector_size_t** sizes, + Scratch& scratch) { + getVectorSerde()->estimateSerializedSize(vector, rows, sizes, scratch); } // static @@ -148,7 +168,8 @@ folly::IOBuf rowVectorToIOBuf( streamGroup->createStreamTree(asRowType(rowVector->type()), rangeEnd); IndexRange range{0, rangeEnd}; - streamGroup->append(rowVector, folly::Range(&range, 1)); + Scratch scratch; + streamGroup->append(rowVector, folly::Range(&range, 1), scratch); IOBufOutputStream stream(pool); streamGroup->flush(&stream); diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index e79af35aaa8e5..5f1899195a735 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -15,7 +15,9 @@ */ #pragma once +#include #include "velox/buffer/Buffer.h" +#include "velox/common/base/Scratch.h" #include "velox/common/memory/ByteStream.h" #include "velox/common/memory/Memory.h" #include "velox/common/memory/MemoryAllocator.h" @@ -38,11 +40,31 @@ class VectorSerializer { /// Serialize a subset of rows in a vector. virtual void append( const RowVectorPtr& vector, - const folly::Range& ranges) = 0; + const folly::Range& ranges, + Scratch& scratch) = 0; + + virtual void append( + const RowVectorPtr& vector, + const folly::Range& ranges) { + Scratch scratch; + append(vector, ranges, scratch); + } + + virtual void append( + const RowVectorPtr& vector, + const folly::Range& rows, + Scratch& scratch) { + VELOX_UNSUPPORTED(); + } /// Serialize all rows in a vector. void append(const RowVectorPtr& vector); + // True if supports append with folly::Range. + virtual bool supportsAppendRows() const { + return false; + } + /// Returns the maximum serialized size of the data previously added via /// 'append' methods. Can be used to allocate buffer of exact or maximum size /// before calling 'flush'. @@ -70,10 +92,32 @@ class VectorSerde { virtual ~Options() {} }; + /// Adds the serialized size of vector at 'rows[i]' to '*sizes[i]'. + virtual void estimateSerializedSize( + VectorPtr vector, + folly::Range rows, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_UNSUPPORTED(); + } + + /// Adds the serialized sizes of the rows of 'vector' in 'ranges[i]' to + /// '*sizes[i]'. + virtual void estimateSerializedSize( + VectorPtr vector, + const folly::Range& ranges, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_UNSUPPORTED(); + } + virtual void estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) = 0; + vector_size_t** sizes) { + Scratch scratch; + estimateSerializedSize(vector, ranges, sizes, scratch); + } virtual std::unique_ptr createSerializer( RowTypePtr type, @@ -153,14 +197,43 @@ class VectorStreamGroup : public StreamArena { int32_t numRows, const VectorSerde::Options* options = nullptr); + /// Increments sizes[i] for each ith row in 'rows' in 'vector'. static void estimateSerializedSize( VectorPtr vector, + folly::Range rows, + vector_size_t** sizes, + Scratch& scratch); + + static void estimateSerializedSize( + VectorPtr vector, + const folly::Range& ranges, + vector_size_t** sizes, + Scratch& scratch); + + static inline void estimateSerializedSize( + VectorPtr vector, + const folly::Range& ranges, + vector_size_t** sizes) { + Scratch scratch; + estimateSerializedSize(vector, ranges, sizes, scratch); + } + + void append( + const RowVectorPtr& vector, const folly::Range& ranges, - vector_size_t** sizes); + Scratch& scratch); + + void append( + const RowVectorPtr& vector, + const folly::Range& ranges) { + Scratch scratch; + append(vector, ranges, scratch); + } void append( const RowVectorPtr& vector, - const folly::Range& ranges); + const folly::Range& rows, + Scratch& scratch); void append(const RowVectorPtr& vector);