Skip to content

Commit

Permalink
fix: Allow association with a delta file with no corresponding row in…
Browse files Browse the repository at this point in the history
… case of empty base file (facebookincubator#11746)

Summary: Pull Request resolved: facebookincubator#11746

Reviewed By: darrenfu

Differential Revision: D66773290
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 5, 2024
1 parent 37a5ffb commit d55ed33
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 38 deletions.
67 changes: 35 additions & 32 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ void SplitReader::configureReaderOptions(
void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) {
createReader(std::move(metadataFilter));
auto rowType = createReader();

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
}

createRowReader();
createRowReader(std::move(metadataFilter), std::move(rowType));
}

uint64_t SplitReader::next(uint64_t size, VectorPtr& output) {
Expand Down Expand Up @@ -220,8 +220,7 @@ std::string SplitReader::toString() const {
static_cast<const void*>(baseRowReader_.get()));
}

void SplitReader::createReader(
std::shared_ptr<common::MetadataFilter> metadataFilter) {
RowTypePtr SplitReader::createReader() {
VELOX_CHECK_NE(
baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN);

Expand All @@ -237,7 +236,7 @@ void SplitReader::createReader(
hiveConfig_->ignoreMissingFiles(
connectorQueryCtx_->sessionProperties())) {
emptySplit_ = true;
return;
return nullptr;
}
throw;
}
Expand All @@ -262,15 +261,22 @@ void SplitReader::createReader(
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema());
auto columnNames = fileType->names();
configureRowReaderOptions(
hiveTableHandle_->tableParameters(),
scanSpec_,
std::move(metadataFilter),
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_,
hiveConfig_,
connectorQueryCtx_->sessionProperties(),
baseRowReaderOpts_);
return ROW(std::move(columnNames), std::move(columnTypes));
}

// Runs testFilters() for this split using the scan spec, the base reader, the
// split's file path and its partition keys. Returns true when the filters
// pass. Otherwise records the split in 'runtimeStats' (one more skipped split,
// plus the split's length in skipped bytes) and returns false.
bool SplitReader::filterOnStats(
    dwio::common::RuntimeStatistics& runtimeStats) const {
  const bool passed = testFilters(
      scanSpec_.get(),
      baseReader_.get(),
      hiveSplit_->filePath,
      hiveSplit_->partitionKeys,
      *partitionKeys_);
  if (!passed) {
    // The filters rejected the split: account for it as skipped.
    ++runtimeStats.skippedSplits;
    runtimeStats.skippedSplitBytes += hiveSplit_->length;
  }
  return passed;
}

bool SplitReader::checkIfSplitIsEmpty(
Expand All @@ -280,35 +286,32 @@ bool SplitReader::checkIfSplitIsEmpty(
if (emptySplit_) {
return true;
}

if (!baseReader_ || baseReader_->numberOfRows() == 0) {
if (!baseReader_ || baseReader_->numberOfRows() == 0 ||
!filterOnStats(runtimeStats)) {
emptySplit_ = true;
} else {
// Check filters and see if the whole split can be skipped. Note that this
// doesn't apply to Hudi tables.
if (!testFilters(
scanSpec_.get(),
baseReader_.get(),
hiveSplit_->filePath,
hiveSplit_->partitionKeys,
*partitionKeys_)) {
++runtimeStats.skippedSplits;
runtimeStats.skippedSplitBytes += hiveSplit_->length;
emptySplit_ = true;
}
}

return emptySplit_;
}

void SplitReader::createRowReader() {
// Creates baseRowReader_ from baseReader_. The row reader options are first
// populated through configureRowReaderOptions() using 'metadataFilter' and
// 'rowType' (both consumed by this call), then handed to
// baseReader_->createRowReader().
void SplitReader::createRowReader(
    std::shared_ptr<common::MetadataFilter> metadataFilter,
    RowTypePtr rowType) {
  // A row reader must not have been created yet.
  VELOX_CHECK_NULL(baseRowReader_);

  const auto& tableParams = hiveTableHandle_->tableParameters();
  const auto& sessionProps = connectorQueryCtx_->sessionProperties();
  configureRowReaderOptions(
      tableParams,
      scanSpec_,
      std::move(metadataFilter),
      std::move(rowType),
      hiveSplit_,
      hiveConfig_,
      sessionProps,
      baseRowReaderOpts_);

  // The options must be fully configured before the row reader is built.
  baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
}

std::vector<TypePtr> SplitReader::adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema) {
const std::shared_ptr<const velox::RowType>& tableSchema) const {
// Keep track of schema types for columns in file, used by ColumnSelector.
std::vector<TypePtr> columnTypes = fileType->children();

Expand Down
17 changes: 13 additions & 4 deletions velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ class SplitReader {

/// Create the dwio::common::Reader object baseReader_, which will be used to
/// read the data file's metadata and schema
void createReader(std::shared_ptr<common::MetadataFilter> metadataFilter);
RowTypePtr createReader();

// Check if the filters pass on the column statistics. When delta update is
// present, the corresponding filter should be disabled before calling this
// function.
bool filterOnStats(dwio::common::RuntimeStatistics& runtimeStats) const;

/// Check if the hiveSplit_ is empty. The split is considered empty when
/// 1) The data file is missing but the user chooses to ignore it
Expand All @@ -125,19 +130,23 @@ class SplitReader {

/// Create the dwio::common::RowReader object baseRowReader_, which owns the
/// ColumnReaders that will be used to read the data
void createRowReader();
void createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
RowTypePtr rowType);

private:
/// Different table formats may have different metadata columns.
/// This function will be used to update the scanSpec for these columns.
virtual std::vector<TypePtr> adaptColumns(
std::vector<TypePtr> adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema);
const std::shared_ptr<const velox::RowType>& tableSchema) const;

void setPartitionValue(
common::ScanSpec* spec,
const std::string& partitionKey,
const std::optional<std::string>& value) const;

protected:
std::shared_ptr<const HiveConnectorSplit> hiveSplit_;
const std::shared_ptr<const HiveTableHandle> hiveTableHandle_;
const std::unordered_map<
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ IcebergSplitReader::IcebergSplitReader(
void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) {
createReader(std::move(metadataFilter));
auto rowType = createReader();

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
}

createRowReader();
createRowReader(std::move(metadataFilter), std::move(rowType));

std::shared_ptr<const HiveIcebergSplit> icebergSplit =
std::dynamic_pointer_cast<const HiveIcebergSplit>(hiveSplit_);
Expand Down

0 comments on commit d55ed33

Please sign in to comment.