Skip to content

Commit

Permalink
minor: enhance boundary checking in CompressedInputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Apr 10, 2024
1 parent cd607d0 commit c740b61
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions cpp/src/arrow/io/compressed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class CompressedInputStream::Impl {

// Read compressed data if necessary
Status EnsureCompressedData() {
int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0;
int64_t compressed_avail = CompressedBufferAvailable();
if (compressed_avail == 0) {
// Ensure compressed_ buffer is allocated with kChunkSize.
if (!supports_zero_copy_from_raw_) {
Expand Down Expand Up @@ -298,9 +298,10 @@ class CompressedInputStream::Impl {
}

// Decompress some data from the compressed_ buffer.
// Call this function only if the decompressed_ buffer is empty.
// Call this function only if the decompressed_ buffer is fully consumed.
Status DecompressData() {
DCHECK_NE(compressed_->data(), nullptr);
DCHECK_NE(0, CompressedBufferAvailable());
DCHECK_EQ(0, DecompressedBufferAvailable());

int64_t decompress_size = kDecompressSize;

Expand Down Expand Up @@ -354,16 +355,19 @@ class CompressedInputStream::Impl {
// Try to feed more data into the decompressed_ buffer.
Status RefillDecompressed(bool* has_data) {
// First try to read data from the decompressor
if (compressed_ && compressed_->size() != 0) {
int64_t compress_avail = CompressedBufferAvailable();
if (compress_avail != 0) {
if (decompressor_->IsFinished()) {
// We just went over the end of a previous compressed stream.
RETURN_NOT_OK(decompressor_->Reset());
fresh_decompressor_ = true;
}
RETURN_NOT_OK(DecompressData());
}
if (!decompressed_ || decompressed_->size() == 0) {
// Got nothing, need to read more compressed data
int64_t decompress_avail = DecompressedBufferAvailable();
if (decompress_avail == 0) {
// Got nothing from existing `compressed_`, need to read
// more compressed data
RETURN_NOT_OK(EnsureCompressedData());
if (compressed_pos_ == compressed_->size()) {
// No more data to decompress
Expand Down Expand Up @@ -405,13 +409,22 @@ class CompressedInputStream::Impl {
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data()));
RETURN_NOT_OK(buf->Resize(bytes_read));
// Using std::move because the some compiler might has issue below:
// Using std::move because some compilers might have the issue described below:
// https://wg21.cmeerw.net/cwg/issue1579
return std::move(buf);
}

// Accessor for the raw_ input stream member; returns a const reference
// to the stored shared_ptr without copying it.
const std::shared_ptr<InputStream>& raw() const {
  return raw_;
}

private:
// Number of bytes in the compressed_ buffer located at or after
// compressed_pos_. Yields 0 when compressed_ is null.
int64_t CompressedBufferAvailable() const {
  if (!compressed_) {
    return 0;
  }
  return compressed_->size() - compressed_pos_;
}

// Number of bytes in the decompressed_ buffer located at or after
// decompressed_pos_. Yields 0 when decompressed_ is null.
int64_t DecompressedBufferAvailable() const {
  if (!decompressed_) {
    return 0;
  }
  const auto total_size = decompressed_->size();
  return total_size - decompressed_pos_;
}

private:
// Read 64 KB compressed data at a time
static const int64_t kChunkSize = 64 * 1024;
Expand Down

0 comments on commit c740b61

Please sign in to comment.