Skip to content

Commit

Permalink
fix: Optimize DWRF footer IO read count and size in Hive connector (f…
Browse files Browse the repository at this point in the history
…acebookincubator#11798)

Summary:

The Presto splitting policy sometimes over-splits files with large stripes (e.g. those with large flatmap columns), resulting in a large number of splits that do not actually contain any stripes.  For those essentially empty splits, we still need to read the file footer in order to compare the split boundary with the stripe boundaries.  However, we can skip loading the stripe metadata cache in this case.  We also reduce the number of tiny reads issued while reading the file footer, so most footers can be read in 1 read IO instead of 3.  Together, these optimizations reduce execution time by up to 2.5x for some queries.

I also tried caching the parsed footer in file handles; however, that does not work well, since Presto seems to send splits from the same file to different workers, so the cache hit rate remains quite low.

Reviewed By: oerling

Differential Revision: D66943503
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 12, 2024
1 parent b44ffc9 commit 076cbf8
Show file tree
Hide file tree
Showing 19 changed files with 200 additions and 93 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs(
}

uint64_t HiveConfig::footerEstimatedSize() const {
return config_->get<uint64_t>(kFooterEstimatedSize, 1UL << 20);
return config_->get<uint64_t>(kFooterEstimatedSize, 256UL << 10);
}

uint64_t HiveConfig::filePreloadThreshold() const {
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/BufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class BufferedInput {

virtual std::unique_ptr<SeekableInputStream>
read(uint64_t offset, uint64_t length, LogType logType) const {
std::unique_ptr<SeekableInputStream> ret = readBuffer(offset, length);
auto ret = readBuffer(offset, length);
if (ret != nullptr) {
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/CacheInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ std::string CacheInputStream::getName() const {
return result;
}

size_t CacheInputStream::positionSize() {
size_t CacheInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down
7 changes: 6 additions & 1 deletion velox/dwio/common/CacheInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ class CacheInputStream : public SeekableInputStream {

~CacheInputStream() override;

CacheInputStream& operator=(const CacheInputStream&) = delete;
CacheInputStream(const CacheInputStream&) = delete;
CacheInputStream& operator=(CacheInputStream&&) = delete;
CacheInputStream(CacheInputStream&&) = delete;

bool Next(const void** data, int* size) override;
void BackUp(int count) override;
bool SkipInt64(int64_t count) override;
google::protobuf::int64 ByteCount() const override;
void seekToPosition(PositionProvider& position) override;
std::string getName() const override;
size_t positionSize() override;
size_t positionSize() const override;

/// Returns a copy of 'this', ranging over the same bytes. The clone is
/// initially positioned at the position of 'this' and can be moved
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/DirectInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ std::string DirectInputStream::getName() const {
"DirectInputStream {} of {}", offsetInRegion_, region_.length);
}

size_t DirectInputStream::positionSize() {
size_t DirectInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/DirectInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DirectInputStream : public SeekableInputStream {

void seekToPosition(PositionProvider& position) override;
std::string getName() const override;
size_t positionSize() override;
size_t positionSize() const override;

/// Testing function to access loaded state.
void testingData(
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/common/SeekableInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ std::string SeekableArrayInputStream::getName() const {
"SeekableArrayInputStream ", position_, " of ", length_);
}

size_t SeekableArrayInputStream::positionSize() {
size_t SeekableArrayInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down Expand Up @@ -257,7 +257,7 @@ std::string SeekableFileInputStream::getName() const {
input_->getName(), " from ", start_, " for ", length_);
}

size_t SeekableFileInputStream::positionSize() {
size_t SeekableFileInputStream::positionSize() const {
// not compressed, so only need 1 position (uncompressed position)
return 1;
}
Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/common/SeekableInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream {

// Returns the number of position values this input stream uses to identify an
// ORC/DWRF stream address.
virtual size_t positionSize() = 0;
virtual size_t positionSize() const = 0;

virtual bool SkipInt64(int64_t count) = 0;

Expand Down Expand Up @@ -82,7 +82,7 @@ class SeekableArrayInputStream : public SeekableInputStream {
virtual google::protobuf::int64 ByteCount() const override;
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t positionSize() override;
virtual size_t positionSize() const override;

/// Return the total number of bytes returned from Next() calls. Intended to
/// be used for test validation.
Expand Down Expand Up @@ -123,7 +123,7 @@ class SeekableFileInputStream : public SeekableInputStream {
virtual google::protobuf::int64 ByteCount() const override;
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t positionSize() override;
virtual size_t positionSize() const override;

private:
const std::shared_ptr<ReadFileInputStream> input_;
Expand Down
37 changes: 30 additions & 7 deletions velox/dwio/common/Statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,39 @@ struct RuntimeStatistics {
// Number of strides (row groups) skipped based on statistics.
int64_t skippedStrides{0};

int64_t footerBufferOverread{0};

int64_t numStripes{0};

ColumnReaderStatistics columnReaderStatistics;

std::unordered_map<std::string, RuntimeCounter> toMap() {
return {
{"skippedSplits", RuntimeCounter(skippedSplits)},
{"skippedSplitBytes",
RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)},
{"skippedStrides", RuntimeCounter(skippedStrides)},
{"flattenStringDictionaryValues",
RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)}};
std::unordered_map<std::string, RuntimeCounter> result;
if (skippedSplits > 0) {
result.emplace("skippedSplits", RuntimeCounter(skippedSplits));
}
if (skippedSplitBytes > 0) {
result.emplace(
"skippedSplitBytes",
RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes));
}
if (skippedStrides > 0) {
result.emplace("skippedStrides", RuntimeCounter(skippedStrides));
}
if (footerBufferOverread > 0) {
result.emplace(
"footerBufferOverread",
RuntimeCounter(footerBufferOverread, RuntimeCounter::Unit::kBytes));
}
if (numStripes > 0) {
result.emplace("numStripes", RuntimeCounter(numStripes));
}
if (columnReaderStatistics.flattenStringDictionaryValues > 0) {
result.emplace(
"flattenStringDictionaryValues",
RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues));
}
return result;
}
};

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/compression/PagedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class PagedInputStream : public dwio::common::SeekableInputStream {
")");
}

size_t positionSize() override {
size_t positionSize() const override {
// compressed, so need 2 positions (compressed position + uncompressed
// position)
return 2;
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ DwrfRowReader::DwrfRowReader(
}

unitLoader_ = getUnitLoader();
if (!isEmptyFile()) {
getReader().loadCache();
}
}

std::unique_ptr<ColumnReader>& DwrfRowReader::getColumnReader() {
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/dwrf/reader/DwrfReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class DwrfRowReader : public StrideIndexProvider,
void updateRuntimeStats(
dwio::common::RuntimeStatistics& stats) const override {
stats.skippedStrides += skippedStrides_;
stats.footerBufferOverread += getReader().footerBufferOverread();
stats.numStripes += stripeCeiling_ - firstStripe_;
stats.columnReaderStatistics.flattenStringDictionaryValues +=
columnReaderStatistics_.flattenStringDictionaryValues;
}
Expand Down
134 changes: 93 additions & 41 deletions velox/dwio/dwrf/reader/ReaderBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,56 +82,74 @@ ReaderBase::ReaderBase(
FileFormat fileFormat)
: ReaderBase(createReaderOptions(pool, fileFormat), std::move(input)) {}

namespace {

/// Parses a serialized post script of protobuf type 'T' from the 'size' bytes
/// starting at 'input' and wraps the parsed message in a PostScript.
///
/// @tparam T Protobuf message type to parse into.
/// @param input Start of the serialized post script bytes.
/// @param size Number of bytes to parse.
/// @return The parsed message wrapped in a PostScript.
/// Fails the check, with a descriptive message, if the bytes cannot be
/// parsed as 'T'.
template <typename T>
std::unique_ptr<PostScript> parsePostScript(const char* input, int size) {
  auto impl = std::make_unique<T>();
  // Name the failing structure so a corrupted file tail is easy to diagnose.
  VELOX_CHECK(
      impl->ParseFromArray(input, size), "Failed to parse post script");
  return std::make_unique<PostScript>(std::move(impl));
}

/// Parses a footer of protobuf type 'T' from 'input' into a message created
/// on 'arena' and wraps the parsed message in a FooterWrapper.
///
/// @tparam T Protobuf message type to parse into.
/// @param input Stream positioned at the start of the serialized footer.
/// @param arena Arena the footer message is created on.
/// @return A FooterWrapper referring to the parsed message.
/// Fails the check, with a descriptive message, if the stream content cannot
/// be parsed as 'T'.
template <typename T>
std::unique_ptr<FooterWrapper> parseFooter(
    dwio::common::SeekableInputStream* input,
    google::protobuf::Arena* arena) {
  auto* impl = google::protobuf::Arena::CreateMessage<T>(arena);
  // Name the failing structure so a corrupted file tail is easy to diagnose.
  VELOX_CHECK(
      impl->ParseFromZeroCopyStream(input), "Failed to parse file footer");
  return std::make_unique<FooterWrapper>(impl);
}

} // namespace

ReaderBase::ReaderBase(
const dwio::common::ReaderOptions& options,
std::unique_ptr<dwio::common::BufferedInput> input)
: arena_(std::make_unique<google::protobuf::Arena>()),
options_{options},
: options_{options},
input_(std::move(input)),
fileLength_(input_->getReadFile()->size()) {
fileLength_(input_->getReadFile()->size()),
arena_(std::make_unique<google::protobuf::Arena>()) {
process::TraceContext trace("ReaderBase::ReaderBase");
// TODO: make a config
DWIO_ENSURE(fileLength_ > 0, "ORC file is empty");
VELOX_CHECK_GE(fileLength_, 4, "File size too small");

const auto preloadFile = fileLength_ <= options_.filePreloadThreshold();
const uint64_t readSize = preloadFile
? fileLength_
: std::min(fileLength_, options_.footerEstimatedSize());
const int64_t bufSize = std::min(fileLength_, options_.footerEstimatedSize());
const uint64_t readSize = preloadFile ? fileLength_ : bufSize;
if (input_->supportSyncLoad()) {
input_->enqueue({fileLength_ - readSize, readSize, "footer"});
input_->load(preloadFile ? LogType::FILE : LogType::FOOTER);
}

// TODO: read footer from spectrum
{
const void* buf;
int32_t ignored;
auto lastByteStream = input_->read(fileLength_ - 1, 1, LogType::FOOTER);
const bool ret = lastByteStream->Next(&buf, &ignored);
VELOX_CHECK(ret, "Failed to read");
// Make sure 'lastByteStream' is live while dereferencing 'buf'.
psLength_ = *static_cast<const char*>(buf) & 0xff;
}
footerBuffer_ =
AlignedBuffer::allocate<char>(bufSize, &options_.memoryPool());
auto* buf = footerBuffer_->asMutable<char>();
input_->read(fileLength_ - bufSize, bufSize, LogType::FOOTER)
->readFully(buf, bufSize);
int offset = bufSize - 1;
psLength_ = static_cast<uint8_t>(buf[offset]);
VELOX_CHECK_LE(
psLength_ + 4, // 1 byte for post script len, 3 byte "ORC" header.
fileLength_,
"Corrupted file, Post script size is invalid");

VELOX_CHECK_GE(offset, psLength_);
offset -= psLength_;
if (fileFormat() == FileFormat::DWRF) {
auto postScript = ProtoUtils::readProto<proto::PostScript>(
input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER));
postScript_ = std::make_unique<PostScript>(std::move(postScript));
postScript_ = parsePostScript<proto::PostScript>(buf + offset, psLength_);
} else {
auto postScript = ProtoUtils::readProto<proto::orc::PostScript>(
input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER));
postScript_ = std::make_unique<PostScript>(std::move(postScript));
postScript_ =
parsePostScript<proto::orc::PostScript>(buf + offset, psLength_);
}

const uint64_t footerSize = postScript_->footerLength();
const uint64_t cacheSize =
postScript_->hasCacheSize() ? postScript_->cacheSize() : 0;
const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize;
footerBufferOverread_ =
std::max<int>(0, bufSize - static_cast<int64_t>(tailSize));

// There are cases in warehouse, where RC/text files are stored
// in ORC partition. This causes the Reader to SIGSEGV. The following
Expand All @@ -154,29 +172,53 @@ ReaderBase::ReaderBase(
input_->load(LogType::FOOTER);
}

auto footerStream = input_->read(
fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER);
BufferPtr remainingFooterBuffer;
char* footerStart;
if (offset >= footerSize) {
offset -= footerSize;
footerStart = buf + offset;
} else {
remainingFooterBuffer =
AlignedBuffer::allocate<char>(footerSize, &options_.memoryPool());
footerStart = remainingFooterBuffer->asMutable<char>();
auto remaining = footerSize - offset;
input_
->read(
fileLength_ - footerSize - psLength_ - 1,
remaining,
LogType::FOOTER)
->readFully(footerStart, remaining);
memcpy(footerStart + remaining, buf, offset);
offset = 0;
}
auto decompressed = createDecompressedStream(
std::make_unique<dwio::common::SeekableArrayInputStream>(
footerStart, footerSize),
"File Footer");
if (fileFormat() == FileFormat::DWRF) {
auto footer =
google::protobuf::Arena::CreateMessage<proto::Footer>(arena_.get());
ProtoUtils::readProtoInto<proto::Footer>(
createDecompressedStream(std::move(footerStream), "File Footer"),
footer);
footer_ = std::make_unique<FooterWrapper>(footer);
footer_ = parseFooter<proto::Footer>(decompressed.get(), arena_.get());
} else {
auto footer = google::protobuf::Arena::CreateMessage<proto::orc::Footer>(
arena_.get());
ProtoUtils::readProtoInto<proto::orc::Footer>(
createDecompressedStream(std::move(footerStream), "File Footer"),
footer);
footer_ = std::make_unique<FooterWrapper>(footer);
footer_ = parseFooter<proto::orc::Footer>(decompressed.get(), arena_.get());
}
footerBufferSize_ = offset;

schema_ = std::dynamic_pointer_cast<const RowType>(
convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase()));
VELOX_CHECK_NOT_NULL(schema_, "invalid schema");

// load stripe index/footer cache
// initialize file decrypter
handler_ =
DecryptionHandler::create(*footer_, options_.decrypterFactory().get());
}

void ReaderBase::loadCache() {
if (!footerBuffer_) {
return;
}
const uint64_t footerSize = postScript_->footerLength();
const uint64_t cacheSize =
postScript_->hasCacheSize() ? postScript_->cacheSize() : 0;
const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize;
if (cacheSize > 0) {
VELOX_CHECK_EQ(format(), DwrfFormat::kDwrf);
const uint64_t cacheOffset = fileLength_ - tailSize;
Expand All @@ -189,8 +231,19 @@ ReaderBase::ReaderBase(
} else {
auto cacheBuffer = std::make_shared<dwio::common::DataBuffer<char>>(
options_.memoryPool(), cacheSize);
input_->read(cacheOffset, cacheSize, LogType::FOOTER)
->readFully(cacheBuffer->data(), cacheSize);
auto* target = cacheBuffer->data();
auto* source = footerBuffer_->as<char>();
auto size = cacheSize;
if (cacheSize > footerBufferSize_) {
auto remaining = cacheSize - footerBufferSize_;
auto stream = input_->read(cacheOffset, remaining, LogType::FOOTER);
stream->readFully(target, remaining);
target += remaining;
size -= remaining;
} else {
source += footerBufferSize_ - cacheSize;
}
memcpy(target, source, size);
cache_ = std::make_unique<StripeMetadataCache>(
postScript_->cacheMode(), *footer_, std::move(cacheBuffer));
}
Expand All @@ -208,9 +261,8 @@ ReaderBase::ReaderBase(
input_->load(LogType::FOOTER);
}
}
// initialize file decrypter
handler_ =
DecryptionHandler::create(*footer_, options_.decrypterFactory().get());
// Release the memory as we no longer need it.
footerBuffer_.reset();
}

std::vector<uint64_t> ReaderBase::rowsPerStripe() const {
Expand Down
Loading

0 comments on commit 076cbf8

Please sign in to comment.