Add support for writing iceberg tables
Introduce IcebergDataSink
imjalpreet committed Sep 13, 2024
1 parent 98bbb73 commit dcb40ae
Showing 8 changed files with 445 additions and 48 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/hive/CMakeLists.txt
@@ -33,7 +33,7 @@ velox_add_library(

velox_link_libraries(
velox_hive_connector
PUBLIC velox_hive_iceberg_splitreader
PUBLIC velox_hive_iceberg_connector
PRIVATE
velox_common_io
velox_connector
32 changes: 22 additions & 10 deletions velox/connectors/hive/HiveConnector.cpp
@@ -19,6 +19,7 @@
#include "velox/common/base/Fs.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
@@ -88,16 +89,27 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
std::shared_ptr<ConnectorInsertTableHandle> connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) {
auto hiveInsertHandle = std::dynamic_pointer_cast<HiveInsertTableHandle>(
connectorInsertTableHandle);
VELOX_CHECK_NOT_NULL(
hiveInsertHandle, "Hive connector expecting hive write handle!");
return std::make_unique<HiveDataSink>(
inputType,
hiveInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
  if (auto icebergInsertHandle =
          std::dynamic_pointer_cast<iceberg::IcebergInsertTableHandle>(
              connectorInsertTableHandle)) {
    return std::make_unique<iceberg::IcebergDataSink>(
        inputType,
        icebergInsertHandle,
        connectorQueryCtx,
        commitStrategy,
        hiveConfig_);
  }

  auto hiveInsertHandle = std::dynamic_pointer_cast<HiveInsertTableHandle>(
      connectorInsertTableHandle);
  VELOX_CHECK_NOT_NULL(
      hiveInsertHandle, "Hive connector expecting hive write handle!");
  return std::make_unique<HiveDataSink>(
      inputType,
      hiveInsertHandle,
      connectorQueryCtx,
      commitStrategy,
      hiveConfig_);
}

std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
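The dispatch above tries the most specific handle type first and keeps the plain Hive path as the fallback, so unknown handle types still fail loudly on the Hive check. A minimal, self-contained sketch of the same dynamic_pointer_cast dispatch pattern, with hypothetical Handle and Sink types standing in for the Velox classes:

#include <iostream>
#include <memory>
#include <stdexcept>

// Hypothetical stand-ins for ConnectorInsertTableHandle and its subclasses.
struct InsertHandle { virtual ~InsertHandle() = default; };
struct HiveHandle : InsertHandle {};
struct IcebergHandle : HiveHandle {}; // assume the Iceberg handle extends the Hive one

struct Sink { virtual ~Sink() = default; };
struct HiveSink : Sink {};
struct IcebergSink : HiveSink {};

// Try the most-derived handle type first, then fall back to the base type;
// throw if neither cast succeeds so that every path returns or raises.
std::unique_ptr<Sink> createSink(const std::shared_ptr<InsertHandle>& handle) {
  if (auto iceberg = std::dynamic_pointer_cast<IcebergHandle>(handle)) {
    return std::make_unique<IcebergSink>();
  }
  if (auto hive = std::dynamic_pointer_cast<HiveHandle>(handle)) {
    return std::make_unique<HiveSink>();
  }
  throw std::runtime_error("unsupported insert handle type");
}

int main() {
  auto sink = createSink(std::make_shared<IcebergHandle>());
  std::cout << (dynamic_cast<IcebergSink*>(sink.get()) != nullptr) << '\n'; // 1
}

If the Iceberg handle does derive from the Hive one, the cast order matters: reversing it would silently route Iceberg writes to the plain Hive sink.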
55 changes: 27 additions & 28 deletions velox/connectors/hive/HiveDataSink.cpp
@@ -89,32 +89,6 @@ std::vector<column_index_t> getPartitionChannels(
return channels;
}

// Returns the column indices of non-partition data columns.
std::vector<column_index_t> getNonPartitionChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) {
std::vector<column_index_t> dataChannels;
dataChannels.reserve(childrenSize - partitionChannels.size());

for (column_index_t i = 0; i < childrenSize; i++) {
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
partitionChannels.cend()) {
dataChannels.push_back(i);
}
}

return dataChannels;
}

std::string makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) {
if (partitionSubdirectory.has_value()) {
return fs::path(tableDirectory) / partitionSubdirectory.value();
}
return tableDirectory;
}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}
@@ -374,8 +348,7 @@ HiveDataSink::HiveDataSink(
hiveConfig_->isPartitionPathAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr),
dataChannels_(
getNonPartitionChannels(partitionChannels_, inputType_->size())),
dataChannels_(getDataChannels(partitionChannels_, inputType_->size())),
bucketCount_(
insertTableHandle_->bucketProperty() == nullptr
? 0
@@ -844,6 +817,32 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
}
}

// Returns the column indices of non-partition data columns.
std::vector<column_index_t> HiveDataSink::getDataChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) const {
std::vector<column_index_t> dataChannels;
dataChannels.reserve(childrenSize - partitionChannels.size());

for (column_index_t i = 0; i < childrenSize; i++) {
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
partitionChannels.cend()) {
dataChannels.push_back(i);
}
}

return dataChannels;
}

std::string HiveDataSink::makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) const {
if (partitionSubdirectory.has_value()) {
return fs::path(tableDirectory) / partitionSubdirectory.value();
}
return tableDirectory;
}

HiveWriterParameters HiveDataSink::getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const {
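The two helpers above were moved from file-local free functions into HiveDataSink methods so that IcebergDataSink can override them. The base getDataChannels computes the complement of the partition channels; a standalone sketch of the same computation outside Velox:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

using column_index_t = uint32_t;

// Same algorithm as HiveDataSink::getDataChannels: keep every channel that
// is not a partition channel, in ascending order.
std::vector<column_index_t> getDataChannels(
    const std::vector<column_index_t>& partitionChannels,
    column_index_t childrenSize) {
  std::vector<column_index_t> dataChannels;
  dataChannels.reserve(childrenSize - partitionChannels.size());
  for (column_index_t i = 0; i < childrenSize; i++) {
    if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
        partitionChannels.cend()) {
      dataChannels.push_back(i);
    }
  }
  return dataChannels;
}

int main() {
  for (auto c : getDataChannels({1, 3}, 5)) {
    std::cout << c << ' '; // prints: 0 2 4
  }
}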
18 changes: 13 additions & 5 deletions velox/connectors/hive/HiveDataSink.h
@@ -451,7 +451,7 @@ class HiveDataSink : public DataSink {

bool canReclaim() const;

private:
protected:
enum class State { kRunning = 0, kAborted = 1, kClosed = 2 };
friend struct fmt::formatter<
facebook::velox::connector::hive::HiveDataSink::State>;
@@ -528,7 +528,7 @@

// Get the HiveWriter corresponding to the row
// from partitionIds and bucketIds.
FOLLY_ALWAYS_INLINE HiveWriterId getWriterId(size_t row) const;
HiveWriterId getWriterId(size_t row) const;

// Computes the number of input rows as well as the actual input row indices
// to each corresponding (bucketed) partition based on the partition and
@@ -548,10 +548,18 @@
maybeCreateBucketSortWriter(
std::unique_ptr<facebook::velox::dwio::common::Writer> writer);

virtual std::string makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) const;

HiveWriterParameters getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const;

virtual std::vector<column_index_t> getDataChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) const;

// Gets write and target file names for a writer based on the table commit
// strategy as well as table partitioned type. If commit is not required, the
// write file and target file has the same name. If not, add a temp file
@@ -569,7 +577,7 @@
}

// Invoked to write 'input' to the specified file writer.
void write(size_t index, RowVectorPtr input);
virtual void write(size_t index, RowVectorPtr input);

void closeInternal();

@@ -582,13 +590,13 @@
const uint32_t maxOpenWriters_;
const std::vector<column_index_t> partitionChannels_;
const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
// Indices of dataChannel are stored in ascending order
const std::vector<column_index_t> dataChannels_;
const int32_t bucketCount_{0};
const std::unique_ptr<core::PartitionFunction> bucketFunction_;
const std::shared_ptr<dwio::common::WriterFactory> writerFactory_;
const common::SpillConfig* const spillConfig_;

// Indices of data channels are stored in ascending order.
std::vector<column_index_t> dataChannels_;
std::vector<column_index_t> sortColumnIndices_;
std::vector<CompareFlags> sortCompareFlags_;

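Relaxing private to protected and making write, makePartitionDirectory, and getDataChannels virtual turns HiveDataSink into a template-method style base class: the shared partitioning and writer-management flow stays in the base, and subclasses override only the policy points. A minimal sketch of the pattern with hypothetical names, not the Velox classes:

#include <iostream>
#include <string>

// Hypothetical reduction of the HiveDataSink/IcebergDataSink relationship.
class BaseSink {
 public:
  virtual ~BaseSink() = default;

  // Shared driver logic calls the virtual hook (template method pattern).
  void append(const std::string& partition) {
    std::cout << "writing to "
              << makePartitionDirectory("/warehouse/t", partition) << '\n';
  }

 protected:
  virtual std::string makePartitionDirectory(
      const std::string& tableDirectory,
      const std::string& partition) const {
    return tableDirectory + "/" + partition;
  }
};

class IcebergLikeSink : public BaseSink {
 protected:
  // Iceberg layout keeps data files under "<table>/data".
  std::string makePartitionDirectory(
      const std::string& tableDirectory,
      const std::string& partition) const override {
    return tableDirectory + "/data/" + partition;
  }
};

int main() {
  IcebergLikeSink sink;
  sink.append("ds=2024-09-13"); // writing to /warehouse/t/data/ds=2024-09-13
}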
6 changes: 3 additions & 3 deletions velox/connectors/hive/iceberg/CMakeLists.txt
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp
IcebergSplit.cpp PositionalDeleteFileReader.cpp)
velox_add_library(velox_hive_iceberg_connector IcebergSplitReader.cpp
IcebergSplit.cpp PositionalDeleteFileReader.cpp IcebergDataSink.cpp)

velox_link_libraries(velox_hive_iceberg_splitreader velox_connector
velox_link_libraries(velox_hive_iceberg_connector velox_connector
Folly::folly)

if(${VELOX_BUILD_TESTING})
184 changes: 184 additions & 0 deletions velox/connectors/hive/iceberg/IcebergDataSink.cpp
@@ -0,0 +1,184 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/iceberg/IcebergDataSink.h"

#include "velox/common/base/Counters.h"
#include "velox/common/base/Fs.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/ITypedExpr.h"
#include "velox/exec/OperatorUtils.h"
#include <utility>

namespace facebook::velox::connector::hive::iceberg {

namespace {
#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \
memory::NonReclaimableSectionGuard nonReclaimableGuard( \
writerInfo_[(index)]->nonReclaimableSectionHolder.get())
} // namespace

IcebergDataSink::IcebergDataSink(
facebook::velox::RowTypePtr inputType,
std::shared_ptr<const IcebergInsertTableHandle> insertTableHandle,
const facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx,
facebook::velox::connector::CommitStrategy commitStrategy,
const std::shared_ptr<const HiveConfig>& hiveConfig)
: HiveDataSink(
std::move(inputType),
std::move(insertTableHandle),
connectorQueryCtx,
commitStrategy,
hiveConfig) {}

void IcebergDataSink::appendData(facebook::velox::RowVectorPtr input) {
  checkRunning();

  // Write to unpartitioned table.
  if (!isPartitioned()) {
    const auto index = ensureWriter(HiveWriterId::unpartitionedId());
    write(index, input);
    return;
  }

  // Recompute the data channels through the virtual getDataChannels(): the
  // base-class constructor initializes dataChannels_ before the
  // IcebergDataSink override is reachable.
  dataChannels_ = getDataChannels(partitionChannels_, inputType_->size());

  // Compute partition and bucket numbers.
  computePartitionAndBucketIds(input);

  // Lazy load all the input columns.
  for (column_index_t i = 0; i < input->childrenSize(); ++i) {
    input->childAt(i)->loadedVector();
  }

  // All inputs belong to a single non-bucketed partition. The partition id
  // must be zero.
  if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
    const auto index = ensureWriter(HiveWriterId{0});
    write(index, input);
    return;
  }

  splitInputRowsAndEnsureWriters();

  // One PartitionData entry per writer; resize (not reserve) so the slots
  // below can be indexed safely.
  partitionData_.resize(writers_.size());

  const auto numRows = partitionIds_.size();

  for (size_t row = 0; row < numRows; ++row) {
    const auto id = getWriterId(row);
    const auto it = writerIndexMap_.find(id);
    VELOX_CHECK(
        it != writerIndexMap_.end(), "No writer found for partition id");
    const uint32_t index = it->second;

    if (partitionData_[index] != nullptr) {
      continue;
    }

    // Record the partition values for this writer from the partition channels
    // of the first row that maps to it.
    std::vector<std::string> partitionValues;
    partitionValues.reserve(partitionChannels_.size());
    for (auto channel : partitionChannels_) {
      partitionValues.push_back(input->childAt(channel)->toString(row));
    }

    partitionData_[index] =
        std::make_shared<PartitionData>(std::move(partitionValues));
  }

  for (size_t index = 0; index < writers_.size(); ++index) {
    const vector_size_t partitionSize = partitionSizes_[index];
    if (partitionSize == 0) {
      continue;
    }

    RowVectorPtr writerInput = partitionSize == input->size()
        ? input
        : exec::wrap(partitionSize, partitionRows_[index], input);

    write(index, writerInput);
  }
}

void IcebergDataSink::write(size_t index, RowVectorPtr input) {
WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);

writers_[index]->write(input);
writerInfo_[index]->numWrittenRows += input->size();
}

std::vector<std::string> IcebergDataSink::close() {
  checkRunning();
  state_ = State::kClosed;
  closeInternal();

  auto icebergInsertTableHandle =
      std::dynamic_pointer_cast<const IcebergInsertTableHandle>(
          insertTableHandle_);

  std::vector<std::string> commitTasks;
  commitTasks.reserve(writerInfo_.size());

  // The commit task reports the file format in upper case, e.g. "PARQUET".
  std::string fileFormat(toString(insertTableHandle_->tableStorageFormat()));
  std::transform(
      fileFormat.begin(), fileFormat.end(), fileFormat.begin(), ::toupper);

  for (size_t i = 0; i < writerInfo_.size(); ++i) {
    const auto& info = writerInfo_.at(i);
    VELOX_CHECK_NOT_NULL(info);
    // TODO(imjalpreet): Collect the Iceberg file format metrics required by
    // com.facebook.presto.iceberg.MetricsWrapper ->
    // org.apache.iceberg.Metrics#Metrics.
    // Guard the partition data lookup: unpartitioned writes never populate
    // partitionData_.
    // clang-format off
    auto commitDataJson = folly::toJson(
        folly::dynamic::object
            ("path", info->writerParameters.writeDirectory() + "/" +
                info->writerParameters.writeFileName())
            ("fileSizeInBytes", ioStats_.at(i)->rawBytesWritten())
            ("metrics", folly::dynamic::object
                ("recordCount", info->numWrittenRows))
            ("partitionSpecId", icebergInsertTableHandle->partitionSpec()->specId)
            ("partitionDataJson",
                i < partitionData_.size() && partitionData_[i] != nullptr
                    ? folly::dynamic(partitionData_[i]->toJson())
                    : folly::dynamic(nullptr))
            ("fileFormat", fileFormat)
            ("referencedDataFile", nullptr));
    // clang-format on
    commitTasks.push_back(commitDataJson);
  }
  return commitTasks;
}

// Iceberg keeps data files under the table's "data" directory; partition
// subdirectories are created below it.
std::string IcebergDataSink::makePartitionDirectory(
    const std::string& tableDirectory,
    const std::optional<std::string>& partitionSubdirectory) const {
  const std::string tableLocation = fmt::format("{}/data", tableDirectory);
if (partitionSubdirectory.has_value()) {
return fs::path(tableLocation) / partitionSubdirectory.value();
}
return tableLocation;
}

// Returns all input channels: unlike Hive, Iceberg also writes the partition
// columns into the data files, so every column is a data column.
std::vector<column_index_t> IcebergDataSink::getDataChannels(
    const std::vector<column_index_t>& /*partitionChannels*/,
    const column_index_t childrenSize) const {
  // Create a vector of all channel indices, 0 .. childrenSize - 1.
std::vector<column_index_t> dataChannels(childrenSize);
std::iota(dataChannels.begin(), dataChannels.end(), 0);

return dataChannels;
}
} // namespace facebook::velox::connector::hive::iceberg
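Each commit task produced by close() is one JSON object per writer, presumably consumed by the engine's Iceberg commit path (the TODO above references Presto's MetricsWrapper). A standalone sketch of the payload shape using the same folly::dynamic chaining; all values here are hypothetical placeholders:

#include <folly/json.h>

#include <iostream>

int main() {
  // Hypothetical values; in IcebergDataSink::close() these come from
  // writerInfo_, ioStats_, and the IcebergInsertTableHandle.
  auto commitDataJson = folly::toJson(
      folly::dynamic::object
          ("path", "/warehouse/t/data/ds=2024-09-13/part-00000.parquet")
          ("fileSizeInBytes", 1048576)
          ("metrics", folly::dynamic::object("recordCount", 10000))
          ("partitionSpecId", 0)
          ("partitionDataJson", R"({"partitionValues":["2024-09-13"]})")
          ("fileFormat", "PARQUET")
          ("referencedDataFile", nullptr));
  std::cout << commitDataJson << '\n';
}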