Skip to content

Commit

Permalink
Add support for writing iceberg tables
Browse files Browse the repository at this point in the history
Introduce IcebergPageSink
  • Loading branch information
imjalpreet committed Sep 13, 2024
1 parent 98bbb73 commit 02e6d32
Show file tree
Hide file tree
Showing 8 changed files with 464 additions and 49 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ velox_add_library(

velox_link_libraries(
velox_hive_connector
PUBLIC velox_hive_iceberg_splitreader
PUBLIC velox_hive_iceberg_connector
PRIVATE
velox_common_io
velox_connector
Expand Down
36 changes: 26 additions & 10 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"

Expand Down Expand Up @@ -88,16 +89,31 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
std::shared_ptr<ConnectorInsertTableHandle> connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) {
auto hiveInsertHandle = std::dynamic_pointer_cast<HiveInsertTableHandle>(
connectorInsertTableHandle);
VELOX_CHECK_NOT_NULL(
hiveInsertHandle, "Hive connector expecting hive write handle!");
return std::make_unique<HiveDataSink>(
inputType,
hiveInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
if (auto icebergInsertHandle =
std::dynamic_pointer_cast<iceberg::IcebergInsertTableHandle>(
connectorInsertTableHandle)) {
VELOX_CHECK_NOT_NULL(
icebergInsertHandle,
"Hive Iceberg connector expecting iceberg write handle!");
return std::make_unique<iceberg::IcebergDataSink>(
inputType,
icebergInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
} else {
auto hiveInsertHandle = std::dynamic_pointer_cast<HiveInsertTableHandle>(
connectorInsertTableHandle);

VELOX_CHECK_NOT_NULL(
hiveInsertHandle, "Hive connector expecting hive write handle!");
return std::make_unique<HiveDataSink>(
inputType,
hiveInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
}
}

std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
Expand Down
55 changes: 27 additions & 28 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,32 +89,6 @@ std::vector<column_index_t> getPartitionChannels(
return channels;
}

// Returns the column indices of non-partition data columns.
std::vector<column_index_t> getNonPartitionChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) {
std::vector<column_index_t> dataChannels;
dataChannels.reserve(childrenSize - partitionChannels.size());

for (column_index_t i = 0; i < childrenSize; i++) {
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
partitionChannels.cend()) {
dataChannels.push_back(i);
}
}

return dataChannels;
}

std::string makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) {
if (partitionSubdirectory.has_value()) {
return fs::path(tableDirectory) / partitionSubdirectory.value();
}
return tableDirectory;
}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}
Expand Down Expand Up @@ -374,8 +348,7 @@ HiveDataSink::HiveDataSink(
hiveConfig_->isPartitionPathAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr),
dataChannels_(
getNonPartitionChannels(partitionChannels_, inputType_->size())),
dataChannels_(getDataChannels(partitionChannels_, inputType_->size())),
bucketCount_(
insertTableHandle_->bucketProperty() == nullptr
? 0
Expand Down Expand Up @@ -844,6 +817,32 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
}
}

// Returns the column indices of non-partition data columns.
std::vector<column_index_t> HiveDataSink::getDataChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) const {
std::vector<column_index_t> dataChannels;
dataChannels.reserve(childrenSize - partitionChannels.size());

for (column_index_t i = 0; i < childrenSize; i++) {
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
partitionChannels.cend()) {
dataChannels.push_back(i);
}
}

return dataChannels;
}

std::string HiveDataSink::makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) const {
if (partitionSubdirectory.has_value()) {
return fs::path(tableDirectory) / partitionSubdirectory.value();
}
return tableDirectory;
}

HiveWriterParameters HiveDataSink::getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const {
Expand Down
18 changes: 13 additions & 5 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class HiveDataSink : public DataSink {

bool canReclaim() const;

private:
protected:
enum class State { kRunning = 0, kAborted = 1, kClosed = 2 };
friend struct fmt::formatter<
facebook::velox::connector::hive::HiveDataSink::State>;
Expand Down Expand Up @@ -528,7 +528,7 @@ class HiveDataSink : public DataSink {

// Get the HiveWriter corresponding to the row
// from partitionIds and bucketIds.
FOLLY_ALWAYS_INLINE HiveWriterId getWriterId(size_t row) const;
HiveWriterId getWriterId(size_t row) const;

// Computes the number of input rows as well as the actual input row indices
// to each corresponding (bucketed) partition based on the partition and
Expand All @@ -548,10 +548,18 @@ class HiveDataSink : public DataSink {
maybeCreateBucketSortWriter(
std::unique_ptr<facebook::velox::dwio::common::Writer> writer);

virtual std::string makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) const;

HiveWriterParameters getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const;

virtual std::vector<column_index_t> getDataChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) const;

// Gets write and target file names for a writer based on the table commit
// strategy as well as table partitioned type. If commit is not required, the
// write file and target file has the same name. If not, add a temp file
Expand All @@ -569,7 +577,7 @@ class HiveDataSink : public DataSink {
}

// Invoked to write 'input' to the specified file writer.
void write(size_t index, RowVectorPtr input);
virtual void write(size_t index, RowVectorPtr input);

void closeInternal();

Expand All @@ -582,13 +590,13 @@ class HiveDataSink : public DataSink {
const uint32_t maxOpenWriters_;
const std::vector<column_index_t> partitionChannels_;
const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
// Indices of dataChannel are stored in ascending order
const std::vector<column_index_t> dataChannels_;
const int32_t bucketCount_{0};
const std::unique_ptr<core::PartitionFunction> bucketFunction_;
const std::shared_ptr<dwio::common::WriterFactory> writerFactory_;
const common::SpillConfig* const spillConfig_;

// Indices of dataChannel are stored in ascending order
std::vector<column_index_t> dataChannels_;
std::vector<column_index_t> sortColumnIndices_;
std::vector<CompareFlags> sortCompareFlags_;

Expand Down
11 changes: 7 additions & 4 deletions velox/connectors/hive/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp
IcebergSplit.cpp PositionalDeleteFileReader.cpp)
velox_add_library(
velox_hive_iceberg_connector
IcebergSplitReader.cpp
IcebergSplit.cpp
PositionalDeleteFileReader.cpp
IcebergDataSink.cpp)

velox_link_libraries(velox_hive_iceberg_splitreader velox_connector
Folly::folly)
velox_link_libraries(velox_hive_iceberg_connector velox_connector Folly::folly)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
Loading

0 comments on commit 02e6d32

Please sign in to comment.