From 4d0e31c2b86ebf02cb23ece92879a3eeed2774c0 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Mon, 9 Oct 2023 15:19:19 -0700 Subject: [PATCH] Move connectorQueryCtx_ and other required fields to SplitReader The upcoming IcebergSplitReader will need to use fileHandleFactory_, executor_, connectorQueryCtx_, etc., because it needs to create another HiveDataSource to read the delete files. This PR copies these required fields to SplitReader. Moreover, since the SplitReader already owns the baseReader_, the creation and configuration of ReaderOptions were also moved to SplitReader in a single function configureReaderOptions(). Previously the configuration of ReaderOptions was scattered across multiple locations in HiveDataSource. --- velox/connectors/hive/HiveConnector.cpp | 15 +- velox/connectors/hive/HiveDataSource.cpp | 131 +++------------ velox/connectors/hive/HiveDataSource.h | 22 +-- velox/connectors/hive/SplitReader.cpp | 193 +++++++++++++++++++---- velox/connectors/hive/SplitReader.h | 54 +++++-- 5 files changed, 230 insertions(+), 185 deletions(-) diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 522ec99058f4c..184d40cd981ac 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -75,27 +75,14 @@ std::unique_ptr HiveConnector::createDataSource( std::string, std::shared_ptr>& columnHandles, ConnectorQueryCtx* connectorQueryCtx) { - dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool()); - options.setMaxCoalesceBytes( - HiveConfig::maxCoalescedBytes(connectorQueryCtx->config())); - options.setMaxCoalesceDistance( - HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx->config())); - options.setFileColumnNamesReadAsLowerCase( - HiveConfig::isFileColumnNamesReadAsLowerCase( - connectorQueryCtx->config())); - options.setUseColumnNamesForColumnMapping( - HiveConfig::isOrcUseColumnNames(connectorQueryCtx->config())); return std::make_unique( outputType, tableHandle, 
columnHandles, &fileHandleFactory_, - connectorQueryCtx->expressionEvaluator(), - connectorQueryCtx->cache(), - connectorQueryCtx->scanId(), executor_, - options); + connectorQueryCtx); } std::unique_ptr HiveConnector::createDataSink( diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index ea9ad39f5d984..16bd3a503eded 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -16,14 +16,14 @@ #include "velox/connectors/hive/HiveDataSource.h" -#include -#include - -#include "velox/dwio/common/CachedBufferedInput.h" +#include "velox/connectors/hive/HiveConfig.h" #include "velox/dwio/common/ReaderFactory.h" #include "velox/expression/ExprToSubfieldFilter.h" #include "velox/expression/FieldReference.h" +#include +#include + namespace facebook::velox::connector::hive { class HiveTableHandle; @@ -357,18 +357,13 @@ HiveDataSource::HiveDataSource( std::string, std::shared_ptr>& columnHandles, FileHandleFactory* fileHandleFactory, - core::ExpressionEvaluator* expressionEvaluator, - cache::AsyncDataCache* cache, - const std::string& scanId, folly::Executor* executor, - const dwio::common::ReaderOptions& options) - : fileHandleFactory_(fileHandleFactory), - readerOpts_(options), - pool_(&options.getMemoryPool()), + ConnectorQueryCtx* connectorQueryCtx) + : pool_(connectorQueryCtx->memoryPool()), outputType_(outputType), - expressionEvaluator_(expressionEvaluator), - cache_(cache), - scanId_(scanId), + expressionEvaluator_(connectorQueryCtx->expressionEvaluator()), + fileHandleFactory_(fileHandleFactory), + connectorQueryCtx_(connectorQueryCtx), executor_(executor) { // Column handled keyed on the column alias, the name used in the query. 
for (const auto& [canonicalizedName, columnHandle] : columnHandles) { @@ -409,7 +404,8 @@ HiveDataSource::HiveDataSource( VELOX_CHECK( hiveTableHandle_ != nullptr, "TableHandle must be an instance of HiveTableHandle"); - if (readerOpts_.isFileColumnNamesReadAsLowerCase()) { + if (HiveConfig::isFileColumnNamesReadAsLowerCase( + connectorQueryCtx->config())) { checkColumnNameLowerCase(outputType_); checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters()); checkColumnNameLowerCase(hiveTableHandle_->remainingFilter()); @@ -473,62 +469,20 @@ HiveDataSource::HiveDataSource( *scanSpec_, *remainingFilter, expressionEvaluator_); } - readerOpts_.setFileSchema(hiveTableHandle_->dataColumns()); ioStats_ = std::make_shared(); } -inline uint8_t parseDelimiter(const std::string& delim) { - for (char const& ch : delim) { - if (!std::isdigit(ch)) { - return delim[0]; - } - } - return stoi(delim); -} - -void HiveDataSource::parseSerdeParameters( - const std::unordered_map& serdeParameters) { - auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim); - if (fieldIt == serdeParameters.end()) { - fieldIt = serdeParameters.find("serialization.format"); - } - auto collectionIt = - serdeParameters.find(dwio::common::SerDeOptions::kCollectionDelim); - if (collectionIt == serdeParameters.end()) { - // For collection delimiter, Hive 1.x, 2.x uses "colelction.delim", but - // Hive 3.x uses "collection.delim". 
- // See: https://issues.apache.org/jira/browse/HIVE-16922) - collectionIt = serdeParameters.find("colelction.delim"); - } - auto mapKeyIt = - serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim); - - if (fieldIt == serdeParameters.end() && - collectionIt == serdeParameters.end() && - mapKeyIt == serdeParameters.end()) { - return; - } - - uint8_t fieldDelim = '\1'; - uint8_t collectionDelim = '\2'; - uint8_t mapKeyDelim = '\3'; - if (fieldIt != serdeParameters.end()) { - fieldDelim = parseDelimiter(fieldIt->second); - } - if (collectionIt != serdeParameters.end()) { - collectionDelim = parseDelimiter(collectionIt->second); - } - if (mapKeyIt != serdeParameters.end()) { - mapKeyDelim = parseDelimiter(mapKeyIt->second); - } - dwio::common::SerDeOptions serDeOptions( - fieldDelim, collectionDelim, mapKeyDelim); - readerOpts_.setSerDeOptions(serDeOptions); -} - std::unique_ptr HiveDataSource::createSplitReader() { return SplitReader::create( - split_, readerOutputType_, partitionKeys_, scanSpec_, pool_); + split_, + hiveTableHandle_, + scanSpec_, + readerOutputType_, + &partitionKeys_, + fileHandleFactory_, + executor_, + connectorQueryCtx_, + ioStats_); } void HiveDataSource::addSplit(std::shared_ptr split) { @@ -540,30 +494,12 @@ void HiveDataSource::addSplit(std::shared_ptr split) { VLOG(1) << "Adding split " << split_->toString(); - if (readerOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) { - VELOX_CHECK( - readerOpts_.getFileFormat() == split_->fileFormat, - "HiveDataSource received splits of different formats: {} and {}", - toString(readerOpts_.getFileFormat()), - toString(split_->fileFormat)); - } else { - parseSerdeParameters(split_->serdeParameters); - readerOpts_.setFileFormat(split_->fileFormat); - } - - auto fileHandle = fileHandleFactory_->generate(split_->filePath).second; - auto input = createBufferedInput(*fileHandle, readerOpts_); - if (splitReader_) { splitReader_.reset(); } + splitReader_ = createSplitReader(); - 
splitReader_->prepareSplit( - hiveTableHandle_, - readerOpts_, - std::move(input), - metadataFilter_, - runtimeStats_); + splitReader_->prepareSplit(metadataFilter_, runtimeStats_); } std::optional HiveDataSource::next( @@ -787,29 +723,6 @@ std::shared_ptr HiveDataSource::makeScanSpec( return spec; } -std::unique_ptr -HiveDataSource::createBufferedInput( - const FileHandle& fileHandle, - const dwio::common::ReaderOptions& readerOpts) { - if (cache_) { - return std::make_unique( - fileHandle.file, - dwio::common::MetricsLog::voidLog(), - fileHandle.uuid.id(), - cache_, - Connector::getTracker(scanId_, readerOpts.loadQuantum()), - fileHandle.groupId.id(), - ioStats_, - executor_, - readerOpts); - } - return std::make_unique( - fileHandle.file, - readerOpts.getMemoryPool(), - dwio::common::MetricsLog::voidLog(), - ioStats_.get()); -} - vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) { filterRows_.resize(output_->size()); diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 03d86a1371087..2d54db3fda64a 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -38,11 +38,8 @@ class HiveDataSource : public DataSource { std::string, std::shared_ptr>& columnHandles, FileHandleFactory* fileHandleFactory, - core::ExpressionEvaluator* expressionEvaluator, - cache::AsyncDataCache* cache, - const std::string& scanId, folly::Executor* executor, - const dwio::common::ReaderOptions& options); + ConnectorQueryCtx* connectorQueryCtx); void addSplit(std::shared_ptr split) override; @@ -95,15 +92,9 @@ class HiveDataSource : public DataSource { protected: virtual std::unique_ptr createSplitReader(); - std::unique_ptr createBufferedInput( - const FileHandle&, - const dwio::common::ReaderOptions&); - std::shared_ptr split_; - FileHandleFactory* fileHandleFactory_; - dwio::common::ReaderOptions readerOpts_; - std::shared_ptr scanSpec_; memory::MemoryPool* pool_; + 
std::shared_ptr scanSpec_; VectorPtr output_; std::unique_ptr splitReader_; @@ -128,9 +119,6 @@ class HiveDataSource : public DataSource { // hold adaptation. void resetSplit(); - void parseSerdeParameters( - const std::unordered_map& serdeParameters); - const RowVectorPtr& getEmptyOutput() { if (!emptyOutput_) { emptyOutput_ = RowVector::createEmpty(outputType_, pool_); @@ -140,7 +128,7 @@ class HiveDataSource : public DataSource { std::shared_ptr hiveTableHandle_; - // The row type for the data source output, not including filter only columns + // The row type for the data source output, not including filter-only columns const RowTypePtr outputType_; std::shared_ptr ioStats_; std::shared_ptr metadataFilter_; @@ -155,8 +143,8 @@ class HiveDataSource : public DataSource { SelectivityVector filterRows_; exec::FilterEvalCtx filterEvalCtx_; - cache::AsyncDataCache* const cache_{nullptr}; - const std::string& scanId_; + FileHandleFactory* fileHandleFactory_; + const ConnectorQueryCtx* const connectorQueryCtx_; folly::Executor* executor_; }; diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index a42b64e37141b..f8e5a8f8cffe7 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -16,10 +16,19 @@ #include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/dwio/common/CachedBufferedInput.h" +#include "velox/dwio/common/Options.h" #include "velox/dwio/common/ReaderFactory.h" +#include +#include + +#include +#include + namespace facebook::velox::connector::hive { namespace { @@ -57,7 +66,7 @@ bool testFilters( const std::string& filePath, const std::unordered_map>& partitionKey, - std::unordered_map>& + std::unordered_map>* partitionKeysHandle) { auto totalRows = reader->numberOfRows(); const auto& fileTypeWithId = reader->typeWithId(); @@ 
-70,7 +79,7 @@ bool testFilters( auto iter = partitionKey.find(name); if (iter != partitionKey.end() && iter->second.has_value()) { return applyPartitionFilter( - partitionKeysHandle[name]->dataType()->kind(), + (*partitionKeysHandle)[name]->dataType()->kind(), iter->second.value(), child->filter()); } @@ -116,41 +125,72 @@ velox::variant convertFromString(const std::optional& value) { return velox::variant(ToKind); } +inline uint8_t parseDelimiter(const std::string& delim) { + for (char const& ch : delim) { + if (!std::isdigit(ch)) { + return delim[0]; + } + } + return stoi(delim); +} + } // namespace std::unique_ptr SplitReader::create( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool) { - // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats) { return std::make_unique( - hiveSplit, readerOutputType, partitionKeys, scanSpec, pool); + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + executor, + connectorQueryCtx, + ioStats); } SplitReader::SplitReader( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool) - : hiveSplit_(std::move(hiveSplit)), + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats) + : hiveSplit_(hiveSplit), + hiveTableHandle_(hiveTableHandle), + scanSpec_(scanSpec), readerOutputType_(readerOutputType), partitionKeys_(partitionKeys), - scanSpec_(std::move(scanSpec)), - 
pool_(pool) {} + fileHandleFactory_(fileHandleFactory), + executor_(executor), + connectorQueryCtx_(connectorQueryCtx), + ioStats_(ioStats), + baseReaderOpts_(connectorQueryCtx->memoryPool()) {} void SplitReader::prepareSplit( - const std::shared_ptr& hiveTableHandle, - const dwio::common::ReaderOptions& readerOptions, - std::unique_ptr baseFileInput, std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - baseReader_ = dwio::common::getReaderFactory(readerOptions.getFileFormat()) - ->createReader(std::move(baseFileInput), readerOptions); + configureReaderOptions(); + + auto fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second; + auto baseFileInput = createBufferedInput(*fileHandle, baseReaderOpts_); + + baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat()) + ->createReader(std::move(baseFileInput), baseReaderOpts_); // Note that this doesn't apply to Hudi tables. emptySplit_ = false; @@ -174,23 +214,23 @@ void SplitReader::prepareSplit( } auto& fileType = baseReader_->rowType(); - auto columnTypes = adaptColumns(fileType, readerOptions.getFileSchema()); + auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema()); - auto skipRowsIt = hiveTableHandle->tableParameters().find( + auto skipRowsIt = hiveTableHandle_->tableParameters().find( dwio::common::TableParameter::kSkipHeaderLineCount); - if (skipRowsIt != hiveTableHandle->tableParameters().end()) { - rowReaderOpts_.setSkipRows(folly::to(skipRowsIt->second)); + if (skipRowsIt != hiveTableHandle_->tableParameters().end()) { + baseRowReaderOpts_.setSkipRows(folly::to(skipRowsIt->second)); } - rowReaderOpts_.setScanSpec(scanSpec_); - rowReaderOpts_.setMetadataFilter(metadataFilter); + baseRowReaderOpts_.setScanSpec(scanSpec_); + baseRowReaderOpts_.setMetadataFilter(metadataFilter); configureRowReaderOptions( - rowReaderOpts_, + baseRowReaderOpts_, ROW(std::vector(fileType->names()), std::move(columnTypes))); // NOTE: we firstly reset 
the finished 'baseRowReader_' of previous split // before setting up for the next one to avoid doubling the peak memory usage. baseRowReader_.reset(); - baseRowReader_ = baseReader_->createRowReader(rowReaderOpts_); + baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } std::vector SplitReader::adaptColumns( @@ -287,22 +327,24 @@ void SplitReader::setConstantValue( common::ScanSpec* spec, const TypePtr& type, const velox::variant& value) const { - spec->setConstantValue(BaseVector::createConstant(type, value, 1, pool_)); + spec->setConstantValue(BaseVector::createConstant( + type, value, 1, connectorQueryCtx_->memoryPool())); } void SplitReader::setNullConstantValue( common::ScanSpec* spec, const TypePtr& type) const { - spec->setConstantValue(BaseVector::createNullConstant(type, 1, pool_)); + spec->setConstantValue(BaseVector::createNullConstant( + type, 1, connectorQueryCtx_->memoryPool())); } void SplitReader::setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, const std::optional& value) const { - auto it = partitionKeys_.find(partitionKey); + auto it = partitionKeys_->find(partitionKey); VELOX_CHECK( - it != partitionKeys_.end(), + it != partitionKeys_->end(), "ColumnHandle is missing for partition key {}", partitionKey); auto constValue = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( @@ -310,6 +352,30 @@ void SplitReader::setPartitionValue( setConstantValue(spec, it->second->dataType(), constValue); } +void SplitReader::configureReaderOptions() { + baseReaderOpts_.setMaxCoalesceBytes( + HiveConfig::maxCoalescedBytes(connectorQueryCtx_->config())); + baseReaderOpts_.setMaxCoalesceDistance( + HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx_->config())); + baseReaderOpts_.setFileColumnNamesReadAsLowerCase( + HiveConfig::isFileColumnNamesReadAsLowerCase( + connectorQueryCtx_->config())); + baseReaderOpts_.setUseColumnNamesForColumnMapping( + HiveConfig::isOrcUseColumnNames(connectorQueryCtx_->config())); + 
baseReaderOpts_.setFileSchema(hiveTableHandle_->dataColumns()); + + if (baseReaderOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) { + VELOX_CHECK( + baseReaderOpts_.getFileFormat() == hiveSplit_->fileFormat, + "HiveDataSource received splits of different formats: {} and {}", + dwio::common::toString(baseReaderOpts_.getFileFormat()), + dwio::common::toString(hiveSplit_->fileFormat)); + } else { + parseSerdeParameters(hiveSplit_->serdeParameters); + baseReaderOpts_.setFileFormat(hiveSplit_->fileFormat); + } +} + void SplitReader::configureRowReaderOptions( dwio::common::RowReaderOptions& options, const RowTypePtr& rowType) { @@ -329,11 +395,74 @@ void SplitReader::configureRowReaderOptions( options.select(cs).range(hiveSplit_->start, hiveSplit_->length); } +void SplitReader::parseSerdeParameters( + const std::unordered_map& serdeParameters) { + auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim); + if (fieldIt == serdeParameters.end()) { + fieldIt = serdeParameters.find("serialization.format"); + } + auto collectionIt = + serdeParameters.find(dwio::common::SerDeOptions::kCollectionDelim); + if (collectionIt == serdeParameters.end()) { + // For collection delimiter, Hive 1.x, 2.x uses "colelction.delim", but + // Hive 3.x uses "collection.delim". 
+ // See: https://issues.apache.org/jira/browse/HIVE-16922) + collectionIt = serdeParameters.find("colelction.delim"); + } + auto mapKeyIt = + serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim); + + if (fieldIt == serdeParameters.end() && + collectionIt == serdeParameters.end() && + mapKeyIt == serdeParameters.end()) { + return; + } + + uint8_t fieldDelim = '\1'; + uint8_t collectionDelim = '\2'; + uint8_t mapKeyDelim = '\3'; + if (fieldIt != serdeParameters.end()) { + fieldDelim = parseDelimiter(fieldIt->second); + } + if (collectionIt != serdeParameters.end()) { + collectionDelim = parseDelimiter(collectionIt->second); + } + if (mapKeyIt != serdeParameters.end()) { + mapKeyDelim = parseDelimiter(mapKeyIt->second); + } + dwio::common::SerDeOptions serDeOptions( + fieldDelim, collectionDelim, mapKeyDelim); + baseReaderOpts_.setSerDeOptions(serDeOptions); +} + +std::unique_ptr SplitReader::createBufferedInput( + const FileHandle& fileHandle, + const dwio::common::ReaderOptions& readerOpts) { + if (connectorQueryCtx_->cache()) { + return std::make_unique( + fileHandle.file, + dwio::common::MetricsLog::voidLog(), + fileHandle.uuid.id(), + connectorQueryCtx_->cache(), + Connector::getTracker( + connectorQueryCtx_->scanId(), readerOpts.loadQuantum()), + fileHandle.groupId.id(), + ioStats_, + executor_, + readerOpts); + } + return std::make_unique( + fileHandle.file, + readerOpts.getMemoryPool(), + dwio::common::MetricsLog::voidLog(), + ioStats_.get()); +} + std::string SplitReader::toString() const { std::string partitionKeys; std::for_each( - partitionKeys_.begin(), - partitionKeys_.end(), + partitionKeys_->begin(), + partitionKeys_->end(), [&](std::pair< const std::string, std::shared_ptr> diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 0cac6c4f9d6a3..52417c6425711 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -16,9 +16,18 @@ #pragma once +#include 
"velox/connectors/hive/FileHandle.h" #include "velox/dwio/common/Reader.h" #include "velox/type/Type.h" +namespace facebook::velox::cache { +class AsyncDataCache; +} + +namespace facebook::velox::connector { +class ConnectorQueryCtx; +} + namespace facebook::velox::dwio::common { class BufferedInput; } @@ -36,19 +45,27 @@ class SplitReader { public: static std::unique_ptr create( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool); + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats); SplitReader( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool); + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats); virtual ~SplitReader() = default; @@ -56,9 +73,6 @@ class SplitReader { /// do additional preparations before reading the split, e.g. 
Open delete /// files or log files, and add column adapatations for metadata columns virtual void prepareSplit( - const std::shared_ptr& hiveTableHandle, - const dwio::common::ReaderOptions& readerOptions, - std::unique_ptr baseFileInput, std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats); @@ -100,20 +114,34 @@ class SplitReader { const std::optional& value) const; std::shared_ptr hiveSplit_; + std::shared_ptr hiveTableHandle_; + std::shared_ptr scanSpec_; RowTypePtr readerOutputType_; - std::unordered_map>& + std::unordered_map>* partitionKeys_; - std::shared_ptr scanSpec_; - memory::MemoryPool* pool_; std::unique_ptr baseReader_; - dwio::common::RowReaderOptions rowReaderOpts_; std::unique_ptr baseRowReader_; + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + const ConnectorQueryCtx* const connectorQueryCtx_; + std::shared_ptr ioStats_; private: + void configureReaderOptions(); + void configureRowReaderOptions( dwio::common::RowReaderOptions& options, const RowTypePtr& rowType); + void parseSerdeParameters( + const std::unordered_map& serdeParameters); + + std::unique_ptr createBufferedInput( + const FileHandle& fileHandle, + const dwio::common::ReaderOptions& readerOpts); + + dwio::common::ReaderOptions baseReaderOpts_; + dwio::common::RowReaderOptions baseRowReaderOpts_; bool emptySplit_; };