Iceberg positional deletes read support
yingsu00 committed Aug 3, 2023
1 parent fe4b05c commit ecba7d1
Showing 42 changed files with 993 additions and 33 deletions.
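
For background (not stated in the diff itself): Iceberg v2 positional delete files record deleted rows as (data file path, row position) pairs, so reading a data file with deletes applied amounts to skipping the listed row positions while scanning. The sketch below is purely illustrative and independent of the Velox classes touched in this commit.

#include <cstdint>
#include <set>
#include <vector>

// Illustrative only: given the total row count of a data file and the set of
// positions listed for it in positional delete files, return the positions
// that survive the deletes. A real reader would apply this while scanning
// rather than materializing a position list.
std::vector<int64_t> liveRowPositions(
    int64_t totalRows,
    const std::set<int64_t>& deletedPositions) {
  std::vector<int64_t> live;
  live.reserve(totalRows);
  for (int64_t pos = 0; pos < totalRows; ++pos) {
    if (deletedPositions.count(pos) == 0) {
      live.push_back(pos);
    }
  }
  return live;
}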
1 change: 1 addition & 0 deletions velox/benchmarks/tpch/CMakeLists.txt
@@ -24,6 +24,7 @@ target_link_libraries(
velox_dwio_parquet_reader
velox_dwio_common_test_utils
velox_hive_connector
velox_hive_iceberg_datasource
velox_exception
velox_memory
velox_process
3 changes: 2 additions & 1 deletion velox/connectors/Connector.h
@@ -331,7 +331,8 @@ class Connector {
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) = 0;
ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx,
std::shared_ptr<velox::connector::ConnectorSplit> connectorSplit) = 0;

// Returns true if addSplit of DataSource can use 'dataSource' from
// ConnectorSplit in addSplit(). If so, TableScan can preload splits
4 changes: 3 additions & 1 deletion velox/connectors/fuzzer/FuzzerConnector.h
@@ -113,7 +113,9 @@ class FuzzerConnector final : public Connector {
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& /*columnHandles*/,
ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) override final {
ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx,
std::shared_ptr<velox::connector::ConnectorSplit> /*connectorSplit*/)
override final {
return std::make_unique<FuzzerDataSource>(
outputType, tableHandle, connectorQueryCtx->memoryPool());
}
3 changes: 3 additions & 0 deletions velox/connectors/hive/CMakeLists.txt
@@ -32,6 +32,7 @@ target_link_libraries(
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_file
velox_hive_iceberg_datasource
velox_hive_partition_function
velox_s3fs
velox_hdfs
@@ -43,6 +44,8 @@ target_link_libraries(velox_hive_partition_function velox_core velox_exec)

add_subdirectory(storage_adapters)

add_subdirectory(iceberg)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
42 changes: 31 additions & 11 deletions velox/connectors/hive/HiveConnector.cpp
@@ -40,6 +40,7 @@
#include "velox/dwio/parquet/RegisterParquetReader.h" // @manual
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
#endif
#include "velox/connectors/hive/iceberg/IcebergDataSource.h"
#include "velox/expression/FieldReference.h"

#include <boost/lexical_cast.hpp>
@@ -76,7 +77,8 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
ConnectorQueryCtx* connectorQueryCtx,
std::shared_ptr<velox::connector::ConnectorSplit> connectorSplit) {
dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool());
options.setMaxCoalesceBytes(
HiveConfig::maxCoalescedBytes(connectorQueryCtx->config()));
@@ -85,16 +87,34 @@
options.setFileColumnNamesReadAsLowerCase(
HiveConfig::isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->config()));
return std::make_unique<HiveDataSource>(
outputType,
tableHandle,
columnHandles,
&fileHandleFactory_,
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->cache(),
connectorQueryCtx->scanId(),
executor_,
options);

auto splitInfo =
std::dynamic_pointer_cast<HiveConnectorSplit>(connectorSplit);
if (splitInfo->customSplitInfo["table_format"] == "hive_iceberg") {
return std::make_unique<iceberg::HiveIcebergDataSource>(
outputType,
tableHandle,
columnHandles,
&fileHandleFactory_,
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->cache(),
connectorQueryCtx->scanId(),
executor_,
options,
connectorSplit);
} else {
return std::make_unique<HiveDataSource>(
outputType,
tableHandle,
columnHandles,
&fileHandleFactory_,
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->cache(),
connectorQueryCtx->scanId(),
executor_,
options,
connectorSplit);
}
}

std::unique_ptr<DataSink> HiveConnector::createDataSink(
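
The new branch in createDataSource() above routes a split to iceberg::HiveIcebergDataSource whenever its customSplitInfo map carries table_format = hive_iceberg. As a minimal sketch (not part of this commit: the helper name and header path are assumptions, while customSplitInfo and the key/value pair come from the diff), a caller would tag a split like this:

#include <memory>

#include "velox/connectors/hive/HiveConnectorSplit.h" // header path assumed

namespace facebook::velox::connector::hive {

// Sketch only: mark an existing HiveConnectorSplit so that
// HiveConnector::createDataSource() above selects the Iceberg data source.
void markAsIcebergSplit(const std::shared_ptr<HiveConnectorSplit>& split) {
  split->customSplitInfo["table_format"] = "hive_iceberg";
}

} // namespace facebook::velox::connector::hive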
4 changes: 3 additions & 1 deletion velox/connectors/hive/HiveConnector.h
@@ -51,7 +51,9 @@ class HiveConnector : public Connector {
const std::unordered_map<
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) override;
ConnectorQueryCtx* connectorQueryCtx,
std::shared_ptr<velox::connector::ConnectorSplit> connectorSplit)
override;

bool supportsSplitPreload() override {
return true;
7 changes: 4 additions & 3 deletions velox/connectors/hive/HiveDataSource.cpp
@@ -389,15 +389,16 @@ HiveDataSource::HiveDataSource(
cache::AsyncDataCache* cache,
const std::string& scanId,
folly::Executor* executor,
const dwio::common::ReaderOptions& options)
const dwio::common::ReaderOptions& options,
std::shared_ptr<velox::connector::ConnectorSplit> /*connectorSplit*/)
: fileHandleFactory_(fileHandleFactory),
readerOpts_(options),
pool_(&options.getMemoryPool()),
outputType_(outputType),
expressionEvaluator_(expressionEvaluator),
cache_(cache),
scanId_(scanId),
executor_(executor) {
executor_(executor),
outputType_(outputType) {
// Column handles are keyed on the column alias, the name used in the query.
for (const auto& [canonicalizedName, columnHandle] : columnHandles) {
auto handle = std::dynamic_pointer_cast<HiveColumnHandle>(columnHandle);
18 changes: 8 additions & 10 deletions velox/connectors/hive/HiveDataSource.h
@@ -40,7 +40,8 @@ class HiveDataSource : public DataSource {
cache::AsyncDataCache* cache,
const std::string& scanId,
folly::Executor* executor,
const dwio::common::ReaderOptions& options);
const dwio::common::ReaderOptions& options,
std::shared_ptr<velox::connector::ConnectorSplit> connectorSplit);

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

@@ -101,12 +102,18 @@
}

std::shared_ptr<HiveConnectorSplit> split_;
std::shared_ptr<common::ScanSpec> scanSpec_;
FileHandleFactory* fileHandleFactory_;
dwio::common::ReaderOptions readerOpts_;
memory::MemoryPool* pool_;
VectorPtr output_;
RowTypePtr readerOutputType_;
std::unique_ptr<dwio::common::Reader> reader_;
std::unique_ptr<dwio::common::RowReader> rowReader_;
core::ExpressionEvaluator* expressionEvaluator_;
cache::AsyncDataCache* const cache_{nullptr};
const std::string& scanId_;
folly::Executor* executor_;

private:
// Evaluates remainingFilter_ on the specified vector. Returns number of rows
@@ -143,27 +150,18 @@
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
partitionKeys_;
std::shared_ptr<dwio::common::IoStatistics> ioStats_;
std::shared_ptr<common::ScanSpec> scanSpec_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
dwio::common::RowReaderOptions rowReaderOpts_;
std::unique_ptr<dwio::common::Reader> reader_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
bool emptySplit_;

dwio::common::RuntimeStatistics runtimeStats_;

std::shared_ptr<FileHandle> fileHandle_;
core::ExpressionEvaluator* expressionEvaluator_;
uint64_t completedRows_ = 0;

// Reusable memory for remaining filter evaluation.
VectorPtr filterResult_;
SelectivityVector filterRows_;
exec::FilterEvalCtx filterEvalCtx_;

cache::AsyncDataCache* const cache_{nullptr};
const std::string& scanId_;
folly::Executor* executor_;
};

} // namespace facebook::velox::connector::hive
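
The member reshuffle above moves several fields (among them scanSpec_, reader_, expressionEvaluator_, cache_, scanId_, and executor_) from the private section into the protected one, which is what lets the new Iceberg data source reuse HiveDataSource's reader machinery. A rough sketch of the shape this enables follows; the override body is illustrative, not the committed implementation, and only the class name HiveIcebergDataSource appears in the diff above.

#include "velox/connectors/hive/HiveDataSource.h"

namespace facebook::velox::connector::hive::iceberg {

// Sketch only: with the members above protected, an Iceberg-aware data source
// can derive from HiveDataSource and layer delete handling on top of the
// inherited reader state.
class HiveIcebergDataSource : public HiveDataSource {
 public:
  using HiveDataSource::HiveDataSource;

  void addSplit(std::shared_ptr<ConnectorSplit> split) override {
    HiveDataSource::addSplit(split);
    // Hypothetical hook: open the split's positional delete files here and
    // collect the row positions to skip when reading the base data file.
  }
};

} // namespace facebook::velox::connector::hive::iceberg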
1 change: 1 addition & 0 deletions velox/connectors/hive/benchmarks/CMakeLists.txt
@@ -27,6 +27,7 @@ target_link_libraries(
velox_exec
velox_exec_test_lib
velox_hive_connector
velox_hive_iceberg_datasource
velox_hive_partition_function
velox_memory
Folly::folly
32 changes: 32 additions & 0 deletions velox/connectors/hive/iceberg/CMakeLists.txt
@@ -0,0 +1,32 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_hive_iceberg_datasource OBJECT IcebergSplit.cpp
IcebergDataSource.cpp)

target_link_libraries(
velox_hive_iceberg_datasource
Folly::folly
gtest
Boost::headers
Boost::filesystem
gflags::gflags
glog::glog
gtest
gtest_main
xsimd)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
(Diff truncated: the remaining changed files are not shown here.)
