From 286eabacd9f179112f2bf028a5e701b85081ebde Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Thu, 27 Jul 2023 20:54:38 -0700 Subject: [PATCH 1/2] Add support to create ordered vector in BatchMaker --- velox/dwio/common/tests/utils/BatchMaker.cpp | 52 +++++++++++++++----- velox/dwio/common/tests/utils/BatchMaker.h | 8 +++ 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/velox/dwio/common/tests/utils/BatchMaker.cpp b/velox/dwio/common/tests/utils/BatchMaker.cpp index 52ce6973b80e..207677e9c428 100644 --- a/velox/dwio/common/tests/utils/BatchMaker.cpp +++ b/velox/dwio/common/tests/utils/BatchMaker.cpp @@ -41,7 +41,7 @@ template VectorPtr createScalar( size_t size, std::mt19937& gen, - std::function val, + std::function val, MemoryPool& pool, std::function isNullAt, const TypePtr type = CppToType::create()) { @@ -56,7 +56,7 @@ VectorPtr createScalar( auto notNull = isNotNull(gen, i, isNullAt); bits::setNull(nullsPtr, i, !notNull); if (notNull) { - valuesPtr[i] = val(); + valuesPtr[i] = val(i); } else { nullCount++; } @@ -86,7 +86,7 @@ VectorPtr BatchMaker::createVector( return createScalar( size, gen, - [&gen]() { return Random::rand32(0, 2, gen) == 0; }, + [&gen](size_t) { return Random::rand32(0, 2, gen) == 0; }, pool, isNullAt); } @@ -101,7 +101,7 @@ VectorPtr BatchMaker::createVector( return createScalar( size, gen, - [&gen]() { return static_cast(Random::rand32(gen)); }, + [&gen](size_t) { return static_cast(Random::rand32(gen)); }, pool, isNullAt); } @@ -116,7 +116,7 @@ VectorPtr BatchMaker::createVector( return createScalar( size, gen, - [&gen]() { return static_cast(Random::rand32(gen)); }, + [&gen](size_t) { return static_cast(Random::rand32(gen)); }, pool, isNullAt); } @@ -131,7 +131,7 @@ VectorPtr BatchMaker::createVector( return createScalar( size, gen, - [&gen]() { return static_cast(Random::rand32(gen)); }, + [&gen](size_t) { return static_cast(Random::rand32(gen)); }, pool, isNullAt); } @@ -144,7 +144,11 @@ VectorPtr BatchMaker::createVector( std::mt19937& gen, std::function isNullAt) { return createScalar( - size, gen, [&gen]() { return Random::rand64(gen); }, pool, isNullAt); + size, + gen, + [&gen](size_t) { return Random::rand64(gen); }, + pool, + isNullAt); } template <> @@ -157,7 +161,7 @@ VectorPtr BatchMaker::createVector( return createScalar( size, gen, - [&gen]() { return static_cast(Random::randDouble01(gen)); }, + [&gen](size_t) { return static_cast(Random::randDouble01(gen)); }, pool, isNullAt); } @@ -172,7 +176,7 @@ VectorPtr BatchMaker::createVector( return createScalar( size, gen, - [&gen]() { return Random::randDouble01(gen); }, + [&gen](size_t) { return Random::randDouble01(gen); }, pool, isNullAt); } @@ -187,7 +191,7 @@ VectorPtr BatchMaker::createVector( return createScalar( size, gen, - [&gen]() { + [&gen](size_t) { return HugeInt::build(Random::rand32(gen), Random::rand32(gen)); }, pool, @@ -270,7 +274,7 @@ VectorPtr BatchMaker::createVector( return createScalar( size, gen, - [&gen]() { + [&gen](size_t) { return Timestamp( TIME_OFFSET + Random::rand32(0, 60 * 60 * 24 * 365, gen), Random::rand32(0, 1'000'000, gen)); @@ -596,6 +600,32 @@ VectorPtr BatchMaker::createVector( &pool, type, nulls, size, offsets, lengths, keys, values, nullCount); } +template +VectorPtr BatchMaker::createOrderedVector( + const std::shared_ptr& type, + size_t size, + size_t base, + memory::MemoryPool& pool, + std::function isNullAt) { + VELOX_NYI(); +} + +template <> +VectorPtr BatchMaker::createOrderedVector( + const std::shared_ptr& type, + size_t size, + size_t base, + memory::MemoryPool& pool, + std::function isNullAt) { + std::mt19937 gen{0}; + return createScalar( + size, + gen, + [&base](size_t i) { return static_cast(i) + base; }, + pool, + isNullAt); +} + VectorPtr BatchMaker::createBatch( const std::shared_ptr& type, uint64_t capacity, diff --git a/velox/dwio/common/tests/utils/BatchMaker.h b/velox/dwio/common/tests/utils/BatchMaker.h index 10588f30702d..e1db818ab822 100644 --- a/velox/dwio/common/tests/utils/BatchMaker.h +++ b/velox/dwio/common/tests/utils/BatchMaker.h @@ -62,6 +62,14 @@ struct BatchMaker { std::mt19937 gen{seed}; return createVector(type, size, pool, gen, isNullAt); } + + template + static VectorPtr createOrderedVector( + const std::shared_ptr& type, + size_t size, + size_t base, + memory::MemoryPool& pool, + std::function isNullAt = nullptr); }; } // namespace facebook::velox::test From 2ab28160a10affc5b2d26488aed09680637c859a Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Thu, 27 Jul 2023 20:56:38 -0700 Subject: [PATCH 2/2] Iceberg positional deletes read support This is the first cut to support reading Iceberg tables with delete files. The design doc can be found at https://github.com/facebookincubator/velox/issues/5977 --- velox/benchmarks/tpch/CMakeLists.txt | 1 + velox/connectors/Connector.h | 3 +- velox/connectors/fuzzer/FuzzerConnector.h | 4 +- velox/connectors/hive/CMakeLists.txt | 3 + velox/connectors/hive/HiveConnector.cpp | 42 +++- velox/connectors/hive/HiveConnector.h | 4 +- velox/connectors/hive/HiveDataSource.cpp | 7 +- velox/connectors/hive/HiveDataSource.h | 18 +- .../connectors/hive/benchmarks/CMakeLists.txt | 1 + velox/connectors/hive/iceberg/CMakeLists.txt | 32 +++ .../hive/iceberg/IcebergDataSource.cpp | 235 ++++++++++++++++++ .../hive/iceberg/IcebergDataSource.h | 69 +++++ .../hive/iceberg/IcebergDeleteFile.h | 69 +++++ .../hive/iceberg/IcebergMetadataColumn.h | 56 +++++ .../connectors/hive/iceberg/IcebergSplit.cpp | 67 +++++ velox/connectors/hive/iceberg/IcebergSplit.h | 53 ++++ .../hive/iceberg/tests/CMakeLists.txt | 29 +++ .../hive/iceberg/tests/IcebergReadTest.cpp | 230 +++++++++++++++++ .../hive/iceberg/tests/IcebergReadTest.h | 62 +++++ .../gcs/examples/CMakeLists.txt | 1 + .../storage_adapters/gcs/tests/CMakeLists.txt | 1 + .../hdfs/tests/CMakeLists.txt | 1 + .../s3fs/tests/CMakeLists.txt | 1 + velox/connectors/hive/tests/CMakeLists.txt | 1 + velox/connectors/tests/ConnectorTest.cpp | 4 +- velox/connectors/tpch/TpchConnector.h | 3 +- velox/dwio/common/tests/utils/DataFiles.h | 1 + velox/dwio/parquet/tests/CMakeLists.txt | 1 + .../dwio/parquet/tests/reader/CMakeLists.txt | 2 + velox/examples/CMakeLists.txt | 1 + velox/exec/TableScan.cpp | 7 +- velox/exec/tests/AsyncConnectorTest.cpp | 4 +- velox/exec/tests/CMakeLists.txt | 2 + velox/exec/tests/utils/CMakeLists.txt | 1 + .../exec/tests/utils/HiveConnectorTestBase.h | 2 +- .../codegen/benchmark/CMakeLists.txt | 1 + .../vector_function/tests/CMakeLists.txt | 1 + velox/expression/tests/CMakeLists.txt | 2 + .../aggregates/benchmarks/CMakeLists.txt | 1 + .../prestosql/aggregates/tests/CMakeLists.txt | 1 + .../sparksql/aggregates/tests/CMakeLists.txt | 1 + velox/substrait/tests/CMakeLists.txt | 1 + 42 files changed, 993 insertions(+), 33 deletions(-) create mode 100644 velox/connectors/hive/iceberg/CMakeLists.txt create mode 100644 velox/connectors/hive/iceberg/IcebergDataSource.cpp create mode 100644 velox/connectors/hive/iceberg/IcebergDataSource.h create mode 100644 velox/connectors/hive/iceberg/IcebergDeleteFile.h create mode 100644 velox/connectors/hive/iceberg/IcebergMetadataColumn.h create mode 100644 velox/connectors/hive/iceberg/IcebergSplit.cpp create mode 100644 velox/connectors/hive/iceberg/IcebergSplit.h create mode 100644 velox/connectors/hive/iceberg/tests/CMakeLists.txt create mode 100644 velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp create mode 100644 velox/connectors/hive/iceberg/tests/IcebergReadTest.h diff --git a/velox/benchmarks/tpch/CMakeLists.txt b/velox/benchmarks/tpch/CMakeLists.txt index ef0147ad5c93..42ac1b5bac59 100644 --- a/velox/benchmarks/tpch/CMakeLists.txt +++ b/velox/benchmarks/tpch/CMakeLists.txt @@ -24,6 +24,7 @@ target_link_libraries( velox_dwio_parquet_reader velox_dwio_common_test_utils velox_hive_connector + velox_hive_iceberg_datasource velox_exception velox_memory velox_process diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index b8fa93f191dc..49519cd4695a 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -331,7 +331,8 @@ class Connector { const std::unordered_map< std::string, std::shared_ptr>& columnHandles, - ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) = 0; + ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx, + std::shared_ptr connectorSplit) = 0; // Returns true if addSplit of DataSource can use 'dataSource' from // ConnectorSplit in addSplit(). If so, TableScan can preload splits diff --git a/velox/connectors/fuzzer/FuzzerConnector.h b/velox/connectors/fuzzer/FuzzerConnector.h index 89d544360b51..9b72b01f5fb6 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.h +++ b/velox/connectors/fuzzer/FuzzerConnector.h @@ -113,7 +113,9 @@ class FuzzerConnector final : public Connector { const std::unordered_map< std::string, std::shared_ptr>& /*columnHandles*/, - ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) override final { + ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx, + std::shared_ptr /*connectorSplit*/) + override final { return std::make_unique( outputType, tableHandle, connectorQueryCtx->memoryPool()); } diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index aad6e5f18554..92463ac3fcdc 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -32,6 +32,7 @@ target_link_libraries( velox_dwio_parquet_reader velox_dwio_parquet_writer velox_file + velox_hive_iceberg_datasource velox_hive_partition_function velox_s3fs velox_hdfs @@ -43,6 +44,8 @@ target_link_libraries(velox_hive_partition_function velox_core velox_exec) add_subdirectory(storage_adapters) +add_subdirectory(iceberg) + if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 49e99f3aef4e..a4f71833be22 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -40,6 +40,7 @@ #include "velox/dwio/parquet/RegisterParquetReader.h" // @manual #include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual #endif +#include "velox/connectors/hive/iceberg/IcebergDataSource.h" #include "velox/expression/FieldReference.h" #include @@ -76,7 +77,8 @@ std::unique_ptr HiveConnector::createDataSource( const std::unordered_map< std::string, std::shared_ptr>& columnHandles, - ConnectorQueryCtx* connectorQueryCtx) { + ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr connectorSplit) { dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool()); options.setMaxCoalesceBytes( HiveConfig::maxCoalescedBytes(connectorQueryCtx->config())); @@ -85,16 +87,34 @@ std::unique_ptr HiveConnector::createDataSource( options.setFileColumnNamesReadAsLowerCase( HiveConfig::isFileColumnNamesReadAsLowerCase( connectorQueryCtx->config())); - return std::make_unique( - outputType, - tableHandle, - columnHandles, - &fileHandleFactory_, - connectorQueryCtx->expressionEvaluator(), - connectorQueryCtx->cache(), - connectorQueryCtx->scanId(), - executor_, - options); + + auto splitInfo = + std::dynamic_pointer_cast(connectorSplit); + if (splitInfo->customSplitInfo["table_format"] == "hive_iceberg") { + return std::make_unique( + outputType, + tableHandle, + columnHandles, + &fileHandleFactory_, + connectorQueryCtx->expressionEvaluator(), + connectorQueryCtx->cache(), + connectorQueryCtx->scanId(), + executor_, + options, + connectorSplit); + } else { + return std::make_unique( + outputType, + tableHandle, + columnHandles, + &fileHandleFactory_, + connectorQueryCtx->expressionEvaluator(), + connectorQueryCtx->cache(), + connectorQueryCtx->scanId(), + executor_, + options, + connectorSplit); + } } std::unique_ptr HiveConnector::createDataSink( diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index edd94f7e6302..70f87e72e08f 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -51,7 +51,9 @@ class HiveConnector : public Connector { const std::unordered_map< std::string, std::shared_ptr>& columnHandles, - ConnectorQueryCtx* connectorQueryCtx) override; + ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr connectorSplit) + override; bool supportsSplitPreload() override { return true; diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 45487d52362b..2ed15e66b4e3 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -389,15 +389,16 @@ HiveDataSource::HiveDataSource( cache::AsyncDataCache* cache, const std::string& scanId, folly::Executor* executor, - const dwio::common::ReaderOptions& options) + const dwio::common::ReaderOptions& options, + std::shared_ptr /*connectorSplit*/) : fileHandleFactory_(fileHandleFactory), readerOpts_(options), pool_(&options.getMemoryPool()), - outputType_(outputType), expressionEvaluator_(expressionEvaluator), cache_(cache), scanId_(scanId), - executor_(executor) { + executor_(executor), + outputType_(outputType) { // Column handled keyed on the column alias, the name used in the query. for (const auto& [canonicalizedName, columnHandle] : columnHandles) { auto handle = std::dynamic_pointer_cast(columnHandle); diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 74e93195e68b..73588ed50d60 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -40,7 +40,8 @@ class HiveDataSource : public DataSource { cache::AsyncDataCache* cache, const std::string& scanId, folly::Executor* executor, - const dwio::common::ReaderOptions& options); + const dwio::common::ReaderOptions& options, + std::shared_ptr connectorSplit); void addSplit(std::shared_ptr split) override; @@ -101,12 +102,18 @@ class HiveDataSource : public DataSource { } std::shared_ptr split_; + std::shared_ptr scanSpec_; FileHandleFactory* fileHandleFactory_; dwio::common::ReaderOptions readerOpts_; memory::MemoryPool* pool_; VectorPtr output_; RowTypePtr readerOutputType_; + std::unique_ptr reader_; std::unique_ptr rowReader_; + core::ExpressionEvaluator* expressionEvaluator_; + cache::AsyncDataCache* const cache_{nullptr}; + const std::string& scanId_; + folly::Executor* executor_; private: // Evaluates remainingFilter_ on the specified vector. Returns number of rows @@ -143,27 +150,18 @@ class HiveDataSource : public DataSource { std::unordered_map> partitionKeys_; std::shared_ptr ioStats_; - std::shared_ptr scanSpec_; std::shared_ptr metadataFilter_; dwio::common::RowReaderOptions rowReaderOpts_; - std::unique_ptr reader_; std::unique_ptr remainingFilterExprSet_; bool emptySplit_; - dwio::common::RuntimeStatistics runtimeStats_; - std::shared_ptr fileHandle_; - core::ExpressionEvaluator* expressionEvaluator_; uint64_t completedRows_ = 0; // Reusable memory for remaining filter evaluation. VectorPtr filterResult_; SelectivityVector filterRows_; exec::FilterEvalCtx filterEvalCtx_; - - cache::AsyncDataCache* const cache_{nullptr}; - const std::string& scanId_; - folly::Executor* executor_; }; } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/benchmarks/CMakeLists.txt b/velox/connectors/hive/benchmarks/CMakeLists.txt index ddc3e0cf67ca..8ab6e92a7594 100644 --- a/velox/connectors/hive/benchmarks/CMakeLists.txt +++ b/velox/connectors/hive/benchmarks/CMakeLists.txt @@ -27,6 +27,7 @@ target_link_libraries( velox_exec velox_exec_test_lib velox_hive_connector + velox_hive_iceberg_datasource velox_hive_partition_function velox_memory Folly::folly diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt new file mode 100644 index 000000000000..211ca58f7bcf --- /dev/null +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -0,0 +1,32 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library(velox_hive_iceberg_datasource OBJECT IcebergSplit.cpp + IcebergDataSource.cpp) + +target_link_libraries( + velox_hive_iceberg_datasource + Folly::folly + gtest + Boost::headers + Boost::filesystem + gflags::gflags + glog::glog + gtest + gtest_main + xsimd) + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif() diff --git a/velox/connectors/hive/iceberg/IcebergDataSource.cpp b/velox/connectors/hive/iceberg/IcebergDataSource.cpp new file mode 100644 index 000000000000..258dd0f099c2 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDataSource.cpp @@ -0,0 +1,235 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergDataSource.h" + +#include "velox/connectors/hive/iceberg/IcebergMetadataColumn.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/dwio/common/BufferUtil.h" + +using namespace facebook::velox::dwio::common; + +namespace facebook::velox::connector::hive::iceberg { + +HiveIcebergDataSource::HiveIcebergDataSource( + const RowTypePtr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& + columnHandles, // output columns + FileHandleFactory* fileHandleFactory, + core::ExpressionEvaluator* expressionEvaluator, + cache::AsyncDataCache* cache, + const std::string& scanId, + folly::Executor* executor, + const dwio::common::ReaderOptions& options, + std::shared_ptr split) + : HiveDataSource( + outputType, + tableHandle, + columnHandles, + fileHandleFactory, + expressionEvaluator, + cache, + scanId, + executor, + options, + split), + connectorId_(split->connectorId), + deleteFileContentsOffset_(0), + deleteRowsOffset_(0), + numDeleteRowsInSplit_(0) {} + +void HiveIcebergDataSource::addSplit(std::shared_ptr split) { + readOffset_ = 0; + deleteFileContents_.clear(); + deleteFileContentsOffset_ = 0; + deleteRowsOffset_ = 0; + numDeleteRowsInSplit_ = 0; + + HiveDataSource::addSplit(split); + + firstRowInSplit_ = rowReader_->nextRowNumber(); + + // TODO: Deserialize the std::vector deleteFiles. For now + // we assume it's already deserialized. + std::shared_ptr icebergSplit = + std::dynamic_pointer_cast(split); + + auto& deleteFiles = icebergSplit->deleteFiles; + for (auto& deleteFile : deleteFiles) { + if (deleteFile.content == FileContent::kEqualityDeletes) { + readEqualityDeletes(deleteFile); + } else if (deleteFile.content == FileContent::kPositionalDeletes) { + readPositionalDeletes(deleteFile); + } + } +} + +uint64_t HiveIcebergDataSource::readNext(uint64_t size) { + // The equality deletes should have been put into filter in scanspec. Now we + // just need to deal with positional deletes. The deleteFileContents_ contains + // the deleted row numbers relative to the start of the file, and we need to + // convert them into row numbers relative to the start of the split. + Mutation mutation; + mutation.deletedRows = nullptr; + + if (numDeleteRowsInSplit_ > 0) { + auto numBytes = bits::nbytes(size); + dwio::common::ensureCapacity(deleteBitmap_, numBytes, pool_); + std::memset((void*)deleteBitmap_->as(), 0L, numBytes); + + while (deleteFileContentsOffset_ < deleteFileContents_.size()) { + auto deleteFile = deleteFileContents_[deleteFileContentsOffset_]; + VectorPtr& posVector = deleteFile->childAt(1); + const int64_t* deleteRows = + posVector->as>()->rawValues(); + + auto deleteRowsOffsetBefore = deleteRowsOffset_; + while (deleteRowsOffset_ < posVector->size() && + deleteRows[deleteRowsOffset_] < + firstRowInSplit_ + readOffset_ + size) { + bits::setBit( + deleteBitmap_->asMutable(), + deleteRows[deleteRowsOffset_] - readOffset_ - firstRowInSplit_); + deleteRowsOffset_++; + } + + numDeleteRowsInSplit_ -= deleteRowsOffset_ - deleteRowsOffsetBefore; + if (deleteRowsOffset_ == posVector->size()) { + deleteFileContentsOffset_++; + deleteRowsOffset_ = 0; + } else { + break; + } + } + + deleteBitmap_->setSize(numBytes); + mutation.deletedRows = deleteBitmap_->as(); + } + + auto rowsScanned = HiveDataSource::rowReader_->next(size, output_, &mutation); + readOffset_ += rowsScanned; + + return rowsScanned; +} + +void HiveIcebergDataSource::readPositionalDeletes( + const IcebergDeleteFile& deleteFile) { + auto filePathColumn = ICEBERG_DELETE_FILE_PATH_COLUMN(); + auto positionsColumn = ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); + + std::vector deleteColumnNames( + {filePathColumn->name, positionsColumn->name}); + std::vector> deleteColumnTypes( + {filePathColumn->type, positionsColumn->type}); + + RowTypePtr deleteRowType = + ROW(std::move(deleteColumnNames), std::move(deleteColumnTypes)); + + std::unordered_map> + deleteColumnHandles; + deleteColumnHandles[filePathColumn->name] = + std::make_shared( + filePathColumn->name, + HiveColumnHandle::ColumnType::kRegular, + VARCHAR(), + VARCHAR()); + deleteColumnHandles[positionsColumn->name] = + std::make_shared( + positionsColumn->name, + HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT()); + + openDeletes(deleteFile, deleteRowType, deleteColumnHandles); +} + +void HiveIcebergDataSource::readEqualityDeletes( + const IcebergDeleteFile& deleteFile) { + VELOX_NYI(); +} + +void HiveIcebergDataSource::openDeletes( + const IcebergDeleteFile& deleteFile, + const RowTypePtr& deleteRowType, + const std::unordered_map< + std::string, + std::shared_ptr>& deleteColumnHandles) { + size_t lastPathDelimiterPos = deleteFile.filePath.find_last_of('/'); + std::string deleteFileName = deleteFile.filePath.substr( + lastPathDelimiterPos, deleteFile.filePath.size() - lastPathDelimiterPos); + + SubfieldFilters subfieldFilters = {}; + auto deleteTableHandle = std::make_shared( + connectorId_, deleteFileName, false, std::move(subfieldFilters), nullptr); + + auto deleteSplit = std::make_shared( + connectorId_, + deleteFile.filePath, + deleteFile.fileFormat, + 0, + deleteFile.fileSizeInBytes); + + auto deleteFileDataSource = std::make_shared( + deleteRowType, + deleteTableHandle, + deleteColumnHandles, + fileHandleFactory_, + expressionEvaluator_, + cache_, + scanId_ + deleteFile.filePath, + executor_, + readerOpts_, + deleteSplit); + + deleteFileDataSource->addSplit(deleteSplit); + + ContinueFuture blockingFuture(ContinueFuture::makeEmpty()); + uint64_t numDeleteRowsInFile = 0; + while (true) { + std::optional deletesResult = + deleteFileDataSource->next(deleteFile.recordCount, blockingFuture); + + if (!deletesResult.has_value()) { + return; + } + + auto data = deletesResult.value(); + if (data) { + if (data->size() > 0) { + VELOX_CHECK(data->childrenSize() > 0); + numDeleteRowsInFile += data->childAt(0)->size(); + + data->loadedVector(); + deleteFileContents_.push_back(data); + } + } else { + numDeleteRowsInSplit_ += numDeleteRowsInFile; + VELOX_CHECK( + numDeleteRowsInFile == deleteFile.recordCount, + "Failed to read Iceberg delete file %s. Supposed to read %lld rows, actual read %lld rows", + deleteFile.filePath, + deleteFile.recordCount, + numDeleteRowsInFile); + + return; + } + }; +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergDataSource.h b/velox/connectors/hive/iceberg/IcebergDataSource.h new file mode 100644 index 000000000000..9ec8ddcdf222 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDataSource.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/hive/HiveDataSource.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +class HiveIcebergDataSource : public HiveDataSource { + public: + HiveIcebergDataSource( + const RowTypePtr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + FileHandleFactory* fileHandleFactory, + core::ExpressionEvaluator* expressionEvaluator, + cache::AsyncDataCache* cache, + const std::string& scanId, + folly::Executor* executor, + const dwio::common::ReaderOptions& options, + std::shared_ptr connectorSplit); + + void addSplit(std::shared_ptr split) override; + uint64_t readNext(uint64_t size) override; + + private: + void readPositionalDeletes(const IcebergDeleteFile& deleteFile); + void readEqualityDeletes(const IcebergDeleteFile& deleteFile); + void openDeletes( + const IcebergDeleteFile& deleteFile, + const RowTypePtr& deleteRowType, + const std::unordered_map< + std::string, + std::shared_ptr>& deleteColumnHandles); + + std::string connectorId_; + BufferPtr deleteBitmap_; + // The read offset to the beginning of the split in number of rows for the + // current batch + uint64_t readOffset_; + // The file row position for the first row in the split + uint64_t firstRowInSplit_; + // Materialized delete file contents, one RowVectorPtr for each delete file + std::vector deleteFileContents_; + // The index into deleteFileContents_ vector for the current batch + uint32_t deleteFileContentsOffset_; + // The index within each delete file + uint32_t deleteRowsOffset_; + // Total number of delete rows in the split + uint64_t numDeleteRowsInSplit_; +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h new file mode 100644 index 000000000000..1cff2b8efbe2 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/dwio/common/Options.h" + +#include +#include +#include + +namespace facebook::velox::connector::hive::iceberg { + +enum class FileContent { + kData, + kPositionalDeletes, + kEqualityDeletes, +}; + +struct IcebergDeleteFile { + FileContent content; + const std::string filePath; + dwio::common::FileFormat fileFormat; + uint64_t recordCount; + uint64_t fileSizeInBytes; + // The field ids for the delete columns for equality delete files + std::vector equalityFieldIds; + // The lower bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], where 10 and 50 are the deleted row positions in + // the data file, then lowerBounds would contain entry <1, "10"> + std::unordered_map lowerBounds; + // The upper bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> + std::unordered_map upperBounds; + + IcebergDeleteFile( + FileContent _content, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _recordCount, + uint64_t _fileSizeInBytes, + std::vector _equalityFieldIds = {}, + std::unordered_map _lowerBounds = {}, + std::unordered_map _upperBounds = {}) + : content(_content), + filePath(_filePath), + fileFormat(_fileFormat), + recordCount(_recordCount), + fileSizeInBytes(_fileSizeInBytes), + equalityFieldIds(_equalityFieldIds), + lowerBounds(_lowerBounds), + upperBounds(_upperBounds) {} +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumn.h b/velox/connectors/hive/iceberg/IcebergMetadataColumn.h new file mode 100644 index 000000000000..c2ffb05a2006 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumn.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/type/Type.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +const std::string kIcebergDeleteFilePathColumn = "file_path"; +const std::string kIcebergRowPositionColumn = "pos"; + +struct IcebergMetadataColumn { + int id; + std::string name; + std::shared_ptr type; + std::string doc; + + IcebergMetadataColumn( + int _id, + const std::string& _name, + std::shared_ptr _type, + const std::string& _doc) + : id(_id), name(_name), type(_type), doc(_doc) {} +}; + +#define ICEBERG_DELETE_FILE_PATH_COLUMN() \ + std::make_shared( \ + 2147483546, \ + kIcebergDeleteFilePathColumn, \ + VARCHAR(), \ + "Path of a file in which a deleted row is stored"); + +#define ICEBERG_DELETE_FILE_POSITIONS_COLUMN() \ + std::make_shared( \ + 2147483545, \ + kIcebergRowPositionColumn, \ + BIGINT(), \ + "Ordinal position of a deleted row in the data file"); + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp new file mode 100644 index 000000000000..642c2f3b91e5 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -0,0 +1,67 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergSplit.h" + +namespace facebook::velox::connector::hive::iceberg { + +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber) { + // TODO: Deserialize _extraFileInfo to get deleteFiles; +} + +// For tests only +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo, + std::vector _deletes) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber, + _customSplitInfo, + _extraFileInfo), + deleteFiles(_deletes) {} +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h new file mode 100644 index 000000000000..0e8d4e1591fa --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { + std::vector deleteFiles; + + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}); + + // For tests only + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}, + std::vector deletes = {}); +}; + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt new file mode 100644 index 000000000000..1a199c897578 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -0,0 +1,29 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +add_executable(velox_hive_iceberg_test IcebergReadTest.cpp) +add_test(velox_hive_iceberg_test velox_hive_iceberg_test) + +target_link_libraries( + velox_hive_iceberg_test + velox_hive_connector + velox_hive_iceberg_datasource + velox_hive_partition_function + velox_hive_iceberg_datasource + velox_dwio_common_exception + velox_dwio_common_test_utils + velox_vector_test_lib + velox_exec + velox_exec_test_lib + gtest + gtest_main) diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp new file mode 100644 index 000000000000..a6a9ea3dfb3d --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -0,0 +1,230 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumn.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/dwio/common/tests/utils/BatchMaker.h" +#include "velox/dwio/dwrf/reader/DwrfReader.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +#include +#include + +using namespace facebook::velox::exec::test; +using namespace facebook::velox::exec; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::test; + +namespace facebook::velox::connector::hive::iceberg { + +class HiveIcebergTest : public HiveConnectorTestBase { + public: + void SetUp() override { + folly::SingletonVault::singleton()->registrationComplete(); + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector(kHiveConnectorId, nullptr, ioExecutor_.get()); + connector::registerConnector(hiveConnector); + dwrf::registerDwrfReaderFactory(); + dwrf::registerDwrfWriterFactory(); + } + + void TearDown() override { + ioExecutor_.reset(); + dwrf::unregisterDwrfReaderFactory(); + dwrf::unregisterDwrfWriterFactory(); + connector::unregisterConnector(kHiveConnectorId); + OperatorTestBase::TearDown(); + } + + void assertPositionalDeletes(const std::vector& deleteRows) { + assertPositionalDeletes( + deleteRows, + "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + + ")"); + } + void assertPositionalDeletes( + const std::vector& deleteRows, + std::string duckdbSql) { + std::shared_ptr dataFilePath = writeDataFile(rowCount); + std::shared_ptr deleteFilePath = + writePositionDeleteFile(dataFilePath->path, deleteRows); + + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->path, + fileFomat_, + deleteRows.size(), + testing::internal::GetFileSize( + std::fopen(deleteFilePath->path.c_str(), "r"))); + + auto icebergSplit = makeIcebergSplit(dataFilePath->path, {deleteFile}); + + auto plan = tableScanNode(); + auto task = OperatorTestBase::assertQuery(plan, {icebergSplit}, duckdbSql); + + auto planStats = toPlanStats(task->taskStats()); + auto scanNodeId = plan->id(); + auto it = planStats.find(scanNodeId); + ASSERT_TRUE(it != planStats.end()); + ASSERT_TRUE(it->second.peakMemoryBytes > 0); + } + + std::vector makeRandomDeleteRows(int32_t maxRowNumber) { + std::mt19937 gen{0}; + std::vector deleteRows; + for (int i = 0; i < maxRowNumber; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + deleteRows.push_back(i); + } + } + return deleteRows; + } + + std::vector makeFullDeleteRows(int32_t maxRowNumber) { + std::vector deleteRows; + deleteRows.resize(maxRowNumber); + std::iota(deleteRows.begin(), deleteRows.end(), 0); + return deleteRows; + } + + const static int rowCount = 20000; + + private: + std::shared_ptr makeIcebergSplit( + const std::string& dataFilePath, + const std::vector& deleteFiles = {}) { + std::unordered_map> partitionKeys; + std::unordered_map customSplitInfo; + customSplitInfo["table_format"] = "hive_iceberg"; + + auto file = filesystems::getFileSystem(dataFilePath, nullptr) + ->openFileForRead(dataFilePath); + const int64_t fileSize = file->size(); + + return std::make_shared( + kHiveConnectorId, + dataFilePath, + fileFomat_, + 0, + fileSize, + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + deleteFiles); + } + + std::vector makeVectors(int32_t count, int32_t rowsPerVector) { + std::vector vectors; + + for (int i = 0; i < count; i++) { + VectorPtr c0 = BatchMaker::createOrderedVector( + BIGINT(), rowsPerVector, rowsPerVector * i, *pool_, [](size_t i) { + return false; + }); + vectors.push_back(makeRowVector({"c0"}, {c0})); + } + + return vectors; + } + + std::shared_ptr writeDataFile(uint64_t numRows) { + auto dataVectors = makeVectors(1, numRows); + + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->path, dataVectors); + createDuckDbTable(dataVectors); + return dataFilePath; + } + + std::shared_ptr writePositionDeleteFile( + const std::string& dataFilePath, + const std::vector& deleteRows) { + uint32_t numDeleteRows = deleteRows.size(); + + auto filePathVector = makeFlatVector( + numDeleteRows, [&](auto row) { return StringView(dataFilePath); }); + auto deletePositionsVector = makeFlatVector(deleteRows); + RowVectorPtr deleteFileVectors = makeRowVector( + {pathColumn_->name, posColumn_->name}, + {filePathVector, deletePositionsVector}); + + auto deleteFilePath = TempFilePath::create(); + writeToFile(deleteFilePath->path, deleteFileVectors); + + return deleteFilePath; + } + + std::string makeNotInList(const std::vector& deleteRows) { + if (deleteRows.empty()) { + return ""; + } + + return std::accumulate( + deleteRows.begin() + 1, + deleteRows.end(), + std::to_string(deleteRows[0]), + [](const std::string& a, int64_t b) { + return a + ", " + std::to_string(b); + }); + } + + std::shared_ptr assertQuery( + const core::PlanNodePtr& plan, + std::shared_ptr dataFilePath, + const std::vector& deleteFiles, + const std::string& duckDbSql) { + auto icebergSplit = makeIcebergSplit(dataFilePath->path, deleteFiles); + return OperatorTestBase::assertQuery(plan, {icebergSplit}, duckDbSql); + } + + core::PlanNodePtr tableScanNode() { + return PlanBuilder(pool_.get()).tableScan(rowType_).planNode(); + } + + private: + dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; + RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})}; + std::shared_ptr pathColumn_ = + ICEBERG_DELETE_FILE_PATH_COLUMN(); + std::shared_ptr posColumn_ = + ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); +}; + +TEST_F(HiveIcebergTest, positionalDeletes) { + // Delete row 0, 1, 2, 3 from 20000 rows + assertPositionalDeletes({0, 1, 2, 3}); + // Delete the first and last row in each batch (10000 rows per batch) + assertPositionalDeletes({0, 9999, 10000, 19999}); + // Delete random rows + assertPositionalDeletes(makeRandomDeleteRows(rowCount)); + // Delete 0 rows + assertPositionalDeletes({}, "SELECT * FROM tmp"); + // Delete all rows + assertPositionalDeletes( + makeFullDeleteRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0"); +} + +} // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.h b/velox/connectors/hive/iceberg/tests/IcebergReadTest.h new file mode 100644 index 000000000000..b0ab28c39cae --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" + +#include + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergSplitBuilder : public exec::test::HiveConnectorSplitBuilder { + public: + const std::string kHiveConnectorId = "test-hive"; + + IcebergSplitBuilder(std::string filePath) + : HiveConnectorSplitBuilder{std::move(filePath)} {} + + IcebergSplitBuilder& deleteFiles( + const std::vector& deleteFiles) { + deleteFiles_ = deleteFiles; + return *this; + } + + IcebergSplitBuilder& deleteFile(const IcebergDeleteFile& deleteFile) { + deleteFiles_.push_back(std::move(deleteFile)); + return *this; + } + + std::shared_ptr build() const { + return std::make_shared( + kHiveConnectorId, + "file:" + filePath_, + fileFormat_, + start_, + length_, + partitionKeys_, + tableBucketNumber_, + {}, + {}, + deleteFiles_); + } + + private: + std::vector deleteFiles_; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/storage_adapters/gcs/examples/CMakeLists.txt b/velox/connectors/hive/storage_adapters/gcs/examples/CMakeLists.txt index 29f934287460..ed78f94a54b7 100644 --- a/velox/connectors/hive/storage_adapters/gcs/examples/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/gcs/examples/CMakeLists.txt @@ -20,5 +20,6 @@ target_link_libraries( velox_gcs velox_core velox_hive_connector + velox_hive_iceberg_datasource velox_dwio_common_exception velox_exec) diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt index b44aa355c09f..c27a9b926f72 100644 --- a/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt @@ -20,6 +20,7 @@ target_link_libraries( velox_gcs velox_core velox_hive_connector + velox_hive_iceberg_datasource velox_dwio_common_exception velox_exec gmock diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt index 341970ef8162..19de21e9ab09 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/hdfs/tests/CMakeLists.txt @@ -21,6 +21,7 @@ target_link_libraries( velox_core velox_exec_test_lib velox_hive_connector + velox_hive_iceberg_datasource velox_dwio_common_exception velox_exec gtest diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt index b9dabecc1f73..bf31aa63ac19 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt @@ -21,6 +21,7 @@ target_link_libraries( velox_core velox_exec_test_lib velox_hive_connector + velox_hive_iceberg_datasource velox_dwio_common_exception velox_exec gtest diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index f3b348ca1767..da315d1d8822 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -26,6 +26,7 @@ add_test(velox_hive_connector_test velox_hive_connector_test) target_link_libraries( velox_hive_connector_test velox_hive_connector + velox_hive_iceberg_datasource velox_hive_partition_function velox_dwio_common_exception velox_vector_fuzzer diff --git a/velox/connectors/tests/ConnectorTest.cpp b/velox/connectors/tests/ConnectorTest.cpp index ba54bceed625..419c11903809 100644 --- a/velox/connectors/tests/ConnectorTest.cpp +++ b/velox/connectors/tests/ConnectorTest.cpp @@ -33,7 +33,9 @@ class TestConnector : public connector::Connector { const std::unordered_map< std::string, std::shared_ptr>& /* columnHandles */, - connector::ConnectorQueryCtx* connectorQueryCtx) override { + connector::ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr connectorSplit) + override { VELOX_NYI(); } diff --git a/velox/connectors/tpch/TpchConnector.h b/velox/connectors/tpch/TpchConnector.h index 8f25c4649c91..11d33aa2777c 100644 --- a/velox/connectors/tpch/TpchConnector.h +++ b/velox/connectors/tpch/TpchConnector.h @@ -140,7 +140,8 @@ class TpchConnector final : public Connector { const std::unordered_map< std::string, std::shared_ptr>& columnHandles, - ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) override final { + ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx, + std::shared_ptr split) override final { return std::make_unique( outputType, tableHandle, diff --git a/velox/dwio/common/tests/utils/DataFiles.h b/velox/dwio/common/tests/utils/DataFiles.h index 564ea7e6297f..914cbc1b015a 100644 --- a/velox/dwio/common/tests/utils/DataFiles.h +++ b/velox/dwio/common/tests/utils/DataFiles.h @@ -23,4 +23,5 @@ std::string getDataFilePath( const std::string& baseDir, const std::string& filePath); +uint64_t getFileSize(const std::string& filePath); } diff --git a/velox/dwio/parquet/tests/CMakeLists.txt b/velox/dwio/parquet/tests/CMakeLists.txt index 2599becc6f59..10ae7b14a648 100644 --- a/velox/dwio/parquet/tests/CMakeLists.txt +++ b/velox/dwio/parquet/tests/CMakeLists.txt @@ -36,5 +36,6 @@ target_link_libraries( velox_exec_test_lib velox_exec velox_hive_connector + velox_hive_iceberg_datasource velox_aggregates ${TEST_LINK_LIBS}) diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt index ff8aa8a9c27e..99a4ce99b168 100644 --- a/velox/dwio/parquet/tests/reader/CMakeLists.txt +++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt @@ -51,6 +51,7 @@ target_link_libraries( velox_exec_test_lib velox_exec velox_hive_connector + velox_hive_iceberg_datasource Folly::folly ${FOLLY_BENCHMARK}) @@ -81,6 +82,7 @@ target_link_libraries( velox_exec_test_lib velox_exec velox_hive_connector + velox_hive_iceberg_datasource velox_link_libs ${TEST_LINK_LIBS}) diff --git a/velox/examples/CMakeLists.txt b/velox/examples/CMakeLists.txt index 8625d34ce82e..2765ae443956 100644 --- a/velox/examples/CMakeLists.txt +++ b/velox/examples/CMakeLists.txt @@ -45,6 +45,7 @@ target_link_libraries( velox_exec velox_exec_test_lib velox_hive_connector + velox_hive_iceberg_datasource velox_memory) add_executable(velox_example_vector_reader_writer VectorReaderWriter.cpp) diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 089468907e03..83b6fd497ff6 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -106,7 +106,8 @@ RowVectorPtr TableScan::getOutput() { outputType_, tableHandle_, columnHandles_, - connectorQueryCtx_.get()); + connectorQueryCtx_.get(), + connectorSplit); for (const auto& entry : pendingDynamicFilters_) { dataSource_->addDynamicFilter(entry.first, entry.second); } @@ -233,7 +234,9 @@ void TableScan::preload(std::shared_ptr split) { }, &debugString}); - auto ptr = connector->createDataSource(type, table, columns, ctx.get()); + auto ptr = + connector->createDataSource(type, table, columns, ctx.get(), split); + if (task->isCancelled()) { return nullptr; } diff --git a/velox/exec/tests/AsyncConnectorTest.cpp b/velox/exec/tests/AsyncConnectorTest.cpp index 30e4f8fb0dd5..e18f9e7b7454 100644 --- a/velox/exec/tests/AsyncConnectorTest.cpp +++ b/velox/exec/tests/AsyncConnectorTest.cpp @@ -142,7 +142,9 @@ class TestConnector : public connector::Connector { const std::unordered_map< std::string, std::shared_ptr>& /* columnHandles */, - connector::ConnectorQueryCtx* connectorQueryCtx) override { + connector::ConnectorQueryCtx* connectorQueryCtx, + std::shared_ptr connectorSplit) + override { return std::make_unique(connectorQueryCtx->memoryPool()); } diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index abd007f7b052..2d1544086a1b 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -118,6 +118,7 @@ target_link_libraries( velox_functions_prestosql velox_functions_test_lib velox_hive_connector + velox_hive_iceberg_datasource velox_memory velox_serialization velox_test_util @@ -152,6 +153,7 @@ target_link_libraries( velox_functions_prestosql velox_functions_test_lib velox_hive_connector + velox_hive_iceberg_datasource velox_memory velox_serialization velox_test_util diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index c3ed321bf335..2f079057131f 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -44,6 +44,7 @@ target_link_libraries( velox_dwio_common_test_utils velox_type_fbhive velox_hive_connector + velox_hive_iceberg_datasource velox_tpch_connector velox_presto_serializer velox_functions_prestosql diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 31afdb5479c8..3012d9114ac7 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -229,7 +229,7 @@ class HiveConnectorSplitBuilder { tableBucketNumber_); } - private: + protected: const std::string filePath_; dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; uint64_t start_{0}; diff --git a/velox/experimental/codegen/benchmark/CMakeLists.txt b/velox/experimental/codegen/benchmark/CMakeLists.txt index 8541b47383ba..e8e6cd605d24 100644 --- a/velox/experimental/codegen/benchmark/CMakeLists.txt +++ b/velox/experimental/codegen/benchmark/CMakeLists.txt @@ -23,6 +23,7 @@ target_link_libraries( velox_codegen_utils_resource_path velox_codegen_code_generator velox_hive_connector + velox_hive_iceberg_datasource ${FOLLY_BENCHMARK} Folly::folly Boost::filesystem diff --git a/velox/experimental/codegen/vector_function/tests/CMakeLists.txt b/velox/experimental/codegen/vector_function/tests/CMakeLists.txt index 7f75bec3e314..2f94dd4f4eb3 100644 --- a/velox/experimental/codegen/vector_function/tests/CMakeLists.txt +++ b/velox/experimental/codegen/vector_function/tests/CMakeLists.txt @@ -29,6 +29,7 @@ target_link_libraries( velox_functions_lib velox_functions_prestosql velox_hive_connector + velox_hive_iceberg_datasource velox_core velox_type velox_serialization diff --git a/velox/expression/tests/CMakeLists.txt b/velox/expression/tests/CMakeLists.txt index 7d2c68de03d4..477fa861ae46 100644 --- a/velox/expression/tests/CMakeLists.txt +++ b/velox/expression/tests/CMakeLists.txt @@ -64,8 +64,10 @@ target_link_libraries( velox_expression_test velox_aggregates velox_hive_connector + velox_hive_iceberg_datasource velox_dwio_common velox_dwio_common_exception + velox_dwio_common_test_utils velox_exec_test_lib velox_expression velox_expression_test_utility diff --git a/velox/functions/prestosql/aggregates/benchmarks/CMakeLists.txt b/velox/functions/prestosql/aggregates/benchmarks/CMakeLists.txt index e6d18d340dfa..d339f6b495ef 100644 --- a/velox/functions/prestosql/aggregates/benchmarks/CMakeLists.txt +++ b/velox/functions/prestosql/aggregates/benchmarks/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries( velox_aggregates_simple_aggregates_benchmarks velox_aggregates velox_hive_connector + velox_hive_iceberg_datasource velox_functions_lib velox_exec_test_lib velox_functions_prestosql diff --git a/velox/functions/prestosql/aggregates/tests/CMakeLists.txt b/velox/functions/prestosql/aggregates/tests/CMakeLists.txt index 14538eb42843..9217af4be26e 100644 --- a/velox/functions/prestosql/aggregates/tests/CMakeLists.txt +++ b/velox/functions/prestosql/aggregates/tests/CMakeLists.txt @@ -66,6 +66,7 @@ target_link_libraries( velox_functions_prestosql velox_functions_lib velox_hive_connector + velox_hive_iceberg_datasource velox_simple_aggregate velox_type velox_vector_fuzzer diff --git a/velox/functions/sparksql/aggregates/tests/CMakeLists.txt b/velox/functions/sparksql/aggregates/tests/CMakeLists.txt index 806d539c5610..b8d8a53e356a 100644 --- a/velox/functions/sparksql/aggregates/tests/CMakeLists.txt +++ b/velox/functions/sparksql/aggregates/tests/CMakeLists.txt @@ -32,6 +32,7 @@ target_link_libraries( velox_functions_aggregates_test_lib velox_functions_spark_aggregates velox_hive_connector + velox_hive_iceberg_datasource gflags::gflags gtest gtest_main) diff --git a/velox/substrait/tests/CMakeLists.txt b/velox/substrait/tests/CMakeLists.txt index f453fb61b7c4..8f08d487068b 100644 --- a/velox/substrait/tests/CMakeLists.txt +++ b/velox/substrait/tests/CMakeLists.txt @@ -41,6 +41,7 @@ target_link_libraries( velox_functions_lib velox_functions_prestosql velox_hive_connector + velox_hive_iceberg_datasource velox_type velox_serialization velox_exec_test_lib