diff --git a/velox/serializers/CompactRowSerializer.cpp b/velox/serializers/CompactRowSerializer.cpp index db70b3882ddd..a47be6c7e742 100644 --- a/velox/serializers/CompactRowSerializer.cpp +++ b/velox/serializers/CompactRowSerializer.cpp @@ -22,7 +22,8 @@ namespace facebook::velox::serializer { void CompactRowVectorSerde::estimateSerializedSize( VectorPtr /* vector */, const folly::Range& /* ranges */, - vector_size_t** /* sizes */) { + vector_size_t** /* sizes */, + Scratch& /*scratch*/) { VELOX_UNSUPPORTED(); } @@ -36,7 +37,8 @@ class CompactRowVectorSerializer : public VectorSerializer { void append( const RowVectorPtr& vector, - const folly::Range& ranges) override { + const folly::Range& ranges, + Scratch& scratch) override { size_t totalSize = 0; row::CompactRow row(vector); if (auto fixedRowSize = diff --git a/velox/serializers/CompactRowSerializer.h b/velox/serializers/CompactRowSerializer.h index 48737d55c3d9..10c5a928cb49 100644 --- a/velox/serializers/CompactRowSerializer.h +++ b/velox/serializers/CompactRowSerializer.h @@ -28,7 +28,8 @@ class CompactRowVectorSerde : public VectorSerde { void estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) override; + vector_size_t** sizes, + Scratch& scratch) override; // This method is not used in production code. It is only used to // support round-trip tests for deserialization. diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 53747c6f4d4a..4b30375b54b8 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1077,6 +1077,11 @@ class CountingOutputStream : public OutputStream { std::streampos pos_{0}; }; +raw_vector& threadTempNulls() { + thread_local raw_vector temp; + return temp; +} + // Appendable container for serialized values. To append a value at a // time, call appendNull or appendNonNull first. Then call // appendLength if the type has a length. 
A null value has a length of @@ -1094,7 +1099,8 @@ class VectorStream { useLosslessTimestamp_(useLosslessTimestamp), nulls_(streamArena, true, true), lengths_(streamArena), - values_(streamArena) { + values_(streamArena), + isLongDecimal_(type_->isLongDecimal()) { if (initialNumRows == 0) { initializeHeader(typeToEncodingName(type), *streamArena); return; @@ -1198,6 +1204,72 @@ class VectorStream { lengths_.appendOne(totalLength_); } + void appendNulls( + const uint64_t* nulls, + int32_t begin, + int32_t end, + int32_t numNonNull) { + VELOX_DCHECK_EQ(numNonNull, bits::countBits(nulls, begin, end)); + const auto numRows = end - begin; + const auto numNulls = numRows - numNonNull; + if (numNulls == 0 && nullCount_ == 0) { + nonNullCount_ += numNonNull; + return; + } + if (UNLIKELY(numNulls > 0 && nonNullCount_ > 0 && nullCount_ == 0)) { + // There were only non-nulls up until now. Add the bits for them. + nulls_.appendBool(false, nonNullCount_); + } + nullCount_ += numNulls; + nonNullCount_ += numNonNull; + if (LIKELY(end <= 64)) { + uint64_t inverted = ~nulls[0]; + nulls_.appendBitsFresh(&inverted, begin, end); + return; + } + const int32_t firstWord = begin >> 6; + const int32_t firstBit = begin & 63; + const auto numWords = bits::nwords(numRows + firstBit); + // The polarity of nulls is reverse in wire format. Make an inverted copy. + uint64_t smallNulls[16]; + uint64_t* invertedNulls = smallNulls; + if (numWords > sizeof(smallNulls) / sizeof(smallNulls[0])) { + auto& tempNulls = threadTempNulls(); + tempNulls.resize(numWords + 1); + invertedNulls = tempNulls.data(); + } + for (auto i = 0; i < numWords; ++i) { + invertedNulls[i] = ~nulls[i + firstWord]; + } + nulls_.appendBitsFresh(invertedNulls, firstBit, firstBit + numRows); + } + + // Appends a zero length for each null bit and a length from lengthFunc(row) + // for non-nulls in rows. 
+ template + void appendLengths( + const uint64_t* nulls, + folly::Range rows, + int32_t numNonNull, + LengthFunc lengthFunc) { + const auto numRows = rows.size(); + if (!nulls) { + appendNonNull(numRows); + for (auto i = 0; i < numRows; ++i) { + appendLength(lengthFunc(rows[i])); + } + } else { + appendNulls(nulls, 0, numRows, numNonNull); + for (auto i = 0; i < numRows; ++i) { + if (bits::isBitSet(nulls, i)) { + appendLength(lengthFunc(rows[i])); + } else { + appendLength(0); + } + } + } + } + template void append(folly::Range values) { values_.append(values); @@ -1212,6 +1284,14 @@ class VectorStream { return children_[index].get(); } + auto& values() { + return values_; + } + + auto& nulls() { + return nulls_; + } + // Returns the size to flush to OutputStream before calling `flush`. size_t serializedSize() { CountingOutputStream out; @@ -1310,6 +1390,10 @@ class VectorStream { } } + bool isLongDecimal() const { + return isLongDecimal_; + } + private: const TypePtr type_; const std::optional encoding_; @@ -1326,6 +1410,7 @@ class VectorStream { ByteOutputStream lengths_; ByteOutputStream values_; std::vector> children_; + const bool isLongDecimal_; }; template <> @@ -1374,7 +1459,7 @@ template <> void VectorStream::append(folly::Range values) { for (auto& value : values) { int128_t val = value; - if (type_->isLongDecimal()) { + if (isLongDecimal_) { val = toJavaDecimalValue(value); } values_.append(folly::Range(&val, 1)); @@ -1464,6 +1549,12 @@ void serializeColumn( const folly::Range& ranges, VectorStream* stream); +void serializeColumn( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch); + void serializeWrapped( const BaseVector* vector, const folly::Range& ranges, @@ -1710,6 +1801,605 @@ void serializeColumn( } } +// Returns ranges for the non-null rows of an array or map. 'rows' gives the +// rows. nulls is the nulls of the array/map or nullptr if no nulls. 
'offsets' +// and 'sizes' are the offsets and sizes of the array/map. Returns the number of +// index ranges. Obtains the ranges from 'rangesHolder'. If 'sizesPtr' is +// non-null, returns the sizes for the inner ranges in 'sizesHolder'. If +// 'stream' is non-null, writes the lengths and nulls for the array/map into +// 'stream'. +int32_t rowsToRanges( + folly::Range rows, + const uint64_t* rawNulls, + const vector_size_t* offsets, + const vector_size_t* sizes, + vector_size_t** sizesPtr, + ScratchPtr& rangesHolder, + ScratchPtr* sizesHolder, + VectorStream* stream, + Scratch& scratch) { + auto numRows = rows.size(); + auto* innerRows = rows.data(); + auto* nonNullRows = innerRows; + int32_t numInner = rows.size(); + ScratchPtr nonNullHolder(scratch); + ScratchPtr innerRowsHolder(scratch); + if (rawNulls) { + // Gather the null flags of 'rows' into a dense bitmap and collect the + // positions within 'rows' whose flag is set. + ScratchPtr nullsHolder(scratch); + auto* nulls = nullsHolder.get(bits::nwords(rows.size())); + simd::gatherBits(rawNulls, rows, nulls); + auto* mutableNonNullRows = nonNullHolder.get(numRows); + auto* mutableInnerRows = innerRowsHolder.get(numRows); + numInner = simd::indicesOfSetBits(nulls, 0, numRows, mutableNonNullRows); + if (stream) { + stream->appendLengths( + nulls, rows, numInner, [&](auto row) { return sizes[row]; }); + } + simd::transpose( + rows.data(), + folly::Range(mutableNonNullRows, numInner), + mutableInnerRows); + nonNullRows = mutableNonNullRows; + innerRows = mutableInnerRows; + } else if (stream) { + stream->appendNonNull(rows.size()); + for (auto i = 0; i < rows.size(); ++i) { + stream->appendLength(sizes[rows[i]]); + } + } + vector_size_t** sizesOut = nullptr; + if (sizesPtr) { + sizesOut = sizesHolder->get(numInner); + } + auto ranges = rangesHolder.get(numInner); + int32_t fill = 0; + // Rows of size zero produce no range, so 'fill' can end up below 'numInner'. + for (auto i = 0; i < numInner; ++i) { + if (sizes[innerRows[i]] == 0) { + continue; + } + if (sizesOut) { + sizesOut[fill] = sizesPtr[rawNulls ?
nonNullRows[i] : i]; + } + ranges[fill].begin = offsets[innerRows[i]]; + ranges[fill].size = sizes[innerRows[i]]; + ++fill; + } + return fill; +} + +template +void copyWords( + T* destination, + const int32_t* indices, + int32_t numIndices, + const T* values, + bool isLongDecimal = false) { + if (std::is_same_v && isLongDecimal) { + for (auto i = 0; i < numIndices; ++i) { + reinterpret_cast(destination)[i] = toJavaDecimalValue( + reinterpret_cast(values)[indices[i]]); + } + return; + } + for (auto i = 0; i < numIndices; ++i) { + destination[i] = values[indices[i]]; + } +} + +template +void copyWordsWithRows( + T* destination, + const int32_t* rows, + const int32_t* indices, + int32_t numIndices, + const T* values, + bool isLongDecimal = false) { + if (!indices) { + copyWords(destination, rows, numIndices, values, isLongDecimal); + return; + } + if (std::is_same_v && isLongDecimal) { + for (auto i = 0; i < numIndices; ++i) { + reinterpret_cast(destination)[i] = toJavaDecimalValue( + reinterpret_cast(values)[rows[indices[i]]]); + } + return; + } + for (auto i = 0; i < numIndices; ++i) { + destination[i] = values[rows[indices[i]]]; + } +} + +template +void appendNonNull( + VectorStream* stream, + const uint64_t* nulls, + folly::Range rows, + const T* values, + Scratch& scratch) { + auto numRows = rows.size(); + ScratchPtr nonNullHolder(scratch); + const int32_t* nonNullIndices; + int32_t numNonNull; + if (LIKELY(numRows <= 8)) { + // Short batches need extra optimization. The set bits are prematerialized. + uint8_t nullsByte = *reinterpret_cast(nulls); + numNonNull = __builtin_popcount(nullsByte); + nonNullIndices = + numNonNull == numRows ? nullptr : simd::byteSetBits(nullsByte); + } else { + auto mutableIndices = nonNullHolder.get(numRows); + // Convert null flags to indices. This is much faster than checking bits one + // by one, several bits per clock specially if mostly null or non-null. Even + // worst case of half nulls is more than one row per clock. 
+ numNonNull = simd::indicesOfSetBits(nulls, 0, numRows, mutableIndices); + nonNullIndices = numNonNull == numRows ? nullptr : mutableIndices; + } + stream->appendNulls(nulls, 0, rows.size(), numNonNull); + ByteOutputStream& out = stream->values(); + + if constexpr (sizeof(T) == 8) { + AppendWindow window(out, scratch); + int64_t* output = window.get(numNonNull); + copyWordsWithRows( + output, + rows.data(), + nonNullIndices, + numNonNull, + reinterpret_cast(values)); + } else if constexpr (sizeof(T) == 4) { + AppendWindow window(out, scratch); + int32_t* output = window.get(numNonNull); + copyWordsWithRows( + output, + rows.data(), + nonNullIndices, + numNonNull, + reinterpret_cast(values)); + } else { + AppendWindow window(out, scratch); + T* output = window.get(numNonNull); + copyWordsWithRows( + output, + rows.data(), + nonNullIndices, + numNonNull, + values, + stream->isLongDecimal()); + } +} + +void appendStrings( + const uint64_t* nulls, + folly::Range rows, + const StringView* views, + VectorStream* stream, + Scratch& scratch) { + if (!nulls) { + stream->appendLengths(nullptr, rows, rows.size(), [&](auto row) { + return views[row].size(); + }); + for (auto i = 0; i < rows.size(); ++i) { + auto& view = views[rows[i]]; + stream->values().appendStringView( + std::string_view(view.data(), view.size())); + } + return; + } + ScratchPtr nonNullHolder(scratch); + auto nonNull = nonNullHolder.get(rows.size()); + auto numNonNull = simd::indicesOfSetBits(nulls, 0, rows.size(), nonNull); + stream->appendLengths( + nulls, rows, numNonNull, [&](auto row) { return views[row].size(); }); + for (auto i = 0; i < numNonNull; ++i) { + auto& view = views[rows[nonNull[i]]]; + stream->values().appendStringView( + std::string_view(view.data(), view.size())); + } +} + +void appendTimestamps( + const uint64_t* nulls, + folly::Range rows, + const Timestamp* timestamps, + VectorStream* stream, + Scratch& scratch) { + if (!nulls) { + stream->appendNonNull(rows.size()); + for (auto i = 
0; i < rows.size(); ++i) { + stream->appendOne(timestamps[rows[i]]); + } + return; + } + ScratchPtr nonNullHolder(scratch); + auto nonNullRows = nonNullHolder.get(rows.size()); + auto numNonNull = simd::indicesOfSetBits(nulls, 0, rows.size(), nonNullRows); + stream->appendNulls(nulls, 0, rows.size(), numNonNull); + for (auto i = 0; i < numNonNull; ++i) { + stream->appendOne(timestamps[rows[nonNullRows[i]]]); + } +} + +template +void serializeFlatVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + using T = typename TypeTraits::NativeType; + auto flatVector = vector->asUnchecked>(); + auto rawValues = flatVector->rawValues(); + if (!flatVector->mayHaveNulls()) { + if (std::is_same_v) { + appendTimestamps( + nullptr, + rows, + reinterpret_cast(rawValues), + stream, + scratch); + return; + } + + if (std::is_same_v) { + appendStrings( + nullptr, + rows, + reinterpret_cast(rawValues), + stream, + scratch); + return; + } + + stream->appendNonNull(rows.size()); + AppendWindow window(stream->values(), scratch); + T* output = window.get(rows.size()); + copyWords( + output, rows.data(), rows.size(), rawValues, stream->isLongDecimal()); + return; + } + ScratchPtr nullsHolder(scratch); + uint64_t* nulls = nullsHolder.get(bits::nwords(rows.size())); + simd::gatherBits(vector->rawNulls(), rows, nulls); + if (std::is_same_v) { + appendTimestamps( + nulls, + rows, + reinterpret_cast(rawValues), + stream, + scratch); + return; + } + if (std::is_same_v) { + appendStrings( + nulls, + rows, + reinterpret_cast(rawValues), + stream, + scratch); + return; + } + appendNonNull(stream, nulls, rows, rawValues, scratch); +} + +uint64_t bitsToBytesMap[256]; + +uint64_t bitsToBytes(uint8_t byte) { + return bitsToBytesMap[byte]; +} + +template <> +void serializeFlatVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto flatVector = reinterpret_cast*>(vector); + auto rawValues 
= flatVector->rawValues(); + ScratchPtr bitsHolder(scratch); + uint64_t* valueBits; + int32_t numValueBits; + if (!flatVector->mayHaveNulls()) { + stream->appendNonNull(rows.size()); + valueBits = bitsHolder.get(bits::nwords(rows.size())); + simd::gatherBits( + reinterpret_cast(rawValues), rows, valueBits); + numValueBits = rows.size(); + } else { + uint64_t* nulls = bitsHolder.get(bits::nwords(rows.size())); + simd::gatherBits(vector->rawNulls(), rows, nulls); + ScratchPtr nonNullsHolder(scratch); + auto nonNulls = nonNullsHolder.get(rows.size()); + numValueBits = simd::indicesOfSetBits(nulls, 0, rows.size(), nonNulls); + stream->appendNulls(nulls, 0, rows.size(), numValueBits); + valueBits = nulls; + simd::transpose( + rows.data(), + folly::Range(nonNulls, numValueBits), + nonNulls); + simd::gatherBits( + reinterpret_cast(rawValues), + folly::Range(nonNulls, numValueBits), + valueBits); + } + // 'valueBits' contains the non-null bools to be appended to the + // stream. The wire format has a byte for each bit. Every full byte + // is appended as a word. The partial byte is translated to a word + // and its low bytes are appended. + AppendWindow window(stream->values(), scratch); + uint8_t* output = window.get(numValueBits); + const auto numBytes = bits::nbytes(numValueBits); + for (auto i = 0; i < numBytes; ++i) { + uint64_t word = bitsToBytes(reinterpret_cast(valueBits)[i]); + if (i < numBytes - 1) { + reinterpret_cast(output)[i] = word; + } else { + memcpy(output + i * 8, &word, numValueBits - i * 8); + } + } +} + +void serializeWrapped( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + ScratchPtr innerRowsHolder(scratch); + const int32_t numRows = rows.size(); + int32_t numInner = 0; + auto innerRows = innerRowsHolder.get(numRows); + const BaseVector* wrapped; + if (vector->encoding() == VectorEncoding::Simple::DICTIONARY && + !vector->rawNulls()) { + // Dictionary with no nulls. 
+ auto* indices = vector->wrapInfo()->as(); + wrapped = vector->valueVector().get(); + simd::transpose(indices, rows, innerRows); + numInner = numRows; + } else { + wrapped = vector->wrappedVector(); + for (int32_t i = 0; i < rows.size(); ++i) { + if (vector->isNullAt(rows[i])) { + if (numInner > 0) { + serializeColumn( + wrapped, + folly::Range(innerRows, numInner), + stream, + scratch); + numInner = 0; + } + stream->appendNull(); + continue; + } + innerRows[numInner++] = vector->wrappedIndex(rows[i]); + } + } + if (numInner > 0) { + serializeColumn( + wrapped, + folly::Range(innerRows, numInner), + stream, + scratch); + } +} + +template <> +void serializeFlatVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + VELOX_CHECK_NOT_NULL(vector->rawNulls()); + for (auto i = 0; i < rows.size(); ++i) { + VELOX_DCHECK(vector->isNullAt(rows[i])); + stream->appendNull(); + } +} + +template <> +void serializeFlatVector( + const BaseVector* vector, + const folly::Range& ranges, + VectorStream* stream, + Scratch& scratch) { + VELOX_UNSUPPORTED(); +} + +void serializeTimestampWithTimeZone( + const RowVector* rowVector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto timestamps = rowVector->childAt(0)->as>(); + auto timezones = rowVector->childAt(1)->as>(); + for (auto i : rows) { + if (rowVector->isNullAt(i)) { + stream->appendNull(); + } else { + stream->appendNonNull(); + stream->appendOne(packTimestampWithTimeZone( + timestamps->valueAt(i), timezones->valueAt(i))); + } + } +} + +void serializeRowVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto rowVector = reinterpret_cast(vector); + vector_size_t* childRows; + int32_t numChildRows = 0; + if (isTimestampWithTimeZoneType(vector->type())) { + serializeTimestampWithTimeZone(rowVector, rows, stream, scratch); + return; + } + ScratchPtr nullsHolder(scratch); + ScratchPtr 
innerRowsHolder(scratch); + auto innerRows = rows.data(); + auto numInnerRows = rows.size(); + if (auto rawNulls = vector->rawNulls()) { + auto nulls = nullsHolder.get(bits::nwords(rows.size())); + simd::gatherBits(rawNulls, rows, nulls); + auto* mutableInnerRows = innerRowsHolder.get(rows.size()); + numInnerRows = + simd::indicesOfSetBits(nulls, 0, rows.size(), mutableInnerRows); + stream->appendLengths(nulls, rows, numInnerRows, [](int32_t) { return 1; }); + simd::transpose( + rows.data(), + folly::Range(mutableInnerRows, numInnerRows), + mutableInnerRows); + innerRows = mutableInnerRows; + } else { + stream->appendLengths( + nullptr, rows, rows.size(), [](int32_t) { return 1; }); + } + for (int32_t i = 0; i < rowVector->childrenSize(); ++i) { + serializeColumn( + rowVector->childAt(i).get(), + folly::Range(innerRows, numInnerRows), + stream->childAt(i), + scratch); + } +} + +void serializeArrayVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto arrayVector = reinterpret_cast(vector); + + ScratchPtr rangesHolder(scratch); + int32_t numRanges = rowsToRanges( + rows, + arrayVector->rawNulls(), + arrayVector->rawOffsets(), + arrayVector->rawSizes(), + nullptr, + rangesHolder, + nullptr, + stream, + scratch); + if (numRanges == 0) { + return; + } + serializeColumn( + arrayVector->elements().get(), + folly::Range(rangesHolder.get(), numRanges), + stream->childAt(0)); +} + +void serializeMapVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + auto mapVector = reinterpret_cast(vector); + + ScratchPtr rangesHolder(scratch); + int32_t numRanges = rowsToRanges( + rows, + mapVector->rawNulls(), + mapVector->rawOffsets(), + mapVector->rawSizes(), + nullptr, + rangesHolder, + nullptr, + stream, + scratch); + if (numRanges == 0) { + return; + } + serializeColumn( + mapVector->mapKeys().get(), + folly::Range(rangesHolder.get(), numRanges), + 
stream->childAt(0)); + serializeColumn( + mapVector->mapValues().get(), + folly::Range(rangesHolder.get(), numRanges), + stream->childAt(1)); +} + +template +void serializeConstantVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + using T = typename KindToFlatVector::WrapperType; + auto constVector = dynamic_cast*>(vector); + if (constVector->valueVector()) { + serializeWrapped(constVector, rows, stream, scratch); + return; + } + const auto numRows = rows.size(); + if (vector->isNullAt(0)) { + for (int32_t i = 0; i < numRows; ++i) { + stream->appendNull(); + } + return; + } + + T value = constVector->valueAtFast(0); + for (int32_t i = 0; i < numRows; ++i) { + stream->appendNonNull(); + stream->appendOne(value); + } +} + +template +void serializeBiasVector( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + VELOX_UNSUPPORTED(); +} + +void serializeColumn( + const BaseVector* vector, + const folly::Range& rows, + VectorStream* stream, + Scratch& scratch) { + switch (vector->encoding()) { + case VectorEncoding::Simple::FLAT: + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + serializeFlatVector, + vector->typeKind(), + vector, + rows, + stream, + scratch); + break; + case VectorEncoding::Simple::CONSTANT: + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + serializeConstantVector, + vector->typeKind(), + vector, + rows, + stream, + scratch); + break; + case VectorEncoding::Simple::BIASED: + VELOX_UNSUPPORTED(); + case VectorEncoding::Simple::ROW: + serializeRowVector(vector, rows, stream, scratch); + break; + case VectorEncoding::Simple::ARRAY: + serializeArrayVector(vector, rows, stream, scratch); + break; + case VectorEncoding::Simple::MAP: + serializeMapVector(vector, rows, stream, scratch); + break; + case VectorEncoding::Simple::LAZY: + serializeColumn(vector->loadedVector(), rows, stream, scratch); + break; + default: + serializeWrapped(vector, rows, stream, scratch); + } 
+} + template void serializeConstantColumn( const BaseVector* vector, @@ -1800,19 +2490,12 @@ void estimateFlatSerializedSize( vector_size_t** sizes) { auto valueSize = vector->type()->cppSizeInBytes(); if (vector->mayHaveNulls()) { + auto rawNulls = vector->rawNulls(); for (int32_t i = 0; i < ranges.size(); ++i) { auto end = ranges[i].begin + ranges[i].size; - int32_t numNulls = 0; - int32_t bytes = 0; - auto rawNulls = vector->rawNulls(); - for (int32_t offset = ranges[i].begin; offset < end; ++offset) { - if (bits::isBitNull(rawNulls, offset)) { - ++numNulls; - } else { - bytes += valueSize; - } - } - *(sizes[i]) += bytes + bits::nbytes(numNulls); + auto numValues = bits::countBits(rawNulls, ranges[i].begin, end); + *(sizes[i]) += + numValues * valueSize + bits::nbytes(ranges[i].size - numValues); } } else { for (int32_t i = 0; i < ranges.size(); ++i) { @@ -1868,16 +2551,8 @@ void estimateBiasedSerializedSize( auto rawNulls = vector->rawNulls(); for (int32_t i = 0; i < ranges.size(); ++i) { auto end = ranges[i].begin + ranges[i].size; - int32_t numNulls = 0; - int32_t bytes = 0; - for (int32_t offset = ranges[i].begin; offset < end; ++offset) { - if (bits::isBitNull(rawNulls, offset)) { - ++numNulls; - } else { - bytes += valueSize; - } - } - *(sizes[i]) += bytes + bits::nbytes(numNulls); + int32_t numValues = bits::countBits(rawNulls, ranges[i].begin, end); + *(sizes[i]) += numValues * valueSize + bits::nbytes(ranges[i].size); } } else { for (int32_t i = 0; i < ranges.size(); ++i) { @@ -1889,12 +2564,14 @@ void estimateBiasedSerializedSize( void estimateSerializedSizeInt( const BaseVector* vector, const folly::Range& ranges, - vector_size_t** sizes); + vector_size_t** sizes, + Scratch& scratch); void estimateWrapperSerializedSize( const folly::Range& ranges, vector_size_t** sizes, - const BaseVector* wrapper) { + const BaseVector* wrapper, + Scratch& scratch) { std::vector newRanges; std::vector newSizes; const BaseVector* wrapped = wrapper->wrappedVector(); @@ 
-1911,19 +2588,20 @@ void estimateWrapperSerializedSize( } *sizes[i] += bits::nbytes(numNulls); } - estimateSerializedSizeInt(wrapped, newRanges, newSizes.data()); + estimateSerializedSizeInt(wrapped, newRanges, newSizes.data(), scratch); } template void estimateConstantSerializedSize( const BaseVector* vector, const folly::Range& ranges, - vector_size_t** sizes) { + vector_size_t** sizes, + Scratch& scratch) { VELOX_CHECK(vector->encoding() == VectorEncoding::Simple::CONSTANT); using T = typename KindToFlatVector::WrapperType; auto constantVector = vector->as>(); if (constantVector->valueVector()) { - estimateWrapperSerializedSize(ranges, sizes, vector); + estimateWrapperSerializedSize(ranges, sizes, vector, scratch); return; } int32_t elementSize = sizeof(T); @@ -1942,7 +2620,8 @@ void estimateConstantSerializedSize( void estimateSerializedSizeInt( const BaseVector* vector, const folly::Range& ranges, - vector_size_t** sizes) { + vector_size_t** sizes, + Scratch& scratch) { switch (vector->encoding()) { case VectorEncoding::Simple::FLAT: VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( @@ -1958,11 +2637,12 @@ void estimateSerializedSizeInt( vector->typeKind(), vector, ranges, - sizes); + sizes, + scratch); break; case VectorEncoding::Simple::DICTIONARY: case VectorEncoding::Simple::SEQUENCE: - estimateWrapperSerializedSize(ranges, sizes, vector); + estimateWrapperSerializedSize(ranges, sizes, vector, scratch); break; case VectorEncoding::Simple::BIASED: estimateBiasedSerializedSize(vector, ranges, sizes); @@ -1982,13 +2662,14 @@ void estimateSerializedSizeInt( } } auto rowVector = vector->as(); - auto children = rowVector->children(); + auto& children = rowVector->children(); for (auto& child : children) { if (child) { estimateSerializedSizeInt( child.get(), folly::Range(childRanges.data(), childRanges.size()), - childSizes.data()); + childSizes.data(), + scratch); } } break; @@ -2006,9 +2687,12 @@ void estimateSerializedSizeInt( &childRanges, &childSizes); 
estimateSerializedSizeInt( - mapVector->mapKeys().get(), childRanges, childSizes.data()); + mapVector->mapKeys().get(), childRanges, childSizes.data(), scratch); estimateSerializedSizeInt( - mapVector->mapValues().get(), childRanges, childSizes.data()); + mapVector->mapValues().get(), + childRanges, + childSizes.data(), + scratch); break; } case VectorEncoding::Simple::ARRAY: { @@ -2024,11 +2708,303 @@ void estimateSerializedSizeInt( &childRanges, &childSizes); estimateSerializedSizeInt( - arrayVector->elements().get(), childRanges, childSizes.data()); + arrayVector->elements().get(), + childRanges, + childSizes.data(), + scratch); break; } case VectorEncoding::Simple::LAZY: - estimateSerializedSizeInt(vector->loadedVector(), ranges, sizes); + estimateSerializedSizeInt(vector->loadedVector(), ranges, sizes, scratch); + break; + default: + VELOX_CHECK(false, "Unsupported vector encoding {}", vector->encoding()); + } +} + +void estimateSerializedSizeInt( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch); + +template +void estimateFlatSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + const auto valueSize = vector->type()->cppSizeInBytes(); + const auto numRows = rows.size(); + if (vector->mayHaveNulls()) { + auto rawNulls = vector->rawNulls(); + ScratchPtr nullsHolder(scratch); + ScratchPtr nonNullsHolder(scratch); + auto nulls = nullsHolder.get(bits::nwords(numRows)); + simd::gatherBits(rawNulls, rows, nulls); + auto nonNulls = nonNullsHolder.get(numRows); + const auto numNonNull = simd::indicesOfSetBits(nulls, 0, numRows, nonNulls); + for (int32_t i = 0; i < numNonNull; ++i) { + *sizes[nonNulls[i]] += valueSize; + } + } else { + VELOX_UNREACHABLE("Non null fixed width case handled before this"); + } +} + +void estimateFlatSerializedSizeVarcharOrVarbinary( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& 
scratch) { + const auto numRows = rows.size(); + auto strings = static_cast*>(vector); + auto rawNulls = strings->rawNulls(); + auto rawValues = strings->rawValues(); + if (!rawNulls) { + for (auto i = 0; i < rows.size(); ++i) { + *sizes[i] += rawValues[rows[i]].size(); + } + } else { + ScratchPtr nullsHolder(scratch); + ScratchPtr nonNullsHolder(scratch); + auto nulls = nullsHolder.get(bits::nwords(numRows)); + simd::gatherBits(rawNulls, rows, nulls); + auto* nonNulls = nonNullsHolder.get(numRows); + auto numNonNull = simd::indicesOfSetBits(nulls, 0, numRows, nonNulls); + + for (int32_t i = 0; i < numNonNull; ++i) { + *sizes[nonNulls[i]] += rawValues[rows[nonNulls[i]]].size(); + } + } +} + +template <> +void estimateFlatSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + estimateFlatSerializedSizeVarcharOrVarbinary(vector, rows, sizes, scratch); +} + +template <> +void estimateFlatSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + estimateFlatSerializedSizeVarcharOrVarbinary(vector, rows, sizes, scratch); +} + +// Row-wise size estimation is not implemented for biased vectors; always +// throws. +void estimateBiasedSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_UNSUPPORTED(); +} + +// Adds the estimated serialized size of row 'rows[i]' of 'wrapper' to +// '*sizes[i]' by translating the rows to rows of the wrapped vector and +// estimating those. Rows that are null in 'wrapper' are not forwarded to the +// wrapped vector. +void estimateWrapperSerializedSize( + const folly::Range& rows, + vector_size_t** sizes, + const BaseVector* wrapper, + Scratch& scratch) { + ScratchPtr innerRowsHolder(scratch); + ScratchPtr innerSizesHolder(scratch); + const int32_t numRows = rows.size(); + int32_t numInner = 0; + auto innerRows = innerRowsHolder.get(numRows); + vector_size_t** innerSizes = innerSizesHolder.get(numRows); + const BaseVector* wrapped; + if (wrapper->encoding() == VectorEncoding::Simple::DICTIONARY && + !wrapper->rawNulls()) { + // Dictionary with no nulls.
+ auto* indices = wrapper->wrapInfo()->as(); + wrapped = wrapper->valueVector().get(); + simd::transpose(indices, rows, innerRows); + // Every outer row maps to exactly one inner row, so the caller's size + // slots are used as is. Without setting these, 'numInner' stays 0 and the + // early return below skips the estimate for the whole dictionary. + innerSizes = sizes; + numInner = numRows; + } else { + wrapped = wrapper->wrappedVector(); + for (int32_t i = 0; i < rows.size(); ++i) { + if (!wrapper->isNullAt(rows[i])) { + innerRows[numInner] = wrapper->wrappedIndex(rows[i]); + innerSizes[numInner] = sizes[i]; + ++numInner; + } + } + } + if (numInner == 0) { + return; + } + estimateSerializedSizeInt( + wrapped, + folly::Range(innerRows, numInner), + innerSizes, + scratch); +} + +template +void estimateConstantSerializedSize( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_CHECK(vector->encoding() == VectorEncoding::Simple::CONSTANT); + using T = typename KindToFlatVector::WrapperType; + auto constantVector = vector->as>(); + int32_t elementSize = sizeof(T); + if (constantVector->isNullAt(0)) { + elementSize = 1; + } else if (vector->valueVector()) { + auto values = constantVector->wrappedVector(); + vector_size_t* sizePtr = &elementSize; + vector_size_t singleRow = constantVector->wrappedIndex(0); + estimateSerializedSizeInt( + values, + folly::Range(&singleRow, 1), + &sizePtr, + scratch); + } else if (std::is_same_v) { + auto value = constantVector->valueAt(0); + auto string = reinterpret_cast(&value); + elementSize = string->size(); + } + for (int32_t i = 0; i < rows.size(); ++i) { + *sizes[i] += elementSize; + } +} +void estimateSerializedSizeInt( + const BaseVector* vector, + const folly::Range& rows, + vector_size_t** sizes, + Scratch& scratch) { + const auto numRows = rows.size(); + if (vector->type()->isFixedWidth() && !vector->mayHaveNullsRecursive()) { + const auto elementSize = vector->type()->cppSizeInBytes(); + for (auto i = 0; i < numRows; ++i) { + *sizes[i] += elementSize; + } + return; + } + switch (vector->encoding()) { + case VectorEncoding::Simple::FLAT: { + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + estimateFlatSerializedSize, + vector->typeKind(), +
vector, + rows, + sizes, + scratch); + break; + } + case VectorEncoding::Simple::CONSTANT: + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + estimateConstantSerializedSize, + vector->typeKind(), + vector, + rows, + sizes, + scratch); + break; + case VectorEncoding::Simple::DICTIONARY: + case VectorEncoding::Simple::SEQUENCE: + estimateWrapperSerializedSize(rows, sizes, vector, scratch); + break; + case VectorEncoding::Simple::BIASED: + estimateBiasedSerializedSize(vector, rows, sizes, scratch); + break; + case VectorEncoding::Simple::ROW: { + ScratchPtr innerRowsHolder(scratch); + ScratchPtr innerSizesHolder(scratch); + ScratchPtr nullsHolder(scratch); + auto* innerRows = rows.data(); + auto* innerSizes = sizes; + const auto numRows = rows.size(); + int32_t numInner = numRows; + if (vector->mayHaveNulls()) { + auto nulls = nullsHolder.get(bits::nwords(numRows)); + simd::gatherBits(vector->rawNulls(), rows, nulls); + auto mutableInnerRows = innerRowsHolder.get(numRows); + numInner = simd::indicesOfSetBits(nulls, 0, numRows, mutableInnerRows); + innerSizes = innerSizesHolder.get(numInner); + for (auto i = 0; i < numInner; ++i) { + innerSizes[i] = sizes[mutableInnerRows[i]]; + } + simd::transpose( + rows.data(), + folly::Range(mutableInnerRows, numInner), + mutableInnerRows); + innerRows = mutableInnerRows; + } + auto rowVector = vector->as(); + auto& children = rowVector->children(); + for (auto& child : children) { + if (child) { + estimateSerializedSizeInt( + child.get(), + folly::Range(innerRows, numInner), + innerSizes, + scratch); + } + } + break; + } + case VectorEncoding::Simple::MAP: { + auto mapVector = vector->asUnchecked(); + ScratchPtr rangeHolder(scratch); + ScratchPtr sizesHolder(scratch); + const auto numRanges = rowsToRanges( + rows, + mapVector->rawNulls(), + mapVector->rawOffsets(), + mapVector->rawSizes(), + sizes, + rangeHolder, + &sizesHolder, + nullptr, + scratch); + if (numRanges == 0) { + return; + } + estimateSerializedSizeInt( + 
mapVector->mapKeys().get(), + folly::Range(rangeHolder.get(), numRanges), + sizesHolder.get(), + scratch); + estimateSerializedSizeInt( + mapVector->mapValues().get(), + folly::Range(rangeHolder.get(), numRanges), + sizesHolder.get(), + scratch); + break; + } + case VectorEncoding::Simple::ARRAY: { + auto arrayVector = vector->as(); + ScratchPtr rangeHolder(scratch); + ScratchPtr sizesHolder(scratch); + const auto numRanges = rowsToRanges( + rows, + arrayVector->rawNulls(), + arrayVector->rawOffsets(), + arrayVector->rawSizes(), + sizes, + rangeHolder, + &sizesHolder, + nullptr, + scratch); + if (numRanges == 0) { + return; + } + estimateSerializedSizeInt( + arrayVector->elements().get(), + folly::Range(rangeHolder.get(), numRanges), + sizesHolder.get(), + scratch); + break; + } + case VectorEncoding::Simple::LAZY: + estimateSerializedSizeInt(vector->loadedVector(), rows, sizes, scratch); break; default: VELOX_CHECK(false, "Unsupported vector encoding {}", vector->encoding()); @@ -2061,7 +3037,8 @@ class PrestoVectorSerializer : public VectorSerializer { void append( const RowVectorPtr& vector, - const folly::Range& ranges) override { + const folly::Range& ranges, + Scratch& scratch) override { auto newRows = rangesTotalSize(ranges); if (newRows > 0) { numRows_ += newRows; @@ -2071,6 +3048,20 @@ class PrestoVectorSerializer : public VectorSerializer { } } + void append( + const RowVectorPtr& vector, + const folly::Range& rows, + Scratch& scratch) override { + auto newRows = rows.size(); + if (newRows > 0) { + numRows_ += newRows; + for (int32_t i = 0; i < vector->childrenSize(); ++i) { + serializeColumn( + vector->childAt(i).get(), rows, streams_[i].get(), scratch); + } + } + } + void appendEncoded( const RowVectorPtr& vector, const folly::Range& ranges) { @@ -2254,8 +3245,17 @@ class PrestoVectorSerializer : public VectorSerializer { void PrestoVectorSerde::estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) { - 
estimateSerializedSizeInt(vector->loadedVector(), ranges, sizes); + vector_size_t** sizes, + Scratch& scratch) { + estimateSerializedSizeInt(vector->loadedVector(), ranges, sizes, scratch); +} + +void PrestoVectorSerde::estimateSerializedSize( + VectorPtr vector, + const folly::Range rows, + vector_size_t** sizes, + Scratch& scratch) { + estimateSerializedSizeInt(vector->loadedVector(), rows, sizes, scratch); } std::unique_ptr PrestoVectorSerde::createSerializer( @@ -2387,6 +3387,14 @@ void testingScatterStructNulls( // static void PrestoVectorSerde::registerVectorSerde() { + auto toByte = [](int32_t number, int32_t bit) { + return static_cast(bits::isBitSet(&number, bit)) << (bit * 8); + }; + for (auto i = 0; i < 256; ++i) { + bitsToBytesMap[i] = toByte(i, 0) | toByte(i, 1) | toByte(i, 2) | + toByte(i, 3) | toByte(i, 4) | toByte(i, 5) | toByte(i, 6) | + toByte(i, 7); + } velox::registerVectorSerde(std::make_unique()); } diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index da20b21cc008..29eb40b69d85 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -43,10 +43,19 @@ class PrestoVectorSerde : public VectorSerde { std::vector encodings; }; + /// Adds the serialized sizes of the rows of 'vector' in 'ranges[i]' to + /// '*sizes[i]'. 
void estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) override; + vector_size_t** sizes, + Scratch& scratch) override; + + void estimateSerializedSize( + VectorPtr vector, + const folly::Range rows, + vector_size_t** sizes, + Scratch& scratch) override; std::unique_ptr createSerializer( RowTypePtr type, diff --git a/velox/serializers/UnsafeRowSerializer.cpp b/velox/serializers/UnsafeRowSerializer.cpp index e94cbb44ec8d..342311c5d926 100644 --- a/velox/serializers/UnsafeRowSerializer.cpp +++ b/velox/serializers/UnsafeRowSerializer.cpp @@ -23,7 +23,8 @@ namespace facebook::velox::serializer::spark { void UnsafeRowVectorSerde::estimateSerializedSize( VectorPtr /* vector */, const folly::Range& /* ranges */, - vector_size_t** /* sizes */) { + vector_size_t** /* sizes */, + Scratch& /*scratch*/) { VELOX_UNSUPPORTED(); } @@ -37,7 +38,8 @@ class UnsafeRowVectorSerializer : public VectorSerializer { void append( const RowVectorPtr& vector, - const folly::Range& ranges) override { + const folly::Range& ranges, + Scratch& /*scratch*/) override { size_t totalSize = 0; row::UnsafeRowFast unsafeRow(vector); if (auto fixedRowSize = diff --git a/velox/serializers/UnsafeRowSerializer.h b/velox/serializers/UnsafeRowSerializer.h index 995e58498e5b..1bf41c4f0cae 100644 --- a/velox/serializers/UnsafeRowSerializer.h +++ b/velox/serializers/UnsafeRowSerializer.h @@ -26,7 +26,8 @@ class UnsafeRowVectorSerde : public VectorSerde { void estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) override; + vector_size_t** sizes, + Scratch& scratch) override; // This method is not used in production code. It is only used to // support round-trip tests for deserialization. 
diff --git a/velox/serializers/tests/CMakeLists.txt b/velox/serializers/tests/CMakeLists.txt index e6ddedd5d9b9..0d2a86bee18a 100644 --- a/velox/serializers/tests/CMakeLists.txt +++ b/velox/serializers/tests/CMakeLists.txt @@ -28,3 +28,15 @@ target_link_libraries( gtest_main gflags::gflags glog::glog) + +add_executable(velox_serializer_benchmark SerializerBenchmark.cpp) + +target_link_libraries( + velox_serializer_benchmark + velox_presto_serializer + velox_vector_test_lib + velox_vector_fuzzer + gtest + gtest_main + gflags::gflags + glog::glog) diff --git a/velox/serializers/tests/CompactRowSerializerTest.cpp b/velox/serializers/tests/CompactRowSerializerTest.cpp index 4e8524fdbe3e..55a6a24199e7 100644 --- a/velox/serializers/tests/CompactRowSerializerTest.cpp +++ b/velox/serializers/tests/CompactRowSerializerTest.cpp @@ -41,7 +41,8 @@ class CompactRowSerializerTest : public ::testing::Test, auto rowType = asRowType(rowVector->type()); auto serializer = serde_->createSerializer(rowType, numRows, arena.get()); - serializer->append(rowVector, folly::Range(rows.data(), numRows)); + Scratch scratch; + serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch); auto size = serializer->maxSerializedSize(); OStreamOutputStream out(output); serializer->flush(&out); diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 0165c273706c..f211c021a6d4 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -18,6 +18,8 @@ #include #include #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/memory/ByteStream.h" +#include "velox/common/time/Timer.h" #include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorTestBase.h" @@ -38,6 +40,7 @@ class PrestoSerializerTest } void sanityCheckEstimateSerializedSize(const 
RowVectorPtr& rowVector) { + Scratch scratch; const auto numRows = rowVector->size(); std::vector rows(numRows); @@ -51,7 +54,10 @@ class PrestoSerializerTest rawRowSizes[i] = &rowSizes[i]; } serde_->estimateSerializedSize( - rowVector, folly::Range(rows.data(), numRows), rawRowSizes.data()); + rowVector, + folly::Range(rows.data(), numRows), + rawRowSizes.data(), + scratch); } serializer::presto::PrestoVectorSerde::PrestoOptions getParamSerdeOptions( @@ -68,8 +74,9 @@ class PrestoSerializerTest void serialize( const RowVectorPtr& rowVector, std::ostream* output, - const serializer::presto::PrestoVectorSerde::PrestoOptions* - serdeOptions) { + const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions, + std::optional> indexRanges = std::nullopt, + std::optional> rows = std::nullopt) { auto streamInitialSize = output->tellp(); sanityCheckEstimateSerializedSize(rowVector); @@ -79,9 +86,34 @@ class PrestoSerializerTest auto paramOptions = getParamSerdeOptions(serdeOptions); auto serializer = serde_->createSerializer(rowType, numRows, arena.get(), ¶mOptions); - - serializer->append(rowVector); + vector_size_t sizeEstimate = 0; + + Scratch scratch; + if (indexRanges.has_value()) { + raw_vector sizes(indexRanges.value().size()); + std::fill(sizes.begin(), sizes.end(), &sizeEstimate); + serde_->estimateSerializedSize( + rowVector, indexRanges.value(), sizes.data(), scratch); + serializer->append(rowVector, indexRanges.value(), scratch); + } else if (rows.has_value()) { + raw_vector sizes(rows.value().size()); + std::fill(sizes.begin(), sizes.end(), &sizeEstimate); + serde_->estimateSerializedSize( + rowVector, rows.value(), sizes.data(), scratch); + serializer->append(rowVector, rows.value(), scratch); + } else { + vector_size_t* sizes = &sizeEstimate; + IndexRange range{0, rowVector->size()}; + serde_->estimateSerializedSize( + rowVector, + folly::Range(&range, 1), + &sizes, + scratch); + serializer->append(rowVector); + } auto size = 
serializer->maxSerializedSize(); + LOG(INFO) << "Size=" << size << " estimate=" << sizeEstimate << " " + << (100 * sizeEstimate) / size << "%"; facebook::velox::serializer::presto::PrestoOutputStreamListener listener; OStreamOutputStream out(output, &listener); serializer->flush(&out); @@ -164,6 +196,35 @@ class PrestoSerializerTest } assertEqualVectors(result, rowVector); + + // Serialize the vector with even and odd rows in different partitions. + auto even = + makeIndices(rowVector->size() / 2, [&](auto row) { return row * 2; }); + auto odd = makeIndices( + (rowVector->size() - 1) / 2, [&](auto row) { return (row * 2) + 1; }); + testSerializeRows(rowVector, even, serdeOptions); + testSerializeRows(rowVector, odd, serdeOptions); + } + + void testSerializeRows( + const RowVectorPtr& rowVector, + BufferPtr indices, + const serializer::presto::PrestoVectorSerde::PrestoOptions* + serdeOptions) { + std::ostringstream out; + auto rows = folly::Range( + indices->as(), indices->size() / sizeof(vector_size_t)); + serialize(rowVector, &out, serdeOptions, std::nullopt, rows); + + auto rowType = asRowType(rowVector->type()); + auto deserialized = deserialize(rowType, out.str(), serdeOptions); + assertEqualVectors( + deserialized, + BaseVector::wrapInDictionary( + BufferPtr(nullptr), + indices, + indices->size() / sizeof(vector_size_t), + rowVector)); } void serializeEncoded( @@ -522,12 +583,19 @@ TEST_P(PrestoSerializerTest, roundTrip) { VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; opts.nullRatio = 0.1; VectorFuzzer fuzzer(opts, pool_.get()); + VectorFuzzer::Options nonNullOpts; + nonNullOpts.timestampPrecision = + VectorFuzzer::Options::TimestampPrecision::kMilliSeconds; + nonNullOpts.nullRatio = 0; + VectorFuzzer nonNullFuzzer(nonNullOpts, pool_.get()); const size_t numRounds = 20; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); - auto inputRowVector = fuzzer.fuzzInputRow(rowType); + + auto inputRowVector = (i % 2 == 0) ? 
fuzzer.fuzzInputRow(rowType) + : nonNullFuzzer.fuzzInputRow(rowType); testRoundTrip(inputRowVector); } } diff --git a/velox/serializers/tests/SerializerBenchmark.cpp b/velox/serializers/tests/SerializerBenchmark.cpp new file mode 100644 index 000000000000..0a3131f2dafe --- /dev/null +++ b/velox/serializers/tests/SerializerBenchmark.cpp @@ -0,0 +1,158 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/memory/ByteStream.h" +#include "velox/common/time/Timer.h" +#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::test; + +class SerializerBenchmark : public VectorTestBase { + public: + struct SerializeCase { + int32_t nullPct; + int32_t numSelected; + int32_t bits; + int32_t vectorSize; + uint64_t irTime{0}; + uint64_t rrTime{0}; + + std::string toString() { + return fmt::format( + "{} of {} {} bit {}%null: {} ir / {} rr", + numSelected, + vectorSize, + bits, + nullPct, + irTime, + rrTime); + } + }; + + void setup() { + serde_ = std::make_unique(); + } + void timeFlat() { + // Serialize different fractions of a 10K vector of int32_t and int64_t with + // IndexRange and row 
range variants with and without nulls. + constexpr int32_t kPad = 8; + std::vector numSelectedValues = {3, 30, 300, 10000}; + std::vector> indexRanges; + std::vector> rowSets; + std::vector nullPctValues = {0, 1, 10, 90}; + std::vector bitsValues = {32, 64}; + const int32_t vectorSize = 10000; + for (auto numSelected : numSelectedValues) { + std::vector ir; + std::vector rr; + int32_t step = vectorSize / numSelected; + for (auto r = 0; r < vectorSize; r += step) { + ir.push_back(IndexRange{r, 1}); + rr.push_back(r); + } + std::cout << rr.size(); + indexRanges.push_back(std::move(ir)); + rr.resize(rr.size() + kPad, 999999999); + rowSets.push_back(std::move(rr)); + } + VectorMaker vm(pool_.get()); + std::vector v32s; + std::vector v64s; + for (auto nullPct : nullPctValues) { + auto v32 = vm.flatVector( + vectorSize, + [&](auto row) { return row; }, + [&](auto row) { return nullPct == 0 ? false : row % 100 < nullPct; }); + auto v64 = vm.flatVector( + vectorSize, + [&](auto row) { return row; }, + [&](auto row) { return nullPct == 0 ? false : row % 100 < nullPct; }); + v32s.push_back(std::move(v32)); + v64s.push_back(std::move(v64)); + } + std::vector cases; + Scratch scratch; + + auto runCase = [&](int32_t nullIdx, int32_t selIdx, int32_t bits) { + SerializeCase item; + item.vectorSize = vectorSize; + item.nullPct = nullPctValues[nullIdx]; + item.numSelected = numSelectedValues[selIdx]; + item.bits = bits; + int32_t numRepeat = 100 * vectorSize / indexRanges[selIdx].size(); + + VectorPtr vector = bits == 32 ? 
v32s[nullIdx] : v64s[nullIdx]; + auto rowType = ROW({vector->type()}); + auto rowVector = vm.rowVector({vector}); + { + MicrosecondTimer t(&item.irTime); + auto group = std::make_unique(pool_.get()); + group->createStreamTree(rowType, rowSets[selIdx].size() - kPad); + for (auto repeat = 0; repeat < numRepeat; ++repeat) { + group->append( + rowVector, + folly::Range( + indexRanges[selIdx].data(), indexRanges[selIdx].size()), + scratch); + } + } + + { + MicrosecondTimer t(&item.rrTime); + auto group = std::make_unique(pool_.get()); + group->createStreamTree(rowType, rowSets[selIdx].size()); + + for (auto repeat = 0; repeat < numRepeat; ++repeat) { + group->append( + rowVector, + folly::Range( + rowSets[selIdx].data(), rowSets[selIdx].size() - kPad), + scratch); + } + } + return item; + }; + + for (auto bits : bitsValues) { + for (auto nullIdx = 0; nullIdx < nullPctValues.size(); ++nullIdx) { + for (auto selIdx = 0; selIdx < numSelectedValues.size(); ++selIdx) { + int32_t numRepeat = 10 / numSelectedValues[selIdx]; + cases.push_back(runCase(nullIdx, selIdx, bits)); + } + } + } + for (auto& item : cases) { + std::cout << item.toString() << std::endl; + } + } + + std::unique_ptr serde_; +}; + +int main(int argc, char** argv) { + folly::init(&argc, &argv); + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + SerializerBenchmark bm; + bm.setup(); + bm.timeFlat(); +} diff --git a/velox/serializers/tests/UnsafeRowSerializerTest.cpp b/velox/serializers/tests/UnsafeRowSerializerTest.cpp index d686a90b67d7..28338f9b2939 100644 --- a/velox/serializers/tests/UnsafeRowSerializerTest.cpp +++ b/velox/serializers/tests/UnsafeRowSerializerTest.cpp @@ -41,7 +41,8 @@ class UnsafeRowSerializerTest : public ::testing::Test, auto rowType = std::dynamic_pointer_cast(rowVector->type()); auto serializer = serde_->createSerializer(rowType, numRows, arena.get()); - serializer->append(rowVector, folly::Range(rows.data(), numRows)); + Scratch scratch; + serializer->append(rowVector, 
folly::Range(rows.data(), numRows), scratch); auto size = serializer->maxSerializedSize(); OStreamOutputStream out(output); serializer->flush(&out); diff --git a/velox/vector/VectorStream.cpp b/velox/vector/VectorStream.cpp index 2d907dc1b35c..fec8eb7a9fd1 100644 --- a/velox/vector/VectorStream.cpp +++ b/velox/vector/VectorStream.cpp @@ -15,12 +15,14 @@ */ #include "velox/vector/VectorStream.h" #include +#include "velox/common/base/RawVector.h" namespace facebook::velox { void VectorSerializer::append(const RowVectorPtr& vector) { const IndexRange allRows{0, vector->size()}; - append(vector, folly::Range(&allRows, 1)); + Scratch scratch; + append(vector, folly::Range(&allRows, 1), scratch); } namespace { @@ -102,8 +104,16 @@ void VectorStreamGroup::createStreamTree( void VectorStreamGroup::append( const RowVectorPtr& vector, - const folly::Range& ranges) { - serializer_->append(vector, ranges); + const folly::Range& ranges, + Scratch& scratch) { + serializer_->append(vector, ranges, scratch); +} + +void VectorStreamGroup::append( + const RowVectorPtr& vector, + const folly::Range& rows, + Scratch& scratch) { + serializer_->append(vector, rows, scratch); } void VectorStreamGroup::append(const RowVectorPtr& vector) { @@ -118,8 +128,18 @@ void VectorStreamGroup::flush(OutputStream* out) { void VectorStreamGroup::estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) { - getVectorSerde()->estimateSerializedSize(vector, ranges, sizes); + vector_size_t** sizes, + Scratch& scratch) { + getVectorSerde()->estimateSerializedSize(vector, ranges, sizes, scratch); +} + +// static +void VectorStreamGroup::estimateSerializedSize( + VectorPtr vector, + folly::Range rows, + vector_size_t** sizes, + Scratch& scratch) { + getVectorSerde()->estimateSerializedSize(vector, rows, sizes, scratch); } // static @@ -148,7 +168,8 @@ folly::IOBuf rowVectorToIOBuf( streamGroup->createStreamTree(asRowType(rowVector->type()), rangeEnd); IndexRange range{0, 
rangeEnd}; - streamGroup->append(rowVector, folly::Range(&range, 1)); + Scratch scratch; + streamGroup->append(rowVector, folly::Range(&range, 1), scratch); IOBufOutputStream stream(pool); streamGroup->flush(&stream); diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index e79af35aaa8e..5f1899195a73 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -15,7 +15,9 @@ */ #pragma once +#include #include "velox/buffer/Buffer.h" +#include "velox/common/base/Scratch.h" #include "velox/common/memory/ByteStream.h" #include "velox/common/memory/Memory.h" #include "velox/common/memory/MemoryAllocator.h" @@ -38,11 +40,31 @@ class VectorSerializer { /// Serialize a subset of rows in a vector. virtual void append( const RowVectorPtr& vector, - const folly::Range& ranges) = 0; + const folly::Range& ranges, + Scratch& scratch) = 0; + + virtual void append( + const RowVectorPtr& vector, + const folly::Range& ranges) { + Scratch scratch; + append(vector, ranges, scratch); + } + + virtual void append( + const RowVectorPtr& vector, + const folly::Range& rows, + Scratch& scratch) { + VELOX_UNSUPPORTED(); + } /// Serialize all rows in a vector. void append(const RowVectorPtr& vector); + // True if supports append with folly::Range. + virtual bool supportsAppendRows() const { + return false; + } + /// Returns the maximum serialized size of the data previously added via /// 'append' methods. Can be used to allocate buffer of exact or maximum size /// before calling 'flush'. @@ -70,10 +92,32 @@ class VectorSerde { virtual ~Options() {} }; + /// Adds the serialized size of vector at 'rows[i]' to '*sizes[i]'. + virtual void estimateSerializedSize( + VectorPtr vector, + folly::Range rows, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_UNSUPPORTED(); + } + + /// Adds the serialized sizes of the rows of 'vector' in 'ranges[i]' to + /// '*sizes[i]'. 
+ virtual void estimateSerializedSize( + VectorPtr vector, + const folly::Range& ranges, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_UNSUPPORTED(); + } + virtual void estimateSerializedSize( VectorPtr vector, const folly::Range& ranges, - vector_size_t** sizes) = 0; + vector_size_t** sizes) { + Scratch scratch; + estimateSerializedSize(vector, ranges, sizes, scratch); + } virtual std::unique_ptr createSerializer( RowTypePtr type, @@ -153,14 +197,43 @@ class VectorStreamGroup : public StreamArena { int32_t numRows, const VectorSerde::Options* options = nullptr); + /// Increments sizes[i] for each ith row in 'rows' in 'vector'. static void estimateSerializedSize( VectorPtr vector, + folly::Range rows, + vector_size_t** sizes, + Scratch& scratch); + + static void estimateSerializedSize( + VectorPtr vector, + const folly::Range& ranges, + vector_size_t** sizes, + Scratch& scratch); + + static inline void estimateSerializedSize( + VectorPtr vector, + const folly::Range& ranges, + vector_size_t** sizes) { + Scratch scratch; + estimateSerializedSize(vector, ranges, sizes, scratch); + } + + void append( + const RowVectorPtr& vector, const folly::Range& ranges, - vector_size_t** sizes); + Scratch& scratch); + + void append( + const RowVectorPtr& vector, + const folly::Range& ranges) { + Scratch scratch; + append(vector, ranges, scratch); + } void append( const RowVectorPtr& vector, - const folly::Range& ranges); + const folly::Range& rows, + Scratch& scratch); void append(const RowVectorPtr& vector);