Skip to content

Commit

Permalink
Introducing IcebergSplitReader
Browse files Browse the repository at this point in the history
The IcebergSplitReader now supports Iceberg positional deletes.
  • Loading branch information
yingsu00 committed Nov 1, 2023
1 parent 937130a commit 23ce4b1
Show file tree
Hide file tree
Showing 39 changed files with 1,054 additions and 32 deletions.
3 changes: 3 additions & 0 deletions scripts/setup-macos.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
# $ scripts/setup-macos.sh install_googletest install_fmt
#

INSTALL_PREREQUISITES="N"
PROMPT_ALWAYS_RESPOND="n"

set -e # Exit on error.
set -x # Print commands that are executed.

Expand Down
1 change: 1 addition & 0 deletions velox/benchmarks/tpch/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ target_link_libraries(
velox_dwio_parquet_reader
velox_dwio_common_test_utils
velox_hive_connector
# velox_hive_iceberg_splitreader
velox_exception
velox_memory
velox_process
Expand Down
15 changes: 13 additions & 2 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ add_library(velox_hive_config OBJECT HiveConfig.cpp)

target_link_libraries(velox_hive_config velox_exception)

add_subdirectory(iceberg)

#find_package(iceberg CONFIG REQUIRED)

add_library(
velox_hive_connector OBJECT
FileHandle.cpp
Expand All @@ -28,9 +32,10 @@ add_library(
SplitReader.cpp
TableHandle.cpp)

#target_include_directories(velox_hive_connector PUBLIC iceberg::iceberg)
target_link_libraries(
velox_hive_connector
velox_common_io
PRIVATE velox_common_io
velox_connector
velox_dwio_catalog_fbhive
velox_dwio_dwrf_reader
Expand All @@ -41,12 +46,17 @@ target_link_libraries(
velox_hive_partition_function
velox_s3fs
velox_hdfs
velox_gcs)
velox_gcs
PUBLIC velox_hive_iceberg_splitreader
)



add_library(velox_hive_partition_function HivePartitionFunction.cpp)

target_link_libraries(velox_hive_partition_function velox_core velox_exec)

#add_subdirectory(iceberg)
add_subdirectory(storage_adapters)

if(${VELOX_BUILD_TESTING})
Expand All @@ -56,3 +66,4 @@ endif()
if(${VELOX_ENABLE_BENCHMARKS})
add_subdirectory(benchmarks)
endif()

8 changes: 4 additions & 4 deletions velox/connectors/hive/FileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@

#pragma once

#include <cstdint>
#include <memory>
#include <string>

#include "velox/common/caching/CachedFactory.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/file/File.h"
#include "velox/dwio/common/InputStream.h"

//#include <cstdint>
//#include <memory>
//#include <string>

namespace facebook::velox {

class Config;
Expand Down
3 changes: 1 addition & 2 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
readerOutputType_,
&partitionKeys_,
fileHandleFactory_,
executor_,
connectorQueryCtx_,
executor_,
ioStats_);
}

Expand All @@ -518,7 +518,6 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
if (splitReader_) {
splitReader_.reset();
}

splitReader_ = createSplitReader();
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
}
Expand Down
6 changes: 3 additions & 3 deletions velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ class HiveDataSource : public DataSource {
SelectivityVector filterRows_;
exec::FilterEvalCtx filterEvalCtx_;

FileHandleFactory* fileHandleFactory_;
const ConnectorQueryCtx* const connectorQueryCtx_;
folly::Executor* executor_;
FileHandleFactory* const fileHandleFactory_;
ConnectorQueryCtx* const connectorQueryCtx_;
folly::Executor* const executor_;
};

} // namespace facebook::velox::connector::hive
44 changes: 30 additions & 14 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/connectors/hive/iceberg/IcebergSplitReader.h"
#include "velox/dwio/common/CachedBufferedInput.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/ReaderFactory.h"

#include <folly/Conv.h>
#include <gtest/gtest.h>
// #include <gtest/gtest.h>

