Skip to content

Commit

Permalink
Iceberg positional deletes read support
Browse files Browse the repository at this point in the history
This is the first cut to support reading Iceberg tables with delete
files. The design doc can be found at
#5977
  • Loading branch information
yingsu00 committed Aug 3, 2023
1 parent 286eaba commit 2ab2816
Show file tree
Hide file tree
Showing 42 changed files with 993 additions and 33 deletions.
1 change: 1 addition & 0 deletions velox/benchmarks/tpch/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ target_link_libraries(
velox_dwio_parquet_reader
velox_dwio_common_test_utils
velox_hive_connector
velox_hive_iceberg_datasource
velox_exception
velox_memory
velox_process
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ class Connector {
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) = 0;
ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx,
std::shared_ptr<velox::connector::ConnectorSplit> connectorSplit) = 0;

// Returns true if addSplit of DataSource can use 'dataSource' from
// ConnectorSplit in addSplit(). If so, TableScan can preload splits
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/fuzzer/FuzzerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ class FuzzerConnector final : public Connector {
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& /*columnHandles*/,
ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) override final {
ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx,
std::shared_ptr<velox::connector::ConnectorSplit> /*connectorSplit*/)
override final {
return std::make_unique<FuzzerDataSource>(
outputType, tableHandle, connectorQueryCtx->memoryPool());
}
Expand Down
3 changes: 3 additions & 0 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ target_link_libraries(
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_file
velox_hive_iceberg_datasource
velox_hive_partition_function
velox_s3fs
velox_hdfs
Expand All @@ -43,6 +44,8 @@ target_link_libraries(velox_hive_partition_function velox_core velox_exec)

add_subdirectory(storage_adapters)

add_subdirectory(iceberg)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
Expand Down
42 changes: 31 additions & 11 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "velox/dwio/parquet/RegisterParquetReader.h" // @manual
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
#endif
#include "velox/connectors/hive/iceberg/IcebergDataSource.h"
#include "velox/expression/FieldReference.h"

#include <boost/lexical_cast.hpp>
Expand Down Expand Up @@ -76,7 +77,8 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
ConnectorQueryCtx* connectorQueryCtx,
std::shared_ptr<velox::connector::ConnectorSplit> connectorSplit) {
dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool());
options.setMaxCoalesceBytes(
HiveConfig::maxCoalescedBytes(connectorQueryCtx->config()));
Expand All @@ -85,16 +87,34 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
options.setFileColumnNamesReadAsLowerCase(
HiveConfig::isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->config()));
return std::make_unique<HiveDataSource>(
outputType,
tableHandle,
columnHandles,
&fileHandleFactory_,
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->cache(),
connectorQueryCtx->scanId(),
executor_,
options);

auto splitInfo =
std::dynamic_pointer_cast<HiveConnectorSplit>(connectorSplit);
if (splitInfo->customSplitInfo["table_format"] == "hive_iceberg") {
return std::make_unique<iceberg::HiveIcebergDataSource>(
outputType,
tableHandle,
columnHandles,
&fileHandleFactory_,
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->cache(),
connectorQueryCtx->scanId(),
executor_,
options,
connectorSplit);
} else {
return std::make_unique<HiveDataSource>(
outputType,
tableHandle,
columnHandles,
&fileHandleFactory_,
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->cache(),
connectorQueryCtx->scanId(),
executor_,
options,
connectorSplit);
}
}

std::unique_ptr<DataSink> HiveConnector::createDataSink(
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ class HiveConnector : public Connector {
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) override;
ConnectorQueryCtx* connectorQueryCtx,
std::shared_ptr<velox::connector::ConnectorSplit> connectorSplit)
override;

bool supportsSplitPreload() override {
return true;
Expand Down
7 changes: 4 additions & 3 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,16 @@ HiveDataSource::HiveDataSource(
cache::AsyncDataCache* cache,
const std::string& scanId,
folly::Executor* executor,
const dwio::common::ReaderOptions& options)
const dwio::common::ReaderOptions& options,
std::shared_ptr<velox::connector::ConnectorSplit> /*connectorSplit*/)
: fileHandleFactory_(fileHandleFactory),
readerOpts_(options),
pool_(&options.getMemoryPool()),
outputType_(outputType),
expressionEvaluator_(expressionEvaluator),
cache_(cache),
scanId_(scanId),
executor_(executor) {
executor_(executor),
outputType_(outputType) {
// Column handled keyed on the column alias, the name used in the query.
for (const auto& [canonicalizedName, columnHandle] : columnHandles) {
auto handle = std::dynamic_pointer_cast<HiveColumnHandle>(columnHandle);
Expand Down
18 changes: 8 additions & 10 deletions velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class HiveDataSource : public DataSource {
cache::AsyncDataCache* cache,
const std::string& scanId,
folly::Executor* executor,
const dwio::common::ReaderOptions& options);
const dwio::common::ReaderOptions& options,
std::shared_ptr<velox::connector::ConnectorSplit> connectorSplit);

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

Expand Down Expand Up @@ -101,12 +102,18 @@ class HiveDataSource : public DataSource {
}

std::shared_ptr<HiveConnectorSplit> split_;
std::shared_ptr<common::ScanSpec> scanSpec_;
FileHandleFactory* fileHandleFactory_;
dwio::common::ReaderOptions readerOpts_;
memory::MemoryPool* pool_;
VectorPtr output_;
RowTypePtr readerOutputType_;
std::unique_ptr<dwio::common::Reader> reader_;
std::unique_ptr<dwio::common::RowReader> rowReader_;
core::ExpressionEvaluator* expressionEvaluator_;
cache::AsyncDataCache* const cache_{nullptr};
const std::string& scanId_;
folly::Executor* executor_;

private:
// Evaluates remainingFilter_ on the specified vector. Returns number of rows
Expand Down Expand Up @@ -143,27 +150,18 @@ class HiveDataSource : public DataSource {
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
partitionKeys_;
std::shared_ptr<dwio::common::IoStatistics> ioStats_;
std::shared_ptr<common::ScanSpec> scanSpec_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
dwio::common::RowReaderOptions rowReaderOpts_;
std::unique_ptr<dwio::common::Reader> reader_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
bool emptySplit_;

dwio::common::RuntimeStatistics runtimeStats_;

std::shared_ptr<FileHandle> fileHandle_;
core::ExpressionEvaluator* expressionEvaluator_;
uint64_t completedRows_ = 0;

// Reusable memory for remaining filter evaluation.
VectorPtr filterResult_;
SelectivityVector filterRows_;
exec::FilterEvalCtx filterEvalCtx_;

cache::AsyncDataCache* const cache_{nullptr};
const std::string& scanId_;
folly::Executor* executor_;
};

} // namespace facebook::velox::connector::hive
1 change: 1 addition & 0 deletions velox/connectors/hive/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ target_link_libraries(
velox_exec
velox_exec_test_lib
velox_hive_connector
velox_hive_iceberg_datasource
velox_hive_partition_function
velox_memory
Folly::folly
Expand Down
32 changes: 32 additions & 0 deletions velox/connectors/hive/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_hive_iceberg_datasource OBJECT IcebergSplit.cpp
IcebergDataSource.cpp)

target_link_libraries(
velox_hive_iceberg_datasource
Folly::folly
gtest
Boost::headers
Boost::filesystem
gflags::gflags
glog::glog
gtest
gtest_main
xsimd)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
Loading

0 comments on commit 2ab2816

Please sign in to comment.