From c740b61143c369bc305e801171777a0024017c4e Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 10 Apr 2024 13:45:04 +0800 Subject: [PATCH] minor: enhance boundary checking in CompressedInputStream --- cpp/src/arrow/io/compressed.cc | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 5faa4d095eb1e..ae0d3cf4f12b9 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -269,7 +269,7 @@ class CompressedInputStream::Impl { // Read compressed data if necessary Status EnsureCompressedData() { - int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; + int64_t compressed_avail = CompressedBufferAvailable(); if (compressed_avail == 0) { // Ensure compressed_ buffer is allocated with kChunkSize. if (!supports_zero_copy_from_raw_) { @@ -298,9 +298,10 @@ class CompressedInputStream::Impl { } // Decompress some data from the compressed_ buffer. - // Call this function only if the decompressed_ buffer is empty. + // Call this function only if the decompressed_ buffer is fully consumed. Status DecompressData() { - DCHECK_NE(compressed_->data(), nullptr); + DCHECK_NE(0, CompressedBufferAvailable()); + DCHECK_EQ(0, DecompressedBufferAvailable()); int64_t decompress_size = kDecompressSize; @@ -354,7 +355,8 @@ class CompressedInputStream::Impl { // Try to feed more data into the decompressed_ buffer. Status RefillDecompressed(bool* has_data) { // First try to read data from the decompressor - if (compressed_ && compressed_->size() != 0) { + int64_t compress_avail = CompressedBufferAvailable(); + if (compress_avail != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. RETURN_NOT_OK(decompressor_->Reset()); @@ -362,8 +364,10 @@ class CompressedInputStream::Impl { } RETURN_NOT_OK(DecompressData()); } - if (!decompressed_ || decompressed_->size() == 0) { - // Got nothing, need to read more compressed data + int64_t decompress_avail = DecompressedBufferAvailable(); + if (decompress_avail == 0) { + // Got nothing from existing `compressed_`, need to read + // more compressed data RETURN_NOT_OK(EnsureCompressedData()); if (compressed_pos_ == compressed_->size()) { // No more data to decompress @@ -405,13 +409,22 @@ class CompressedInputStream::Impl { ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data())); RETURN_NOT_OK(buf->Resize(bytes_read)); - // Using std::move because the some compiler might has issue below: + // Using std::move because some compiler might has issue below: // https://wg21.cmeerw.net/cwg/issue1579 return std::move(buf); } const std::shared_ptr& raw() const { return raw_; } + private: + int64_t CompressedBufferAvailable() const { + return compressed_ ? compressed_->size() - compressed_pos_ : 0; + } + + int64_t DecompressedBufferAvailable() const { + return decompressed_ ? decompressed_->size() - decompressed_pos_ : 0; + } + private: // Read 64 KB compressed data at a time static const int64_t kChunkSize = 64 * 1024;