Skip to content

Commit

Permalink
Update all comments in row_encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Aug 19, 2024
1 parent c182138 commit 9643f67
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 57 deletions.
50 changes: 22 additions & 28 deletions cpp/src/arrow/compute/row/row_encoder_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,41 +145,35 @@ void FixedWidthKeyEncoder::AddLengthNull(int32_t* length) {

Status FixedWidthKeyEncoder::Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) {
auto handle_next_valid_value = [&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, bytes.data(), byte_width_);
encoded_ptr += byte_width_;
};
auto handle_next_null_value = [&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
};
if (data.is_array()) {
ArraySpan viewed = data.array;
auto view_ty = fixed_size_binary(byte_width_);
viewed.type = view_ty.get();
VisitArraySpanInline<FixedSizeBinaryType>(
viewed,
[&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, bytes.data(), byte_width_);
encoded_ptr += byte_width_;
},
[&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
});
VisitArraySpanInline<FixedSizeBinaryType>(viewed, handle_next_valid_value,
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<arrow::internal::PrimitiveScalarBase>();
if (scalar.is_valid) {
const std::string_view data = scalar.view();
DCHECK_EQ(data.size(), static_cast<size_t>(byte_width_));
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
memcpy(encoded_ptr, data.data(), data.size());
encoded_ptr += byte_width_;
handle_next_valid_value(data);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
memset(encoded_ptr, 0, byte_width_);
encoded_ptr += byte_width_;
handle_next_null_value();
}
}
}
Expand Down Expand Up @@ -267,11 +261,11 @@ void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext*

for (size_t i = 0; i < column_types.size(); ++i) {
const bool is_extension = column_types[i].id() == Type::EXTENSION;
const TypeHolder& type = is_extension
? arrow::internal::checked_pointer_cast<ExtensionType>(
column_types[i].GetSharedPtr())
->storage_type()
: column_types[i];
const TypeHolder& type =
is_extension
? arrow::internal::checked_cast<const ExtensionType*>(column_types[i].type)
->storage_type()
: column_types[i];

if (is_extension) {
extension_types_[i] = arrow::internal::checked_pointer_cast<ExtensionType>(
Expand Down Expand Up @@ -379,7 +373,7 @@ Result<ExecBatch> RowEncoder::Decode(int64_t num_rows, const int32_t* row_ids) {
ARROW_ASSIGN_OR_RAISE(out.values[i], ::arrow::internal::GetArrayView(
column_array_data, extension_types_[i]))
} else {
out.values[i] = column_array_data;
out.values[i] = std::move(column_array_data);
}
}

Expand Down
62 changes: 33 additions & 29 deletions cpp/src/arrow/compute/row/row_encoder_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct ARROW_EXPORT KeyEncoder {

virtual ~KeyEncoder() = default;

// Fill the length of the encoded key for the given value to the lengths array.
// Increment the values in the lengths array by the length of the encoded key
// for the given value.
//
// Generally if Encoder is for a fixed-width type, the length of the encoded key
// is ExtraByteForNull + byte_width.
Expand All @@ -48,14 +48,22 @@ struct ARROW_EXPORT KeyEncoder {
virtual void AddLength(const ExecValue& value, int64_t batch_length,
int32_t* lengths) = 0;

// Fill the length of the encoded key for a null value to the length array.
// Increment the length for a null value.
virtual void AddLengthNull(int32_t* length) = 0;

// Encode the value into the encoded_bytes buffer.
//
// If value is an array, the array size should be batch_length.
// If value is a scalar, the value is repeated batch_length times.
virtual Status Encode(const ExecValue&, int64_t batch_length,
uint8_t** encoded_bytes) = 0;

// Add a null byte to the encoded_bytes buffer.
//
// This is a special case of Encode, like `Encode(Null-Scalar, 1, encoded_bytes)`.
virtual void EncodeNull(uint8_t** encoded_bytes) = 0;

// Decode the encoded key from `encoded_bytes` buffer to an ArrayData.
virtual Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes,
int32_t length, MemoryPool*) = 0;

Expand Down Expand Up @@ -150,41 +158,33 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder {

Status Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) override {
auto handle_next_valid_value = [&encoded_bytes](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
};
auto handle_next_null_value = [&encoded_bytes]() {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
};
if (data.is_array()) {
VisitArraySpanInline<T>(
data.array,
[&](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
},
[&] {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
});
VisitArraySpanInline<T>(data.array, handle_next_valid_value,
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<BaseBinaryScalar>();
if (scalar.is_valid) {
const auto& bytes = *scalar.value;
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
handle_next_valid_value(bytes);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
handle_next_null_value();
}
}
}
Expand Down Expand Up @@ -267,6 +267,7 @@ class ARROW_EXPORT RowEncoder {
Status EncodeAndAppend(const ExecSpan& batch);
Result<ExecBatch> Decode(int64_t num_rows, const int32_t* row_ids);

// Return the encoded row at the given index as a string
inline std::string encoded_row(int32_t i) const {
if (i == kRowIdForNulls()) {
return std::string(reinterpret_cast<const char*>(encoded_nulls_.data()),
Expand All @@ -278,14 +279,17 @@ class ARROW_EXPORT RowEncoder {
}

int32_t num_rows() const {
return offsets_.size() == 0 ? 0 : static_cast<int32_t>(offsets_.size() - 1);
return offsets_.empty() ? 0 : static_cast<int32_t>(offsets_.size() - 1);
}

private:
ExecContext* ctx_{nullptr};
std::vector<std::shared_ptr<KeyEncoder>> encoders_;
// The offsets of each row in the encoded bytes.
// The size is num_rows + 1 if there are any rows.
std::vector<int32_t> offsets_;
std::vector<uint8_t> bytes_;
// A fixed nulls buffer for all null rows (kRowIdForNulls).
std::vector<uint8_t> encoded_nulls_;
std::vector<std::shared_ptr<ExtensionType>> extension_types_;
};
Expand Down

0 comments on commit 9643f67

Please sign in to comment.