Skip to content

Commit

Permalink
SplitReader refactor
Browse files Browse the repository at this point in the history
To prepare for the upcoming equality delete file read, we need to
refactor the SplitReader and break prepareSplit() into several parts.
This commit does this, and also includes a couple of other code cleanups.
  • Loading branch information
yingsu00 committed Mar 12, 2024
1 parent 6f189b0 commit 227571f
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 138 deletions.
232 changes: 125 additions & 107 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ SplitReader::SplitReader(
baseReaderOpts_(connectorQueryCtx->memoryPool()) {}

void SplitReader::configureReaderOptions(
std::shared_ptr<random::RandomSkipTracker> randomSkip) {
std::shared_ptr<velox::random::RandomSkipTracker> randomSkip) {
hive::configureReaderOptions(
baseReaderOpts_,
hiveConfig_,
Expand All @@ -141,6 +141,101 @@ void SplitReader::configureReaderOptions(
// Prepares this split for reading: opens the base file reader, probes
// whether the split can be skipped entirely, and only then builds the row
// reader with the supplied metadata filter.
void SplitReader::prepareSplit(
    std::shared_ptr<common::MetadataFilter> metadataFilter,
    dwio::common::RuntimeStatistics& runtimeStats) {
  // The base reader is required both for the empty-split probe and for
  // constructing the row reader.
  createReader();

  // Skip building a row reader when the split has no rows or is fully
  // filtered out; skipped-split stats are recorded inside testEmptySplit().
  if (!testEmptySplit(runtimeStats)) {
    createRowReader(metadataFilter);
  }
}

// Reads up to 'size' rows into 'output', returning the number of rows read.
// When sampling is configured, the random-skip tracker is threaded through a
// Mutation so the row reader can drop rows while scanning.
uint64_t SplitReader::next(uint64_t size, VectorPtr& output) {
  const auto& randomSkip = baseReaderOpts_.randomSkip();
  if (randomSkip) {
    dwio::common::Mutation mutation;
    mutation.randomSkip = randomSkip.get();
    return baseRowReader_->next(size, output, &mutation);
  }
  return baseRowReader_->next(size, output);
}

// Clears any filter result caches held by the row reader. A no-op when no
// row reader was created (e.g. the split was empty).
void SplitReader::resetFilterCaches() {
  if (baseRowReader_ != nullptr) {
    baseRowReader_->resetFilterCaches();
  }
}

// Returns true when prepareSplit() determined there is nothing to read from
// this split (zero-row file or fully filtered out).
bool SplitReader::emptySplit() const {
  return emptySplit_;
}

// Releases the current split so its resources can be reclaimed once reading
// is finished.
void SplitReader::resetSplit() {
  hiveSplit_.reset();
}

// Returns the row reader's estimated bytes per row, or
// DataSource::kUnknownRowSize when there is no row reader yet or the reader
// cannot produce an estimate.
int64_t SplitReader::estimatedRowSize() const {
  if (baseRowReader_) {
    if (const auto rowSize = baseRowReader_->estimatedRowSize()) {
      return rowSize.value();
    }
  }
  return DataSource::kUnknownRowSize;
}

// Folds the row reader's runtime statistics into 'stats'. Nothing to report
// before a row reader exists.
void SplitReader::updateRuntimeStats(
    dwio::common::RuntimeStatistics& stats) const {
  if (baseRowReader_ != nullptr) {
    baseRowReader_->updateRuntimeStats(stats);
  }
}

// Returns whether the row reader has issued all of its prefetches; false
// when no row reader was created for this split.
bool SplitReader::allPrefetchIssued() const {
  if (!baseRowReader_) {
    return false;
  }
  return baseRowReader_->allPrefetchIssued();
}

// Installs the textual partition value on 'spec' as a single-row constant
// vector typed according to the partition key's column handle.
void SplitReader::setPartitionValue(
    common::ScanSpec* spec,
    const std::string& partitionKey,
    const std::optional<std::string>& value) const {
  // Every partition key must have a column handle supplied by the table
  // scan; a missing entry indicates an inconsistent plan.
  const auto entry = partitionKeys_->find(partitionKey);
  VELOX_CHECK(
      entry != partitionKeys_->end(),
      "ColumnHandle is missing for partition key {}",
      partitionKey);
  const auto& keyType = entry->second->dataType();
  // Parse the string (or null) into a constant of the key's scalar type.
  const auto constantVector = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
      newConstantFromString,
      keyType->kind(),
      keyType,
      value,
      1,
      connectorQueryCtx_->memoryPool());
  spec->setConstantValue(constantVector);
}

// Produces a human-readable summary of this reader's split, scan spec,
// output type, partition keys, and the addresses of its readers; intended
// for logging/debugging.
std::string SplitReader::toString() const {
  // Concatenate the string forms of all partition-key column handles.
  std::string partitionKeys;
  for (const auto& [name, handle] : *partitionKeys_) {
    partitionKeys += " " + handle->toString();
  }
  return fmt::format(
      "SplitReader: hiveSplit_{} scanSpec_{} readerOutputType_{} partitionKeys_{} reader{} rowReader{}",
      hiveSplit_->toString(),
      scanSpec_->toString(),
      readerOutputType_->toString(),
      partitionKeys,
      static_cast<const void*>(baseReader_.get()),
      static_cast<const void*>(baseRowReader_.get()));
}

void SplitReader::createReader() {
VELOX_CHECK_NE(
baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN);

Expand All @@ -157,6 +252,7 @@ void SplitReader::prepareSplit(
throw;
}
}

// Here we keep adding new entries to CacheTTLController when new fileHandles
// are generated, if CacheTTLController was created. Creator of
// CacheTTLController needs to make sure a size control strategy was available
Expand All @@ -169,28 +265,10 @@ void SplitReader::prepareSplit(

baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat())
->createReader(std::move(baseFileInput), baseReaderOpts_);
}

// Note that this doesn't apply to Hudi tables.
emptySplit_ = false;
if (baseReader_->numberOfRows() == 0) {
emptySplit_ = true;
return;
}

// Check filters and see if the whole split can be skipped. Note that this
// doesn't apply to Hudi tables.
if (!testFilters(
scanSpec_.get(),
baseReader_.get(),
hiveSplit_->filePath,
hiveSplit_->partitionKeys,
partitionKeys_)) {
emptySplit_ = true;
++runtimeStats.skippedSplits;
runtimeStats.skippedSplitBytes += hiveSplit_->length;
return;
}

void SplitReader::createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter) {
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema());

Expand All @@ -207,6 +285,31 @@ void SplitReader::prepareSplit(
baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
}

// Decides whether this split can be skipped without building a row reader.
// Sets emptySplit_ and returns it: true when the file has no rows, or when
// the filters prove no row can pass (in which case skipped-split stats are
// recorded in 'runtimeStats').
bool SplitReader::testEmptySplit(
    dwio::common::RuntimeStatistics& runtimeStats) {
  emptySplit_ = false;

  // A missing reader or a zero-row file means there is nothing to scan.
  // Note that this doesn't apply to Hudi tables.
  if (!baseReader_ || baseReader_->numberOfRows() == 0) {
    emptySplit_ = true;
    return emptySplit_;
  }

  // Check filters and see if the whole split can be skipped. Note that this
  // doesn't apply to Hudi tables.
  if (!testFilters(
          scanSpec_.get(),
          baseReader_.get(),
          hiveSplit_->filePath,
          hiveSplit_->partitionKeys,
          partitionKeys_)) {
    ++runtimeStats.skippedSplits;
    runtimeStats.skippedSplitBytes += hiveSplit_->length;
    emptySplit_ = true;
  }

  return emptySplit_;
}

std::vector<TypePtr> SplitReader::adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema) {
Expand Down Expand Up @@ -280,91 +383,6 @@ std::vector<TypePtr> SplitReader::adaptColumns(
return columnTypes;
}

// Reads up to 'size' rows into 'output', returning the number of rows read.
// When random skipping (sampling) is configured, the tracker is passed down
// via a Mutation so the row reader can drop rows while scanning.
uint64_t SplitReader::next(int64_t size, VectorPtr& output) {
  if (!baseReaderOpts_.randomSkip()) {
    return baseRowReader_->next(size, output);
  }
  dwio::common::Mutation mutation;
  mutation.randomSkip = baseReaderOpts_.randomSkip().get();
  return baseRowReader_->next(size, output, &mutation);
}

// Clears any filter result caches held by the row reader; no-op when no row
// reader exists.
void SplitReader::resetFilterCaches() {
  if (baseRowReader_) {
    baseRowReader_->resetFilterCaches();
  }
}

// Returns true when split preparation determined there is nothing to read.
bool SplitReader::emptySplit() const {
  return emptySplit_;
}

// Releases the current split so its resources can be reclaimed.
void SplitReader::resetSplit() {
  hiveSplit_.reset();
}

// Returns the row reader's estimated bytes per row, or
// DataSource::kUnknownRowSize when no row reader exists or no estimate is
// available.
int64_t SplitReader::estimatedRowSize() const {
  if (!baseRowReader_) {
    return DataSource::kUnknownRowSize;
  }

  auto size = baseRowReader_->estimatedRowSize();
  if (size.has_value()) {
    return size.value();
  }
  return DataSource::kUnknownRowSize;
}
}

