diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt
index 8beee704f79e..121185e81406 100644
--- a/velox/connectors/hive/CMakeLists.txt
+++ b/velox/connectors/hive/CMakeLists.txt
@@ -33,7 +33,7 @@ velox_add_library(
 velox_link_libraries(
   velox_hive_connector
-  PUBLIC velox_hive_iceberg_splitreader
+  PUBLIC velox_hive_iceberg_connector
   PRIVATE velox_common_io
           velox_connector
diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp
index 4b1a5c608b28..4583db218461 100644
--- a/velox/connectors/hive/HiveConnector.cpp
+++ b/velox/connectors/hive/HiveConnector.cpp
@@ -21,6 +21,7 @@
 #include "velox/connectors/hive/HiveDataSink.h"
 #include "velox/connectors/hive/HiveDataSource.h"
 #include "velox/connectors/hive/HivePartitionFunction.h"
+#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
 #include "velox/dwio/dwrf/RegisterDwrfReader.h"
 #include "velox/dwio/dwrf/RegisterDwrfWriter.h"
@@ -88,16 +89,31 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
     std::shared_ptr<ConnectorInsertTableHandle> connectorInsertTableHandle,
     ConnectorQueryCtx* connectorQueryCtx,
     CommitStrategy commitStrategy) {
-  auto hiveInsertHandle = std::dynamic_pointer_cast<HiveInsertTableHandle>(
-      connectorInsertTableHandle);
-  VELOX_CHECK_NOT_NULL(
-      hiveInsertHandle, "Hive connector expecting hive write handle!");
-  return std::make_unique<HiveDataSink>(
-      inputType,
-      hiveInsertHandle,
-      connectorQueryCtx,
-      commitStrategy,
-      hiveConfig_);
+  if (auto icebergInsertHandle =
+          std::dynamic_pointer_cast<iceberg::IcebergInsertTableHandle>(
+              connectorInsertTableHandle)) {
+    return std::make_unique<iceberg::IcebergDataSink>(
+        inputType,
+        icebergInsertHandle,
+        connectorQueryCtx,
+        commitStrategy,
+        hiveConfig_);
+  } else {
+    auto hiveInsertHandle = std::dynamic_pointer_cast<HiveInsertTableHandle>(
+        connectorInsertTableHandle);
+    VELOX_CHECK_NOT_NULL(
+        hiveInsertHandle, "Hive connector expecting hive write handle!");
+    return std::make_unique<HiveDataSink>(
+        inputType,
+        hiveInsertHandle,
+        connectorQueryCtx,
+        commitStrategy,
+        hiveConfig_);
+  }
 }
 
 std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp
index 4c75432b6400..63da43d8f558 100644
--- a/velox/connectors/hive/HiveDataSink.cpp
+++ b/velox/connectors/hive/HiveDataSink.cpp
@@ -89,32 +89,6 @@ std::vector<column_index_t> getPartitionChannels(
   return channels;
 }
 
-// Returns the column indices of non-partition data columns.
-std::vector<column_index_t> getNonPartitionChannels(
-    const std::vector<column_index_t>& partitionChannels,
-    const column_index_t childrenSize) {
-  std::vector<column_index_t> dataChannels;
-  dataChannels.reserve(childrenSize - partitionChannels.size());
-
-  for (column_index_t i = 0; i < childrenSize; i++) {
-    if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
-        partitionChannels.cend()) {
-      dataChannels.push_back(i);
-    }
-  }
-
-  return dataChannels;
-}
-
-std::string makePartitionDirectory(
-    const std::string& tableDirectory,
-    const std::optional<std::string>& partitionSubdirectory) {
-  if (partitionSubdirectory.has_value()) {
-    return fs::path(tableDirectory) / partitionSubdirectory.value();
-  }
-  return tableDirectory;
-}
-
 std::string makeUuid() {
   return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
 }
@@ -374,8 +348,7 @@ HiveDataSink::HiveDataSink(
               hiveConfig_->isPartitionPathAsLowerCase(
                   connectorQueryCtx->sessionProperties()))
         : nullptr),
-      dataChannels_(
-          getNonPartitionChannels(partitionChannels_, inputType_->size())),
+      dataChannels_(getDataChannels(partitionChannels_, inputType_->size())),
       bucketCount_(
           insertTableHandle_->bucketProperty() == nullptr
               ? 0
@@ -844,6 +817,32 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
   }
 }
 
+// Returns the column indices of non-partition data columns.
+std::vector<column_index_t> HiveDataSink::getDataChannels(
+    const std::vector<column_index_t>& partitionChannels,
+    const column_index_t childrenSize) const {
+  std::vector<column_index_t> dataChannels;
+  dataChannels.reserve(childrenSize - partitionChannels.size());
+
+  for (column_index_t i = 0; i < childrenSize; i++) {
+    if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
+        partitionChannels.cend()) {
+      dataChannels.push_back(i);
+    }
+  }
+
+  return dataChannels;
+}
+
+std::string HiveDataSink::makePartitionDirectory(
+    const std::string& tableDirectory,
+    const std::optional<std::string>& partitionSubdirectory) const {
+  if (partitionSubdirectory.has_value()) {
+    return fs::path(tableDirectory) / partitionSubdirectory.value();
+  }
+  return tableDirectory;
+}
+
 HiveWriterParameters HiveDataSink::getWriterParameters(
     const std::optional<std::string>& partition,
     std::optional<uint32_t> bucketId) const {
diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h
index 1b3d3bd464f4..4b867ab5d41b 100644
--- a/velox/connectors/hive/HiveDataSink.h
+++ b/velox/connectors/hive/HiveDataSink.h
@@ -451,7 +451,7 @@ class HiveDataSink : public DataSink {
 
   bool canReclaim() const;
 
- private:
+ protected:
   enum class State { kRunning = 0, kAborted = 1, kClosed = 2 };
   friend struct fmt::formatter<
       facebook::velox::connector::hive::HiveDataSink::State>;
@@ -528,7 +528,7 @@ class HiveDataSink : public DataSink {
 
   // Get the HiveWriter corresponding to the row
   // from partitionIds and bucketIds.
-  FOLLY_ALWAYS_INLINE HiveWriterId getWriterId(size_t row) const;
+  HiveWriterId getWriterId(size_t row) const;
 
   // Computes the number of input rows as well as the actual input row indices
   // to each corresponding (bucketed) partition based on the partition and
@@ -548,10 +548,18 @@ class HiveDataSink : public DataSink {
   maybeCreateBucketSortWriter(
       std::unique_ptr<dwio::common::Writer> writer);
 
+  virtual std::string makePartitionDirectory(
+      const std::string& tableDirectory,
+      const std::optional<std::string>& partitionSubdirectory) const;
+
   HiveWriterParameters getWriterParameters(
       const std::optional<std::string>& partition,
       std::optional<uint32_t> bucketId) const;
 
+  virtual std::vector<column_index_t> getDataChannels(
+      const std::vector<column_index_t>& partitionChannels,
+      const column_index_t childrenSize) const;
+
   // Gets write and target file names for a writer based on the table commit
   // strategy as well as table partitioned type. If commit is not required, the
   // write file and target file has the same name. If not, add a temp file
@@ -569,7 +577,7 @@ class HiveDataSink : public DataSink {
   }
 
   // Invoked to write 'input' to the specified file writer.
-  void write(size_t index, RowVectorPtr input);
+  virtual void write(size_t index, RowVectorPtr input);
 
   void closeInternal();
@@ -582,13 +590,13 @@ class HiveDataSink : public DataSink {
   const uint32_t maxOpenWriters_;
   const std::vector<column_index_t> partitionChannels_;
   const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
-  // Indices of dataChannel are stored in ascending order
-  const std::vector<column_index_t> dataChannels_;
   const int32_t bucketCount_{0};
   const std::unique_ptr<core::PartitionFunction> bucketFunction_;
   const std::shared_ptr<dwio::common::WriterFactory> writerFactory_;
   const common::SpillConfig* const spillConfig_;
 
+  // Indices of dataChannel are stored in ascending order
+  std::vector<column_index_t> dataChannels_;
   std::vector<column_index_t> sortColumnIndices_;
   std::vector<CompareFlags> sortCompareFlags_;
diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt
index bc78005c91bb..43e38f594998 100644
--- a/velox/connectors/hive/iceberg/CMakeLists.txt
+++ b/velox/connectors/hive/iceberg/CMakeLists.txt
@@ -12,11 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp
-                  IcebergSplit.cpp PositionalDeleteFileReader.cpp)
+velox_add_library(
+  velox_hive_iceberg_connector
+  IcebergSplitReader.cpp
+  IcebergSplit.cpp
+  PositionalDeleteFileReader.cpp
+  IcebergDataSink.cpp)
 
-velox_link_libraries(velox_hive_iceberg_splitreader velox_connector
-                     Folly::folly)
+velox_link_libraries(velox_hive_iceberg_connector velox_connector Folly::folly)
 
 if(${VELOX_BUILD_TESTING})
   add_subdirectory(tests)
diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.cpp b/velox/connectors/hive/iceberg/IcebergDataSink.cpp
new file mode 100644
index 000000000000..2c5e52239459
--- /dev/null
+++ b/velox/connectors/hive/iceberg/IcebergDataSink.cpp
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "velox/connectors/hive/iceberg/IcebergDataSink.h" + +#include +#include "velox/common/base/Counters.h" +#include "velox/common/base/Fs.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HivePartitionFunction.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/core/ITypedExpr.h" +#include "velox/exec/OperatorUtils.h" + +namespace facebook::velox::connector::hive::iceberg { + +namespace { +#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \ + memory::NonReclaimableSectionGuard nonReclaimableGuard( \ + writerInfo_[(index)]->nonReclaimableSectionHolder.get()) +} // namespace + +IcebergDataSink::IcebergDataSink( + facebook::velox::RowTypePtr inputType, + std::shared_ptr insertTableHandle, + const facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx, + facebook::velox::connector::CommitStrategy commitStrategy, + const std::shared_ptr& hiveConfig) + : HiveDataSink( + std::move(inputType), + std::move(insertTableHandle), + connectorQueryCtx, + commitStrategy, + hiveConfig) {} + +void IcebergDataSink::appendData(facebook::velox::RowVectorPtr input) { + checkRunning(); + + // Write to unpartitioned table. + if (!isPartitioned()) { + const auto index = ensureWriter(HiveWriterId::unpartitionedId()); + write(index, input); + return; + } + + dataChannels_ = getDataChannels(partitionChannels_, inputType_->size()); + + // Compute partition and bucket numbers. + computePartitionAndBucketIds(input); + + // Lazy load all the input columns. + for (column_index_t i = 0; i < input->childrenSize(); ++i) { + input->childAt(i)->loadedVector(); + } + + // All inputs belong to a single non-bucketed partition. The partition id + // must be zero. + if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) { + const auto index = ensureWriter(HiveWriterId{0}); + write(index, input); + return; + } + + splitInputRowsAndEnsureWriters(); + + partitionData_.reserve(writers_.size()); + + const auto numRows = partitionIds_.size(); + + for (auto row = 0; row < numRows; ++row) { + auto id = getWriterId(row); + auto it = writerIndexMap_.find(id); + uint32_t index = -1; + if (it != writerIndexMap_.end()) { + index = it->second; + } + + if (partitionData_[index] != nullptr) { + continue; + } + + std::vector partitionValues; + partitionValues.reserve(partitionChannels_.size()); + + auto icebergInsertTableHandle = + std::dynamic_pointer_cast( + insertTableHandle_); + + for (auto i = 0; i < partitionChannels_.size(); ++i) { + auto type = + icebergInsertTableHandle->inputColumns()[partitionChannels_[i]] + ->dataType(); + auto block = input->childAt(partitionChannels_[i]); + partitionValues.insert(partitionValues.begin() + i, block->toString(row)); + } + + partitionData_.insert( + partitionData_.begin() + index, + std::make_shared(partitionValues)); + } + + for (auto index = 0; index < writers_.size(); ++index) { + const vector_size_t partitionSize = partitionSizes_[index]; + if (partitionSize == 0) { + continue; + } + + RowVectorPtr writerInput = partitionSize == input->size() + ? 
+        ? input
+        : exec::wrap(partitionSize, partitionRows_[index], input);
+
+    write(index, writerInput);
+  }
+}
+
+void IcebergDataSink::write(size_t index, RowVectorPtr input) {
+  WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);
+
+  writers_[index]->write(input);
+  writerInfo_[index]->numWrittenRows += input->size();
+}
+
+std::vector<std::string> IcebergDataSink::close() {
+  checkRunning();
+  state_ = State::kClosed;
+  closeInternal();
+
+  auto icebergInsertTableHandle =
+      std::dynamic_pointer_cast<IcebergInsertTableHandle>(insertTableHandle_);
+
+  std::vector<std::string> commitTasks;
+  commitTasks.reserve(writerInfo_.size());
+  auto fileFormat = toString(insertTableHandle_->tableStorageFormat());
+  std::string finalFileFormat(fileFormat);
+  std::transform(
+      finalFileFormat.begin(),
+      finalFileFormat.end(),
+      finalFileFormat.begin(),
+      ::toupper);
+
+  for (int i = 0; i < writerInfo_.size(); ++i) {
+    const auto& info = writerInfo_.at(i);
+    VELOX_CHECK_NOT_NULL(info);
+    // TODO(imjalpreet): Collect Iceberg file format metrics required by
+    // com.facebook.presto.iceberg.MetricsWrapper->org.apache.iceberg.Metrics#Metrics
+    // clang-format off
+    auto commitDataJson = folly::toJson(
+        folly::dynamic::object
+            ("path", info->writerParameters.writeDirectory() + "/" + info->writerParameters.writeFileName())
+            ("fileSizeInBytes", ioStats_.at(i)->rawBytesWritten())
+            ("metrics", folly::dynamic::object
+                ("recordCount", info->numWrittenRows))
+            ("partitionSpecId", icebergInsertTableHandle->partitionSpec()->specId)
+            ("partitionDataJson", partitionData_[i]->toJson())
+            ("fileFormat", finalFileFormat)
+            ("referencedDataFile", nullptr));
+    // clang-format on
+    commitTasks.push_back(commitDataJson);
+  }
+  return commitTasks;
+}
+
+std::string IcebergDataSink::makePartitionDirectory(
+    const std::string& tableDirectory,
+    const std::optional<std::string>& partitionSubdirectory) const {
+  std::string tableLocation = fmt::format("{}/data", tableDirectory);
+  if (partitionSubdirectory.has_value()) {
+    return fs::path(tableLocation) / partitionSubdirectory.value();
+  }
+  return tableLocation;
+}
+
+// Returns the column indices of data columns.
+std::vector<column_index_t> IcebergDataSink::getDataChannels(
+    const std::vector<column_index_t>& partitionChannels,
+    const column_index_t childrenSize) const {
+  // Create a vector of all possible channels
+  std::vector<column_index_t> dataChannels(childrenSize);
+  std::iota(dataChannels.begin(), dataChannels.end(), 0);
+
+  return dataChannels;
+}
+} // namespace facebook::velox::connector::hive::iceberg
diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.h b/velox/connectors/hive/iceberg/IcebergDataSink.h
new file mode 100644
index 000000000000..405be5d87e66
--- /dev/null
+++ b/velox/connectors/hive/iceberg/IcebergDataSink.h
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#pragma once + +#include + +#include "velox/common/compression/Compression.h" +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HiveDataSink.h" +#include "velox/connectors/hive/PartitionIdGenerator.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/common/Writer.h" +#include "velox/dwio/common/WriterFactory.h" +#include "velox/exec/MemoryReclaimer.h" + +namespace facebook::velox::connector::hive::iceberg { + +struct VeloxIcebergNestedField { + bool optional; + const int32_t id; + const std::string name; + TypePtr prestoType; + std::shared_ptr doc; + + VeloxIcebergNestedField( + bool _optional, + int32_t _id, + const std::string& _name, + TypePtr _prestoType, + std::shared_ptr _doc) + : optional(_optional), + id(_id), + name(_name), + prestoType(std::move(_prestoType)), + doc(std::move(_doc)) {} +}; + +struct VeloxIcebergSchema { + const int32_t schemaId; + std::vector> columns; + std::unordered_map columnNameToIdMapping; + std::unordered_map aliases; + std::vector identifierFieldIds; + + VeloxIcebergSchema( + int32_t _schemaId, + std::vector> _columns, + const std::unordered_map& + _columnNameToIdMapping, + const std::unordered_map& _aliases, + std::vector _identifierFieldIds) + : schemaId(_schemaId), + columns(_columns), + columnNameToIdMapping(_columnNameToIdMapping), + aliases(_aliases), + identifierFieldIds(_identifierFieldIds) {} +}; + +struct VeloxIcebergPartitionSpec { + const int32_t specId; + std::shared_ptr schema; + std::vector fields; + + VeloxIcebergPartitionSpec( + int32_t _specId, + std::shared_ptr _schema, + std::vector _fields) + : specId(_specId), schema(_schema), fields(_fields) {} +}; + +/** + * Represents a request for Iceberg write. + */ +class IcebergInsertTableHandle : public HiveInsertTableHandle { + public: + IcebergInsertTableHandle( + std::vector> inputColumns, + std::shared_ptr locationHandle, + std::shared_ptr schema, + std::shared_ptr partitionSpec, + dwio::common::FileFormat tableStorageFormat = + dwio::common::FileFormat::PARQUET, + std::shared_ptr bucketProperty = nullptr, + std::optional compressionKind = {}, + const std::unordered_map& serdeParameters = {}) + : HiveInsertTableHandle( + std::move(inputColumns), + std::move(locationHandle), + tableStorageFormat, + std::move(bucketProperty), + compressionKind, + serdeParameters), + schema_(std::move(schema)), + partitionSpec_(std::move(partitionSpec)) {} + + virtual ~IcebergInsertTableHandle() = default; + + std::shared_ptr schema() const { + return schema_; + } + + std::shared_ptr partitionSpec() const { + return partitionSpec_; + } + + private: + std::shared_ptr schema_; + std::shared_ptr partitionSpec_; +}; + +class PartitionData { + private: + std::vector partitionValues; + const std::string PARTITION_VALUES_FIELD = "partitionValues"; + + public: + PartitionData(const std::vector& partitionValues) + : partitionValues(partitionValues) { + if (partitionValues.empty()) { + throw std::invalid_argument("partitionValues is null or empty"); + } + } + + int size() const { + return partitionValues.size(); + } + + // Convert to JSON + std::string toJson() const { + try { + folly::dynamic jsonObject = folly::dynamic::object(); + folly::dynamic valuesArray = folly::dynamic::array(); + + for (const auto& value : partitionValues) { + valuesArray.push_back(value); // Directly use the string values + } + + jsonObject[PARTITION_VALUES_FIELD] = valuesArray; + return folly::toJson(jsonObject); // Convert dynamic object to JSON 
+    } catch (const std::exception& e) {
+      throw std::runtime_error(
+          "JSON conversion failed for PartitionData: " +
+          std::string(e.what()));
+    }
+  }
+};
+
+class IcebergDataSink : public HiveDataSink {
+ public:
+  IcebergDataSink(
+      RowTypePtr inputType,
+      std::shared_ptr<IcebergInsertTableHandle> insertTableHandle,
+      const ConnectorQueryCtx* connectorQueryCtx,
+      CommitStrategy commitStrategy,
+      const std::shared_ptr<const HiveConfig>& hiveConfig);
+
+  void appendData(RowVectorPtr input) override;
+
+  std::vector<std::string> close() override;
+
+ protected:
+  void write(size_t index, RowVectorPtr input) override;
+
+  // Below are structures for partitions from all inputs. partitionData_
+  // is indexed by partitionId.
+  std::vector<std::shared_ptr<PartitionData>> partitionData_;
+
+ private:
+  std::string makePartitionDirectory(
+      const std::string& tableDirectory,
+      const std::optional<std::string>& partitionSubdirectory) const override;
+
+  std::vector<column_index_t> getDataChannels(
+      const std::vector<column_index_t>& partitionChannels,
+      const column_index_t childrenSize) const override;
+};
+
+} // namespace facebook::velox::connector::hive::iceberg
diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt
index 5808b640af64..64df9e684bab 100644
--- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt
+++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt
@@ -45,7 +45,7 @@ if(NOT VELOX_DISABLE_GOOGLETEST)
     velox_hive_iceberg_test
     velox_dwio_iceberg_reader_benchmark_lib
     velox_hive_connector
-    velox_hive_iceberg_splitreader
+    velox_hive_iceberg_connector
     velox_hive_partition_function
     velox_dwio_common_exception
     velox_dwio_common_test_utils
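
For reference, each commit task returned by `IcebergDataSink::close()` is a JSON string shaped by the `folly::dynamic` object assembled in the loop above. Below is a minimal standalone sketch of that payload; the file path, sizes, and counts are hypothetical placeholder values, not output produced by this patch:

```cpp
// Sketch of one commit task as emitted by IcebergDataSink::close().
// All literal values are hypothetical placeholders.
#include <folly/json.h>

#include <iostream>

int main() {
  folly::dynamic commitTask = folly::dynamic::object
      ("path", "/warehouse/db/table/data/c0=1/part-00000.parquet")
      ("fileSizeInBytes", 1048576)
      ("metrics", folly::dynamic::object("recordCount", 1000))
      ("partitionSpecId", 0)
      ("partitionDataJson", R"({"partitionValues":["1"]})")
      ("fileFormat", "PARQUET")
      ("referencedDataFile", nullptr);

  // Prints the JSON string that close() would push into commitTasks.
  std::cout << folly::toJson(commitTask) << std::endl;
  return 0;
}
```

Note that `partitionDataJson` is itself a serialized string (produced by `PartitionData::toJson()`), not a nested JSON object, so consumers are expected to parse it separately.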