Skip to content

Commit

Permalink
Round trip FIXED_LEN_BYTE_ARRAY data properly in Parquet writer (#15600)
Browse files Browse the repository at this point in the history
#13437 added the ability to consume FIXED_LEN_BYTE_ARRAY encoded data and represent it as lists of `UINT8`. When trying to write this data back to Parquet there are two problems. 1) the notion of fixed length is lost, and 2) the `UINT8` data is written as a list of `INT32` which can quadruple the storage required. This PR addresses both issues by adding fields to the input and output metadata to allow for preserving the form of the original data.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

URL: #15600
  • Loading branch information
etseidl authored May 8, 2024
1 parent 46ae8cb commit 5f1f0dd
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 23 deletions.
59 changes: 57 additions & 2 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,21 @@ enum dictionary_policy {
struct column_name_info {
std::string name; ///< Column name
std::optional<bool> is_nullable; ///< Column nullability
std::optional<bool> is_binary; ///< Column is binary (i.e. not a list)
std::optional<int32_t> type_length; ///< Byte width of data (for fixed length data)
std::vector<column_name_info> children; ///< Child column names

/**
 * @brief Construct a column name info with a name, optional nullability, and no children
 *
 * @param _name Column name
 * @param _is_nullable True if column is nullable
 * @param _is_binary True if column is binary data
 */
column_name_info(std::string const& _name, std::optional<bool> _is_nullable = std::nullopt)
: name(_name), is_nullable(_is_nullable)
column_name_info(std::string const& _name,
std::optional<bool> _is_nullable = std::nullopt,
std::optional<bool> _is_binary = std::nullopt)
: name(_name), is_nullable(_is_nullable), is_binary(_is_binary)
{
}

Expand Down Expand Up @@ -606,6 +611,7 @@ class column_in_metadata {
bool _skip_compression = false;
std::optional<uint8_t> _decimal_precision;
std::optional<int32_t> _parquet_field_id;
std::optional<int32_t> _type_length;
std::vector<column_in_metadata> children;
column_encoding _encoding = column_encoding::USE_DEFAULT;

Expand Down Expand Up @@ -693,6 +699,19 @@ class column_in_metadata {
return *this;
}

/**
 * @brief Record the byte width of each element of this column.
 *
 * Meaningful only when this column will be written as a fixed-length
 * byte array.
 *
 * @param length Element width in bytes
 * @return this for chaining
 */
column_in_metadata& set_type_length(int32_t length) noexcept
{
  _type_length.emplace(length);
  return *this;
}

/**
* @brief Set the parquet field id of this column.
*
Expand Down Expand Up @@ -826,6 +845,22 @@ class column_in_metadata {
*/
[[nodiscard]] uint8_t get_decimal_precision() const { return _decimal_precision.value(); }

/**
* @brief Get whether type length has been set for this column
*
* @return Boolean indicating whether type length has been set for this column
*/
[[nodiscard]] bool is_type_length_set() const noexcept { return _type_length.has_value(); }

/**
* @brief Get the type length that was set for this column.
*
* @throws std::bad_optional_access If type length was not set for this
* column. Check using `is_type_length_set()` first.
* @return The decimal precision that was set for this column
*/
[[nodiscard]] uint8_t get_type_length() const { return _type_length.value(); }

/**
* @brief Get whether parquet field id has been set for this column.
*
Expand Down Expand Up @@ -932,6 +967,7 @@ struct partition_info {
class reader_column_schema {
// Whether to read binary data as a string column
bool _convert_binary_to_strings{true};
int32_t _type_length{0};

std::vector<reader_column_schema> children;

Expand Down Expand Up @@ -997,6 +1033,18 @@ class reader_column_schema {
return *this;
}

/**
 * @brief Record the width, in bytes, of this column's fixed-length data.
 *
 * @param type_length Size of the data type in bytes
 * @return this for chaining
 */
reader_column_schema& set_type_length(int32_t type_length)
{
  this->_type_length = type_length;
  return *this;
}

/**
* @brief Get whether to encode this column as binary or string data
*
Expand All @@ -1007,6 +1055,13 @@ class reader_column_schema {
return _convert_binary_to_strings;
}

/**
* @brief Get the length in bytes of this fixed length data.
*
* @return The length in bytes of the data type
*/
[[nodiscard]] int32_t get_type_length() const { return _type_length; }

/**
* @brief Get the number of child objects
*
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ table_input_metadata::table_input_metadata(table_metadata const& metadata)
[&](column_name_info const& name) {
auto col_meta = column_in_metadata{name.name};
if (name.is_nullable.has_value()) { col_meta.set_nullability(name.is_nullable.value()); }
if (name.is_binary.value_or(false)) { col_meta.set_output_as_binary(true); }
if (name.type_length.has_value()) { col_meta.set_type_length(name.type_length.value()); }
std::transform(name.children.begin(),
name.children.end(),
std::back_inserter(col_meta.children),
Expand Down
32 changes: 23 additions & 9 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ using rle_page_enc_state_s = page_enc_state_s<rle_buffer_size>;
/**
* @brief Returns the size of the type in the Parquet file.
*/
constexpr uint32_t physical_type_len(Type physical_type, type_id id)
constexpr uint32_t physical_type_len(Type physical_type, type_id id, int type_length)
{
if (physical_type == FIXED_LEN_BYTE_ARRAY and id == type_id::DECIMAL128) {
return sizeof(__int128_t);
if (physical_type == FIXED_LEN_BYTE_ARRAY) {
return id == type_id::DECIMAL128 ? sizeof(__int128_t) : type_length;
}
switch (physical_type) {
case INT96: return 12u;
Expand Down Expand Up @@ -183,7 +183,7 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t)

auto const physical_type = s->col.physical_type;
auto const leaf_type = s->col.leaf_column->type().id();
auto const dtype_len = physical_type_len(physical_type, leaf_type);
auto const dtype_len = physical_type_len(physical_type, leaf_type, s->col.type_length);
auto const nvals = s->frag.num_leaf_values;
auto const start_value_idx = s->frag.start_value_idx;

Expand Down Expand Up @@ -541,7 +541,8 @@ __device__ size_t delta_data_len(Type physical_type,
size_t page_size,
encode_kernel_mask encoding)
{
auto const dtype_len_out = physical_type_len(physical_type, type_id);
// dtype_len_out is for the lengths, rather than the char data, so pass sizeof(int32_t)
auto const dtype_len_out = physical_type_len(physical_type, type_id, sizeof(int32_t));
auto const dtype_len = [&]() -> uint32_t {
if (physical_type == INT32) { return int32_logical_len(type_id); }
if (physical_type == INT96) { return sizeof(int64_t); }
Expand Down Expand Up @@ -1662,7 +1663,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
__syncthreads();
auto const physical_type = s->col.physical_type;
auto const type_id = s->col.leaf_column->type().id();
auto const dtype_len_out = physical_type_len(physical_type, type_id);
auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length);
auto const dtype_len_in = [&]() -> uint32_t {
if (physical_type == INT32) { return int32_logical_len(type_id); }
if (physical_type == INT96) { return sizeof(int64_t); }
Expand Down Expand Up @@ -1837,6 +1838,19 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
thrust::make_reverse_iterator(v_char_ptr),
dst + pos);
}
} else {
auto const elem =
get_element<statistics::byte_array_view>(*(s->col.leaf_column), val_idx);
if (len != 0 and elem.data() != nullptr) {
if (is_split_stream) {
auto const v_char_ptr = reinterpret_cast<uint8_t const*>(elem.data());
for (int i = 0; i < dtype_len_out; i++, pos += stride) {
dst[pos] = v_char_ptr[i];
}
} else {
memcpy(dst + pos, elem.data(), len);
}
}
}
} break;
}
Expand Down Expand Up @@ -1884,7 +1898,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
// Encode data values
auto const physical_type = s->col.physical_type;
auto const type_id = s->col.leaf_column->type().id();
auto const dtype_len_out = physical_type_len(physical_type, type_id);
auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length);
auto const dtype_len_in = [&]() -> uint32_t {
if (physical_type == INT32) { return int32_logical_len(type_id); }
if (physical_type == INT96) { return sizeof(int64_t); }
Expand Down Expand Up @@ -2016,7 +2030,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
// Encode data values
auto const physical_type = s->col.physical_type;
auto const type_id = s->col.leaf_column->type().id();
auto const dtype_len_out = physical_type_len(physical_type, type_id);
auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length);
auto const dtype_len_in = [&]() -> uint32_t {
if (physical_type == INT32) { return int32_logical_len(type_id); }
if (physical_type == INT96) { return sizeof(int64_t); }
Expand Down Expand Up @@ -3218,7 +3232,7 @@ __device__ int32_t calculate_boundary_order(statistics_chunk const* s,
}

// align ptr to an 8-byte boundary. address returned will be <= ptr.
constexpr __device__ void* align8(void* ptr)
inline __device__ void* align8(void* ptr)
{
// it's ok to round down because we have an extra 7 bytes in the buffer
auto algn = 3 & reinterpret_cast<std::uintptr_t>(ptr);
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ struct chunk_page_info {
struct parquet_column_device_view : stats_column_desc {
Type physical_type; //!< physical data type
ConvertedType converted_type; //!< logical data type
int32_t type_length; //!< length of fixed_length_byte_array data
uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble)
//!< levels
constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; }
Expand Down
16 changes: 10 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,14 +510,18 @@ table_with_metadata reader::impl::read_chunk_internal(

// Create the final output cudf columns.
for (size_t i = 0; i < _output_buffers.size(); ++i) {
auto metadata = _reader_column_schema.has_value()
? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
: std::nullopt;
auto const& schema = _metadata->get_schema(_output_column_schemas[i]);
// FIXED_LEN_BYTE_ARRAY never read as string
if (schema.type == FIXED_LEN_BYTE_ARRAY and schema.converted_type != DECIMAL) {
auto metadata = _reader_column_schema.has_value()
? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
: std::nullopt;
auto const& schema = _metadata->get_schema(_output_column_schemas[i]);
auto const logical_type = schema.logical_type.value_or(LogicalType{});
// FIXED_LEN_BYTE_ARRAY never read as string.
// TODO: if we ever decide that the default reader behavior is to treat unannotated BINARY as
// binary and not strings, this test needs to change.
if (schema.type == FIXED_LEN_BYTE_ARRAY and logical_type.type != LogicalType::DECIMAL) {
metadata = std::make_optional<reader_column_schema>();
metadata->set_convert_binary_to_strings(false);
metadata->set_type_length(schema.type_length);
}
// Only construct `out_metadata` if `_output_metadata` has not been cached.
if (!_output_metadata) {
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ type_id to_type_id(SchemaElement const& schema,
case FLOAT: return type_id::FLOAT32;
case DOUBLE: return type_id::FLOAT64;
case BYTE_ARRAY:
case FIXED_LEN_BYTE_ARRAY:
// Can be mapped to INT32 (32-bit hash) or STRING
return strings_to_categorical ? type_id::INT32 : type_id::STRING;
// strings can be mapped to a 32-bit hash
if (strings_to_categorical) { return type_id::INT32; }
[[fallthrough]];
case FIXED_LEN_BYTE_ARRAY: return type_id::STRING;
case INT96:
return (timestamp_type_id != type_id::EMPTY) ? timestamp_type_id
: type_id::TIMESTAMP_NANOSECONDS;
Expand Down
13 changes: 10 additions & 3 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,14 @@ std::vector<schema_tree_node> construct_schema_tree(
}

schema_tree_node col_schema{};
col_schema.type = Type::BYTE_ARRAY;
// test if this should be output as FIXED_LEN_BYTE_ARRAY
if (col_meta.is_type_length_set()) {
col_schema.type = Type::FIXED_LEN_BYTE_ARRAY;
col_schema.type_length = col_meta.get_type_length();
} else {
col_schema.type = Type::BYTE_ARRAY;
}

col_schema.converted_type = thrust::nullopt;
col_schema.stats_dtype = statistics_dtype::dtype_byte_array;
col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED;
Expand Down Expand Up @@ -1075,6 +1082,7 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream
auto desc = parquet_column_device_view{}; // Zero out all fields
desc.stats_dtype = schema_node.stats_dtype;
desc.ts_scale = schema_node.ts_scale;
desc.type_length = schema_node.type_length;

if (is_list()) {
desc.level_offsets = _dremel_offsets.data();
Expand Down Expand Up @@ -1317,8 +1325,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
chunk_col_desc.requested_encoding != column_encoding::USE_DEFAULT &&
chunk_col_desc.requested_encoding != column_encoding::DICTIONARY;
auto const is_type_non_dict =
chunk_col_desc.physical_type == Type::BOOLEAN ||
(chunk_col_desc.output_as_byte_array && chunk_col_desc.physical_type == Type::BYTE_ARRAY);
chunk_col_desc.physical_type == Type::BOOLEAN || chunk_col_desc.output_as_byte_array;

if (is_type_non_dict || is_requested_non_dict) {
chunk.use_dictionary = false;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ std::unique_ptr<column> make_column(column_buffer_base<string_policy>& buffer,
if (schema_info != nullptr) {
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"binary"});
// cuDF type will be list<UINT8>, but remember it was originally binary data
schema_info->is_binary = true;
if (schema.has_value() and schema->get_type_length() > 0) {
schema_info->type_length = schema->get_type_length();
}
}

return make_lists_column(
Expand Down
86 changes: 86 additions & 0 deletions cpp/tests/io/parquet_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1872,6 +1872,92 @@ TEST_F(ParquetWriterTest, DurationByteStreamSplit)
test_durations([](auto i) { return false; }, true);
}

// Round-trip test for FIXED_LEN_BYTE_ARRAY output: writes four copies of the same
// list<uint8> column, each tagged with set_type_length() + set_output_as_binary(),
// then verifies (1) the data reads back equal, (2) the file schema retains the
// FIXED_LEN_BYTE_ARRAY physical type and width, and (3) each column was written
// with the requested page encoding (or the expected fallback).
TEST_F(ParquetWriterTest, WriteFixedLenByteArray)
{
srand(31337);
using cudf::io::parquet::detail::Encoding;
constexpr int fixed_width = 16;
constexpr cudf::size_type num_rows = 200;
std::vector<uint8_t> data(num_rows * fixed_width);
std::vector<cudf::size_type> offsets(num_rows + 1);

// fill a num_rows X fixed_width array with random numbers and populate offsets array
// (every list is exactly fixed_width bytes, as FIXED_LEN_BYTE_ARRAY requires)
int cur_offset = 0;
for (int i = 0; i < num_rows; i++) {
offsets[i] = cur_offset;
for (int j = 0; j < fixed_width; j++, cur_offset++) {
data[cur_offset] = rand() & 0xff;
}
}
offsets[num_rows] = cur_offset;

// assemble the list<uint8> column cuDF uses to represent binary data
auto data_child = cudf::test::fixed_width_column_wrapper<uint8_t>(data.begin(), data.end());
auto off_child = cudf::test::fixed_width_column_wrapper<int32_t>(offsets.begin(), offsets.end());
auto col = cudf::make_lists_column(num_rows, off_child.release(), data_child.release(), 0, {});

// four views of the same column, one per encoding under test
auto expected = table_view{{*col, *col, *col, *col}};
cudf::io::table_input_metadata expected_metadata(expected);

// each column is marked binary with a fixed type length, so the writer emits
// FIXED_LEN_BYTE_ARRAY rather than BYTE_ARRAY of INT32
expected_metadata.column_metadata[0]
.set_name("flba_plain")
.set_type_length(fixed_width)
.set_encoding(cudf::io::column_encoding::PLAIN)
.set_output_as_binary(true);
expected_metadata.column_metadata[1]
.set_name("flba_split")
.set_type_length(fixed_width)
.set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT)
.set_output_as_binary(true);
expected_metadata.column_metadata[2]
.set_name("flba_delta")
.set_type_length(fixed_width)
.set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY)
.set_output_as_binary(true);
expected_metadata.column_metadata[3]
.set_name("flba_dict")
.set_type_length(fixed_width)
.set_encoding(cudf::io::column_encoding::DICTIONARY)
.set_output_as_binary(true);

// write, then read back and compare against the input table
auto filepath = temp_env->get_temp_filepath("WriteFixedLenByteArray.parquet");
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.metadata(expected_metadata);
cudf::io::write_parquet(out_opts);

cudf::io::parquet_reader_options in_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto result = cudf::io::read_parquet(in_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());

// check page headers to make sure each column is encoded with the appropriate encoder
auto const source = cudf::io::datasource::create(filepath);
cudf::io::parquet::detail::FileMetaData fmd;
read_footer(source, &fmd);

// check that the schema retains the FIXED_LEN_BYTE_ARRAY type
// (schema[0] is the root group, so leaf columns start at index 1)
for (int i = 1; i <= 4; i++) {
EXPECT_EQ(fmd.schema[i].type, cudf::io::parquet::detail::Type::FIXED_LEN_BYTE_ARRAY);
EXPECT_EQ(fmd.schema[i].type_length, fixed_width);
}

// no nulls and no repetition, so the only encoding used should be for the data.
auto const expect_enc = [&fmd](int idx, cudf::io::parquet::detail::Encoding enc) {
EXPECT_EQ(fmd.row_groups[0].columns[idx].meta_data.encodings[0], enc);
};

// requested plain
expect_enc(0, Encoding::PLAIN);
// requested byte_stream_split
expect_enc(1, Encoding::BYTE_STREAM_SPLIT);
// requested delta_byte_array
expect_enc(2, Encoding::DELTA_BYTE_ARRAY);
// requested dictionary, but should fall back to plain
// TODO: update if we get FLBA working with dictionary encoding
expect_enc(3, Encoding::PLAIN);
}

/////////////////////////////////////////////////////////////
// custom mem mapped data sink that supports device writes
template <bool supports_device_writes>
Expand Down

0 comments on commit 5f1f0dd

Please sign in to comment.