Skip to content

Commit

Permalink
fix: Optimize DWRF footer IO read count and size in Hive connector (#…
Browse files Browse the repository at this point in the history
…11798)

Summary:

Presto splitting policy sometimes over-splits files with large stripes (e.g. those with large flatmap columns), resulting in quite a lot splits which actually do not contain any stripes.  For those essentially empty splits, we still need to read the file footer in order to compare the split boundary with stripe boundaries.  However we can skip the stripe metadata cache in this case.  We also reduce the number of tiny reads while reading file footer, so most footers can be read in 1 read IO instead of 3.  This combination of optimizations gives up to 2.5 times execution time reduction for some queries.

I also tried caching the parsed footer in file handles; however that does not work well, since Presto seems sending splits from same file to different workers and the cache hit rate remains quite low.

Reviewed By: xiaoxmeng, oerling

Differential Revision: D66943503
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 13, 2024
1 parent 42bd38a commit 19fd5c5
Show file tree
Hide file tree
Showing 19 changed files with 212 additions and 98 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs(
}

uint64_t HiveConfig::footerEstimatedSize() const {
return config_->get<uint64_t>(kFooterEstimatedSize, 1UL << 20);
return config_->get<uint64_t>(kFooterEstimatedSize, 256UL << 10);
}

uint64_t HiveConfig::filePreloadThreshold() const {
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/BufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class BufferedInput {

virtual std::unique_ptr<SeekableInputStream>
read(uint64_t offset, uint64_t length, LogType logType) const {
std::unique_ptr<SeekableInputStream> ret = readBuffer(offset, length);
auto ret = readBuffer(offset, length);
if (ret != nullptr) {
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/CacheInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ std::string CacheInputStream::getName() const {
return result;
}

size_t CacheInputStream::positionSize() {
size_t CacheInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down
7 changes: 6 additions & 1 deletion velox/dwio/common/CacheInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ class CacheInputStream : public SeekableInputStream {

~CacheInputStream() override;

CacheInputStream& operator=(const CacheInputStream&) = delete;
CacheInputStream(const CacheInputStream&) = delete;
CacheInputStream& operator=(CacheInputStream&&) = delete;
CacheInputStream(CacheInputStream&&) = delete;

bool Next(const void** data, int* size) override;
void BackUp(int count) override;
bool SkipInt64(int64_t count) override;
google::protobuf::int64 ByteCount() const override;
void seekToPosition(PositionProvider& position) override;
std::string getName() const override;
size_t positionSize() override;
size_t positionSize() const override;

/// Returns a copy of 'this', ranging over the same bytes. The clone is
/// initially positioned at the position of 'this' and can be moved
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/DirectInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ std::string DirectInputStream::getName() const {
"DirectInputStream {} of {}", offsetInRegion_, region_.length);
}

size_t DirectInputStream::positionSize() {
size_t DirectInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/DirectInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DirectInputStream : public SeekableInputStream {

void seekToPosition(PositionProvider& position) override;
std::string getName() const override;
size_t positionSize() override;
size_t positionSize() const override;

/// Testing function to access loaded state.
void testingData(
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/common/SeekableInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ std::string SeekableArrayInputStream::getName() const {
"SeekableArrayInputStream ", position_, " of ", length_);
}

size_t SeekableArrayInputStream::positionSize() {
size_t SeekableArrayInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down Expand Up @@ -257,7 +257,7 @@ std::string SeekableFileInputStream::getName() const {
input_->getName(), " from ", start_, " for ", length_);
}

size_t SeekableFileInputStream::positionSize() {
size_t SeekableFileInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/common/SeekableInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream {

// Returns the number of position values this input stream uses to identify an
// ORC/DWRF stream address.
virtual size_t positionSize() = 0;
virtual size_t positionSize() const = 0;

virtual bool SkipInt64(int64_t count) = 0;

Expand Down Expand Up @@ -82,7 +82,7 @@ class SeekableArrayInputStream : public SeekableInputStream {
virtual google::protobuf::int64 ByteCount() const override;
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t positionSize() override;
virtual size_t positionSize() const override;

/// Return the total number of bytes returned from Next() calls. Intended to
/// be used for test validation.
Expand Down Expand Up @@ -123,7 +123,7 @@ class SeekableFileInputStream : public SeekableInputStream {
virtual google::protobuf::int64 ByteCount() const override;
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t positionSize() override;
virtual size_t positionSize() const override;

private:
const std::shared_ptr<ReadFileInputStream> input_;
Expand Down
37 changes: 30 additions & 7 deletions velox/dwio/common/Statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,39 @@ struct RuntimeStatistics {
// Number of strides (row groups) skipped based on statistics.
int64_t skippedStrides{0};

int64_t footerBufferOverread{0};

int64_t numStripes{0};

ColumnReaderStatistics columnReaderStatistics;

std::unordered_map<std::string, RuntimeCounter> toMap() {
return {
{"skippedSplits", RuntimeCounter(skippedSplits)},
{"skippedSplitBytes",
RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)},
{"skippedStrides", RuntimeCounter(skippedStrides)},
{"flattenStringDictionaryValues",
RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)}};
std::unordered_map<std::string, RuntimeCounter> result;
if (skippedSplits > 0) {
result.emplace("skippedSplits", RuntimeCounter(skippedSplits));
}
if (skippedSplitBytes > 0) {
result.emplace(
"skippedSplitBytes",
RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes));
}
if (skippedStrides > 0) {
result.emplace("skippedStrides", RuntimeCounter(skippedStrides));
}
if (footerBufferOverread > 0) {
result.emplace(
"footerBufferOverread",
RuntimeCounter(footerBufferOverread, RuntimeCounter::Unit::kBytes));
}
if (numStripes > 0) {
result.emplace("numStripes", RuntimeCounter(numStripes));
}
if (columnReaderStatistics.flattenStringDictionaryValues > 0) {
result.emplace(
"flattenStringDictionaryValues",
RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues));
}
return result;
}
};

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/compression/PagedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class PagedInputStream : public dwio::common::SeekableInputStream {
")");
}

size_t positionSize() override {
size_t positionSize() const override {
// not compressed, so need 2 positions (compressed position + uncompressed
// position)
return 2;
Expand Down
11 changes: 7 additions & 4 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ DwrfRowReader::DwrfRowReader(
}

unitLoader_ = getUnitLoader();
if (!emptyFile()) {
getReader().loadCache();
}
}

std::unique_ptr<ColumnReader>& DwrfRowReader::getColumnReader() {
Expand Down Expand Up @@ -339,7 +342,7 @@ std::unique_ptr<dwio::common::UnitLoader> DwrfRowReader::getUnitLoader() {

uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) {
// Empty file
if (isEmptyFile()) {
if (emptyFile()) {
return 0;
}
nextRowNumber_.reset();
Expand Down Expand Up @@ -422,7 +425,7 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) {
}

uint64_t DwrfRowReader::skipRows(uint64_t numberOfRowsToSkip) {
if (isEmptyFile()) {
if (emptyFile()) {
VLOG(1) << "Empty file, nothing to skip";
return 0;
}
Expand Down Expand Up @@ -586,7 +589,7 @@ uint64_t DwrfRowReader::rowNumber() {
if (nextRow != kAtEnd) {
return nextRow;
}
if (isEmptyFile()) {
if (emptyFile()) {
return 0;
}
return getReader().footer().numberOfRows();
Expand Down Expand Up @@ -615,7 +618,7 @@ uint64_t DwrfRowReader::next(
const dwio::common::Mutation* mutation) {
const auto nextRow = nextRowNumber();
if (nextRow == kAtEnd) {
if (!isEmptyFile()) {
if (!emptyFile()) {
previousRow_ = firstRowOfStripe_[stripeCeiling_ - 1] +
getReader().footer().stripes(stripeCeiling_ - 1).numberOfRows();
} else {
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/dwrf/reader/DwrfReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class DwrfRowReader : public StrideIndexProvider,
void updateRuntimeStats(
dwio::common::RuntimeStatistics& stats) const override {
stats.skippedStrides += skippedStrides_;
stats.footerBufferOverread += getReader().footerBufferOverread();
stats.numStripes += stripeCeiling_ - firstStripe_;
stats.columnReaderStatistics.flattenStringDictionaryValues +=
columnReaderStatistics_.flattenStringDictionaryValues;
}
Expand Down Expand Up @@ -152,7 +154,7 @@ class DwrfRowReader : public StrideIndexProvider,
const dwio::common::Statistics& stats,
uint32_t nodeId) const;

bool isEmptyFile() const {
bool emptyFile() const {
return stripeCeiling_ == firstStripe_;
}

Expand Down
Loading

0 comments on commit 19fd5c5

Please sign in to comment.