diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index ea9ad39f5d98..279f892bb50d 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -20,6 +20,7 @@ #include #include "velox/dwio/common/CachedBufferedInput.h" +#include "velox/dwio/common/DirectBufferedInput.h" #include "velox/dwio/common/ReaderFactory.h" #include "velox/expression/ExprToSubfieldFilter.h" #include "velox/expression/FieldReference.h" @@ -803,11 +804,15 @@ HiveDataSource::createBufferedInput( executor_, readerOpts); } - return std::make_unique( + return std::make_unique( fileHandle.file, - readerOpts.getMemoryPool(), dwio::common::MetricsLog::voidLog(), - ioStats_.get()); + fileHandle.uuid.id(), + Connector::getTracker(scanId_, readerOpts.loadQuantum()), + fileHandle.groupId.id(), + ioStats_, + executor_, + readerOpts); } vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) { diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index c4b767317a6e..5c9e25183d08 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -28,6 +28,8 @@ add_library( BufferedInput.cpp CachedBufferedInput.cpp CacheInputStream.cpp + DirectBufferedInput.cpp + DirectInputStream.cpp ColumnSelector.cpp DataBufferHolder.cpp DecoderUtil.cpp diff --git a/velox/dwio/common/DirectBufferedInput.cpp b/velox/dwio/common/DirectBufferedInput.cpp new file mode 100644 index 000000000000..5f85a1860eb6 --- /dev/null +++ b/velox/dwio/common/DirectBufferedInput.cpp @@ -0,0 +1,299 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/common/DirectBufferedInput.h" +#include "velox/common/memory/Allocation.h" +#include "velox/common/process/TraceContext.h" +#include "velox/dwio/common/DirectInputStream.h" + +DECLARE_int32(cache_prefetch_min_pct); + +using ::facebook::velox::common::Region; + +namespace facebook::velox::dwio::common { + +using cache::CoalescedLoad; +using cache::ScanTracker; +using cache::TrackingId; + +std::unique_ptr DirectBufferedInput::enqueue( + Region region, + const StreamIdentifier* sid = nullptr) { + if (!coalescedLoads_.empty()) { + // Results of previous load are no more available here. 
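// Illustrative aside (not part of the patch): starting a new enqueue() batch
// discards the coalesced plans built for the previous load() cycle; a stream
// whose planned load was dropped simply falls back to a standalone
// loadSync() on first access (see DirectInputStream::loadPosition()).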
+    coalescedLoads_.clear();
+    streamToCoalescedLoad_.wlock()->clear();
+  }
+  if (region.length == 0) {
+    return std::make_unique<SeekableArrayInputStream>(
+        static_cast<const char*>(nullptr), 0);
+  }
+
+  TrackingId id;
+  if (sid) {
+    id = TrackingId(sid->getId());
+  }
+  VELOX_CHECK_LE(region.offset + region.length, fileSize_);
+  requests_.emplace_back(region, id);
+  if (tracker_) {
+    tracker_->recordReference(id, region.length, fileNum_, groupId_);
+  }
+  auto stream = std::make_unique<DirectInputStream>(
+      this,
+      ioStats_.get(),
+      region,
+      input_,
+      fileNum_,
+      tracker_,
+      id,
+      groupId_,
+      options_.loadQuantum());
+  requests_.back().stream = stream.get();
+  return stream;
+}
+
+bool DirectBufferedInput::isBuffered(uint64_t /*offset*/, uint64_t /*length*/)
+    const {
+  return false;
+}
+
+bool DirectBufferedInput::shouldPreload(int32_t numPages) {
+  return false;
+}
+
+namespace {
+
+// True if the percentage is high enough to warrant prefetch.
+bool isPrefetchablePct(int32_t pct) {
+  return pct >= FLAGS_cache_prefetch_min_pct;
+}
+
+int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
+  // When called, there will be one more reference than read, since references
+  // are counted before reading.
+  if (trackingData.numReferences < 2) {
+    return 0;
+  }
+  return (100 * trackingData.numReads) / (trackingData.numReferences - 1);
+}
+} // namespace
+
+void DirectBufferedInput::load(const LogType /*unused*/) {
+  // After load, new requests cannot be merged into pre-load ones.
+  auto requests = std::move(requests_);
+
+  // We loop over access frequency buckets. For example, readPct 80
+  // selects all streams where 80% or more of the referenced data is
+  // actually read.
+
+  for (auto readPct : std::vector<int32_t>{80, 50, 20, 0}) {
+    std::vector<LoadRequest*> storageLoad;
+    for (auto& request : requests) {
+      if (request.processed) {
+        continue;
+      }
+      cache::TrackingData trackingData;
+      const bool prefetchAnyway = request.trackingId.empty() ||
+          request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
+      if (!prefetchAnyway && tracker_) {
+        trackingData = tracker_->trackingData(request.trackingId);
+      }
+      if (prefetchAnyway || adjustedReadPct(trackingData) >= readPct) {
+        request.processed = true;
+        storageLoad.push_back(&request);
+      }
+    }
+    makeLoads(std::move(storageLoad), isPrefetchablePct(readPct));
+  }
+}
+
+void DirectBufferedInput::makeLoads(
+    std::vector<LoadRequest*> requests,
+    bool shouldPrefetch) {
+  if (requests.empty() || (requests.size() < 2 && !shouldPrefetch)) {
+    // A single request has no other requests to coalesce with and is not
+    // eligible for prefetch. It will be loaded by itself on first use.
+    return;
+  }
+  const int32_t maxDistance = options_.maxCoalesceDistance();
+  const auto loadQuantum = options_.loadQuantum();
+  // For densely accessed data, coalesce into large IOs for best throughput;
+  // for sparsely accessed data, coalesce only up to the load quantum to
+  // limit overread, since sparse accesses are not necessarily correlated.
+  const auto maxCoalesceBytes =
+      shouldPrefetch ? options_.maxCoalesceBytes() : loadQuantum;
+  std::sort(
+      requests.begin(),
+      requests.end(),
+      [&](const LoadRequest* left, const LoadRequest* right) {
+        return left->region.offset < right->region.offset;
+      });
+  // Combine adjacent short reads.
+
+  int32_t numNewLoads = 0;
+  int64_t coalescedBytes = 0;
+  coalesceIo(
+      requests,
+      maxDistance,
+      // Break batches up. Better to load more short ones in parallel.
+      1000, // limit coalesce by size, not count.
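      // Illustrative aside (not part of the patch): the callbacks below give
      // coalesceIo() the offset and quantum-capped size of each request,
      // decide where to cut a coalesced run (once the accumulated bytes
      // exceed maxCoalesceBytes), collect each run's requests into 'ranges',
      // and finally hand every run to readRegion(), which builds one
      // DirectCoalescedLoad per run.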
+ [&](int32_t index) { return requests[index]->region.offset; }, + [&](int32_t index) -> int32_t { + auto size = requests[index]->region.length; + if (size > loadQuantum) { + coalescedBytes += loadQuantum; + return loadQuantum; + } + coalescedBytes += size; + return size; + }, + [&](int32_t index) { + if (coalescedBytes > maxCoalesceBytes) { + coalescedBytes = 0; + return kNoCoalesce; + } + return 1; + }, + [&](LoadRequest* request, std::vector& ranges) { + ranges.push_back(request); + }, + [&](int32_t /*gap*/, std::vector /*ranges*/) { /*no op*/ }, + [&](const std::vector& /*requests*/, + int32_t /*begin*/, + int32_t /*end*/, + uint64_t /*offset*/, + const std::vector& ranges) { + ++numNewLoads; + readRegion(ranges, shouldPrefetch); + }); + if (shouldPrefetch && executor_) { + for (auto i = 0; i < coalescedLoads_.size(); ++i) { + auto& load = coalescedLoads_[i]; + if (load->state() == CoalescedLoad::State::kPlanned) { + executor_->add([pendingLoad = load]() { + process::TraceContext trace("Read Ahead"); + pendingLoad->loadOrFuture(nullptr); + }); + } + } + } +} + +void DirectBufferedInput::readRegion( + std::vector requests, + bool prefetch) { + if (requests.empty() || (requests.size() == 1 && !prefetch)) { + return; + } + auto load = std::make_shared( + input_, ioStats_, groupId_, requests, pool_, options_.loadQuantum()); + coalescedLoads_.push_back(load); + streamToCoalescedLoad_.withWLock([&](auto& loads) { + for (auto& request : requests) { + loads[request->stream] = load; + } + }); +} + +std::shared_ptr DirectBufferedInput::coalescedLoad( + const SeekableInputStream* stream) { + return streamToCoalescedLoad_.withWLock( + [&](auto& loads) -> std::shared_ptr { + auto it = loads.find(stream); + if (it == loads.end()) { + return nullptr; + } + auto load = std::move(it->second); + loads.erase(it); + return load; + }); +} + +namespace { +void appendRanges( + memory::Allocation& allocation, + size_t length, + std::vector>& buffers) { + uint64_t offsetInRuns = 0; + for (int i = 0; i < allocation.numRuns(); ++i) { + auto run = allocation.runAt(i); + const uint64_t bytes = memory::AllocationTraits::pageBytes(run.numPages()); + const uint64_t readSize = std::min(bytes, length - offsetInRuns); + buffers.push_back(folly::Range(run.data(), readSize)); + offsetInRuns += readSize; + } +} +} // namespace + +std::vector DirectCoalescedLoad::loadData(bool isPrefetch) { + std::vector> buffers; + int64_t lastEnd = requests_[0].region.offset; + int64_t size = 0; + int64_t overread = 0; + for (auto& request : requests_) { + auto& region = request.region; + if (region.offset > lastEnd) { + buffers.push_back(folly::Range( + nullptr, + reinterpret_cast( + static_cast(region.offset - lastEnd)))); + overread += buffers.back().size(); + } + if (region.length > DirectBufferedInput::kTinySize) { + if (&request != &requests_.back()) { + // Case where request is a little over quantum but is folowed by another + // within the max distance. Coalesces and allows reading the region of + // max quantum + max distance in one piece. 
+ request.loadSize = region.length; + } else { + request.loadSize = std::min(region.length, loadQuantum_); + } + auto numPages = memory::AllocationTraits::numPages(request.loadSize); + pool_.allocateNonContiguous(numPages, request.data); + appendRanges(request.data, request.loadSize, buffers); + } else { + request.loadSize = region.length; + request.tinyData.resize(region.length); + buffers.push_back(folly::Range(request.tinyData.data(), region.length)); + } + lastEnd = region.offset + request.loadSize; + size += std::min(loadQuantum_, region.length); + } + input_->read(buffers, requests_[0].region.offset, LogType::FILE); + ioStats_->read().increment(size); + ioStats_->incRawOverreadBytes(overread); + if (isPrefetch) { + ioStats_->prefetch().increment(size); + } + return {}; +} + +int32_t DirectCoalescedLoad::getData( + int64_t offset, + + memory::Allocation& data, + std::string& tinyData) { + for (auto& request : requests_) { + if (request.region.offset == offset) { + data = std::move(request.data); + tinyData = std::move(request.tinyData); + return request.loadSize; + } + } + return 0; +} + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/DirectBufferedInput.h b/velox/dwio/common/DirectBufferedInput.h new file mode 100644 index 000000000000..815e0e19ebae --- /dev/null +++ b/velox/dwio/common/DirectBufferedInput.h @@ -0,0 +1,226 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/common/caching/AsyncDataCache.h" +#include "velox/common/caching/FileGroupStats.h" +#include "velox/common/caching/ScanTracker.h" +#include "velox/common/io/IoStatistics.h" +#include "velox/common/io/Options.h" +#include "velox/dwio/common/BufferedInput.h" +#include "velox/dwio/common/CacheInputStream.h" +#include "velox/dwio/common/InputStream.h" + +namespace facebook::velox::dwio::common { + +struct LoadRequest { + LoadRequest() = default; + LoadRequest(velox::common::Region& _region, cache::TrackingId _trackingId) + : region(_region), trackingId(_trackingId) {} + + velox::common::Region region; + cache::TrackingId trackingId; + bool processed{false}; + + const SeekableInputStream* stream; + + /// Buffers to be handed to 'stream' after load. + memory::Allocation data; + std::string tinyData; + // Number of bytes in 'data/tinyData'. + int32_t loadSize{0}; +}; + +/// Represents planned loads that should be performed as a single IO. 
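+/// Illustrative summary (editorial, not part of the patch): a
+/// DirectCoalescedLoad is created by DirectBufferedInput::readRegion() for a
+/// group of nearby streams. loadData() issues one vectored read covering all
+/// the regions, passing the gaps between them as null ranges that are
+/// counted as overread. Each DirectInputStream then claims its bytes once
+/// via getData(), which moves the per-request Allocation/tinyData out of the
+/// load.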
+class DirectCoalescedLoad : public cache::CoalescedLoad { + public: + DirectCoalescedLoad( + std::shared_ptr input, + std::shared_ptr ioStats, + uint64_t groupId, + const std::vector& requests, + memory::MemoryPool& pool, + int32_t loadQuantum) + : CoalescedLoad({}, {}), + ioStats_(ioStats), + groupId_(groupId), + input_(std::move(input)), + loadQuantum_(loadQuantum), + pool_(pool) { + requests_.reserve(requests.size()); + for (auto i = 0; i < requests.size(); ++i) { + requests_.push_back(std::move(*requests[i])); + } + }; + + // Loads the regions. Returns {} since no cache entries are made. The loaded + // data is retrieved with getData(). + std::vector loadData(bool isPrefetch) override; + + // Returns the buffer for 'region' in either 'data' or 'tinyData'. 'region' + // must match a region given to SelectiveBufferedInput::enqueue(). + int32_t + getData(int64_t offset, memory::Allocation& data, std::string& tinyData); + + const auto& requests() { + return requests_; + } + + int64_t size() const override { + int64_t size = 0; + for (auto& request : requests_) { + size += request.region.length; + } + return size; + } + + private: + const std::shared_ptr ioStats_; + const uint64_t groupId_; + const std::shared_ptr input_; + const int32_t loadQuantum_; + memory::MemoryPool& pool_; + std::vector requests_; +}; + +class DirectBufferedInput : public BufferedInput { + public: + static constexpr int32_t kTinySize = 2'000; + + DirectBufferedInput( + std::shared_ptr readFile, + const MetricsLogPtr& metricsLog, + uint64_t fileNum, + std::shared_ptr tracker, + uint64_t groupId, + std::shared_ptr ioStats, + folly::Executor* executor, + const io::ReaderOptions& readerOptions) + : BufferedInput( + std::move(readFile), + readerOptions.getMemoryPool(), + metricsLog), + fileNum_(fileNum), + tracker_(std::move(tracker)), + groupId_(groupId), + ioStats_(std::move(ioStats)), + executor_(executor), + fileSize_(input_->getLength()), + options_(readerOptions) {} + + ~DirectBufferedInput() override { + for (auto& load : coalescedLoads_) { + load->cancel(); + } + } + + std::unique_ptr enqueue( + velox::common::Region region, + const StreamIdentifier* sid) override; + + void load(const LogType /*unused*/) override; + + bool isBuffered(uint64_t offset, uint64_t length) const override; + + bool shouldPreload(int32_t numPages = 0) override; + + bool shouldPrefetchStripes() const override { + return false; + } + + void setNumStripes(int32_t numStripes) override { + auto* stats = tracker_->fileGroupStats(); + if (stats) { + stats->recordFile(fileNum_, groupId_, numStripes); + } + } + + virtual std::unique_ptr clone() const override { + std::unique_ptr input(new DirectBufferedInput( + input_, fileNum_, tracker_, groupId_, ioStats_, executor_, options_)); + return input; + } + + memory::MemoryPool* pool() { + return &pool_; + } + + /// Returns the CoalescedLoad that contains the correlated loads for + /// 'stream' or nullptr if none. Returns nullptr on all but first + /// call for 'stream' since the load is to be triggered by the first + /// access. + std::shared_ptr coalescedLoad( + const SeekableInputStream* stream); + + folly::Executor* executor() const override { + return executor_; + } + + private: + /// Constructor used by clone(). 
+ DirectBufferedInput( + std::shared_ptr input, + uint64_t fileNum, + std::shared_ptr tracker, + uint64_t groupId, + std::shared_ptr ioStats, + folly::Executor* executor, + const io::ReaderOptions& readerOptions) + : BufferedInput(std::move(input), readerOptions.getMemoryPool()), + fileNum_(fileNum), + tracker_(std::move(tracker)), + groupId_(groupId), + ioStats_(std::move(ioStats)), + executor_(executor), + fileSize_(input_->getLength()), + options_(readerOptions) {} + + // Sorts requests and makes CoalescedLoads for nearby requests. If + // 'shouldPrefetch' is true, starts background loading. + void makeLoads(std::vector requests, bool shouldPrefetch); + + // Makes a CoalescedLoad for 'requests' to be read together, coalescing + // IO if appropriate. If 'prefetch' is set, schedules the CoalescedLoad + // on 'executor_'. Links the CoalescedLoad to all DirectInputStreams that it + // covers. + void readRegion(std::vector requests, bool prefetch); + + const uint64_t fileNum_; + const std::shared_ptr tracker_; + const uint64_t groupId_; + const std::shared_ptr ioStats_; + folly::Executor* const executor_; + const uint64_t fileSize_; + + // Regions that are candidates for loading. + std::vector requests_; + + // Coalesced loads spanning multiple streams in one IO. + folly::Synchronized>> + streamToCoalescedLoad_; + + // Distinct coalesced loads in 'coalescedLoads_'. + std::vector> coalescedLoads_; + + io::ReaderOptions options_; +}; + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/DirectInputStream.cpp b/velox/dwio/common/DirectInputStream.cpp new file mode 100644 index 000000000000..133ce1cb147e --- /dev/null +++ b/velox/dwio/common/DirectInputStream.cpp @@ -0,0 +1,215 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#include "velox/common/process/TraceContext.h" +#include "velox/common/time/Timer.h" +#include "velox/dwio/common/DirectBufferedInput.h" +#include "velox/dwio/common/DirectInputStream.h" + +using ::facebook::velox::common::Region; + +namespace facebook::velox::dwio::common { + +using velox::cache::ScanTracker; +using velox::cache::TrackingId; +using velox::memory::MemoryAllocator; + +DirectInputStream::DirectInputStream( + DirectBufferedInput* bufferedInput, + IoStatistics* ioStats, + const Region& region, + std::shared_ptr input, + uint64_t fileNum, + std::shared_ptr tracker, + TrackingId trackingId, + uint64_t groupId, + int32_t loadQuantum) + : bufferedInput_(bufferedInput), + ioStats_(ioStats), + input_(std::move(input)), + region_(region), + fileNum_(fileNum), + tracker_(std::move(tracker)), + trackingId_(trackingId), + groupId_(groupId), + loadQuantum_(loadQuantum) {} + +bool DirectInputStream::Next(const void** buffer, int32_t* size) { + if (offsetInRegion_ >= region_.length) { + *size = 0; + return false; + } + loadPosition(); + + *buffer = reinterpret_cast(run_ + offsetInRun_); + *size = runSize_ - offsetInRun_; + if (offsetInRegion_ + *size > region_.length) { + *size = region_.length - offsetInRegion_; + } + offsetInRun_ += *size; + offsetInRegion_ += *size; + + if (tracker_) { + tracker_->recordRead(trackingId_, *size, fileNum_, groupId_); + } + return true; +} + +void DirectInputStream::BackUp(int32_t count) { + VELOX_CHECK_GE(count, 0, "can't backup negative distances"); + + const uint64_t unsignedCount = static_cast(count); + VELOX_CHECK(unsignedCount <= offsetInRun_, "Can't backup that much!"); + offsetInRegion_ -= unsignedCount; +} + +bool DirectInputStream::SkipInt64(int64_t count) { + if (count < 0) { + return false; + } + const uint64_t unsignedCount = static_cast(count); + if (unsignedCount + offsetInRegion_ <= region_.length) { + offsetInRegion_ += unsignedCount; + return true; + } + offsetInRegion_ = region_.length; + return false; +} + +google::protobuf::int64 DirectInputStream::ByteCount() const { + return static_cast(offsetInRegion_); +} + +void DirectInputStream::seekToPosition(PositionProvider& seekPosition) { + offsetInRegion_ = seekPosition.next(); + VELOX_CHECK_LE(offsetInRegion_, region_.length); +} + +std::string DirectInputStream::getName() const { + return fmt::format( + "DirectInputStream {} of {}", offsetInRegion_, region_.length); +} + +size_t DirectInputStream::positionSize() { + // not compressed, so only need 1 position (uncompressed position) + return 1; +} + +namespace { +std::vector> +makeRanges(size_t size, memory::Allocation& data, std::string& tinyData) { + std::vector> buffers; + if (data.numPages() > 0) { + buffers.reserve(data.numRuns()); + uint64_t offsetInRuns = 0; + for (int i = 0; i < data.numRuns(); ++i) { + auto run = data.runAt(i); + uint64_t bytes = memory::AllocationTraits::pageBytes(run.numPages()); + uint64_t readSize = std::min(bytes, size - offsetInRuns); + buffers.push_back(folly::Range(run.data(), readSize)); + offsetInRuns += readSize; + } + } else { + buffers.push_back(folly::Range(tinyData.data(), size)); + } + return buffers; +} +} // namespace + +void DirectInputStream::loadSync() { + if (region_.length < DirectBufferedInput::kTinySize && + data_.numPages() == 0) { + tinyData_.resize(region_.length); + } else { + const auto numPages = + memory::AllocationTraits::numPages(loadedRegion_.length); + if (numPages > data_.numPages()) { + bufferedInput_->pool()->allocateNonContiguous(numPages, data_); + } + 
} + + process::TraceContext trace("DirectInputStream::loadSync"); + + ioStats_->incRawBytesRead(loadedRegion_.length); + auto ranges = makeRanges(loadedRegion_.length, data_, tinyData_); + uint64_t usecs = 0; + { + MicrosecondTimer timer(&usecs); + input_->read(ranges, loadedRegion_.offset, LogType::FILE); + } + ioStats_->read().increment(loadedRegion_.length); + ioStats_->queryThreadIoLatency().increment(usecs); +} + +void DirectInputStream::loadPosition() { + VELOX_CHECK_LT(offsetInRegion_, region_.length); + if (!loaded_) { + loaded_ = true; + auto load = bufferedInput_->coalescedLoad(this); + if (load) { + folly::SemiFuture waitFuture(false); + uint64_t usecs = 0; + { + MicrosecondTimer timer(&usecs); + if (!load->loadOrFuture(&waitFuture)) { + auto& exec = folly::QueuedImmediateExecutor::instance(); + std::move(waitFuture).via(&exec).wait(); + } + loadedRegion_.offset = region_.offset; + loadedRegion_.length = load->getData(region_.offset, data_, tinyData_); + } + ioStats_->queryThreadIoLatency().increment(usecs); + } else { + // Standalone stream, not part of coalesced load. + loadedRegion_.offset = 0; + loadedRegion_.length = 0; + } + } + // Check if position outside of loaded bounds. + if (loadedRegion_.length == 0 || + region_.offset + offsetInRegion_ < loadedRegion_.offset || + region_.offset + offsetInRegion_ >= + loadedRegion_.offset + loadedRegion_.length) { + loadedRegion_.offset = region_.offset + offsetInRegion_; + loadedRegion_.length = (offsetInRegion_ + loadQuantum_ <= region_.length) + ? loadQuantum_ + : (region_.length - offsetInRegion_); + loadSync(); + } + + const auto offsetInData = + offsetInRegion_ - (loadedRegion_.offset - region_.offset); + if (data_.numPages() == 0) { + run_ = reinterpret_cast(tinyData_.data()); + runSize_ = tinyData_.size(); + offsetInRun_ = offsetInData; + offsetOfRun_ = 0; + } else { + data_.findRun(offsetInData, &runIndex_, &offsetInRun_); + offsetOfRun_ = offsetInData - offsetInRun_; + auto run = data_.runAt(runIndex_); + run_ = run.data(); + runSize_ = memory::AllocationTraits::pageBytes(run.numPages()); + if (offsetOfRun_ + runSize_ > loadedRegion_.length) { + runSize_ = loadedRegion_.length - offsetOfRun_; + } + } + VELOX_CHECK_LT(offsetInRun_, runSize_); +} + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/DirectInputStream.h b/velox/dwio/common/DirectInputStream.h new file mode 100644 index 000000000000..58905c881bb1 --- /dev/null +++ b/velox/dwio/common/DirectInputStream.h @@ -0,0 +1,116 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/caching/FileIds.h" +#include "velox/common/caching/ScanTracker.h" +#include "velox/common/io/IoStatistics.h" +#include "velox/dwio/common/InputStream.h" +#include "velox/dwio/common/SeekableInputStream.h" + +namespace facebook::velox::dwio::common { + +class DirectBufferedInput; + +/// An input sream over possibly coalesced loads. 
Created by +/// DirectBufferedInput. Similar to CacheInputStream but does not use cache. +class DirectInputStream : public SeekableInputStream { + public: + DirectInputStream( + DirectBufferedInput* bufferedInput, + IoStatistics* ioStats, + const velox::common::Region& region, + std::shared_ptr input, + uint64_t fileNum, + std::shared_ptr tracker, + cache::TrackingId trackingId, + uint64_t groupId, + int32_t loadQuantum); + + bool Next(const void** data, int* size) override; + void BackUp(int count) override; + bool SkipInt64(int64_t count) override; + google::protobuf::int64 ByteCount() const override; + + void seekToPosition(PositionProvider& position) override; + std::string getName() const override; + size_t positionSize() override; + + /// Testing function to access loaded state. + void testingData( + velox::common::Region& loadedRegion, + memory::Allocation*& data, + std::string*& tinyData) { + loadedRegion = loadedRegion_; + data = &data_; + tinyData = &tinyData_; + } + + private: + // Ensures that the current position is covered by 'data_'. + void loadPosition(); + + // Synchronously sets 'data_' to cover loadedRegion_'. + void loadSync(); + + DirectBufferedInput* const bufferedInput_; + IoStatistics* const ioStats_; + const std::shared_ptr input_; + // The region of 'input' 'this' ranges over. + const velox::common::Region region_; + const uint64_t fileNum_; + std::shared_ptr tracker_; + const cache::TrackingId trackingId_; + const uint64_t groupId_; + + // Maximum number of bytes read from 'input' at a time. + const int32_t loadQuantum_; + + // The part of 'region_' that is loaded into 'data_'/'tinyData_'. Relative to + // file start. + velox::common::Region loadedRegion_; + + // Allocation with loaded data. Has space for region.length or loadQuantum_ + // bytes, whichever is less. + memory::Allocation data_; + + // Contains the data if the range is too small for Allocation. + std::string tinyData_; + + // Pointer to start of current run in 'entry->data()' or + // 'entry->tinyData()'. + uint8_t* run_{nullptr}; + + // Offset of current run from start of 'data_' + uint64_t offsetOfRun_; + + // Position of stream relative to 'run_'. + int offsetInRun_{0}; + + // Index of run in 'data_' + int runIndex_ = -1; + + // Number of valid bytes starting at 'run_' + uint32_t runSize_ = 0; + // Position relative to 'region_.offset'. + uint64_t offsetInRegion_ = 0; + + // Set to true when data is first loaded. 
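  // Illustrative note (not part of the patch): once set, the coalesced load
  // is not consulted again; any position outside 'loadedRegion_' is then
  // fetched directly through loadSync() in pieces of at most 'loadQuantum_'.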
+ bool loaded_{false}; +}; + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/dwrf/test/CMakeLists.txt b/velox/dwio/dwrf/test/CMakeLists.txt index f99975fb4aab..063fdc8b26da 100644 --- a/velox/dwio/dwrf/test/CMakeLists.txt +++ b/velox/dwio/dwrf/test/CMakeLists.txt @@ -490,7 +490,8 @@ target_link_libraries( ${FOLLY_BENCHMARK} fmt::fmt) -add_executable(velox_dwio_cache_test CacheInputTest.cpp) +add_executable(velox_dwio_cache_test CacheInputTest.cpp + DirectBufferedInputTest.cpp) add_test(velox_dwio_cache_test velox_dwio_cache_test) diff --git a/velox/dwio/dwrf/test/CacheInputTest.cpp b/velox/dwio/dwrf/test/CacheInputTest.cpp index 0741d55aca1f..6d9f32238175 100644 --- a/velox/dwio/dwrf/test/CacheInputTest.cpp +++ b/velox/dwio/dwrf/test/CacheInputTest.cpp @@ -24,6 +24,7 @@ #include "velox/common/memory/MmapAllocator.h" #include "velox/dwio/common/CachedBufferedInput.h" #include "velox/dwio/dwrf/common/Common.h" +#include "velox/dwio/dwrf/test/TestReadFile.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include @@ -37,60 +38,6 @@ using facebook::velox::common::Region; using memory::MemoryAllocator; using IoStatisticsPtr = std::shared_ptr; -// Testing stream producing deterministic data. The byte at offset is -// the low byte of 'seed_' + offset. -class TestReadFile : public ReadFile { - public: - TestReadFile(uint64_t seed, uint64_t length, IoStatisticsPtr ioStats) - : seed_(seed), length_(length), ioStats_(std::move(ioStats)) {} - - uint64_t size() const override { - return length_; - } - - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) - const override { - int fill; - uint64_t content = offset + seed_; - uint64_t available = std::min(length_ - offset, length); - for (fill = 0; fill < (available); ++fill) { - reinterpret_cast(buffer)[fill] = content + fill; - } - ioStats_->incRawBytesRead(length); - return std::string_view(static_cast(buffer), fill); - } - - // Asserts that 'bytes' is as would be read from 'offset'. - void checkData(const void* bytes, uint64_t offset, int32_t size) { - for (auto i = 0; i < size; ++i) { - char expected = seed_ + offset + i; - ASSERT_EQ(expected, reinterpret_cast(bytes)[i]) - << " at " << offset + i; - } - } - - uint64_t memoryUsage() const override { - VELOX_NYI(); - } - - bool shouldCoalesce() const override { - VELOX_NYI(); - } - - std::string getName() const override { - return ""; - } - - uint64_t getNaturalReadSize() const override { - VELOX_NYI(); - } - - private: - const uint64_t seed_; - const uint64_t length_; - IoStatisticsPtr ioStats_; -}; - class CacheTest : public testing::Test { protected: static constexpr int32_t kMaxStreams = 50; diff --git a/velox/dwio/dwrf/test/DirectBufferedInputTest.cpp b/velox/dwio/dwrf/test/DirectBufferedInputTest.cpp new file mode 100644 index 000000000000..41c429fc05a6 --- /dev/null +++ b/velox/dwio/dwrf/test/DirectBufferedInputTest.cpp @@ -0,0 +1,183 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/dwio/common/DirectBufferedInput.h" +#include +#include +#include +#include "velox/common/io/IoStatistics.h" +#include "velox/common/memory/MmapAllocator.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/dwrf/common/Common.h" +#include "velox/dwio/dwrf/test/TestReadFile.h" + +#include + +using namespace facebook::velox; +using namespace facebook::velox::dwio; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::cache; + +using facebook::velox::common::Region; + +using memory::MemoryAllocator; +using IoStatisticsPtr = std::shared_ptr; + +struct TestRegion { + int32_t offset; + int32_t length; +}; + +class DirectBufferedInputTest : public testing::Test { + protected: + static constexpr int32_t kLoadQuantum = 8 << 20; + + void SetUp() override { + executor_ = std::make_unique(10, 10); + ioStats_ = std::make_shared(); + fileIoStats_ = std::make_shared(); + tracker_ = std::make_shared("", nullptr, kLoadQuantum); + file_ = std::make_shared(11, 100 << 20, fileIoStats_); + opts_ = std::make_unique(pool_.get()); + opts_->setLoadQuantum(kLoadQuantum); + } + + void TearDown() override { + executor_->join(); + } + + std::unique_ptr makeInput() { + return std::make_unique( + file_, + dwio::common::MetricsLog::voidLog(), + 1, + tracker_, + 2, + ioStats_, + executor_.get(), + *opts_); + } + + // Reads and checks the result of reading ''regions' and checks that this + // causes 'numIos' accesses to the file. + void testLoads(std::vector regions, int32_t numIos) { + auto previous = file_->numIos(); + auto input = makeInput(); + std::vector> streams; + for (auto i = 0; i < regions.size(); ++i) { + if (regions[i].length > 0) { + Region region; + region.offset = regions[i].offset; + region.length = regions[i].length; + StreamIdentifier si(i); + streams.push_back(input->enqueue(region, &si)); + } + } + input->load(LogType::FILE); + for (auto i = 0; i < regions.size(); ++i) { + if (regions[i].length > 0) { + checkRead(streams[i].get(), regions[i]); + } + } + EXPECT_EQ(numIos, file_->numIos() - previous); + } + + // Marks the numStreams first streams as densely read. A large number of + // references that all end in a read. + void makeDense(int32_t numStreams) { + for (auto i = 0; i < numStreams; ++i) { + StreamIdentifier si(i); + auto trackId = TrackingId(si.getId()); + for (auto counter = 0; counter < 100; ++counter) { + tracker_->recordReference(trackId, 1000000, 1, 1); + tracker_->recordRead(trackId, 1000000, 1, 1); + } + } + } + + void checkRead(SeekableInputStream* stream, TestRegion region) { + int32_t size; + int32_t totalRead = 0; + const void* buffer; + while (stream->Next(&buffer, &size)) { + file_->checkData(buffer, region.offset + totalRead, size); + totalRead += size; + } + EXPECT_EQ(region.length, totalRead); + } + + std::unique_ptr opts_; + std::shared_ptr file_; + std::shared_ptr tracker_; + std::shared_ptr ioStats_; + std::shared_ptr fileIoStats_; + std::unique_ptr executor_; + std::shared_ptr pool_{memory::addDefaultLeafMemoryPool()}; +}; + +TEST_F(DirectBufferedInputTest, basic) { + // The small leading parts coalesce with the the 7M. The 2M goes standalone. + // the last is read in 2 parts. This is because these are not yet densely + // accessed and thus coalescing only works to load quantum of 8MB. + testLoads( + {{100, 100}, + {300, 100}, + {1000, 7000000}, + {7004000, 2000000}, + {20000000, 10000000}}, + 4); + + // All but the last coalesce into one , the last is read in 2 parts. 
The + // columns are now dense and coalesce goes up to 128MB if gaps are small + // enough. + testLoads( + {{100, 100}, + {300, 100}, + {1000, 7000000}, + {7004000, 2000000}, + {20000000, 10000000}}, + 3); + + // Mark the first 4 ranges as densely accessed. + makeDense(4); + + // The first and first part of second coalesce. + testLoads({{100, 100}, {1000, 10000000}}, 2); + + // The first is read in two parts, the tail of the first does not coalesce + // with the second. + testLoads({{1000, 10000000}, {10001000, 1000}}, 3); + + // One large standalone read in 2 parts. + testLoads({{1000, 10000000}}, 2); + + // Small standalone read in 1 part. + testLoads({{100, 100}}, 1); + + // Two small far apart + testLoads({{100, 100}, {1000000, 100}}, 2); + // The two coalesce because the first fits within load quantum + max coalesce + // distance. + testLoads({{1000, 8500000}, {8510000, 1000000}}, 1); + + // The two coalesce because the first fits within load quantum + max coalesce + // distance. The tail of the second does not coalesce. + testLoads({{1000, 8500000}, {8510000, 8400000}}, 2); + + // The first reads in 2 parts and does not coalesce to the second, which reads + // in one part. + testLoads({{1000, 9000000}, {9010000, 1000000}}, 3); +} diff --git a/velox/dwio/dwrf/test/TestReadFile.h b/velox/dwio/dwrf/test/TestReadFile.h new file mode 100644 index 000000000000..1a20b6c57cba --- /dev/null +++ b/velox/dwio/dwrf/test/TestReadFile.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/io/IoStatistics.h" + +#include "velox/common/file/File.h" + +#include + +namespace facebook::velox::dwio::common { + +// Testing stream producing deterministic data. The byte at offset is +// the low byte of 'seed_' + offset. +class TestReadFile : public velox::ReadFile { + public: + TestReadFile( + uint64_t seed, + uint64_t length, + std::shared_ptr ioStats) + : seed_(seed), length_(length), ioStats_(std::move(ioStats)) {} + + uint64_t size() const override { + return length_; + } + + std::string_view pread(uint64_t offset, uint64_t length, void* buffer) + const override { + int fill; + uint64_t content = offset + seed_; + uint64_t available = std::min(length_ - offset, length); + for (fill = 0; fill < (available); ++fill) { + reinterpret_cast(buffer)[fill] = content + fill; + } + return std::string_view(static_cast(buffer), fill); + } + + uint64_t preadv( + uint64_t offset, + const std::vector>& buffers) const override { + auto res = ReadFile::preadv(offset, buffers); + ++numIos_; + return res; + } + + // Asserts that 'bytes' is as would be read from 'offset'. 
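  // Illustrative note (not part of the patch): pread() fills byte i of a read
  // at 'offset' with the low byte of (seed_ + offset + i), so e.g. with seed
  // 11 the byte at file offset 100 is (11 + 100) & 0xFF = 111. checkData()
  // recomputes the same expression, so buffers can be verified independently
  // of how the reads were coalesced.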
+ void checkData(const void* bytes, uint64_t offset, int32_t size) { + for (auto i = 0; i < size; ++i) { + char expected = seed_ + offset + i; + ASSERT_EQ(expected, reinterpret_cast(bytes)[i]) + << " at " << offset + i; + } + } + + uint64_t memoryUsage() const override { + VELOX_NYI(); + } + + bool shouldCoalesce() const override { + VELOX_NYI(); + } + + int64_t numIos() const { + return numIos_; + } + + std::string getName() const override { + return ""; + } + + uint64_t getNaturalReadSize() const override { + VELOX_NYI(); + } + + private: + const uint64_t seed_; + const uint64_t length_; + std::shared_ptr ioStats_; + mutable std::atomic numIos_{0}; +}; + +} // namespace facebook::velox::dwio::common
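Usage note (editorial, not part of the patch): the intended flow for the new classes is enqueue regions, call load() once, then read each returned stream; DirectBufferedInputTest above exercises exactly this. The sketch below is a minimal, hypothetical extra case against that same fixture. The test name and the region values are illustrative assumptions; unlike testLoads(), it only verifies stream contents and makes no claim about how many IOs the coalescing heuristics issue.

// Hypothetical additional case for DirectBufferedInputTest.cpp, reusing the
// fixture's makeInput() and the deterministic TestReadFile.
TEST_F(DirectBufferedInputTest, streamContents) {
  auto input = makeInput();

  // Enqueue a single region before load(); the StreamIdentifier keys access
  // tracking in the ScanTracker.
  Region region;
  region.offset = 1234;
  region.length = 100000;
  StreamIdentifier si(0);
  auto stream = input->enqueue(region, &si);

  // Plans (and possibly prefetches) coalesced IO for all enqueued regions.
  input->load(LogType::FILE);

  // Consume the stream; the first Next() triggers the actual read.
  const void* buffer;
  int32_t size;
  uint64_t totalRead = 0;
  while (stream->Next(&buffer, &size)) {
    file_->checkData(buffer, region.offset + totalRead, size);
    totalRead += size;
  }
  EXPECT_EQ(region.length, totalRead);
}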