Add support for writing Iceberg tables #10996

Draft · wants to merge 2 commits into main
2 changes: 1 addition & 1 deletion velox/connectors/hive/CMakeLists.txt
@@ -33,7 +33,7 @@ velox_add_library(

velox_link_libraries(
velox_hive_connector
-  PUBLIC velox_hive_iceberg_splitreader
+  PUBLIC velox_hive_iceberg_connector
PRIVATE
velox_common_io
velox_connector
36 changes: 26 additions & 10 deletions velox/connectors/hive/HiveConnector.cpp
@@ -21,6 +21,7 @@
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"

@@ -88,16 +89,31 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
std::shared_ptr<ConnectorInsertTableHandle> connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) {
-  auto hiveInsertHandle = std::dynamic_pointer_cast<HiveInsertTableHandle>(
-      connectorInsertTableHandle);
-  VELOX_CHECK_NOT_NULL(
-      hiveInsertHandle, "Hive connector expecting hive write handle!");
-  return std::make_unique<HiveDataSink>(
-      inputType,
-      hiveInsertHandle,
-      connectorQueryCtx,
-      commitStrategy,
-      hiveConfig_);
+  if (auto icebergInsertHandle =
+          std::dynamic_pointer_cast<iceberg::IcebergInsertTableHandle>(
+              connectorInsertTableHandle)) {
+    VELOX_CHECK_NOT_NULL(
+        icebergInsertHandle,
+        "Hive Iceberg connector expecting iceberg write handle!");
+    return std::make_unique<iceberg::IcebergDataSink>(
+        inputType,
+        icebergInsertHandle,
+        connectorQueryCtx,
+        commitStrategy,
+        hiveConfig_);
+  } else {
+    auto hiveInsertHandle = std::dynamic_pointer_cast<HiveInsertTableHandle>(
+        connectorInsertTableHandle);
+
+    VELOX_CHECK_NOT_NULL(
+        hiveInsertHandle, "Hive connector expecting hive write handle!");
+    return std::make_unique<HiveDataSink>(
+        inputType,
+        hiveInsertHandle,
+        connectorQueryCtx,
+        commitStrategy,
+        hiveConfig_);
+  }
}

std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
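Note on the createDataSink() change above: callers keep going through the generic connector API, and the write path is chosen purely by the dynamic type of the insert table handle. A minimal caller-side sketch, assuming an already-constructed connector, row type, handle, and query context (none of this is test code from the PR):

```cpp
// Sketch: the same createDataSink() call now serves both table formats.
// If 'handle' is an iceberg::IcebergInsertTableHandle, an IcebergDataSink
// comes back; any other HiveInsertTableHandle yields a plain HiveDataSink.
std::unique_ptr<connector::DataSink> makeSink(
    connector::hive::HiveConnector& connector,
    const RowTypePtr& rowType,
    std::shared_ptr<connector::ConnectorInsertTableHandle> handle,
    connector::ConnectorQueryCtx* queryCtx) {
  return connector.createDataSink(
      rowType,
      std::move(handle),
      queryCtx,
      connector::CommitStrategy::kNoCommit);
}
```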
73 changes: 39 additions & 34 deletions velox/connectors/hive/HiveDataSink.cpp
@@ -89,32 +89,6 @@ std::vector<column_index_t> getPartitionChannels(
return channels;
}

-// Returns the column indices of non-partition data columns.
-std::vector<column_index_t> getNonPartitionChannels(
-    const std::vector<column_index_t>& partitionChannels,
-    const column_index_t childrenSize) {
-  std::vector<column_index_t> dataChannels;
-  dataChannels.reserve(childrenSize - partitionChannels.size());
-
-  for (column_index_t i = 0; i < childrenSize; i++) {
-    if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
-        partitionChannels.cend()) {
-      dataChannels.push_back(i);
-    }
-  }
-
-  return dataChannels;
-}
-
-std::string makePartitionDirectory(
-    const std::string& tableDirectory,
-    const std::optional<std::string>& partitionSubdirectory) {
-  if (partitionSubdirectory.has_value()) {
-    return fs::path(tableDirectory) / partitionSubdirectory.value();
-  }
-  return tableDirectory;
-}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}
@@ -374,8 +348,6 @@ HiveDataSink::HiveDataSink(
hiveConfig_->isPartitionPathAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr),
-      dataChannels_(
-          getNonPartitionChannels(partitionChannels_, inputType_->size())),
bucketCount_(
insertTableHandle_->bucketProperty() == nullptr
? 0
@@ -438,6 +410,10 @@ void HiveDataSink::appendData(RowVectorPtr input) {
return;
}

+  if (dataChannels_.empty()) {
+    dataChannels_ = getDataChannels();
+  }
+
// Compute partition and bucket numbers.
computePartitionAndBucketIds(input);

@@ -454,7 +430,7 @@
return;
}

-  splitInputRowsAndEnsureWriters();
+  splitInputRowsAndEnsureWriters(input);

for (auto index = 0; index < writers_.size(); ++index) {
const vector_size_t partitionSize = partitionSizes_[index];
@@ -759,10 +735,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
options);
writer = maybeCreateBucketSortWriter(std::move(writer));
writers_.emplace_back(std::move(writer));
-  // Extends the buffer used for partition rows calculations.
-  partitionSizes_.emplace_back(0);
-  partitionRows_.emplace_back(nullptr);
-  rawPartitionRows_.emplace_back(nullptr);
+
+  extendBuffersForPartitionedTables();

writerIndexMap_.emplace(id, writers_.size() - 1);
return writerIndexMap_[id];
@@ -794,6 +768,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
connectorQueryCtx_->sessionProperties()));
}

+void HiveDataSink::extendBuffersForPartitionedTables() {
+  // Extends the buffer used for partition rows calculations.
+  partitionSizes_.emplace_back(0);
+  partitionRows_.emplace_back(nullptr);
+  rawPartitionRows_.emplace_back(nullptr);
+}

HiveWriterId HiveDataSink::getWriterId(size_t row) const {
std::optional<int32_t> partitionId;
if (isPartitioned()) {
@@ -808,7 +789,7 @@
return HiveWriterId{partitionId, bucketId};
}

-void HiveDataSink::splitInputRowsAndEnsureWriters() {
+void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr /* input */) {
VELOX_CHECK(isPartitioned() || isBucketed());
if (isBucketed() && isPartitioned()) {
VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());
@@ -844,6 +825,30 @@
}
}

+// Returns the column indices of non-partition data columns.
+std::vector<column_index_t> HiveDataSink::getDataChannels() const {
+  std::vector<column_index_t> dataChannels;
+  dataChannels.reserve(inputType_->size() - partitionChannels_.size());
+
+  for (column_index_t i = 0; i < inputType_->size(); i++) {
+    if (std::find(partitionChannels_.cbegin(), partitionChannels_.cend(), i) ==
+        partitionChannels_.cend()) {
+      dataChannels.push_back(i);
+    }
+  }
+
+  return dataChannels;
+}

+std::string HiveDataSink::makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) const {
if (partitionSubdirectory.has_value()) {
return fs::path(tableDirectory) / partitionSubdirectory.value();
}
return tableDirectory;
}

HiveWriterParameters HiveDataSink::getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const {
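Note: the two helpers above keep the exact logic of the free functions they replace; only the visibility changed so that subclasses can override them. As a sanity check on what getDataChannels() computes, here is the same selection logic in standalone form, with no Velox types (for a four-column table partitioned on column 1, the data channels come out as {0, 2, 3}):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Standalone rendering of getDataChannels(): keep every column index that
// is not a partition channel, in ascending order.
std::vector<uint32_t> dataChannels(
    const std::vector<uint32_t>& partitionChannels,
    uint32_t numColumns) {
  std::vector<uint32_t> channels;
  channels.reserve(numColumns - partitionChannels.size());
  for (uint32_t i = 0; i < numColumns; ++i) {
    if (std::find(partitionChannels.begin(), partitionChannels.end(), i) ==
        partitionChannels.end()) {
      channels.push_back(i);
    }
  }
  return channels;
}

int main() {
  // Table (c0, c1, c2, c3) partitioned on c1 -> data columns are c0, c2, c3.
  assert((dataChannels({1}, 4) == std::vector<uint32_t>{0, 2, 3}));
  return 0;
}
```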
18 changes: 13 additions & 5 deletions velox/connectors/hive/HiveDataSink.h
@@ -451,7 +451,7 @@ class HiveDataSink : public DataSink {

bool canReclaim() const;

- private:
+ protected:
enum class State { kRunning = 0, kAborted = 1, kClosed = 2 };
friend struct fmt::formatter<
facebook::velox::connector::hive::HiveDataSink::State>;
@@ -528,13 +528,13 @@

// Get the HiveWriter corresponding to the row
// from partitionIds and bucketIds.
-  FOLLY_ALWAYS_INLINE HiveWriterId getWriterId(size_t row) const;
+  HiveWriterId getWriterId(size_t row) const;

// Computes the number of input rows as well as the actual input row indices
// to each corresponding (bucketed) partition based on the partition and
// bucket ids calculated by 'computePartitionAndBucketIds'. The function also
// ensures that there is a writer created for each (bucketed) partition.
-  void splitInputRowsAndEnsureWriters();
+  virtual void splitInputRowsAndEnsureWriters(RowVectorPtr input);

// Makes sure to create one writer for the given writer id. The function
// returns the corresponding index in 'writers_'.
@@ -548,10 +548,18 @@
maybeCreateBucketSortWriter(
std::unique_ptr<facebook::velox::dwio::common::Writer> writer);

+  virtual void extendBuffersForPartitionedTables();
+
+  virtual std::string makePartitionDirectory(
+      const std::string& tableDirectory,
+      const std::optional<std::string>& partitionSubdirectory) const;
+
HiveWriterParameters getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const;

virtual std::vector<column_index_t> getDataChannels() const;

// Gets write and target file names for a writer based on the table commit
// strategy as well as table partitioned type. If commit is not required, the
// write file and target file has the same name. If not, add a temp file
@@ -582,13 +590,13 @@
const uint32_t maxOpenWriters_;
const std::vector<column_index_t> partitionChannels_;
const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
-  // Indices of dataChannel are stored in ascending order
-  const std::vector<column_index_t> dataChannels_;
const int32_t bucketCount_{0};
const std::unique_ptr<core::PartitionFunction> bucketFunction_;
const std::shared_ptr<dwio::common::WriterFactory> writerFactory_;
const common::SpillConfig* const spillConfig_;

+  // Indices of dataChannel are stored in ascending order
+  std::vector<column_index_t> dataChannels_;
std::vector<column_index_t> sortColumnIndices_;
std::vector<CompareFlags> sortCompareFlags_;

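Taken together, the switch from `private:` to `protected:` and the new `virtual` methods define the extension surface this PR opens up for an Iceberg sink. A sketch of a subclass using those hooks follows; the method bodies are invented for illustration, not taken from the PR's actual IcebergDataSink (which lives in velox/connectors/hive/iceberg/IcebergDataSink.cpp):

```cpp
// Sketch only: signatures match the header above; bodies are placeholders.
// Assumes <numeric> for std::iota plus the includes HiveDataSink.h pulls in.
class ExampleDataSink : public HiveDataSink {
 public:
  using HiveDataSink::HiveDataSink;

 protected:
  void splitInputRowsAndEnsureWriters(RowVectorPtr input) override {
    // A subclass can inspect 'input' here (the reason the parameter was
    // added) before delegating to the base implementation.
    HiveDataSink::splitInputRowsAndEnsureWriters(std::move(input));
  }

  void extendBuffersForPartitionedTables() override {
    HiveDataSink::extendBuffersForPartitionedTables();
    // ...extend any per-writer buffers the subclass maintains.
  }

  std::vector<column_index_t> getDataChannels() const override {
    // Iceberg data files typically store partition source columns as regular
    // data, so an override might return all channels rather than filtering
    // out partitionChannels_ (both accessible now that members are protected).
    std::vector<column_index_t> all(inputType_->size());
    std::iota(all.begin(), all.end(), 0);
    return all;
  }
};
```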
11 changes: 7 additions & 4 deletions velox/connectors/hive/iceberg/CMakeLists.txt
@@ -12,11 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp
-                  IcebergSplit.cpp PositionalDeleteFileReader.cpp)
+velox_add_library(
+  velox_hive_iceberg_connector
+  IcebergSplitReader.cpp
+  IcebergSplit.cpp
+  PositionalDeleteFileReader.cpp
+  IcebergDataSink.cpp)

-velox_link_libraries(velox_hive_iceberg_splitreader velox_connector
-                     Folly::folly)
+velox_link_libraries(velox_hive_iceberg_connector velox_connector Folly::folly)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)