fix: Avoid redownloading coalesced region gaps twice in buffered inputs #12018

Open · wants to merge 1 commit into main
velox/dwio/common/BufferedInput.h (46 additions, 0 deletions)

@@ -167,6 +167,52 @@ class BufferedInput {
return pct;
}

// Move the requests in 'noPrefetch' to 'prefetch' if they are already covered
// by the coalescing in 'prefetch'.
template <typename Request, typename GetRegionOffset, typename GetRegionEnd>
static void moveCoalesced(
std::vector<Request>& prefetch,
std::vector<int32_t>& ends,
std::vector<Request>& noPrefetch,
GetRegionOffset getRegionOffset,
GetRegionEnd getRegionEnd) {
auto numOldPrefetch = prefetch.size();
prefetch.resize(prefetch.size() + noPrefetch.size());
std::copy_backward(
prefetch.data(), prefetch.data() + numOldPrefetch, prefetch.end());
auto* oldPrefetch = prefetch.data() + noPrefetch.size();
int numMoved = 0;
int i = 0; // index into noPrefetch for read
int j = 0; // index into oldPrefetch
int k = 0; // index into prefetch
int l = 0; // index into noPrefetch for write
for (auto& end : ends) {
prefetch[k++] = oldPrefetch[j++];
while (j < end) {
auto coalesceStart = getRegionEnd(oldPrefetch[j - 1]);
auto coalesceEnd = getRegionOffset(oldPrefetch[j]);
while (i < noPrefetch.size() &&
getRegionOffset(noPrefetch[i]) < coalesceStart) {
noPrefetch[l++] = noPrefetch[i++];
}
while (i < noPrefetch.size() &&
getRegionEnd(noPrefetch[i]) <= coalesceEnd) {
prefetch[k++] = noPrefetch[i++];
++numMoved;
}
prefetch[k++] = oldPrefetch[j++];
}
end += numMoved;
}
while (i < noPrefetch.size()) {
noPrefetch[l++] = noPrefetch[i++];
}
VELOX_CHECK_EQ(k, numOldPrefetch + numMoved);
prefetch.resize(k);
VELOX_CHECK_EQ(l + numMoved, noPrefetch.size());
noPrefetch.resize(l);
}

const std::shared_ptr<ReadFileInputStream> input_;
memory::MemoryPool* const pool_;

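For context on how the new helper is meant to be used: `prefetch` and `noPrefetch` are assumed to be sorted by region offset, and `ends` holds the exclusive end index of each coalesced group in `prefetch`; requests from `noPrefetch` that fall entirely inside a coalescing gap are spliced into `prefetch` so the gap bytes are fetched only once. The sketch below is a hypothetical standalone transcription of the algorithm, with `(offset, length)` pairs standing in for real requests; `moveCoalescedSketch`, `Region`, and the harness are illustrative names, not part of the PR.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using Region = std::pair<uint64_t, uint64_t>; // {offset, length}

// Free-standing copy of the moveCoalesced logic, with VELOX_CHECK_EQ dropped.
template <typename Request, typename GetRegionOffset, typename GetRegionEnd>
void moveCoalescedSketch(
    std::vector<Request>& prefetch,
    std::vector<int32_t>& ends,
    std::vector<Request>& noPrefetch,
    GetRegionOffset getRegionOffset,
    GetRegionEnd getRegionEnd) {
  auto numOldPrefetch = prefetch.size();
  prefetch.resize(prefetch.size() + noPrefetch.size());
  std::copy_backward(
      prefetch.data(), prefetch.data() + numOldPrefetch, prefetch.end());
  auto* oldPrefetch = prefetch.data() + noPrefetch.size();
  int numMoved = 0;
  int i = 0, j = 0, k = 0, l = 0;
  for (auto& end : ends) {
    prefetch[k++] = oldPrefetch[j++];
    while (j < end) {
      auto coalesceStart = getRegionEnd(oldPrefetch[j - 1]);
      auto coalesceEnd = getRegionOffset(oldPrefetch[j]);
      while (i < static_cast<int>(noPrefetch.size()) &&
             getRegionOffset(noPrefetch[i]) < coalesceStart) {
        noPrefetch[l++] = noPrefetch[i++]; // outside any gap: keep it
      }
      while (i < static_cast<int>(noPrefetch.size()) &&
             getRegionEnd(noPrefetch[i]) <= coalesceEnd) {
        prefetch[k++] = noPrefetch[i++]; // fully inside the gap: move it
        ++numMoved;
      }
      prefetch[k++] = oldPrefetch[j++];
    }
    end += numMoved;
  }
  while (i < static_cast<int>(noPrefetch.size())) {
    noPrefetch[l++] = noPrefetch[i++];
  }
  prefetch.resize(k);
  noPrefetch.resize(l);
}

int main() {
  // One coalesced prefetch group: [0, 100) and [150, 200); the gap is [100, 150).
  std::vector<Region> prefetch{{0, 100}, {150, 50}};
  std::vector<int32_t> ends{2};
  // [110, 140) lies inside the gap; [400, 410) does not.
  std::vector<Region> noPrefetch{{110, 30}, {400, 10}};
  moveCoalescedSketch(
      prefetch,
      ends,
      noPrefetch,
      [](const Region& r) { return r.first; },
      [](const Region& r) { return r.first + r.second; });
  // The gap request now rides along with the coalesced prefetch read instead
  // of triggering a second download of bytes [100, 150).
  assert((prefetch == std::vector<Region>{{0, 100}, {110, 30}, {150, 50}}));
  assert(ends[0] == 3);
  assert((noPrefetch == std::vector<Region>{{400, 10}}));
  return 0;
}
```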
velox/dwio/common/CachedBufferedInput.cpp (100 additions, 58 deletions)

@@ -100,6 +100,7 @@ bool CachedBufferedInput::shouldPreload(int32_t numPages) {
}

namespace {

bool isPrefetchPct(int32_t pct) {
return pct >= FLAGS_cache_prefetch_min_pct;
}
@@ -139,6 +140,26 @@ std::vector<CacheRequest*> makeRequestParts(
return parts;
}

template <bool kSsd>
uint64_t getOffset(const CacheRequest& request) {
if constexpr (kSsd) {
VELOX_DCHECK(!request.ssdPin.empty());
return request.ssdPin.run().offset();
} else {
return request.key.offset;
}
}

template <bool kSsd>
std::pair<uint64_t, uint64_t> toRegion(const CacheRequest& request) {
return std::make_pair(getOffset<kSsd>(request), request.size);
}

template <bool kSsd>
bool lessThan(const CacheRequest* left, const CacheRequest* right) {
return toRegion<kSsd>(*left) < toRegion<kSsd>(*right);
}

} // namespace

void CachedBufferedInput::load(const LogType /*unused*/) {
@@ -187,42 +208,48 @@ void CachedBufferedInput::load(const LogType /*unused*/) {
}
}

makeLoads(std::move(storageLoad[1]), true);
makeLoads(std::move(ssdLoad[1]), true);
makeLoads(std::move(storageLoad[0]), false);
makeLoads(std::move(ssdLoad[0]), false);
std::sort(storageLoad[0].begin(), storageLoad[0].end(), lessThan<false>);
std::sort(storageLoad[1].begin(), storageLoad[1].end(), lessThan<false>);
std::sort(ssdLoad[0].begin(), ssdLoad[0].end(), lessThan<true>);
std::sort(ssdLoad[1].begin(), ssdLoad[1].end(), lessThan<true>);
makeLoads<false>(storageLoad);
makeLoads<true>(ssdLoad);
}

void CachedBufferedInput::makeLoads(
std::vector<CacheRequest*> requests,
bool prefetch) {
template <bool kSsd>
void CachedBufferedInput::makeLoads(std::vector<CacheRequest*> requests[2]) {
std::vector<int32_t> groupEnds[2];
groupEnds[1] = groupRequests<kSsd>(requests[1], true);
moveCoalesced(
requests[1],
groupEnds[1],
requests[0],
[](auto* request) { return getOffset<kSsd>(*request); },
[](auto* request) { return getOffset<kSsd>(*request) + request->size; });
groupEnds[0] = groupRequests<kSsd>(requests[0], false);
readRegions(requests[1], true, groupEnds[1]);
readRegions(requests[0], false, groupEnds[0]);
}

template <bool kSsd>
std::vector<int32_t> CachedBufferedInput::groupRequests(
const std::vector<CacheRequest*>& requests,
bool prefetch) const {
if (requests.empty() || (requests.size() < 2 && !prefetch)) {
return;
return {};
}
const bool isSsd = !requests[0]->ssdPin.empty();
const int32_t maxDistance = isSsd ? 20000 : options_.maxCoalesceDistance();
std::sort(
requests.begin(),
requests.end(),
[&](const CacheRequest* left, const CacheRequest* right) {
if (isSsd) {
return left->ssdPin.run().offset() < right->ssdPin.run().offset();
} else {
return left->key.offset < right->key.offset;
}
});
const int32_t maxDistance = kSsd ? 20000 : options_.maxCoalesceDistance();

// Combine adjacent short reads.
int32_t numNewLoads = 0;
int64_t coalescedBytes = 0;
coalesceIo<CacheRequest*, CacheRequest*>(
std::vector<int32_t> ends;
ends.reserve(requests.size());
std::vector<char> ranges;
coalesceIo<CacheRequest*, char>(
requests,
maxDistance,
std::numeric_limits<int32_t>::max(),
[&](int32_t index) {
return isSsd ? requests[index]->ssdPin.run().offset()
: requests[index]->key.offset;
},
[&](int32_t index) { return getOffset<kSsd>(*requests[index]); },
[&](int32_t index) {
const auto size = requests[index]->size;
coalescedBytes += size;
@@ -235,41 +262,16 @@ void CachedBufferedInput::makeLoads(
}
return requests[index]->coalesces ? 1 : kNoCoalesce;
},
[&](CacheRequest* request, std::vector<CacheRequest*>& ranges) {
ranges.push_back(request);
[&](CacheRequest* /*request*/, std::vector<char>& ranges) {
ranges.push_back(0);
},
[&](int32_t /*gap*/, std::vector<CacheRequest*> /*ranges*/) { /*no op*/ },
[&](int32_t /*gap*/, std::vector<char> /*ranges*/) { /*no op*/ },
[&](const std::vector<CacheRequest*>& /*requests*/,
int32_t /*begin*/,
int32_t /*end*/,
int32_t end,
uint64_t /*offset*/,
const std::vector<CacheRequest*>& ranges) {
++numNewLoads;
readRegion(ranges, prefetch);
});

if (prefetch && (executor_ != nullptr)) {
std::vector<int32_t> doneIndices;
for (auto i = 0; i < allCoalescedLoads_.size(); ++i) {
auto& load = allCoalescedLoads_[i];
if (load->state() == CoalescedLoad::State::kPlanned) {
executor_->add(
[pendingLoad = load, ssdSavable = !options_.noCacheRetention()]() {
process::TraceContext trace("Read Ahead");
pendingLoad->loadOrFuture(nullptr, ssdSavable);
});
} else {
doneIndices.push_back(i);
}
}

// Remove the loads that were complete. There can be done loads if the same
// CachedBufferedInput has multiple cycles of enqueues and loads.
for (int32_t i = doneIndices.size() - 1; i >= 0; --i) {
assert(!doneIndices.empty()); // lint
allCoalescedLoads_.erase(allCoalescedLoads_.begin() + doneIndices[i]);
}
}
const std::vector<char>& /*ranges*/) { ends.push_back(end); });
return ends;
}
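Note on the return value: `groupRequests` no longer issues reads itself; it returns the exclusive end index of each coalesced group in the already-sorted request vector, which `readRegions` later walks. A minimal, hypothetical stand-in that applies only the max-distance rule (the real `coalesceIo` also honors the load quantum and requests that do not coalesce) illustrates the `ends` encoding; `groupByDistance` and `Region` are illustrative names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct Region {
  uint64_t offset;
  uint64_t size;
};

// Group sorted regions whenever the gap to the previous region is at most
// maxDistance; return the exclusive end index of each group.
std::vector<int32_t> groupByDistance(
    const std::vector<Region>& sorted,
    uint64_t maxDistance) {
  std::vector<int32_t> ends;
  for (size_t i = 1; i <= sorted.size(); ++i) {
    if (i == sorted.size() ||
        sorted[i].offset >
            sorted[i - 1].offset + sorted[i - 1].size + maxDistance) {
      ends.push_back(static_cast<int32_t>(i));
    }
  }
  return ends;
}

int main() {
  // Two nearby regions and one far away, already sorted by offset.
  std::vector<Region> regions{{0, 100}, {150, 50}, {10'000'000, 100}};
  auto ends = groupByDistance(regions, /*maxDistance=*/512 * 1024);
  // Requests [0, 2) form one coalesced read, [2, 3) another.
  assert((ends == std::vector<int32_t>{2, 3}));
  return 0;
}
```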

namespace {
@@ -469,6 +471,46 @@ void CachedBufferedInput::readRegion(
});
}

void CachedBufferedInput::readRegions(
const std::vector<CacheRequest*>& requests,
bool prefetch,
const std::vector<int32_t>& groupEnds) {
int i = 0;
std::vector<CacheRequest*> group;
for (auto end : groupEnds) {
while (i < end) {
group.push_back(requests[i++]);
}
readRegion(group, prefetch);
group.clear();
}
if (prefetch && executor_) {
std::vector<int32_t> doneIndices;
for (auto i = 0; i < allCoalescedLoads_.size(); ++i) {
auto& load = allCoalescedLoads_[i];
if (load->state() == CoalescedLoad::State::kPlanned) {
executor_->add(
[pendingLoad = load, ssdSavable = !options_.noCacheRetention()]() {
process::TraceContext trace("Read Ahead");
pendingLoad->loadOrFuture(nullptr, ssdSavable);
});
} else {
doneIndices.push_back(i);
}
}
// Remove the loads that were complete. There can be done loads if the same
// CachedBufferedInput has multiple cycles of enqueues and loads.
for (int i = 0, j = 0, k = 0; i < allCoalescedLoads_.size(); ++i) {
if (j < doneIndices.size() && doneIndices[j] == i) {
++j;
} else {
allCoalescedLoads_[k++] = std::move(allCoalescedLoads_[i]);
}
}
allCoalescedLoads_.resize(allCoalescedLoads_.size() - doneIndices.size());
}
}
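The completed loads are now dropped from `allCoalescedLoads_` with a single stable compaction pass instead of one erase call per element. A small standalone sketch of the same idiom, with hypothetical names and `std::string` payloads standing in for the load objects:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Remove the elements whose indices appear in doneIndices (sorted ascending),
// preserving the relative order of the survivors, in one pass.
void eraseAtIndices(
    std::vector<std::string>& items,
    const std::vector<int32_t>& doneIndices) {
  size_t j = 0; // next index in doneIndices to skip
  size_t k = 0; // write position for surviving items
  for (size_t i = 0; i < items.size(); ++i) {
    if (j < doneIndices.size() && doneIndices[j] == static_cast<int32_t>(i)) {
      ++j; // this element is done: drop it
    } else {
      items[k++] = std::move(items[i]);
    }
  }
  items.resize(items.size() - doneIndices.size());
}

int main() {
  std::vector<std::string> loads{"a", "b", "c", "d", "e"};
  eraseAtIndices(loads, {1, 3}); // drop "b" and "d"
  assert((loads == std::vector<std::string>{"a", "c", "e"}));
  return 0;
}
```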

std::shared_ptr<cache::CoalescedLoad> CachedBufferedInput::coalescedLoad(
const SeekableInputStream* stream) {
return coalescedLoads_.withWLock(
Expand All @@ -478,7 +520,7 @@ std::shared_ptr<cache::CoalescedLoad> CachedBufferedInput::coalescedLoad(
return nullptr;
}
auto load = std::move(it->second);
auto* dwioLoad = dynamic_cast<DwioCoalescedLoadBase*>(load.get());
auto* dwioLoad = static_cast<DwioCoalescedLoadBase*>(load.get());
for (auto& request : dwioLoad->requests()) {
loads.erase(request.stream);
}
velox/dwio/common/CachedBufferedInput.h (12 additions, 3 deletions)

@@ -169,16 +169,25 @@ class CachedBufferedInput : public BufferedInput {
}

private:
// Sorts requests and makes CoalescedLoads for nearby requests. If 'prefetch'
// is true, starts background loading.
void makeLoads(std::vector<CacheRequest*> requests, bool prefetch);
template <bool kSsd>
std::vector<int32_t> groupRequests(
const std::vector<CacheRequest*>& requests,
bool prefetch) const;

// Makes a CoalescedLoad for 'requests' to be read together, coalescing IO as
// appropriate. If 'prefetch' is set, schedules the CoalescedLoad on
// 'executor_'. Links the CoalescedLoad to all CacheInputStreams that it
// concerns.
void readRegion(const std::vector<CacheRequest*>& requests, bool prefetch);

void readRegions(
const std::vector<CacheRequest*>& requests,
bool prefetch,
const std::vector<int32_t>& groupEnds);

template <bool kSsd>
void makeLoads(std::vector<CacheRequest*> requests[2]);

// We only support up to 8MB load quantum size on SSD and there is no need for
// a larger SSD read size performance-wise.
void checkLoadQuantum() {