From f8953d552627ef442da3268bb5c86bd6b1936891 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 3 Sep 2024 11:32:01 +0800 Subject: [PATCH] RowEncoder: remove calling for AddLength if all column are fixed --- cpp/src/arrow/acero/hash_join_dict.cc | 2 +- .../arrow/compute/row/row_encoder_internal.cc | 31 +++++++++++ .../arrow/compute/row/row_encoder_internal.h | 51 +++++++++++++++++-- 3 files changed, 80 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_dict.cc b/cpp/src/arrow/acero/hash_join_dict.cc index 8db9dddb2c3a0..e3b3b379b045e 100644 --- a/cpp/src/arrow/acero/hash_join_dict.cc +++ b/cpp/src/arrow/acero/hash_join_dict.cc @@ -266,7 +266,7 @@ Status HashJoinDictBuild::Init(ExecContext* ctx, std::shared_ptr dictiona auto iter = hash_table_.find(str); if (iter == hash_table_.end()) { - hash_table_.insert(std::make_pair(str, num_entries)); + hash_table_.insert(std::make_pair(std::move(str), num_entries)); ids[i] = num_entries; entries_to_take.push_back(static_cast(i)); ++num_entries; diff --git a/cpp/src/arrow/compute/row/row_encoder_internal.cc b/cpp/src/arrow/compute/row/row_encoder_internal.cc index 0965e4e8f9571..de89f30fd010d 100644 --- a/cpp/src/arrow/compute/row/row_encoder_internal.cc +++ b/cpp/src/arrow/compute/row/row_encoder_internal.cc @@ -305,6 +305,18 @@ void RowEncoder::Init(const std::vector& column_types, ExecContext* ARROW_DCHECK(false); } + int32_t fixed_length_accum = 0; + for (size_t i = 0; i < column_types.size(); ++i) { + auto encoder_info = encoders_[i]->GetEncoderInfo(); + if (!encoder_info.is_fixed_width) { + fixed_length_accum = kInvalidFixedWidthOffset; + break; + } else { + fixed_length_accum += encoder_info.fixed_width; + } + } + this->fixed_width_length_ = fixed_length_accum; + int32_t total_length = 0; for (size_t i = 0; i < column_types.size(); ++i) { encoders_[i]->AddLengthNull(&total_length); @@ -321,7 +333,26 @@ void RowEncoder::Clear() { bytes_.clear(); } +Status RowEncoder::EncodeAndAppendForFixedWidth(const ExecSpan& batch) { + // TODO(mwish): debug check AddLength accumulates the fixed_width_length_ correctly. + size_t length_before = + static_cast(this->fixed_width_length_) * this->fixed_with_row_count_; + bytes_.resize(length_before + batch.length * this->fixed_width_length_); + std::vector buf_ptrs(batch.length); + for (int64_t i = 0; i < batch.length; ++i) { + buf_ptrs[i] = bytes_.data() + length_before + i * fixed_width_length_; + } + fixed_with_row_count_ += batch.length; + for (int i = 0; i < batch.num_values(); ++i) { + RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, buf_ptrs.data())); + } + return Status::OK(); +} + Status RowEncoder::EncodeAndAppend(const ExecSpan& batch) { + if (fixed_width_length_ != kInvalidFixedWidthOffset) { + return EncodeAndAppendForFixedWidth(batch); + } if (offsets_.empty()) { offsets_.resize(1); offsets_[0] = 0; diff --git a/cpp/src/arrow/compute/row/row_encoder_internal.h b/cpp/src/arrow/compute/row/row_encoder_internal.h index 4d6cc34af2342..f6611d4ab0c90 100644 --- a/cpp/src/arrow/compute/row/row_encoder_internal.h +++ b/cpp/src/arrow/compute/row/row_encoder_internal.h @@ -83,6 +83,13 @@ struct ARROW_EXPORT KeyEncoder { static bool IsNull(const uint8_t* encoded_bytes) { return encoded_bytes[0] == kNullByte; } + + struct EncoderInfo { + bool is_fixed_width; + int32_t fixed_width; + }; + + virtual EncoderInfo GetEncoderInfo() const = 0; }; struct ARROW_EXPORT BooleanKeyEncoder : KeyEncoder { @@ -99,6 +106,10 @@ struct ARROW_EXPORT BooleanKeyEncoder : KeyEncoder { Result> Decode(uint8_t** encoded_bytes, int32_t length, MemoryPool* pool) override; + + EncoderInfo GetEncoderInfo() const override { + return EncoderInfo{/*is_fixed_width=*/true, 2}; + } }; struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder { @@ -118,6 +129,10 @@ struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder { Result> Decode(uint8_t** encoded_bytes, int32_t length, MemoryPool* pool) override; + EncoderInfo GetEncoderInfo() const override { + return EncoderInfo{/*is_fixed_width=*/true, byte_width_}; + } + std::shared_ptr type_; const int byte_width_; }; @@ -132,6 +147,8 @@ struct ARROW_EXPORT DictionaryKeyEncoder : FixedWidthKeyEncoder { Result> Decode(uint8_t** encoded_bytes, int32_t length, MemoryPool* pool) override; + EncoderInfo GetEncoderInfo() const override; + MemoryPool* pool_; std::shared_ptr dictionary_; }; @@ -248,6 +265,10 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder { explicit VarLengthKeyEncoder(std::shared_ptr type) : type_(std::move(type)) {} + EncoderInfo GetEncoderInfo() const override { + return EncoderInfo{/*is_fixed_width=*/false, /*fixed_width=*/5}; + } + std::shared_ptr type_; }; @@ -267,6 +288,10 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder { MemoryPool* pool) override { return ArrayData::Make(null(), length, {NULLPTR}, length); } + + EncoderInfo GetEncoderInfo() const override { + return EncoderInfo{/*is_fixed_width=*/true, /*fixed_width=*/0}; + } }; /// RowEncoder encodes ExecSpan to a variable length byte sequence @@ -348,18 +373,38 @@ class ARROW_EXPORT RowEncoder { return std::string(reinterpret_cast(encoded_nulls_.data()), encoded_nulls_.size()); } - int32_t row_length = offsets_[i + 1] - offsets_[i]; - return std::string(reinterpret_cast(bytes_.data() + offsets_[i]), + int32_t row_length = 0; + int32_t row_offset = 0; + if (fixed_width_length_ != kInvalidFixedWidthOffset) { + row_length = fixed_width_length_; + row_offset = fixed_width_length_ * i; + } else { + row_length = offsets_[i + 1] - offsets_[i]; + row_offset = offsets_[i]; + } + return std::string(reinterpret_cast(bytes_.data() + row_offset), row_length); } int32_t num_rows() const { - return offsets_.empty() ? 0 : static_cast(offsets_.size() - 1); + if (kInvalidFixedWidthOffset == fixed_width_length_) { + return fixed_with_row_count_; + } + return offsets_.empty() ? 0 : offsets_[0]; } private: + Status EncodeAndAppendForFixedWidth(const ExecSpan& batch); + + private: + static constexpr int32_t kInvalidFixedWidthOffset = 1; ExecContext* ctx_{nullptr}; std::vector> encoders_; + // When all columns in a row are Fixed-width or NA, the encoded row + // doesn't need to maintain the column offsets. In this case, the + // offsets_.size() would be also be empty. + int32_t fixed_width_length_{kInvalidFixedWidthOffset}; + int32_t fixed_with_row_count_{0}; // offsets_ vector stores the starting position (offset) of each encoded row // within the bytes_ vector. This allows for quick access to individual rows. //