Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify compaction prefetching logic #13187

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
40 changes: 21 additions & 19 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,12 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
bool FilePrefetchBuffer::TryReadFromCacheUntracked(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status, bool for_compaction) {
// We disallow async IO for compaction reads since they are performed in
// the background anyways and are less latency sensitive compareed to
// user-initiated reads
(void)for_compaction;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the change to unify the logic, for_compaction is not used outside of this assert so we need this (void) for_compaction to prevent the compiler from complaining

assert(!for_compaction || num_buffers_ == 1);

if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
Expand Down Expand Up @@ -819,27 +825,22 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);

if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
} else {
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}

// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously if num_buffers_
// > 1.
s = PrefetchInternal(
opts, reader, offset, n,
(num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
copy_to_overlap_buffer);
explicit_prefetch_submitted_ = false;
}

// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously if num_buffers_
// > 1.
s = PrefetchInternal(
opts, reader, offset, n,
(num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
copy_to_overlap_buffer);
explicit_prefetch_submitted_ = false;
if (!s.ok()) {
if (status) {
*status = s;
Expand All @@ -853,7 +854,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
} else {
return false;
}
} else if (!for_compaction) {
} else {
UpdateStats(/*found_in_buffer=*/true, n);
}

Expand All @@ -864,6 +865,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
buf = overlap_buf_;
}
assert(buf->offset_ <= offset);
assert(buf->IsDataBlockInBuffer(offset, n));
uint64_t offset_in_buffer = offset - buf->offset_;
*result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
if (prefetched) {
Expand Down
Loading
Loading