diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp
index 072a5f68d54a..eaafe82efafe 100644
--- a/velox/connectors/hive/SplitReader.cpp
+++ b/velox/connectors/hive/SplitReader.cpp
@@ -128,7 +128,7 @@ SplitReader::SplitReader(
       baseReaderOpts_(connectorQueryCtx->memoryPool()) {}
 
 void SplitReader::configureReaderOptions(
-    std::shared_ptr<random::RandomSkipTracker> randomSkip) {
+    std::shared_ptr<velox::random::RandomSkipTracker> randomSkip) {
   hive::configureReaderOptions(
       baseReaderOpts_,
       hiveConfig_,
@@ -141,6 +141,101 @@ void SplitReader::prepareSplit(
 void SplitReader::prepareSplit(
     std::shared_ptr<common::MetadataFilter> metadataFilter,
     dwio::common::RuntimeStatistics& runtimeStats) {
+  createReader();
+
+  if (testEmptySplit(runtimeStats)) {
+    return;
+  }
+
+  createRowReader(metadataFilter);
+}
+
+uint64_t SplitReader::next(uint64_t size, VectorPtr& output) {
+  if (!baseReaderOpts_.randomSkip()) {
+    return baseRowReader_->next(size, output);
+  }
+  dwio::common::Mutation mutation;
+  mutation.randomSkip = baseReaderOpts_.randomSkip().get();
+  return baseRowReader_->next(size, output, &mutation);
+}
+
+void SplitReader::resetFilterCaches() {
+  if (baseRowReader_) {
+    baseRowReader_->resetFilterCaches();
+  }
+}
+
+bool SplitReader::emptySplit() const {
+  return emptySplit_;
+}
+
+void SplitReader::resetSplit() {
+  hiveSplit_.reset();
+}
+
+int64_t SplitReader::estimatedRowSize() const {
+  if (!baseRowReader_) {
+    return DataSource::kUnknownRowSize;
+  }
+
+  auto size = baseRowReader_->estimatedRowSize();
+  if (size.has_value()) {
+    return size.value();
+  }
+  return DataSource::kUnknownRowSize;
+}
+
+void SplitReader::updateRuntimeStats(
+    dwio::common::RuntimeStatistics& stats) const {
+  if (baseRowReader_) {
+    baseRowReader_->updateRuntimeStats(stats);
+  }
+}
+
+bool SplitReader::allPrefetchIssued() const {
+  return baseRowReader_ && baseRowReader_->allPrefetchIssued();
+}
+
+void SplitReader::setPartitionValue(
+    common::ScanSpec* spec,
+    const std::string& partitionKey,
+    const std::optional<std::string>& value) const {
+  auto it = partitionKeys_->find(partitionKey);
+  VELOX_CHECK(
+      it != partitionKeys_->end(),
+      "ColumnHandle is missing for partition key {}",
+      partitionKey);
+  auto type = it->second->dataType();
+  auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
+      newConstantFromString,
+      type->kind(),
+      type,
+      value,
+      1,
+      connectorQueryCtx_->memoryPool());
+  spec->setConstantValue(constant);
+}
+
+std::string SplitReader::toString() const {
+  std::string partitionKeys;
+  std::for_each(
+      partitionKeys_->begin(),
+      partitionKeys_->end(),
+      [&](std::pair<
+          const std::string,
+          std::shared_ptr<facebook::velox::connector::hive::HiveColumnHandle>>
+              column) { partitionKeys += " " + column.second->toString(); });
+  return fmt::format(
+      "SplitReader: hiveSplit_{} scanSpec_{} readerOutputType_{} partitionKeys_{} reader{} rowReader{}",
+      hiveSplit_->toString(),
+      scanSpec_->toString(),
+      readerOutputType_->toString(),
+      partitionKeys,
+      static_cast<const void*>(baseReader_.get()),
+      static_cast<const void*>(baseRowReader_.get()));
+}
+
+void SplitReader::createReader() {
   VELOX_CHECK_NE(
       baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN);
 
@@ -157,6 +252,7 @@ void SplitReader::prepareSplit(
       throw;
     }
   }
+
   // Here we keep adding new entries to CacheTTLController when new fileHandles
   // are generated, if CacheTTLController was created. Creator of
   // CacheTTLController needs to make sure a size control strategy was available
@@ -169,28 +265,10 @@ void SplitReader::prepareSplit(
 
   baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat())
                     ->createReader(std::move(baseFileInput), baseReaderOpts_);
+}
 
-  // Note that this doesn't apply to Hudi tables.
-  emptySplit_ = false;
-  if (baseReader_->numberOfRows() == 0) {
-    emptySplit_ = true;
-    return;
-  }
-
-  // Check filters and see if the whole split can be skipped. Note that this
-  // doesn't apply to Hudi tables.
-  if (!testFilters(
-          scanSpec_.get(),
-          baseReader_.get(),
-          hiveSplit_->filePath,
-          hiveSplit_->partitionKeys,
-          partitionKeys_)) {
-    emptySplit_ = true;
-    ++runtimeStats.skippedSplits;
-    runtimeStats.skippedSplitBytes += hiveSplit_->length;
-    return;
-  }
-
+void SplitReader::createRowReader(
+    std::shared_ptr<common::MetadataFilter> metadataFilter) {
   auto& fileType = baseReader_->rowType();
   auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema());
 
@@ -207,6 +285,31 @@ void SplitReader::prepareSplit(
   baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
 }
 
+bool SplitReader::testEmptySplit(
+    dwio::common::RuntimeStatistics& runtimeStats) {
+  emptySplit_ = false;
+
+  // Note that this doesn't apply to Hudi tables.
+  if (!baseReader_ || baseReader_->numberOfRows() == 0) {
+    emptySplit_ = true;
+  } else {
+    // Check filters and see if the whole split can be skipped. Note that this
+    // doesn't apply to Hudi tables.
+    if (!testFilters(
+            scanSpec_.get(),
+            baseReader_.get(),
+            hiveSplit_->filePath,
+            hiveSplit_->partitionKeys,
+            partitionKeys_)) {
+      ++runtimeStats.skippedSplits;
+      runtimeStats.skippedSplitBytes += hiveSplit_->length;
+      emptySplit_ = true;
+    }
+  }
+
+  return emptySplit_;
+}
+
 std::vector<TypePtr> SplitReader::adaptColumns(
     const RowTypePtr& fileType,
     const std::shared_ptr<const velox::RowType>& tableSchema) {
@@ -280,91 +383,6 @@ std::vector<TypePtr> SplitReader::adaptColumns(
   return columnTypes;
 }
 
-uint64_t SplitReader::next(int64_t size, VectorPtr& output) {
-  if (!baseReaderOpts_.randomSkip()) {
-    return baseRowReader_->next(size, output);
-  }
-  dwio::common::Mutation mutation;
-  mutation.randomSkip = baseReaderOpts_.randomSkip().get();
-  return baseRowReader_->next(size, output, &mutation);
-}
-
-void SplitReader::resetFilterCaches() {
-  if (baseRowReader_) {
-    baseRowReader_->resetFilterCaches();
-  }
-}
-
-bool SplitReader::emptySplit() const {
-  return emptySplit_;
-}
-
-void SplitReader::resetSplit() {
-  hiveSplit_.reset();
-}
-
-int64_t SplitReader::estimatedRowSize() const {
-  if (!baseRowReader_) {
-    return DataSource::kUnknownRowSize;
-  }
-
-  auto size = baseRowReader_->estimatedRowSize();
-  if (size.has_value()) {
-    return size.value();
-  }
-  return DataSource::kUnknownRowSize;
-}
-
-void SplitReader::updateRuntimeStats(
-    dwio::common::RuntimeStatistics& stats) const {
-  if (baseRowReader_) {
-    baseRowReader_->updateRuntimeStats(stats);
-  }
-}
-
-bool SplitReader::allPrefetchIssued() const {
-  return baseRowReader_ && baseRowReader_->allPrefetchIssued();
-}
-
-void SplitReader::setPartitionValue(
-    common::ScanSpec* spec,
-    const std::string& partitionKey,
-    const std::optional<std::string>& value) const {
-  auto it = partitionKeys_->find(partitionKey);
-  VELOX_CHECK(
-      it != partitionKeys_->end(),
-      "ColumnHandle is missing for partition key {}",
-      partitionKey);
-  auto type = it->second->dataType();
-  auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
-      newConstantFromString,
-      type->kind(),
-      type,
-      value,
-      1,
-      connectorQueryCtx_->memoryPool());
-  spec->setConstantValue(constant);
-}
-
-std::string SplitReader::toString() const {
-  std::string partitionKeys;
-  std::for_each(
-      partitionKeys_->begin(),
-      partitionKeys_->end(),
-      [&](std::pair<
-          const std::string,
-          std::shared_ptr<facebook::velox::connector::hive::HiveColumnHandle>>
-              column) { partitionKeys += " " + column.second->toString(); });
-  return fmt::format(
-      "SplitReader: hiveSplit_{} scanSpec_{} readerOutputType_{} partitionKeys_{} reader{} rowReader{}",
-      hiveSplit_->toString(),
-      scanSpec_->toString(),
-      readerOutputType_->toString(),
-      partitionKeys,
-      static_cast<const void*>(baseReader_.get()),
-      static_cast<const void*>(baseRowReader_.get()));
-}
-
 } // namespace facebook::velox::connector::hive
 
 template <>
diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h
index add987a0aef6..fcbda9c50660 100644
--- a/velox/connectors/hive/SplitReader.h
+++ b/velox/connectors/hive/SplitReader.h
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include "velox/common/base/RandomUtil.h"
 #include "velox/connectors/hive/FileHandle.h"
 #include "velox/dwio/common/Options.h"
 
@@ -84,7 +85,7 @@ class SplitReader {
   virtual ~SplitReader() = default;
 
   void configureReaderOptions(
-      std::shared_ptr<random::RandomSkipTracker> randomSkip);
+      std::shared_ptr<velox::random::RandomSkipTracker> randomSkip);
 
   /// This function is used by different table formats like Iceberg and Hudi to
   /// do additional preparations before reading the split, e.g. Open delete
@@ -93,7 +94,7 @@ class SplitReader {
       std::shared_ptr<common::MetadataFilter> metadataFilter,
       dwio::common::RuntimeStatistics& runtimeStats);
 
-  virtual uint64_t next(int64_t size, VectorPtr& output);
+  virtual uint64_t next(uint64_t size, VectorPtr& output);
 
   void resetFilterCaches();
 
@@ -110,6 +111,12 @@ class SplitReader {
   std::string toString() const;
 
  protected:
+  void createReader();
+
+  bool testEmptySplit(dwio::common::RuntimeStatistics& runtimeStats);
+
+  void createRowReader(std::shared_ptr<common::MetadataFilter> metadataFilter);
+
   // Different table formats may have different meatadata columns. This function
   // will be used to update the scanSpec for these columns.
   virtual std::vector<TypePtr> adaptColumns(
@@ -137,8 +144,6 @@ class SplitReader {
   std::shared_ptr<io::IoStatistics> ioStats_;
   dwio::common::ReaderOptions baseReaderOpts_;
   dwio::common::RowReaderOptions baseRowReaderOpts_;
-
- private:
   bool emptySplit_;
 };
 
diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
index bde98bf9a445..268432c6705e 100644
--- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
+++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
@@ -27,9 +27,10 @@ using namespace facebook::velox::dwio::common;
 namespace facebook::velox::connector::hive::iceberg {
 
 IcebergSplitReader::IcebergSplitReader(
-    std::shared_ptr<velox::connector::hive::HiveConnectorSplit> hiveSplit,
-    std::shared_ptr<HiveTableHandle> hiveTableHandle,
-    std::shared_ptr<common::ScanSpec> scanSpec,
+    const std::shared_ptr<velox::connector::hive::HiveConnectorSplit>&
+        hiveSplit,
+    const std::shared_ptr<HiveTableHandle>& hiveTableHandle,
+    const std::shared_ptr<common::ScanSpec> scanSpec,
     const RowTypePtr readerOutputType,
     std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
         partitionKeys,
@@ -48,39 +49,48 @@ IcebergSplitReader::IcebergSplitReader(
           executor,
           connectorQueryCtx,
           hiveConfig,
-          ioStats) {}
+          ioStats),
+      baseReadOffset_(0),
+      splitOffset_(0) {}
 
 void IcebergSplitReader::prepareSplit(
     std::shared_ptr<common::MetadataFilter> metadataFilter,
     dwio::common::RuntimeStatistics& runtimeStats) {
-  SplitReader::prepareSplit(metadataFilter, runtimeStats);
-  baseReadOffset_ = 0;
-  positionalDeleteFileReaders_.clear();
-  splitOffset_ = baseRowReader_->nextRowNumber();
+  createReader();
+
+  if (testEmptySplit(runtimeStats)) {
+    return;
+  }
+
+  createRowReader(metadataFilter);
 
-  // TODO: Deserialize the std::vector<IcebergDeleteFile> deleteFiles. For now
-  // we assume it's already deserialized.
   std::shared_ptr<const HiveIcebergSplit> icebergSplit =
       std::dynamic_pointer_cast<const HiveIcebergSplit>(hiveSplit_);
 
+  baseReadOffset_ = 0;
+  splitOffset_ = baseRowReader_->nextRowNumber();
+  positionalDeleteFileReaders_.clear();
   const auto& deleteFiles = icebergSplit->deleteFiles;
   for (const auto& deleteFile : deleteFiles) {
-    positionalDeleteFileReaders_.push_back(
-        std::make_unique<PositionalDeleteFileReader>(
-            deleteFile,
-            hiveSplit_->filePath,
-            fileHandleFactory_,
-            connectorQueryCtx_,
-            executor_,
-            hiveConfig_,
-            ioStats_,
-            runtimeStats,
-            splitOffset_,
-            hiveSplit_->connectorId));
+    if (deleteFile.content == FileContent::kPositionalDeletes &&
+        deleteFile.recordCount > 0) {
+      positionalDeleteFileReaders_.push_back(
+          std::make_unique<PositionalDeleteFileReader>(
+              deleteFile,
+              hiveSplit_->filePath,
+              fileHandleFactory_,
+              connectorQueryCtx_,
+              executor_,
+              hiveConfig_,
+              ioStats_,
+              runtimeStats,
+              splitOffset_,
+              hiveSplit_->connectorId));
+    }
   }
 }
 
-uint64_t IcebergSplitReader::next(int64_t size, VectorPtr& output) {
+uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {
   Mutation mutation;
   mutation.randomSkip = baseReaderOpts_.randomSkip().get();
   mutation.deletedRows = nullptr;
diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h
index 5c5552369735..7957075e133d 100644
--- a/velox/connectors/hive/iceberg/IcebergSplitReader.h
+++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h
@@ -27,9 +27,10 @@ class IcebergDeleteFile;
 class IcebergSplitReader : public SplitReader {
  public:
   IcebergSplitReader(
-      std::shared_ptr<velox::connector::hive::HiveConnectorSplit> hiveSplit,
-      std::shared_ptr<HiveTableHandle> hiveTableHandle,
-      std::shared_ptr<common::ScanSpec> scanSpec,
+      const std::shared_ptr<velox::connector::hive::HiveConnectorSplit>&
+          hiveSplit,
+      const std::shared_ptr<HiveTableHandle>& hiveTableHandle,
+      const std::shared_ptr<common::ScanSpec> scanSpec,
       const RowTypePtr readerOutputType,
       std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
           partitionKeys,
@@ -45,7 +46,7 @@ class IcebergSplitReader : public SplitReader {
       std::shared_ptr<common::MetadataFilter> metadataFilter,
       dwio::common::RuntimeStatistics& runtimeStats) override;
 
-  uint64_t next(int64_t size, VectorPtr& output) override;
+  uint64_t next(uint64_t size, VectorPtr& output) override;
 
  private:
   // The read offset to the beginning of the split in number of rows for the
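
Note (illustration, not part of the patch): the refactoring above splits SplitReader::prepareSplit() into three protected hooks, createReader(), testEmptySplit(), and createRowReader(), and IcebergSplitReader::prepareSplit() now calls them directly so that empty splits are skipped before any positional-delete readers are built. The sketch below shows how a hypothetical subclass for another table format could compose the same hooks. Only the three hooks and the prepareSplit() signature come from the patch; the class name, the inherited constructor, and the comments are assumptions for illustration.

// Illustrative sketch of a hypothetical table-format reader built on the
// protected hooks introduced by this patch. Mirrors the call order used by
// IcebergSplitReader::prepareSplit() above.
class MyFormatSplitReader : public SplitReader {
 public:
  // Assumed: reuse the base-class constructor for brevity.
  using SplitReader::SplitReader;

  void prepareSplit(
      std::shared_ptr<common::MetadataFilter> metadataFilter,
      dwio::common::RuntimeStatistics& runtimeStats) override {
    // 1. Open the base file reader for the split.
    createReader();

    // 2. Bail out early when stats or filters prove the split is empty;
    //    testEmptySplit() updates runtimeStats.skippedSplits itself.
    if (testEmptySplit(runtimeStats)) {
      return;
    }

    // 3. Build the row reader, then do any format-specific setup that needs
    //    it (IcebergSplitReader, for example, reads
    //    baseRowReader_->nextRowNumber() before creating its
    //    PositionalDeleteFileReaders).
    createRowReader(metadataFilter);
  }
};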