From a16021a016176df02a0c689ce6fee3f7d7f907ad Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 2 Sep 2024 23:06:10 +0800 Subject: [PATCH] GH-43758: [C++] Compute: More comment in RowEncoder (#43763) ### Rationale for this change Some comments for RowEncoder ### What changes are included in this PR? Some comments for RowEncoder ### Are these changes tested? Covered by existing ### Are there any user-facing changes? no * GitHub Issue: #43758 Lead-authored-by: mwish Co-authored-by: mwish <1506118561@qq.com> Co-authored-by: mwish Co-authored-by: Antoine Pitrou Co-authored-by: Rossi Sun Signed-off-by: mwish --- cpp/src/arrow/compute/light_array_internal.h | 6 +- .../arrow/compute/row/row_encoder_internal.cc | 56 +++---- .../arrow/compute/row/row_encoder_internal.h | 154 ++++++++++++++---- cpp/src/arrow/compute/row/row_internal.h | 2 +- 4 files changed, 155 insertions(+), 63 deletions(-) diff --git a/cpp/src/arrow/compute/light_array_internal.h b/cpp/src/arrow/compute/light_array_internal.h index b8e48f096baeb..5adb06e540009 100644 --- a/cpp/src/arrow/compute/light_array_internal.h +++ b/cpp/src/arrow/compute/light_array_internal.h @@ -65,12 +65,12 @@ struct ARROW_EXPORT KeyColumnMetadata { /// If this is true the column will have a validity buffer and /// a data buffer and the third buffer will be unused. bool is_fixed_length; - /// \brief True if this column is the null type + /// \brief True if this column is the null type(NA). bool is_null_type; /// \brief The number of bytes for each item /// /// Zero has a special meaning, indicating a bit vector with one bit per value if it - /// isn't a null type column. + /// isn't a null type column. Generally, this means that the column is a boolean type. /// /// For a varying-length binary column this represents the number of bytes per offset. uint32_t fixed_length; @@ -405,7 +405,7 @@ class ARROW_EXPORT ExecBatchBuilder { int num_rows() const { return values_.empty() ? 
0 : values_[0].num_rows(); } - static int num_rows_max() { return 1 << kLogNumRows; } + static constexpr int num_rows_max() { return 1 << kLogNumRows; } private: static constexpr int kLogNumRows = 15; diff --git a/cpp/src/arrow/compute/row/row_encoder_internal.cc b/cpp/src/arrow/compute/row/row_encoder_internal.cc index 414cc6793a5a3..0965e4e8f9571 100644 --- a/cpp/src/arrow/compute/row/row_encoder_internal.cc +++ b/cpp/src/arrow/compute/row/row_encoder_internal.cc @@ -145,41 +145,37 @@ void FixedWidthKeyEncoder::AddLengthNull(int32_t* length) { Status FixedWidthKeyEncoder::Encode(const ExecValue& data, int64_t batch_length, uint8_t** encoded_bytes) { + auto handle_next_valid_value = [&](std::string_view bytes) { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kValidByte; + memcpy(encoded_ptr, bytes.data(), byte_width_); + encoded_ptr += byte_width_; + }; + auto handle_next_null_value = [&] { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kNullByte; + memset(encoded_ptr, 0, byte_width_); + encoded_ptr += byte_width_; + }; if (data.is_array()) { ArraySpan viewed = data.array; + // The original type might not be FixedSizeBinaryType, but it would + // treat the input as binary data. 
auto view_ty = fixed_size_binary(byte_width_); viewed.type = view_ty.get(); - VisitArraySpanInline( - viewed, - [&](std::string_view bytes) { - auto& encoded_ptr = *encoded_bytes++; - *encoded_ptr++ = kValidByte; - memcpy(encoded_ptr, bytes.data(), byte_width_); - encoded_ptr += byte_width_; - }, - [&] { - auto& encoded_ptr = *encoded_bytes++; - *encoded_ptr++ = kNullByte; - memset(encoded_ptr, 0, byte_width_); - encoded_ptr += byte_width_; - }); + VisitArraySpanInline(viewed, handle_next_valid_value, + handle_next_null_value); } else { const auto& scalar = data.scalar_as(); if (scalar.is_valid) { - const std::string_view data = scalar.view(); - DCHECK_EQ(data.size(), static_cast(byte_width_)); + const std::string_view scalar_data = scalar.view(); + DCHECK_EQ(scalar_data.size(), static_cast(byte_width_)); for (int64_t i = 0; i < batch_length; i++) { - auto& encoded_ptr = *encoded_bytes++; - *encoded_ptr++ = kValidByte; - memcpy(encoded_ptr, data.data(), data.size()); - encoded_ptr += byte_width_; + handle_next_valid_value(scalar_data); } } else { for (int64_t i = 0; i < batch_length; i++) { - auto& encoded_ptr = *encoded_bytes++; - *encoded_ptr++ = kNullByte; - memset(encoded_ptr, 0, byte_width_); - encoded_ptr += byte_width_; + handle_next_null_value(); } } } @@ -267,11 +263,11 @@ void RowEncoder::Init(const std::vector& column_types, ExecContext* for (size_t i = 0; i < column_types.size(); ++i) { const bool is_extension = column_types[i].id() == Type::EXTENSION; - const TypeHolder& type = is_extension - ? arrow::internal::checked_pointer_cast( - column_types[i].GetSharedPtr()) - ->storage_type() - : column_types[i]; + const TypeHolder& type = + is_extension + ? 
arrow::internal::checked_cast(column_types[i].type) + ->storage_type() + : column_types[i]; if (is_extension) { extension_types_[i] = arrow::internal::checked_pointer_cast( @@ -379,7 +375,7 @@ Result RowEncoder::Decode(int64_t num_rows, const int32_t* row_ids) { ARROW_ASSIGN_OR_RAISE(out.values[i], ::arrow::internal::GetArrayView( column_array_data, extension_types_[i])) } else { - out.values[i] = column_array_data; + out.values[i] = std::move(column_array_data); } } diff --git a/cpp/src/arrow/compute/row/row_encoder_internal.h b/cpp/src/arrow/compute/row/row_encoder_internal.h index 60eb14af504f7..4d6cc34af2342 100644 --- a/cpp/src/arrow/compute/row/row_encoder_internal.h +++ b/cpp/src/arrow/compute/row/row_encoder_internal.h @@ -38,16 +38,41 @@ struct ARROW_EXPORT KeyEncoder { virtual ~KeyEncoder() = default; + // Increment the values in the lengths array by the length of the encoded key for the + // corresponding value in the given column. + // + // Generally if Encoder is for a fixed-width type, the length of the encoded key + // would add ExtraByteForNull + byte_width. + // If Encoder is for a variable-width type, the length would add ExtraByteForNull + + // sizeof(Offset) + buffer_size. + // If Encoder is for null type, the length would add 0. virtual void AddLength(const ExecValue& value, int64_t batch_length, int32_t* lengths) = 0; + // Increment the length by the length of an encoded null value. + // It's a special case for AddLength like `AddLength(Null-Scalar, 1, lengths)`. virtual void AddLengthNull(int32_t* length) = 0; + // Encode the column into the encoded_bytes, which is an array of pointers to each row + // buffer. + // + // If value is an array, the array-size should be batch_length. + // If value is a scalar, the value would repeat batch_length times. + // NB: The pointers in the encoded_bytes will be advanced as values being encoded into. 
virtual Status Encode(const ExecValue&, int64_t batch_length, uint8_t** encoded_bytes) = 0; + // Encode a null value into the encoded_bytes, which is an array of pointers to each row + // buffer. + // + // It's a special case for Encode like `Encode(Null-Scalar, 1, encoded_bytes)`. + // NB: The pointers in the encoded_bytes will be advanced as values being encoded into. virtual void EncodeNull(uint8_t** encoded_bytes) = 0; + // Decode the encoded key from the encoded_bytes, which is an array of pointers to each + // row buffer, into an ArrayData. + // + // NB: The pointers in the encoded_bytes will be advanced as values being decoded from. virtual Result> Decode(uint8_t** encoded_bytes, int32_t length, MemoryPool*) = 0; @@ -94,7 +119,7 @@ struct ARROW_EXPORT FixedWidthKeyEncoder : KeyEncoder { MemoryPool* pool) override; std::shared_ptr type_; - int byte_width_; + const int byte_width_; }; struct ARROW_EXPORT DictionaryKeyEncoder : FixedWidthKeyEncoder { @@ -118,6 +143,7 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder { void AddLength(const ExecValue& data, int64_t batch_length, int32_t* lengths) override { if (data.is_array()) { int64_t i = 0; + ARROW_DCHECK_EQ(data.array.length, batch_length); VisitArraySpanInline( data.array, [&](std::string_view bytes) { @@ -142,41 +168,34 @@ struct ARROW_EXPORT VarLengthKeyEncoder : KeyEncoder { Status Encode(const ExecValue& data, int64_t batch_length, uint8_t** encoded_bytes) override { + auto handle_next_valid_value = [&encoded_bytes](std::string_view bytes) { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kValidByte; + util::SafeStore(encoded_ptr, static_cast(bytes.size())); + encoded_ptr += sizeof(Offset); + memcpy(encoded_ptr, bytes.data(), bytes.size()); + encoded_ptr += bytes.size(); + }; + auto handle_next_null_value = [&encoded_bytes]() { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kNullByte; + util::SafeStore(encoded_ptr, static_cast(0)); + encoded_ptr += sizeof(Offset); + }; if 
(data.is_array()) { - VisitArraySpanInline( - data.array, - [&](std::string_view bytes) { - auto& encoded_ptr = *encoded_bytes++; - *encoded_ptr++ = kValidByte; - util::SafeStore(encoded_ptr, static_cast(bytes.size())); - encoded_ptr += sizeof(Offset); - memcpy(encoded_ptr, bytes.data(), bytes.size()); - encoded_ptr += bytes.size(); - }, - [&] { - auto& encoded_ptr = *encoded_bytes++; - *encoded_ptr++ = kNullByte; - util::SafeStore(encoded_ptr, static_cast(0)); - encoded_ptr += sizeof(Offset); - }); + DCHECK_EQ(data.length(), batch_length); + VisitArraySpanInline(data.array, handle_next_valid_value, + handle_next_null_value); } else { const auto& scalar = data.scalar_as(); if (scalar.is_valid) { - const auto& bytes = *scalar.value; + const auto bytes = std::string_view{*scalar.value}; for (int64_t i = 0; i < batch_length; i++) { - auto& encoded_ptr = *encoded_bytes++; - *encoded_ptr++ = kValidByte; - util::SafeStore(encoded_ptr, static_cast(bytes.size())); - encoded_ptr += sizeof(Offset); - memcpy(encoded_ptr, bytes.data(), bytes.size()); - encoded_ptr += bytes.size(); + handle_next_valid_value(bytes); } } else { for (int64_t i = 0; i < batch_length; i++) { - auto& encoded_ptr = *encoded_bytes++; - *encoded_ptr++ = kNullByte; - util::SafeStore(encoded_ptr, static_cast(0)); - encoded_ptr += sizeof(Offset); + handle_next_null_value(); } } } @@ -250,6 +269,68 @@ struct ARROW_EXPORT NullKeyEncoder : KeyEncoder { } }; +/// RowEncoder encodes ExecSpan to a variable length byte sequence +/// created by concatenating the encoded form of each column. The encoding +/// for each column depends on its data type. +/// +/// This is used to encode columns into row-major format, which will be +/// beneficial for grouping and joining operations. +/// +/// Unlike DuckDB and arrow-rs, currently this row format can not help +/// sortings because the row-format is uncomparable. 
+/// +/// # Key Column Encoding +/// +/// The row format is composed of the KeyColumn encodings for each column, +/// and the column is encoded as follows: +/// 1. A null byte for each column, indicating whether the column is null. +/// "1" for null, "0" for non-null. +/// 2. The "fixed width" encoding for the column, it would exist whether +/// the column is null or not. +/// 3. The "variable payload" encoding for the column, it would exist only +/// for non-null string/binary columns. +/// For string/binary columns, the length of the payload is in +/// "fixed width" part, and the binary contents are in the +/// "variable payload" part. +/// 4. Specially, if all columns in a row are null, the caller may decide +/// to refer to kRowIdForNulls instead of actually encoding/decoding +/// it using any KeyEncoder. See the comment for encoded_nulls_. +/// +/// The endianness of the encoded bytes is platform-dependent. +/// +/// ## Null Type +/// +/// Null Type is a special case, it doesn't occupy any space in the +/// encoded row. +/// +/// ## Fixed Width Type +/// +/// Fixed Width Type is encoded as a fixed-width byte sequence. For example: +/// ``` +/// Int8: 5, null, 6 +/// ``` +/// Would be encoded as [0 5], [1 0], [0 6]. +/// +/// ### Dictionary Type +/// +/// Dictionary Type is encoded as a fixed-width byte sequence using +/// dictionary indices, the dictionary should be identical for all +/// rows. +/// +/// ## Variable Width Type +/// +/// Variable Width Type is encoded as: +/// [null byte, variable-byte length, variable bytes]. For example: +/// +/// String "abc" would be encoded as: +/// 0 ( 1 byte for not null) + 3 ( 4 bytes for length ) + "abc" (payload) +/// +/// Null string would be encoded as: +/// 1 ( 1 byte for null) + 0 ( 4 bytes for length ) +/// +/// # Row Encoding +/// +/// The row format is the concatenation of the encodings of each column. 
class ARROW_EXPORT RowEncoder { public: static constexpr int kRowIdForNulls() { return -1; } @@ -259,6 +340,9 @@ class ARROW_EXPORT RowEncoder { Status EncodeAndAppend(const ExecSpan& batch); Result Decode(int64_t num_rows, const int32_t* row_ids); + // Returns the encoded representation of the row at index i. + // If i is kRowIdForNulls, it returns the pre-encoded all-nulls + // row. inline std::string encoded_row(int32_t i) const { if (i == kRowIdForNulls()) { return std::string(reinterpret_cast(encoded_nulls_.data()), @@ -270,14 +354,26 @@ class ARROW_EXPORT RowEncoder { } int32_t num_rows() const { - return offsets_.size() == 0 ? 0 : static_cast(offsets_.size() - 1); + return offsets_.empty() ? 0 : static_cast(offsets_.size() - 1); } private: ExecContext* ctx_{nullptr}; std::vector> encoders_; + // offsets_ vector stores the starting position (offset) of each encoded row + // within the bytes_ vector. This allows for quick access to individual rows. + // + // The size would be num_rows + 1 if not empty, the last element is the total + // length of the bytes_ vector. std::vector offsets_; + // The encoded bytes of all non "kRowIdForNulls" rows. std::vector bytes_; + // A pre-encoded constant row with all its columns being null. Useful when + // the caller is certain that an entire row is null and then uses kRowIdForNulls + // to refer to it. + // + // EncodeAndAppend would never append this row, but encoded_row and Decode would + // return this row when kRowIdForNulls is passed. std::vector encoded_nulls_; std::vector> extension_types_; }; diff --git a/cpp/src/arrow/compute/row/row_internal.h b/cpp/src/arrow/compute/row/row_internal.h index 094a9c31efe0a..3ab86fd1fc6ed 100644 --- a/cpp/src/arrow/compute/row/row_internal.h +++ b/cpp/src/arrow/compute/row/row_internal.h @@ -38,7 +38,7 @@ struct ARROW_EXPORT RowTableMetadata { /// For a fixed-length binary row, common size of rows in bytes, /// rounded up to the multiple of alignment. 
/// - /// For a varying-length binary, size of all encoded fixed-length key columns, + /// For a varying-length binary row, size of all encoded fixed-length key columns, /// including lengths of varying-length columns, rounded up to the multiple of string /// alignment. uint32_t fixed_length;