Skip to content

Commit

Permalink
RowEncoder: remove calling for AddLength if all column are fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Sep 3, 2024
1 parent 4ed5a14 commit f8953d5
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/hash_join_dict.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ Status HashJoinDictBuild::Init(ExecContext* ctx, std::shared_ptr<Array> dictiona

auto iter = hash_table_.find(str);
if (iter == hash_table_.end()) {
hash_table_.insert(std::make_pair(str, num_entries));
hash_table_.insert(std::make_pair(std::move(str), num_entries));
ids[i] = num_entries;
entries_to_take.push_back(static_cast<int32_t>(i));
++num_entries;
Expand Down
31 changes: 31 additions & 0 deletions cpp/src/arrow/compute/row/row_encoder_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,18 @@ void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext*
ARROW_DCHECK(false);
}

int32_t fixed_length_accum = 0;
for (size_t i = 0; i < column_types.size(); ++i) {
auto encoder_info = encoders_[i]->GetEncoderInfo();
if (!encoder_info.is_fixed_width) {
fixed_length_accum = kInvalidFixedWidthOffset;
break;
} else {
fixed_length_accum += encoder_info.fixed_width;
}
}
this->fixed_width_length_ = fixed_length_accum;

int32_t total_length = 0;
for (size_t i = 0; i < column_types.size(); ++i) {
encoders_[i]->AddLengthNull(&total_length);
Expand All @@ -321,7 +333,26 @@ void RowEncoder::Clear() {
bytes_.clear();
}

Status RowEncoder::EncodeAndAppendForFixedWidth(const ExecSpan& batch) {
// TODO(mwish): debug check AddLength accumulates the fixed_width_length_ correctly.
size_t length_before =
static_cast<size_t>(this->fixed_width_length_) * this->fixed_with_row_count_;
bytes_.resize(length_before + batch.length * this->fixed_width_length_);
std::vector<uint8_t*> buf_ptrs(batch.length);
for (int64_t i = 0; i < batch.length; ++i) {
buf_ptrs[i] = bytes_.data() + length_before + i * fixed_width_length_;
}
fixed_with_row_count_ += batch.length;
for (int i = 0; i < batch.num_values(); ++i) {
RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, buf_ptrs.data()));
}
return Status::OK();
}

Status RowEncoder::EncodeAndAppend(const ExecSpan& batch) {
if (fixed_width_length_ != kInvalidFixedWidthOffset) {
return EncodeAndAppendForFixedWidth(batch);
}
if (offsets_.empty()) {
offsets_.resize(1);
offsets_[0] = 0;
Expand Down
51 changes: 48 additions & 3 deletions cpp/src/arrow/compute/row/row_encoder_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ struct ARROW_EXPORT KeyEncoder {
static bool IsNull(const uint8_t* encoded_bytes) {
return encoded_bytes[0] == kNullByte;
}

struct EncoderInfo {
bool is_fixed_width;
int32_t fixed_width;
};

virtual EncoderInfo GetEncoderInfo() const = 0;
};

struct ARROW_EXPORT BooleanKeyEncoder : KeyEncoder {
Expand All @@ -99,6 +106,10 @@ struct ARROW_EXPORT BooleanKeyEncoder : KeyEncoder {

Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes, int32_t length,
MemoryPool* pool) override;

EncoderInfo GetEncoderInfo() const override {
return EncoderInfo{/*is_fixed_width=*/true, 2};
}
};

struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder {
Expand All @@ -118,6 +129,10 @@ struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder {
Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes, int32_t length,
MemoryPool* pool) override;

EncoderInfo GetEncoderInfo() const override {
return EncoderInfo{/*is_fixed_width=*/true, byte_width_};
}

std::shared_ptr<DataType> type_;
const int byte_width_;
};
Expand All @@ -132,6 +147,8 @@ struct ARROW_EXPORT DictionaryKeyEncoder : FixedWidthKeyEncoder {
Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes, int32_t length,
MemoryPool* pool) override;

EncoderInfo GetEncoderInfo() const override;

MemoryPool* pool_;
std::shared_ptr<Array> dictionary_;
};
Expand Down Expand Up @@ -248,6 +265,10 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {

explicit VarLengthKeyEncoder(std::shared_ptr<DataType> type) : type_(std::move(type)) {}

EncoderInfo GetEncoderInfo() const override {
return EncoderInfo{/*is_fixed_width=*/false, /*fixed_width=*/5};
}

std::shared_ptr<DataType> type_;
};

Expand All @@ -267,6 +288,10 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder {
MemoryPool* pool) override {
return ArrayData::Make(null(), length, {NULLPTR}, length);
}

EncoderInfo GetEncoderInfo() const override {
return EncoderInfo{/*is_fixed_width=*/true, /*fixed_width=*/0};
}
};

/// RowEncoder encodes ExecSpan to a variable length byte sequence
Expand Down Expand Up @@ -348,18 +373,38 @@ class ARROW_EXPORT RowEncoder {
return std::string(reinterpret_cast<const char*>(encoded_nulls_.data()),
encoded_nulls_.size());
}
int32_t row_length = offsets_[i + 1] - offsets_[i];
return std::string(reinterpret_cast<const char*>(bytes_.data() + offsets_[i]),
int32_t row_length = 0;
int32_t row_offset = 0;
if (fixed_width_length_ != kInvalidFixedWidthOffset) {
row_length = fixed_width_length_;
row_offset = fixed_width_length_ * i;
} else {
row_length = offsets_[i + 1] - offsets_[i];
row_offset = offsets_[i];
}
return std::string(reinterpret_cast<const char*>(bytes_.data() + row_offset),
row_length);
}

int32_t num_rows() const {
return offsets_.empty() ? 0 : static_cast<int32_t>(offsets_.size() - 1);
if (kInvalidFixedWidthOffset == fixed_width_length_) {
return fixed_with_row_count_;
}
return offsets_.empty() ? 0 : offsets_[0];
}

private:
Status EncodeAndAppendForFixedWidth(const ExecSpan& batch);

private:
static constexpr int32_t kInvalidFixedWidthOffset = 1;
ExecContext* ctx_{nullptr};
std::vector<std::shared_ptr<KeyEncoder>> encoders_;
// When all columns in a row are Fixed-width or NA, the encoded row
// doesn't need to maintain the column offsets. In this case, the
// offsets_.size() would be also be empty.
int32_t fixed_width_length_{kInvalidFixedWidthOffset};
int32_t fixed_with_row_count_{0};
// offsets_ vector stores the starting position (offset) of each encoded row
// within the bytes_ vector. This allows for quick access to individual rows.
//
Expand Down

0 comments on commit f8953d5

Please sign in to comment.