#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -144,19 +145,34 @@ std::unique_ptr<SplitReader> SplitReader::create(
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
partitionKeys,
FileHandleFactory* fileHandleFactory,
ConnectorQueryCtx* connectorQueryCtx,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
std::shared_ptr<io::IoStatistics> ioStats) {
return std::make_unique<SplitReader>(
hiveSplit,
hiveTableHandle,
scanSpec,
readerOutputType,
partitionKeys,
fileHandleFactory,
executor,
connectorQueryCtx,
ioStats);
// Create the SplitReader based on hiveSplit->customSplitInfo["table_format"]
if (hiveSplit->customSplitInfo["table_format"] == "hive_iceberg") {
return std::make_unique<iceberg::IcebergSplitReader>(
hiveSplit,
hiveTableHandle,
scanSpec,
readerOutputType,
partitionKeys,
fileHandleFactory,

connectorQueryCtx,
executor,
ioStats);
} else {
return std::make_unique<SplitReader>(
hiveSplit,
hiveTableHandle,
scanSpec,
readerOutputType,
partitionKeys,
fileHandleFactory,
connectorQueryCtx,
executor,
ioStats);
}
}

SplitReader::SplitReader(
Expand All @@ -167,17 +183,17 @@ SplitReader::SplitReader(
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
partitionKeys,
FileHandleFactory* fileHandleFactory,
ConnectorQueryCtx* connectorQueryCtx,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
std::shared_ptr<io::IoStatistics> ioStats)
: hiveSplit_(hiveSplit),
hiveTableHandle_(hiveTableHandle),
scanSpec_(scanSpec),
readerOutputType_(readerOutputType),
partitionKeys_(partitionKeys),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
connectorQueryCtx_(connectorQueryCtx),
executor_(executor),
ioStats_(ioStats),
baseReaderOpts_(connectorQueryCtx->memoryPool()) {}

Expand Down
10 changes: 5 additions & 5 deletions velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class SplitReader {
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
partitionKeys,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
ConnectorQueryCtx* connectorQueryCtx,
folly::Executor* executor,
std::shared_ptr<io::IoStatistics> ioStats);

SplitReader(
Expand All @@ -63,8 +63,8 @@ class SplitReader {
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
partitionKeys,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
ConnectorQueryCtx* connectorQueryCtx,
folly::Executor* executor,
std::shared_ptr<io::IoStatistics> ioStats);

virtual ~SplitReader() = default;
Expand Down Expand Up @@ -122,8 +122,8 @@ class SplitReader {
std::unique_ptr<dwio::common::Reader> baseReader_;
std::unique_ptr<dwio::common::RowReader> baseRowReader_;
FileHandleFactory* const fileHandleFactory_;
ConnectorQueryCtx* const connectorQueryCtx_;
folly::Executor* const executor_;
const ConnectorQueryCtx* const connectorQueryCtx_;
std::shared_ptr<io::IoStatistics> ioStats_;

private:
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ target_link_libraries(
velox_exec
velox_exec_test_lib
velox_hive_connector
velox_hive_iceberg_splitreader
velox_hive_partition_function
velox_memory
Folly::folly
Expand Down
40 changes: 40 additions & 0 deletions velox/connectors/hive/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#add_library(velox_hive_iceberg_splitreader OBJECT IMPORTED GLOBAL
# IcebergSplitReader.cpp IcebergSplit.cpp DeleteFileReader.cpp)

add_library(velox_hive_iceberg_splitreader OBJECT
IcebergSplitReader.cpp IcebergSplit.cpp DeleteFileReader.cpp)

target_link_libraries(
velox_hive_iceberg_splitreader
Folly::folly
gtest
Boost::headers
Boost::filesystem
gflags::gflags
glog::glog
gtest
gtest_main
xsimd)

#install(TARGETS velox_hive_iceberg_splitreader EXPORT iceberg)
#message (STATUS "Installed velox_hive_iceberg_splitreader")
#install(EXPORT iceberg
# FILE icebergConfig.cmake
# NAMESPACE iceberg::
# DESTINATION lib/cmake/iceberg)

add_subdirectory(tests)
Loading

0 comments on commit 23ce4b1

Please sign in to comment.