From f5bcfb9c66c391972b75b4a6a8c5b621b8f298df Mon Sep 17 00:00:00 2001
From: yingsu00
Date: Thu, 9 Nov 2023 19:26:55 -0800
Subject: [PATCH 1/6] Update Arrow namespaces in Parquet reader

The Arrow library was recently moved, in part, into the Parquet writer.
Some Parquet reader files still depend on Arrow classes that will be
removed in the near future. Because the reader still depends on the
third-party arrow_ep installation, this commit changes those references
from arrow:: to the fully qualified ::arrow:: to avoid build failures.

---
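For reference, here is a standalone sketch of the ambiguity that the leading
:: avoids (all names below are illustrative stand-ins, not code from the
Velox tree): when the surrounding code lives inside a namespace that itself
contains an arrow namespace, an unqualified arrow:: resolves to the nested
one, so only the fully qualified ::arrow:: reliably reaches the third-party
library.

  // Illustrative only: why arrow:: and ::arrow:: can name different things.
  namespace arrow::util { // third-party Arrow from arrow_ep
  inline int NumRequiredBits(int v) {
    return v; // placeholder body
  }
  } // namespace arrow::util

  namespace facebook::velox::parquet {
  namespace arrow { // hypothetical Arrow code vendored into the writer
  struct util {};
  } // namespace arrow

  int requiredBits(int maxRepeat) {
    // arrow::util::NumRequiredBits(maxRepeat); // error: finds nested arrow
    return ::arrow::util::NumRequiredBits(maxRepeat); // ok: global namespace
  }
  } // namespace facebook::velox::parquet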
 velox/dwio/parquet/reader/PageReader.cpp      | 16 ++++++++--------
 velox/dwio/parquet/reader/PageReader.h        |  4 ++--
 velox/dwio/parquet/reader/ParquetTypeWithId.h |  3 ++-
 3 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp
index 11d86cd96cfb..ddabb8fa92fd 100644
--- a/velox/dwio/parquet/reader/PageReader.cpp
+++ b/velox/dwio/parquet/reader/PageReader.cpp
@@ -243,10 +243,10 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) {
   auto pageEnd = pageData_ + pageHeader.uncompressed_page_size;
   if (maxRepeat_ > 0) {
     uint32_t repeatLength = readField(pageData_);
-    repeatDecoder_ = std::make_unique<arrow::util::RleDecoder>(
+    repeatDecoder_ = std::make_unique<::arrow::util::RleDecoder>(
        reinterpret_cast<const uint8_t*>(pageData_),
        repeatLength,
-        arrow::bit_util::NumRequiredBits(maxRepeat_));
+        ::arrow::bit_util::NumRequiredBits(maxRepeat_));
     pageData_ += repeatLength;
   }
@@ -257,12 +257,12 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) {
     defineDecoder_ = std::make_unique<RleBpDecoder>(
         pageData_,
         pageData_ + defineLength,
-        arrow::bit_util::NumRequiredBits(maxDefine_));
+        ::arrow::bit_util::NumRequiredBits(maxDefine_));
     }
-    wideDefineDecoder_ = std::make_unique<arrow::util::RleDecoder>(
+    wideDefineDecoder_ = std::make_unique<::arrow::util::RleDecoder>(
        reinterpret_cast<const uint8_t*>(pageData_),
        defineLength,
-        arrow::bit_util::NumRequiredBits(maxDefine_));
+        ::arrow::bit_util::NumRequiredBits(maxDefine_));
     pageData_ += defineLength;
   }
   encodedDataSize_ = pageEnd - pageData_;
@@ -301,17 +301,17 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) {
   pageData_ = readBytes(bytes, pageBuffer_);

   if (repeatLength) {
-    repeatDecoder_ = std::make_unique<arrow::util::RleDecoder>(
+    repeatDecoder_ = std::make_unique<::arrow::util::RleDecoder>(
        reinterpret_cast<const uint8_t*>(pageData_),
        repeatLength,
-        arrow::bit_util::NumRequiredBits(maxRepeat_));
+        ::arrow::bit_util::NumRequiredBits(maxRepeat_));
   }

   if (maxDefine_ > 0) {
     defineDecoder_ = std::make_unique<RleBpDecoder>(
         pageData_ + repeatLength,
         pageData_ + repeatLength + defineLength,
-        arrow::bit_util::NumRequiredBits(maxDefine_));
+        ::arrow::bit_util::NumRequiredBits(maxDefine_));
   }
   auto levelsSize = repeatLength + defineLength;
   pageData_ += levelsSize;
diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h
index c079b6a64c38..0fb4d3dc35d1 100644
--- a/velox/dwio/parquet/reader/PageReader.h
+++ b/velox/dwio/parquet/reader/PageReader.h
@@ -356,8 +356,8 @@ class PageReader {
   // Decoder for single bit definition levels. The arrow decoders are used for
   // multibit levels pending fixing RleBpDecoder for the case.
   std::unique_ptr<RleBpDecoder> defineDecoder_;
-  std::unique_ptr<arrow::util::RleDecoder> repeatDecoder_;
-  std::unique_ptr<arrow::util::RleDecoder> wideDefineDecoder_;
+  std::unique_ptr<::arrow::util::RleDecoder> repeatDecoder_;
+  std::unique_ptr<::arrow::util::RleDecoder> wideDefineDecoder_;

   // True for a leaf column for which repdefs are loaded for the whole column
   // chunk. This is typically the leftmost leaf of a list. Other leaves under
diff --git a/velox/dwio/parquet/reader/ParquetTypeWithId.h b/velox/dwio/parquet/reader/ParquetTypeWithId.h
index 08b1449a8f19..eca23f4221a8 100644
--- a/velox/dwio/parquet/reader/ParquetTypeWithId.h
+++ b/velox/dwio/parquet/reader/ParquetTypeWithId.h
@@ -16,10 +16,11 @@

 #pragma once

-#include
 #include "velox/dwio/common/TypeWithId.h"
 #include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

+#include
+
 namespace facebook::velox::parquet {

 /// Describes what to extract from leaf repetition / definition

From 913adb7fdb0789fe648e5cde0bc93ccaebbf8d84 Mon Sep 17 00:00:00 2001
From: yingsu00
Date: Thu, 9 Nov 2023 19:23:25 -0800
Subject: [PATCH 2/6] Fix build issues for RowContainerSortBenchmark

RowContainerSortBenchmark depends on the Parquet reader by default, but
Parquet support is controlled by VELOX_ENABLE_PARQUET, which defaults to
OFF. This caused build failures because the benchmark tries to include
Arrow headers that are not installed when VELOX_ENABLE_PARQUET is off.
This commit guards the benchmark behind a VELOX_ENABLE_PARQUET check.

---
 velox/exec/benchmarks/CMakeLists.txt | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/velox/exec/benchmarks/CMakeLists.txt b/velox/exec/benchmarks/CMakeLists.txt
index 45a80e2873af..92fd47a6bb60 100644
--- a/velox/exec/benchmarks/CMakeLists.txt
+++ b/velox/exec/benchmarks/CMakeLists.txt
@@ -37,7 +37,15 @@ add_executable(velox_hash_benchmark HashTableBenchmark.cpp)
 target_link_libraries(velox_hash_benchmark velox_exec velox_exec_test_lib
                       velox_vector_test_lib ${FOLLY_BENCHMARK})

-add_executable(velox_sort_benchmark RowContainerSortBenchmark.cpp)
-
-target_link_libraries(velox_sort_benchmark velox_exec velox_exec_test_lib
-                      velox_vector_test_lib ${FOLLY_BENCHMARK})
+if(${VELOX_ENABLE_PARQUET})
+  add_executable(velox_sort_benchmark RowContainerSortBenchmark.cpp)
+
+  target_link_libraries(
+    velox_sort_benchmark
+    velox_exec
+    velox_exec_test_lib
+    velox_vector_test_lib
+    ${FOLLY_BENCHMARK}
+    arrow
+    thrift)
+endif()

From 0971056f2018dc75f9c36f72b7caa22cbad5f3f1 Mon Sep 17 00:00:00 2001
From: yingsu00
Date: Thu, 9 Nov 2023 19:35:47 -0800
Subject: [PATCH 3/6] Clean up include headers in S3InsertTest.cpp

S3InsertTest.cpp used to include ParquetReader.h, which caused build
failures when VELOX_ENABLE_PARQUET and VELOX_ENABLE_ARROW were not
turned on. The include is not actually needed, so this commit cleans up
the includes in S3InsertTest.cpp.

---
 .../hive/storage_adapters/s3fs/tests/S3InsertTest.cpp | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
index 57dfcded14c0..a4bf93471e46 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
+++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
@@ -14,18 +14,17 @@
  * limitations under the License.
  */

-#include
-#include "gtest/gtest.h"
-#include "velox/common/file/FileSystems.h"
 #include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
 #include "velox/connectors/hive/storage_adapters/s3fs/tests/MinioServer.h"
-#include "velox/dwio/parquet/reader/ParquetReader.h"
 #include "velox/exec/TableWriter.h"
 #include "velox/exec/tests/utils/AssertQueryBuilder.h"
 #include "velox/exec/tests/utils/HiveConnectorTestBase.h"
 #include "velox/exec/tests/utils/PlanBuilder.h"

+#include
+#include
+
 using namespace facebook::velox;
 using namespace facebook::velox::core;
 using namespace facebook::velox::exec;

From 87130f48dadb1b2a343f99100772e9c539be2745 Mon Sep 17 00:00:00 2001
From: yingsu00
Date: Mon, 9 Oct 2023 15:19:19 -0700
Subject: [PATCH 4/6] Move connectorQueryCtx_ and other required fields to SplitReader

The upcoming IcebergSplitReader will need to use fileHandleFactory_,
executor_, connectorQueryCtx_, etc., because it needs to create another
HiveDataSource to read the delete files. This PR copies these required
fields to SplitReader. Moreover, since SplitReader already owns the
baseReader_, the creation and configuration of ReaderOptions was also
moved to SplitReader, into a single function configureReaderOptions().
Previously the configuration of ReaderOptions was scattered across
multiple locations in HiveDataSource.

---
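The consolidated flow can be pictured with a short standalone sketch (the
method names match the patch, but the types here are simplified stand-ins so
the sketch compiles on its own; the real signatures live in
velox/connectors/hive/SplitReader.h):

  #include <memory>
  #include <string>

  // Simplified stand-ins for the Velox types involved.
  struct ReaderOptions {
    std::string fileFormat;
  };
  struct BufferedInput {};
  struct Reader {};

  class SplitReaderSketch {
   public:
    // Previously HiveDataSource configured ReaderOptions in several places
    // and handed a finished input to SplitReader. Now SplitReader owns the
    // whole sequence: configure options, open the file, create the reader.
    void prepareSplit() {
      configureReaderOptions(); // coalesce sizes, name casing, file schema
      auto input = createBufferedInput(); // cached or direct, via query ctx
      baseReader_ = createReader(std::move(input), baseReaderOpts_);
    }

   private:
    void configureReaderOptions() {
      baseReaderOpts_.fileFormat = "dwrf"; // derived from the split in the patch
    }
    std::unique_ptr<BufferedInput> createBufferedInput() {
      return std::make_unique<BufferedInput>();
    }
    std::unique_ptr<Reader> createReader(
        std::unique_ptr<BufferedInput> /*input*/,
        const ReaderOptions& /*opts*/) {
      return std::make_unique<Reader>();
    }

    ReaderOptions baseReaderOpts_;
    std::unique_ptr<Reader> baseReader_;
  };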
 velox/connectors/hive/HiveConnector.cpp  |  15 +-
 velox/connectors/hive/HiveDataSource.cpp | 136 +++-------------
 velox/connectors/hive/HiveDataSource.h   |  22 +--
 velox/connectors/hive/SplitReader.cpp    | 199 +++++++++++++++++++----
 velox/connectors/hive/SplitReader.h      |  54 ++++--
 5 files changed, 236 insertions(+), 190 deletions(-)

diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp
index 522ec99058f4..184d40cd981a 100644
--- a/velox/connectors/hive/HiveConnector.cpp
+++ b/velox/connectors/hive/HiveConnector.cpp
@@ -75,27 +75,14 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
         std::string,
         std::shared_ptr<ColumnHandle>>& columnHandles,
     ConnectorQueryCtx* connectorQueryCtx) {
-  dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool());
-  options.setMaxCoalesceBytes(
-      HiveConfig::maxCoalescedBytes(connectorQueryCtx->config()));
-  options.setMaxCoalesceDistance(
-      HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx->config()));
-  options.setFileColumnNamesReadAsLowerCase(
-      HiveConfig::isFileColumnNamesReadAsLowerCase(
-          connectorQueryCtx->config()));
-  options.setUseColumnNamesForColumnMapping(
-      HiveConfig::isOrcUseColumnNames(connectorQueryCtx->config()));
   return std::make_unique<HiveDataSource>(
       outputType,
       tableHandle,
       columnHandles,
       &fileHandleFactory_,
-      connectorQueryCtx->expressionEvaluator(),
-      connectorQueryCtx->cache(),
-      connectorQueryCtx->scanId(),
       executor_,
-      options);
+      connectorQueryCtx);
 }
diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp
index 279f892bb50d..16bd3a503ede 100644
--- a/velox/connectors/hive/HiveDataSource.cpp
+++ b/velox/connectors/hive/HiveDataSource.cpp
@@ -16,15 +16,14 @@

 #include "velox/connectors/hive/HiveDataSource.h"

-#include
-#include
-
-#include "velox/dwio/common/CachedBufferedInput.h"
-#include "velox/dwio/common/DirectBufferedInput.h"
+#include "velox/connectors/hive/HiveConfig.h"
 #include "velox/dwio/common/ReaderFactory.h"
 #include "velox/expression/ExprToSubfieldFilter.h"
 #include "velox/expression/FieldReference.h"

+#include
+#include
+
 namespace facebook::velox::connector::hive {

 class HiveTableHandle;
@@ -358,18 +357,13 @@ HiveDataSource::HiveDataSource(
         std::string,
         std::shared_ptr<ColumnHandle>>& columnHandles,
     FileHandleFactory* fileHandleFactory,
-    core::ExpressionEvaluator* expressionEvaluator,
-    cache::AsyncDataCache* cache,
-    const std::string& scanId,
     folly::Executor* executor,
-    const dwio::common::ReaderOptions& options)
-    : fileHandleFactory_(fileHandleFactory),
-      readerOpts_(options),
-      pool_(&options.getMemoryPool()),
+    ConnectorQueryCtx* connectorQueryCtx)
+    : pool_(connectorQueryCtx->memoryPool()),
       outputType_(outputType),
-      expressionEvaluator_(expressionEvaluator),
-      cache_(cache),
-      scanId_(scanId),
+      expressionEvaluator_(connectorQueryCtx->expressionEvaluator()),
+      fileHandleFactory_(fileHandleFactory),
+      connectorQueryCtx_(connectorQueryCtx),
       executor_(executor) {
   // Column handles are keyed on the column alias, the name used in the query.
   for (const auto& [canonicalizedName, columnHandle] : columnHandles) {
@@ -410,7 +404,8 @@ HiveDataSource::HiveDataSource(
   VELOX_CHECK(
       hiveTableHandle_ != nullptr,
       "TableHandle must be an instance of HiveTableHandle");
-  if (readerOpts_.isFileColumnNamesReadAsLowerCase()) {
+  if (HiveConfig::isFileColumnNamesReadAsLowerCase(
+          connectorQueryCtx->config())) {
     checkColumnNameLowerCase(outputType_);
     checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters());
     checkColumnNameLowerCase(hiveTableHandle_->remainingFilter());
@@ -474,62 +469,20 @@ HiveDataSource::HiveDataSource(
         *scanSpec_, *remainingFilter, expressionEvaluator_);
   }

-  readerOpts_.setFileSchema(hiveTableHandle_->dataColumns());
   ioStats_ = std::make_shared<io::IoStatistics>();
 }

-inline uint8_t parseDelimiter(const std::string& delim) {
-  for (char const& ch : delim) {
-    if (!std::isdigit(ch)) {
-      return delim[0];
-    }
-  }
-  return stoi(delim);
-}
-
-void HiveDataSource::parseSerdeParameters(
-    const std::unordered_map<std::string, std::string>& serdeParameters) {
-  auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim);
-  if (fieldIt == serdeParameters.end()) {
-    fieldIt = serdeParameters.find("serialization.format");
-  }
-  auto collectionIt =
-      serdeParameters.find(dwio::common::SerDeOptions::kCollectionDelim);
-  if (collectionIt == serdeParameters.end()) {
-    // For collection delimiter, Hive 1.x, 2.x uses "colelction.delim", but
-    // Hive 3.x uses "collection.delim".
- // See: https://issues.apache.org/jira/browse/HIVE-16922) - collectionIt = serdeParameters.find("colelction.delim"); - } - auto mapKeyIt = - serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim); - - if (fieldIt == serdeParameters.end() && - collectionIt == serdeParameters.end() && - mapKeyIt == serdeParameters.end()) { - return; - } - - uint8_t fieldDelim = '\1'; - uint8_t collectionDelim = '\2'; - uint8_t mapKeyDelim = '\3'; - if (fieldIt != serdeParameters.end()) { - fieldDelim = parseDelimiter(fieldIt->second); - } - if (collectionIt != serdeParameters.end()) { - collectionDelim = parseDelimiter(collectionIt->second); - } - if (mapKeyIt != serdeParameters.end()) { - mapKeyDelim = parseDelimiter(mapKeyIt->second); - } - dwio::common::SerDeOptions serDeOptions( - fieldDelim, collectionDelim, mapKeyDelim); - readerOpts_.setSerDeOptions(serDeOptions); -} - std::unique_ptr HiveDataSource::createSplitReader() { return SplitReader::create( - split_, readerOutputType_, partitionKeys_, scanSpec_, pool_); + split_, + hiveTableHandle_, + scanSpec_, + readerOutputType_, + &partitionKeys_, + fileHandleFactory_, + executor_, + connectorQueryCtx_, + ioStats_); } void HiveDataSource::addSplit(std::shared_ptr split) { @@ -541,30 +494,12 @@ void HiveDataSource::addSplit(std::shared_ptr split) { VLOG(1) << "Adding split " << split_->toString(); - if (readerOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) { - VELOX_CHECK( - readerOpts_.getFileFormat() == split_->fileFormat, - "HiveDataSource received splits of different formats: {} and {}", - toString(readerOpts_.getFileFormat()), - toString(split_->fileFormat)); - } else { - parseSerdeParameters(split_->serdeParameters); - readerOpts_.setFileFormat(split_->fileFormat); - } - - auto fileHandle = fileHandleFactory_->generate(split_->filePath).second; - auto input = createBufferedInput(*fileHandle, readerOpts_); - if (splitReader_) { splitReader_.reset(); } + splitReader_ = createSplitReader(); - splitReader_->prepareSplit( - hiveTableHandle_, - readerOpts_, - std::move(input), - metadataFilter_, - runtimeStats_); + splitReader_->prepareSplit(metadataFilter_, runtimeStats_); } std::optional HiveDataSource::next( @@ -788,33 +723,6 @@ std::shared_ptr HiveDataSource::makeScanSpec( return spec; } -std::unique_ptr -HiveDataSource::createBufferedInput( - const FileHandle& fileHandle, - const dwio::common::ReaderOptions& readerOpts) { - if (cache_) { - return std::make_unique( - fileHandle.file, - dwio::common::MetricsLog::voidLog(), - fileHandle.uuid.id(), - cache_, - Connector::getTracker(scanId_, readerOpts.loadQuantum()), - fileHandle.groupId.id(), - ioStats_, - executor_, - readerOpts); - } - return std::make_unique( - fileHandle.file, - dwio::common::MetricsLog::voidLog(), - fileHandle.uuid.id(), - Connector::getTracker(scanId_, readerOpts.loadQuantum()), - fileHandle.groupId.id(), - ioStats_, - executor_, - readerOpts); -} - vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) { filterRows_.resize(output_->size()); diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 03d86a137108..2d54db3fda64 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -38,11 +38,8 @@ class HiveDataSource : public DataSource { std::string, std::shared_ptr>& columnHandles, FileHandleFactory* fileHandleFactory, - core::ExpressionEvaluator* expressionEvaluator, - cache::AsyncDataCache* cache, - const std::string& scanId, folly::Executor* 
executor, - const dwio::common::ReaderOptions& options); + ConnectorQueryCtx* connectorQueryCtx); void addSplit(std::shared_ptr split) override; @@ -95,15 +92,9 @@ class HiveDataSource : public DataSource { protected: virtual std::unique_ptr createSplitReader(); - std::unique_ptr createBufferedInput( - const FileHandle&, - const dwio::common::ReaderOptions&); - std::shared_ptr split_; - FileHandleFactory* fileHandleFactory_; - dwio::common::ReaderOptions readerOpts_; - std::shared_ptr scanSpec_; memory::MemoryPool* pool_; + std::shared_ptr scanSpec_; VectorPtr output_; std::unique_ptr splitReader_; @@ -128,9 +119,6 @@ class HiveDataSource : public DataSource { // hold adaptation. void resetSplit(); - void parseSerdeParameters( - const std::unordered_map& serdeParameters); - const RowVectorPtr& getEmptyOutput() { if (!emptyOutput_) { emptyOutput_ = RowVector::createEmpty(outputType_, pool_); @@ -140,7 +128,7 @@ class HiveDataSource : public DataSource { std::shared_ptr hiveTableHandle_; - // The row type for the data source output, not including filter only columns + // The row type for the data source output, not including filter-only columns const RowTypePtr outputType_; std::shared_ptr ioStats_; std::shared_ptr metadataFilter_; @@ -155,8 +143,8 @@ class HiveDataSource : public DataSource { SelectivityVector filterRows_; exec::FilterEvalCtx filterEvalCtx_; - cache::AsyncDataCache* const cache_{nullptr}; - const std::string& scanId_; + FileHandleFactory* fileHandleFactory_; + const ConnectorQueryCtx* const connectorQueryCtx_; folly::Executor* executor_; }; diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index a42b64e37141..516acbcd290c 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -16,10 +16,20 @@ #include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/dwio/common/CachedBufferedInput.h" +#include "velox/dwio/common/DirectBufferedInput.h" +#include "velox/dwio/common/Options.h" #include "velox/dwio/common/ReaderFactory.h" +#include +#include + +#include +#include + namespace facebook::velox::connector::hive { namespace { @@ -57,7 +67,7 @@ bool testFilters( const std::string& filePath, const std::unordered_map>& partitionKey, - std::unordered_map>& + std::unordered_map>* partitionKeysHandle) { auto totalRows = reader->numberOfRows(); const auto& fileTypeWithId = reader->typeWithId(); @@ -70,7 +80,7 @@ bool testFilters( auto iter = partitionKey.find(name); if (iter != partitionKey.end() && iter->second.has_value()) { return applyPartitionFilter( - partitionKeysHandle[name]->dataType()->kind(), + (*partitionKeysHandle)[name]->dataType()->kind(), iter->second.value(), child->filter()); } @@ -116,41 +126,72 @@ velox::variant convertFromString(const std::optional& value) { return velox::variant(ToKind); } +inline uint8_t parseDelimiter(const std::string& delim) { + for (char const& ch : delim) { + if (!std::isdigit(ch)) { + return delim[0]; + } + } + return stoi(delim); +} + } // namespace std::unique_ptr SplitReader::create( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool) { - // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] + 
FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats) { return std::make_unique( - hiveSplit, readerOutputType, partitionKeys, scanSpec, pool); + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + executor, + connectorQueryCtx, + ioStats); } SplitReader::SplitReader( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool) - : hiveSplit_(std::move(hiveSplit)), + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats) + : hiveSplit_(hiveSplit), + hiveTableHandle_(hiveTableHandle), + scanSpec_(scanSpec), readerOutputType_(readerOutputType), partitionKeys_(partitionKeys), - scanSpec_(std::move(scanSpec)), - pool_(pool) {} + fileHandleFactory_(fileHandleFactory), + executor_(executor), + connectorQueryCtx_(connectorQueryCtx), + ioStats_(ioStats), + baseReaderOpts_(connectorQueryCtx->memoryPool()) {} void SplitReader::prepareSplit( - const std::shared_ptr& hiveTableHandle, - const dwio::common::ReaderOptions& readerOptions, - std::unique_ptr baseFileInput, std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats) { - baseReader_ = dwio::common::getReaderFactory(readerOptions.getFileFormat()) - ->createReader(std::move(baseFileInput), readerOptions); + configureReaderOptions(); + + auto fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second; + auto baseFileInput = createBufferedInput(*fileHandle, baseReaderOpts_); + + baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat()) + ->createReader(std::move(baseFileInput), baseReaderOpts_); // Note that this doesn't apply to Hudi tables. emptySplit_ = false; @@ -174,23 +215,23 @@ void SplitReader::prepareSplit( } auto& fileType = baseReader_->rowType(); - auto columnTypes = adaptColumns(fileType, readerOptions.getFileSchema()); + auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema()); - auto skipRowsIt = hiveTableHandle->tableParameters().find( + auto skipRowsIt = hiveTableHandle_->tableParameters().find( dwio::common::TableParameter::kSkipHeaderLineCount); - if (skipRowsIt != hiveTableHandle->tableParameters().end()) { - rowReaderOpts_.setSkipRows(folly::to(skipRowsIt->second)); + if (skipRowsIt != hiveTableHandle_->tableParameters().end()) { + baseRowReaderOpts_.setSkipRows(folly::to(skipRowsIt->second)); } - rowReaderOpts_.setScanSpec(scanSpec_); - rowReaderOpts_.setMetadataFilter(metadataFilter); + baseRowReaderOpts_.setScanSpec(scanSpec_); + baseRowReaderOpts_.setMetadataFilter(metadataFilter); configureRowReaderOptions( - rowReaderOpts_, + baseRowReaderOpts_, ROW(std::vector(fileType->names()), std::move(columnTypes))); // NOTE: we firstly reset the finished 'baseRowReader_' of previous split // before setting up for the next one to avoid doubling the peak memory usage. 
baseRowReader_.reset(); - baseRowReader_ = baseReader_->createRowReader(rowReaderOpts_); + baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } std::vector SplitReader::adaptColumns( @@ -287,22 +328,24 @@ void SplitReader::setConstantValue( common::ScanSpec* spec, const TypePtr& type, const velox::variant& value) const { - spec->setConstantValue(BaseVector::createConstant(type, value, 1, pool_)); + spec->setConstantValue(BaseVector::createConstant( + type, value, 1, connectorQueryCtx_->memoryPool())); } void SplitReader::setNullConstantValue( common::ScanSpec* spec, const TypePtr& type) const { - spec->setConstantValue(BaseVector::createNullConstant(type, 1, pool_)); + spec->setConstantValue(BaseVector::createNullConstant( + type, 1, connectorQueryCtx_->memoryPool())); } void SplitReader::setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, const std::optional& value) const { - auto it = partitionKeys_.find(partitionKey); + auto it = partitionKeys_->find(partitionKey); VELOX_CHECK( - it != partitionKeys_.end(), + it != partitionKeys_->end(), "ColumnHandle is missing for partition key {}", partitionKey); auto constValue = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( @@ -310,6 +353,30 @@ void SplitReader::setPartitionValue( setConstantValue(spec, it->second->dataType(), constValue); } +void SplitReader::configureReaderOptions() { + baseReaderOpts_.setMaxCoalesceBytes( + HiveConfig::maxCoalescedBytes(connectorQueryCtx_->config())); + baseReaderOpts_.setMaxCoalesceDistance( + HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx_->config())); + baseReaderOpts_.setFileColumnNamesReadAsLowerCase( + HiveConfig::isFileColumnNamesReadAsLowerCase( + connectorQueryCtx_->config())); + baseReaderOpts_.setUseColumnNamesForColumnMapping( + HiveConfig::isOrcUseColumnNames(connectorQueryCtx_->config())); + baseReaderOpts_.setFileSchema(hiveTableHandle_->dataColumns()); + + if (baseReaderOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) { + VELOX_CHECK( + baseReaderOpts_.getFileFormat() == hiveSplit_->fileFormat, + "HiveDataSource received splits of different formats: {} and {}", + dwio::common::toString(baseReaderOpts_.getFileFormat()), + dwio::common::toString(hiveSplit_->fileFormat)); + } else { + parseSerdeParameters(hiveSplit_->serdeParameters); + baseReaderOpts_.setFileFormat(hiveSplit_->fileFormat); + } +} + void SplitReader::configureRowReaderOptions( dwio::common::RowReaderOptions& options, const RowTypePtr& rowType) { @@ -329,11 +396,79 @@ void SplitReader::configureRowReaderOptions( options.select(cs).range(hiveSplit_->start, hiveSplit_->length); } +void SplitReader::parseSerdeParameters( + const std::unordered_map& serdeParameters) { + auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim); + if (fieldIt == serdeParameters.end()) { + fieldIt = serdeParameters.find("serialization.format"); + } + auto collectionIt = + serdeParameters.find(dwio::common::SerDeOptions::kCollectionDelim); + if (collectionIt == serdeParameters.end()) { + // For collection delimiter, Hive 1.x, 2.x uses "colelction.delim", but + // Hive 3.x uses "collection.delim". 
+ // See: https://issues.apache.org/jira/browse/HIVE-16922) + collectionIt = serdeParameters.find("colelction.delim"); + } + auto mapKeyIt = + serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim); + + if (fieldIt == serdeParameters.end() && + collectionIt == serdeParameters.end() && + mapKeyIt == serdeParameters.end()) { + return; + } + + uint8_t fieldDelim = '\1'; + uint8_t collectionDelim = '\2'; + uint8_t mapKeyDelim = '\3'; + if (fieldIt != serdeParameters.end()) { + fieldDelim = parseDelimiter(fieldIt->second); + } + if (collectionIt != serdeParameters.end()) { + collectionDelim = parseDelimiter(collectionIt->second); + } + if (mapKeyIt != serdeParameters.end()) { + mapKeyDelim = parseDelimiter(mapKeyIt->second); + } + dwio::common::SerDeOptions serDeOptions( + fieldDelim, collectionDelim, mapKeyDelim); + baseReaderOpts_.setSerDeOptions(serDeOptions); +} + +std::unique_ptr SplitReader::createBufferedInput( + const FileHandle& fileHandle, + const dwio::common::ReaderOptions& readerOpts) { + if (connectorQueryCtx_->cache()) { + return std::make_unique( + fileHandle.file, + dwio::common::MetricsLog::voidLog(), + fileHandle.uuid.id(), + connectorQueryCtx_->cache(), + Connector::getTracker( + connectorQueryCtx_->scanId(), readerOpts.loadQuantum()), + fileHandle.groupId.id(), + ioStats_, + executor_, + readerOpts); + } + return std::make_unique( + fileHandle.file, + dwio::common::MetricsLog::voidLog(), + fileHandle.uuid.id(), + Connector::getTracker( + connectorQueryCtx_->scanId(), readerOpts.loadQuantum()), + fileHandle.groupId.id(), + ioStats_, + executor_, + readerOpts); +} + std::string SplitReader::toString() const { std::string partitionKeys; std::for_each( - partitionKeys_.begin(), - partitionKeys_.end(), + partitionKeys_->begin(), + partitionKeys_->end(), [&](std::pair< const std::string, std::shared_ptr> diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 0cac6c4f9d6a..52417c642571 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -16,9 +16,18 @@ #pragma once +#include "velox/connectors/hive/FileHandle.h" #include "velox/dwio/common/Reader.h" #include "velox/type/Type.h" +namespace facebook::velox::cache { +class AsyncDataCache; +} + +namespace facebook::velox::connector { +class ConnectorQueryCtx; +} + namespace facebook::velox::dwio::common { class BufferedInput; } @@ -36,19 +45,27 @@ class SplitReader { public: static std::unique_ptr create( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool); + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats); SplitReader( std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, const RowTypePtr readerOutputType, - std::unordered_map>& + std::unordered_map>* partitionKeys, - std::shared_ptr scanSpec, - memory::MemoryPool* pool); + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr ioStats); virtual ~SplitReader() = default; @@ -56,9 +73,6 @@ class SplitReader { /// do additional preparations before reading the split, e.g. 
Open delete
  /// files or log files, and add column adaptations for metadata columns
   virtual void prepareSplit(
-      const std::shared_ptr<HiveTableHandle>& hiveTableHandle,
-      const dwio::common::ReaderOptions& readerOptions,
-      std::unique_ptr<dwio::common::BufferedInput> baseFileInput,
       std::shared_ptr<common::MetadataFilter> metadataFilter,
       dwio::common::RuntimeStatistics& runtimeStats);
@@ -100,20 +114,34 @@ class SplitReader {
       const std::optional<std::string>& value) const;

   std::shared_ptr<HiveConnectorSplit> hiveSplit_;
+  std::shared_ptr<HiveTableHandle> hiveTableHandle_;
+  std::shared_ptr<common::ScanSpec> scanSpec_;
   RowTypePtr readerOutputType_;
-  std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
+  std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
       partitionKeys_;
-  std::shared_ptr<common::ScanSpec> scanSpec_;
-  memory::MemoryPool* pool_;

   std::unique_ptr<dwio::common::Reader> baseReader_;
-  dwio::common::RowReaderOptions rowReaderOpts_;
   std::unique_ptr<dwio::common::RowReader> baseRowReader_;
+  FileHandleFactory* const fileHandleFactory_;
+  folly::Executor* const executor_;
+  const ConnectorQueryCtx* const connectorQueryCtx_;
+  std::shared_ptr<io::IoStatistics> ioStats_;

 private:
+  void configureReaderOptions();
+
   void configureRowReaderOptions(
       dwio::common::RowReaderOptions& options,
       const RowTypePtr& rowType);

+  void parseSerdeParameters(
+      const std::unordered_map<std::string, std::string>& serdeParameters);
+
+  std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
+      const FileHandle& fileHandle,
+      const dwio::common::ReaderOptions& readerOpts);
+
+  dwio::common::ReaderOptions baseReaderOpts_;
+  dwio::common::RowReaderOptions baseRowReaderOpts_;
   bool emptySplit_;
 };

From f44ad01d53aee6df9357d05d8ce90d3a223c833d Mon Sep 17 00:00:00 2001
From: yingsu00
Date: Mon, 9 Oct 2023 15:19:19 -0700
Subject: [PATCH 5/6] Introducing IcebergSplitReader

This commit introduces IcebergSplitReader, which supports reading
Iceberg splits with positional delete files.

---
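Before the diff, a compact standalone model of how positional deletes turn
into a per-batch delete bitmap (this mirrors the arithmetic in
IcebergSplitReader::next() and DeleteFileReader::readDeletePositionsVec()
below; std::vector and the local setBit are stand-ins for Velox's BufferPtr
and bits::setBit):

  #include <cstdint>
  #include <vector>

  // Set bit i in a byte-addressed bitmap (stand-in for velox::bits::setBit).
  inline void setBit(uint8_t* bitmap, int64_t i) {
    bitmap[i / 8] |= static_cast<uint8_t>(1u << (i % 8));
  }

  // deletePositions holds ascending row ordinals within the data file.
  // splitOffset is the file row number where the split starts, and
  // baseReadOffset is the batch's first row relative to the split.
  std::vector<uint8_t> buildDeleteBitmap(
      const std::vector<int64_t>& deletePositions,
      int64_t splitOffset,
      int64_t baseReadOffset,
      int64_t batchSize) {
    std::vector<uint8_t> bitmap((batchSize + 7) / 8, 0);
    const int64_t batchStart = splitOffset + baseReadOffset;
    const int64_t batchEnd = batchStart + batchSize; // exclusive upper bound
    for (int64_t pos : deletePositions) {
      if (pos >= batchEnd) {
        break; // sorted input: everything further belongs to later batches
      }
      if (pos >= batchStart) {
        setBit(bitmap.data(), pos - batchStart);
      }
    }
    return bitmap; // handed to the row reader as Mutation::deletedRows
  }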
 velox/common/testutil/tests/CMakeLists.txt    |  10 +-
 velox/connectors/hive/CMakeLists.txt          |  28 +--
 velox/connectors/hive/FileHandle.h            |   8 +-
 velox/connectors/hive/HiveConnector.cpp       |   1 -
 velox/connectors/hive/HiveDataSource.cpp      |   3 +-
 velox/connectors/hive/HiveDataSource.h        |   6 +-
 velox/connectors/hive/SplitReader.cpp         |  44 ++--
 velox/connectors/hive/SplitReader.h           |   6 +-
 velox/connectors/hive/iceberg/CMakeLists.txt  |  27 +++
 .../hive/iceberg/DeleteFileReader.cpp         | 192 ++++++++++++++++
 .../hive/iceberg/DeleteFileReader.h           |  84 +++++++
 .../hive/iceberg/IcebergDeleteFile.h          |  69 ++++++
 .../hive/iceberg/IcebergMetadataColumns.h     |  56 +++++
 .../connectors/hive/iceberg/IcebergSplit.cpp  |  69 ++++++
 velox/connectors/hive/iceberg/IcebergSplit.h  |  56 +++++
 .../hive/iceberg/IcebergSplitReader.cpp       | 105 +++++++++
 .../hive/iceberg/IcebergSplitReader.h         |  61 +++++
 .../hive/iceberg/cmake/Config.cmake.in        |   4 +
 .../hive/iceberg/tests/CMakeLists.txt         |  34 +++
 .../hive/iceberg/tests/IcebergReadTest.cpp    | 212 ++++++++++++++++++
 velox/dwio/dwrf/test/CMakeLists.txt           |   1 +
 .../dwio/parquet/tests/reader/CMakeLists.txt  |   1 +
 22 files changed, 1029 insertions(+), 48 deletions(-)
 create mode 100644 velox/connectors/hive/iceberg/CMakeLists.txt
 create mode 100644 velox/connectors/hive/iceberg/DeleteFileReader.cpp
 create mode 100644 velox/connectors/hive/iceberg/DeleteFileReader.h
 create mode 100644 velox/connectors/hive/iceberg/IcebergDeleteFile.h
 create mode 100644 velox/connectors/hive/iceberg/IcebergMetadataColumns.h
 create mode 100644 velox/connectors/hive/iceberg/IcebergSplit.cpp
 create mode 100644 velox/connectors/hive/iceberg/IcebergSplit.h
 create mode 100644 velox/connectors/hive/iceberg/IcebergSplitReader.cpp
 create mode 100644 velox/connectors/hive/iceberg/IcebergSplitReader.h
 create mode 100644 velox/connectors/hive/iceberg/cmake/Config.cmake.in
 create mode 100644 velox/connectors/hive/iceberg/tests/CMakeLists.txt
 create mode 100644 velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp

diff --git a/velox/common/testutil/tests/CMakeLists.txt b/velox/common/testutil/tests/CMakeLists.txt
index 02b7ee5f1045..d232c201d931 100644
--- a/velox/common/testutil/tests/CMakeLists.txt
+++ b/velox/common/testutil/tests/CMakeLists.txt
@@ -16,11 +16,5 @@ add_executable(velox_test_util_test TestValueTest.cpp SpillConfigTest.cpp)
 gtest_add_tests(velox_test_util_test "" AUTO)

 target_link_libraries(
-  velox_test_util_test
-  PRIVATE
-  velox_test_util
-  velox_exception
-  velox_spill_config
-  velox_exec
-  gtest
-  gtest_main)
+  velox_test_util_test PRIVATE velox_test_util velox_exception
+                               velox_spill_config velox_exec gtest gtest_main)
diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt
index 5fbd19245169..772aac7d7506 100644
--- a/velox/connectors/hive/CMakeLists.txt
+++ b/velox/connectors/hive/CMakeLists.txt
@@ -13,9 +13,10 @@
 # limitations under the License.

 add_library(velox_hive_config OBJECT HiveConfig.cpp)
-
 target_link_libraries(velox_hive_config velox_exception)

+add_subdirectory(iceberg)
+
 add_library(
   velox_hive_connector OBJECT
   FileHandle.cpp
@@ -30,18 +31,19 @@ add_library(

 target_link_libraries(
   velox_hive_connector
-  velox_common_io
-  velox_connector
-  velox_dwio_catalog_fbhive
-  velox_dwio_dwrf_reader
-  velox_dwio_dwrf_writer
-  velox_dwio_parquet_reader
-  velox_dwio_parquet_writer
-  velox_file
-  velox_hive_partition_function
-  velox_s3fs
-  velox_hdfs
-  velox_gcs)
+  PUBLIC velox_hive_iceberg_splitreader
+  PRIVATE velox_common_io
+          velox_connector
+          velox_dwio_catalog_fbhive
+          velox_dwio_dwrf_reader
+          velox_dwio_dwrf_writer
+          velox_dwio_parquet_reader
+          velox_dwio_parquet_writer
+          velox_file
+          velox_hive_partition_function
+          velox_s3fs
+          velox_hdfs
+          velox_gcs)

 add_library(velox_hive_partition_function HivePartitionFunction.cpp)
diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h
index 15edd9d2ac2f..9482051fcb3b 100644
--- a/velox/connectors/hive/FileHandle.h
+++ b/velox/connectors/hive/FileHandle.h
@@ -25,15 +25,15 @@

 #pragma once

-#include
-#include
-#include
-
 #include "velox/common/caching/CachedFactory.h"
 #include "velox/common/caching/FileIds.h"
 #include "velox/common/file/File.h"
 #include "velox/dwio/common/InputStream.h"

+//#include
+//#include
+//#include
+
 namespace facebook::velox {

 class Config;
diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp
index 184d40cd981a..be753d2f387d 100644
--- a/velox/connectors/hive/HiveConnector.cpp
+++ b/velox/connectors/hive/HiveConnector.cpp
@@ -75,7 +75,6 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
         std::string,
         std::shared_ptr<ColumnHandle>>& columnHandles,
     ConnectorQueryCtx* connectorQueryCtx) {
-
   return std::make_unique<HiveDataSource>(
       outputType,
       tableHandle,
diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp
index 16bd3a503ede..19b7b49b7aaa 100644
--- a/velox/connectors/hive/HiveDataSource.cpp
+++ b/velox/connectors/hive/HiveDataSource.cpp
@@ -480,8 +480,8 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
       readerOutputType_,
       &partitionKeys_,
       fileHandleFactory_,
-      executor_,
       connectorQueryCtx_,
+      executor_,
       ioStats_);
 }
@@ -497,7 +494,6 @@ void HiveDataSource::addSplit(std::shared_ptr<HiveConnectorSplit> split) {
   if (splitReader_) {
     splitReader_.reset();
   }
-
   splitReader_ = createSplitReader();
   splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
 }
diff --git
a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 2d54db3fda64..8e448e18a35f 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -143,9 +143,9 @@ class HiveDataSource : public DataSource { SelectivityVector filterRows_; exec::FilterEvalCtx filterEvalCtx_; - FileHandleFactory* fileHandleFactory_; - const ConnectorQueryCtx* const connectorQueryCtx_; - folly::Executor* executor_; + FileHandleFactory* const fileHandleFactory_; + ConnectorQueryCtx* const connectorQueryCtx_; + folly::Executor* const executor_; }; } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 516acbcd290c..cecf268dc07e 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -19,13 +19,14 @@ #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" #include "velox/dwio/common/CachedBufferedInput.h" #include "velox/dwio/common/DirectBufferedInput.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/ReaderFactory.h" #include -#include +// #include #include #include @@ -145,19 +146,34 @@ std::unique_ptr SplitReader::create( std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats) { - return std::make_unique( - hiveSplit, - hiveTableHandle, - scanSpec, - readerOutputType, - partitionKeys, - fileHandleFactory, - executor, - connectorQueryCtx, - ioStats); + // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] + if (hiveSplit->customSplitInfo["table_format"] == "hive_iceberg") { + return std::make_unique( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + + connectorQueryCtx, + executor, + ioStats); + } else { + return std::make_unique( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + connectorQueryCtx, + executor, + ioStats); + } } SplitReader::SplitReader( @@ -168,8 +184,8 @@ SplitReader::SplitReader( std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats) : hiveSplit_(hiveSplit), hiveTableHandle_(hiveTableHandle), @@ -177,8 +193,8 @@ SplitReader::SplitReader( readerOutputType_(readerOutputType), partitionKeys_(partitionKeys), fileHandleFactory_(fileHandleFactory), - executor_(executor), connectorQueryCtx_(connectorQueryCtx), + executor_(executor), ioStats_(ioStats), baseReaderOpts_(connectorQueryCtx->memoryPool()) {} diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 52417c642571..0f7fabd7b0f3 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -51,8 +51,8 @@ class SplitReader { std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats); SplitReader( @@ -63,8 +63,8 @@ class SplitReader { std::unordered_map>* partitionKeys, FileHandleFactory* fileHandleFactory, + 
ConnectorQueryCtx* connectorQueryCtx, folly::Executor* executor, - const ConnectorQueryCtx* connectorQueryCtx, std::shared_ptr ioStats); virtual ~SplitReader() = default; @@ -122,8 +122,8 @@ class SplitReader { std::unique_ptr baseReader_; std::unique_ptr baseRowReader_; FileHandleFactory* const fileHandleFactory_; + ConnectorQueryCtx* const connectorQueryCtx_; folly::Executor* const executor_; - const ConnectorQueryCtx* const connectorQueryCtx_; std::shared_ptr ioStats_; private: diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt new file mode 100644 index 000000000000..d6f9148680c5 --- /dev/null +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library(velox_hive_iceberg_splitreader + IcebergSplitReader.cpp IcebergSplit.cpp DeleteFileReader.cpp) + +target_link_libraries( + velox_hive_iceberg_splitreader + Folly::folly + gflags::gflags + glog::glog + gtest + gtest_main + xsimd) + +add_subdirectory(tests) diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.cpp b/velox/connectors/hive/iceberg/DeleteFileReader.cpp new file mode 100644 index 000000000000..143f2e73609b --- /dev/null +++ b/velox/connectors/hive/iceberg/DeleteFileReader.cpp @@ -0,0 +1,192 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/hive/iceberg/DeleteFileReader.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" + +namespace facebook::velox::connector::hive::iceberg { +DeleteFileReader::DeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + ConnectorQueryCtx* connectorQueryCtx, + uint64_t splitOffset, + const std::string& connectorId) + : deleteFile_(deleteFile), + baseFilePath_(baseFilePath), + fileHandleFactory_(fileHandleFactory), + executor_(executor), + connectorQueryCtx_(connectorQueryCtx), + splitOffset_(splitOffset), + deletePositionsVector_(nullptr), + deletePositionsOffset_(-1), + endOfFile_(false) { + if (deleteFile.content == FileContent::kPositionalDeletes) { + createPositionalDeleteDataSource(deleteFile, connectorId); + } else if (deleteFile.content == FileContent::kEqualityDeletes) { + VELOX_NYI("Iceberg equality delete files are not supported yet."); + } else { + VELOX_FAIL("Unrecogonized Iceberg delete file type: ", deleteFile.content); + } +} + +void DeleteFileReader::createPositionalDeleteDataSource( + const IcebergDeleteFile& deleteFile, + const std::string& connectorId) { + auto filePathColumn = ICEBERG_DELETE_FILE_PATH_COLUMN(); + auto positionsColumn = ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); + + std::vector deleteColumnNames( + {filePathColumn->name, positionsColumn->name}); + std::vector> deleteColumnTypes( + {filePathColumn->type, positionsColumn->type}); + + RowTypePtr deleteRowType = + ROW(std::move(deleteColumnNames), std::move(deleteColumnTypes)); + + std::unordered_map> + deleteColumnHandles; + deleteColumnHandles[filePathColumn->name] = + std::make_shared( + filePathColumn->name, + HiveColumnHandle::ColumnType::kRegular, + VARCHAR(), + VARCHAR()); + deleteColumnHandles[positionsColumn->name] = + std::make_shared( + positionsColumn->name, + HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT()); + + size_t lastPathDelimiterPos = deleteFile.filePath.find_last_of('/'); + std::string deleteFileName = deleteFile.filePath.substr( + lastPathDelimiterPos, deleteFile.filePath.size() - lastPathDelimiterPos); + + // TODO: Build filters on the path column: filePathColumn = baseFilePath_ + // TODO: Build filters on the positionsColumn: + // positionsColumn >= baseReadOffset_ + splitOffsetInFile + SubfieldFilters subfieldFilters = {}; + + auto deleteTableHandle = std::make_shared( + connectorId, deleteFileName, false, std::move(subfieldFilters), nullptr); + + auto deleteSplit = std::make_shared( + connectorId, + deleteFile.filePath, + deleteFile.fileFormat, + 0, + deleteFile.fileSizeInBytes); + + deleteDataSource_ = std::make_unique( + deleteRowType, + deleteTableHandle, + deleteColumnHandles, + fileHandleFactory_, + executor_, + connectorQueryCtx_); + + deleteDataSource_->addSplit(deleteSplit); +} + +void DeleteFileReader::readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap) { + ContinueFuture blockingFuture(ContinueFuture::makeEmpty()); + // We are going to read to the row number up to the end of the batch. 
For the + // same base file, the deleted rows are in ascending order in the same delete + // file + int64_t rowNumberUpperBound = splitOffset_ + baseReadOffset + size; + + // Finish unused delete positions from last batch + if (deletePositionsVector_ && + deletePositionsOffset_ < deletePositionsVector_->size()) { + readDeletePositionsVec(baseReadOffset, rowNumberUpperBound, deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + + uint64_t numRowsToRead = std::min(size, deleteFile_.recordCount); + while (true) { + std::optional deletesResult = + deleteDataSource_->next(numRowsToRead, blockingFuture); + + if (!deletesResult.has_value()) { + return; + } + + auto data = deletesResult.value(); + if (data) { + if (data->size() > 0) { + VELOX_CHECK(data->childrenSize() > 0); + data->loadedVector(); + + deletePositionsVector_ = data->childAt(1); + deletePositionsOffset_ = 0; + + readDeletePositionsVec( + baseReadOffset, rowNumberUpperBound, deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + continue; + } else { + endOfFile_ = true; + return; + } + } +} + +bool DeleteFileReader::endOfFile() { + return endOfFile_; +} + +void DeleteFileReader::readDeletePositionsVec( + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap) { + // Convert the positions in file into positions relative to the start of the + // split. + const int64_t* deletePositions = + deletePositionsVector_->as>()->rawValues(); + int64_t offset = baseReadOffset + splitOffset_; + while (deletePositionsOffset_ < deletePositionsVector_->size() && + deletePositions[deletePositionsOffset_] < rowNumberUpperBound) { + bits::setBit( + deleteBitmap, deletePositions[deletePositionsOffset_] - offset); + deletePositionsOffset_++; + } +} + +bool DeleteFileReader::readFinishedForBatch(int64_t rowNumberUpperBound) { + const int64_t* deletePositions = + deletePositionsVector_->as>()->rawValues(); + if (deletePositionsOffset_ < deletePositionsVector_->size() && + deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) { + return true; + } + return false; +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.h b/velox/connectors/hive/iceberg/DeleteFileReader.h new file mode 100644 index 000000000000..140b6f3d8889 --- /dev/null +++ b/velox/connectors/hive/iceberg/DeleteFileReader.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveDataSource.h" + +#include +#include + +namespace facebook::velox::cache { +class AsyncDataCache; +} + +namespace facebook::velox::connector { +class ConnectorQueryCtx; +} + +namespace facebook::velox::core { +class ExpressionEvaluator; +} + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +class DeleteFileReader { + public: + DeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + ConnectorQueryCtx* connectorQueryCtx, + uint64_t splitOffset, + const std::string& connectorId); + + void readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap); + + bool endOfFile(); + + private: + void createPositionalDeleteDataSource( + const IcebergDeleteFile& deleteFile, + const std::string& connectorId); + + void readDeletePositionsVec( + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap); + + bool readFinishedForBatch(int64_t rowNumberUpperBound); + + const IcebergDeleteFile& deleteFile_; + const std::string& baseFilePath_; + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + ConnectorQueryCtx* const connectorQueryCtx_; + uint64_t splitOffset_; + + std::unique_ptr deleteDataSource_; + VectorPtr deletePositionsVector_; + uint64_t deletePositionsOffset_; + bool endOfFile_; +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h new file mode 100644 index 000000000000..1cff2b8efbe2 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/dwio/common/Options.h" + +#include +#include +#include + +namespace facebook::velox::connector::hive::iceberg { + +enum class FileContent { + kData, + kPositionalDeletes, + kEqualityDeletes, +}; + +struct IcebergDeleteFile { + FileContent content; + const std::string filePath; + dwio::common::FileFormat fileFormat; + uint64_t recordCount; + uint64_t fileSizeInBytes; + // The field ids for the delete columns for equality delete files + std::vector equalityFieldIds; + // The lower bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], where 10 and 50 are the deleted row positions in + // the data file, then lowerBounds would contain entry <1, "10"> + std::unordered_map lowerBounds; + // The upper bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. 
The deleted rows for a column with field id + // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> + std::unordered_map upperBounds; + + IcebergDeleteFile( + FileContent _content, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _recordCount, + uint64_t _fileSizeInBytes, + std::vector _equalityFieldIds = {}, + std::unordered_map _lowerBounds = {}, + std::unordered_map _upperBounds = {}) + : content(_content), + filePath(_filePath), + fileFormat(_fileFormat), + recordCount(_recordCount), + fileSizeInBytes(_fileSizeInBytes), + equalityFieldIds(_equalityFieldIds), + lowerBounds(_lowerBounds), + upperBounds(_upperBounds) {} +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h new file mode 100644 index 000000000000..d4442321e53a --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/type/Type.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +const std::string kIcebergDeleteFilePathColumn = "file_path"; +const std::string kIcebergRowPositionColumn = "pos"; + +struct IcebergMetadataColumn { + int id; + std::string name; + std::shared_ptr type; + std::string doc; + + IcebergMetadataColumn( + int _id, + const std::string& _name, + std::shared_ptr _type, + const std::string& _doc) + : id(_id), name(_name), type(_type), doc(_doc) {} +}; + +#define ICEBERG_DELETE_FILE_PATH_COLUMN() \ + std::make_shared( \ + 2147483546, \ + kIcebergDeleteFilePathColumn, \ + VARCHAR(), \ + "Path of a file in which a deleted row is stored"); + +#define ICEBERG_DELETE_FILE_POSITIONS_COLUMN() \ + std::make_shared( \ + 2147483545, \ + kIcebergRowPositionColumn, \ + BIGINT(), \ + "Ordinal position of a deleted row in the data file"); + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp new file mode 100644 index 000000000000..fe256b073b17 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/hive/iceberg/IcebergSplit.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber) { + // TODO: Deserialize _extraFileInfo to get deleteFiles; +} + +// For tests only +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo, + std::vector _deletes) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber, + _customSplitInfo, + _extraFileInfo), + deleteFiles(_deletes) {} +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h new file mode 100644 index 000000000000..25f1e6a4e4b1 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#pragma once
+
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+
+#include <string>
+
+namespace facebook::velox::connector::hive::iceberg {
+
+class IcebergDeleteFile;
+
+struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
+  std::vector<IcebergDeleteFile> deleteFiles;
+
+  HiveIcebergSplit(
+      const std::string& connectorId,
+      const std::string& _filePath,
+      dwio::common::FileFormat _fileFormat,
+      uint64_t _start = 0,
+      uint64_t _length = std::numeric_limits<uint64_t>::max(),
+      const std::unordered_map<std::string, std::optional<std::string>>&
+          _partitionKeys = {},
+      std::optional<int32_t> _tableBucketNumber = std::nullopt,
+      const std::unordered_map<std::string, std::string>& _customSplitInfo =
+          {},
+      const std::shared_ptr<std::string>& _extraFileInfo = {});
+
+  // For tests only
+  HiveIcebergSplit(
+      const std::string& connectorId,
+      const std::string& _filePath,
+      dwio::common::FileFormat _fileFormat,
+      uint64_t _start = 0,
+      uint64_t _length = std::numeric_limits<uint64_t>::max(),
+      const std::unordered_map<std::string, std::optional<std::string>>&
+          _partitionKeys = {},
+      std::optional<int32_t> _tableBucketNumber = std::nullopt,
+      const std::unordered_map<std::string, std::string>& _customSplitInfo =
+          {},
+      const std::shared_ptr<std::string>& _extraFileInfo = {},
+      std::vector<IcebergDeleteFile> deletes = {});
+};
+
+} // namespace facebook::velox::connector::hive::iceberg
\ No newline at end of file
diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
new file mode 100644
index 000000000000..5c4cbea344ec
--- /dev/null
+++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/connectors/hive/iceberg/IcebergSplitReader.h"
+
+#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
+#include "velox/connectors/hive/iceberg/IcebergSplit.h"
+#include "velox/dwio/common/BufferUtil.h"
+
+using namespace facebook::velox::dwio::common;
+
+namespace facebook::velox::connector::hive::iceberg {
+
+IcebergSplitReader::IcebergSplitReader(
+    std::shared_ptr<velox::connector::hive::HiveConnectorSplit> hiveSplit,
+    std::shared_ptr<HiveTableHandle> hiveTableHandle,
+    std::shared_ptr<common::ScanSpec> scanSpec,
+    const RowTypePtr readerOutputType,
+    std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
+        partitionKeys,
+    FileHandleFactory* fileHandleFactory,
+    ConnectorQueryCtx* connectorQueryCtx,
+    folly::Executor* executor,
+    std::shared_ptr<io::IoStatistics> ioStats)
+    : SplitReader(
+          hiveSplit,
+          hiveTableHandle,
+          scanSpec,
+          readerOutputType,
+          partitionKeys,
+          fileHandleFactory,
+          connectorQueryCtx,
+          executor,
+          ioStats) {}
+
+void IcebergSplitReader::prepareSplit(
+    std::shared_ptr<common::MetadataFilter> metadataFilter,
+    dwio::common::RuntimeStatistics& runtimeStats) {
+  SplitReader::prepareSplit(metadataFilter, runtimeStats);
+  baseReadOffset_ = 0;
+  deleteFileReaders_.clear();
+  splitOffset_ = baseRowReader_->nextRowNumber();
+
+  // TODO: Deserialize the std::vector<IcebergDeleteFile> deleteFiles. For now
+  // we assume it's already deserialized.
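+  // The split handed to this reader was created by the Iceberg connector as
+  // a HiveIcebergSplit, so the cast below is expected to succeed. One
+  // DeleteFileReader is created per delete file attached to the split; during
+  // next() each reader marks deleted positions in a bitmap that is passed to
+  // the base reader as a Mutation.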
+  std::shared_ptr<HiveIcebergSplit> icebergSplit =
+      std::dynamic_pointer_cast<HiveIcebergSplit>(hiveSplit_);
+
+  const auto& deleteFiles = icebergSplit->deleteFiles;
+  for (const auto& deleteFile : deleteFiles) {
+    deleteFileReaders_.push_back(std::make_unique<DeleteFileReader>(
+        deleteFile,
+        hiveSplit_->filePath,
+        fileHandleFactory_,
+        executor_,
+        connectorQueryCtx_,
+        splitOffset_,
+        hiveSplit_->connectorId));
+  }
+}
+
+uint64_t IcebergSplitReader::next(int64_t size, VectorPtr& output) {
+  Mutation mutation;
+  mutation.deletedRows = nullptr;
+
+  if (!deleteFileReaders_.empty()) {
+    auto numBytes = bits::nbytes(size);
+    dwio::common::ensureCapacity<int8_t>(
+        deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool());
+    std::memset((void*)deleteBitmap_->as<int8_t>(), 0L, numBytes);
+
+    // Advance every delete file reader for this batch, dropping readers that
+    // have reached end of file. Erase returns the next iterator, so the
+    // iterator is only incremented when no element was erased.
+    for (auto iter = deleteFileReaders_.begin();
+         iter != deleteFileReaders_.end();) {
+      (*iter)->readDeletePositions(
+          baseReadOffset_, size, deleteBitmap_->asMutable<uint64_t>());
+      if ((*iter)->endOfFile()) {
+        iter = deleteFileReaders_.erase(iter);
+      } else {
+        ++iter;
+      }
+    }
+
+    deleteBitmap_->setSize(numBytes);
+    mutation.deletedRows = deleteBitmap_->as<uint64_t>();
+  }
+
+  auto rowsScanned = baseRowReader_->next(size, output, &mutation);
+  baseReadOffset_ += rowsScanned;
+
+  return rowsScanned;
+}
+
+} // namespace facebook::velox::connector::hive::iceberg
\ No newline at end of file
diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h
new file mode 100644
index 000000000000..520ff5e9aa4b
--- /dev/null
+++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/connectors/hive/SplitReader.h"
+#include "velox/connectors/hive/iceberg/DeleteFileReader.h"
+
+namespace facebook::velox::connector {
+class ColumnHandle;
+}
+
+namespace facebook::velox::connector::hive::iceberg {
+
+class IcebergDeleteFile;
+
+class IcebergSplitReader : public SplitReader {
+ public:
+  IcebergSplitReader(
+      std::shared_ptr<velox::connector::hive::HiveConnectorSplit> hiveSplit,
+      std::shared_ptr<HiveTableHandle> hiveTableHandle,
+      std::shared_ptr<common::ScanSpec> scanSpec,
+      const RowTypePtr readerOutputType,
+      std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
+          partitionKeys,
+      FileHandleFactory* fileHandleFactory,
+      ConnectorQueryCtx* connectorQueryCtx,
+      folly::Executor* executor,
+      std::shared_ptr<io::IoStatistics> ioStats);
+
+  ~IcebergSplitReader() override = default;
+
+  void prepareSplit(
+      std::shared_ptr<common::MetadataFilter> metadataFilter,
+      dwio::common::RuntimeStatistics& runtimeStats) override;
+
+  uint64_t next(int64_t size, VectorPtr& output) override;
+
+ private:
+  // The read offset to the beginning of the split in number of rows for the
+  // current batch for the base data file.
+  uint64_t baseReadOffset_;
+  // The file position for the first row in the split.
+  uint64_t splitOffset_;
+  std::list<std::unique_ptr<DeleteFileReader>> deleteFileReaders_;
+  BufferPtr deleteBitmap_;
+};
+} // namespace facebook::velox::connector::hive::iceberg
diff --git a/velox/connectors/hive/iceberg/cmake/Config.cmake.in b/velox/connectors/hive/iceberg/cmake/Config.cmake.in
new file mode 100644
index 000000000000..0bf51ecc28f5
--- /dev/null
+++ b/velox/connectors/hive/iceberg/cmake/Config.cmake.in
@@ -0,0 +1,4 @@
+@PACKAGE_INIT@
+
+include("${CMAKE_CURRENT_LIST_DIR}/iceberg.cmake")
+check_required_components("@PROJECT_NAME@")
diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt
new file mode 100644
index 000000000000..63603c724ec2
--- /dev/null
+++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt
@@ -0,0 +1,34 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+if(NOT VELOX_DISABLE_GOOGLETEST)
+
+  add_executable(velox_hive_iceberg_test IcebergReadTest.cpp)
+  add_test(velox_hive_iceberg_test velox_hive_iceberg_test)
+
+  target_link_libraries(
+    velox_hive_iceberg_test
+    velox_hive_connector
+    velox_hive_iceberg_splitreader
+    velox_hive_partition_function
+    velox_dwio_common_exception
+    velox_dwio_common_test_utils
+    velox_dwio_dwrf_proto
+    velox_vector_test_lib
+    velox_exec
+    velox_exec_test_lib
+    Folly::folly
+    gtest
+    gtest_main)
+
+endif()
diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
new file mode 100644
index 000000000000..6b599861312a
--- /dev/null
+++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -0,0 +1,212 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/common/file/FileSystems.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
+#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
+#include "velox/connectors/hive/iceberg/IcebergSplit.h"
+#include "velox/exec/PlanNodeStats.h"
+#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
+#include "velox/exec/tests/utils/PlanBuilder.h"
+
+#include <folly/Singleton.h>
+
+using namespace facebook::velox::exec::test;
+using namespace facebook::velox::exec;
+using namespace facebook::velox::dwio::common;
+using namespace facebook::velox::test;
+
+namespace facebook::velox::connector::hive::iceberg {
+
+class HiveIcebergTest : public HiveConnectorTestBase {
+ public:
+  void assertPositionalDeletes(const std::vector<int64_t>& deleteRows) {
+    assertPositionalDeletes(
+        deleteRows,
+        "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) +
+            ")");
+  }
+
+  void assertPositionalDeletes(
+      const std::vector<int64_t>& deleteRows,
+      std::string duckdbSql) {
+    std::shared_ptr<TempFilePath> dataFilePath = writeDataFile(rowCount);
+    std::shared_ptr<TempFilePath> deleteFilePath =
+        writePositionDeleteFile(dataFilePath->path, deleteRows);
+
+    IcebergDeleteFile deleteFile(
+        FileContent::kPositionalDeletes,
+        deleteFilePath->path,
+        fileFormat_,
+        deleteRows.size(),
+        testing::internal::GetFileSize(
+            std::fopen(deleteFilePath->path.c_str(), "r")));
+
+    auto icebergSplit = makeIcebergSplit(dataFilePath->path, {deleteFile});
+
+    auto plan = tableScanNode();
+    auto task = OperatorTestBase::assertQuery(plan, {icebergSplit}, duckdbSql);
+
+    auto planStats = toPlanStats(task->taskStats());
+    auto scanNodeId = plan->id();
+    auto it = planStats.find(scanNodeId);
+    ASSERT_TRUE(it != planStats.end());
+    ASSERT_TRUE(it->second.peakMemoryBytes > 0);
+  }
+
+  std::vector<int64_t> makeRandomDeleteRows(int32_t maxRowNumber) {
+    std::mt19937 gen{0};
+    std::vector<int64_t> deleteRows;
+    for (int i = 0; i < maxRowNumber; i++) {
+      if (folly::Random::rand32(0, 10, gen) > 8) {
+        deleteRows.push_back(i);
+      }
+    }
+    return deleteRows;
+  }
+
+  std::vector<int64_t> makeSequenceRows(int32_t maxRowNumber) {
+    std::vector<int64_t> deleteRows;
+    deleteRows.resize(maxRowNumber);
+    std::iota(deleteRows.begin(), deleteRows.end(), 0);
+    return deleteRows;
+  }
+
+  const static int rowCount = 20000;
+
+ private:
+  std::shared_ptr<connector::ConnectorSplit> makeIcebergSplit(
+      const std::string& dataFilePath,
+      const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
+    std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
+    std::unordered_map<std::string, std::string> customSplitInfo;
+    customSplitInfo["table_format"] = "hive_iceberg";
+
+    auto file = filesystems::getFileSystem(dataFilePath, nullptr)
+                    ->openFileForRead(dataFilePath);
+    const int64_t fileSize = file->size();
+
+    return std::make_shared<HiveIcebergSplit>(
+        kHiveConnectorId,
+        dataFilePath,
+        fileFormat_,
+        0,
+        fileSize,
+        partitionKeys,
+        std::nullopt,
+        customSplitInfo,
+        nullptr,
+        deleteFiles);
+  }
+
+  std::vector<RowVectorPtr> makeVectors(int32_t count, int32_t rowsPerVector) {
+    std::vector<RowVectorPtr> vectors;
+
+    for (int i = 0; i < count; i++) {
+      auto data = makeSequenceRows(rowsPerVector);
+      VectorPtr c0 = vectorMaker_.flatVector<int64_t>(data);
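+      // makeSequenceRows yields values 0..rowsPerVector-1, so each row's
+      // value equals its ordinal position in the data file. This is what lets
+      // the DuckDB check "c0 NOT IN (<deleted positions>)" verify positional
+      // deletes directly against the column values.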
+      vectors.push_back(makeRowVector({"c0"}, {c0}));
+    }
+
+    return vectors;
+  }
+
+  std::shared_ptr<TempFilePath> writeDataFile(uint64_t numRows) {
+    auto dataVectors = makeVectors(1, numRows);
+
+    auto dataFilePath = TempFilePath::create();
+    writeToFile(dataFilePath->path, dataVectors);
+    createDuckDbTable(dataVectors);
+    return dataFilePath;
+  }
+
+  std::shared_ptr<TempFilePath> writePositionDeleteFile(
+      const std::string& dataFilePath,
+      const std::vector<int64_t>& deleteRows) {
+    uint32_t numDeleteRows = deleteRows.size();
+
+    auto child = vectorMaker_.flatVector(std::vector{1UL});
+
+    auto filePathVector = vectorMaker_.flatVector<StringView>(
+        numDeleteRows, [&](auto row) { return StringView(dataFilePath); });
+    auto deletePositionsVector = vectorMaker_.flatVector<int64_t>(deleteRows);
+    RowVectorPtr deleteFileVectors = makeRowVector(
+        {pathColumn_->name, posColumn_->name},
+        {filePathVector, deletePositionsVector});
+
+    auto deleteFilePath = TempFilePath::create();
+    writeToFile(deleteFilePath->path, deleteFileVectors);
+
+    return deleteFilePath;
+  }
+
+  std::string makeNotInList(const std::vector<int64_t>& deleteRows) {
+    if (deleteRows.empty()) {
+      return "";
+    }
+
+    return std::accumulate(
+        deleteRows.begin() + 1,
+        deleteRows.end(),
+        std::to_string(deleteRows[0]),
+        [](const std::string& a, int64_t b) {
+          return a + ", " + std::to_string(b);
+        });
+  }
+
+  std::shared_ptr<exec::Task> assertQuery(
+      const core::PlanNodePtr& plan,
+      std::shared_ptr<TempFilePath> dataFilePath,
+      const std::vector<IcebergDeleteFile>& deleteFiles,
+      const std::string& duckDbSql) {
+    auto icebergSplit = makeIcebergSplit(dataFilePath->path, deleteFiles);
+    return OperatorTestBase::assertQuery(plan, {icebergSplit}, duckDbSql);
+  }
+
+  core::PlanNodePtr tableScanNode() {
+    return PlanBuilder(pool_.get()).tableScan(rowType_).planNode();
+  }
+
+ private:
+  dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF};
+  RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
+  std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
+      ICEBERG_DELETE_FILE_PATH_COLUMN();
+  std::shared_ptr<IcebergMetadataColumn> posColumn_ =
+      ICEBERG_DELETE_FILE_POSITIONS_COLUMN();
+};
+
+TEST_F(HiveIcebergTest, positionalDeletes) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  // Delete rows 0, 1, 2, 3 from the first batch out of two.
+  assertPositionalDeletes({0, 1, 2, 3});
+  // Delete the first and last row in each batch (10000 rows per batch).
+  assertPositionalDeletes({0, 9999, 10000, 19999});
+  // Delete several rows in the second batch (10000 rows per batch).
+  assertPositionalDeletes({10000, 10002, 19999});
+  // Delete random rows.
+  assertPositionalDeletes(makeRandomDeleteRows(rowCount));
+  // Delete 0 rows.
+  assertPositionalDeletes({}, "SELECT * FROM tmp");
+  // Delete all rows.
+  assertPositionalDeletes(
+      makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0");
+  // Delete rows that don't exist.
+  assertPositionalDeletes({20000, 29999});
+}
+
+} // namespace facebook::velox::connector::hive::iceberg
\ No newline at end of file
diff --git a/velox/dwio/dwrf/test/CMakeLists.txt b/velox/dwio/dwrf/test/CMakeLists.txt
index 063fdc8b26da..026b5bc48df2 100644
--- a/velox/dwio/dwrf/test/CMakeLists.txt
+++ b/velox/dwio/dwrf/test/CMakeLists.txt
@@ -347,6 +347,7 @@ add_test(velox_dwrf_e2e_filter_test velox_dwrf_e2e_filter_test)
 target_link_libraries(
   velox_dwrf_e2e_filter_test
   velox_e2e_filter_test_base
+  velox_hive_connector
   velox_link_libs
   velox_test_util
   Folly::folly
diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt
index ff8aa8a9c27e..c94ab590f638 100644
--- a/velox/dwio/parquet/tests/reader/CMakeLists.txt
+++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt
@@ -43,6 +43,7 @@ target_link_libraries(
   ZLIB::ZLIB
   ${TEST_LINK_LIBS})
 
+message(STATUS "velox_hive_connector_libs ${velox_hive_connector_libs}")
 add_executable(velox_dwio_parquet_reader_benchmark ParquetReaderBenchmark.cpp)
 target_link_libraries(
   velox_dwio_parquet_reader_benchmark

From 6edc1dbdb520b4512c56c2410e875a50f84f62bf Mon Sep 17 00:00:00 2001
From: yingsu00
Date: Wed, 8 Nov 2023 20:29:25 -0800
Subject: [PATCH 6/6] Add subfield filter for the delete file path column

---
 .../hive/iceberg/DeleteFileReader.cpp         |  8 +-
 .../hive/iceberg/tests/IcebergReadTest.cpp    | 98 ++++++++++++++++---
 2 files changed, 90 insertions(+), 16 deletions(-)

diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.cpp b/velox/connectors/hive/iceberg/DeleteFileReader.cpp
index 143f2e73609b..daa428e2475c 100644
--- a/velox/connectors/hive/iceberg/DeleteFileReader.cpp
+++ b/velox/connectors/hive/iceberg/DeleteFileReader.cpp
@@ -18,6 +18,7 @@
 
 #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
 #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
+#include "velox/type/Filter.h"
 
 namespace facebook::velox::connector::hive::iceberg {
 
 DeleteFileReader::DeleteFileReader(
@@ -82,7 +83,12 @@ void DeleteFileReader::createPositionalDeleteDataSource(
-  // TODO: Build filters on the path column: filePathColumn = baseFilePath_
   // TODO: Build filters on the positionsColumn:
   // positionsColumn >= baseReadOffset_ + splitOffsetInFile
-  SubfieldFilters subfieldFilters = {};
+  SubfieldFilters subfieldFilters;
+  std::vector<std::string> values = {baseFilePath_};
+  std::unique_ptr<common::Filter> pathFilter =
+      std::make_unique<common::BytesValues>(values, false);
+  subfieldFilters[common::Subfield(filePathColumn->name)] =
+      std::move(pathFilter);
 
   auto deleteTableHandle = std::make_shared<HiveTableHandle>(
       connectorId, deleteFileName, false, std::move(subfieldFilters), nullptr);
diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
index 6b599861312a..3b6c24d38d9a 100644
--- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
+++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -34,24 +34,36 @@
 namespace facebook::velox::connector::hive::iceberg {
 
 class HiveIcebergTest : public HiveConnectorTestBase {
  public:
-  void assertPositionalDeletes(const std::vector<int64_t>& deleteRows) {
+  void assertPositionalDeletes(
+      const std::vector<int64_t>& deleteRows,
+      bool multipleBaseFiles = false) {
     assertPositionalDeletes(
         deleteRows,
-        "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) +
-            ")");
+        "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) +
+            ")",
+        multipleBaseFiles);
   }
 
   void assertPositionalDeletes(
       const std::vector<int64_t>& deleteRows,
-      std::string duckdbSql) {
+      std::string duckdbSql,
+      bool multipleBaseFiles = false) {
     std::shared_ptr<TempFilePath> dataFilePath = writeDataFile(rowCount);
-    std::shared_ptr<TempFilePath> deleteFilePath =
-        writePositionDeleteFile(dataFilePath->path, deleteRows);
+
+    std::mt19937 gen{0};
+    int64_t numDeleteRowsBefore =
+        multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
+    int64_t numDeleteRowsAfter =
+        multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
+    std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile(
+        dataFilePath->path,
+        deleteRows,
+        numDeleteRowsBefore,
+        numDeleteRowsAfter);
 
     IcebergDeleteFile deleteFile(
         FileContent::kPositionalDeletes,
         deleteFilePath->path,
         fileFormat_,
-        deleteRows.size(),
+        deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter,
         testing::internal::GetFileSize(
             std::fopen(deleteFilePath->path.c_str(), "r")));
@@ -135,14 +147,50 @@ class HiveIcebergTest : public HiveConnectorTestBase {
 
   std::shared_ptr<TempFilePath> writePositionDeleteFile(
       const std::string& dataFilePath,
-      const std::vector<int64_t>& deleteRows) {
-    uint32_t numDeleteRows = deleteRows.size();
+      const std::vector<int64_t>& deleteRows,
+      int64_t numRowsBefore = 0,
+      int64_t numRowsAfter = 0) {
+    // When numRowsBefore/numRowsAfter are non-zero, also write delete rows
+    // for other base files before and after the target base file.
+    uint32_t numDeleteRows = numRowsBefore + deleteRows.size() + numRowsAfter;
 
     auto child = vectorMaker_.flatVector(std::vector{1UL});
 
-    auto filePathVector = vectorMaker_.flatVector<StringView>(
-        numDeleteRows, [&](auto row) { return StringView(dataFilePath); });
-    auto deletePositionsVector = vectorMaker_.flatVector<int64_t>(deleteRows);
+    // Keep the alternate file names alive outside the lambda so the returned
+    // StringViews do not point into destroyed temporaries.
+    std::string dataFilePathBefore = dataFilePath + "_before";
+    std::string dataFilePathAfter = dataFilePath + "_after";
+    auto filePathVector =
+        vectorMaker_.flatVector<StringView>(numDeleteRows, [&](auto row) {
+          if (row < numRowsBefore) {
+            return StringView(dataFilePathBefore);
+          } else if (
+              row >= numRowsBefore &&
+              row < deleteRows.size() + numRowsBefore) {
+            return StringView(dataFilePath);
+          } else if (
+              row >= deleteRows.size() + numRowsBefore &&
+              row < numDeleteRows) {
+            return StringView(dataFilePathAfter);
+          } else {
+            return StringView();
+          }
+        });
+
+    std::vector<int64_t> deleteRowsVec;
+    deleteRowsVec.reserve(numDeleteRows);
+
+    if (numRowsBefore > 0) {
+      auto rowsBefore = makeSequenceRows(numRowsBefore);
+      deleteRowsVec.insert(
+          deleteRowsVec.end(), rowsBefore.begin(), rowsBefore.end());
+    }
+    deleteRowsVec.insert(
+        deleteRowsVec.end(), deleteRows.begin(), deleteRows.end());
+    if (numRowsAfter > 0) {
+      auto rowsAfter = makeSequenceRows(numRowsAfter);
+      deleteRowsVec.insert(
+          deleteRowsVec.end(), rowsAfter.begin(), rowsAfter.end());
+    }
+
+    auto deletePositionsVector =
+        vectorMaker_.flatVector<int64_t>(deleteRowsVec);
     RowVectorPtr deleteFileVectors = makeRowVector(
         {pathColumn_->name, posColumn_->name},
         {filePathVector, deletePositionsVector});
@@ -189,7 +237,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
       ICEBERG_DELETE_FILE_POSITIONS_COLUMN();
 };
 
-TEST_F(HiveIcebergTest, positionalDeletes) {
+TEST_F(HiveIcebergTest, positionalDeletesSingleBaseFile) {
   folly::SingletonVault::singleton()->registrationComplete();
 
   // Delete rows 0, 1, 2, 3 from the first batch out of two.
@@ -201,12 +249,32 @@ TEST_F(HiveIcebergTest, positionalDeletes) {
   // Delete random rows.
   assertPositionalDeletes(makeRandomDeleteRows(rowCount));
   // Delete 0 rows.
-  assertPositionalDeletes({}, "SELECT * FROM tmp");
+  assertPositionalDeletes({}, "SELECT * FROM tmp", false);
   // Delete all rows.
   assertPositionalDeletes(
-      makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0");
+      makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", false);
   // Delete rows that don't exist.
   assertPositionalDeletes({20000, 29999});
 }
 
+TEST_F(HiveIcebergTest, positionalDeletesMultipleBaseFiles) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  // Delete rows 0, 1, 2, 3 from the first batch out of two.
+  // assertPositionalDeletes({0, 1, 2, 3}, true);
+  // Delete the first and last row in each batch (10000 rows per batch).
+  assertPositionalDeletes({0, 9999, 10000, 19999}, true);
+  // Delete several rows in the second batch (10000 rows per batch).
+  assertPositionalDeletes({10000, 10002, 19999}, true);
+  // Delete random rows.
+  assertPositionalDeletes(makeRandomDeleteRows(rowCount), true);
+  // Delete 0 rows.
+  assertPositionalDeletes({}, "SELECT * FROM tmp", true);
+  // Delete all rows.
+  assertPositionalDeletes(
+      makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", true);
+  // Delete rows that don't exist.
+  assertPositionalDeletes({20000, 29999}, true);
+}
+
 } // namespace facebook::velox::connector::hive::iceberg
\ No newline at end of file
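
For reference, a minimal sketch of how a caller might attach a positional
delete file to an Iceberg split, mirroring the test helpers above. The file
paths, sizes, and the "test-hive" connector id below are illustrative
placeholders, not values from these patches; the constructor signatures are
the ones the patches introduce:

    using namespace facebook::velox::connector::hive::iceberg;

    // Describe one positional-delete file that was written for the base file.
    IcebergDeleteFile deleteFile(
        FileContent::kPositionalDeletes,
        "/tmp/delete_file_0", // placeholder path of the delete file
        dwio::common::FileFormat::DWRF,
        100, // number of delete records in the file
        1024); // delete file size in bytes

    // Build a split over the base data file that carries its delete files;
    // IcebergSplitReader creates one DeleteFileReader per entry.
    auto split = std::make_shared<HiveIcebergSplit>(
        "test-hive", // placeholder connector id
        "/tmp/data_file_0", // placeholder path of the base data file
        dwio::common::FileFormat::DWRF,
        0, // start
        std::numeric_limits<uint64_t>::max(), // length
        /*_partitionKeys=*/std::unordered_map<
            std::string,
            std::optional<std::string>>{},
        /*_tableBucketNumber=*/std::nullopt,
        /*_customSplitInfo=*/{{"table_format", "hive_iceberg"}},
        /*_extraFileInfo=*/nullptr,
        std::vector<IcebergDeleteFile>{deleteFile});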