diff --git a/velox/docs/develop/serde/compactrow.rst b/velox/docs/develop/serde/compactrow.rst index bb40c909b87e..230a2744bc10 100644 --- a/velox/docs/develop/serde/compactrow.rst +++ b/velox/docs/develop/serde/compactrow.rst @@ -38,6 +38,9 @@ TIMESTAMP 8 UNKNOWN 0 ================ ============================================== +Timestamps are serialized with microsecond precision to align with Spark's +handling of timestamps. + Strings (VARCHAR and VARBINARY) use 4 bytes for size plus the length of the string. Empty string uses 4 bytes. 1-character string uses 5 bytes. 20-character ASCII string uses 24 bytes. Null strings do not take up space diff --git a/velox/row/CMakeLists.txt b/velox/row/CMakeLists.txt index 9eb9cb524682..91e0266dcf7a 100644 --- a/velox/row/CMakeLists.txt +++ b/velox/row/CMakeLists.txt @@ -19,3 +19,7 @@ velox_link_libraries(velox_row_fast PUBLIC velox_vector) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() + +if(${VELOX_ENABLE_BENCHMARKS}) + add_subdirectory(benchmarks) +endif() diff --git a/velox/row/CompactRow.cpp b/velox/row/CompactRow.cpp index e692e490df2b..8e59358db31f 100644 --- a/velox/row/CompactRow.cpp +++ b/velox/row/CompactRow.cpp @@ -17,6 +17,183 @@ #include "velox/vector/FlatVector.h" namespace facebook::velox::row { +namespace { +constexpr size_t kSizeBytes = sizeof(int32_t); + +void writeInt32(char* buffer, int32_t n) { + ::memcpy(buffer, &n, kSizeBytes); +} + +int32_t readInt32(const char* buffer) { + int32_t n; + ::memcpy(&n, buffer, kSizeBytes); + return n; +} + +FOLLY_ALWAYS_INLINE void writeFixedWidth( + const char* rawData, + vector_size_t index, + size_t valueBytes, + char* buffer, + size_t& offset) { + ::memcpy(buffer + offset, rawData + index * valueBytes, valueBytes); + offset += valueBytes; +} + +FOLLY_ALWAYS_INLINE void +writeTimestamp(const Timestamp& timestamp, char* buffer, size_t& offset) { + // Write micros(int64_t) for timestamp value. 
+ const auto timeUs = timestamp.toMicros(); + ::memcpy(buffer + offset, &timeUs, sizeof(int64_t)); + offset += sizeof(int64_t); +} + +FOLLY_ALWAYS_INLINE void +writeString(const StringView& value, char* buffer, size_t& offset) { + writeInt32(buffer + offset, value.size()); + if (!value.empty()) { + ::memcpy(buffer + offset + kSizeBytes, value.data(), value.size()); + } + offset += kSizeBytes + value.size(); +} + +// Serialize the child vector of a row type within a range of consecutive rows. +// Write the serialized data at offsets of buffer row by row. +// Update offsets with the actual serialized size. +template +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + const DecodedVector& decoded, + size_t valueBytes, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + const auto* rawData = decoded.data(); + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + writeFixedWidth( + rawData, decoded.index(rows[i]), valueBytes, buffer, offsets[i]); + } + } else { + for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + offsets[i] += valueBytes; + } else { + writeFixedWidth( + rawData, decoded.index(rows[i]), valueBytes, buffer, offsets[i]); + } + } + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + const DecodedVector& /* unused */, + size_t /* unused */, + const raw_vector& nulls, + char* /* unused */, + std::vector& /* unused */) { + for (auto i = 0; i < rows.size(); ++i) { + bits::setBit(nulls[i], childIdx, true); + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + const DecodedVector& decoded, + size_t /* unused */, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + auto* byte = reinterpret_cast(buffer); + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + byte[offsets[i]++] = decoded.valueAt(rows[i]); + } + } else { + 
for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + offsets[i] += 1; + } else { + // Write 1 byte for bool type. + byte[offsets[i]++] = decoded.valueAt(rows[i]); + } + } + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + const DecodedVector& decoded, + size_t /* unused */, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + const auto* rawData = decoded.data(); + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + const auto index = decoded.index(rows[i]); + writeTimestamp(rawData[index], buffer, offsets[i]); + } + } else { + for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + offsets[i] += sizeof(int64_t); + } else { + const auto index = decoded.index(rows[i]); + writeTimestamp(rawData[index], buffer, offsets[i]); + } + } + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + const DecodedVector& decoded, + size_t valueBytes, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + writeString(decoded.valueAt(rows[i]), buffer, offsets[i]); + } + } else { + for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + writeString(decoded.valueAt(rows[i]), buffer, offsets[i]); + } + } + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + const DecodedVector& decoded, + size_t valueBytes, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + serializeTyped( + rows, childIdx, decoded, valueBytes, nulls, buffer, offsets); +} +} // namespace CompactRow::CompactRow(const RowVectorPtr& vector) : typeKind_{vector->typeKind()}, decoded_{*vector} { @@ -184,6 +361,61 @@ int32_t 
CompactRow::serializeRow(vector_size_t index, char* buffer) { return valuesOffset; } +void CompactRow::serializeRow( + vector_size_t offset, + vector_size_t size, + char* buffer, + const size_t* bufferOffsets) { + raw_vector rows(size); + raw_vector nulls(size); + if (decoded_.isIdentityMapping()) { + std::iota(rows.begin(), rows.end(), offset); + } else { + for (auto i = 0; i < size; ++i) { + rows[i] = decoded_.index(offset + i); + } + } + + // After serializing each column, the 'offsets' are updated accordingly. + std::vector offsets(size); + auto* const base = reinterpret_cast(buffer); + for (auto i = 0; i < size; ++i) { + nulls[i] = base + bufferOffsets[i]; + offsets[i] = bufferOffsets[i] + rowNullBytes_; + } + + // Fixed-width and varchar/varbinary types are serialized using the vectorized + // API 'serializeTyped'. Other data types are serialized row-by-row. + for (auto childIdx = 0; childIdx < children_.size(); ++childIdx) { + auto& child = children_[childIdx]; + if (childIsFixedWidth_[childIdx] || + child.typeKind_ == TypeKind::VARBINARY || + child.typeKind_ == TypeKind::VARCHAR) { + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + serializeTyped, + child.typeKind_, + rows, + childIdx, + child.decoded_, + child.valueBytes_, + nulls, + buffer, + offsets); + } else { + const bool mayHaveNulls = child.decoded_.mayHaveNulls(); + for (auto i = 0; i < rows.size(); ++i) { + if (mayHaveNulls && child.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + // Write non-null variable-width value. 
+ offsets[i] += + child.serializeVariableWidth(rows[i], buffer + offsets[i]); + } + } + } + } +} + bool CompactRow::isNullAt(vector_size_t index) { return decoded_.isNullAt(index); } @@ -281,21 +513,6 @@ int32_t CompactRow::serializeArray(vector_size_t index, char* buffer) { children_[0], offset, size, childIsFixedWidth_[0], buffer); } -namespace { - -constexpr size_t kSizeBytes = sizeof(int32_t); - -void writeInt32(char* buffer, int32_t n) { - memcpy(buffer, &n, sizeof(int32_t)); -} - -int32_t readInt32(const char* buffer) { - int32_t n; - memcpy(&n, buffer, sizeof(int32_t)); - return n; -} -} // namespace - int32_t CompactRow::serializeAsArray( CompactRow& elements, vector_size_t offset, @@ -420,6 +637,14 @@ int32_t CompactRow::serialize(vector_size_t index, char* buffer) { return serializeRow(index, buffer); } +void CompactRow::serialize( + vector_size_t offset, + vector_size_t size, + const size_t* bufferOffsets, + char* buffer) { + serializeRow(offset, size, buffer, bufferOffsets); +} + void CompactRow::serializeFixedWidth(vector_size_t index, char* buffer) { VELOX_DCHECK(fixedWidthTypeKind_); switch (typeKind_) { @@ -428,11 +653,11 @@ void CompactRow::serializeFixedWidth(vector_size_t index, char* buffer) { break; case TypeKind::TIMESTAMP: { auto micros = decoded_.valueAt(index).toMicros(); - memcpy(buffer, &micros, sizeof(int64_t)); + ::memcpy(buffer, &micros, sizeof(int64_t)); break; } default: - memcpy( + ::memcpy( buffer, decoded_.data() + decoded_.index(index) * valueBytes_, valueBytes_); @@ -446,7 +671,7 @@ void CompactRow::serializeFixedWidth( VELOX_DCHECK(supportsBulkCopy_); // decoded_.data() can be null if all values are null. 
if (decoded_.data()) { - memcpy( + ::memcpy( buffer, decoded_.data() + decoded_.index(offset) * valueBytes_, valueBytes_ * size); @@ -461,7 +686,7 @@ int32_t CompactRow::serializeVariableWidth(vector_size_t index, char* buffer) { auto value = decoded_.valueAt(index); writeInt32(buffer, value.size()); if (!value.empty()) { - memcpy(buffer + kSizeBytes, value.data(), value.size()); + ::memcpy(buffer + kSizeBytes, value.data(), value.size()); } return kSizeBytes + value.size(); } @@ -490,11 +715,11 @@ void readFixedWidthValue( flatVector->setNull(index, true); } else if constexpr (std::is_same_v) { int64_t micros; - memcpy(&micros, buffer, sizeof(int64_t)); + ::memcpy(&micros, buffer, sizeof(int64_t)); flatVector->set(index, Timestamp::fromMicros(micros)); } else { T value; - memcpy(&value, buffer, sizeof(T)); + ::memcpy(&value, buffer, sizeof(T)); flatVector->set(index, value); } } diff --git a/velox/row/CompactRow.h b/velox/row/CompactRow.h index 9abaed0bdee2..323ccc66dde6 100644 --- a/velox/row/CompactRow.h +++ b/velox/row/CompactRow.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/base/RawVector.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/DecodedVector.h" @@ -36,6 +37,18 @@ class CompactRow { /// 'buffer' must have sufficient capacity and set to all zeros. int32_t serialize(vector_size_t index, char* buffer); + /// Serializes rows in the range [offset, offset + size) into 'buffer' at + /// given 'bufferOffsets'. 'buffer' must have sufficient capacity and set to + /// all zeros for null-bits handling. 'bufferOffsets' must be pre-filled with + /// the write offsets for each row and must be accessible for 'size' elements. + /// The caller must ensure that the space between each offset in + /// 'bufferOffsets' is no less than the 'fixedRowSize' or 'rowSize'. + void serialize( + vector_size_t offset, + vector_size_t size, + const size_t* bufferOffsets, + char* buffer); + /// Deserializes multiple rows into a RowVector of specified type. 
The type /// must match the contents of the serialized rows. static RowVectorPtr deserialize( @@ -108,6 +121,14 @@ class CompactRow { /// Serializes struct value to buffer. Value must not be null. int32_t serializeRow(vector_size_t index, char* buffer); + /// Serializes struct values in range [offset, offset + size) to buffer. + /// Value must not be null. + void serializeRow( + vector_size_t offset, + vector_size_t size, + char* buffer, + const size_t* bufferOffsets); + const TypeKind typeKind_; DecodedVector decoded_; diff --git a/velox/row/benchmarks/CMakeLists.txt b/velox/row/benchmarks/CMakeLists.txt new file mode 100644 index 000000000000..34e863100722 --- /dev/null +++ b/velox/row/benchmarks/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +add_executable(velox_unsafe_row_serialize_benchmark + UnsafeRowSerializeBenchmark.cpp) +target_link_libraries( + velox_unsafe_row_serialize_benchmark + PRIVATE + velox_exec + velox_row_fast + velox_vector_fuzzer + Folly::folly + ${FOLLY_BENCHMARK}) diff --git a/velox/row/benchmark/DynamicRowVectorDeserializeBenchmark.cpp b/velox/row/benchmarks/DynamicRowVectorDeserializeBenchmark.cpp similarity index 100% rename from velox/row/benchmark/DynamicRowVectorDeserializeBenchmark.cpp rename to velox/row/benchmarks/DynamicRowVectorDeserializeBenchmark.cpp diff --git a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp b/velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp similarity index 86% rename from velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp rename to velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp index 388908ecd718..4bc4027798c2 100644 --- a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp +++ b/velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp @@ -58,20 +58,31 @@ class SerializeBenchmark { auto data = makeData(rowType); suspender.dismiss(); + const auto numRows = data->size(); + std::vector rowSize(numRows); + std::vector offsets(numRows); + CompactRow compact(data); - auto totalSize = computeTotalSize(compact, rowType, data->size()); - auto buffer = AlignedBuffer::allocate(totalSize, pool()); - auto serialized = serialize(compact, data->size(), buffer); - VELOX_CHECK_EQ(serialized.size(), data->size()); + auto totalSize = + computeTotalSize(compact, rowType, numRows, rowSize, offsets); + auto buffer = AlignedBuffer::allocate(totalSize, pool(), 0); + auto serialized = serialize(compact, numRows, buffer, rowSize, offsets); + VELOX_CHECK_EQ(serialized.size(), numRows); } void deserializeCompact(const RowTypePtr& rowType) { folly::BenchmarkSuspender suspender; auto data = makeData(rowType); + + const auto numRows = data->size(); + std::vector rowSize(numRows); + std::vector offsets(numRows); + CompactRow compact(data); - auto totalSize = 
computeTotalSize(compact, rowType, data->size()); - auto buffer = AlignedBuffer::allocate(totalSize, pool()); - auto serialized = serialize(compact, data->size(), buffer); + auto totalSize = + computeTotalSize(compact, rowType, numRows, rowSize, offsets); + auto buffer = AlignedBuffer::allocate(totalSize, pool(), 0); + auto serialized = serialize(compact, numRows, buffer, rowSize, offsets); suspender.dismiss(); auto copy = CompactRow::deserialize(serialized, rowType, pool()); @@ -155,32 +166,40 @@ class SerializeBenchmark { size_t computeTotalSize( CompactRow& compactRow, const RowTypePtr& rowType, - vector_size_t numRows) { + vector_size_t numRows, + std::vector& rowSize, + std::vector& offsets) { size_t totalSize = 0; if (auto fixedRowSize = CompactRow::fixedRowSize(rowType)) { - totalSize += fixedRowSize.value() * numRows; + totalSize = fixedRowSize.value() * numRows; + for (auto i = 0; i < numRows; ++i) { + rowSize[i] = fixedRowSize.value(); + offsets[i] = fixedRowSize.value() * i; + } } else { for (auto i = 0; i < numRows; ++i) { - auto rowSize = compactRow.rowSize(i); - totalSize += rowSize; + rowSize[i] = compactRow.rowSize(i); + offsets[i] = totalSize; + totalSize += rowSize[i]; } } return totalSize; } - std::vector - serialize(CompactRow& compactRow, vector_size_t numRows, BufferPtr& buffer) { - std::vector serialized; + std::vector serialize( + CompactRow& compactRow, + vector_size_t numRows, + BufferPtr& buffer, + const std::vector& rowSize, + const std::vector& offsets) { auto rawBuffer = buffer->asMutable(); + compactRow.serialize(0, numRows, offsets.data(), rawBuffer); - size_t offset = 0; + std::vector serialized; for (auto i = 0; i < numRows; ++i) { - auto rowSize = compactRow.serialize(i, rawBuffer + offset); - serialized.push_back(std::string_view(rawBuffer + offset, rowSize)); - offset += rowSize; + serialized.push_back( + std::string_view(rawBuffer + offsets[i], rowSize[i])); } - - VELOX_CHECK_EQ(buffer->size(), offset); return serialized; } 
diff --git a/velox/row/tests/CompactRowTest.cpp b/velox/row/tests/CompactRowTest.cpp index 00ef636a0e30..1fc2e70c52b5 100644 --- a/velox/row/tests/CompactRowTest.cpp +++ b/velox/row/tests/CompactRowTest.cpp @@ -49,35 +49,68 @@ class CompactRowTest : public ::testing::Test, public VectorTestBase { auto rowType = asRowType(data->type()); auto numRows = data->size(); + std::vector rowSize(numRows); + std::vector offsets(numRows); CompactRow row(data); size_t totalSize = 0; if (auto fixedRowSize = CompactRow::fixedRowSize(rowType)) { totalSize = fixedRowSize.value() * numRows; + for (auto i = 0; i < numRows; ++i) { + rowSize[i] = fixedRowSize.value(); + offsets[i] = fixedRowSize.value() * i; + } } else { for (auto i = 0; i < numRows; ++i) { - totalSize += row.rowSize(i); + rowSize[i] = row.rowSize(i); + offsets[i] = totalSize; + totalSize += rowSize[i]; } } - std::vector serialized; - BufferPtr buffer = AlignedBuffer::allocate(totalSize, pool(), 0); auto* rawBuffer = buffer->asMutable(); - size_t offset = 0; - for (auto i = 0; i < numRows; ++i) { - auto size = row.serialize(i, rawBuffer + offset); - serialized.push_back(std::string_view(rawBuffer + offset, size)); - offset += size; + { + // Test serialize row-by-row. + size_t offset = 0; + std::vector serialized; + for (auto i = 0; i < numRows; ++i) { + auto size = row.serialize(i, rawBuffer + offset); + serialized.push_back(std::string_view(rawBuffer + offset, size)); + offset += size; - VELOX_CHECK_EQ(size, row.rowSize(i), "Row {}: {}", i, data->toString(i)); - } + VELOX_CHECK_EQ( + size, row.rowSize(i), "Row {}: {}", i, data->toString(i)); + } - VELOX_CHECK_EQ(offset, totalSize); + VELOX_CHECK_EQ(offset, totalSize); - auto copy = CompactRow::deserialize(serialized, rowType, pool()); - assertEqualVectors(data, copy); + auto copy = CompactRow::deserialize(serialized, rowType, pool()); + assertEqualVectors(data, copy); + } + { + // Test serialize by range. 
+ memset(rawBuffer, 0, totalSize); + + std::vector serialized; + vector_size_t offset = 0; + vector_size_t rangeSize = 1; + // Serialize with different range size. + while (offset < numRows) { + auto size = std::min(rangeSize, numRows - offset); + row.serialize(offset, size, offsets.data() + offset, rawBuffer); + offset += size; + rangeSize = checkedMultiply(rangeSize, 2); + } + + for (auto i = 0; i < numRows; ++i) { + serialized.push_back( + std::string_view(rawBuffer + offsets[i], rowSize[i])); + } + auto copy = CompactRow::deserialize(serialized, rowType, pool()); + assertEqualVectors(data, copy); + } } }; diff --git a/velox/serializers/CMakeLists.txt b/velox/serializers/CMakeLists.txt index 72a069d8ca55..24f3b04dbe30 100644 --- a/velox/serializers/CMakeLists.txt +++ b/velox/serializers/CMakeLists.txt @@ -19,3 +19,7 @@ velox_link_libraries(velox_presto_serializer velox_vector velox_row_fast) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() + +if(${VELOX_ENABLE_BENCHMARKS}) + add_subdirectory(benchmarks) +endif() diff --git a/velox/serializers/CompactRowSerializer.cpp b/velox/serializers/CompactRowSerializer.cpp index 70407c0d4f47..524184ca9200 100644 --- a/velox/serializers/CompactRowSerializer.cpp +++ b/velox/serializers/CompactRowSerializer.cpp @@ -40,17 +40,28 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer { const folly::Range& ranges, Scratch& scratch) override { size_t totalSize = 0; + const auto totalRows = std::accumulate( + ranges.begin(), + ranges.end(), + 0, + [](vector_size_t sum, const auto& range) { return sum + range.size; }); + + if (totalRows == 0) { + return; + } + row::CompactRow row(vector); + std::vector rowSize(totalRows); if (auto fixedRowSize = row::CompactRow::fixedRowSize(asRowType(vector->type()))) { - for (const auto& range : ranges) { - totalSize += (fixedRowSize.value() + sizeof(TRowSize)) * range.size; - } - + totalSize += (fixedRowSize.value() + sizeof(TRowSize)) * totalRows; + 
std::fill(rowSize.begin(), rowSize.end(), fixedRowSize.value()); } else { + vector_size_t index = 0; for (const auto& range : ranges) { - for (auto i = range.begin; i < range.begin + range.size; ++i) { - totalSize += row.rowSize(i) + sizeof(TRowSize); + for (auto i = 0; i < range.size; ++i, ++index) { + rowSize[index] = row.rowSize(range.begin + i); + totalSize += rowSize[index] + sizeof(TRowSize); } } } @@ -60,18 +71,29 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer { } BufferPtr buffer = AlignedBuffer::allocate(totalSize, pool_, 0); - auto rawBuffer = buffer->asMutable(); + auto* rawBuffer = buffer->asMutable(); buffers_.push_back(std::move(buffer)); size_t offset = 0; - for (auto& range : ranges) { - for (auto i = range.begin; i < range.begin + range.size; ++i) { - // Write row data. - TRowSize size = row.serialize(i, rawBuffer + offset + sizeof(TRowSize)); - - // Write raw size. Needs to be in big endian order. - *(TRowSize*)(rawBuffer + offset) = folly::Endian::big(size); - offset += sizeof(TRowSize) + size; + vector_size_t index = 0; + for (const auto& range : ranges) { + if (range.size == 1) { + // Fast path for single-row serialization. + *(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[index]); + auto size = + row.serialize(range.begin, rawBuffer + offset + sizeof(TRowSize)); + offset += size + sizeof(TRowSize); + ++index; + } else { + raw_vector offsets(range.size); + for (auto i = 0; i < range.size; ++i, ++index) { + // Write raw size. Needs to be in big endian order. + *(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[index]); + offsets[i] = offset + sizeof(TRowSize); + offset += rowSize[index] + sizeof(TRowSize); + } + // Write row data for all rows in range. 
+ row.serialize(range.begin, range.size, offsets.data(), rawBuffer); } } } diff --git a/velox/serializers/benchmarks/CMakeLists.txt b/velox/serializers/benchmarks/CMakeLists.txt new file mode 100644 index 000000000000..d0efd1d14135 --- /dev/null +++ b/velox/serializers/benchmarks/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +add_executable(velox_row_serializer_benchmark RowSerializerBenchmark.cpp) + +target_link_libraries( + velox_row_serializer_benchmark + velox_presto_serializer + velox_vector_fuzzer + velox_memory + Folly::folly + ${FOLLY_BENCHMARK}) diff --git a/velox/serializers/benchmarks/RowSerializerBenchmark.cpp b/velox/serializers/benchmarks/RowSerializerBenchmark.cpp new file mode 100644 index 000000000000..9443cdedbfa5 --- /dev/null +++ b/velox/serializers/benchmarks/RowSerializerBenchmark.cpp @@ -0,0 +1,150 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "velox/serializers/CompactRowSerializer.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +namespace facebook::velox::test { +namespace { +class RowSerializerBenchmark { + public: + void compactRowVectorSerde( + const RowTypePtr& rowType, + vector_size_t rangeSize) { + serializer::CompactRowVectorSerde::registerVectorSerde(); + serialize(rowType, rangeSize); + deregisterVectorSerde(); + } + + private: + void serialize(const RowTypePtr& rowType, vector_size_t rangeSize) { + folly::BenchmarkSuspender suspender; + auto data = makeData(rowType); + std::vector indexRanges; + for (auto begin = 0; begin < data->size(); begin += rangeSize) { + indexRanges.push_back( + IndexRange{begin, std::min(rangeSize, data->size() - begin)}); + } + suspender.dismiss(); + + Scratch scratch; + auto group = std::make_unique(pool_.get()); + group->createStreamTree(rowType, data->size()); + group->append( + data, folly::Range(indexRanges.data(), indexRanges.size()), scratch); + + std::stringstream stream; + OStreamOutputStream outputStream(&stream); + group->flush(&outputStream); + } + + RowVectorPtr makeData(const RowTypePtr& rowType) { + VectorFuzzer::Options options; + options.vectorSize = 1'000; + + const auto seed = 1; // For reproducibility. 
+ VectorFuzzer fuzzer(options, pool_.get(), seed); + + return fuzzer.fuzzInputRow(rowType); + } + + std::shared_ptr pool_{ + memory::memoryManager()->addLeafPool()}; +}; + +#define VECTOR_SERDE_BENCHMARKS(name, rowType) \ + BENCHMARK(compact_serialize_1_##name) { \ + RowSerializerBenchmark benchmark; \ + benchmark.compactRowVectorSerde(rowType, 1); \ + } \ + BENCHMARK(compact_serialize_10_##name) { \ + RowSerializerBenchmark benchmark; \ + benchmark.compactRowVectorSerde(rowType, 10); \ + } \ + BENCHMARK(compact_serialize_100_##name) { \ + RowSerializerBenchmark benchmark; \ + benchmark.compactRowVectorSerde(rowType, 100); \ + } \ + BENCHMARK(compact_serialize_1000_##name) { \ + RowSerializerBenchmark benchmark; \ + benchmark.compactRowVectorSerde(rowType, 1'000); \ + } + +VECTOR_SERDE_BENCHMARKS( + fixedWidth5, + ROW({BIGINT(), DOUBLE(), BOOLEAN(), TINYINT(), REAL()})); + +VECTOR_SERDE_BENCHMARKS( + fixedWidth10, + ROW({ + BIGINT(), + BIGINT(), + BIGINT(), + BIGINT(), + BIGINT(), + BIGINT(), + DOUBLE(), + BIGINT(), + BIGINT(), + BIGINT(), + })); + +VECTOR_SERDE_BENCHMARKS( + fixedWidth20, + ROW({ + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), DOUBLE(), DOUBLE(), DOUBLE(), DOUBLE(), + DOUBLE(), DOUBLE(), DOUBLE(), DOUBLE(), BIGINT(), BIGINT(), + })); + +VECTOR_SERDE_BENCHMARKS( + decimal, + ROW({BIGINT(), DECIMAL(12, 2), DECIMAL(38, 18)})); + +VECTOR_SERDE_BENCHMARKS(strings1, ROW({BIGINT(), VARCHAR()})); + +VECTOR_SERDE_BENCHMARKS( + strings5, + ROW({ + BIGINT(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + })); + +VECTOR_SERDE_BENCHMARKS(arrays, ROW({BIGINT(), ARRAY(BIGINT())})); + +VECTOR_SERDE_BENCHMARKS(nestedArrays, ROW({BIGINT(), ARRAY(ARRAY(BIGINT()))})); + +VECTOR_SERDE_BENCHMARKS(maps, ROW({BIGINT(), MAP(BIGINT(), REAL())})); + +VECTOR_SERDE_BENCHMARKS( + structs, + ROW({BIGINT(), ROW({BIGINT(), DOUBLE(), BOOLEAN(), TINYINT(), REAL()})})); + +} // namespace +} // namespace 
facebook::velox::test + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + facebook::velox::memory::MemoryManager::initialize({}); + folly::runBenchmarks(); + return 0; +} diff --git a/velox/serializers/tests/CompactRowSerializerTest.cpp b/velox/serializers/tests/CompactRowSerializerTest.cpp index 7dc49e83a49c..c0a091b5e0b4 100644 --- a/velox/serializers/tests/CompactRowSerializerTest.cpp +++ b/velox/serializers/tests/CompactRowSerializerTest.cpp @@ -36,9 +36,15 @@ class CompactRowSerializerTest : public ::testing::Test, void serialize(RowVectorPtr rowVector, std::ostream* output) { auto numRows = rowVector->size(); - std::vector rows(numRows); - for (int i = 0; i < numRows; i++) { - rows[i] = IndexRange{i, 1}; + // Serialize with different range size. + std::vector rows; + vector_size_t offset = 0; + vector_size_t rangeSize = 1; + while (offset < numRows) { + auto size = std::min(rangeSize, numRows - offset); + rows.push_back(IndexRange{offset, size}); + offset += size; + rangeSize = checkedMultiply(rangeSize, 2); } auto arena = std::make_unique(pool_.get()); @@ -47,7 +53,8 @@ class CompactRowSerializerTest : public ::testing::Test, serde_->createIterativeSerializer(rowType, numRows, arena.get()); Scratch scratch; - serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch); + serializer->append( + rowVector, folly::Range(rows.data(), rows.size()), scratch); auto size = serializer->maxSerializedSize(); OStreamOutputStream out(output); serializer->flush(&out);