// Folds the row reader's runtime statistics into 'stats'; no-op when no row
// reader exists.
void SplitReader::updateRuntimeStats(
    dwio::common::RuntimeStatistics& stats) const {
  if (baseRowReader_) {
    baseRowReader_->updateRuntimeStats(stats);
  }
}

// Returns whether the row reader has issued all of its prefetches; false
// when no row reader was created.
bool SplitReader::allPrefetchIssued() const {
  return baseRowReader_ && baseRowReader_->allPrefetchIssued();
}

// Installs the textual partition value on 'spec' as a single-row constant
// vector typed according to the partition key's column handle.
void SplitReader::setPartitionValue(
    common::ScanSpec* spec,
    const std::string& partitionKey,
    const std::optional<std::string>& value) const {
  // Every partition key must have a column handle supplied by the table
  // scan; a missing entry indicates an inconsistent plan.
  auto it = partitionKeys_->find(partitionKey);
  VELOX_CHECK(
      it != partitionKeys_->end(),
      "ColumnHandle is missing for partition key {}",
      partitionKey);
  auto type = it->second->dataType();
  // Parse the string (or null) into a constant of the key's scalar type.
  auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
      newConstantFromString,
      type->kind(),
      type,
      value,
      1,
      connectorQueryCtx_->memoryPool());
  spec->setConstantValue(constant);
}

