From ce7ce2327665396b01f6e015d475f30c9611b9c7 Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Tue, 5 Dec 2023 10:34:56 -0800 Subject: [PATCH] Make ByteStream::size() consistent across use cases (#7520) Summary: ByteStream::size is expected to have the semantics of the length of std::stringstream. It is the offset of the first unwritten byte from the start. This does not move backward when seeking to mid stream and writing, for example. been written. Adds a test to verify proper function when randomly overwriting a ByteStream backed by HashStringAllocator multipart entries. Pull Request resolved: https://github.com/facebookincubator/velox/pull/7520 Reviewed By: xiaoxmeng Differential Revision: D51270817 Pulled By: oerling fbshipit-source-id: 7b772852663b933b600c1d52b400a37ea9429edc --- velox/common/memory/ByteStream.cpp | 13 +++- velox/common/memory/ByteStream.h | 19 ++++-- velox/common/memory/HashStringAllocator.cpp | 37 +++++++---- velox/common/memory/HashStringAllocator.h | 21 ++++-- velox/common/memory/StreamArena.cpp | 10 ++- velox/common/memory/StreamArena.h | 26 ++++++-- .../memory/tests/HashStringAllocatorTest.cpp | 65 +++++++++++++++++++ velox/common/memory/tests/StreamArenaTest.cpp | 13 ++-- velox/exec/Strings.cpp | 5 +- .../prestosql/aggregates/ValueList.cpp | 8 ++- .../aggregates/tests/ValueListTest.cpp | 2 +- velox/serializers/PrestoSerializer.cpp | 2 +- 12 files changed, 177 insertions(+), 44 deletions(-) diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index cf7aefba8c43..99055d25e4e6 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -321,7 +321,10 @@ void ByteStream::extend(int32_t bytes) { ranges_.emplace_back(); current_ = &ranges_.back(); lastRangeEnd_ = 0; - arena_->newRange(newRangeSize(bytes), current_); + arena_->newRange( + newRangeSize(bytes), + ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2], + current_); allocatedBytes_ += current_->size; VELOX_CHECK_GT(allocatedBytes_, 0); if (isBits_) { @@ -344,6 +347,14 @@ int32_t ByteStream::newRangeSize(int32_t bytes) const { return bits::roundUp(bytes, memory::AllocationTraits::kPageSize); } +ByteInputStream ByteStream::inputStream() const { + VELOX_CHECK(!ranges_.empty()); + updateEnd(); + auto rangeCopy = ranges_; + rangeCopy.back().size = lastRangeEnd_; + return ByteInputStream(std::move(rangeCopy)); +} + std::string ByteStream::toString() const { std::stringstream oss; oss << "ByteStream[lastRangeEnd " << lastRangeEnd_ << ", " << ranges_.size() diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 53cc532186e5..2e36a202a7ec 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -227,11 +227,15 @@ class ByteStream { void operator=(const ByteStream& other) = delete; - void setRange(ByteRange range) { + /// Sets 'this' to range over 'range'. If this is for purposes of writing, + /// lastWrittenPosition specifies the end of any pre-existing content in + /// 'range'. + void setRange(ByteRange range, int32_t lastWrittenPosition) { ranges_.resize(1); ranges_[0] = range; current_ = ranges_.data(); - lastRangeEnd_ = ranges_[0].size; + VELOX_CHECK_GE(ranges_.back().size, lastWrittenPosition); + lastRangeEnd_ = lastWrittenPosition; } const std::vector& ranges() const { @@ -256,7 +260,7 @@ class ByteStream { /// the last range. size_t size() const; - int32_t lastRangeEnd() { + int32_t lastRangeEnd() const { updateEnd(); return lastRangeEnd_; } @@ -326,6 +330,10 @@ class ByteStream { return allocatedBytes_; } + /// Returns a ByteInputStream to range over the current content of 'this'. The + /// result is valid as long as 'this' is live and not changed. + ByteInputStream inputStream() const; + std::string toString() const; private: @@ -352,7 +360,7 @@ class ByteStream { int32_t newRangeSize(int32_t bytes) const; - void updateEnd() { + void updateEnd() const { if (!ranges_.empty() && current_ == &ranges_.back() && current_->position > lastRangeEnd_) { lastRangeEnd_ = current_->position; @@ -381,7 +389,8 @@ class ByteStream { // of 'ranges_'. In a write situation, all non-last ranges are full // and the last may be partly full. The position in the last range // is not necessarily the the end if there has been a seek. - int32_t lastRangeEnd_{0}; + mutable int32_t lastRangeEnd_{0}; + template friend class AppendWindow; }; diff --git a/velox/common/memory/HashStringAllocator.cpp b/velox/common/memory/HashStringAllocator.cpp index b46cf26dfcc6..4046882974a1 100644 --- a/velox/common/memory/HashStringAllocator.cpp +++ b/velox/common/memory/HashStringAllocator.cpp @@ -146,10 +146,12 @@ HashStringAllocator::Position HashStringAllocator::newWrite( "HashStringAllocator"); currentHeader_ = allocate(preferredSize, false); - stream.setRange(ByteRange{ - reinterpret_cast(currentHeader_->begin()), - currentHeader_->size(), - 0}); + stream.setRange( + ByteRange{ + reinterpret_cast(currentHeader_->begin()), + currentHeader_->size(), + 0}, + 0); startPosition_ = Position::atOffset(currentHeader_, 0); @@ -171,10 +173,12 @@ void HashStringAllocator::extendWrite(Position position, ByteStream& stream) { header->clearContinued(); } - stream.setRange(ByteRange{ - reinterpret_cast(position.position), - static_cast(header->end() - position.position), - 0}); + stream.setRange( + ByteRange{ + reinterpret_cast(position.header->begin()), + position.header->size(), + static_cast(position.position - position.header->begin())}, + 0); currentHeader_ = header; startPosition_ = position; } @@ -247,6 +251,7 @@ void HashStringAllocator::newSlab() { void HashStringAllocator::newRange( int32_t bytes, + ByteRange* lastRange, ByteRange* range, bool contiguous) { // Allocates at least kMinContiguous or to the end of the current @@ -263,18 +268,28 @@ void HashStringAllocator::newRange( *lastWordPtr = newHeader; currentHeader_->setContinued(); currentHeader_ = newHeader; + if (lastRange) { + // The last bytes of the last range are no longer payload. So do not + // count them in size and do not overwrite them if overwriting the + // multirange entry. Set position at the new end. + lastRange->size -= sizeof(void*); + lastRange->position = std::min(lastRange->size, lastRange->position); + } *range = ByteRange{ reinterpret_cast(currentHeader_->begin()), currentHeader_->size(), Header::kContinuedPtrSize}; } -void HashStringAllocator::newRange(int32_t bytes, ByteRange* range) { - newRange(bytes, range, false); +void HashStringAllocator::newRange( + int32_t bytes, + ByteRange* lastRange, + ByteRange* range) { + newRange(bytes, lastRange, range, false); } void HashStringAllocator::newContiguousRange(int32_t bytes, ByteRange* range) { - newRange(bytes, range, true); + newRange(bytes, nullptr, range, true); } // static diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h index 5a40faca5603..b5645f2b2632 100644 --- a/velox/common/memory/HashStringAllocator.h +++ b/velox/common/memory/HashStringAllocator.h @@ -280,16 +280,23 @@ class HashStringAllocator : public StreamArena { /// Allocates a new range for a stream writing to 'this'. Sets the last word /// of the previous range to point to the new range and copies the overwritten - /// word as the first word of the new range. + /// word as the first word of the new range. If 'lastRange' is non-null, we + /// are continuing an existing entry and setting the last word of the + /// previous entry point to the new one. In this case, we decrement the size + /// in 'lastEntry' by the size of the continue pointer, so that the sum of the + /// sizes reflects the payload size without any overheads. Furthermore, + /// rewriting a multirange entry is safe because a write spanning multiple + /// ranges will not overwrite the next pointer. /// /// May allocate less than 'bytes'. - void newRange(int32_t bytes, ByteRange* FOLLY_NONNULL range) override; + void newRange(int32_t bytes, ByteRange* lastRange, ByteRange* range) override; /// Allocates a new range of at least 'bytes' size. void newContiguousRange(int32_t bytes, ByteRange* range); - void newTinyRange(int32_t bytes, ByteRange* FOLLY_NONNULL range) override { - newRange(bytes, range); + void newTinyRange(int32_t bytes, ByteRange* lastRange, ByteRange* range) + override { + newRange(bytes, lastRange, range); } // Returns the total memory footprint of 'this'. @@ -345,7 +352,11 @@ class HashStringAllocator : public StreamArena { static constexpr int32_t kMinContiguous = 48; static constexpr int32_t kNumFreeLists = kMaxAlloc - kMinAlloc + 2; - void newRange(int32_t bytes, ByteRange* range, bool contiguous); + void newRange( + int32_t bytes, + ByteRange* lastRange, + ByteRange* range, + bool contiguous); // Adds a new standard size slab to the free list. This // grows the footprint in MemoryAllocator but does not allocate diff --git a/velox/common/memory/StreamArena.cpp b/velox/common/memory/StreamArena.cpp index 14e5dbc37c11..e8778750d2fc 100644 --- a/velox/common/memory/StreamArena.cpp +++ b/velox/common/memory/StreamArena.cpp @@ -20,7 +20,10 @@ namespace facebook::velox { StreamArena::StreamArena(memory::MemoryPool* pool) : pool_(pool) {} -void StreamArena::newRange(int32_t bytes, ByteRange* range) { +void StreamArena::newRange( + int32_t bytes, + ByteRange* /*lastRange*/, + ByteRange* range) { VELOX_CHECK_GT(bytes, 0, "StreamArena::newRange can't be zero length"); const memory::MachinePageCount numPages = memory::AllocationTraits::numPages(bytes); @@ -62,7 +65,10 @@ void StreamArena::newRange(int32_t bytes, ByteRange* range) { } } -void StreamArena::newTinyRange(int32_t bytes, ByteRange* range) { +void StreamArena::newTinyRange( + int32_t bytes, + ByteRange* /*lastRange*/, + ByteRange* range) { VELOX_CHECK_GT(bytes, 0, "StreamArena::newTinyRange can't be zero length"); tinyRanges_.emplace_back(); tinyRanges_.back().resize(bytes); diff --git a/velox/common/memory/StreamArena.h b/velox/common/memory/StreamArena.h index a9e9f71e1954..b0a0b9498ec9 100644 --- a/velox/common/memory/StreamArena.h +++ b/velox/common/memory/StreamArena.h @@ -30,16 +30,28 @@ class StreamArena { virtual ~StreamArena() = default; - /// Sets range to the request 'bytes' of writable memory owned by 'this'. - /// We allocate non-contiguous memory to store range bytes if requested - /// 'bytes' is equal or less than the largest class page size. Otherwise, we - /// allocate from contiguous memory. - virtual void newRange(int32_t bytes, ByteRange* range); + /// Sets range to the request 'bytes' of writable memory owned by + /// 'this'. We allocate non-contiguous memory to store range bytes + /// if requested 'bytes' is equal or less than the largest class + /// page size. Otherwise, we allocate from contiguous + /// memory. 'range' is set to point to the allocated memory. If + /// 'lastRange' is non-nullptr, it is the last range of the stream + /// to which we are adding the new range. 'lastRange' is nullptr if + /// adding the first range to a stream. The memory is stays owned by + /// 'this' in all cases. Used by HashStringAllocator when extending + /// a multipart entry. The previously last part has its last 8 bytes + /// moved to the next part and gets a pointer to the next part as + /// its last 8 bytes. When extending, we need to update the entry so + /// that the next pointer is not seen when reading the content and + /// is also not counted in the payload size of the multipart entry. + virtual void newRange(int32_t bytes, ByteRange* lastRange, ByteRange* range); /// sets 'range' to point to a small piece of memory owned by this. These /// always come from the heap. The use case is for headers that may change - /// length based on data properties, not for bulk data. - virtual void newTinyRange(int32_t bytes, ByteRange* range); + /// length based on data properties, not for bulk data. See 'newRange' for the + /// meaning of 'lastRange'. + virtual void + newTinyRange(int32_t bytes, ByteRange* lastRange, ByteRange* range); /// Returns the Total size in bytes held by all Allocations. virtual size_t size() const { diff --git a/velox/common/memory/tests/HashStringAllocatorTest.cpp b/velox/common/memory/tests/HashStringAllocatorTest.cpp index c55e1862d485..6b766ee5cb98 100644 --- a/velox/common/memory/tests/HashStringAllocatorTest.cpp +++ b/velox/common/memory/tests/HashStringAllocatorTest.cpp @@ -544,6 +544,71 @@ TEST_F(HashStringAllocatorTest, strings) { allocator_->checkConsistency(); } +TEST_F(HashStringAllocatorTest, sizeAndPosition) { + // We make a stream consisting of multiple non-contiguous ranges + // and verify that it is writable and appendable and that its + // size() always reflects the number of written bytes, excluding + // any overheads. + + // First, we make a free list to make sure things are multipart. + constexpr int32_t kUnitSize = 256; + std::vector pieces; + for (auto i = 0; i < 100; ++i) { + pieces.push_back(allocator_->allocate(kUnitSize + 30)); + } + for (auto i = 0; i < pieces.size(); i += 2) { + allocator_->free(pieces[i]); + } + + // We write each nth character of stream to be n % kunitSize. + std::string allChars; + allChars.resize(kUnitSize); + for (auto i = 0; i < kUnitSize; ++i) { + allChars[i] = i; + } + + ByteStream stream(allocator_.get()); + auto position = allocator_->newWrite(stream, 20); + // Nothing written yet. + EXPECT_EQ(0, stream.size()); + for (auto i = 0; i < 10; ++i) { + stream.appendStringView(allChars); + // We check that the size reflects the payload size after each write. + EXPECT_EQ((i + 1) * kUnitSize, stream.size()); + } + // We expect a multipart allocation. + EXPECT_TRUE(position.header->isContinued()); + EXPECT_EQ(kUnitSize * 10, stream.tellp()); + + // we check and rewrite different offsets in the stream, not to pass past end. + for (auto start = 90; start < kUnitSize * 9; start += 125) { + stream.seekp(start); + EXPECT_EQ(start, stream.tellp()); + EXPECT_EQ(kUnitSize * 10, stream.size()); + ByteInputStream input = stream.inputStream(); + input.seekp(start); + EXPECT_EQ(kUnitSize * 10 - start, input.remainingSize()); + for (auto c = 0; c < 10; ++c) { + uint8_t byte = input.readByte(); + EXPECT_EQ(byte, (start + c) % kUnitSize); + } + // Overwrite the bytes just read. + stream.seekp(start); + stream.appendStringView(std::string_view(allChars.data(), 100)); + input = stream.inputStream(); + input.seekp(start); + for (auto c = 0; c < 100; ++c) { + uint8_t byte = input.readByte(); + EXPECT_EQ(byte, c % kUnitSize); + } + } + EXPECT_EQ(kUnitSize * 10, stream.size()); + stream.seekp(kUnitSize * 10 - 100); + stream.appendStringView(allChars); + // The last write extends the size. + EXPECT_EQ(kUnitSize * 11 - 100, stream.size()); +} + TEST_F(HashStringAllocatorTest, storeStringFast) { allocator_->allocate(HashStringAllocator::kMinAlloc); std::string s(allocator_->freeSpace() + sizeof(void*), 'x'); diff --git a/velox/common/memory/tests/StreamArenaTest.cpp b/velox/common/memory/tests/StreamArenaTest.cpp index 7d4ee844b862..70a31dc0970c 100644 --- a/velox/common/memory/tests/StreamArenaTest.cpp +++ b/velox/common/memory/tests/StreamArenaTest.cpp @@ -107,7 +107,7 @@ TEST_F(StreamArenaTest, newRange) { auto arena = newArena(); ByteRange range; for (int i = 0; i < testData.requestRangeSizes.size(); ++i) { - arena->newRange(testData.requestRangeSizes[i], &range); + arena->newRange(testData.requestRangeSizes[i], nullptr, &range); ASSERT_EQ(range.size, testData.expectedRangeSizes[i]) << range.toString(); ASSERT_EQ(range.position, 0); ASSERT_TRUE(range.buffer != nullptr); @@ -135,18 +135,18 @@ TEST_F(StreamArenaTest, randomRange) { if (folly::Random::oneIn(4)) { const int requestSize = 1 + folly::Random::rand32() % (2 * AllocationTraits::kPageSize); - arena->newTinyRange(requestSize, &range); + arena->newTinyRange(requestSize, nullptr, &range); ASSERT_EQ(range.size, requestSize); } else if (folly::Random::oneIn(3)) { const int requestSize = AllocationTraits::pageBytes(pool_->largestSizeClass()) + (folly::Random::rand32() % (4 << 20)); - arena->newRange(requestSize, &range); + arena->newRange(requestSize, nullptr, &range); ASSERT_EQ(AllocationTraits::roundUpPageBytes(requestSize), range.size); } else { const int requestSize = 1 + folly::Random::rand32() % pool_->largestSizeClass(); - arena->newRange(requestSize, &range); + arena->newRange(requestSize, nullptr, &range); ASSERT_LE(range.size, AllocationTraits::roundUpPageBytes(requestSize)); } ASSERT_EQ(range.position, 0); @@ -158,8 +158,9 @@ TEST_F(StreamArenaTest, error) { auto arena = newArena(); ByteRange range; VELOX_ASSERT_THROW( - arena->newTinyRange(0, &range), + arena->newTinyRange(0, nullptr, &range), "StreamArena::newTinyRange can't be zero length"); VELOX_ASSERT_THROW( - arena->newRange(0, &range), "StreamArena::newRange can't be zero length"); + arena->newRange(0, nullptr, &range), + "StreamArena::newRange can't be zero length"); } diff --git a/velox/exec/Strings.cpp b/velox/exec/Strings.cpp index b923685d053a..b3c2ef6aa157 100644 --- a/velox/exec/Strings.cpp +++ b/velox/exec/Strings.cpp @@ -39,12 +39,13 @@ StringView Strings::append(StringView value, HashStringAllocator& allocator) { } // Check if there is enough space left. - if (stream.ranges().back().size < requiredBytes) { + auto& currentRange = stream.ranges().back(); + if (currentRange.size - currentRange.position < requiredBytes) { // Not enough space. Allocate new block. ByteRange newRange; allocator.newContiguousRange(requiredBytes, &newRange); - stream.setRange(newRange); + stream.setRange(newRange, 0); } VELOX_DCHECK_LE(requiredBytes, stream.ranges().back().size); diff --git a/velox/functions/prestosql/aggregates/ValueList.cpp b/velox/functions/prestosql/aggregates/ValueList.cpp index 9187bce3642d..11f4295e5dbb 100644 --- a/velox/functions/prestosql/aggregates/ValueList.cpp +++ b/velox/functions/prestosql/aggregates/ValueList.cpp @@ -63,14 +63,16 @@ void ValueList::appendNonNull( prepareAppend(allocator); ByteStream stream(allocator); allocator->extendWrite(dataCurrent_, stream); + // The stream may have a tail of a previous write. + const auto initialSize = stream.size(); exec::ContainerRowSerde::serialize(values, index, stream); ++size_; - bytes_ += stream.size(); + bytes_ += stream.size() - initialSize; - // Leave space up to the size appended so far, at least 24 but no more + // Leave space up to half the size appended so far, at least 24 but no more // than 1024. dataCurrent_ = - allocator->finishWrite(stream, std::clamp(bytes_, 24, 1024)).second; + allocator->finishWrite(stream, std::clamp(bytes_ / 2, 24, 1024)).second; } void ValueList::appendValue( diff --git a/velox/functions/prestosql/aggregates/tests/ValueListTest.cpp b/velox/functions/prestosql/aggregates/tests/ValueListTest.cpp index 73bfce87df06..cb67a579b3f9 100644 --- a/velox/functions/prestosql/aggregates/tests/ValueListTest.cpp +++ b/velox/functions/prestosql/aggregates/tests/ValueListTest.cpp @@ -112,7 +112,7 @@ TEST_F(ValueListTest, integers) { TEST_F(ValueListTest, arrays) { // No nulls. - int32_t kSizeCaps[] = {500, 4000, 6000, 50000}; + int32_t kSizeCaps[] = {730, 4000, 7500, 50000}; int32_t counter = 0; for (auto size : kTestSizes) { auto data = makeArrayVector( diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index e9875bd6905f..49af228d8329 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1169,7 +1169,7 @@ class VectorStream { } void initializeHeader(std::string_view name, StreamArena& streamArena) { - streamArena.newTinyRange(50, &header_); + streamArena.newTinyRange(50, nullptr, &header_); header_.size = name.size() + sizeof(int32_t); *reinterpret_cast(header_.buffer) = name.size(); memcpy(header_.buffer + sizeof(int32_t), &name[0], name.size());