Skip to content

Commit

Permalink
Rename ByteStream as ByteOutputStream (#7867)
Browse files Browse the repository at this point in the history
Summary:
The ByteStream class was used for both reading from and writing to a multi-part
buffer. Mixing reading and writing logic and APIs in a single class is confusing.
#7541 has already landed and extracted the reading logic and APIs into the
ByteInputStream class. This change renames ByteStream to ByteOutputStream.

Pull Request resolved: #7867

Reviewed By: xiaoxmeng

Differential Revision: D51829923

Pulled By: zacw7

fbshipit-source-id: 543a124062738a1e72767d33d223c5314a2cff59
  • Loading branch information
zacw7 authored and facebook-github-bot committed Dec 6, 2023
1 parent 075cc92 commit 25b75f5
Show file tree
Hide file tree
Showing 20 changed files with 107 additions and 95 deletions.
45 changes: 24 additions & 21 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ std::streampos ByteInputStream::tellp() const {
}
size += range.size;
}
VELOX_FAIL("ByteStream 'current_' is not in 'ranges_'.");
VELOX_FAIL("ByteInputStream 'current_' is not in 'ranges_'.");
}

void ByteInputStream::seekp(std::streampos position) {
Expand All @@ -106,7 +106,7 @@ void ByteInputStream::next(bool throwIfPastEnd) {
VELOX_CHECK_LT(position, ranges_.size());
if (position == ranges_.size() - 1) {
if (throwIfPastEnd) {
VELOX_FAIL("Reading past end of ByteStream");
VELOX_FAIL("Reading past end of ByteInputStream");
}
return;
}
Expand Down Expand Up @@ -167,7 +167,7 @@ void ByteInputStream::skip(int32_t size) {
}
}

size_t ByteStream::size() const {
size_t ByteOutputStream::size() const {
if (ranges_.empty()) {
return 0;
}
Expand All @@ -178,7 +178,7 @@ size_t ByteStream::size() const {
return total + std::max(ranges_.back().position, lastRangeEnd_);
}

void ByteStream::appendBool(bool value, int32_t count) {
void ByteOutputStream::appendBool(bool value, int32_t count) {
if (count == 1 && current_->size > current_->position) {
bits::setBit(
reinterpret_cast<uint64_t*>(current_->buffer),
Expand Down Expand Up @@ -206,7 +206,10 @@ void ByteStream::appendBool(bool value, int32_t count) {
}
}

void ByteStream::appendBits(const uint64_t* bits, int32_t begin, int32_t end) {
void ByteOutputStream::appendBits(
const uint64_t* bits,
int32_t begin,
int32_t end) {
VELOX_DCHECK(isBits_);
int32_t count = end - begin;
int32_t offset = 0;
Expand All @@ -229,11 +232,11 @@ void ByteStream::appendBits(const uint64_t* bits, int32_t begin, int32_t end) {
}
}

void ByteStream::appendStringView(StringView value) {
void ByteOutputStream::appendStringView(StringView value) {
appendStringView((std::string_view)value);
}

void ByteStream::appendStringView(std::string_view value) {
void ByteOutputStream::appendStringView(std::string_view value) {
const int32_t bytes = value.size();
int32_t offset = 0;
for (;;) {
Expand All @@ -250,7 +253,7 @@ void ByteStream::appendStringView(std::string_view value) {
}
}

std::streampos ByteStream::tellp() const {
std::streampos ByteOutputStream::tellp() const {
if (ranges_.empty()) {
return 0;
}
Expand All @@ -262,10 +265,10 @@ std::streampos ByteStream::tellp() const {
}
size += range.size;
}
VELOX_FAIL("ByteStream 'current_' is not in 'ranges_'.");
VELOX_FAIL("ByteOutputStream 'current_' is not in 'ranges_'.");
}

void ByteStream::seekp(std::streampos position) {
void ByteOutputStream::seekp(std::streampos position) {
int64_t toSkip = position;
// Record how much was written pre-seek.
updateEnd();
Expand All @@ -280,10 +283,10 @@ void ByteStream::seekp(std::streampos position) {
}
toSkip -= range.size;
}
VELOX_FAIL("Seeking past end of ByteStream: {}", position);
VELOX_FAIL("Seeking past end of ByteOutputStream: {}", position);
}

void ByteStream::flush(OutputStream* out) {
void ByteOutputStream::flush(OutputStream* out) {
updateEnd();
for (int32_t i = 0; i < ranges_.size(); ++i) {
int32_t count = i == ranges_.size() - 1 ? lastRangeEnd_ : ranges_[i].size;
Expand All @@ -298,17 +301,17 @@ void ByteStream::flush(OutputStream* out) {
}
}

char* ByteStream::writePosition() {
char* ByteOutputStream::writePosition() {
if (ranges_.empty()) {
return nullptr;
}
return reinterpret_cast<char*>(current_->buffer) + current_->position;
}

void ByteStream::extend(int32_t bytes) {
void ByteOutputStream::extend(int32_t bytes) {
if (current_ && current_->position != current_->size) {
LOG(FATAL) << "Extend ByteStream before range full: " << current_->position
<< " vs. " << current_->size;
LOG(FATAL) << "Extend ByteOutputStream before range full: "
<< current_->position << " vs. " << current_->size;
}

// Check if rewriting existing content. If so, move to next range and start at
Expand All @@ -333,7 +336,7 @@ void ByteStream::extend(int32_t bytes) {
}
}

int32_t ByteStream::newRangeSize(int32_t bytes) const {
int32_t ByteOutputStream::newRangeSize(int32_t bytes) const {
const int32_t newSize = allocatedBytes_ + bytes;
if (newSize < 128) {
return 128;
Expand All @@ -347,18 +350,18 @@ int32_t ByteStream::newRangeSize(int32_t bytes) const {
return bits::roundUp(bytes, memory::AllocationTraits::kPageSize);
}

ByteInputStream ByteStream::inputStream() const {
ByteInputStream ByteOutputStream::inputStream() const {
VELOX_CHECK(!ranges_.empty());
updateEnd();
auto rangeCopy = ranges_;
rangeCopy.back().size = lastRangeEnd_;
return ByteInputStream(std::move(rangeCopy));
}

std::string ByteStream::toString() const {
std::string ByteOutputStream::toString() const {
std::stringstream oss;
oss << "ByteStream[lastRangeEnd " << lastRangeEnd_ << ", " << ranges_.size()
<< " ranges (position/size) [";
oss << "ByteOutputStream[lastRangeEnd " << lastRangeEnd_ << ", "
<< ranges_.size() << " ranges (position/size) [";
for (const auto& range : ranges_) {
oss << "(" << range.position << "/" << range.size
<< (&range == current_ ? " current" : "") << ")";
Expand Down
16 changes: 8 additions & 8 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,18 @@ class ByteInputStream {
/// in hash tables. The stream is seekable and supports overwriting of
/// previous content, for example, writing a message body and then
/// seeking back to start to write a length header.
class ByteStream {
class ByteOutputStream {
public:
/// For output.
ByteStream(
ByteOutputStream(
StreamArena* arena,
bool isBits = false,
bool isReverseBitOrder = false)
: arena_(arena), isBits_(isBits), isReverseBitOrder_(isReverseBitOrder) {}

ByteStream(const ByteStream& other) = delete;
ByteOutputStream(const ByteOutputStream& other) = delete;

void operator=(const ByteStream& other) = delete;
void operator=(const ByteOutputStream& other) = delete;

/// Sets 'this' to range over 'range'. If this is for purposes of writing,
/// lastWrittenPosition specifies the end of any pre-existing content in
Expand Down Expand Up @@ -401,7 +401,7 @@ class ByteStream {
template <typename T>
class AppendWindow {
public:
AppendWindow(ByteStream& stream, Scratch& scratch)
AppendWindow(ByteOutputStream& stream, Scratch& scratch)
: stream_(stream), scratchPtr_(scratch) {}

~AppendWindow() {
Expand All @@ -417,7 +417,7 @@ class AppendWindow {
}

private:
ByteStream& stream_;
ByteOutputStream& stream_;
ScratchPtr<T> scratchPtr_;
};

Expand All @@ -443,7 +443,7 @@ class IOBufOutputStream : public OutputStream {
int32_t initialSize = memory::AllocationTraits::kPageSize)
: OutputStream(listener),
arena_(std::make_shared<StreamArena>(&pool)),
out_(std::make_unique<ByteStream>(arena_.get())) {
out_(std::make_unique<ByteOutputStream>(arena_.get())) {
out_->startWrite(initialSize);
}

Expand All @@ -464,7 +464,7 @@ class IOBufOutputStream : public OutputStream {

private:
std::shared_ptr<StreamArena> arena_;
std::unique_ptr<ByteStream> out_;
std::unique_ptr<ByteOutputStream> out_;
};

} // namespace facebook::velox
14 changes: 9 additions & 5 deletions velox/common/memory/HashStringAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ ByteInputStream HashStringAllocator::prepareRead(const Header* begin) {
}

HashStringAllocator::Position HashStringAllocator::newWrite(
ByteStream& stream,
ByteOutputStream& stream,
int32_t preferredSize) {
VELOX_CHECK(
!currentHeader_,
Expand All @@ -158,7 +158,9 @@ HashStringAllocator::Position HashStringAllocator::newWrite(
return startPosition_;
}

void HashStringAllocator::extendWrite(Position position, ByteStream& stream) {
void HashStringAllocator::extendWrite(
Position position,
ByteOutputStream& stream) {
auto header = position.header;
const auto offset = position.offset();
VELOX_CHECK_GE(
Expand All @@ -184,7 +186,9 @@ void HashStringAllocator::extendWrite(Position position, ByteStream& stream) {
}

std::pair<HashStringAllocator::Position, HashStringAllocator::Position>
HashStringAllocator::finishWrite(ByteStream& stream, int32_t numReserveBytes) {
HashStringAllocator::finishWrite(
ByteOutputStream& stream,
int32_t numReserveBytes) {
VELOX_CHECK(
currentHeader_, "Must call newWrite or extendWrite before finishWrite");
auto writePosition = stream.writePosition();
Expand Down Expand Up @@ -521,7 +525,7 @@ void HashStringAllocator::ensureAvailable(int32_t bytes, Position& position) {
return;
}

ByteStream stream(this);
ByteOutputStream stream(this);
extendWrite(position, stream);
static char data[128];
while (bytes) {
Expand Down Expand Up @@ -593,7 +597,7 @@ void HashStringAllocator::copyMultipartNoInline(
return;
}
// Write the string as non-contiguous chunks.
ByteStream stream(this, false, false);
ByteOutputStream stream(this, false, false);
auto position = newWrite(stream, numBytes);
stream.appendStringView(*string);
finishWrite(stream, 0);
Expand Down
12 changes: 7 additions & 5 deletions velox/common/memory/HashStringAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
namespace facebook::velox {

// Implements an arena backed by MappedMemory::Allocation. This is for backing
// ByteStream or for allocating single blocks. Blocks can be individually freed.
// Adjacent frees are coalesced and free blocks are kept in a free list.
// ByteOutputStream or for allocating single blocks. Blocks can be individually
// freed. Adjacent frees are coalesced and free blocks are kept in a free list.
// Allocated blocks are prefixed with a Header. This has a size and flags.
// kContinue means that last 8 bytes are a pointer to another Header after which
// the contents of this allocation continue. kFree means the block is free. A
Expand Down Expand Up @@ -263,19 +263,21 @@ class HashStringAllocator : public StreamArena {
// kMinContiguous bytes of contiguous space. finishWrite finalizes
// the allocation information after the write is done.
// Returns the position at the start of the allocated block.
Position newWrite(ByteStream& stream, int32_t preferredSize = kMinContiguous);
Position newWrite(
ByteOutputStream& stream,
int32_t preferredSize = kMinContiguous);

// Sets 'stream' to write starting at 'position'. If new ranges have to
// be allocated when writing, headers will be updated accordingly.
void extendWrite(Position position, ByteStream& stream);
void extendWrite(Position position, ByteOutputStream& stream);

// Completes a write prepared with newWrite or
// extendWrite. Up to 'numReserveBytes' unused bytes, if available, are left
// after the end of the write to accommodate another write. Returns a pair of
// positions: (1) position at the start of this 'write', (2) position
// immediately after the last written byte.
std::pair<Position, Position> finishWrite(
ByteStream& stream,
ByteOutputStream& stream,
int32_t numReserveBytes);

/// Allocates a new range for a stream writing to 'this'. Sets the last word
Expand Down
10 changes: 5 additions & 5 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ TEST_F(ByteStreamTest, newRangeAllocation) {

const auto prevAllocCount = pool_->stats().numAllocs;
auto arena = newArena();
ByteStream byteStream(arena.get());
ByteOutputStream byteStream(arena.get());
byteStream.startWrite(0);
for (int i = 0; i < testData.newRangeSizes.size(); ++i) {
const auto newRangeSize = testData.newRangeSizes[i];
Expand All @@ -260,9 +260,9 @@ TEST_F(ByteStreamTest, newRangeAllocation) {
TEST_F(ByteStreamTest, randomRangeAllocationFromMultiStreamsTest) {
auto arena = newArena();
const int numByteStreams = 10;
std::vector<std::unique_ptr<ByteStream>> byteStreams;
std::vector<std::unique_ptr<ByteOutputStream>> byteStreams;
for (int i = 0; i < numByteStreams; ++i) {
byteStreams.push_back(std::make_unique<ByteStream>(arena.get()));
byteStreams.push_back(std::make_unique<ByteOutputStream>(arena.get()));
byteStreams.back()->startWrite(0);
}
const int testIterations = 1000;
Expand Down Expand Up @@ -293,7 +293,7 @@ TEST_F(ByteStreamTest, bits) {
bits.push_back(seed * (i + 1));
}
auto arena = newArena();
ByteStream bitStream(arena.get(), true);
ByteOutputStream bitStream(arena.get(), true);
bitStream.startWrite(11);
int32_t offset = 0;
// Odd number of sizes.
Expand Down Expand Up @@ -334,7 +334,7 @@ TEST_F(ByteStreamTest, appendWindow) {
}
auto arena = newArena();

ByteStream stream(arena.get());
ByteOutputStream stream(arena.get());
int32_t offset = 0;
std::vector<int32_t> sizes = {1, 19, 52, 58, 129};
int32_t counter = 0;
Expand Down
10 changes: 5 additions & 5 deletions velox/common/memory/tests/HashStringAllocatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ TEST_F(HashStringAllocatorTest, headerToString) {

ASSERT_NO_THROW(allocator_->toString());

ByteStream stream(allocator_.get());
ByteOutputStream stream(allocator_.get());
auto h4 = allocator_->newWrite(stream).header;
std::string data(123'456, 'x');
stream.appendStringView(data);
Expand Down Expand Up @@ -164,7 +164,7 @@ TEST_F(HashStringAllocatorTest, allocateLarge) {
}

TEST_F(HashStringAllocatorTest, finishWrite) {
ByteStream stream(allocator_.get());
ByteOutputStream stream(allocator_.get());
auto start = allocator_->newWrite(stream);

// Write a short string.
Expand Down Expand Up @@ -246,7 +246,7 @@ TEST_F(HashStringAllocatorTest, multipart) {
continue;
}
auto chars = randomString();
ByteStream stream(allocator_.get());
ByteOutputStream stream(allocator_.get());
if (data[i].start.header) {
if (rand32() % 5) {
// 4/5 of cases append to the end.
Expand Down Expand Up @@ -286,7 +286,7 @@ TEST_F(HashStringAllocatorTest, multipart) {
}

TEST_F(HashStringAllocatorTest, rewrite) {
ByteStream stream(allocator_.get());
ByteOutputStream stream(allocator_.get());
auto header = allocator_->allocate(5);
EXPECT_EQ(16, header->size()); // Rounds up to kMinAlloc.
HSA::Position current = HSA::Position::atOffset(header, 0);
Expand Down Expand Up @@ -567,7 +567,7 @@ TEST_F(HashStringAllocatorTest, sizeAndPosition) {
allChars[i] = i;
}

ByteStream stream(allocator_.get());
ByteOutputStream stream(allocator_.get());
auto position = allocator_->newWrite(stream, 20);
// Nothing written yet.
EXPECT_EQ(0, stream.size());
Expand Down
Loading

0 comments on commit 25b75f5

Please sign in to comment.