From 5f1f0dd503ac55facfb91ae0c528b88b306831df Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 7 May 2024 20:15:50 -0700 Subject: [PATCH] Round trip FIXED_LEN_BYTE_ARRAY data properly in Parquet writer (#15600) #13437 added the ability to consume FIXED_LEN_BYTE_ARRAY encoded data and represent it as lists of `UINT8`. When trying to write this data back to Parquet there are two problems. 1) the notion of fixed length is lost, and 2) the `UINT8` data is written as a list of `INT32` which can quadruple the storage required. This PR addresses both issues by adding fields to the input and output metadata to allow for preserving the form of the original data. Authors: - Ed Seidl (https://github.com/etseidl) - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Muhammad Haseeb (https://github.com/mhaseeb123) URL: https://github.com/rapidsai/cudf/pull/15600 --- cpp/include/cudf/io/types.hpp | 59 ++++++++++++++- cpp/src/io/functions.cpp | 2 + cpp/src/io/parquet/page_enc.cu | 32 +++++--- cpp/src/io/parquet/parquet_gpu.hpp | 1 + cpp/src/io/parquet/reader_impl.cpp | 16 ++-- cpp/src/io/parquet/reader_impl_helpers.cpp | 7 +- cpp/src/io/parquet/writer_impl.cu | 13 +++- cpp/src/io/utilities/column_buffer.cpp | 5 ++ cpp/tests/io/parquet_writer_test.cpp | 86 ++++++++++++++++++++++ 9 files changed, 198 insertions(+), 23 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index b3dea0ab280..150e997f533 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -236,6 +236,8 @@ enum dictionary_policy { struct column_name_info { std::string name; ///< Column name std::optional is_nullable; ///< Column nullability + std::optional is_binary; ///< Column is binary (i.e. not a list) + std::optional type_length; ///< Byte width of data (for fixed length data) std::vector children; ///< Child column names /** @@ -243,9 +245,12 @@ struct column_name_info { * * @param _name Column name * @param _is_nullable True if column is nullable + * @param _is_binary True if column is binary data */ - column_name_info(std::string const& _name, std::optional _is_nullable = std::nullopt) - : name(_name), is_nullable(_is_nullable) + column_name_info(std::string const& _name, + std::optional _is_nullable = std::nullopt, + std::optional _is_binary = std::nullopt) + : name(_name), is_nullable(_is_nullable), is_binary(_is_binary) { } @@ -606,6 +611,7 @@ class column_in_metadata { bool _skip_compression = false; std::optional _decimal_precision; std::optional _parquet_field_id; + std::optional _type_length; std::vector children; column_encoding _encoding = column_encoding::USE_DEFAULT; @@ -693,6 +699,19 @@ class column_in_metadata { return *this; } + /** + * @brief Set the data length of the column. Only valid if this column is a + * fixed-length byte array. + * + * @param length The data length to set for this column + * @return this for chaining + */ + column_in_metadata& set_type_length(int32_t length) noexcept + { + _type_length = length; + return *this; + } + /** * @brief Set the parquet field id of this column. 
* @@ -826,6 +845,22 @@ class column_in_metadata { */ [[nodiscard]] uint8_t get_decimal_precision() const { return _decimal_precision.value(); } + /** + * @brief Get whether type length has been set for this column + * + * @return Boolean indicating whether type length has been set for this column + */ + [[nodiscard]] bool is_type_length_set() const noexcept { return _type_length.has_value(); } + + /** + * @brief Get the type length that was set for this column. + * + * @throws std::bad_optional_access If type length was not set for this + * column. Check using `is_type_length_set()` first. + * @return The decimal precision that was set for this column + */ + [[nodiscard]] uint8_t get_type_length() const { return _type_length.value(); } + /** * @brief Get whether parquet field id has been set for this column. * @@ -932,6 +967,7 @@ struct partition_info { class reader_column_schema { // Whether to read binary data as a string column bool _convert_binary_to_strings{true}; + int32_t _type_length{0}; std::vector children; @@ -997,6 +1033,18 @@ class reader_column_schema { return *this; } + /** + * @brief Sets the length of fixed length data. + * + * @param type_length Size of the data type in bytes + * @return this for chaining + */ + reader_column_schema& set_type_length(int32_t type_length) + { + _type_length = type_length; + return *this; + } + /** * @brief Get whether to encode this column as binary or string data * @@ -1007,6 +1055,13 @@ class reader_column_schema { return _convert_binary_to_strings; } + /** + * @brief Get the length in bytes of this fixed length data. + * + * @return The length in bytes of the data type + */ + [[nodiscard]] int32_t get_type_length() const { return _type_length; } + /** * @brief Get the number of child objects * diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 98b010109ec..0358a1a6b86 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -592,6 +592,8 @@ table_input_metadata::table_input_metadata(table_metadata const& metadata) [&](column_name_info const& name) { auto col_meta = column_in_metadata{name.name}; if (name.is_nullable.has_value()) { col_meta.set_nullability(name.is_nullable.value()); } + if (name.is_binary.value_or(false)) { col_meta.set_output_as_binary(true); } + if (name.type_length.has_value()) { col_meta.set_type_length(name.type_length.value()); } std::transform(name.children.begin(), name.children.end(), std::back_inserter(col_meta.children), diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 11b18579c58..e9558735929 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -109,10 +109,10 @@ using rle_page_enc_state_s = page_enc_state_s; /** * @brief Returns the size of the type in the Parquet file. */ -constexpr uint32_t physical_type_len(Type physical_type, type_id id) +constexpr uint32_t physical_type_len(Type physical_type, type_id id, int type_length) { - if (physical_type == FIXED_LEN_BYTE_ARRAY and id == type_id::DECIMAL128) { - return sizeof(__int128_t); + if (physical_type == FIXED_LEN_BYTE_ARRAY) { + return id == type_id::DECIMAL128 ? 
sizeof(__int128_t) : type_length; } switch (physical_type) { case INT96: return 12u; @@ -183,7 +183,7 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t) auto const physical_type = s->col.physical_type; auto const leaf_type = s->col.leaf_column->type().id(); - auto const dtype_len = physical_type_len(physical_type, leaf_type); + auto const dtype_len = physical_type_len(physical_type, leaf_type, s->col.type_length); auto const nvals = s->frag.num_leaf_values; auto const start_value_idx = s->frag.start_value_idx; @@ -541,7 +541,8 @@ __device__ size_t delta_data_len(Type physical_type, size_t page_size, encode_kernel_mask encoding) { - auto const dtype_len_out = physical_type_len(physical_type, type_id); + // dtype_len_out is for the lengths, rather than the char data, so pass sizeof(int32_t) + auto const dtype_len_out = physical_type_len(physical_type, type_id, sizeof(int32_t)); auto const dtype_len = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } if (physical_type == INT96) { return sizeof(int64_t); } @@ -1662,7 +1663,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) __syncthreads(); auto const physical_type = s->col.physical_type; auto const type_id = s->col.leaf_column->type().id(); - auto const dtype_len_out = physical_type_len(physical_type, type_id); + auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } if (physical_type == INT96) { return sizeof(int64_t); } @@ -1837,6 +1838,19 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) thrust::make_reverse_iterator(v_char_ptr), dst + pos); } + } else { + auto const elem = + get_element(*(s->col.leaf_column), val_idx); + if (len != 0 and elem.data() != nullptr) { + if (is_split_stream) { + auto const v_char_ptr = reinterpret_cast(elem.data()); + for (int i = 0; i < dtype_len_out; i++, pos += stride) { + dst[pos] = v_char_ptr[i]; + } + } else { + memcpy(dst + pos, elem.data(), len); + } + } } } break; } @@ -1884,7 +1898,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // Encode data values auto const physical_type = s->col.physical_type; auto const type_id = s->col.leaf_column->type().id(); - auto const dtype_len_out = physical_type_len(physical_type, type_id); + auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } if (physical_type == INT96) { return sizeof(int64_t); } @@ -2016,7 +2030,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // Encode data values auto const physical_type = s->col.physical_type; auto const type_id = s->col.leaf_column->type().id(); - auto const dtype_len_out = physical_type_len(physical_type, type_id); + auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } if (physical_type == INT96) { return sizeof(int64_t); } @@ -3218,7 +3232,7 @@ __device__ int32_t calculate_boundary_order(statistics_chunk const* s, } // align ptr to an 8-byte boundary. address returned will be <= ptr. 
-constexpr __device__ void* align8(void* ptr) +inline __device__ void* align8(void* ptr) { // it's ok to round down because we have an extra 7 bytes in the buffer auto algn = 3 & reinterpret_cast(ptr); diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 3b18175dccd..e3e4d8736c7 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -472,6 +472,7 @@ struct chunk_page_info { struct parquet_column_device_view : stats_column_desc { Type physical_type; //!< physical data type ConvertedType converted_type; //!< logical data type + int32_t type_length; //!< length of fixed_length_byte_array data uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble) //!< levels constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; } diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 0602b5ec007..3af4d5cdb86 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -510,14 +510,18 @@ table_with_metadata reader::impl::read_chunk_internal( // Create the final output cudf columns. for (size_t i = 0; i < _output_buffers.size(); ++i) { - auto metadata = _reader_column_schema.has_value() - ? std::make_optional((*_reader_column_schema)[i]) - : std::nullopt; - auto const& schema = _metadata->get_schema(_output_column_schemas[i]); - // FIXED_LEN_BYTE_ARRAY never read as string - if (schema.type == FIXED_LEN_BYTE_ARRAY and schema.converted_type != DECIMAL) { + auto metadata = _reader_column_schema.has_value() + ? std::make_optional((*_reader_column_schema)[i]) + : std::nullopt; + auto const& schema = _metadata->get_schema(_output_column_schemas[i]); + auto const logical_type = schema.logical_type.value_or(LogicalType{}); + // FIXED_LEN_BYTE_ARRAY never read as string. + // TODO: if we ever decide that the default reader behavior is to treat unannotated BINARY as + // binary and not strings, this test needs to change. + if (schema.type == FIXED_LEN_BYTE_ARRAY and logical_type.type != LogicalType::DECIMAL) { metadata = std::make_optional(); metadata->set_convert_binary_to_strings(false); + metadata->set_type_length(schema.type_length); } // Only construct `out_metadata` if `_output_metadata` has not been cached. if (!_output_metadata) { diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index c47beb8d7ed..68dbf532a68 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -165,9 +165,10 @@ type_id to_type_id(SchemaElement const& schema, case FLOAT: return type_id::FLOAT32; case DOUBLE: return type_id::FLOAT64; case BYTE_ARRAY: - case FIXED_LEN_BYTE_ARRAY: - // Can be mapped to INT32 (32-bit hash) or STRING - return strings_to_categorical ? type_id::INT32 : type_id::STRING; + // strings can be mapped to a 32-bit hash + if (strings_to_categorical) { return type_id::INT32; } + [[fallthrough]]; + case FIXED_LEN_BYTE_ARRAY: return type_id::STRING; case INT96: return (timestamp_type_id != type_id::EMPTY) ? 
timestamp_type_id : type_id::TIMESTAMP_NANOSECONDS; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 24aa630a05f..1dfced94f5b 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -755,7 +755,14 @@ std::vector construct_schema_tree( } schema_tree_node col_schema{}; - col_schema.type = Type::BYTE_ARRAY; + // test if this should be output as FIXED_LEN_BYTE_ARRAY + if (col_meta.is_type_length_set()) { + col_schema.type = Type::FIXED_LEN_BYTE_ARRAY; + col_schema.type_length = col_meta.get_type_length(); + } else { + col_schema.type = Type::BYTE_ARRAY; + } + col_schema.converted_type = thrust::nullopt; col_schema.stats_dtype = statistics_dtype::dtype_byte_array; col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED; @@ -1075,6 +1082,7 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream auto desc = parquet_column_device_view{}; // Zero out all fields desc.stats_dtype = schema_node.stats_dtype; desc.ts_scale = schema_node.ts_scale; + desc.type_length = schema_node.type_length; if (is_list()) { desc.level_offsets = _dremel_offsets.data(); @@ -1317,8 +1325,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, chunk_col_desc.requested_encoding != column_encoding::USE_DEFAULT && chunk_col_desc.requested_encoding != column_encoding::DICTIONARY; auto const is_type_non_dict = - chunk_col_desc.physical_type == Type::BOOLEAN || - (chunk_col_desc.output_as_byte_array && chunk_col_desc.physical_type == Type::BYTE_ARRAY); + chunk_col_desc.physical_type == Type::BOOLEAN || chunk_col_desc.output_as_byte_array; if (is_type_non_dict || is_requested_non_dict) { chunk.use_dictionary = false; diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index db84778edc6..5ef43599838 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -188,6 +188,11 @@ std::unique_ptr make_column(column_buffer_base& buffer, if (schema_info != nullptr) { schema_info->children.push_back(column_name_info{"offsets"}); schema_info->children.push_back(column_name_info{"binary"}); + // cuDF type will be list, but remember it was originally binary data + schema_info->is_binary = true; + if (schema.has_value() and schema->get_type_length() > 0) { + schema_info->type_length = schema->get_type_length(); + } } return make_lists_column( diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index fd8484bc70f..ad0860e265e 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1872,6 +1872,92 @@ TEST_F(ParquetWriterTest, DurationByteStreamSplit) test_durations([](auto i) { return false; }, true); } +TEST_F(ParquetWriterTest, WriteFixedLenByteArray) +{ + srand(31337); + using cudf::io::parquet::detail::Encoding; + constexpr int fixed_width = 16; + constexpr cudf::size_type num_rows = 200; + std::vector data(num_rows * fixed_width); + std::vector offsets(num_rows + 1); + + // fill a num_rows X fixed_width array with random numbers and populate offsets array + int cur_offset = 0; + for (int i = 0; i < num_rows; i++) { + offsets[i] = cur_offset; + for (int j = 0; j < fixed_width; j++, cur_offset++) { + data[cur_offset] = rand() & 0xff; + } + } + offsets[num_rows] = cur_offset; + + auto data_child = cudf::test::fixed_width_column_wrapper(data.begin(), data.end()); + auto off_child = cudf::test::fixed_width_column_wrapper(offsets.begin(), offsets.end()); + auto col = 
cudf::make_lists_column(num_rows, off_child.release(), data_child.release(), 0, {}); + + auto expected = table_view{{*col, *col, *col, *col}}; + cudf::io::table_input_metadata expected_metadata(expected); + + expected_metadata.column_metadata[0] + .set_name("flba_plain") + .set_type_length(fixed_width) + .set_encoding(cudf::io::column_encoding::PLAIN) + .set_output_as_binary(true); + expected_metadata.column_metadata[1] + .set_name("flba_split") + .set_type_length(fixed_width) + .set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT) + .set_output_as_binary(true); + expected_metadata.column_metadata[2] + .set_name("flba_delta") + .set_type_length(fixed_width) + .set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY) + .set_output_as_binary(true); + expected_metadata.column_metadata[3] + .set_name("flba_dict") + .set_type_length(fixed_width) + .set_encoding(cudf::io::column_encoding::DICTIONARY) + .set_output_as_binary(true); + + auto filepath = temp_env->get_temp_filepath("WriteFixedLenByteArray.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + + // check page headers to make sure each column is encoded with the appropriate encoder + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::detail::FileMetaData fmd; + read_footer(source, &fmd); + + // check that the schema retains the FIXED_LEN_BYTE_ARRAY type + for (int i = 1; i <= 4; i++) { + EXPECT_EQ(fmd.schema[i].type, cudf::io::parquet::detail::Type::FIXED_LEN_BYTE_ARRAY); + EXPECT_EQ(fmd.schema[i].type_length, fixed_width); + } + + // no nulls and no repetition, so the only encoding used should be for the data. + auto const expect_enc = [&fmd](int idx, cudf::io::parquet::detail::Encoding enc) { + EXPECT_EQ(fmd.row_groups[0].columns[idx].meta_data.encodings[0], enc); + }; + + // requested plain + expect_enc(0, Encoding::PLAIN); + // requested byte_stream_split + expect_enc(1, Encoding::BYTE_STREAM_SPLIT); + // requested delta_byte_array + expect_enc(2, Encoding::DELTA_BYTE_ARRAY); + // requested dictionary, but should fall back to plain + // TODO: update if we get FLBA working with dictionary encoding + expect_enc(3, Encoding::PLAIN); +} + ///////////////////////////////////////////////////////////// // custom mem mapped data sink that supports device writes template
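
Editor's note, for illustration only (not part of the patch): the test above exercises the explicit writer-side path, where the caller sets `set_type_length()` and `set_output_as_binary()` on `column_in_metadata`. The commit message also describes an implicit round-trip path: the reader now records `is_binary` and `type_length` in the output `column_name_info`, and the `table_input_metadata(table_metadata const&)` constructor propagates them, so a table read from a file with non-decimal FIXED_LEN_BYTE_ARRAY columns (returned as lists of UINT8) can be written back as FIXED_LEN_BYTE_ARRAY of the original width rather than as lists of INT32. A minimal sketch of that path follows; the function name and file paths are placeholders, and error handling is omitted.

    #include <cudf/io/parquet.hpp>
    #include <cudf/io/types.hpp>

    #include <utility>

    void round_trip_flba()
    {
      // Read a Parquet file containing non-decimal FIXED_LEN_BYTE_ARRAY columns.
      // These columns are returned as lists of UINT8, and with this change the
      // reader records is_binary and type_length in result.metadata.schema_info.
      auto const read_opts =
        cudf::io::parquet_reader_options::builder(cudf::io::source_info{"input_flba.parquet"})
          .build();
      auto result = cudf::io::read_parquet(read_opts);

      // Constructing table_input_metadata from the reader's metadata propagates
      // the binary flag and fixed type length to the writer-side column metadata.
      cudf::io::table_input_metadata write_meta{result.metadata};

      // Write the table back out; the affected columns are emitted as
      // FIXED_LEN_BYTE_ARRAY of the original width instead of lists of INT32.
      auto const write_opts =
        cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"output_flba.parquet"},
                                                  result.tbl->view())
          .metadata(std::move(write_meta))
          .build();
      cudf::io::write_parquet(write_opts);
    }

In other words, the explicit path (the test) is needed when the list-of-UINT8 column was constructed in memory, while the implicit path above relies on the new fields added to `column_name_info` to preserve the original form of data that was read from Parquet.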