From 25b75f5c9286e125d87ab4accb02d36beda7e827 Mon Sep 17 00:00:00 2001 From: Zac Date: Tue, 5 Dec 2023 20:04:03 -0800 Subject: [PATCH] Rename ByteStream as ByteOutputStream (#7867) Summary: ByteStream class was used both for reading and writing to a multi-part buffer. Mixing reading and writing logic and APIs in a single class is confusing. https://github.com/facebookincubator/velox/pull/7541 has been landed to extract reading logic and APIs into ByteInputStream class. This change is to rename ByteStream as ByteOutputStream. Pull Request resolved: https://github.com/facebookincubator/velox/pull/7867 Reviewed By: xiaoxmeng Differential Revision: D51829923 Pulled By: zacw7 fbshipit-source-id: 543a124062738a1e72767d33d223c5314a2cff59 --- velox/common/memory/ByteStream.cpp | 45 ++++++++++--------- velox/common/memory/ByteStream.h | 16 +++---- velox/common/memory/HashStringAllocator.cpp | 14 +++--- velox/common/memory/HashStringAllocator.h | 12 ++--- velox/common/memory/tests/ByteStreamTest.cpp | 10 ++--- .../memory/tests/HashStringAllocatorTest.cpp | 10 ++--- velox/docs/develop/arena.rst | 28 ++++++------ velox/exec/AddressableNonNullValueList.cpp | 2 +- velox/exec/ContainerRowSerde.cpp | 26 +++++------ velox/exec/ContainerRowSerde.h | 6 ++- velox/exec/RowContainer.cpp | 6 +-- velox/exec/SortedAggregations.cpp | 2 +- velox/exec/Strings.cpp | 2 +- velox/exec/tests/ContainerRowSerdeTest.cpp | 4 +- .../lib/aggregates/SingleValueAccumulator.cpp | 2 +- velox/functions/lib/aggregates/ValueSet.cpp | 2 +- .../prestosql/aggregates/ValueList.cpp | 4 +- .../benchmark/UnsafeRowSerializeBenchmark.cpp | 2 +- velox/serializers/PrestoSerializer.cpp | 6 +-- .../tests/UnsafeRowSerializerTest.cpp | 3 +- 20 files changed, 107 insertions(+), 95 deletions(-) diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index 99055d25e4e6..8fce27881688 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -81,7 +81,7 @@ std::streampos ByteInputStream::tellp() const { } size += range.size; } - VELOX_FAIL("ByteStream 'current_' is not in 'ranges_'."); + VELOX_FAIL("ByteInputStream 'current_' is not in 'ranges_'."); } void ByteInputStream::seekp(std::streampos position) { @@ -106,7 +106,7 @@ void ByteInputStream::next(bool throwIfPastEnd) { VELOX_CHECK_LT(position, ranges_.size()); if (position == ranges_.size() - 1) { if (throwIfPastEnd) { - VELOX_FAIL("Reading past end of ByteStream"); + VELOX_FAIL("Reading past end of ByteInputStream"); } return; } @@ -167,7 +167,7 @@ void ByteInputStream::skip(int32_t size) { } } -size_t ByteStream::size() const { +size_t ByteOutputStream::size() const { if (ranges_.empty()) { return 0; } @@ -178,7 +178,7 @@ size_t ByteStream::size() const { return total + std::max(ranges_.back().position, lastRangeEnd_); } -void ByteStream::appendBool(bool value, int32_t count) { +void ByteOutputStream::appendBool(bool value, int32_t count) { if (count == 1 && current_->size > current_->position) { bits::setBit( reinterpret_cast(current_->buffer), @@ -206,7 +206,10 @@ void ByteStream::appendBool(bool value, int32_t count) { } } -void ByteStream::appendBits(const uint64_t* bits, int32_t begin, int32_t end) { +void ByteOutputStream::appendBits( + const uint64_t* bits, + int32_t begin, + int32_t end) { VELOX_DCHECK(isBits_); int32_t count = end - begin; int32_t offset = 0; @@ -229,11 +232,11 @@ void ByteStream::appendBits(const uint64_t* bits, int32_t begin, int32_t end) { } } -void ByteStream::appendStringView(StringView value) { +void ByteOutputStream::appendStringView(StringView value) { appendStringView((std::string_view)value); } -void ByteStream::appendStringView(std::string_view value) { +void ByteOutputStream::appendStringView(std::string_view value) { const int32_t bytes = value.size(); int32_t offset = 0; for (;;) { @@ -250,7 +253,7 @@ void ByteStream::appendStringView(std::string_view value) { } } -std::streampos ByteStream::tellp() const { +std::streampos ByteOutputStream::tellp() const { if (ranges_.empty()) { return 0; } @@ -262,10 +265,10 @@ std::streampos ByteStream::tellp() const { } size += range.size; } - VELOX_FAIL("ByteStream 'current_' is not in 'ranges_'."); + VELOX_FAIL("ByteOutputStream 'current_' is not in 'ranges_'."); } -void ByteStream::seekp(std::streampos position) { +void ByteOutputStream::seekp(std::streampos position) { int64_t toSkip = position; // Record how much was written pre-seek. updateEnd(); @@ -280,10 +283,10 @@ void ByteStream::seekp(std::streampos position) { } toSkip -= range.size; } - VELOX_FAIL("Seeking past end of ByteStream: {}", position); + VELOX_FAIL("Seeking past end of ByteOutputStream: {}", position); } -void ByteStream::flush(OutputStream* out) { +void ByteOutputStream::flush(OutputStream* out) { updateEnd(); for (int32_t i = 0; i < ranges_.size(); ++i) { int32_t count = i == ranges_.size() - 1 ? lastRangeEnd_ : ranges_[i].size; @@ -298,17 +301,17 @@ void ByteStream::flush(OutputStream* out) { } } -char* ByteStream::writePosition() { +char* ByteOutputStream::writePosition() { if (ranges_.empty()) { return nullptr; } return reinterpret_cast(current_->buffer) + current_->position; } -void ByteStream::extend(int32_t bytes) { +void ByteOutputStream::extend(int32_t bytes) { if (current_ && current_->position != current_->size) { - LOG(FATAL) << "Extend ByteStream before range full: " << current_->position - << " vs. " << current_->size; + LOG(FATAL) << "Extend ByteOutputStream before range full: " + << current_->position << " vs. " << current_->size; } // Check if rewriting existing content. If so, move to next range and start at @@ -333,7 +336,7 @@ void ByteStream::extend(int32_t bytes) { } } -int32_t ByteStream::newRangeSize(int32_t bytes) const { +int32_t ByteOutputStream::newRangeSize(int32_t bytes) const { const int32_t newSize = allocatedBytes_ + bytes; if (newSize < 128) { return 128; @@ -347,7 +350,7 @@ int32_t ByteStream::newRangeSize(int32_t bytes) const { return bits::roundUp(bytes, memory::AllocationTraits::kPageSize); } -ByteInputStream ByteStream::inputStream() const { +ByteInputStream ByteOutputStream::inputStream() const { VELOX_CHECK(!ranges_.empty()); updateEnd(); auto rangeCopy = ranges_; @@ -355,10 +358,10 @@ ByteInputStream ByteStream::inputStream() const { return ByteInputStream(std::move(rangeCopy)); } -std::string ByteStream::toString() const { +std::string ByteOutputStream::toString() const { std::stringstream oss; - oss << "ByteStream[lastRangeEnd " << lastRangeEnd_ << ", " << ranges_.size() - << " ranges (position/size) ["; + oss << "ByteOutputStream[lastRangeEnd " << lastRangeEnd_ << ", " + << ranges_.size() << " ranges (position/size) ["; for (const auto& range : ranges_) { oss << "(" << range.position << "/" << range.size << (&range == current_ ? " current" : "") << ")"; diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 2e36a202a7ec..349c63e185eb 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -214,18 +214,18 @@ class ByteInputStream { /// in hash tables. The stream is seekable and supports overwriting of /// previous content, for example, writing a message body and then /// seeking back to start to write a length header. -class ByteStream { +class ByteOutputStream { public: /// For output. - ByteStream( + ByteOutputStream( StreamArena* arena, bool isBits = false, bool isReverseBitOrder = false) : arena_(arena), isBits_(isBits), isReverseBitOrder_(isReverseBitOrder) {} - ByteStream(const ByteStream& other) = delete; + ByteOutputStream(const ByteOutputStream& other) = delete; - void operator=(const ByteStream& other) = delete; + void operator=(const ByteOutputStream& other) = delete; /// Sets 'this' to range over 'range'. If this is for purposes of writing, /// lastWrittenPosition specifies the end of any pre-existing content in @@ -401,7 +401,7 @@ class ByteStream { template class AppendWindow { public: - AppendWindow(ByteStream& stream, Scratch& scratch) + AppendWindow(ByteOutputStream& stream, Scratch& scratch) : stream_(stream), scratchPtr_(scratch) {} ~AppendWindow() { @@ -417,7 +417,7 @@ class AppendWindow { } private: - ByteStream& stream_; + ByteOutputStream& stream_; ScratchPtr scratchPtr_; }; @@ -443,7 +443,7 @@ class IOBufOutputStream : public OutputStream { int32_t initialSize = memory::AllocationTraits::kPageSize) : OutputStream(listener), arena_(std::make_shared(&pool)), - out_(std::make_unique(arena_.get())) { + out_(std::make_unique(arena_.get())) { out_->startWrite(initialSize); } @@ -464,7 +464,7 @@ class IOBufOutputStream : public OutputStream { private: std::shared_ptr arena_; - std::unique_ptr out_; + std::unique_ptr out_; }; } // namespace facebook::velox diff --git a/velox/common/memory/HashStringAllocator.cpp b/velox/common/memory/HashStringAllocator.cpp index 4046882974a1..d82bd0b80ff1 100644 --- a/velox/common/memory/HashStringAllocator.cpp +++ b/velox/common/memory/HashStringAllocator.cpp @@ -138,7 +138,7 @@ ByteInputStream HashStringAllocator::prepareRead(const Header* begin) { } HashStringAllocator::Position HashStringAllocator::newWrite( - ByteStream& stream, + ByteOutputStream& stream, int32_t preferredSize) { VELOX_CHECK( !currentHeader_, @@ -158,7 +158,9 @@ HashStringAllocator::Position HashStringAllocator::newWrite( return startPosition_; } -void HashStringAllocator::extendWrite(Position position, ByteStream& stream) { +void HashStringAllocator::extendWrite( + Position position, + ByteOutputStream& stream) { auto header = position.header; const auto offset = position.offset(); VELOX_CHECK_GE( @@ -184,7 +186,9 @@ void HashStringAllocator::extendWrite(Position position, ByteStream& stream) { } std::pair -HashStringAllocator::finishWrite(ByteStream& stream, int32_t numReserveBytes) { +HashStringAllocator::finishWrite( + ByteOutputStream& stream, + int32_t numReserveBytes) { VELOX_CHECK( currentHeader_, "Must call newWrite or extendWrite before finishWrite"); auto writePosition = stream.writePosition(); @@ -521,7 +525,7 @@ void HashStringAllocator::ensureAvailable(int32_t bytes, Position& position) { return; } - ByteStream stream(this); + ByteOutputStream stream(this); extendWrite(position, stream); static char data[128]; while (bytes) { @@ -593,7 +597,7 @@ void HashStringAllocator::copyMultipartNoInline( return; } // Write the string as non-contiguous chunks. - ByteStream stream(this, false, false); + ByteOutputStream stream(this, false, false); auto position = newWrite(stream, numBytes); stream.appendStringView(*string); finishWrite(stream, 0); diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h index b5645f2b2632..a5e37fad338a 100644 --- a/velox/common/memory/HashStringAllocator.h +++ b/velox/common/memory/HashStringAllocator.h @@ -28,8 +28,8 @@ namespace facebook::velox { // Implements an arena backed by MappedMemory::Allocation. This is for backing -// ByteStream or for allocating single blocks. Blocks can be individually freed. -// Adjacent frees are coalesced and free blocks are kept in a free list. +// ByteOutputStream or for allocating single blocks. Blocks can be individually +// freed. Adjacent frees are coalesced and free blocks are kept in a free list. // Allocated blocks are prefixed with a Header. This has a size and flags. // kContinue means that last 8 bytes are a pointer to another Header after which // the contents of this allocation continue. kFree means the block is free. A @@ -263,11 +263,13 @@ class HashStringAllocator : public StreamArena { // kMinContiguous bytes of contiguous space. finishWrite finalizes // the allocation information after the write is done. // Returns the position at the start of the allocated block. - Position newWrite(ByteStream& stream, int32_t preferredSize = kMinContiguous); + Position newWrite( + ByteOutputStream& stream, + int32_t preferredSize = kMinContiguous); // Sets 'stream' to write starting at 'position'. If new ranges have to // be allocated when writing, headers will be updated accordingly. - void extendWrite(Position position, ByteStream& stream); + void extendWrite(Position position, ByteOutputStream& stream); // Completes a write prepared with newWrite or // extendWrite. Up to 'numReserveBytes' unused bytes, if available, are left @@ -275,7 +277,7 @@ class HashStringAllocator : public StreamArena { // positions: (1) position at the start of this 'write', (2) position // immediately after the last written byte. std::pair finishWrite( - ByteStream& stream, + ByteOutputStream& stream, int32_t numReserveBytes); /// Allocates a new range for a stream writing to 'this'. Sets the last word diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index 9d09f30e079e..66ad1d3e5d3e 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -236,7 +236,7 @@ TEST_F(ByteStreamTest, newRangeAllocation) { const auto prevAllocCount = pool_->stats().numAllocs; auto arena = newArena(); - ByteStream byteStream(arena.get()); + ByteOutputStream byteStream(arena.get()); byteStream.startWrite(0); for (int i = 0; i < testData.newRangeSizes.size(); ++i) { const auto newRangeSize = testData.newRangeSizes[i]; @@ -260,9 +260,9 @@ TEST_F(ByteStreamTest, newRangeAllocation) { TEST_F(ByteStreamTest, randomRangeAllocationFromMultiStreamsTest) { auto arena = newArena(); const int numByteStreams = 10; - std::vector> byteStreams; + std::vector> byteStreams; for (int i = 0; i < numByteStreams; ++i) { - byteStreams.push_back(std::make_unique(arena.get())); + byteStreams.push_back(std::make_unique(arena.get())); byteStreams.back()->startWrite(0); } const int testIterations = 1000; @@ -293,7 +293,7 @@ TEST_F(ByteStreamTest, bits) { bits.push_back(seed * (i + 1)); } auto arena = newArena(); - ByteStream bitStream(arena.get(), true); + ByteOutputStream bitStream(arena.get(), true); bitStream.startWrite(11); int32_t offset = 0; // Odd number of sizes. @@ -334,7 +334,7 @@ TEST_F(ByteStreamTest, appendWindow) { } auto arena = newArena(); - ByteStream stream(arena.get()); + ByteOutputStream stream(arena.get()); int32_t offset = 0; std::vector sizes = {1, 19, 52, 58, 129}; int32_t counter = 0; diff --git a/velox/common/memory/tests/HashStringAllocatorTest.cpp b/velox/common/memory/tests/HashStringAllocatorTest.cpp index 6b766ee5cb98..a0b517debb23 100644 --- a/velox/common/memory/tests/HashStringAllocatorTest.cpp +++ b/velox/common/memory/tests/HashStringAllocatorTest.cpp @@ -113,7 +113,7 @@ TEST_F(HashStringAllocatorTest, headerToString) { ASSERT_NO_THROW(allocator_->toString()); - ByteStream stream(allocator_.get()); + ByteOutputStream stream(allocator_.get()); auto h4 = allocator_->newWrite(stream).header; std::string data(123'456, 'x'); stream.appendStringView(data); @@ -164,7 +164,7 @@ TEST_F(HashStringAllocatorTest, allocateLarge) { } TEST_F(HashStringAllocatorTest, finishWrite) { - ByteStream stream(allocator_.get()); + ByteOutputStream stream(allocator_.get()); auto start = allocator_->newWrite(stream); // Write a short string. @@ -246,7 +246,7 @@ TEST_F(HashStringAllocatorTest, multipart) { continue; } auto chars = randomString(); - ByteStream stream(allocator_.get()); + ByteOutputStream stream(allocator_.get()); if (data[i].start.header) { if (rand32() % 5) { // 4/5 of cases append to the end. @@ -286,7 +286,7 @@ TEST_F(HashStringAllocatorTest, multipart) { } TEST_F(HashStringAllocatorTest, rewrite) { - ByteStream stream(allocator_.get()); + ByteOutputStream stream(allocator_.get()); auto header = allocator_->allocate(5); EXPECT_EQ(16, header->size()); // Rounds up to kMinAlloc. HSA::Position current = HSA::Position::atOffset(header, 0); @@ -567,7 +567,7 @@ TEST_F(HashStringAllocatorTest, sizeAndPosition) { allChars[i] = i; } - ByteStream stream(allocator_.get()); + ByteOutputStream stream(allocator_.get()); auto position = allocator_->newWrite(stream, 20); // Nothing written yet. EXPECT_EQ(0, stream.size()); diff --git a/velox/docs/develop/arena.rst b/velox/docs/develop/arena.rst index f6abb785903f..44c411dd2316 100644 --- a/velox/docs/develop/arena.rst +++ b/velox/docs/develop/arena.rst @@ -85,9 +85,9 @@ StlAllocator, an allocator backed by HashStringAllocator that can be used with STL containers, is implemented using the above allocate() and free() methods. NewWrite(), extendWrite() and finishWrite() methods allow for serializing -variable width data whose size is not known in advance using ByteStream. When -using ByteStream, the underlying data may come from multiple non-contiguous -blocks. ByteStream transparently manages allocation of additional blocks by +variable width data whose size is not known in advance using ByteOutputStream. When +using ByteOutputStream, the underlying data may come from multiple non-contiguous +blocks. ByteOutputStream transparently manages allocation of additional blocks by calling HashStringAllocator::newRange() method. .. code-block:: c++ @@ -97,19 +97,19 @@ calling HashStringAllocator::newRange() method. // kMinContiguous bytes of contiguous space. finishWrite finalizes // the allocation information after the write is done. // Returns the position at the start of the allocated block. - Position newWrite(ByteStream& stream, int32_t preferredSize = kMinContiguous); + Position newWrite(ByteOutputStream& stream, int32_t preferredSize = kMinContiguous); // Completes a write prepared with newWrite or // extendWrite. Up to 'numReserveBytes' unused bytes, if available, are left // after the end of the write to accommodate another write. Returns the // position immediately after the last written byte. - Position finishWrite(ByteStream& stream, int32_t numReserveBytes); + Position finishWrite(ByteOutputStream& stream, int32_t numReserveBytes); // Sets 'stream' to write starting at 'position'. If new ranges have to // be allocated when writing, headers will be updated accordingly. - void extendWrite(Position position, ByteStream& stream); + void extendWrite(Position position, ByteOutputStream& stream); -The prepareRead() method allows deserializing the data using ByteStream. +The prepareRead() method allows deserializing the data using ByteInputStream. .. code-block:: c++ @@ -117,7 +117,7 @@ The prepareRead() method allows deserializing the data using ByteStream. // possible continuation ranges. static void prepareRead( const Header* FOLLY_NONNULL header, - ByteStream& stream); + ByteInputStream& stream); Examples of Usage ----------------- @@ -139,22 +139,22 @@ The accumulator calls finishWrite() after writing the value. .. code-block:: c++ // Write first value - ByteStream stream(allocator); + ByteOutputStream stream(allocator); auto begin = allocator->newWrite(stream); // ... write to the stream allocator->finishWrite(stream); // Update the value - ByteStream stream(allocator); + ByteOutputStream stream(allocator); auto begin = allocator->extendWrite(begin, stream); // ... write to the stream allocator->finishWrite(stream); -The accumulator uses prepareRead() to read the data back using ByteStream. +The accumulator uses prepareRead() to read the data back using ByteInputStream. .. code-block:: c++ - ByteStream stream; + ByteInputStream stream; exec::HashStringAllocator::prepareRead(begin, stream); // … read from the stream @@ -174,13 +174,13 @@ write. .. code-block:: c++ // Write first value - ByteStream stream(allocator); + ByteOutputStream stream(allocator); auto begin = allocator->newWrite(stream); // ... write to the stream auto current = allocator->finishWrite(stream); // Update the value - ByteStream stream(allocator); + ByteOutputStream stream(allocator); auto begin = allocator->extendWrite(current, stream); // ... write to the stream allocator->finishWrite(stream); diff --git a/velox/exec/AddressableNonNullValueList.cpp b/velox/exec/AddressableNonNullValueList.cpp index 5ca2ce8b803e..a47ac65a7f3f 100644 --- a/velox/exec/AddressableNonNullValueList.cpp +++ b/velox/exec/AddressableNonNullValueList.cpp @@ -22,7 +22,7 @@ HashStringAllocator::Position AddressableNonNullValueList::append( const DecodedVector& decoded, vector_size_t index, HashStringAllocator* allocator) { - ByteStream stream(allocator); + ByteOutputStream stream(allocator); if (!firstHeader_) { // An array_agg or related begins with an allocation of 5 words and // 4 bytes for header. This is compact for small arrays (up to 5 diff --git a/velox/exec/ContainerRowSerde.cpp b/velox/exec/ContainerRowSerde.cpp index f01a69681c4b..f6bba959d34a 100644 --- a/velox/exec/ContainerRowSerde.cpp +++ b/velox/exec/ContainerRowSerde.cpp @@ -26,13 +26,13 @@ namespace { void serializeSwitch( const BaseVector& source, vector_size_t index, - ByteStream& out); + ByteOutputStream& out); template void serializeOne( const BaseVector& vector, vector_size_t index, - ByteStream& stream) { + ByteOutputStream& stream) { using T = typename TypeTraits::NativeType; stream.appendOne(vector.asUnchecked>()->valueAt(index)); } @@ -41,7 +41,7 @@ template <> void serializeOne( const BaseVector& vector, vector_size_t index, - ByteStream& stream) { + ByteOutputStream& stream) { auto string = vector.asUnchecked>()->valueAt(index); stream.appendOne(string.size()); stream.appendStringView(string); @@ -51,7 +51,7 @@ template <> void serializeOne( const BaseVector& vector, vector_size_t index, - ByteStream& stream) { + ByteOutputStream& stream) { auto string = vector.asUnchecked>()->valueAt(index); stream.appendOne(string.size()); stream.appendStringView(string); @@ -61,7 +61,7 @@ template <> void serializeOne( const BaseVector& vector, vector_size_t index, - ByteStream& out) { + ByteOutputStream& out) { auto row = vector.wrappedVector()->asUnchecked(); auto wrappedIndex = vector.wrappedIndex(index); const auto& type = row->type()->as(); @@ -89,7 +89,7 @@ void writeNulls( const BaseVector& values, vector_size_t offset, vector_size_t size, - ByteStream& out) { + ByteOutputStream& out) { for (auto i = 0; i < size; i += 64) { uint64_t flags = 0; auto end = i + 64 < size ? 64 : size - i; @@ -105,7 +105,7 @@ void writeNulls( void writeNulls( const BaseVector& values, folly::Range indices, - ByteStream& out) { + ByteOutputStream& out) { auto size = indices.size(); for (auto i = 0; i < size; i += 64) { uint64_t flags = 0; @@ -123,7 +123,7 @@ void serializeArray( const BaseVector& elements, vector_size_t offset, vector_size_t size, - ByteStream& out) { + ByteOutputStream& out) { out.appendOne(size); writeNulls(elements, offset, size, out); for (auto i = 0; i < size; ++i) { @@ -136,7 +136,7 @@ void serializeArray( void serializeArray( const BaseVector& elements, folly::Range indices, - ByteStream& out) { + ByteOutputStream& out) { out.appendOne(indices.size()); writeNulls(elements, indices, out); for (auto i : indices) { @@ -150,7 +150,7 @@ template <> void serializeOne( const BaseVector& source, vector_size_t index, - ByteStream& out) { + ByteOutputStream& out) { auto array = source.wrappedVector()->asUnchecked(); auto wrappedIndex = source.wrappedIndex(index); serializeArray( @@ -164,7 +164,7 @@ template <> void serializeOne( const BaseVector& vector, vector_size_t index, - ByteStream& out) { + ByteOutputStream& out) { auto map = vector.wrappedVector()->asUnchecked(); auto wrappedIndex = vector.wrappedIndex(index); auto size = map->sizeAt(wrappedIndex); @@ -177,7 +177,7 @@ void serializeOne( void serializeSwitch( const BaseVector& source, vector_size_t index, - ByteStream& stream) { + ByteOutputStream& stream) { VELOX_DYNAMIC_TYPE_DISPATCH( serializeOne, source.typeKind(), source, index, stream); } @@ -787,7 +787,7 @@ uint64_t hashSwitch(ByteInputStream& in, const Type* type) { void ContainerRowSerde::serialize( const BaseVector& source, vector_size_t index, - ByteStream& out) { + ByteOutputStream& out) { VELOX_DCHECK( !source.isNullAt(index), "Null top-level values are not supported"); serializeSwitch(source, index, out); diff --git a/velox/exec/ContainerRowSerde.h b/velox/exec/ContainerRowSerde.h index 8c97db2f3b5a..eabe51d54460 100644 --- a/velox/exec/ContainerRowSerde.h +++ b/velox/exec/ContainerRowSerde.h @@ -26,8 +26,10 @@ class ContainerRowSerde { public: /// Serializes value from source[index] into 'out'. The value must not be /// null. - static void - serialize(const BaseVector& source, vector_size_t index, ByteStream& out); + static void serialize( + const BaseVector& source, + vector_size_t index, + ByteOutputStream& out); static void deserialize(ByteInputStream& in, vector_size_t index, BaseVector* result); diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index ebbde30323b0..0146613aeaa4 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -520,7 +520,7 @@ int32_t RowContainer::storeVariableSizeAt( } } else { if (size > 0) { - ByteStream stream(stringAllocator_.get(), false, false); + ByteOutputStream stream(stringAllocator_.get(), false, false); const auto position = stringAllocator_->newWrite(stream); stream.appendStringView(std::string_view(data + 4, size)); stringAllocator_->finishWrite(stream, 0); @@ -663,7 +663,7 @@ void RowContainer::storeComplexType( return; } RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_); - ByteStream stream(stringAllocator_.get(), false, false); + ByteOutputStream stream(stringAllocator_.get(), false, false); auto position = stringAllocator_->newWrite(stream); ContainerRowSerde::serialize(*decoded.base(), decoded.index(index), stream); stringAllocator_->finishWrite(stream, 0); @@ -671,7 +671,7 @@ void RowContainer::storeComplexType( valueAt(row, offset) = std::string_view( reinterpret_cast(position.position), stream.size()); - // TODO Fix ByteStream::size() API. @oerling is looking into that. + // TODO Fix ByteOutputStream::size() API. @oerling is looking into that. // Fix the 'size' of the std::string_view. // stream.size() is the capacity // stream.size() - stream.remainingSize() is the size of the data + size of diff --git a/velox/exec/SortedAggregations.cpp b/velox/exec/SortedAggregations.cpp index 3bd6819577a2..d7b091b108c0 100644 --- a/velox/exec/SortedAggregations.cpp +++ b/velox/exec/SortedAggregations.cpp @@ -27,7 +27,7 @@ struct RowPointers { size_t size{0}; void append(char* row, HashStringAllocator& allocator) { - ByteStream stream(&allocator); + ByteOutputStream stream(&allocator); if (firstBlock == nullptr) { // Allocate first block. currentBlock = allocator.newWrite(stream); diff --git a/velox/exec/Strings.cpp b/velox/exec/Strings.cpp index b3c2ef6aa157..c99bcb0cc7b3 100644 --- a/velox/exec/Strings.cpp +++ b/velox/exec/Strings.cpp @@ -29,7 +29,7 @@ StringView Strings::append(StringView value, HashStringAllocator& allocator) { const int32_t requiredBytes = value.size() + HashStringAllocator::Header::kContinuedPtrSize + 8; - ByteStream stream(&allocator); + ByteOutputStream stream(&allocator); if (firstBlock == nullptr) { // Allocate first block. currentBlock = allocator.newWrite(stream, requiredBytes); diff --git a/velox/exec/tests/ContainerRowSerdeTest.cpp b/velox/exec/tests/ContainerRowSerdeTest.cpp index a773618eaa95..3a6b590990da 100644 --- a/velox/exec/tests/ContainerRowSerdeTest.cpp +++ b/velox/exec/tests/ContainerRowSerdeTest.cpp @@ -29,7 +29,7 @@ class ContainerRowSerdeTest : public testing::Test, // Writes all rows together and returns a position at the start of this // combined write. HashStringAllocator::Position serialize(const VectorPtr& data) { - ByteStream out(&allocator_); + ByteOutputStream out(&allocator_); auto position = allocator_.newWrite(out); for (auto i = 0; i < data->size(); ++i) { ContainerRowSerde::serialize(*data, i, out); @@ -46,7 +46,7 @@ class ContainerRowSerdeTest : public testing::Test, positions.reserve(size); for (auto i = 0; i < size; ++i) { - ByteStream out(&allocator_); + ByteOutputStream out(&allocator_); auto position = allocator_.newWrite(out); ContainerRowSerde::serialize(*data, i, out); allocator_.finishWrite(out, 0); diff --git a/velox/functions/lib/aggregates/SingleValueAccumulator.cpp b/velox/functions/lib/aggregates/SingleValueAccumulator.cpp index 8fb1ae7052af..356eac69bada 100644 --- a/velox/functions/lib/aggregates/SingleValueAccumulator.cpp +++ b/velox/functions/lib/aggregates/SingleValueAccumulator.cpp @@ -25,7 +25,7 @@ void SingleValueAccumulator::write( const BaseVector* vector, vector_size_t index, HashStringAllocator* allocator) { - ByteStream stream(allocator); + ByteOutputStream stream(allocator); if (start_.header == nullptr) { start_ = allocator->newWrite(stream); } else { diff --git a/velox/functions/lib/aggregates/ValueSet.cpp b/velox/functions/lib/aggregates/ValueSet.cpp index e0b5f5d3747d..fbac3780d187 100644 --- a/velox/functions/lib/aggregates/ValueSet.cpp +++ b/velox/functions/lib/aggregates/ValueSet.cpp @@ -22,7 +22,7 @@ void ValueSet::write( const BaseVector& vector, vector_size_t index, HashStringAllocator::Position& position) const { - ByteStream stream(allocator_); + ByteOutputStream stream(allocator_); if (position.header == nullptr) { position = allocator_->newWrite(stream); } else { diff --git a/velox/functions/prestosql/aggregates/ValueList.cpp b/velox/functions/prestosql/aggregates/ValueList.cpp index 11f4295e5dbb..c78521201de5 100644 --- a/velox/functions/prestosql/aggregates/ValueList.cpp +++ b/velox/functions/prestosql/aggregates/ValueList.cpp @@ -39,7 +39,7 @@ void ValueList::prepareAppend(HashStringAllocator* allocator) { } void ValueList::writeLastNulls(HashStringAllocator* allocator) { - ByteStream stream(allocator); + ByteOutputStream stream(allocator); if (nullsBegin_) { allocator->extendWrite(nullsCurrent_, stream); } else { @@ -61,7 +61,7 @@ void ValueList::appendNonNull( vector_size_t index, HashStringAllocator* allocator) { prepareAppend(allocator); - ByteStream stream(allocator); + ByteOutputStream stream(allocator); allocator->extendWrite(dataCurrent_, stream); // The stream may have a tail of a previous write. const auto initialSize = stream.size(); diff --git a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp b/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp index 369aef41025f..d2f58b7eca85 100644 --- a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp +++ b/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp @@ -187,7 +187,7 @@ class SerializeBenchmark { HashStringAllocator::Position serialize( const RowVectorPtr& data, HashStringAllocator& allocator) { - ByteStream out(&allocator); + ByteOutputStream out(&allocator); auto position = allocator.newWrite(out); for (auto i = 0; i < data->size(); ++i) { exec::ContainerRowSerde::serialize(*data, i, out); diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 49af228d8329..53747c6f4d4a 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1322,9 +1322,9 @@ class VectorStream { int32_t totalLength_{0}; bool hasLengths_{false}; ByteRange header_; - ByteStream nulls_; - ByteStream lengths_; - ByteStream values_; + ByteOutputStream nulls_; + ByteOutputStream lengths_; + ByteOutputStream values_; std::vector> children_; }; diff --git a/velox/serializers/tests/UnsafeRowSerializerTest.cpp b/velox/serializers/tests/UnsafeRowSerializerTest.cpp index 379220707e07..d686a90b67d7 100644 --- a/velox/serializers/tests/UnsafeRowSerializerTest.cpp +++ b/velox/serializers/tests/UnsafeRowSerializerTest.cpp @@ -277,7 +277,8 @@ TEST_F(UnsafeRowSerializerTest, incompleteRow) { // Cut in the middle of the `size` integer. buffers = {{rawData, 2}}; VELOX_ASSERT_RUNTIME_THROW( - testDeserialize(buffers, expected), "Reading past end of ByteStream"); + testDeserialize(buffers, expected), + "Reading past end of ByteInputStream"); } TEST_F(UnsafeRowSerializerTest, types) {