// Produces a human-readable summary of this reader's split, scan spec,
// output type, partition keys, and the addresses of its readers; intended
// for logging/debugging.
std::string SplitReader::toString() const {
  std::string partitionKeys;
  // Concatenate the string forms of all partition-key column handles.
  std::for_each(
      partitionKeys_->begin(),
      partitionKeys_->end(),
      [&](std::pair<
          const std::string,
          std::shared_ptr<facebook::velox::connector::hive::HiveColumnHandle>>
              column) { partitionKeys += " " + column.second->toString(); });
  return fmt::format(
      "SplitReader: hiveSplit_{} scanSpec_{} readerOutputType_{} partitionKeys_{} reader{} rowReader{}",
      hiveSplit_->toString(),
      scanSpec_->toString(),
      readerOutputType_->toString(),
      partitionKeys,
      static_cast<const void*>(baseReader_.get()),
      static_cast<const void*>(baseRowReader_.get()));
}

} // namespace facebook::velox::connector::hive

template <>
Expand Down
13 changes: 9 additions & 4 deletions velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "velox/common/base/RandomUtil.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/dwio/common/Options.h"

Expand Down Expand Up @@ -84,7 +85,7 @@ class SplitReader {
virtual ~SplitReader() = default;

void configureReaderOptions(
std::shared_ptr<random::RandomSkipTracker> randomSkip);
std::shared_ptr<velox::random::RandomSkipTracker> randomSkip);

/// This function is used by different table formats like Iceberg and Hudi to
/// do additional preparations before reading the split, e.g. Open delete
Expand All @@ -93,7 +94,7 @@ class SplitReader {
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats);

virtual uint64_t next(int64_t size, VectorPtr& output);
virtual uint64_t next(uint64_t size, VectorPtr& output);

void resetFilterCaches();

Expand All @@ -110,6 +111,12 @@ class SplitReader {
std::string toString() const;

protected:
void createReader();

bool testEmptySplit(dwio::common::RuntimeStatistics& runtimeStats);

void createRowReader(std::shared_ptr<common::MetadataFilter> metadataFilter);

// Different table formats may have different metadata columns. This function
// will be used to update the scanSpec for these columns.
virtual std::vector<TypePtr> adaptColumns(
Expand Down Expand Up @@ -137,8 +144,6 @@ class SplitReader {
std::shared_ptr<io::IoStatistics> ioStats_;
dwio::common::ReaderOptions baseReaderOpts_;
dwio::common::RowReaderOptions baseRowReaderOpts_;

private:
bool emptySplit_;
};

Expand Down
Loading

0 comments on commit 227571f

Please sign in to comment.