Skip to content

Commit

Permalink
io::BufferedInput: Fixing the invalid state with SetBufferSize
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Oct 12, 2024
1 parent 45b3697 commit 1970634
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
22 changes: 18 additions & 4 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class BufferedBase {
return !is_open_;
}

// Reset the `buffer_` to a new buffer of size `buffer_size_`
Status ResetBuffer() {
if (!buffer_) {
// On first invocation, or if the buffer has been released, we allocate a
Expand Down Expand Up @@ -284,17 +285,30 @@ class BufferedInputStream::Impl : public BufferedBase {

// Resize internal read buffer. Note that the internal buffer-size
// should be not larger than the raw_read_bound_.
//
// SetBufferSize will not change the buffer_pos_ and bytes_buffered_.
Status SetBufferSize(int64_t new_buffer_size) {
if (new_buffer_size <= 0) {
return Status::Invalid("Buffer size should be positive");
}
if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
return Status::Invalid("Cannot shrink read buffer if buffered data remains");
return Status::Invalid(
"Cannot shrink read buffer if buffered data remains, new_buffer_size:",
new_buffer_size, ", buffer_pos:", buffer_pos_,
", bytes_buffered:", bytes_buffered_, ", buffer_size:", buffer_size_);
}
if (raw_read_bound_ >= 0) {
// No need to reserve space for more than the total remaining number of bytes.
new_buffer_size = std::min(new_buffer_size,
bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
if (bytes_buffered_ == 0) {
// Special case: we can not keep the current buffer because it does not
// contains any required data.
new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_);
} else {
// We should keeping the current buffer because it contains data that
// can be read.
new_buffer_size =
std::min(new_buffer_size, buffer_size_ + (raw_read_bound_ - raw_read_total_));
}
}
return ResizeBuffer(new_buffer_size);
}
Expand Down Expand Up @@ -350,7 +364,7 @@ class BufferedInputStream::Impl : public BufferedBase {
}

Status DoBuffer() {
// Fill buffer
// Refill the buffer from the raw stream with `buffer_size_` bytes.
if (!buffer_) {
RETURN_NOT_OK(ResetBuffer());
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/buffered.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class ARROW_EXPORT BufferedInputStream
int64_t raw_read_bound = -1);

/// \brief Resize internal read buffer; calls to Read(...) will read at least
/// this many bytes from the raw InputStream if possible.
/// \param[in] new_buffer_size the new read buffer size
/// \return Status
Status SetBufferSize(int64_t new_buffer_size);
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/io/buffered_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,19 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) {
}
}

TEST_F(TestBufferedInputStream, PeekPastBufferedBytesTwice) {
MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15);
ASSERT_OK(buffered_->Read(9));
ASSERT_EQ(1, buffered_->bytes_buffered());
ASSERT_EQ(10, buffered_->buffer_size());
ASSERT_OK(buffered_->Peek(6));
ASSERT_EQ(6, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
ASSERT_OK(buffered_->Peek(6));
ASSERT_EQ(6, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
}

class TestBufferedInputStreamBound : public ::testing::Test {
public:
void SetUp() { CreateExample(/*bounded=*/true); }
Expand Down

0 comments on commit 1970634

Please sign in to comment.