Skip to content

Commit

Permalink
Make ByteStream::size() consistent across use cases (#7520)
Browse files Browse the repository at this point in the history
Summary:
ByteStream::size is expected to have the semantics of the length of std::stringstream. It is the offset of the first unwritten byte from the start. This does not move backward when seeking to mid stream and writing, for example.  been written.

Adds a test to verify proper function when randomly overwriting a ByteStream backed by HashStringAllocator multipart entries.

Pull Request resolved: #7520

Reviewed By: xiaoxmeng

Differential Revision: D51270817

Pulled By: oerling

fbshipit-source-id: 7b772852663b933b600c1d52b400a37ea9429edc
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Dec 5, 2023
1 parent 7f82f3e commit ce7ce23
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 44 deletions.
13 changes: 12 additions & 1 deletion velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,10 @@ void ByteStream::extend(int32_t bytes) {
ranges_.emplace_back();
current_ = &ranges_.back();
lastRangeEnd_ = 0;
arena_->newRange(newRangeSize(bytes), current_);
arena_->newRange(
newRangeSize(bytes),
ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
current_);
allocatedBytes_ += current_->size;
VELOX_CHECK_GT(allocatedBytes_, 0);
if (isBits_) {
Expand All @@ -344,6 +347,14 @@ int32_t ByteStream::newRangeSize(int32_t bytes) const {
return bits::roundUp(bytes, memory::AllocationTraits::kPageSize);
}

ByteInputStream ByteStream::inputStream() const {
VELOX_CHECK(!ranges_.empty());
updateEnd();
auto rangeCopy = ranges_;
rangeCopy.back().size = lastRangeEnd_;
return ByteInputStream(std::move(rangeCopy));
}

std::string ByteStream::toString() const {
std::stringstream oss;
oss << "ByteStream[lastRangeEnd " << lastRangeEnd_ << ", " << ranges_.size()
Expand Down
19 changes: 14 additions & 5 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,15 @@ class ByteStream {

void operator=(const ByteStream& other) = delete;

void setRange(ByteRange range) {
/// Sets 'this' to range over 'range'. If this is for purposes of writing,
/// lastWrittenPosition specifies the end of any pre-existing content in
/// 'range'.
void setRange(ByteRange range, int32_t lastWrittenPosition) {
ranges_.resize(1);
ranges_[0] = range;
current_ = ranges_.data();
lastRangeEnd_ = ranges_[0].size;
VELOX_CHECK_GE(ranges_.back().size, lastWrittenPosition);
lastRangeEnd_ = lastWrittenPosition;
}

const std::vector<ByteRange>& ranges() const {
Expand All @@ -256,7 +260,7 @@ class ByteStream {
/// the last range.
size_t size() const;

int32_t lastRangeEnd() {
int32_t lastRangeEnd() const {
updateEnd();
return lastRangeEnd_;
}
Expand Down Expand Up @@ -326,6 +330,10 @@ class ByteStream {
return allocatedBytes_;
}

/// Returns a ByteInputStream to range over the current content of 'this'. The
/// result is valid as long as 'this' is live and not changed.
ByteInputStream inputStream() const;

std::string toString() const;

private:
Expand All @@ -352,7 +360,7 @@ class ByteStream {

int32_t newRangeSize(int32_t bytes) const;

void updateEnd() {
void updateEnd() const {
if (!ranges_.empty() && current_ == &ranges_.back() &&
current_->position > lastRangeEnd_) {
lastRangeEnd_ = current_->position;
Expand Down Expand Up @@ -381,7 +389,8 @@ class ByteStream {
// of 'ranges_'. In a write situation, all non-last ranges are full
// and the last may be partly full. The position in the last range
// is not necessarily the the end if there has been a seek.
int32_t lastRangeEnd_{0};
mutable int32_t lastRangeEnd_{0};

template <typename T>
friend class AppendWindow;
};
Expand Down
37 changes: 26 additions & 11 deletions velox/common/memory/HashStringAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,12 @@ HashStringAllocator::Position HashStringAllocator::newWrite(
"HashStringAllocator");
currentHeader_ = allocate(preferredSize, false);

stream.setRange(ByteRange{
reinterpret_cast<uint8_t*>(currentHeader_->begin()),
currentHeader_->size(),
0});
stream.setRange(
ByteRange{
reinterpret_cast<uint8_t*>(currentHeader_->begin()),
currentHeader_->size(),
0},
0);

startPosition_ = Position::atOffset(currentHeader_, 0);

Expand All @@ -171,10 +173,12 @@ void HashStringAllocator::extendWrite(Position position, ByteStream& stream) {
header->clearContinued();
}

stream.setRange(ByteRange{
reinterpret_cast<uint8_t*>(position.position),
static_cast<int32_t>(header->end() - position.position),
0});
stream.setRange(
ByteRange{
reinterpret_cast<uint8_t*>(position.header->begin()),
position.header->size(),
static_cast<int32_t>(position.position - position.header->begin())},
0);
currentHeader_ = header;
startPosition_ = position;
}
Expand Down Expand Up @@ -247,6 +251,7 @@ void HashStringAllocator::newSlab() {

void HashStringAllocator::newRange(
int32_t bytes,
ByteRange* lastRange,
ByteRange* range,
bool contiguous) {
// Allocates at least kMinContiguous or to the end of the current
Expand All @@ -263,18 +268,28 @@ void HashStringAllocator::newRange(
*lastWordPtr = newHeader;
currentHeader_->setContinued();
currentHeader_ = newHeader;
if (lastRange) {
// The last bytes of the last range are no longer payload. So do not
// count them in size and do not overwrite them if overwriting the
// multirange entry. Set position at the new end.
lastRange->size -= sizeof(void*);
lastRange->position = std::min(lastRange->size, lastRange->position);
}
*range = ByteRange{
reinterpret_cast<uint8_t*>(currentHeader_->begin()),
currentHeader_->size(),
Header::kContinuedPtrSize};
}

void HashStringAllocator::newRange(int32_t bytes, ByteRange* range) {
newRange(bytes, range, false);
void HashStringAllocator::newRange(
int32_t bytes,
ByteRange* lastRange,
ByteRange* range) {
newRange(bytes, lastRange, range, false);
}

void HashStringAllocator::newContiguousRange(int32_t bytes, ByteRange* range) {
newRange(bytes, range, true);
newRange(bytes, nullptr, range, true);
}

// static
Expand Down
21 changes: 16 additions & 5 deletions velox/common/memory/HashStringAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,16 +280,23 @@ class HashStringAllocator : public StreamArena {

/// Allocates a new range for a stream writing to 'this'. Sets the last word
/// of the previous range to point to the new range and copies the overwritten
/// word as the first word of the new range.
/// word as the first word of the new range. If 'lastRange' is non-null, we
/// are continuing an existing entry and setting the last word of the
/// previous entry point to the new one. In this case, we decrement the size
/// in 'lastEntry' by the size of the continue pointer, so that the sum of the
/// sizes reflects the payload size without any overheads. Furthermore,
/// rewriting a multirange entry is safe because a write spanning multiple
/// ranges will not overwrite the next pointer.
///
/// May allocate less than 'bytes'.
void newRange(int32_t bytes, ByteRange* FOLLY_NONNULL range) override;
void newRange(int32_t bytes, ByteRange* lastRange, ByteRange* range) override;

/// Allocates a new range of at least 'bytes' size.
void newContiguousRange(int32_t bytes, ByteRange* range);

void newTinyRange(int32_t bytes, ByteRange* FOLLY_NONNULL range) override {
newRange(bytes, range);
void newTinyRange(int32_t bytes, ByteRange* lastRange, ByteRange* range)
override {
newRange(bytes, lastRange, range);
}

// Returns the total memory footprint of 'this'.
Expand Down Expand Up @@ -345,7 +352,11 @@ class HashStringAllocator : public StreamArena {
static constexpr int32_t kMinContiguous = 48;
static constexpr int32_t kNumFreeLists = kMaxAlloc - kMinAlloc + 2;

void newRange(int32_t bytes, ByteRange* range, bool contiguous);
void newRange(
int32_t bytes,
ByteRange* lastRange,
ByteRange* range,
bool contiguous);

// Adds a new standard size slab to the free list. This
// grows the footprint in MemoryAllocator but does not allocate
Expand Down
10 changes: 8 additions & 2 deletions velox/common/memory/StreamArena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ namespace facebook::velox {

StreamArena::StreamArena(memory::MemoryPool* pool) : pool_(pool) {}

void StreamArena::newRange(int32_t bytes, ByteRange* range) {
void StreamArena::newRange(
int32_t bytes,
ByteRange* /*lastRange*/,
ByteRange* range) {
VELOX_CHECK_GT(bytes, 0, "StreamArena::newRange can't be zero length");
const memory::MachinePageCount numPages =
memory::AllocationTraits::numPages(bytes);
Expand Down Expand Up @@ -62,7 +65,10 @@ void StreamArena::newRange(int32_t bytes, ByteRange* range) {
}
}

void StreamArena::newTinyRange(int32_t bytes, ByteRange* range) {
void StreamArena::newTinyRange(
int32_t bytes,
ByteRange* /*lastRange*/,
ByteRange* range) {
VELOX_CHECK_GT(bytes, 0, "StreamArena::newTinyRange can't be zero length");
tinyRanges_.emplace_back();
tinyRanges_.back().resize(bytes);
Expand Down
26 changes: 19 additions & 7 deletions velox/common/memory/StreamArena.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,28 @@ class StreamArena {

virtual ~StreamArena() = default;

/// Sets range to the request 'bytes' of writable memory owned by 'this'.
/// We allocate non-contiguous memory to store range bytes if requested
/// 'bytes' is equal or less than the largest class page size. Otherwise, we
/// allocate from contiguous memory.
virtual void newRange(int32_t bytes, ByteRange* range);
/// Sets range to the request 'bytes' of writable memory owned by
/// 'this'. We allocate non-contiguous memory to store range bytes
/// if requested 'bytes' is equal or less than the largest class
/// page size. Otherwise, we allocate from contiguous
/// memory. 'range' is set to point to the allocated memory. If
/// 'lastRange' is non-nullptr, it is the last range of the stream
/// to which we are adding the new range. 'lastRange' is nullptr if
/// adding the first range to a stream. The memory is stays owned by
/// 'this' in all cases. Used by HashStringAllocator when extending
/// a multipart entry. The previously last part has its last 8 bytes
/// moved to the next part and gets a pointer to the next part as
/// its last 8 bytes. When extending, we need to update the entry so
/// that the next pointer is not seen when reading the content and
/// is also not counted in the payload size of the multipart entry.
virtual void newRange(int32_t bytes, ByteRange* lastRange, ByteRange* range);

/// sets 'range' to point to a small piece of memory owned by this. These
/// always come from the heap. The use case is for headers that may change
/// length based on data properties, not for bulk data.
virtual void newTinyRange(int32_t bytes, ByteRange* range);
/// length based on data properties, not for bulk data. See 'newRange' for the
/// meaning of 'lastRange'.
virtual void
newTinyRange(int32_t bytes, ByteRange* lastRange, ByteRange* range);

/// Returns the Total size in bytes held by all Allocations.
virtual size_t size() const {
Expand Down
65 changes: 65 additions & 0 deletions velox/common/memory/tests/HashStringAllocatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,71 @@ TEST_F(HashStringAllocatorTest, strings) {
allocator_->checkConsistency();
}

TEST_F(HashStringAllocatorTest, sizeAndPosition) {
// We make a stream consisting of multiple non-contiguous ranges
// and verify that it is writable and appendable and that its
// size() always reflects the number of written bytes, excluding
// any overheads.

// First, we make a free list to make sure things are multipart.
constexpr int32_t kUnitSize = 256;
std::vector<HashStringAllocator::Header*> pieces;
for (auto i = 0; i < 100; ++i) {
pieces.push_back(allocator_->allocate(kUnitSize + 30));
}
for (auto i = 0; i < pieces.size(); i += 2) {
allocator_->free(pieces[i]);
}

// We write each nth character of stream to be n % kunitSize.
std::string allChars;
allChars.resize(kUnitSize);
for (auto i = 0; i < kUnitSize; ++i) {
allChars[i] = i;
}

ByteStream stream(allocator_.get());
auto position = allocator_->newWrite(stream, 20);
// Nothing written yet.
EXPECT_EQ(0, stream.size());
for (auto i = 0; i < 10; ++i) {
stream.appendStringView(allChars);
// We check that the size reflects the payload size after each write.
EXPECT_EQ((i + 1) * kUnitSize, stream.size());
}
// We expect a multipart allocation.
EXPECT_TRUE(position.header->isContinued());
EXPECT_EQ(kUnitSize * 10, stream.tellp());

// we check and rewrite different offsets in the stream, not to pass past end.
for (auto start = 90; start < kUnitSize * 9; start += 125) {
stream.seekp(start);
EXPECT_EQ(start, stream.tellp());
EXPECT_EQ(kUnitSize * 10, stream.size());
ByteInputStream input = stream.inputStream();
input.seekp(start);
EXPECT_EQ(kUnitSize * 10 - start, input.remainingSize());
for (auto c = 0; c < 10; ++c) {
uint8_t byte = input.readByte();
EXPECT_EQ(byte, (start + c) % kUnitSize);
}
// Overwrite the bytes just read.
stream.seekp(start);
stream.appendStringView(std::string_view(allChars.data(), 100));
input = stream.inputStream();
input.seekp(start);
for (auto c = 0; c < 100; ++c) {
uint8_t byte = input.readByte();
EXPECT_EQ(byte, c % kUnitSize);
}
}
EXPECT_EQ(kUnitSize * 10, stream.size());
stream.seekp(kUnitSize * 10 - 100);
stream.appendStringView(allChars);
// The last write extends the size.
EXPECT_EQ(kUnitSize * 11 - 100, stream.size());
}

TEST_F(HashStringAllocatorTest, storeStringFast) {
allocator_->allocate(HashStringAllocator::kMinAlloc);
std::string s(allocator_->freeSpace() + sizeof(void*), 'x');
Expand Down
13 changes: 7 additions & 6 deletions velox/common/memory/tests/StreamArenaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ TEST_F(StreamArenaTest, newRange) {
auto arena = newArena();
ByteRange range;
for (int i = 0; i < testData.requestRangeSizes.size(); ++i) {
arena->newRange(testData.requestRangeSizes[i], &range);
arena->newRange(testData.requestRangeSizes[i], nullptr, &range);
ASSERT_EQ(range.size, testData.expectedRangeSizes[i]) << range.toString();
ASSERT_EQ(range.position, 0);
ASSERT_TRUE(range.buffer != nullptr);
Expand Down Expand Up @@ -135,18 +135,18 @@ TEST_F(StreamArenaTest, randomRange) {
if (folly::Random::oneIn(4)) {
const int requestSize =
1 + folly::Random::rand32() % (2 * AllocationTraits::kPageSize);
arena->newTinyRange(requestSize, &range);
arena->newTinyRange(requestSize, nullptr, &range);
ASSERT_EQ(range.size, requestSize);
} else if (folly::Random::oneIn(3)) {
const int requestSize =
AllocationTraits::pageBytes(pool_->largestSizeClass()) +
(folly::Random::rand32() % (4 << 20));
arena->newRange(requestSize, &range);
arena->newRange(requestSize, nullptr, &range);
ASSERT_EQ(AllocationTraits::roundUpPageBytes(requestSize), range.size);
} else {
const int requestSize =
1 + folly::Random::rand32() % pool_->largestSizeClass();
arena->newRange(requestSize, &range);
arena->newRange(requestSize, nullptr, &range);
ASSERT_LE(range.size, AllocationTraits::roundUpPageBytes(requestSize));
}
ASSERT_EQ(range.position, 0);
Expand All @@ -158,8 +158,9 @@ TEST_F(StreamArenaTest, error) {
auto arena = newArena();
ByteRange range;
VELOX_ASSERT_THROW(
arena->newTinyRange(0, &range),
arena->newTinyRange(0, nullptr, &range),
"StreamArena::newTinyRange can't be zero length");
VELOX_ASSERT_THROW(
arena->newRange(0, &range), "StreamArena::newRange can't be zero length");
arena->newRange(0, nullptr, &range),
"StreamArena::newRange can't be zero length");
}
5 changes: 3 additions & 2 deletions velox/exec/Strings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ StringView Strings::append(StringView value, HashStringAllocator& allocator) {
}

// Check if there is enough space left.
if (stream.ranges().back().size < requiredBytes) {
auto& currentRange = stream.ranges().back();
if (currentRange.size - currentRange.position < requiredBytes) {
// Not enough space. Allocate new block.
ByteRange newRange;
allocator.newContiguousRange(requiredBytes, &newRange);

stream.setRange(newRange);
stream.setRange(newRange, 0);
}

VELOX_DCHECK_LE(requiredBytes, stream.ranges().back().size);
Expand Down
Loading

0 comments on commit ce7ce23

Please sign in to comment.