Fix and optimize read IO pattern for flat map column
Summary:
1. `numReferences` and `numReads` in `TrackingData` can overflow under a large volume of reads on a flat map column, which makes the read percentage go negative and coalescing fail (see the sketch below).
2. Increase the limit on the maximum number of regions to coalesce, reducing the number of IO reads for a typical flat map column to 1/3.

Differential Revision: D64225777
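
For illustration, a minimal standalone sketch (not Velox code; the counter values are made up) of the failure mode in item 1: with 32-bit counters, `100 * numReads` wraps past `INT32_MAX`, so the computed read percentage goes negative and the stream never qualifies for coalescing.

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical counts for a heavily read flat map column. Signed
  // overflow is technically UB in C++; on typical two's-complement
  // hardware it wraps, which is how the percentage went negative.
  int32_t numReads = 30'000'000;
  int32_t numReferences = 30'000'001;

  // 100 * 30'000'000 = 3'000'000'000 > INT32_MAX (2'147'483'647).
  int32_t badPct = (100 * numReads) / (numReferences - 1);

  // With 64-bit counters, as in this change, the product stays in range.
  int64_t goodPct = (int64_t{100} * numReads) / (int64_t{numReferences} - 1);

  std::cout << badPct << " vs " << goodPct << "\n";  // e.g. -43 vs 100
}
```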
Yuhta authored and facebook-github-bot committed Oct 11, 2024
1 parent daeff59 commit 016bc9c
Showing 5 changed files with 51 additions and 66 deletions.
4 changes: 2 additions & 2 deletions velox/common/caching/ScanTracker.h
@@ -75,8 +75,8 @@ class FileGroupStats;
struct TrackingData {
int64_t referencedBytes{};
int64_t readBytes{};
int32_t numReferences{};
int32_t numReads{};
int64_t numReferences{};
int64_t numReads{};

/// Marks that 'bytes' worth of data in the tracked object has been referenced
/// and may later be accessed. If 'bytes' is larger than a single
71 changes: 33 additions & 38 deletions velox/dwio/common/CachedBufferedInput.cpp
@@ -149,7 +149,9 @@ int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
if (trackingData.numReferences < 2) {
return 0;
}
return (100 * trackingData.numReads) / (trackingData.numReferences - 1);
auto pct = (100 * trackingData.numReads) / (trackingData.numReferences - 1);
VELOX_CHECK_GE(pct, 0);
return pct;
}
} // namespace

@@ -165,49 +167,42 @@ void CachedBufferedInput::load(const LogType /*unused*/) {
// Extra requests made for pre-loadable regions that are larger than
// 'loadQuantum'.
std::vector<std::unique_ptr<CacheRequest>> extraRequests;
// We loop over access frequency buckets. For example readPct 80 will get all
// streams where 80% or more of the referenced data is actually loaded.
for (const auto readPct : std::vector<int32_t>{80, 50, 20, 0}) {
std::vector<CacheRequest*> storageLoad;
std::vector<CacheRequest*> ssdLoad;
for (auto& request : requests) {
if (request.processed) {
std::vector<CacheRequest*> storageLoad[2];
std::vector<CacheRequest*> ssdLoad[2];
for (auto& request : requests) {
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && (tracker_ != nullptr)) {
trackingData = tracker_->trackingData(request.trackingId);
}
const bool prefetch =
prefetchAnyway || isPrefetchPct(adjustedReadPct(trackingData));
auto parts = makeRequestParts(
request, trackingData, options_.loadQuantum(), extraRequests);
for (auto part : parts) {
if (cache_->exists(part->key)) {
continue;
}
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && (tracker_ != nullptr)) {
trackingData = tracker_->trackingData(request.trackingId);
}
if (prefetchAnyway || adjustedReadPct(trackingData) >= readPct) {
request.processed = true;
auto parts = makeRequestParts(
request, trackingData, options_.loadQuantum(), extraRequests);
for (auto part : parts) {
if (cache_->exists(part->key)) {
continue;
}
if (ssdFile != nullptr) {
part->ssdPin = ssdFile->find(part->key);
if (!part->ssdPin.empty() &&
part->ssdPin.run().size() < part->size) {
LOG(INFO) << "IOERR: Ignoring SSD shorter than requested: "
<< part->ssdPin.run().size() << " vs " << part->size;
part->ssdPin.clear();
}
if (!part->ssdPin.empty()) {
ssdLoad.push_back(part);
continue;
}
}
storageLoad.push_back(part);
if (ssdFile != nullptr) {
part->ssdPin = ssdFile->find(part->key);
if (!part->ssdPin.empty() && part->ssdPin.run().size() < part->size) {
LOG(INFO) << "IOERR: Ignoring SSD shorter than requested: "
<< part->ssdPin.run().size() << " vs " << part->size;
part->ssdPin.clear();
}
if (!part->ssdPin.empty()) {
ssdLoad[prefetch].push_back(part);
continue;
}
}
storageLoad[prefetch].push_back(part);
}
makeLoads(std::move(storageLoad), isPrefetchPct(readPct));
makeLoads(std::move(ssdLoad), isPrefetchPct(readPct));
}
makeLoads(std::move(storageLoad[1]), true);
makeLoads(std::move(ssdLoad[1]), true);
makeLoads(std::move(storageLoad[0]), false);
makeLoads(std::move(ssdLoad[0]), false);
}

void CachedBufferedInput::makeLoads(
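The rewritten `load()` above replaces the old four-pass loop over read-percentage thresholds with a single pass that files each request part into one of two buckets, prefetchable or on-demand, by indexing an array with a bool. A minimal sketch of the idiom, using hypothetical stand-in types rather than the Velox ones:

```cpp
#include <utility>
#include <vector>

struct Item {
  int readPct = 0;  // percent of referenced data actually read
};

// Assumed stand-in for isPrefetchPct(): the exact threshold is internal
// to Velox, so 80 here is only illustrative.
bool isPrefetch(int pct) { return pct >= 80; }

void emit(std::vector<Item*> batch, bool prefetch) { /* issue IO */ }

void schedule(std::vector<Item>& items) {
  // buckets[1] holds prefetchable items, buckets[0] the rest; a bool
  // index converts to 0 or 1, so one pass classifies everything.
  std::vector<Item*> buckets[2];
  for (auto& item : items) {
    buckets[isPrefetch(item.readPct)].push_back(&item);
  }
  emit(std::move(buckets[1]), /*prefetch=*/true);
  emit(std::move(buckets[0]), /*prefetch=*/false);
}
```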
2 changes: 0 additions & 2 deletions velox/dwio/common/CachedBufferedInput.h
@@ -44,8 +44,6 @@ struct CacheRequest {
cache::CachePin pin;
cache::SsdPin ssdPin;

bool processed{false};

/// True if this should be coalesced into a CoalescedLoad with other nearby
/// requests with a similar load probability. This is false for sparsely
/// accessed large columns where hitting one piece should not load the
39 changes: 16 additions & 23 deletions velox/dwio/common/DirectBufferedInput.cpp
@@ -87,36 +87,29 @@ int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
if (trackingData.numReferences < 2) {
return 0;
}
return (100 * trackingData.numReads) / (trackingData.numReferences - 1);
auto pct = (100 * trackingData.numReads) / (trackingData.numReferences - 1);
VELOX_CHECK_GE(pct, 0);
return pct;
}
} // namespace

void DirectBufferedInput::load(const LogType /*unused*/) {
// After load, new requests cannot be merged into pre-load ones.
auto requests = std::move(requests_);

// We loop over access frequency buckets. For example readPct 80
// will get all streams where 80% or more of the referenced data is
// actually loaded.
for (auto readPct : std::vector<int32_t>{80, 50, 20, 0}) {
std::vector<LoadRequest*> storageLoad;
for (auto& request : requests) {
if (request.processed) {
continue;
}
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && tracker_) {
trackingData = tracker_->trackingData(request.trackingId);
}
if (prefetchAnyway || adjustedReadPct(trackingData) >= readPct) {
request.processed = true;
storageLoad.push_back(&request);
}
std::vector<LoadRequest*> storageLoad[2];
for (auto& request : requests) {
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && tracker_) {
trackingData = tracker_->trackingData(request.trackingId);
}
makeLoads(std::move(storageLoad), isPrefetchablePct(readPct));
const bool prefetch =
prefetchAnyway || isPrefetchablePct(adjustedReadPct(trackingData));
storageLoad[prefetch].push_back(&request);
}
makeLoads(std::move(storageLoad[1]), true);
makeLoads(std::move(storageLoad[0]), false);
}

void DirectBufferedInput::makeLoads(
@@ -148,7 +141,7 @@ void DirectBufferedInput::makeLoads(
requests,
maxDistance,
      // Break batches up. Better to load more short ones in parallel.
1000, // limit coalesce by size, not count.
std::numeric_limits<int32_t>::max(), // limit coalesce by size, not count.
[&](int32_t index) { return requests[index]->region.offset; },
[&](int32_t index) -> int32_t {
auto size = requests[index]->region.length;
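Item 2 of the summary is the one-line change above: the per-batch region count passed to the coalescing helper was 1000 and is now effectively unlimited, so batches are broken only by gap distance and total size. A sketch of gap-based coalescing with hypothetical types (not the actual Velox coalescing API):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct Region {
  uint64_t offset;
  uint64_t length;
};

// Merge regions sorted by offset whenever the gap to the previous merged
// region is at most maxDistance. Fewer resulting entries means fewer IOs.
std::vector<Region> coalesce(
    const std::vector<Region>& sorted,
    uint64_t maxDistance) {
  std::vector<Region> out;
  for (const auto& r : sorted) {
    if (!out.empty() &&
        r.offset <= out.back().offset + out.back().length + maxDistance) {
      // Extend the previous IO to cover this region plus the gap.
      out.back().length =
          std::max(out.back().offset + out.back().length,
                   r.offset + r.length) -
          out.back().offset;
    } else {
      out.push_back(r);
    }
  }
  return out;
}
```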
1 change: 0 additions & 1 deletion velox/dwio/common/DirectBufferedInput.h
@@ -36,7 +36,6 @@ struct LoadRequest {

velox::common::Region region;
cache::TrackingId trackingId;
bool processed{false};

const SeekableInputStream* stream;

