Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43408: [C++] IO: Make Advance to virtual and naive implement it #43409

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,32 @@ class BufferedInputStream::Impl : public BufferedBase {
return std::shared_ptr<Buffer>(std::move(buffer));
}

Status Advance(int64_t nbytes) {
if (nbytes < 0) {
return Status::Invalid("Bytes to advance must be non-negative. Received:", nbytes);
}
if (nbytes == 0) {
return Status::OK();
}

if (nbytes < bytes_buffered_) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (nbytes < bytes_buffered_) {
if (nbytes <= bytes_buffered_) {

ConsumeBuffer(nbytes);
return Status::OK();
}

// Invalidate buffered data, as with a Seek or large Read
int64_t remain_skip_bytes = nbytes - bytes_buffered_;
RewindBuffer();
// TODO(mwish): Considering using raw_->Advance if available,
// currently we don't have a way to know if the underlying stream supports fast
// skipping. So we just read and discard the data.
Comment on lines +453 to +455
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe some code like:

  /// \brief Return true if InputStream is capable of zero copy Buffer reads
  ///
  /// Zero copy reads imply the use of Buffer-returning Read() overloads.
  virtual bool supports_zero_copy() const;

Would help, a supports_fast_advance like https://github.com/ClickHouse/ClickHouse/blob/1b2fd51e090214deb340a76833bab7b4985eecfc/src/Disks/IO/ReadBufferFromRemoteFSGather.h#L19 might work

auto result = Read(remain_skip_bytes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not simply call Advance? The default implementation calls Read anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not simply call Advance? The default implementation calls Read anyway.

If underlying don't have supports_fast_advance, it would call "read" without buffering, and might be a low-efficient direct read and less efficent than "do large buffer and read"

if (!result.ok()) {
return result.status();
}
return Status::OK();
}

// For providing access to the raw file handles
std::shared_ptr<InputStream> raw() const { return raw_; }

Expand Down Expand Up @@ -498,6 +524,8 @@ Result<std::shared_ptr<Buffer>> BufferedInputStream::DoRead(int64_t nbytes) {
return impl_->Read(nbytes);
}

Status BufferedInputStream::DoAdvance(int64_t nbytes) { return impl_->Advance(nbytes); }

Result<std::shared_ptr<const KeyValueMetadata>> BufferedInputStream::ReadMetadata() {
return impl_->raw()->ReadMetadata();
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/io/buffered.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ class ARROW_EXPORT BufferedInputStream
/// expands the buffer size if necessary
Result<std::string_view> DoPeek(int64_t nbytes) override;

/// \brief Advance the position of the stream by nbytes.
Status DoAdvance(int64_t nbytes) override;

class ARROW_NO_EXPORT Impl;
std::unique_ptr<Impl> impl_;
};
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/io/concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
return derived()->DoPeek(nbytes);
}

Status Advance(int64_t nbytes) override {
auto guard = lock_.exclusive_guard();
return derived()->DoAdvance(nbytes);
}

/*
Methods to implement in derived class:

Expand All @@ -132,6 +137,7 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
And optionally:

Status DoAbort() override;
Status DoAdvance(int64_t nbytes) override;
Result<std::string_view> DoPeek(int64_t nbytes) override;

These methods should be protected in the derived class and
Expand All @@ -145,6 +151,8 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
// have derived classes itself.
virtual Status DoAbort() { return derived()->DoClose(); }

virtual Status DoAdvance(int64_t nbytes) { return derived()->Advance(nbytes); }

virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
return Status::NotImplemented("Peek not implemented");
}
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,13 @@ class FileSegmentReader
return buffer;
}

Status DoAdvance(int64_t nbytes) override {
RETURN_NOT_OK(CheckOpen());
int64_t bytes_to_skip = std::min(nbytes, nbytes_ - position_);
position_ += bytes_to_skip;
return Status::OK();
}

private:
std::shared_ptr<RandomAccessFile> file_;
bool closed_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Re
/// \brief Advance or skip stream indicated number of bytes
/// \param[in] nbytes the number to move forward
/// \return Status
Status Advance(int64_t nbytes);
virtual Status Advance(int64_t nbytes);

/// \brief Return zero-copy string_view to upcoming bytes.
///
Expand Down
Loading