diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 530e90fc1b99..3e776cd0e0fb 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -359,8 +359,7 @@ HiveDataSink::HiveDataSink( : nullptr), writerFactory_(dwio::common::getWriterFactory( insertTableHandle_->tableStorageFormat())), - spillConfig_(connectorQueryCtx->spillConfig()), - dataChannels_(getDataChannels(partitionChannels_, inputType_->size())) { + spillConfig_(connectorQueryCtx->spillConfig()) { if (isBucketed()) { VELOX_USER_CHECK_LT( bucketCount_, maxBucketCount(), "bucketCount exceeds the limit"); @@ -411,6 +410,10 @@ void HiveDataSink::appendData(RowVectorPtr input) { return; } + if (dataChannels_.empty()) { + dataChannels_ = getDataChannels(); + } + // Compute partition and bucket numbers. computePartitionAndBucketIds(input); @@ -786,7 +789,7 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const { return HiveWriterId{partitionId, bucketId}; } -void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) { +void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr /* input */) { VELOX_CHECK(isPartitioned() || isBucketed()); if (isBucketed() && isPartitioned()) { VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size()); @@ -823,15 +826,13 @@ void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) { } // Returns the column indices of non-partition data columns. 
-std::vector<column_index_t> HiveDataSink::getDataChannels( - const std::vector<column_index_t>& partitionChannels, - const column_index_t childrenSize) const { +std::vector<column_index_t> HiveDataSink::getDataChannels() const { std::vector<column_index_t> dataChannels; - dataChannels.reserve(childrenSize - partitionChannels.size()); + dataChannels.reserve(inputType_->size() - partitionChannels_.size()); - for (column_index_t i = 0; i < childrenSize; i++) { - if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) == - partitionChannels.cend()) { + for (column_index_t i = 0; i < inputType_->size(); i++) { + if (std::find(partitionChannels_.cbegin(), partitionChannels_.cend(), i) == - partitionChannels.cend()) { + partitionChannels_.cend()) { dataChannels.push_back(i); } } diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 81d47d2dbd63..5b3226244c3c 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -558,9 +558,7 @@ class HiveDataSink : public DataSink { const std::optional<std::string>& partition, std::optional<uint32_t> bucketId) const; - virtual std::vector<column_index_t> getDataChannels( - const std::vector<column_index_t>& partitionChannels, - const column_index_t childrenSize) const; + virtual std::vector<column_index_t> getDataChannels() const; // Gets write and target file names for a writer based on the table commit // strategy as well as table partitioned type. If commit is not required, the @@ -579,7 +577,7 @@ class HiveDataSink : public DataSink { } // Invoked to write 'input' to the specified file writer. 
- virtual void write(size_t index, RowVectorPtr input); + void write(size_t index, RowVectorPtr input); void closeInternal(); diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.cpp b/velox/connectors/hive/iceberg/IcebergDataSink.cpp index 27eb4a601c6a..a07896704bdd 100644 --- a/velox/connectors/hive/iceberg/IcebergDataSink.cpp +++ b/velox/connectors/hive/iceberg/IcebergDataSink.cpp @@ -18,7 +18,6 @@ #include "velox/common/base/Fs.h" #include "velox/connectors/hive/TableHandle.h" -#include "velox/exec/OperatorUtils.h" namespace facebook::velox::connector::hive::iceberg { @@ -41,57 +40,6 @@ IcebergDataSink::IcebergDataSink( commitStrategy, hiveConfig) {} -void IcebergDataSink::appendData(facebook::velox::RowVectorPtr input) { - checkRunning(); - - // Write to unpartitioned table. - if (!isPartitioned()) { - const auto index = ensureWriter(HiveWriterId::unpartitionedId()); - write(index, input); - return; - } - - dataChannels_ = getDataChannels(partitionChannels_, inputType_->size()); - - // Compute partition and bucket numbers. - computePartitionAndBucketIds(input); - - // Lazy load all the input columns. - for (column_index_t i = 0; i < input->childrenSize(); ++i) { - input->childAt(i)->loadedVector(); - } - - // All inputs belong to a single non-bucketed partition. The partition id - // must be zero. - if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) { - const auto index = ensureWriter(HiveWriterId{0}); - write(index, input); - return; - } - - splitInputRowsAndEnsureWriters(input); - - for (auto index = 0; index < writers_.size(); ++index) { - const vector_size_t partitionSize = partitionSizes_[index]; - if (partitionSize == 0) { - continue; - } - - RowVectorPtr writerInput = partitionSize == input->size() - ? 
input - : exec::wrap(partitionSize, partitionRows_[index], input); - - write(index, writerInput); - } -} - -void IcebergDataSink::write(size_t index, RowVectorPtr input) { - WRITER_NON_RECLAIMABLE_SECTION_GUARD(index); - - writers_[index]->write(input); - writerInfo_[index]->numWrittenRows += input->size(); -} - std::vector IcebergDataSink::close() { checkRunning(); state_ = State::kClosed; @@ -160,24 +108,18 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) { continue; } - std::vector partitionValues; - partitionValues.reserve(partitionChannels_.size()); + std::vector partitionValues(partitionChannels_.size()); auto icebergInsertTableHandle = std::dynamic_pointer_cast( insertTableHandle_); for (auto i = 0; i < partitionChannels_.size(); ++i) { - auto type = - icebergInsertTableHandle->inputColumns()[partitionChannels_[i]] - ->dataType(); auto block = input->childAt(partitionChannels_[i]); - partitionValues.insert(partitionValues.begin() + i, block->toString(row)); + partitionValues[i] = block->toString(row); } - partitionData_.insert( - partitionData_.begin() + index, - std::make_shared(partitionValues)); + partitionData_[index] = std::make_shared(partitionValues); } for (uint32_t i = 0; i < partitionSizes_.size(); ++i) { @@ -189,10 +131,7 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) { } void IcebergDataSink::extendBuffersForPartitionedTables() { - // Extends the buffer used for partition rows calculations. - partitionSizes_.emplace_back(0); - partitionRows_.emplace_back(nullptr); - rawPartitionRows_.emplace_back(nullptr); + HiveDataSink::extendBuffersForPartitionedTables(); // Extend the buffer used for partitionData partitionData_.emplace_back(nullptr); @@ -209,11 +148,9 @@ std::string IcebergDataSink::makePartitionDirectory( } // Returns the column indices of data columns. 
-std::vector<column_index_t> IcebergDataSink::getDataChannels( - const std::vector<column_index_t>& partitionChannels, - const column_index_t childrenSize) const { +std::vector<column_index_t> IcebergDataSink::getDataChannels() const { // Create a vector of all possible channels - std::vector<column_index_t> dataChannels(childrenSize); + std::vector<column_index_t> dataChannels(inputType_->size()); std::iota(dataChannels.begin(), dataChannels.end(), 0); return dataChannels; diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.h b/velox/connectors/hive/iceberg/IcebergDataSink.h index 5aa5ce97dc35..f43325cef76b 100644 --- a/velox/connectors/hive/iceberg/IcebergDataSink.h +++ b/velox/connectors/hive/iceberg/IcebergDataSink.h @@ -114,10 +114,6 @@ class IcebergInsertTableHandle : public HiveInsertTableHandle { }; class PartitionData { - private: - std::vector<std::string> partitionValues; - const std::string PARTITION_VALUES_FIELD = "partitionValues"; - public: PartitionData(const std::vector<std::string>& partitionValues) : partitionValues(partitionValues) { @@ -147,6 +143,10 @@ class PartitionData { "JSON conversion failed for PartitionData: " + std::string(e.what())); } } + + private: + std::vector<std::string> partitionValues; + const std::string PARTITION_VALUES_FIELD = "partitionValues"; }; class IcebergDataSink : public HiveDataSink { @@ -158,13 +158,9 @@ class IcebergDataSink : public HiveDataSink { CommitStrategy commitStrategy, const std::shared_ptr<const HiveConfig>& hiveConfig); - void appendData(RowVectorPtr input) override; - std::vector<std::string> close() override; protected: - void write(size_t index, RowVectorPtr input) override; - // Below are structures for partitions from all inputs. partitionData_ // is indexed by partitionId. 
std::vector<std::shared_ptr<PartitionData>> partitionData_; @@ -178,9 +174,7 @@ const std::string& tableDirectory, const std::optional<std::string>& partitionSubdirectory) const override; - std::vector<column_index_t> getDataChannels( - const std::vector<column_index_t>& partitionChannels, - const column_index_t childrenSize) const override; + std::vector<column_index_t> getDataChannels() const override; }; } // namespace facebook::velox::connector::hive::iceberg