From 410e09f1e9233e2faaf32483f7486973a8521bf3 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Thu, 5 Dec 2024 07:11:54 -0800 Subject: [PATCH] fix: Allow association with a delta file with no corresponding row in case of empty base file (#11746) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11746 Reviewed By: darrenfu Differential Revision: D66773290 --- velox/connectors/hive/SplitReader.cpp | 67 ++++++++++--------- velox/connectors/hive/SplitReader.h | 17 +++-- .../hive/iceberg/IcebergSplitReader.cpp | 4 +- 3 files changed, 50 insertions(+), 38 deletions(-) diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index bfa9e52828c40..7b357ea097463 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -146,14 +146,14 @@ void SplitReader::configureReaderOptions( void SplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - createReader(std::move(metadataFilter)); + auto rowType = createReader(); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); return; } - createRowReader(); + createRowReader(std::move(metadataFilter), std::move(rowType)); } uint64_t SplitReader::next(uint64_t size, VectorPtr& output) { @@ -220,8 +220,7 @@ std::string SplitReader::toString() const { static_cast(baseRowReader_.get())); } -void SplitReader::createReader( - std::shared_ptr metadataFilter) { +RowTypePtr SplitReader::createReader() { VELOX_CHECK_NE( baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN); @@ -237,7 +236,7 @@ void SplitReader::createReader( hiveConfig_->ignoreMissingFiles( connectorQueryCtx_->sessionProperties())) { emptySplit_ = true; - return; + return nullptr; } throw; } @@ -262,15 +261,22 @@ void SplitReader::createReader( auto& fileType = baseReader_->rowType(); auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema()); auto columnNames = fileType->names(); - configureRowReaderOptions( - hiveTableHandle_->tableParameters(), - scanSpec_, - std::move(metadataFilter), - ROW(std::move(columnNames), std::move(columnTypes)), - hiveSplit_, - hiveConfig_, - connectorQueryCtx_->sessionProperties(), - baseRowReaderOpts_); + return ROW(std::move(columnNames), std::move(columnTypes)); +} + +bool SplitReader::filterOnStats( + dwio::common::RuntimeStatistics& runtimeStats) const { + if (testFilters( + scanSpec_.get(), + baseReader_.get(), + hiveSplit_->filePath, + hiveSplit_->partitionKeys, + *partitionKeys_)) { + return true; + } + ++runtimeStats.skippedSplits; + runtimeStats.skippedSplitBytes += hiveSplit_->length; + return false; } bool SplitReader::checkIfSplitIsEmpty( @@ -280,35 +286,32 @@ bool SplitReader::checkIfSplitIsEmpty( if (emptySplit_) { return true; } - - if (!baseReader_ || baseReader_->numberOfRows() == 0) { + if (!baseReader_ || baseReader_->numberOfRows() == 0 || + !filterOnStats(runtimeStats)) { emptySplit_ = true; - } else { - // Check filters and see if the whole split can be skipped. Note that this - // doesn't apply to Hudi tables. - if (!testFilters( - scanSpec_.get(), - baseReader_.get(), - hiveSplit_->filePath, - hiveSplit_->partitionKeys, - *partitionKeys_)) { - ++runtimeStats.skippedSplits; - runtimeStats.skippedSplitBytes += hiveSplit_->length; - emptySplit_ = true; - } } - return emptySplit_; } -void SplitReader::createRowReader() { +void SplitReader::createRowReader( + std::shared_ptr metadataFilter, + RowTypePtr rowType) { VELOX_CHECK_NULL(baseRowReader_); + configureRowReaderOptions( + hiveTableHandle_->tableParameters(), + scanSpec_, + std::move(metadataFilter), + std::move(rowType), + hiveSplit_, + hiveConfig_, + connectorQueryCtx_->sessionProperties(), + baseRowReaderOpts_); baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } std::vector SplitReader::adaptColumns( const RowTypePtr& fileType, - const std::shared_ptr& tableSchema) { + const std::shared_ptr& tableSchema) const { // Keep track of schema types for columns in file, used by ColumnSelector. std::vector columnTypes = fileType->children(); diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index b8ab6e10fd040..5466107ca6206 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -113,7 +113,12 @@ class SplitReader { /// Create the dwio::common::Reader object baseReader_, which will be used to /// read the data file's metadata and schema - void createReader(std::shared_ptr metadataFilter); + RowTypePtr createReader(); + + // Check if the filters pass on the column statistics. When delta update is + // present, the corresonding filter should be disabled before calling this + // function. + bool filterOnStats(dwio::common::RuntimeStatistics& runtimeStats) const; /// Check if the hiveSplit_ is empty. The split is considered empty when /// 1) The data file is missing but the user chooses to ignore it @@ -125,19 +130,23 @@ class SplitReader { /// Create the dwio::common::RowReader object baseRowReader_, which owns the /// ColumnReaders that will be used to read the data - void createRowReader(); + void createRowReader( + std::shared_ptr metadataFilter, + RowTypePtr rowType); + private: /// Different table formats may have different meatadata columns. /// This function will be used to update the scanSpec for these columns. - virtual std::vector adaptColumns( + std::vector adaptColumns( const RowTypePtr& fileType, - const std::shared_ptr& tableSchema); + const std::shared_ptr& tableSchema) const; void setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, const std::optional& value) const; + protected: std::shared_ptr hiveSplit_; const std::shared_ptr hiveTableHandle_; const std::unordered_map< diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 1923837d112cd..dd8c059a53ef8 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -55,14 +55,14 @@ IcebergSplitReader::IcebergSplitReader( void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - createReader(std::move(metadataFilter)); + auto rowType = createReader(); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); return; } - createRowReader(); + createRowReader(std::move(metadataFilter), std::move(rowType)); std::shared_ptr icebergSplit = std::dynamic_pointer_cast(hiveSplit_);