Skip to content

Commit

Permalink
fix: Optimize DWRF footer IO read count and size in Hive connector
Browse files Browse the repository at this point in the history
Summary:
The Presto splitting policy sometimes over-splits files with large stripes (e.g. those with large flatmap columns), resulting in quite a lot of splits that do not actually contain any stripes.  For those essentially empty splits, we still need to read the file footer in order to compare the split boundary with the stripe boundaries.  However, we can skip the stripe metadata cache in this case.  We also reduce the number of tiny reads while reading the file footer, so most footers can be read in 1 read IO instead of 3.  This combination of optimizations gives up to a 2.5x reduction in execution time for some queries.

I also tried caching the parsed footer in file handles; however, that does not work well, since Presto seems to send splits from the same file to different workers, so the cache hit rate remains quite low.

Differential Revision: D66943503
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 9, 2024
1 parent 929affe commit bdfdc8a
Show file tree
Hide file tree
Showing 26 changed files with 435 additions and 103 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs(
}

uint64_t HiveConfig::footerEstimatedSize() const {
return config_->get<uint64_t>(kFooterEstimatedSize, 1UL << 20);
return config_->get<uint64_t>(kFooterEstimatedSize, 256UL << 10);
}

uint64_t HiveConfig::filePreloadThreshold() const {
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/BufferedInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ bool BufferedInput::tryMerge(Region& first, const Region& second) {
return false;
}

std::unique_ptr<SeekableInputStream> BufferedInput::readBuffer(
std::unique_ptr<SeekableInputStreamWithKnownLength> BufferedInput::readBuffer(
uint64_t offset,
uint64_t length) const {
const auto result = readInternal(offset, length);
Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/common/BufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class BufferedInput {
return !!readBuffer(offset, length);
}

virtual std::unique_ptr<SeekableInputStream>
virtual std::unique_ptr<SeekableInputStreamWithKnownLength>
read(uint64_t offset, uint64_t length, LogType logType) const {
std::unique_ptr<SeekableInputStream> ret = readBuffer(offset, length);
auto ret = readBuffer(offset, length);
if (ret != nullptr) {
return ret;
}
Expand Down Expand Up @@ -171,7 +171,7 @@ class BufferedInput {
memory::MemoryPool* const pool_;

private:
std::unique_ptr<SeekableInputStream> readBuffer(
std::unique_ptr<SeekableInputStreamWithKnownLength> readBuffer(
uint64_t offset,
uint64_t length) const;

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/CacheInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ std::string CacheInputStream::getName() const {
return result;
}

size_t CacheInputStream::positionSize() {
size_t CacheInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down
13 changes: 11 additions & 2 deletions velox/dwio/common/CacheInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace facebook::velox::dwio::common {

class CachedBufferedInput;

class CacheInputStream : public SeekableInputStream {
class CacheInputStream : public SeekableInputStreamWithKnownLength {
public:
CacheInputStream(
CachedBufferedInput* cache,
Expand All @@ -43,13 +43,22 @@ class CacheInputStream : public SeekableInputStream {

~CacheInputStream() override;

CacheInputStream& operator=(const CacheInputStream&) = delete;
CacheInputStream(const CacheInputStream&) = delete;
CacheInputStream& operator=(CacheInputStream&&) = delete;
CacheInputStream(CacheInputStream&&) = delete;

bool Next(const void** data, int* size) override;
void BackUp(int count) override;
bool SkipInt64(int64_t count) override;
google::protobuf::int64 ByteCount() const override;
void seekToPosition(PositionProvider& position) override;
std::string getName() const override;
size_t positionSize() override;
size_t positionSize() const override;

int64_t totalLength() const override {
return region_.length;
}

/// Returns a copy of 'this', ranging over the same bytes. The clone is
/// initially positioned at the position of 'this' and can be moved
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/CachedBufferedInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ std::shared_ptr<cache::CoalescedLoad> CachedBufferedInput::coalescedLoad(
});
}

std::unique_ptr<SeekableInputStream> CachedBufferedInput::read(
std::unique_ptr<SeekableInputStreamWithKnownLength> CachedBufferedInput::read(
uint64_t offset,
uint64_t length,
LogType /*logType*/) const {
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/CachedBufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class CachedBufferedInput : public BufferedInput {

bool isBuffered(uint64_t /*unused*/, uint64_t /*unused*/) const override;

std::unique_ptr<SeekableInputStream>
std::unique_ptr<SeekableInputStreamWithKnownLength>
read(uint64_t offset, uint64_t length, LogType logType) const override;

/// Schedules load of 'region' on 'executor_'. Fails silently if no memory or
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/DirectBufferedInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ std::shared_ptr<DirectCoalescedLoad> DirectBufferedInput::coalescedLoad(
});
}

std::unique_ptr<SeekableInputStream> DirectBufferedInput::read(
std::unique_ptr<SeekableInputStreamWithKnownLength> DirectBufferedInput::read(
uint64_t offset,
uint64_t length,
LogType /*logType*/) const {
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/DirectBufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class DirectBufferedInput : public BufferedInput {
std::shared_ptr<DirectCoalescedLoad> coalescedLoad(
const SeekableInputStream* stream);

std::unique_ptr<SeekableInputStream>
std::unique_ptr<SeekableInputStreamWithKnownLength>
read(uint64_t offset, uint64_t length, LogType logType) const override;

folly::Executor* executor() const override {
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/DirectInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ std::string DirectInputStream::getName() const {
"DirectInputStream {} of {}", offsetInRegion_, region_.length);
}

size_t DirectInputStream::positionSize() {
size_t DirectInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down
8 changes: 6 additions & 2 deletions velox/dwio/common/DirectInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class DirectBufferedInput;

/// An input stream over possibly coalesced loads. Created by
/// DirectBufferedInput. Similar to CacheInputStream but does not use cache.
class DirectInputStream : public SeekableInputStream {
class DirectInputStream : public SeekableInputStreamWithKnownLength {
public:
DirectInputStream(
DirectBufferedInput* bufferedInput,
Expand All @@ -48,7 +48,11 @@ class DirectInputStream : public SeekableInputStream {

void seekToPosition(PositionProvider& position) override;
std::string getName() const override;
size_t positionSize() override;
size_t positionSize() const override;

int64_t totalLength() const override {
return region_.length;
}

/// Testing function to access loaded state.
void testingData(
Expand Down
78 changes: 76 additions & 2 deletions velox/dwio/common/SeekableInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ std::string SeekableArrayInputStream::getName() const {
"SeekableArrayInputStream ", position_, " of ", length_);
}

size_t SeekableArrayInputStream::positionSize() {
size_t SeekableArrayInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down Expand Up @@ -257,9 +257,83 @@ std::string SeekableFileInputStream::getName() const {
input_->getName(), " from ", start_, " for ", length_);
}

size_t SeekableFileInputStream::positionSize() {
size_t SeekableFileInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}

/// Takes ownership of 'streams' and exposes them as one logical stream.
///
/// At least one stream is required, and every stream must report a
/// positionSize() of 1. The combined length is the sum of the totalLength()
/// of all the parts.
SeekableConcatInputStream::SeekableConcatInputStream(
    std::vector<std::unique_ptr<SeekableInputStreamWithKnownLength>> streams)
    : streams_(std::move(streams)) {
  VELOX_CHECK(!streams_.empty());
  int64_t combinedLength = 0;
  for (const auto& part : streams_) {
    VELOX_CHECK_EQ(part->positionSize(), 1);
    combinedLength += part->totalLength();
  }
  length_ = combinedLength;
}

/// Hands out the next buffer from the current underlying stream, moving on to
/// the following stream whenever the current one has nothing more to return.
///
/// On success 'data' and 'size' are filled in by the underlying stream and the
/// overall position advances by '*size'. Once every stream is exhausted,
/// '*size' is set to 0 and false is returned.
bool SeekableConcatInputStream::Next(const void** data, int32_t* size) {
  for (; streamIndex_ < streams_.size(); ++streamIndex_) {
    auto& current = streams_[streamIndex_];
    if (current->Next(data, size)) {
      position_ += *size;
      return true;
    }
  }
  *size = 0;
  return false;
}

/// Rewinds the concatenated stream by 'count' bytes, which may span more than
/// one underlying stream.
///
/// Starting from the current stream, each stream is backed up by at most the
/// number of bytes it reports via ByteCount(); any remainder is taken from the
/// preceding stream. Fails the check if the index would move before the first
/// stream while bytes are still left to back up.
void SeekableConcatInputStream::BackUp(int32_t count) {
  VELOX_CHECK_GE(count, 0);
  for (auto pending = count; pending > 0;) {
    VELOX_CHECK_GE(streamIndex_, 0);
    // The index sits one past the last stream after the final Next(); such a
    // position has nothing to give back, just like a stream with no bytes
    // consumed.
    const bool pastLastStream = streamIndex_ >= streams_.size();
    const auto consumed =
        pastLastStream ? 0 : streams_[streamIndex_]->ByteCount();
    if (consumed == 0) {
      --streamIndex_;
      continue;
    }
    const auto step = std::min<int64_t>(pending, consumed);
    streams_[streamIndex_]->BackUp(step);
    position_ -= step;
    pending -= step;
  }
}

/// Skips 'count' bytes forward, crossing underlying stream boundaries as
/// needed.
///
/// For each stream the number of bytes still available is computed as
/// totalLength() - ByteCount(); fully consumed streams are stepped over.
///
/// @param count Number of bytes to skip; must be non-negative.
/// @return true if all 'count' bytes were skipped, false if the last stream
/// was exhausted first (bytes skipped up to that point stay skipped).
bool SeekableConcatInputStream::SkipInt64(int64_t count) {
  VELOX_CHECK_GE(count, 0);
  while (count > 0) {
    if (streamIndex_ >= streams_.size()) {
      return false;
    }
    auto& stream = streams_[streamIndex_];
    // ByteCount() returns google::protobuf::int64, which is not guaranteed to
    // be the same type as int64_t. Pin the type here and in std::min below so
    // template deduction cannot fail, matching what BackUp() does.
    const int64_t streamRemaining = stream->totalLength() - stream->ByteCount();
    if (streamRemaining == 0) {
      ++streamIndex_;
      continue;
    }
    const auto skip = std::min<int64_t>(count, streamRemaining);
    VELOX_CHECK(stream->SkipInt64(skip));
    position_ += skip;
    count -= skip;
  }
  return true;
}

/// Not supported: this class is currently used only for reading metadata,
/// which never seeks, so any call reports "not yet implemented".
void SeekableConcatInputStream::seekToPosition(PositionProvider& /*unused*/) {
  VELOX_NYI();
}

/// Returns a short description that includes how many underlying streams are
/// concatenated.
std::string SeekableConcatInputStream::getName() const {
  const auto numStreams = streams_.size();
  return fmt::format("SeekableConcatInputStream of {} streams", numStreams);
}

} // namespace facebook::velox::dwio::common
53 changes: 48 additions & 5 deletions velox/dwio/common/SeekableInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream {

// Returns the number of position values this input stream uses to identify an
// ORC/DWRF stream address.
virtual size_t positionSize() = 0;
virtual size_t positionSize() const = 0;

virtual bool SkipInt64(int64_t count) = 0;

Expand All @@ -51,10 +51,15 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream {
void readFully(char* buffer, size_t bufferSize);
};

/// A SeekableInputStream that can additionally report its total length up
/// front, which lets callers compute how much of the stream is left.
class SeekableInputStreamWithKnownLength : public SeekableInputStream {
 public:
  /// Total length of this stream in bytes.
  virtual int64_t totalLength() const = 0;
};

/**
* Create a seekable input stream based on a memory range.
*/
class SeekableArrayInputStream : public SeekableInputStream {
class SeekableArrayInputStream : public SeekableInputStreamWithKnownLength {
public:
SeekableArrayInputStream(
const unsigned char* list,
Expand Down Expand Up @@ -82,7 +87,11 @@ class SeekableArrayInputStream : public SeekableInputStream {
virtual google::protobuf::int64 ByteCount() const override;
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t positionSize() override;
virtual size_t positionSize() const override;

int64_t totalLength() const override {
return length_;
}

/// Return the total number of bytes returned from Next() calls. Intended to
/// be used for test validation.
Expand All @@ -106,7 +115,7 @@ class SeekableArrayInputStream : public SeekableInputStream {
/**
* Create a seekable input stream based on an io stream.
*/
class SeekableFileInputStream : public SeekableInputStream {
class SeekableFileInputStream : public SeekableInputStreamWithKnownLength {
public:
SeekableFileInputStream(
std::shared_ptr<ReadFileInputStream> input,
Expand All @@ -123,7 +132,11 @@ class SeekableFileInputStream : public SeekableInputStream {
virtual google::protobuf::int64 ByteCount() const override;
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t positionSize() override;
virtual size_t positionSize() const override;

int64_t totalLength() const override {
return length_;
}

private:
const std::shared_ptr<ReadFileInputStream> input_;
Expand All @@ -138,4 +151,34 @@ class SeekableFileInputStream : public SeekableInputStream {
uint64_t pushback_;
};

/// A SeekableInputStream that is the concatenation of multiple streams.
/// A SeekableInputStream that is the concatenation of multiple streams,
/// presented to the reader as one contiguous stream of known length.
class SeekableConcatInputStream : public SeekableInputStreamWithKnownLength {
 public:
  /// Takes ownership of 'streams', which are read in the given order.
  explicit SeekableConcatInputStream(
      std::vector<std::unique_ptr<SeekableInputStreamWithKnownLength>> streams);

  bool Next(const void** data, int32_t* size) override;

  void BackUp(int32_t count) override;

  bool SkipInt64(int64_t count) override;

  void seekToPosition(PositionProvider& position) override;

  std::string getName() const override;

  /// Always a single position value.
  size_t positionSize() const override {
    return 1;
  }

  /// Current offset within the concatenated stream.
  google::protobuf::int64 ByteCount() const override {
    return position_;
  }

  /// Combined length of all underlying streams.
  int64_t totalLength() const override {
    return length_;
  }

 private:
  // The underlying streams, in read order.
  std::vector<std::unique_ptr<SeekableInputStreamWithKnownLength>> streams_;

  // Sum of the lengths of 'streams_'; assigned by the constructor.
  int64_t length_;

  // Index into 'streams_' of the stream currently being read. Kept signed:
  // BackUp() decrements it and checks that it has not gone below zero.
  int streamIndex_ = 0;

  // Offset within the concatenated stream, as reported by ByteCount().
  int64_t position_ = 0;
};

} // namespace facebook::velox::dwio::common
32 changes: 25 additions & 7 deletions velox/dwio/common/Statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,34 @@ struct RuntimeStatistics {
// Number of strides (row groups) skipped based on statistics.
int64_t skippedStrides{0};

int64_t footerBufferOverread{0};

ColumnReaderStatistics columnReaderStatistics;

std::unordered_map<std::string, RuntimeCounter> toMap() {
return {
{"skippedSplits", RuntimeCounter(skippedSplits)},
{"skippedSplitBytes",
RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)},
{"skippedStrides", RuntimeCounter(skippedStrides)},
{"flattenStringDictionaryValues",
RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)}};
std::unordered_map<std::string, RuntimeCounter> result;
if (skippedSplits > 0) {
result.emplace("skippedSplits", RuntimeCounter(skippedSplits));
}
if (skippedSplitBytes > 0) {
result.emplace(
"skippedSplitBytes",
RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes));
}
if (skippedStrides > 0) {
result.emplace("skippedStrides", RuntimeCounter(skippedStrides));
}
if (footerBufferOverread > 0) {
result.emplace(
"footerBufferOverread",
RuntimeCounter(footerBufferOverread, RuntimeCounter::Unit::kBytes));
}
if (columnReaderStatistics.flattenStringDictionaryValues > 0) {
result.emplace(
"flattenStringDictionaryValues",
RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues));
}
return result;
}
};

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/compression/PagedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class PagedInputStream : public dwio::common::SeekableInputStream {
")");
}

size_t positionSize() override {
size_t positionSize() const override {
// not compressed, so need 2 positions (compressed position + uncompressed
// position)
return 2;
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ add_executable(
ReadFileInputStreamTests.cpp
ReaderTest.cpp
RetryTests.cpp
SeekableInputStreamTest.cpp
TestBufferedInput.cpp
ThrottlerTest.cpp
TypeTests.cpp
Expand Down
Loading

0 comments on commit bdfdc8a

Please sign in to comment.