[Temp Commit] Refactor HiveDataSink and IcebergDataSink
imjalpreet committed Nov 14, 2024
1 parent ff22387 commit e3aa199
Showing 4 changed files with 20 additions and 89 deletions.

velox/connectors/hive/HiveDataSink.cpp (21 changes: 11 additions & 10 deletions)
@@ -359,8 +359,7 @@ HiveDataSink::HiveDataSink(
           : nullptr),
       writerFactory_(dwio::common::getWriterFactory(
           insertTableHandle_->tableStorageFormat())),
-      spillConfig_(connectorQueryCtx->spillConfig()),
-      dataChannels_(getDataChannels(partitionChannels_, inputType_->size())) {
+      spillConfig_(connectorQueryCtx->spillConfig()) {
   if (isBucketed()) {
     VELOX_USER_CHECK_LT(
         bucketCount_, maxBucketCount(), "bucketCount exceeds the limit");
@@ -411,6 +410,10 @@ void HiveDataSink::appendData(RowVectorPtr input) {
     return;
   }
 
+  if (dataChannels_.empty()) {
+    dataChannels_ = getDataChannels();
+  }
+
   // Compute partition and bucket numbers.
   computePartitionAndBucketIds(input);
 
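Note on the change above: the dataChannels_ initialization moves out of the constructor's member-initializer list (first hunk) and into appendData(), guarded by empty(). getDataChannels() is virtual, and a virtual call made during base-class construction binds statically to HiveDataSink::getDataChannels(), so IcebergDataSink's override could never take effect there; deferring the call to the first appendData() restores dynamic dispatch. A minimal self-contained sketch of the pitfall and the deferred-call fix (class and member names here are hypothetical, not Velox APIs):

#include <iostream>
#include <vector>

struct BaseSink {
  BaseSink() {
    // channels_ = dataChannels();  // Wrong place: during construction this
    // call would bind statically to BaseSink::dataChannels().
  }
  virtual ~BaseSink() = default;

  void append() {
    if (channels_.empty()) {
      channels_ = dataChannels(); // Deferred: dispatches to the override.
    }
    std::cout << "writing " << channels_.size() << " channels\n";
  }

 protected:
  virtual std::vector<int> dataChannels() const { return {0, 2}; }
  std::vector<int> channels_;
};

struct IcebergSink : BaseSink {
 protected:
  std::vector<int> dataChannels() const override { return {0, 1, 2}; }
};

int main() {
  IcebergSink sink;
  sink.append(); // Prints "writing 3 channels": the override was used.
}

One caveat of the empty() guard: if getDataChannels() ever legitimately returned an empty vector, it would be re-derived on every batch; harmless here, but worth knowing.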
@@ -786,7 +789,7 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
   return HiveWriterId{partitionId, bucketId};
 }
 
-void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
+void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr /* input */) {
   VELOX_CHECK(isPartitioned() || isBucketed());
   if (isBucketed() && isPartitioned()) {
     VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());
@@ -823,15 +826,13 @@ void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
 }
 
 // Returns the column indices of non-partition data columns.
-std::vector<column_index_t> HiveDataSink::getDataChannels(
-    const std::vector<column_index_t>& partitionChannels,
-    const column_index_t childrenSize) const {
+std::vector<column_index_t> HiveDataSink::getDataChannels() const {
   std::vector<column_index_t> dataChannels;
-  dataChannels.reserve(childrenSize - partitionChannels.size());
+  dataChannels.reserve(inputType_->size() - partitionChannels_.size());
 
-  for (column_index_t i = 0; i < childrenSize; i++) {
-    if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
-        partitionChannels.cend()) {
+  for (column_index_t i = 0; i < inputType_->size(); i++) {
+    if (std::find(partitionChannels_.cbegin(), partitionChannels_.cend(), i) ==
+        partitionChannels_.cend()) {
       dataChannels.push_back(i);
     }
   }

velox/connectors/hive/HiveDataSink.h (6 changes: 2 additions & 4 deletions)
@@ -558,9 +558,7 @@ class HiveDataSink : public DataSink {
       const std::optional<std::string>& partition,
       std::optional<uint32_t> bucketId) const;
 
-  virtual std::vector<column_index_t> getDataChannels(
-      const std::vector<column_index_t>& partitionChannels,
-      const column_index_t childrenSize) const;
+  virtual std::vector<column_index_t> getDataChannels() const;
 
   // Gets write and target file names for a writer based on the table commit
   // strategy as well as table partitioned type. If commit is not required, the
@@ -579,7 +577,7 @@
   }
 
   // Invoked to write 'input' to the specified file writer.
-  virtual void write(size_t index, RowVectorPtr input);
+  void write(size_t index, RowVectorPtr input);
 
   void closeInternal();
 

velox/connectors/hive/iceberg/IcebergDataSink.cpp (74 changes: 6 additions & 68 deletions)
@@ -41,57 +41,6 @@ IcebergDataSink::IcebergDataSink(
           commitStrategy,
           hiveConfig) {}
 
-void IcebergDataSink::appendData(facebook::velox::RowVectorPtr input) {
-  checkRunning();
-
-  // Write to unpartitioned table.
-  if (!isPartitioned()) {
-    const auto index = ensureWriter(HiveWriterId::unpartitionedId());
-    write(index, input);
-    return;
-  }
-
-  dataChannels_ = getDataChannels(partitionChannels_, inputType_->size());
-
-  // Compute partition and bucket numbers.
-  computePartitionAndBucketIds(input);
-
-  // Lazy load all the input columns.
-  for (column_index_t i = 0; i < input->childrenSize(); ++i) {
-    input->childAt(i)->loadedVector();
-  }
-
-  // All inputs belong to a single non-bucketed partition. The partition id
-  // must be zero.
-  if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
-    const auto index = ensureWriter(HiveWriterId{0});
-    write(index, input);
-    return;
-  }
-
-  splitInputRowsAndEnsureWriters(input);
-
-  for (auto index = 0; index < writers_.size(); ++index) {
-    const vector_size_t partitionSize = partitionSizes_[index];
-    if (partitionSize == 0) {
-      continue;
-    }
-
-    RowVectorPtr writerInput = partitionSize == input->size()
-        ? input
-        : exec::wrap(partitionSize, partitionRows_[index], input);
-
-    write(index, writerInput);
-  }
-}
-
-void IcebergDataSink::write(size_t index, RowVectorPtr input) {
-  WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);
-
-  writers_[index]->write(input);
-  writerInfo_[index]->numWrittenRows += input->size();
-}
-
 std::vector<std::string> IcebergDataSink::close() {
   checkRunning();
   state_ = State::kClosed;
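With HiveDataSink::appendData() now resolving dataChannels_ lazily through the virtual getDataChannels() hook, the IcebergDataSink::appendData() deleted above, which duplicated the Hive implementation except for its eager dataChannels_ assignment, and the identical write() override no longer earn their keep; accordingly, write() loses its virtual qualifier in the HiveDataSink.h hunk earlier in this commit.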
@@ -160,24 +109,18 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
       continue;
     }
 
-    std::vector<std::string> partitionValues;
-    partitionValues.reserve(partitionChannels_.size());
+    std::vector<std::string> partitionValues(partitionChannels_.size());
 
     auto icebergInsertTableHandle =
         std::dynamic_pointer_cast<const IcebergInsertTableHandle>(
            insertTableHandle_);
 
     for (auto i = 0; i < partitionChannels_.size(); ++i) {
-      auto type =
-          icebergInsertTableHandle->inputColumns()[partitionChannels_[i]]
-              ->dataType();
       auto block = input->childAt(partitionChannels_[i]);
-      partitionValues.insert(partitionValues.begin() + i, block->toString(row));
+      partitionValues[i] = block->toString(row);
    }
 
-    partitionData_.insert(
-        partitionData_.begin() + index,
-        std::make_shared<PartitionData>(partitionValues));
+    partitionData_[index] = std::make_shared<PartitionData>(partitionValues);
  }
 
  for (uint32_t i = 0; i < partitionSizes_.size(); ++i) {
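Two cleanups in this hunk: the unused type lookup through icebergInsertTableHandle->inputColumns() is dropped, and the vector building switches from insert-at-offset to pre-sized vectors with direct index assignment. For partitionValues, the old insert(begin() + i, ...) on an initially empty vector was just a roundabout push_back; for partitionData_, which extendBuffersForPartitionedTables() already pads with one nullptr placeholder per writer, insert(begin() + index, ...) would shift the placeholder rather than replace it, leaving stale nullptr entries to accumulate at the tail, while partitionData_[index] = ... overwrites it in place.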
@@ -189,10 +132,7 @@
 }
 
 void IcebergDataSink::extendBuffersForPartitionedTables() {
-  // Extends the buffer used for partition rows calculations.
-  partitionSizes_.emplace_back(0);
-  partitionRows_.emplace_back(nullptr);
-  rawPartitionRows_.emplace_back(nullptr);
+  HiveDataSink::extendBuffersForPartitionedTables();
 
   // Extend the buffer used for partitionData
   partitionData_.emplace_back(nullptr);
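Rather than repeating the base-class bookkeeping for partitionSizes_, partitionRows_, and rawPartitionRows_, the override now delegates to HiveDataSink::extendBuffersForPartitionedTables() and extends only the Iceberg-specific partitionData_ buffer, so the two classes cannot drift apart if the base grows more per-writer buffers.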
@@ -209,11 +149,9 @@ std::string IcebergDataSink::makePartitionDirectory(
 }
 
 // Returns the column indices of data columns.
-std::vector<column_index_t> IcebergDataSink::getDataChannels(
-    const std::vector<column_index_t>& partitionChannels,
-    const column_index_t childrenSize) const {
+std::vector<column_index_t> IcebergDataSink::getDataChannels() const {
   // Create a vector of all possible channels
-  std::vector<column_index_t> dataChannels(childrenSize);
+  std::vector<column_index_t> dataChannels(inputType_->size());
   std::iota(dataChannels.begin(), dataChannels.end(), 0);
 
   return dataChannels;
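Unlike the Hive base implementation, which filters the partition channels out (HiveDataSink.cpp hunk above), the Iceberg override returns every input channel, consistent with Iceberg tables carrying partition source columns in the data files themselves. A standalone sketch contrasting the two strategies, assuming a hypothetical five-column input partitioned on channels 1 and 3:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

using column_index_t = uint32_t;

int main() {
  const column_index_t numColumns = 5;
  const std::vector<column_index_t> partitionChannels{1, 3};

  // Hive-style: keep only non-partition channels -> 0 2 4.
  std::vector<column_index_t> hiveChannels;
  hiveChannels.reserve(numColumns - partitionChannels.size());
  for (column_index_t i = 0; i < numColumns; ++i) {
    if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
        partitionChannels.cend()) {
      hiveChannels.push_back(i);
    }
  }

  // Iceberg-style: keep every channel -> 0 1 2 3 4.
  std::vector<column_index_t> icebergChannels(numColumns);
  std::iota(icebergChannels.begin(), icebergChannels.end(), 0);

  for (auto c : hiveChannels) std::cout << c << ' ';
  std::cout << '\n';
  for (auto c : icebergChannels) std::cout << c << ' ';
  std::cout << '\n';
}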

velox/connectors/hive/iceberg/IcebergDataSink.h (8 changes: 1 addition & 7 deletions)
@@ -158,13 +158,9 @@ class IcebergDataSink : public HiveDataSink {
       CommitStrategy commitStrategy,
       const std::shared_ptr<const HiveConfig>& hiveConfig);
 
-  void appendData(RowVectorPtr input) override;
-
   std::vector<std::string> close() override;
 
  protected:
-  void write(size_t index, RowVectorPtr input) override;
-
   // Below are structures for partitions from all inputs. partitionData_
   // is indexed by partitionId.
   std::vector<std::shared_ptr<PartitionData>> partitionData_;
@@ -178,9 +174,7 @@
       const std::string& tableDirectory,
       const std::optional<std::string>& partitionSubdirectory) const override;
 
-  std::vector<column_index_t> getDataChannels(
-      const std::vector<column_index_t>& partitionChannels,
-      const column_index_t childrenSize) const override;
+  std::vector<column_index_t> getDataChannels() const override;
 };
 
 } // namespace facebook::velox::connector::hive::iceberg
