From 39defaef09e8f4d3e3080e952065d941ad649cc4 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 7 Mar 2024 12:44:56 -0800 Subject: [PATCH 01/29] initial cut --- cpp/include/cudf/io/types.hpp | 1 + cpp/src/io/parquet/page_data.cu | 256 +++++++++++++++++++++++ cpp/src/io/parquet/page_decode.cuh | 1 + cpp/src/io/parquet/page_enc.cu | 103 +++++++-- cpp/src/io/parquet/page_hdr.cu | 3 + cpp/src/io/parquet/page_string_decode.cu | 14 +- cpp/src/io/parquet/parquet_gpu.hpp | 49 +++-- cpp/src/io/parquet/reader_impl.cpp | 11 + cpp/src/io/parquet/writer_impl.cu | 15 ++ 9 files changed, 419 insertions(+), 34 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 64d627483e6..713bd1413ea 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -113,6 +113,7 @@ enum class column_encoding { ///< valid for BYTE_ARRAY columns) DELTA_BYTE_ARRAY, ///< Use DELTA_BYTE_ARRAY encoding (only valid for ///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns) + BYTE_STREAM_SPLIT, ///< Use BYTE_STREAM_SPLIT encoding (valid for all fixed width types) // ORC encodings: DIRECT, ///< Use DIRECT encoding DIRECT_V2, ///< Use DIRECT_V2 encoding diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 79154851cc7..dab4662c657 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -406,6 +406,237 @@ static __device__ void gpuOutputGeneric( } } +/** + * @brief Kernel for computing the column data stored in the pages + * + * This function will write the page data and the page data's validity to the + * output specified in the page's column chunk. If necessary, additional + * conversion will be performed to translate from the Parquet datatype to + * desired output datatype (ex. 32-bit to 16-bit, string to hash). 
+ * + * @param pages List of pages + * @param chunks List of column chunks + * @param min_row Row index to start reading at + * @param num_rows Maximum number of rows to read + * @param error_code Error code to set if an error is encountered + */ +template +CUDF_KERNEL void __launch_bounds__(decode_block_size) + gpuDecodeSplitPageData(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + kernel_error::pointer error_code) +{ + __shared__ __align__(16) page_state_s state_g; + __shared__ __align__(16) + page_state_buffers_s + state_buffers; + + page_state_s* const s = &state_g; + auto* const sb = &state_buffers; + int page_idx = blockIdx.x; + int t = threadIdx.x; + int out_thread0; + [[maybe_unused]] null_count_back_copier _{s, t}; + + if (!setupLocalPageInfo(s, + &pages[page_idx], + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT}, + page_processing_stage::DECODE)) { + return; + } + + bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; + + auto const data_len = thrust::distance(s->data_start, s->data_end); + auto const num_values = data_len / s->dtype_len_in; + + if (s->dict_base) { + out_thread0 = (s->dict_bits > 0) ? 64 : 32; + } else { + switch (s->col.data_type & 7) { + case BOOLEAN: [[fallthrough]]; + case BYTE_ARRAY: [[fallthrough]]; + case FIXED_LEN_BYTE_ARRAY: out_thread0 = 64; break; + default: out_thread0 = 32; + } + } + + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; + + __shared__ level_t rep[rolling_buf_size]; // circular buffer of repetition level values + __shared__ level_t def[rolling_buf_size]; // circular buffer of definition level values + + // skipped_leaf_values will always be 0 for flat hierarchies. 
+ uint32_t skipped_leaf_values = s->page.skipped_leaf_values; + while (s->error == 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + int target_pos; + int src_pos = s->src_pos; + + if (t < out_thread0) { + target_pos = min(src_pos + 2 * (decode_block_size - out_thread0), + s->nz_count + (decode_block_size - out_thread0)); + } else { + target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0); + if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); } + } + // this needs to be here to prevent warp 3 modifying src_pos before all threads have read it + __syncthreads(); + if (t < 32) { + // decode repetition and definition levels. + // - update validity vectors + // - updates offsets (for nested columns) + // - produces non-NULL value indices in s->nz_idx for subsequent decoding + gpuDecodeLevels(s, sb, target_pos, rep, def, t); + } else if (t < out_thread0) { + // skipped_leaf_values will always be 0 for flat hierarchies. + uint32_t src_target_pos = target_pos + skipped_leaf_values; + + // WARP1: Decode dictionary indices, booleans or string positions + // NOTE: racecheck complains of a RAW error involving the s->dict_pos assignment below. + // This is likely a false positive in practice, but could be solved by wrapping the next + // 9 lines in `if (s->dict_pos < src_target_pos) {}`. If that change is made here, it will + // be needed in the other DecodeXXX kernels. 
+ if (s->dict_base) { + src_target_pos = gpuDecodeDictionaryIndices(s, sb, src_target_pos, t & 0x1f).first; + } else if ((s->col.data_type & 7) == BOOLEAN) { + src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f); + } else if ((s->col.data_type & 7) == BYTE_ARRAY or + (s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) { + gpuInitStringDescriptors(s, sb, src_target_pos, t & 0x1f); + } + if (t == 32) { s->dict_pos = src_target_pos; } + } else { + // WARP1..WARP3: Decode values + int const dtype = s->col.data_type & 7; + src_pos += t - out_thread0; + + // the position in the output column/buffer + int dst_pos = sb->nz_idx[rolling_index(src_pos)]; + + // for the flat hierarchy case we will be reading from the beginning of the value stream, + // regardless of the value of first_row. so adjust our destination offset accordingly. + // example: + // - user has passed skip_rows = 2, so our first_row to output is 2 + // - the row values we get from nz_idx will be + // 0, 1, 2, 3, 4 .... + // - by shifting these values by first_row, the sequence becomes + // -2, -1, 0, 1, 2 ... + // - so we will end up ignoring the first two input rows, and input rows 2..n will + // get written to the output starting at position 0. + // + if (!has_repetition) { dst_pos -= s->first_row; } + + // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values + // before first_row) in the flat hierarchy case. + if (src_pos < target_pos && dst_pos >= 0) { + // src_pos represents the logical row position we want to read from. But in the case of + // nested hierarchies, there is no 1:1 mapping of rows to values. So our true read position + // has to take into account the # of values we have to skip in the page to get to the + // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0. 
+ uint32_t val_src_pos = src_pos + skipped_leaf_values; + + // nesting level that is storing actual leaf values + int leaf_level_index = s->col.max_nesting_depth - 1; + + uint32_t dtype_len = s->dtype_len; + uint8_t* dst = + nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + + if (s->col.converted_type == DECIMAL) { + switch (dtype) { + case INT32: { + dst[0] = s->data_start[val_src_pos]; + dst[1] = s->data_start[val_src_pos + 1 * num_values]; + dst[2] = s->data_start[val_src_pos + 2 * num_values]; + dst[3] = s->data_start[val_src_pos + 3 * num_values]; + } break; + case INT64: { + dst[0] = s->data_start[val_src_pos]; + dst[1] = s->data_start[val_src_pos + 1 * num_values]; + dst[2] = s->data_start[val_src_pos + 2 * num_values]; + dst[3] = s->data_start[val_src_pos + 3 * num_values]; + dst[4] = s->data_start[val_src_pos + 4 * num_values]; + dst[5] = s->data_start[val_src_pos + 5 * num_values]; + dst[6] = s->data_start[val_src_pos + 6 * num_values]; + dst[7] = s->data_start[val_src_pos + 7 * num_values]; + } break; + default: + // FIXME(ets) + // need a new version of gpuOutputFixedLenByteArrayAsInt for BSS + // decimals in FLBA are in big endian order + if (s->dtype_len_in <= sizeof(int32_t)) { + int32_t unscaled = 0; + for (int i = 0; i < s->dtype_len_in; i++) { + unscaled = (unscaled << 8) | s->data_start[val_src_pos + i * num_values]; + } + if (s->dtype_len_in < sizeof(int32_t)) { + auto const shift = (sizeof(int32_t) - s->dtype_len_in) * 8; + unscaled <<= shift; + unscaled >>= shift; + } + *(reinterpret_cast(dst)) = unscaled; + } else if (s->dtype_len_in <= sizeof(int64_t)) { + gpuOutputFixedLenByteArrayAsInt( + s, sb, val_src_pos, reinterpret_cast(dst)); + } else { + gpuOutputFixedLenByteArrayAsInt( + s, sb, val_src_pos, reinterpret_cast<__int128_t*>(dst)); + } + break; + } + } else if (dtype == FIXED_LEN_BYTE_ARRAY) { + // TODO: float16 will end up here if we don't check for logical type + // FLBA without logical type will be 
handled by string decoder + } else if (dtype_len == 8) { + if (s->dtype_len_in == 4) { + // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS + // TIME_MILLIS is the only duration type stored as int32: + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + dst[0] = 0; + dst[1] = 0; + dst[2] = 0; + dst[3] = 0; + dst[4] = s->data_start[val_src_pos]; + dst[5] = s->data_start[val_src_pos + 1 * num_values]; + dst[6] = s->data_start[val_src_pos + 2 * num_values]; + dst[7] = s->data_start[val_src_pos + 3 * num_values]; + } else if (s->ts_scale) { + // TODO: implement this + gpuOutputInt64Timestamp(s, sb, val_src_pos, reinterpret_cast(dst)); + } else { + dst[0] = s->data_start[val_src_pos]; + dst[1] = s->data_start[val_src_pos + 1 * num_values]; + dst[2] = s->data_start[val_src_pos + 2 * num_values]; + dst[3] = s->data_start[val_src_pos + 3 * num_values]; + dst[4] = s->data_start[val_src_pos + 4 * num_values]; + dst[5] = s->data_start[val_src_pos + 5 * num_values]; + dst[6] = s->data_start[val_src_pos + 6 * num_values]; + dst[7] = s->data_start[val_src_pos + 7 * num_values]; + } + } else if (dtype_len == 4) { + dst[0] = s->data_start[val_src_pos]; + dst[1] = s->data_start[val_src_pos + 1 * num_values]; + dst[2] = s->data_start[val_src_pos + 2 * num_values]; + dst[3] = s->data_start[val_src_pos + 3 * num_values]; + } else { + // FIXME: what would this be anyway???? 
+ gpuOutputGeneric(s, sb, val_src_pos, static_cast(dst), dtype_len); + } + } + + if (t == out_thread0) { s->src_pos = target_pos; } + } + __syncthreads(); + } + if (t == 0 and s->error != 0) { set_error(s->error, error_code); } +} + /** * @brief Kernel for computing the column data stored in the pages * @@ -643,4 +874,29 @@ void __host__ DecodePageData(cudf::detail::hostdevice_span pages, } } +/** + * @copydoc cudf::io::parquet::detail::DecodePageData + */ +void __host__ DecodeSplitPageData(cudf::detail::hostdevice_span& pages, + cudf::detail::hostdevice_span const& chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) +{ + CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); + + dim3 dim_block(decode_block_size, 1); + dim3 dim_grid(pages.size(), 1); // 1 threadblock per page + + if (level_type_size == 1) { + gpuDecodeSplitPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodeSplitPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index cf3e1911496..23af41f0446 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -1314,6 +1314,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } break; case Encoding::PLAIN: + case Encoding::BYTE_STREAM_SPLIT: s->dict_size = static_cast(end - cur); s->dict_val = 0; if ((s->col.data_type & 7) == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; } diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index fb17545875a..781276c506e 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -514,6 +514,7 @@ __device__ encode_kernel_mask data_encoding_for_col(EncColumnChunk const* chunk, case column_encoding::DELTA_BINARY_PACKED: return 
encode_kernel_mask::DELTA_BINARY; case column_encoding::DELTA_LENGTH_BYTE_ARRAY: return encode_kernel_mask::DELTA_LENGTH_BA; case column_encoding::DELTA_BYTE_ARRAY: return encode_kernel_mask::DELTA_BYTE_ARRAY; + case column_encoding::BYTE_STREAM_SPLIT: return encode_kernel_mask::BYTE_STREAM_SPLIT; } } @@ -1608,7 +1609,7 @@ __device__ void finish_page_encode(state_buf* s, // PLAIN page data encoder // blockDim(128, 1, 1) -template +template CUDF_KERNEL void __launch_bounds__(block_size, 8) gpuEncodePages(device_span pages, device_span> comp_in, @@ -1634,7 +1635,11 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } __syncthreads(); - if (BitAnd(s->page.kernel_mask, encode_kernel_mask::PLAIN) == 0) { return; } + if constexpr (is_split_stream) { + if (BitAnd(s->page.kernel_mask, encode_kernel_mask::BYTE_STREAM_SPLIT) == 0) { return; } + } else { + if (BitAnd(s->page.kernel_mask, encode_kernel_mask::PLAIN) == 0) { return; } + } // Encode data values __syncthreads(); @@ -1706,6 +1711,13 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) uint32_t total_len = 0; block_scan(scan_storage).ExclusiveSum(len, pos, total_len); __syncthreads(); + + // if BYTE_STREAM_SPLIT, then translate byte positions to indexes + if constexpr (is_split_stream) { + pos /= dtype_len_out; + total_len /= dtype_len_out; + } + if (t == 0) { s->cur = dst + total_len; } if (is_valid) { switch (physical_type) { @@ -1723,10 +1735,18 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } }(); - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; + if constexpr (is_split_stream) { + auto const num_valid = s->page.num_valid; + dst[pos + 0 * num_valid] = v >> 24; + dst[pos + 1 * num_valid] = v >> 16; + dst[pos + 2 * num_valid] = v >> 8; + dst[pos + 3 * num_valid] = v; + } else { + dst[pos + 0] = v; + dst[pos + 1] = v >> 8; + dst[pos + 2] = v >> 16; + dst[pos + 3] = v >> 24; + } } break; case INT64: { int64_t v = s->col.leaf_column->element(val_idx); @@ 
-1738,16 +1758,29 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) v *= ts_scale; } } - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; - dst[pos + 4] = v >> 32; - dst[pos + 5] = v >> 40; - dst[pos + 6] = v >> 48; - dst[pos + 7] = v >> 56; + if constexpr (is_split_stream) { + auto const num_valid = s->page.num_valid; + dst[pos + 0 * num_valid] = v >> 56; + dst[pos + 1 * num_valid] = v >> 48; + dst[pos + 2 * num_valid] = v >> 40; + dst[pos + 3 * num_valid] = v >> 32; + dst[pos + 4 * num_valid] = v >> 24; + dst[pos + 5 * num_valid] = v >> 16; + dst[pos + 6 * num_valid] = v >> 8; + dst[pos + 7 * num_valid] = v; + } else { + dst[pos + 0] = v; + dst[pos + 1] = v >> 8; + dst[pos + 2] = v >> 16; + dst[pos + 3] = v >> 24; + dst[pos + 4] = v >> 32; + dst[pos + 5] = v >> 40; + dst[pos + 6] = v >> 48; + dst[pos + 7] = v >> 56; + } } break; case INT96: { + // only PLAIN encoding is supported int64_t v = s->col.leaf_column->element(val_idx); int32_t ts_scale = s->col.ts_scale; if (ts_scale != 0) { @@ -1791,10 +1824,24 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } break; case DOUBLE: { - auto v = s->col.leaf_column->element(val_idx); - memcpy(dst + pos, &v, 8); + if (is_split_stream) { + int64_t const v = static_cast(s->col.leaf_column->element(val_idx)); + auto const num_valid = s->page.num_valid; + dst[pos + 0 * num_valid] = v >> 56; + dst[pos + 1 * num_valid] = v >> 48; + dst[pos + 2 * num_valid] = v >> 40; + dst[pos + 3 * num_valid] = v >> 32; + dst[pos + 4 * num_valid] = v >> 24; + dst[pos + 5 * num_valid] = v >> 16; + dst[pos + 6 * num_valid] = v >> 8; + dst[pos + 7 * num_valid] = v; + } else { + auto v = s->col.leaf_column->element(val_idx); + memcpy(dst + pos, &v, 8); + } } break; case BYTE_ARRAY: { + // only PLAIN encoding is supported auto const bytes = [](cudf::type_id const type_id, column_device_view const* leaf_column, uint32_t const val_idx) -> void const* { @@ -1820,10 +1867,17 @@ CUDF_KERNEL void 
__launch_bounds__(block_size, 8) // When using FIXED_LEN_BYTE_ARRAY for decimals, the rep is encoded in big-endian auto const v = s->col.leaf_column->element(val_idx).value(); auto const v_char_ptr = reinterpret_cast(&v); - thrust::copy(thrust::seq, - thrust::make_reverse_iterator(v_char_ptr + sizeof(v)), - thrust::make_reverse_iterator(v_char_ptr), - dst + pos); + if constexpr (is_split_stream) { + auto const num_valid = s->page.num_valid; + for (int i = 0; i < dtype_len_out; i++, pos += num_valid) { + dst[pos] = v_char_ptr[i]; + } + } else { + thrust::copy(thrust::seq, + thrust::make_reverse_iterator(v_char_ptr + sizeof(v)), + thrust::make_reverse_iterator(v_char_ptr), + dst + pos); + } } } break; } @@ -3412,7 +3466,14 @@ void EncodePages(device_span pages, auto const strm = streams[s_idx++]; gpuEncodePageLevels<<>>( pages, write_v2_headers, encode_kernel_mask::PLAIN); - gpuEncodePages<<>>( + gpuEncodePages<<>>( + pages, comp_in, comp_out, comp_results, write_v2_headers); + } + if (BitAnd(kernel_mask, encode_kernel_mask::BYTE_STREAM_SPLIT) != 0) { + auto const strm = streams[s_idx++]; + gpuEncodePageLevels<<>>( + pages, write_v2_headers, encode_kernel_mask::BYTE_STREAM_SPLIT); + gpuEncodePages<<>>( pages, comp_in, comp_out, comp_results, write_v2_headers); } if (BitAnd(kernel_mask, encode_kernel_mask::DELTA_BINARY) != 0) { diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index f502fc837d6..f087fb9ee5a 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -159,7 +159,10 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, } else if (page.encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { return decode_kernel_mask::DELTA_LENGTH_BA; } else if (is_string_col(chunk)) { + // check for string before byte_stream_split so FLBA will go to the right kernel return decode_kernel_mask::STRING; + } else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { + return 
decode_kernel_mask::BYTE_STREAM_SPLIT; } // non-string, non-delta diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index a0dfaa2fa58..92412f68057 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1065,7 +1065,19 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) cub::WarpScan(temp_storage).ExclusiveSum(len, offset); offset += last_offset; - if (use_char_ll) { + if (s->page.encoding == Encoding::BYTE_STREAM_SPLIT) { + if (src_pos + i < target_pos && dst_pos >= 0) { + auto const stride = s->page.str_bytes / s->dtype_len_in; + auto offptr = + reinterpret_cast(nesting_info_base[leaf_level_index].data_out) + dst_pos; + *offptr = len; + auto str_ptr = nesting_info_base[leaf_level_index].string_out + offset; + for (int ii = 0; ii < s->dtype_len_in; ii++) { + str_ptr[ii] = s->data_start[src_pos + i + ii * stride]; + } + } + __syncwarp(); + } else if (use_char_ll) { __shared__ __align__(8) uint8_t const* pointers[warp_size]; __shared__ __align__(4) size_type offsets[warp_size]; __shared__ __align__(4) int dsts[warp_size]; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index ca7334be216..bf9f0652aca 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -81,7 +81,8 @@ constexpr bool is_supported_encoding(Encoding enc) case Encoding::RLE_DICTIONARY: case Encoding::DELTA_BINARY_PACKED: case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: return true; + case Encoding::DELTA_BYTE_ARRAY: + case Encoding::BYTE_STREAM_SPLIT: return true; default: return false; } } @@ -199,12 +200,13 @@ enum level_type { * Used to control which decode kernels to run. 
*/ enum class decode_kernel_mask { - NONE = 0, - GENERAL = (1 << 0), // Run catch-all decode kernel - STRING = (1 << 1), // Run decode kernel for string data - DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data - DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data - DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data + NONE = 0, + GENERAL = (1 << 0), // Run catch-all decode kernel + STRING = (1 << 1), // Run decode kernel for string data + DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data + DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data + DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data + BYTE_STREAM_SPLIT = (1 << 5), // Run decode kernel for BYTE_STREAM_SPLIT encoded data }; // mask representing all the ways in which a string can be encoded @@ -514,11 +516,12 @@ constexpr uint32_t encoding_to_mask(Encoding encoding) * Used to control which encode kernels to run. 
*/ enum class encode_kernel_mask { - PLAIN = (1 << 0), // Run plain encoding kernel - DICTIONARY = (1 << 1), // Run dictionary encoding kernel - DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel - DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel - DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYtE_ARRAY encoding kernel + PLAIN = (1 << 0), // Run plain encoding kernel + DICTIONARY = (1 << 1), // Run dictionary encoding kernel + DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel + DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel + DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYTE_ARRAY encoding kernel + BYTE_STREAM_SPLIT = (1 << 5), // Run plain encoding kernel, but split streams }; /** @@ -741,6 +744,28 @@ void DecodePageData(cudf::detail::hostdevice_span pages, kernel_error::pointer error_code, rmm::cuda_stream_view stream); +/** + * @brief Launches kernel for reading the BYTE_STREAM_SPLIT column data stored in the pages + * + * The page data will be written to the output pointed to in the page's + * associated column chunk. 
+ * + * @param[in,out] pages All pages to be decoded + * @param[in] chunks All chunks to be decoded + * @param[in] num_rows Total number of rows to read + * @param[in] min_row Row index to start reading at + * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures + * @param[in] stream CUDA stream to use + */ +void DecodeSplitPageData(cudf::detail::hostdevice_span& pages, + cudf::detail::hostdevice_span const& chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); + /** * @brief Launches kernel for reading the string column data stored in the pages * diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 89562514564..8adccb3107d 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -237,6 +237,17 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row streams[s_idx++]); } + // launch byte stream split decoder + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT) != 0) { + DecodeSplitPageData(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + error_code.data(), + streams[s_idx++]); + } + // launch the catch-all page decoder if (BitAnd(kernel_mask, decode_kernel_mask::GENERAL) != 0) { DecodePageData(subpass.pages, diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 5a8d96975ce..a7e1ddc81b6 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -657,6 +657,21 @@ std::vector construct_schema_tree( } break; + case column_encoding::BYTE_STREAM_SPLIT: + if (s.type == Type::BYTE_ARRAY) { + CUDF_LOG_WARN( + "BYTE_STREAM_SPLIT encoding is only supported for fixed width columns; the " + "requested encoding will be ignored"); + return; + } + if (s.type == Type::INT96) { + CUDF_LOG_WARN( + "BYTE_STREAM_SPLIT encoding 
is not supported for INT96 columns; the " + "requested encoding will be ignored"); + return; + } + break; + // supported parquet encodings case column_encoding::PLAIN: case column_encoding::DICTIONARY: break; From a2bf4c5cf47ab1299926abb6a8500ce61d8426e3 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 8 Mar 2024 12:44:11 -0800 Subject: [PATCH 02/29] checkpoint --- cpp/src/io/parquet/page_data.cu | 107 ++++++++++++++++---------------- 1 file changed, 52 insertions(+), 55 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index dab4662c657..bbe4bdfbf72 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -406,6 +406,31 @@ static __device__ void gpuOutputGeneric( } } +template +__device__ inline void gpuOutputByteStreamSplit(uint8_t* dst, uint8_t const* src, size_type stride) +{ + for (int i = 0; i < byte_length; i++) { + dst[i] = src[i * stride]; + } +} + +inline __device__ void gpuOutputSplitInt64Timestamp(int64_t* dst, + uint8_t const* src, + size_type stride, + int32_t ts_scale) +{ + gpuOutputByteStreamSplit(reinterpret_cast(dst), src, stride); + if (ts_scale < 0) { + // round towards negative infinity + int sign = (*dst < 0); + *dst = ((*dst + sign) / -ts_scale) + sign; + } else { + *dst = *dst * ts_scale; + } +} + +// TODO(ets): is this better as a standalone, or as part of the plain/dict decoder? +// how does this work with the new microkernels? /** * @brief Kernel for computing the column data stored in the pages * @@ -459,8 +484,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) out_thread0 = (s->dict_bits > 0) ? 64 : 32; } else { switch (s->col.data_type & 7) { - case BOOLEAN: [[fallthrough]]; - case BYTE_ARRAY: [[fallthrough]]; case FIXED_LEN_BYTE_ARRAY: out_thread0 = 64; break; default: out_thread0 = 32; } @@ -497,19 +520,13 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // skipped_leaf_values will always be 0 for flat hierarchies. 
uint32_t src_target_pos = target_pos + skipped_leaf_values; - // WARP1: Decode dictionary indices, booleans or string positions + // WARP1: Decode string positions // NOTE: racecheck complains of a RAW error involving the s->dict_pos assignment below. // This is likely a false positive in practice, but could be solved by wrapping the next // 9 lines in `if (s->dict_pos < src_target_pos) {}`. If that change is made here, it will // be needed in the other DecodeXXX kernels. - if (s->dict_base) { - src_target_pos = gpuDecodeDictionaryIndices(s, sb, src_target_pos, t & 0x1f).first; - } else if ((s->col.data_type & 7) == BOOLEAN) { - src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f); - } else if ((s->col.data_type & 7) == BYTE_ARRAY or - (s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) { - gpuInitStringDescriptors(s, sb, src_target_pos, t & 0x1f); - } + gpuInitStringDescriptors(s, sb, src_target_pos, t & 0x1f); + if (t == 32) { s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values @@ -548,25 +565,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) uint8_t* dst = nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader if (s->col.converted_type == DECIMAL) { switch (dtype) { - case INT32: { - dst[0] = s->data_start[val_src_pos]; - dst[1] = s->data_start[val_src_pos + 1 * num_values]; - dst[2] = s->data_start[val_src_pos + 2 * num_values]; - dst[3] = s->data_start[val_src_pos + 3 * num_values]; - } break; - case INT64: { - dst[0] = s->data_start[val_src_pos]; - dst[1] = s->data_start[val_src_pos + 1 * num_values]; - dst[2] = s->data_start[val_src_pos + 2 * num_values]; - dst[3] = s->data_start[val_src_pos + 3 * num_values]; - dst[4] = s->data_start[val_src_pos + 4 * num_values]; - dst[5] = s->data_start[val_src_pos + 5 * num_values]; - dst[6] = s->data_start[val_src_pos + 6 * num_values]; - dst[7] = 
s->data_start[val_src_pos + 7 * num_values]; - } break; - default: + case INT32: + gpuOutputByteStreamSplit( + dst, s->data_start + val_src_pos, num_values); + break; + case INT64: + gpuOutputByteStreamSplit( + dst, s->data_start + val_src_pos, num_values); + break; + case FIXED_LEN_BYTE_ARRAY: // FIXME(ets) // need a new version of gpuOutputFixedLenByteArrayAsInt for BSS // decimals in FLBA are in big endian order @@ -589,44 +599,31 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) s, sb, val_src_pos, reinterpret_cast<__int128_t*>(dst)); } break; + + default: s->set_error_code(decode_error::UNSUPPORTED_ENCODING); } - } else if (dtype == FIXED_LEN_BYTE_ARRAY) { - // TODO: float16 will end up here if we don't check for logical type - // FLBA without logical type will be handled by string decoder } else if (dtype_len == 8) { if (s->dtype_len_in == 4) { + gpuOutputByteStreamSplit(dst, s->data_start + val_src_pos, num_values); // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS // TIME_MILLIS is the only duration type stored as int32: // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype - dst[0] = 0; - dst[1] = 0; - dst[2] = 0; - dst[3] = 0; - dst[4] = s->data_start[val_src_pos]; - dst[5] = s->data_start[val_src_pos + 1 * num_values]; - dst[6] = s->data_start[val_src_pos + 2 * num_values]; - dst[7] = s->data_start[val_src_pos + 3 * num_values]; + dst[4] = 0; + dst[5] = 0; + dst[6] = 0; + dst[7] = 0; } else if (s->ts_scale) { - // TODO: implement this - gpuOutputInt64Timestamp(s, sb, val_src_pos, reinterpret_cast(dst)); + gpuOutputSplitInt64Timestamp(reinterpret_cast(dst), + s->data_start + val_src_pos, + num_values, + s->ts_scale); } else { - dst[0] = s->data_start[val_src_pos]; - dst[1] = s->data_start[val_src_pos + 1 * num_values]; - dst[2] = s->data_start[val_src_pos + 2 * num_values]; - dst[3] = s->data_start[val_src_pos + 3 * num_values]; - dst[4] = s->data_start[val_src_pos + 4 * num_values]; - 
dst[5] = s->data_start[val_src_pos + 5 * num_values]; - dst[6] = s->data_start[val_src_pos + 6 * num_values]; - dst[7] = s->data_start[val_src_pos + 7 * num_values]; + gpuOutputByteStreamSplit(dst, s->data_start + val_src_pos, num_values); } } else if (dtype_len == 4) { - dst[0] = s->data_start[val_src_pos]; - dst[1] = s->data_start[val_src_pos + 1 * num_values]; - dst[2] = s->data_start[val_src_pos + 2 * num_values]; - dst[3] = s->data_start[val_src_pos + 3 * num_values]; + gpuOutputByteStreamSplit(dst, s->data_start + val_src_pos, num_values); } else { - // FIXME: what would this be anyway???? - gpuOutputGeneric(s, sb, val_src_pos, static_cast(dst), dtype_len); + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); } } From fec05e948525f77d0ebfb8c3a41dd66be82aeb0a Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 8 Mar 2024 09:26:19 -0800 Subject: [PATCH 03/29] leave room for new microkernels --- cpp/src/io/parquet/parquet_gpu.hpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index bf9f0652aca..c9ae43baba1 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -200,13 +200,15 @@ enum level_type { * Used to control which decode kernels to run. 
*/ enum class decode_kernel_mask { - NONE = 0, - GENERAL = (1 << 0), // Run catch-all decode kernel - STRING = (1 << 1), // Run decode kernel for string data - DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data - DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data - DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data - BYTE_STREAM_SPLIT = (1 << 5), // Run decode kernel for BYTE_STREAM_SPLIT encoded data + NONE = 0, + GENERAL = (1 << 0), // Run catch-all decode kernel + STRING = (1 << 1), // Run decode kernel for string data + DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data + DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data + DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data + FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages + FIXED_WIDTH_DICT = (1 << 6), // Run decode kernel for fixed width dictionary pages + BYTE_STREAM_SPLIT = (1 << 7), // Run decode kernel for BYTE_STREAM_SPLIT encoded data }; // mask representing all the ways in which a string can be encoded From fe10804d889e8b850b88b22d7fe23615cb29d824 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 8 Mar 2024 13:12:51 -0800 Subject: [PATCH 04/29] checkpoint --- cpp/src/io/parquet/page_data.cu | 56 +++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index bbe4bdfbf72..066142ffc6a 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -429,6 +429,23 @@ inline __device__ void gpuOutputSplitInt64Timestamp(int64_t* dst, } } +template +__device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, uint8_t const* src, size_type stride, uint32_t dtype_len_in) +{ + T unscaled = 0; + // fixed_len_byte_array decimals are big endian + for (unsigned 
int i = 0; i < dtype_len_in; i++) { + unscaled = (unscaled << 8) | src[i * stride]; + } + // Shift the unscaled value up and back down when it isn't all 8 bytes, + // which sign extend the value for correctly representing negative numbers. + if (dtype_len_in < sizeof(T)) { + unscaled <<= (sizeof(T) - dtype_len_in) * 8; + unscaled >>= (sizeof(T) - dtype_len_in) * 8; + } + *dst = unscaled; +} + // TODO(ets): is this better as a standalone, or as part of the plain/dict decoder? // how does this work with the new microkernels? /** @@ -562,6 +579,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) int leaf_level_index = s->col.max_nesting_depth - 1; uint32_t dtype_len = s->dtype_len; + uint8_t const* src = s->data_start + val_src_pos; uint8_t* dst = nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; @@ -570,58 +588,48 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) switch (dtype) { case INT32: gpuOutputByteStreamSplit( - dst, s->data_start + val_src_pos, num_values); + dst, src, num_values); break; case INT64: gpuOutputByteStreamSplit( - dst, s->data_start + val_src_pos, num_values); + dst, src, num_values); break; case FIXED_LEN_BYTE_ARRAY: - // FIXME(ets) - // need a new version of gpuOutputFixedLenByteArrayAsInt for BSS - // decimals in FLBA are in big endian order if (s->dtype_len_in <= sizeof(int32_t)) { - int32_t unscaled = 0; - for (int i = 0; i < s->dtype_len_in; i++) { - unscaled = (unscaled << 8) | s->data_start[val_src_pos + i * num_values]; - } - if (s->dtype_len_in < sizeof(int32_t)) { - auto const shift = (sizeof(int32_t) - s->dtype_len_in) * 8; - unscaled <<= shift; - unscaled >>= shift; - } - *(reinterpret_cast(dst)) = unscaled; + gpuOutputSplitFixedLenByteArrayAsInt(reinterpret_cast(dst), + src, num_values, s->dtype_len_in); } else if (s->dtype_len_in <= sizeof(int64_t)) { - gpuOutputFixedLenByteArrayAsInt( - s, sb, val_src_pos, reinterpret_cast(dst)); + 
gpuOutputSplitFixedLenByteArrayAsInt(reinterpret_cast(dst), + src, num_values, s->dtype_len_in); } else { - gpuOutputFixedLenByteArrayAsInt( - s, sb, val_src_pos, reinterpret_cast<__int128_t*>(dst)); + gpuOutputSplitFixedLenByteArrayAsInt(reinterpret_cast<__int128_t*>(dst), + src, num_values, s->dtype_len_in); } - break; + // unsupported decimal precision + [[fallthrough]]; default: s->set_error_code(decode_error::UNSUPPORTED_ENCODING); } } else if (dtype_len == 8) { if (s->dtype_len_in == 4) { - gpuOutputByteStreamSplit(dst, s->data_start + val_src_pos, num_values); // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS // TIME_MILLIS is the only duration type stored as int32: // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + gpuOutputByteStreamSplit(dst, src, num_values); dst[4] = 0; dst[5] = 0; dst[6] = 0; dst[7] = 0; } else if (s->ts_scale) { gpuOutputSplitInt64Timestamp(reinterpret_cast(dst), - s->data_start + val_src_pos, + src, num_values, s->ts_scale); } else { - gpuOutputByteStreamSplit(dst, s->data_start + val_src_pos, num_values); + gpuOutputByteStreamSplit(dst, src, num_values); } } else if (dtype_len == 4) { - gpuOutputByteStreamSplit(dst, s->data_start + val_src_pos, num_values); + gpuOutputByteStreamSplit(dst, src, num_values); } else { s->set_error_code(decode_error::UNSUPPORTED_ENCODING); } From 16e961a1c1b5e1fedfaf081a006de4daac6fcedb Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 8 Mar 2024 13:22:56 -0800 Subject: [PATCH 05/29] checkpoint --- cpp/src/io/parquet/page_enc.cu | 49 +++++++++++++++++----------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 781276c506e..a2033bbc3e0 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -1733,14 +1733,14 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) case 2: return col->element(idx) * scale; default: return 
col->element(idx) * scale; } - }(); + }(); if constexpr (is_split_stream) { - auto const num_valid = s->page.num_valid; - dst[pos + 0 * num_valid] = v >> 24; - dst[pos + 1 * num_valid] = v >> 16; - dst[pos + 2 * num_valid] = v >> 8; - dst[pos + 3 * num_valid] = v; + auto const stride = s->page.num_valid; + dst[pos + 0 * stride] = v >> 24; + dst[pos + 1 * stride] = v >> 16; + dst[pos + 2 * stride] = v >> 8; + dst[pos + 3 * stride] = v; } else { dst[pos + 0] = v; dst[pos + 1] = v >> 8; @@ -1759,15 +1759,15 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } } if constexpr (is_split_stream) { - auto const num_valid = s->page.num_valid; - dst[pos + 0 * num_valid] = v >> 56; - dst[pos + 1 * num_valid] = v >> 48; - dst[pos + 2 * num_valid] = v >> 40; - dst[pos + 3 * num_valid] = v >> 32; - dst[pos + 4 * num_valid] = v >> 24; - dst[pos + 5 * num_valid] = v >> 16; - dst[pos + 6 * num_valid] = v >> 8; - dst[pos + 7 * num_valid] = v; + auto const stride = s->page.num_valid; + dst[pos + 0 * stride] = v >> 56; + dst[pos + 1 * stride] = v >> 48; + dst[pos + 2 * stride] = v >> 40; + dst[pos + 3 * stride] = v >> 32; + dst[pos + 4 * stride] = v >> 24; + dst[pos + 5 * stride] = v >> 16; + dst[pos + 6 * stride] = v >> 8; + dst[pos + 7 * stride] = v; } else { dst[pos + 0] = v; dst[pos + 1] = v >> 8; @@ -1826,20 +1826,21 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) case DOUBLE: { if (is_split_stream) { int64_t const v = static_cast(s->col.leaf_column->element(val_idx)); - auto const num_valid = s->page.num_valid; - dst[pos + 0 * num_valid] = v >> 56; - dst[pos + 1 * num_valid] = v >> 48; - dst[pos + 2 * num_valid] = v >> 40; - dst[pos + 3 * num_valid] = v >> 32; - dst[pos + 4 * num_valid] = v >> 24; - dst[pos + 5 * num_valid] = v >> 16; - dst[pos + 6 * num_valid] = v >> 8; - dst[pos + 7 * num_valid] = v; + auto const stride = s->page.num_valid; + dst[pos + 0 * stride] = v >> 56; + dst[pos + 1 * stride] = v >> 48; + dst[pos + 2 * stride] = v >> 40; + dst[pos + 3 * stride] = 
v >> 32; + dst[pos + 4 * stride] = v >> 24; + dst[pos + 5 * stride] = v >> 16; + dst[pos + 6 * stride] = v >> 8; + dst[pos + 7 * stride] = v; } else { auto v = s->col.leaf_column->element(val_idx); memcpy(dst + pos, &v, 8); } } break; + // TODO(ets): can this ever be reached now? case BYTE_ARRAY: { // only PLAIN encoding is supported auto const bytes = [](cudf::type_id const type_id, From 9de287fdff76c92f7ceda643218b24a207d44fa6 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 8 Mar 2024 14:42:33 -0800 Subject: [PATCH 06/29] checkpoint --- cpp/src/io/parquet/page_enc.cu | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index a2033bbc3e0..7ccac9f897a 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -1840,7 +1840,6 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) memcpy(dst + pos, &v, 8); } } break; - // TODO(ets): can this ever be reached now? case BYTE_ARRAY: { // only PLAIN encoding is supported auto const bytes = [](cudf::type_id const type_id, @@ -1869,8 +1868,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) auto const v = s->col.leaf_column->element(val_idx).value(); auto const v_char_ptr = reinterpret_cast(&v); if constexpr (is_split_stream) { - auto const num_valid = s->page.num_valid; - for (int i = 0; i < dtype_len_out; i++, pos += num_valid) { + auto const stride = s->page.num_valid; + for (int i = dtype_len_out - 1; i >= 0; i--, pos += stride) { dst[pos] = v_char_ptr[i]; } } else { From 8b75a6d9c970cf038aa2cd13fe57da22f4158f37 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 8 Mar 2024 15:35:26 -0800 Subject: [PATCH 07/29] formatting --- cpp/src/io/parquet/page_data.cu | 33 ++++++++++++++------------------- cpp/src/io/parquet/page_enc.cu | 6 +++--- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 066142ffc6a..f4ccffddcfa 100644 --- 
a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -430,7 +430,10 @@ inline __device__ void gpuOutputSplitInt64Timestamp(int64_t* dst, } template -__device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, uint8_t const* src, size_type stride, uint32_t dtype_len_in) +__device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, + uint8_t const* src, + size_type stride, + uint32_t dtype_len_in) { T unscaled = 0; // fixed_len_byte_array decimals are big endian @@ -586,24 +589,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader if (s->col.converted_type == DECIMAL) { switch (dtype) { - case INT32: - gpuOutputByteStreamSplit( - dst, src, num_values); - break; - case INT64: - gpuOutputByteStreamSplit( - dst, src, num_values); - break; + case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; case FIXED_LEN_BYTE_ARRAY: if (s->dtype_len_in <= sizeof(int32_t)) { - gpuOutputSplitFixedLenByteArrayAsInt(reinterpret_cast(dst), - src, num_values, s->dtype_len_in); + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); } else if (s->dtype_len_in <= sizeof(int64_t)) { - gpuOutputSplitFixedLenByteArrayAsInt(reinterpret_cast(dst), - src, num_values, s->dtype_len_in); + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); } else { - gpuOutputSplitFixedLenByteArrayAsInt(reinterpret_cast<__int128_t*>(dst), - src, num_values, s->dtype_len_in); + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast<__int128_t*>(dst), src, num_values, s->dtype_len_in); } // unsupported decimal precision [[fallthrough]]; @@ -621,10 +618,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) dst[6] = 0; dst[7] = 0; } else if (s->ts_scale) { - gpuOutputSplitInt64Timestamp(reinterpret_cast(dst), - src, - num_values, - 
s->ts_scale); + gpuOutputSplitInt64Timestamp( + reinterpret_cast(dst), src, num_values, s->ts_scale); } else { gpuOutputByteStreamSplit(dst, src, num_values); } diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 7ccac9f897a..412671f8c84 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -1733,7 +1733,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) case 2: return col->element(idx) * scale; default: return col->element(idx) * scale; } - }(); + }(); if constexpr (is_split_stream) { auto const stride = s->page.num_valid; @@ -1825,8 +1825,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) case DOUBLE: { if (is_split_stream) { - int64_t const v = static_cast(s->col.leaf_column->element(val_idx)); - auto const stride = s->page.num_valid; + int64_t const v = static_cast(s->col.leaf_column->element(val_idx)); + auto const stride = s->page.num_valid; dst[pos + 0 * stride] = v >> 56; dst[pos + 1 * stride] = v >> 48; dst[pos + 2 * stride] = v >> 40; From 760ca0cd0998e7a93be16ad17501fc5138add8df Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 12 Mar 2024 11:20:29 -0700 Subject: [PATCH 08/29] int and float working --- cpp/src/io/parquet/page_enc.cu | 55 ++++++++++++++++------------ cpp/tests/io/parquet_writer_test.cpp | 39 ++++++++++++++++++++ 2 files changed, 70 insertions(+), 24 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 412671f8c84..89562816464 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -1658,8 +1658,12 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) s->rle_pos = 0; s->rle_numvals = 0; s->rle_out = dst; - s->page.encoding = - determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); + if constexpr (is_split_stream) { + s->page.encoding = Encoding::BYTE_STREAM_SPLIT; + } else { + s->page.encoding = determine_encoding( + s->page.page_type, physical_type, 
s->ck.use_dictionary, write_v2_headers); + } s->page_start_val = row_to_value_idx(s->page.start_row, s->col); s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); } @@ -1737,10 +1741,10 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) if constexpr (is_split_stream) { auto const stride = s->page.num_valid; - dst[pos + 0 * stride] = v >> 24; - dst[pos + 1 * stride] = v >> 16; - dst[pos + 2 * stride] = v >> 8; - dst[pos + 3 * stride] = v; + dst[pos + 0 * stride] = v; + dst[pos + 1 * stride] = v >> 8; + dst[pos + 2 * stride] = v >> 16; + dst[pos + 3 * stride] = v >> 24; } else { dst[pos + 0] = v; dst[pos + 1] = v >> 8; @@ -1760,14 +1764,14 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } if constexpr (is_split_stream) { auto const stride = s->page.num_valid; - dst[pos + 0 * stride] = v >> 56; - dst[pos + 1 * stride] = v >> 48; - dst[pos + 2 * stride] = v >> 40; - dst[pos + 3 * stride] = v >> 32; - dst[pos + 4 * stride] = v >> 24; - dst[pos + 5 * stride] = v >> 16; - dst[pos + 6 * stride] = v >> 8; - dst[pos + 7 * stride] = v; + dst[pos + 0 * stride] = v; + dst[pos + 1 * stride] = v >> 8; + dst[pos + 2 * stride] = v >> 16; + dst[pos + 3 * stride] = v >> 24; + dst[pos + 4 * stride] = v >> 32; + dst[pos + 5 * stride] = v >> 40; + dst[pos + 6 * stride] = v >> 48; + dst[pos + 7 * stride] = v >> 56; } else { dst[pos + 0] = v; dst[pos + 1] = v >> 8; @@ -1825,16 +1829,16 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) case DOUBLE: { if (is_split_stream) { - int64_t const v = static_cast(s->col.leaf_column->element(val_idx)); - auto const stride = s->page.num_valid; - dst[pos + 0 * stride] = v >> 56; - dst[pos + 1 * stride] = v >> 48; - dst[pos + 2 * stride] = v >> 40; - dst[pos + 3 * stride] = v >> 32; - dst[pos + 4 * stride] = v >> 24; - dst[pos + 5 * stride] = v >> 16; - dst[pos + 6 * stride] = v >> 8; - dst[pos + 7 * stride] = v; + auto const v = s->col.leaf_column->element(val_idx); + auto const stride = s->page.num_valid; + dst[pos + 0 * stride] 
= v; + dst[pos + 1 * stride] = v >> 8; + dst[pos + 2 * stride] = v >> 16; + dst[pos + 3 * stride] = v >> 24; + dst[pos + 4 * stride] = v >> 32; + dst[pos + 5 * stride] = v >> 40; + dst[pos + 6 * stride] = v >> 48; + dst[pos + 7 * stride] = v >> 56; } else { auto v = s->col.leaf_column->element(val_idx); memcpy(dst + pos, &v, 8); @@ -1885,6 +1889,9 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) __syncthreads(); } + // for BYTE_STREAM_SPLIT, s->cur now points to the end of the first stream. + // need it to point to the end of the Nth stream. + if constexpr (is_split_stream) { s->cur += (dtype_len_out - 1) * s->page.num_valid; } finish_page_encode( s, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); } diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 200c58bb9aa..d0ac0e2509d 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1623,6 +1623,45 @@ TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TEST_F(ParquetWriterTest, ByteStreamSplit) +{ + constexpr auto num_rows = 100; + + auto col0_data = random_values(num_rows); + auto col1_data = random_values(num_rows); + auto col2_data = random_values(num_rows); + auto col3_data = random_values(num_rows); + + column_wrapper col0{col0_data.begin(), col0_data.end(), no_nulls()}; + column_wrapper col1{col1_data.begin(), col1_data.end(), no_nulls()}; + column_wrapper col2{col2_data.begin(), col2_data.end(), no_nulls()}; + column_wrapper col3{col3_data.begin(), col3_data.end(), no_nulls()}; + + auto expected = table_view{{col0, col1, col2, col3}}; + + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("int32s"); + expected_metadata.column_metadata[1].set_name("int64s"); + expected_metadata.column_metadata[2].set_name("floats"); + expected_metadata.column_metadata[3].set_name("doubles"); + for 
(size_t i = 0; i < expected_metadata.column_metadata.size(); i++) { + expected_metadata.column_metadata[i].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + } + + auto filepath = temp_env->get_temp_filepath("ByteStreamSplit.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + + //*((char*)0) = 0; + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + ///////////////////////////////////////////////////////////// // custom mem mapped data sink that supports device writes template From cfa51e3eddce9983d09adcf578a23ea33aee39af Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 12 Mar 2024 12:05:30 -0700 Subject: [PATCH 09/29] get decimals working --- cpp/src/io/parquet/page_data.cu | 5 +++- cpp/tests/io/parquet_common.cpp | 1 + cpp/tests/io/parquet_writer_test.cpp | 39 ++++++++++++++++++++++++++-- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index f4ccffddcfa..9f51de387e9 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -595,12 +595,15 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) if (s->dtype_len_in <= sizeof(int32_t)) { gpuOutputSplitFixedLenByteArrayAsInt( reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; } else if (s->dtype_len_in <= sizeof(int64_t)) { gpuOutputSplitFixedLenByteArrayAsInt( reinterpret_cast(dst), src, num_values, s->dtype_len_in); - } else { + break; + } else if (s->dtype_len_in <= sizeof(__int128_t)) { gpuOutputSplitFixedLenByteArrayAsInt( reinterpret_cast<__int128_t*>(dst), src, num_values, s->dtype_len_in); + break; } // unsupported decimal precision 
[[fallthrough]]; diff --git a/cpp/tests/io/parquet_common.cpp b/cpp/tests/io/parquet_common.cpp index b64cd230bc6..c1211869bcc 100644 --- a/cpp/tests/io/parquet_common.cpp +++ b/cpp/tests/io/parquet_common.cpp @@ -203,6 +203,7 @@ template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); +template std::vector<__int128_t> random_values<__int128_t>(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index d0ac0e2509d..e42db628c1e 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1648,7 +1648,7 @@ TEST_F(ParquetWriterTest, ByteStreamSplit) expected_metadata.column_metadata[i].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); } - auto filepath = temp_env->get_temp_filepath("ByteStreamSplit.parquet"); + auto const filepath = temp_env->get_temp_filepath("ByteStreamSplit.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) .metadata(expected_metadata); @@ -1658,7 +1658,42 @@ TEST_F(ParquetWriterTest, ByteStreamSplit) cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); auto result = cudf::io::read_parquet(in_opts); - //*((char*)0) = 0; + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + +TEST_F(ParquetWriterTest, DecimalByteStreamSplit) +{ + constexpr cudf::size_type num_rows = 100; + auto seq_col0 = random_values(num_rows); + auto seq_col1 = random_values(num_rows); + auto seq_col2 = random_values<__int128_t>(num_rows); + + auto col0 = cudf::test::fixed_point_column_wrapper{ + seq_col0.begin(), seq_col0.end(), no_nulls(), numeric::scale_type{-5}}; + auto col1 = 
cudf::test::fixed_point_column_wrapper{ + seq_col1.begin(), seq_col1.end(), no_nulls(), numeric::scale_type{-9}}; + auto col2 = cudf::test::fixed_point_column_wrapper<__int128_t>{ + seq_col1.begin(), seq_col1.end(), no_nulls(), numeric::scale_type{-11}}; + + auto expected = table_view({col0, col1, col2}); + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("int32s").set_decimal_precision(7); + expected_metadata.column_metadata[1].set_name("int64s").set_decimal_precision(11); + expected_metadata.column_metadata[2].set_name("int128s").set_decimal_precision(22); + for (size_t i = 0; i < expected_metadata.column_metadata.size(); i++) { + expected_metadata.column_metadata[i].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + } + + auto const filepath = temp_env->get_temp_filepath("DecimalByteStreamSplit.parquet"); + cudf::io::parquet_writer_options args = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(args); + + cudf::io::parquet_reader_options read_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(read_opts); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } From 832428db9e78207161e235f579b8690f8d922622 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 12 Mar 2024 12:55:05 -0700 Subject: [PATCH 10/29] add more tests --- cpp/tests/io/parquet_writer_test.cpp | 60 ++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index e42db628c1e..9258967e799 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1697,6 +1697,38 @@ TEST_F(ParquetWriterTest, DecimalByteStreamSplit) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TEST_F(ParquetWriterTest, DurationByteStreamSplit) +{ + 
constexpr cudf::size_type num_rows = 100; + auto seq_col0 = random_values(num_rows); + auto seq_col1 = random_values(num_rows); + + auto durations_ms = cudf::test::fixed_width_column_wrapper( + seq_col0.begin(), seq_col0.end(), no_nulls()); + auto durations_us = cudf::test::fixed_width_column_wrapper( + seq_col1.begin(), seq_col1.end(), no_nulls()); + + auto expected = table_view{{durations_ms, durations_us}}; + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("millis"); + expected_metadata.column_metadata[1].set_name("micros"); + for (size_t i = 0; i < expected_metadata.column_metadata.size(); i++) { + expected_metadata.column_metadata[i].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + } + + auto filepath = temp_env->get_temp_filepath("DurationByteStreamSplit.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + ///////////////////////////////////////////////////////////// // custom mem mapped data sink that supports device writes template @@ -1880,6 +1912,34 @@ TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampOverflow) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampsByteStreamSplit) +{ + auto sequence = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return ((std::rand() / 10000) * 1000); }); + + constexpr auto num_rows = 100; + column_wrapper col( + sequence, sequence + num_rows, no_nulls()); + + auto expected = table_view{{col}}; + + cudf::io::table_input_metadata expected_metadata(expected); + 
expected_metadata.column_metadata[0].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + + auto filepath = temp_env->get_temp_filepath("TimestampsByteStreamSplit.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .timestamp_type(this->type()); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + ////////////////////////////// // writer stress tests From 55ed69cf0894f94fd6e80313d74735e40811a49c Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 12 Mar 2024 13:22:44 -0700 Subject: [PATCH 11/29] clean up some dead code --- cpp/src/io/parquet/page_data.cu | 38 +++++++-------------------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 9f51de387e9..3ff6426b8c1 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -449,10 +449,8 @@ __device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, *dst = unscaled; } -// TODO(ets): is this better as a standalone, or as part of the plain/dict decoder? -// how does this work with the new microkernels? /** - * @brief Kernel for computing the column data stored in the pages + * @brief Kernel for computing the BYTE_STREAM_SPLIT column data stored in the pages * * This function will write the page data and the page data's validity to the * output specified in the page's column chunk. 
If necessary, additional @@ -473,6 +471,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) size_t num_rows, kernel_error::pointer error_code) { + using cudf::detail::warp_size; __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) page_state_buffers_s @@ -482,7 +481,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) auto* const sb = &state_buffers; int page_idx = blockIdx.x; int t = threadIdx.x; - int out_thread0; [[maybe_unused]] null_count_back_copier _{s, t}; if (!setupLocalPageInfo(s, @@ -497,17 +495,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; - auto const data_len = thrust::distance(s->data_start, s->data_end); - auto const num_values = data_len / s->dtype_len_in; - - if (s->dict_base) { - out_thread0 = (s->dict_bits > 0) ? 64 : 32; - } else { - switch (s->col.data_type & 7) { - case FIXED_LEN_BYTE_ARRAY: out_thread0 = 64; break; - default: out_thread0 = 32; - } - } + auto const data_len = thrust::distance(s->data_start, s->data_end); + auto const num_values = data_len / s->dtype_len_in; + auto const out_thread0 = warp_size; PageNestingDecodeInfo* nesting_info_base = s->nesting_info; @@ -526,28 +516,16 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) s->nz_count + (decode_block_size - out_thread0)); } else { target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0); - if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); } } - // this needs to be here to prevent warp 3 modifying src_pos before all threads have read it + // this needs to be here to prevent warp 1 modifying src_pos before all threads have read it __syncthreads(); - if (t < 32) { + + if (t < warp_size) { // decode repetition and definition levels. 
// - update validity vectors // - updates offsets (for nested columns) // - produces non-NULL value indices in s->nz_idx for subsequent decoding gpuDecodeLevels(s, sb, target_pos, rep, def, t); - } else if (t < out_thread0) { - // skipped_leaf_values will always be 0 for flat hierarchies. - uint32_t src_target_pos = target_pos + skipped_leaf_values; - - // WARP1: Decode string positions - // NOTE: racecheck complains of a RAW error involving the s->dict_pos assignment below. - // This is likely a false positive in practice, but could be solved by wrapping the next - // 9 lines in `if (s->dict_pos < src_target_pos) {}`. If that change is made here, it will - // be needed in the other DecodeXXX kernels. - gpuInitStringDescriptors(s, sb, src_target_pos, t & 0x1f); - - if (t == 32) { s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values int const dtype = s->col.data_type & 7; From 9316f6cc5fbf41b0e2d2bee7f87fa243fa79acf0 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 12 Mar 2024 13:28:46 -0700 Subject: [PATCH 12/29] update comment --- cpp/src/io/parquet/page_data.cu | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 3ff6426b8c1..647570be0ce 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -452,10 +452,12 @@ __device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, /** * @brief Kernel for computing the BYTE_STREAM_SPLIT column data stored in the pages * - * This function will write the page data and the page data's validity to the - * output specified in the page's column chunk. If necessary, additional - * conversion will be performed to translate from the Parquet datatype to - * desired output datatype (ex. 32-bit to 16-bit, string to hash). + * This is basically the PLAIN decoder, but with a pared down set of supported data + * types, and using output functions that piece together the individual streams. 
+ * Supported physical types include INT32, INT64, FLOAT, DOUBLE and FIXED_LEN_BYTE_ARRAY. + * The latter is currently only used for large decimals. The Parquet specification also + * has FLOAT16 and UUID types that are currently not supported. FIXED_LEN_BYTE_ARRAY data + * that lacks a `LogicalType` annotation will be handled by the string decoder. * * @param pages List of pages * @param chunks List of column chunks From cba0a33329463436a930d91a2ce5e31e09c861bd Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 13 Mar 2024 12:09:46 -0700 Subject: [PATCH 13/29] only update cur ptr on t0 --- cpp/src/io/parquet/page_enc.cu | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 89562816464..2fa1d42a6da 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -1891,7 +1891,9 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // for BYTE_STREAM_SPLIT, s->cur now points to the end of the first stream. // need it to point to the end of the Nth stream. 
- if constexpr (is_split_stream) { s->cur += (dtype_len_out - 1) * s->page.num_valid; } + if constexpr (is_split_stream) { + if (t == 0) { s->cur += (dtype_len_out - 1) * s->page.num_valid; } + } finish_page_encode( s, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); } From ee7919ce80b946de16f56853ac0c443f28425617 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 14 Mar 2024 10:56:46 -0700 Subject: [PATCH 14/29] rework kernel_mask_for_page --- cpp/src/io/parquet/page_hdr.cu | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 19678b57e58..2f8c021700d 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -166,13 +166,7 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, ColumnChunkDesc const& chunk) { if (page.flags & PAGEINFO_FLAGS_DICTIONARY) { return decode_kernel_mask::NONE; } - if (!is_string_col(chunk) && !is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { - if (page.encoding == Encoding::PLAIN) { - return decode_kernel_mask::FIXED_WIDTH_NO_DICT; - } else if (page.encoding == Encoding::PLAIN_DICTIONARY) { - return decode_kernel_mask::FIXED_WIDTH_DICT; - } - } + if (page.encoding == Encoding::DELTA_BINARY_PACKED) { return decode_kernel_mask::DELTA_BINARY; } else if (page.encoding == Encoding::DELTA_BYTE_ARRAY) { @@ -186,6 +180,15 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, return decode_kernel_mask::BYTE_STREAM_SPLIT; } + if (!is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { + if (page.encoding == Encoding::PLAIN) { + return decode_kernel_mask::FIXED_WIDTH_NO_DICT; + } else if (page.encoding == Encoding::PLAIN_DICTIONARY || + page.encoding == Encoding::RLE_DICTIONARY) { + return decode_kernel_mask::FIXED_WIDTH_DICT; + } + } + // non-string, non-delta return decode_kernel_mask::GENERAL; } From 
d6f5569de25b9daf91d81ff08374c2f9f3b0fa83 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 14 Mar 2024 14:27:43 -0700 Subject: [PATCH 15/29] fix setting encoding on list children --- cpp/src/io/parquet/writer_impl.cu | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index a7e1ddc81b6..675d8943e1e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -611,8 +611,7 @@ std::vector construct_schema_tree( column_in_metadata const& col_meta) { s.requested_encoding = column_encoding::USE_DEFAULT; - if (schema[parent_idx].name != "list" and - col_meta.get_encoding() != column_encoding::USE_DEFAULT) { + if (s.name != "list" and col_meta.get_encoding() != column_encoding::USE_DEFAULT) { // do some validation switch (col_meta.get_encoding()) { case column_encoding::DELTA_BINARY_PACKED: From 56365b5c27411aab37825e4c55fa1112e4d8f4c3 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 14 Mar 2024 14:28:54 -0700 Subject: [PATCH 16/29] add flat version of decoder --- cpp/src/io/parquet/decode_fixed.cu | 229 +++++++++++++++++++++++++++ cpp/src/io/parquet/page_data.cu | 43 ----- cpp/src/io/parquet/page_data.cuh | 76 +++++++++ cpp/src/io/parquet/page_hdr.cu | 10 +- cpp/src/io/parquet/parquet_gpu.hpp | 24 +++ cpp/src/io/parquet/reader_impl.cpp | 11 ++ cpp/tests/io/parquet_writer_test.cpp | 15 +- 7 files changed, 358 insertions(+), 50 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 062363db503..0fde880b05d 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -225,6 +225,96 @@ __device__ inline void gpuDecodeValues( } } +template +__device__ inline void gpuDecodeSplitValues(page_state_s* s, + state_buf* const sb, + int start, + int end) +{ + using cudf::detail::warp_size; + constexpr int num_warps = decode_block_size / warp_size; + constexpr int max_batch_size = num_warps * 
warp_size; + + auto const t = threadIdx.x; + + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; + int const dtype = s->col.data_type & 7; + auto const data_len = thrust::distance(s->data_start, s->data_end); + auto const num_values = data_len / s->dtype_len_in; + + // decode values + int pos = start; + while (pos < end) { + int const batch_size = min(max_batch_size, end - pos); + + int const target_pos = pos + batch_size; + int const src_pos = pos + t; + + // the position in the output column/buffer + int dst_pos = sb->nz_idx[rolling_index(src_pos)] - s->first_row; + + // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values + // before first_row) in the flat hierarchy case. + if (src_pos < target_pos && dst_pos >= 0) { + // nesting level that is storing actual leaf values + int const leaf_level_index = s->col.max_nesting_depth - 1; + + uint32_t dtype_len = s->dtype_len; + uint8_t const* src = s->data_start + src_pos; + uint8_t* dst = + nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + + // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader + if (s->col.converted_type == DECIMAL) { + switch (dtype) { + case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; + case FIXED_LEN_BYTE_ARRAY: + if (s->dtype_len_in <= sizeof(int32_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(int64_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(__int128_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast<__int128_t*>(dst), src, num_values, s->dtype_len_in); + break; + } + // unsupported decimal precision + [[fallthrough]]; + + default: 
s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } else if (dtype_len == 8) { + if (s->dtype_len_in == 4) { + // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS + // TIME_MILLIS is the only duration type stored as int32: + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + gpuOutputByteStreamSplit(dst, src, num_values); + dst[4] = 0; + dst[5] = 0; + dst[6] = 0; + dst[7] = 0; + } else if (s->ts_scale) { + gpuOutputSplitInt64Timestamp( + reinterpret_cast(dst), src, num_values, s->ts_scale); + } else { + gpuOutputByteStreamSplit(dst, src, num_values); + } + } else if (dtype_len == 4) { + gpuOutputByteStreamSplit(dst, src, num_values); + } else { + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } + + pos += batch_size; + } +} + // is the page marked nullable or not __device__ inline bool is_nullable(page_state_s* s) { @@ -495,6 +585,123 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) if (t == 0 and s->error != 0) { set_error(s->error, error_code); } } +/** + * @brief Kernel for computing BYTE_STREAM_SPLIT encoded column data stored in pages with a flat schema + * + * This function will write the page data and the page data's validity to the + * output specified in the page's column chunk. If necessary, additional + * conversion will be performed to translate from the Parquet datatype to + * desired output datatype. 
+ * + * @param pages List of pages + * @param chunks List of column chunks + * @param min_row Row index to start reading at + * @param num_rows Maximum number of rows to read + * @param error_code Error code to set if an error is encountered + */ +template +CUDF_KERNEL void __launch_bounds__(decode_block_size) + gpuDecodeSplitPageDataFlat(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + kernel_error::pointer error_code) +{ + __shared__ __align__(16) page_state_s state_g; + __shared__ __align__(16) page_state_buffers_s // unused in this kernel + state_buffers; + + page_state_s* const s = &state_g; + auto* const sb = &state_buffers; + int const page_idx = blockIdx.x; + int const t = threadIdx.x; + PageInfo* pp = &pages[page_idx]; + + if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT))) { + return; + } + + // must come after the kernel mask check + [[maybe_unused]] null_count_back_copier _{s, t}; + + if (!setupLocalPageInfo(s, + pp, + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT}, + page_processing_stage::DECODE)) { + return; + } + + // the level stream decoders + __shared__ rle_run def_runs[rle_run_buffer_size]; + rle_stream def_decoder{def_runs}; + + // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. + if (s->num_rows == 0) { return; } + + bool const nullable = is_nullable(s); + bool const nullable_with_nulls = nullable && has_nulls(s); + + // initialize the stream decoders (requires values computed in setupLocalPageInfo) + level_t* const def = reinterpret_cast(pp->lvl_decode_buf[level_type::DEFINITION]); + if (nullable_with_nulls) { + def_decoder.init(s->col.level_bits[level_type::DEFINITION], + s->abs_lvl_start[level_type::DEFINITION], + s->abs_lvl_end[level_type::DEFINITION], + def, + s->page.num_input_values); + } + __syncthreads(); + + // We use two counters in the loop below: processed_count and valid_count. 
+ // - processed_count: number of rows out of num_input_values that we have decoded so far. + // the definition stream returns the number of total rows it has processed in each call + // to decode_next and we accumulate in processed_count. + // - valid_count: number of non-null rows we have decoded so far. In each iteration of the + // loop below, we look at the number of valid items (which could be all for non-nullable), + // and valid_count is that running count. + int processed_count = 0; + int valid_count = 0; + // the core loop. decode batches of level stream data using rle_stream objects + // and pass the results to gpuDecodeSplitValues + while (s->error == 0 && processed_count < s->page.num_input_values) { + int next_valid_count; + + // only need to process definition levels if this is a nullable column + if (nullable) { + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); + } else { + processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); + } + + next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( + processed_count, s, sb, def, t, nullable_with_nulls); + } + // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip + // this function call entirely since all it will ever generate is a mapping of (i -> i) for + // nz_idx. gpuDecodeSplitValues would be the only work that happens. 
+ else { + processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); + next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( + processed_count, s, sb, nullptr, t, false); + } + __syncthreads(); + + // decode the values themselves + gpuDecodeSplitValues(s, sb, valid_count, next_valid_count); + __syncthreads(); + + valid_count = next_valid_count; + } + if (t == 0 and s->error != 0) { set_error(s->error, error_code); } +} + } // anonymous namespace void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, @@ -539,4 +746,26 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa } } +void __host__ DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) +{ + // dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block + // 1 full warp, and 1 warp of 1 thread + dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block + dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks + + if (level_type_size == 1) { + gpuDecodeSplitPageDataFlat<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodeSplitPageDataFlat<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index eb8d773fb22..efd356be086 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -28,49 +28,6 @@ namespace { constexpr int decode_block_size = 128; constexpr int rolling_buf_size = decode_block_size * 2; -template -__device__ inline void gpuOutputByteStreamSplit(uint8_t* dst, uint8_t const* src, size_type stride) -{ - for (int i = 0; i < byte_length; i++) { - dst[i] = src[i * stride]; - } -} - -inline 
__device__ void gpuOutputSplitInt64Timestamp(int64_t* dst, - uint8_t const* src, - size_type stride, - int32_t ts_scale) -{ - gpuOutputByteStreamSplit(reinterpret_cast(dst), src, stride); - if (ts_scale < 0) { - // round towards negative infinity - int sign = (*dst < 0); - *dst = ((*dst + sign) / -ts_scale) + sign; - } else { - *dst = *dst * ts_scale; - } -} - -template -__device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, - uint8_t const* src, - size_type stride, - uint32_t dtype_len_in) -{ - T unscaled = 0; - // fixed_len_byte_array decimals are big endian - for (unsigned int i = 0; i < dtype_len_in; i++) { - unscaled = (unscaled << 8) | src[i * stride]; - } - // Shift the unscaled value up and back down when it isn't all 8 bytes, - // which sign extend the value for correctly representing negative numbers. - if (dtype_len_in < sizeof(T)) { - unscaled <<= (sizeof(T) - dtype_len_in) * 8; - unscaled >>= (sizeof(T) - dtype_len_in) * 8; - } - *dst = unscaled; -} - /** * @brief Kernel for computing the BYTE_STREAM_SPLIT column data stored in the pages * diff --git a/cpp/src/io/parquet/page_data.cuh b/cpp/src/io/parquet/page_data.cuh index f0fa7d814cf..a6d3b7ab49c 100644 --- a/cpp/src/io/parquet/page_data.cuh +++ b/cpp/src/io/parquet/page_data.cuh @@ -397,4 +397,80 @@ inline __device__ void gpuOutputGeneric( } } } + +/** + * Output a BYTE_STREAM_SPLIT value of length `byte_length`. + * + * Data is encoded as N streams of length M, forming an NxM sized matrix. Rows are streams, + * columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + */ +template +__device__ inline void gpuOutputByteStreamSplit(uint8_t* dst, uint8_t const* src, size_type stride) +{ + for (int i = 0; i < byte_length; i++) { + dst[i] = src[i * stride]; + } +} + +/** + * Output a 64-bit BYTE_STREAM_SPLIT encoded timestamp. 
+ * + * Data is encoded as N streams of length M, forming an NxM sized matrix. Rows are streams, + * columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + * @param ts_scale timestamp scale + */ +inline __device__ void gpuOutputSplitInt64Timestamp(int64_t* dst, + uint8_t const* src, + size_type stride, + int32_t ts_scale) +{ + gpuOutputByteStreamSplit(reinterpret_cast(dst), src, stride); + if (ts_scale < 0) { + // round towards negative infinity + int sign = (*dst < 0); + *dst = ((*dst + sign) / -ts_scale) + sign; + } else { + *dst = *dst * ts_scale; + } +} + +/** + * Output a BYTE_STREAM_SPLIT encoded decimal as an integer type. + * + * Data is encoded as N streams of length M, forming an NxM sized matrix. Rows are streams, + * columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + * @param dtype_len_in length of the `FIXED_LEN_BYTE_ARRAY` used to represent the decimal + */ +template +__device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, + uint8_t const* src, + size_type stride, + uint32_t dtype_len_in) +{ + T unscaled = 0; + // fixed_len_byte_array decimals are big endian + for (unsigned int i = 0; i < dtype_len_in; i++) { + unscaled = (unscaled << 8) | src[i * stride]; + } + // Shift the unscaled value up and back down when it isn't all 8 bytes, + // which sign extend the value for correctly representing negative numbers. 
+ if (dtype_len_in < sizeof(T)) { + unscaled <<= (sizeof(T) - dtype_len_in) * 8; + unscaled >>= (sizeof(T) - dtype_len_in) * 8; + } + *dst = unscaled; +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 2f8c021700d..2c2eda6ec04 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -176,8 +176,6 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, } else if (is_string_col(chunk)) { // check for string before byte_stream_split so FLBA will go to the right kernel return decode_kernel_mask::STRING; - } else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { - return decode_kernel_mask::BYTE_STREAM_SPLIT; } if (!is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { @@ -186,10 +184,16 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, } else if (page.encoding == Encoding::PLAIN_DICTIONARY || page.encoding == Encoding::RLE_DICTIONARY) { return decode_kernel_mask::FIXED_WIDTH_DICT; + } else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { + return decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT; } } - // non-string, non-delta + if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { + return decode_kernel_mask::BYTE_STREAM_SPLIT; + } + + // non-string, non-delta, non-split_stream return decode_kernel_mask::GENERAL; } diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index f19115e7311..72d916939e7 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -209,6 +209,8 @@ enum class decode_kernel_mask { FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages FIXED_WIDTH_DICT = (1 << 6), // Run decode kernel for fixed width dictionary pages BYTE_STREAM_SPLIT = (1 << 7), // Run decode kernel for BYTE_STREAM_SPLIT encoded data + BYTE_STREAM_SPLIT_FLAT = + (1 << 8), // Run decode kernel for BYTE_STREAM_SPLIT encoded 
data with a flat schema }; // mask representing all the ways in which a string can be encoded @@ -910,6 +912,28 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, kernel_error::pointer error_code, rmm::cuda_stream_view stream); +/** + * @brief Launches kernel for reading BYTE_STREAM_SPLIT encoded column data with a flat schema stored in the pages + * + * The page data will be written to the output pointed to in the page's + * associated column chunk. + * + * @param[in,out] pages All pages to be decoded + * @param[in] chunks All chunks to be decoded + * @param[in] num_rows Total number of rows to read + * @param[in] min_row Row index to start reading at + * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures + * @param[in] stream CUDA stream to use + */ +void DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + std::size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); + /** * @brief Launches kernel for initializing encoder row group fragments * diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index e4b492e4f19..29d9669c783 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -237,6 +237,17 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row streams[s_idx++]); } + // launch byte stream split decoder for flat schemas + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT) != 0) { + DecodeSplitPageDataFlat(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + error_code.data(), + streams[s_idx++]); + } + // launch byte stream split decoder if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT) != 0) { DecodeSplitPageData(subpass.pages, diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 
9258967e799..83f4d13ea6c 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1626,7 +1626,7 @@ TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) TEST_F(ParquetWriterTest, ByteStreamSplit) { constexpr auto num_rows = 100; - + std::mt19937 engine{31337}; auto col0_data = random_values(num_rows); auto col1_data = random_values(num_rows); auto col2_data = random_values(num_rows); @@ -1637,17 +1637,24 @@ TEST_F(ParquetWriterTest, ByteStreamSplit) column_wrapper col2{col2_data.begin(), col2_data.end(), no_nulls()}; column_wrapper col3{col3_data.begin(), col3_data.end(), no_nulls()}; - auto expected = table_view{{col0, col1, col2, col3}}; + // throw in a list to make sure both decoders are working + auto col4 = make_parquet_list_col(engine, num_rows, 5, true); + + auto expected = table_view{{col0, col1, col2, col3, *col4}}; cudf::io::table_input_metadata expected_metadata(expected); expected_metadata.column_metadata[0].set_name("int32s"); expected_metadata.column_metadata[1].set_name("int64s"); expected_metadata.column_metadata[2].set_name("floats"); expected_metadata.column_metadata[3].set_name("doubles"); - for (size_t i = 0; i < expected_metadata.column_metadata.size(); i++) { - expected_metadata.column_metadata[i].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + expected_metadata.column_metadata[4].set_name("int32list"); + auto const encoding = cudf::io::column_encoding::BYTE_STREAM_SPLIT; + for (int i = 0; i < 3; i++) { + expected_metadata.column_metadata[i].set_encoding(encoding); } + expected_metadata.column_metadata[4].child(1).set_encoding(encoding); + auto const filepath = temp_env->get_temp_filepath("ByteStreamSplit.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) From b4b318dd7b8db9da76e4942dabb3f5b64618bb64 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 19 Mar 2024 16:49:42 -0700 Subject: [PATCH 17/29] fix 
typo Co-authored-by: Vukasin Milovanovic --- cpp/include/cudf/io/types.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 713bd1413ea..04203da4e86 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -113,7 +113,7 @@ enum class column_encoding { ///< valid for BYTE_ARRAY columns) DELTA_BYTE_ARRAY, ///< Use DELTA_BYTE_ARRAY encoding (only valid for ///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns) - BYTE_STREAM_SPLIT, ///< Use BYTE_STREAM_SPLIT encoding (alid for all fixed width types) + BYTE_STREAM_SPLIT, ///< Use BYTE_STREAM_SPLIT encoding (valid for all fixed width types) // ORC encodings: DIRECT, ///< Use DIRECT encoding DIRECT_V2, ///< Use DIRECT_V2 encoding From cfba761ba42828b7ae154f6c548746572bcdbe6b Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 19 Mar 2024 17:03:42 -0700 Subject: [PATCH 18/29] address some review comments --- cpp/src/io/parquet/page_data.cu | 4 ++-- cpp/src/io/parquet/parquet_gpu.hpp | 25 ++++++++++++------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index efd356be086..f9c910645c0 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -439,8 +439,8 @@ void __host__ DecodePageData(cudf::detail::hostdevice_span pages, /** * @copydoc cudf::io::parquet::detail::DecodePageData */ -void __host__ DecodeSplitPageData(cudf::detail::hostdevice_span& pages, - cudf::detail::hostdevice_span const& chunks, +void __host__ DecodeSplitPageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, size_t num_rows, size_t min_row, int level_type_size, diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 72d916939e7..edee763b30d 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -200,17 +200,16 @@ enum level_type { * Used 
to control which decode kernels to run. */ enum class decode_kernel_mask { - NONE = 0, - GENERAL = (1 << 0), // Run catch-all decode kernel - STRING = (1 << 1), // Run decode kernel for string data - DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data - DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data - DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data - FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages - FIXED_WIDTH_DICT = (1 << 6), // Run decode kernel for fixed width dictionary pages - BYTE_STREAM_SPLIT = (1 << 7), // Run decode kernel for BYTE_STREAM_SPLIT encoded data - BYTE_STREAM_SPLIT_FLAT = - (1 << 8), // Run decode kernel for BYTE_STREAM_SPLIT encoded data with a flat schema + NONE = 0, + GENERAL = (1 << 0), // Run catch-all decode kernel + STRING = (1 << 1), // Run decode kernel for string data + DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data + DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data + DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data + FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages + FIXED_WIDTH_DICT = (1 << 6), // Run decode kernel for fixed width dictionary pages + BYTE_STREAM_SPLIT = (1 << 7), // Run decode kernel for BYTE_STREAM_SPLIT encoded data + BYTE_STREAM_SPLIT_FLAT = (1 << 8), // Same as above but with a flat schema }; // mask representing all the ways in which a string can be encoded @@ -772,8 +771,8 @@ void DecodePageData(cudf::detail::hostdevice_span pages, * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ -void DecodeSplitPageData(cudf::detail::hostdevice_span& pages, - cudf::detail::hostdevice_span const& chunks, +void DecodeSplitPageData(cudf::detail::hostdevice_span pages, + 
cudf::detail::hostdevice_span chunks, size_t num_rows, size_t min_row, int level_type_size, From eb0e5a478febcf163c3e2885fa2e84f12565a750 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 19 Mar 2024 17:06:15 -0700 Subject: [PATCH 19/29] seed random number generator --- cpp/tests/io/parquet_writer_test.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 57c7fdd2a63..7aa262b97b4 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1979,6 +1979,7 @@ TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampOverflow) TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampsByteStreamSplit) { + srand(42); auto sequence = cudf::detail::make_counting_transform_iterator( 0, [](auto i) { return ((std::rand() / 10000) * 1000); }); From cb7aa06129e37f918633158e7953e90c26c60761 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 20 Mar 2024 22:53:36 +0000 Subject: [PATCH 20/29] add test that list columns get encoding set properly --- cpp/tests/io/parquet_writer_test.cpp | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 7aa262b97b4..88c2ffe117b 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1531,6 +1531,7 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) using cudf::io::column_encoding; using cudf::io::parquet::detail::Encoding; constexpr int num_rows = 500; + std::mt19937 engine{31337}; auto const ones = thrust::make_constant_iterator(1); auto const col = @@ -1540,6 +1541,9 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) auto const string_col = cudf::test::strings_column_wrapper(strings, strings + num_rows, no_nulls()); + // throw in a list to make sure encoding selection works there too + auto list_col = make_parquet_list_col(engine, num_rows, 5, true); + auto const table = 
table_view({col, col, col, @@ -1551,7 +1555,8 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) string_col, string_col, string_col, - string_col}); + string_col, + *list_col}); cudf::io::table_input_metadata table_metadata(table); @@ -1573,10 +1578,17 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) set_meta(10, "string_db", column_encoding::DELTA_BINARY_PACKED); table_metadata.column_metadata[11].set_name("string_none"); - for (auto& col_meta : table_metadata.column_metadata) { - col_meta.set_nullability(false); + for (int i = 0; i < 12; i++) { + table_metadata.column_metadata[i].set_nullability(false); } + // handle list column separately + table_metadata.column_metadata[12].set_name("int32_list").set_nullability(true); + table_metadata.column_metadata[12] + .child(1) + .set_encoding(column_encoding::DELTA_BINARY_PACKED) + .set_nullability(true); + auto const filepath = temp_env->get_temp_filepath("UserRequestedEncodings.parquet"); cudf::io::parquet_writer_options opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table) @@ -1621,6 +1633,12 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) expect_enc(10, Encoding::PLAIN_DICTIONARY); // no request, should use dictionary expect_enc(11, Encoding::PLAIN_DICTIONARY); + // int list requested delta_binary_packed. it has level data, so have to search for a match. 
+ auto const encodings = fmd.row_groups[0].columns[12].meta_data.encodings; + auto const has_delta = std::any_of(encodings.begin(), encodings.end(), [](Encoding enc) { + return enc == Encoding::DELTA_BINARY_PACKED; + }); + EXPECT_TRUE(has_delta); } TEST_F(ParquetWriterTest, Decimal128DeltaByteArray) From 255aa812aa0d45be2539e25a8b6d6181d67033dd Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 21 Mar 2024 16:20:27 -0700 Subject: [PATCH 21/29] address review comments --- cpp/src/io/parquet/decode_fixed.cu | 16 ++-- cpp/src/io/parquet/page_data.cu | 14 ++-- cpp/src/io/parquet/page_data.cuh | 6 +- cpp/src/io/parquet/page_enc.cu | 94 +++++++----------------- cpp/src/io/parquet/page_string_decode.cu | 2 +- 5 files changed, 43 insertions(+), 89 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 0fde880b05d..35a383c7345 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -267,8 +267,8 @@ __device__ inline void gpuDecodeSplitValues(page_state_s* s, // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader if (s->col.converted_type == DECIMAL) { switch (dtype) { - case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; - case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; case FIXED_LEN_BYTE_ARRAY: if (s->dtype_len_in <= sizeof(int32_t)) { gpuOutputSplitFixedLenByteArrayAsInt( @@ -293,7 +293,7 @@ __device__ inline void gpuDecodeSplitValues(page_state_s* s, // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS // TIME_MILLIS is the only duration type stored as int32: // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype - gpuOutputByteStreamSplit(dst, src, num_values); + gpuOutputByteStreamSplit(dst, src, num_values); dst[4] = 0; dst[5] = 0; dst[6] 
= 0; @@ -302,10 +302,10 @@ __device__ inline void gpuDecodeSplitValues(page_state_s* s, gpuOutputSplitInt64Timestamp( reinterpret_cast(dst), src, num_values, s->ts_scale); } else { - gpuOutputByteStreamSplit(dst, src, num_values); + gpuOutputByteStreamSplit(dst, src, num_values); } } else if (dtype_len == 4) { - gpuOutputByteStreamSplit(dst, src, num_values); + gpuOutputByteStreamSplit(dst, src, num_values); } else { s->set_error_code(decode_error::UNSUPPORTED_ENCODING); } @@ -735,7 +735,7 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa // dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block // 1 full warp, and 1 warp of 1 thread dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - dim3 dim_grid(pages.size(), 1); // 1 thread block per pags => # blocks + dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks if (level_type_size == 1) { gpuDecodePageDataFixedDict<<>>( @@ -754,10 +754,8 @@ void __host__ DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pa kernel_error::pointer error_code, rmm::cuda_stream_view stream) { - // dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - // 1 full warp, and 1 warp of 1 thread dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - dim3 dim_grid(pages.size(), 1); // 1 thread block per pags => # blocks + dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks if (level_type_size == 1) { gpuDecodeSplitPageDataFlat<<>>( diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index f9c910645c0..ad8aab753e6 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -122,7 +122,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // - the row values we get from nz_idx will be // 0, 1, 2, 3, 4 .... // - by shifting these values by first_row, the sequence becomes - // -1, -2, 0, 1, 2 ... 
+ // -2, -1, 0, 1, 2 ... // - so we will end up ignoring the first two input rows, and input rows 2..n will // get written to the output starting at position 0. // @@ -148,8 +148,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader if (s->col.converted_type == DECIMAL) { switch (dtype) { - case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; - case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; case FIXED_LEN_BYTE_ARRAY: if (s->dtype_len_in <= sizeof(int32_t)) { gpuOutputSplitFixedLenByteArrayAsInt( @@ -174,7 +174,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS // TIME_MILLIS is the only duration type stored as int32: // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype - gpuOutputByteStreamSplit(dst, src, num_values); + gpuOutputByteStreamSplit(dst, src, num_values); dst[4] = 0; dst[5] = 0; dst[6] = 0; @@ -183,10 +183,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) gpuOutputSplitInt64Timestamp( reinterpret_cast(dst), src, num_values, s->ts_scale); } else { - gpuOutputByteStreamSplit(dst, src, num_values); + gpuOutputByteStreamSplit(dst, src, num_values); } } else if (dtype_len == 4) { - gpuOutputByteStreamSplit(dst, src, num_values); + gpuOutputByteStreamSplit(dst, src, num_values); } else { s->set_error_code(decode_error::UNSUPPORTED_ENCODING); } @@ -316,7 +316,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // - the row values we get from nz_idx will be // 0, 1, 2, 3, 4 .... // - by shifting these values by first_row, the sequence becomes - // -1, -2, 0, 1, 2 ... + // -2, -1, 0, 1, 2 ... 
// - so we will end up ignoring the first two input rows, and input rows 2..n will // get written to the output starting at position 0. // diff --git a/cpp/src/io/parquet/page_data.cuh b/cpp/src/io/parquet/page_data.cuh index a6d3b7ab49c..62aa516cfc7 100644 --- a/cpp/src/io/parquet/page_data.cuh +++ b/cpp/src/io/parquet/page_data.cuh @@ -408,10 +408,10 @@ inline __device__ void gpuOutputGeneric( * @param src pointer to first byte of input data in stream 0 * @param stride number of bytes per input stream (M) */ -template +template __device__ inline void gpuOutputByteStreamSplit(uint8_t* dst, uint8_t const* src, size_type stride) { - for (int i = 0; i < byte_length; i++) { + for (int i = 0; i < sizeof(T); i++) { dst[i] = src[i * stride]; } } @@ -432,7 +432,7 @@ inline __device__ void gpuOutputSplitInt64Timestamp(int64_t* dst, size_type stride, int32_t ts_scale) { - gpuOutputByteStreamSplit(reinterpret_cast(dst), src, stride); + gpuOutputByteStreamSplit(reinterpret_cast(dst), src, stride); if (ts_scale < 0) { // round towards negative infinity int sign = (*dst < 0); diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index b52a01b38f7..d8c3c860d40 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -1607,6 +1607,19 @@ __device__ void finish_page_encode(state_buf* s, } } +// Encode a fixed-width data type into `dst`. `dst` points to the first byte +// of the result. `stride` is 1 for PLAIN encoding and num_values for +// BYTE_STREAM_SPLIT. +template +__device__ inline void encode_value(uint8_t* dst, T src, size_type stride) +{ + T v = src; + for (int i = 0; i < sizeof(T); i++) { + dst[i * stride] = v; + v >>= 8; + } +} + // PLAIN page data encoder // blockDim(128, 1, 1) template @@ -1669,6 +1682,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } __syncthreads(); + auto const stride = is_split_stream ?
s->page.num_valid : 1; + for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, block_size); uint32_t len, pos; @@ -1739,21 +1754,11 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } }(); - if constexpr (is_split_stream) { - auto const stride = s->page.num_valid; - dst[pos + 0 * stride] = v; - dst[pos + 1 * stride] = v >> 8; - dst[pos + 2 * stride] = v >> 16; - dst[pos + 3 * stride] = v >> 24; - } else { - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; - } + encode_value(dst + pos, v, stride); } break; + case DOUBLE: case INT64: { - int64_t v = s->col.leaf_column->element(val_idx); + auto v = s->col.leaf_column->element(val_idx); int32_t ts_scale = s->col.ts_scale; if (ts_scale != 0) { if (ts_scale < 0) { @@ -1762,26 +1767,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) v *= ts_scale; } } - if constexpr (is_split_stream) { - auto const stride = s->page.num_valid; - dst[pos + 0 * stride] = v; - dst[pos + 1 * stride] = v >> 8; - dst[pos + 2 * stride] = v >> 16; - dst[pos + 3 * stride] = v >> 24; - dst[pos + 4 * stride] = v >> 32; - dst[pos + 5 * stride] = v >> 40; - dst[pos + 6 * stride] = v >> 48; - dst[pos + 7 * stride] = v >> 56; - } else { - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; - dst[pos + 4] = v >> 32; - dst[pos + 5] = v >> 40; - dst[pos + 6] = v >> 48; - dst[pos + 7] = v >> 56; - } + encode_value(dst + pos, v, stride); } break; case INT96: { // only PLAIN encoding is supported @@ -1811,39 +1797,12 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) }(); // the 12 bytes of fixed length data. 
- v = last_day_nanos.count(); - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; - dst[pos + 4] = v >> 32; - dst[pos + 5] = v >> 40; - dst[pos + 6] = v >> 48; - dst[pos + 7] = v >> 56; - uint32_t w = julian_days.count(); - dst[pos + 8] = w; - dst[pos + 9] = w >> 8; - dst[pos + 10] = w >> 16; - dst[pos + 11] = w >> 24; + v = last_day_nanos.count(); + encode_value(dst + pos, v, 1); + uint32_t w = julian_days.count(); + encode_value(dst + pos + 8, w, 1); } break; - case DOUBLE: { - if (is_split_stream) { - auto const v = s->col.leaf_column->element(val_idx); - auto const stride = s->page.num_valid; - dst[pos + 0 * stride] = v; - dst[pos + 1 * stride] = v >> 8; - dst[pos + 2 * stride] = v >> 16; - dst[pos + 3 * stride] = v >> 24; - dst[pos + 4 * stride] = v >> 32; - dst[pos + 5 * stride] = v >> 40; - dst[pos + 6 * stride] = v >> 48; - dst[pos + 7 * stride] = v >> 56; - } else { - auto v = s->col.leaf_column->element(val_idx); - memcpy(dst + pos, &v, 8); - } - } break; case BYTE_ARRAY: { // only PLAIN encoding is supported auto const bytes = [](cudf::type_id const type_id, @@ -1859,11 +1818,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) default: CUDF_UNREACHABLE("invalid type id for byte array writing!"); } }(type_id, s->col.leaf_column, val_idx); - uint32_t v = len - 4; // string length - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; + uint32_t v = len - 4; // string length + encode_value(dst + pos, v, 1); if (v != 0) memcpy(dst + pos + 4, bytes, v); } break; case FIXED_LEN_BYTE_ARRAY: { diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 0130e5f772d..544d6ebf599 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1039,7 +1039,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // - the row values we get from nz_idx will be // 0, 1, 2, 3, 4 .... 
// - by shifting these values by first_row, the sequence becomes - // -1, -2, 0, 1, 2 ... + // -2, -1, 0, 1, 2 ... // - so we will end up ignoring the first two input rows, and input rows 2..n will // get written to the output starting at position 0. // From 29d9353c879d8f0c33d58cb5da80d08b580bf4e8 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 21 Mar 2024 16:32:47 -0700 Subject: [PATCH 22/29] set size of unused arrays to 0 --- cpp/src/io/parquet/decode_fixed.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 35a383c7345..bae92c537df 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -609,8 +609,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) page_state_buffers_s // unused in this kernel + 0, // unused in this kernel + 0> // unused in this kernel state_buffers; page_state_s* const s = &state_g; From 03a902fbd5fe98fd0e50fccd3bb605fd84f2ad1e Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 21 Mar 2024 17:07:12 -0700 Subject: [PATCH 23/29] change more unused arrays to 0 size --- cpp/src/io/parquet/decode_fixed.cu | 6 +++--- cpp/src/io/parquet/parquet_gpu.hpp | 8 +++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index bae92c537df..c306a7a28b1 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -366,8 +366,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) page_state_buffers_s // unused in this kernel + 0, // unused in this kernel + 0> // unused in this kernel state_buffers; page_state_s* const s = &state_g; @@ -482,7 +482,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) __shared__ __align__(16) page_state_s state_g; 
__shared__ __align__(16) page_state_buffers_s // unused in this kernel + 0> // unused in this kernel state_buffers; page_state_s* const s = &state_g; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index edee763b30d..38bc7c8c9f1 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -54,7 +54,13 @@ constexpr int LEVEL_DECODE_BUF_SIZE = 2048; template constexpr int rolling_index(int index) { - return index % rolling_size; + // Cannot divide by 0. But `rolling_size` will be 0 for unused arrays, so this case will never + // actually be executed. + if constexpr (rolling_size == 0) { + return index; + } else { + return index % rolling_size; + } } // PARQUET-2261 allows for not writing the level histograms in certain cases. From a674ef97bd4f5ee4f784c98143efe4c29149c0f6 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 22 Mar 2024 11:33:39 -0700 Subject: [PATCH 24/29] use a single kernel for PLAIN and BYTE_STREAM_SPLIT --- cpp/src/io/parquet/page_enc.cu | 42 +++++++++++++++------------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index d8c3c860d40..1771509d63f 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -15,6 +15,7 @@ */ #include "delta_enc.cuh" +#include "io/parquet/parquet_gpu.hpp" #include "io/utilities/block_utils.cuh" #include "page_string_utils.cuh" #include "parquet_gpu.cuh" @@ -1622,13 +1623,14 @@ __device__ inline void encode_value(uint8_t* dst, T src, size_type stride) // PLAIN page data encoder // blockDim(128, 1, 1) -template +template CUDF_KERNEL void __launch_bounds__(block_size, 8) gpuEncodePages(device_span pages, device_span> comp_in, device_span> comp_out, device_span comp_results, - bool write_v2_headers) + bool write_v2_headers, + bool is_split_stream) { __shared__ __align__(8) page_enc_state_s<0> state_g; using block_scan = cub::BlockScan; @@ -1648,11
+1650,9 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } __syncthreads(); - if constexpr (is_split_stream) { - if (BitAnd(s->page.kernel_mask, encode_kernel_mask::BYTE_STREAM_SPLIT) == 0) { return; } - } else { - if (BitAnd(s->page.kernel_mask, encode_kernel_mask::PLAIN) == 0) { return; } - } + auto const allowed_mask = + is_split_stream ? encode_kernel_mask::BYTE_STREAM_SPLIT : encode_kernel_mask::PLAIN; + if (BitAnd(s->page.kernel_mask, allowed_mask) == 0) { return; } // Encode data values __syncthreads(); @@ -1671,12 +1671,10 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) s->rle_pos = 0; s->rle_numvals = 0; s->rle_out = dst; - if constexpr (is_split_stream) { - s->page.encoding = Encoding::BYTE_STREAM_SPLIT; - } else { - s->page.encoding = determine_encoding( - s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); - } + s->page.encoding = + is_split_stream ? Encoding::BYTE_STREAM_SPLIT + : determine_encoding( + s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); s->page_start_val = row_to_value_idx(s->page.start_row, s->col); s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); } @@ -1732,7 +1730,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) __syncthreads(); // if BYTE_STREAM_SPLIT, then translate byte positions to indexes - if constexpr (is_split_stream) { + if (is_split_stream) { pos /= dtype_len_out; total_len /= dtype_len_out; } @@ -1827,8 +1825,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // When using FIXED_LEN_BYTE_ARRAY for decimals, the rep is encoded in big-endian auto const v = s->col.leaf_column->element(val_idx).value(); auto const v_char_ptr = reinterpret_cast(&v); - if constexpr (is_split_stream) { - auto const stride = s->page.num_valid; + if (is_split_stream) { for (int i = dtype_len_out - 1; i >= 0; i--, pos += stride) { dst[pos] = v_char_ptr[i]; } @@ -1847,9 +1844,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // for BYTE_STREAM_SPLIT, s->cur now 
points to the end of the first stream. // need it to point to the end of the Nth stream. - if constexpr (is_split_stream) { - if (t == 0) { s->cur += (dtype_len_out - 1) * s->page.num_valid; } - } + if (is_split_stream and t == 0) { s->cur += (dtype_len_out - 1) * s->page.num_valid; + } finish_page_encode( s, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); } @@ -3431,15 +3427,15 @@ void EncodePages(device_span pages, auto const strm = streams[s_idx++]; gpuEncodePageLevels<<>>( pages, write_v2_headers, encode_kernel_mask::PLAIN); - gpuEncodePages<<>>( - pages, comp_in, comp_out, comp_results, write_v2_headers); + gpuEncodePages<<>>( + pages, comp_in, comp_out, comp_results, write_v2_headers, false); } if (BitAnd(kernel_mask, encode_kernel_mask::BYTE_STREAM_SPLIT) != 0) { auto const strm = streams[s_idx++]; gpuEncodePageLevels<<>>( pages, write_v2_headers, encode_kernel_mask::BYTE_STREAM_SPLIT); - gpuEncodePages<<>>( - pages, comp_in, comp_out, comp_results, write_v2_headers); + gpuEncodePages<<>>( + pages, comp_in, comp_out, comp_results, write_v2_headers, true); } if (BitAnd(kernel_mask, encode_kernel_mask::DELTA_BINARY) != 0) { auto const strm = streams[s_idx++]; From 0d05ab4346e7b7d91a4138298c86633f95e521ef Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 22 Mar 2024 11:39:15 -0700 Subject: [PATCH 25/29] use size of 1 for unused arrays --- cpp/src/io/parquet/decode_fixed.cu | 10 +++++----- cpp/src/io/parquet/page_delta_decode.cu | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index c306a7a28b1..35a383c7345 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -366,8 +366,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) page_state_buffers_s // unused in this kernel + 1, // unused in this kernel + 1> // unused in this kernel 
state_buffers; page_state_s* const s = &state_g; @@ -482,7 +482,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) page_state_buffers_s // unused in this kernel + 1> // unused in this kernel state_buffers; page_state_s* const s = &state_g; @@ -609,8 +609,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) page_state_buffers_s // unused in this kernel + 1, // unused in this kernel + 1> // unused in this kernel state_buffers; page_state_s* const s = &state_g; diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index 7c0092c6185..da1bbaebd73 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -315,7 +315,7 @@ CUDF_KERNEL void __launch_bounds__(96) using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; page_state_s* const s = &state_g; auto* const sb = &state_buffers; @@ -440,7 +440,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) using cudf::detail::warp_size; __shared__ __align__(16) delta_byte_array_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; page_state_s* const s = &state_g; auto* const sb = &state_buffers; @@ -605,7 +605,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; __shared__ __align__(8) 
uint8_t const* page_string_data; __shared__ size_t string_offset; From 6e07ca2d1cb282e62bc4a2aa675cb34f306d1fc5 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 22 Mar 2024 18:57:14 +0000 Subject: [PATCH 26/29] fix errant newline --- cpp/src/io/parquet/page_enc.cu | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 1771509d63f..b65bd68ba67 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -1844,8 +1844,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // for BYTE_STREAM_SPLIT, s->cur now points to the end of the first stream. // need it to point to the end of the Nth stream. - if (is_split_stream and t == 0) { s->cur += (dtype_len_out - 1) * s->page.num_valid; - } + if (is_split_stream and t == 0) { s->cur += (dtype_len_out - 1) * s->page.num_valid; } finish_page_encode( s, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); } From 7cdf10b8b8cfe3fd56e47353e7855df4e1332750 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 27 Mar 2024 15:23:29 +0000 Subject: [PATCH 27/29] finish merge --- cpp/src/io/parquet/decode_fixed.cu | 6 ++++-- cpp/src/io/parquet/page_data.cu | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 479fbd80532..b7f3e15037d 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -238,7 +238,7 @@ __device__ inline void gpuDecodeSplitValues(page_state_s* s, auto const t = threadIdx.x; PageNestingDecodeInfo* nesting_info_base = s->nesting_info; - int const dtype = s->col.data_type & 7; + int const dtype = s->col.physical_type; auto const data_len = thrust::distance(s->data_start, s->data_end); auto const num_values = data_len / s->dtype_len_in; @@ -263,9 +263,11 @@ __device__ inline void gpuDecodeSplitValues(page_state_s* s, uint8_t const* src = s->data_start + src_pos; uint8_t* dst = 
nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + auto const is_decimal = + s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL; // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader - if (s->col.converted_type == DECIMAL) { + if (is_decimal) { switch (dtype) { case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 8c9667b8ea3..6d29bc38fe7 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -109,7 +109,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) gpuDecodeLevels(s, sb, target_pos, rep, def, t); } else { // WARP1..WARP3: Decode values - int const dtype = s->col.data_type & 7; + int const dtype = s->col.physical_type; src_pos += t - out_thread0; // the position in the output column/buffer @@ -144,9 +144,11 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) uint8_t const* src = s->data_start + val_src_pos; uint8_t* dst = nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + auto const is_decimal = + s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL; // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader - if (s->col.converted_type == DECIMAL) { + if (is_decimal) { switch (dtype) { case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; From 6702bda7352a4faf424d54e18cdf421bbad58631 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 9 Apr 2024 15:15:09 -0700 Subject: [PATCH 28/29] test more duration types and fix a small bug --- cpp/tests/io/parquet_writer_test.cpp | 55 ++++++++++------------------ 1 file changed, 19 insertions(+), 36 deletions(-) diff --git a/cpp/tests/io/parquet_writer_test.cpp 
b/cpp/tests/io/parquet_writer_test.cpp index 88c2ffe117b..1f8dc247420 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -31,7 +31,7 @@ using cudf::test::iterators::no_nulls; template -void test_durations(mask_op_t mask_op) +void test_durations(mask_op_t mask_op, bool use_byte_stream_split) { std::default_random_engine generator; std::uniform_int_distribution distribution_d(0, 30); @@ -63,6 +63,13 @@ void test_durations(mask_op_t mask_op) auto expected = table_view{{durations_d, durations_s, durations_ms, durations_us, durations_ns}}; + if (use_byte_stream_split) { + cudf::io::table_input_metadata expected_metadata(expected); + for (auto& col_meta : expected_metadata.column_metadata) { + col_meta.set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + } + } + auto filepath = temp_env->get_temp_filepath("Durations.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected); @@ -87,10 +94,10 @@ void test_durations(mask_op_t mask_op) TEST_F(ParquetWriterTest, Durations) { - test_durations([](auto i) { return true; }); - test_durations([](auto i) { return (i % 2) != 0; }); - test_durations([](auto i) { return (i % 3) != 0; }); - test_durations([](auto i) { return false; }); + test_durations([](auto i) { return true; }, false); + test_durations([](auto i) { return (i % 2) != 0; }, false); + test_durations([](auto i) { return (i % 3) != 0; }, false); + test_durations([](auto i) { return false; }, false); } TEST_F(ParquetWriterTest, MultiIndex) @@ -1725,7 +1732,7 @@ TEST_F(ParquetWriterTest, ByteStreamSplit) expected_metadata.column_metadata[3].set_name("doubles"); expected_metadata.column_metadata[4].set_name("int32list"); auto const encoding = cudf::io::column_encoding::BYTE_STREAM_SPLIT; - for (int i = 0; i < 3; i++) { + for (int i = 0; i <= 3; i++) { expected_metadata.column_metadata[i].set_encoding(encoding); } @@ -1763,8 +1770,8 @@ 
TEST_F(ParquetWriterTest, DecimalByteStreamSplit) expected_metadata.column_metadata[0].set_name("int32s").set_decimal_precision(7); expected_metadata.column_metadata[1].set_name("int64s").set_decimal_precision(11); expected_metadata.column_metadata[2].set_name("int128s").set_decimal_precision(22); - for (size_t i = 0; i < expected_metadata.column_metadata.size(); i++) { - expected_metadata.column_metadata[i].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + for (auto& col_meta : expected_metadata.column_metadata) { + col_meta.set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); } auto const filepath = temp_env->get_temp_filepath("DecimalByteStreamSplit.parquet"); @@ -1782,34 +1789,10 @@ TEST_F(ParquetWriterTest, DecimalByteStreamSplit) TEST_F(ParquetWriterTest, DurationByteStreamSplit) { - constexpr cudf::size_type num_rows = 100; - auto seq_col0 = random_values(num_rows); - auto seq_col1 = random_values(num_rows); - - auto durations_ms = cudf::test::fixed_width_column_wrapper( - seq_col0.begin(), seq_col0.end(), no_nulls()); - auto durations_us = cudf::test::fixed_width_column_wrapper( - seq_col1.begin(), seq_col1.end(), no_nulls()); - - auto expected = table_view{{durations_ms, durations_us}}; - cudf::io::table_input_metadata expected_metadata(expected); - expected_metadata.column_metadata[0].set_name("millis"); - expected_metadata.column_metadata[1].set_name("micros"); - for (size_t i = 0; i < expected_metadata.column_metadata.size(); i++) { - expected_metadata.column_metadata[i].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); - } - - auto filepath = temp_env->get_temp_filepath("DurationByteStreamSplit.parquet"); - cudf::io::parquet_writer_options out_opts = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) - .metadata(expected_metadata); - cudf::io::write_parquet(out_opts); - - cudf::io::parquet_reader_options in_opts = - cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); 
- auto result = cudf::io::read_parquet(in_opts); - - CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + test_durations([](auto i) { return true; }, true); + test_durations([](auto i) { return (i % 2) != 0; }, true); + test_durations([](auto i) { return (i % 3) != 0; }, true); + test_durations([](auto i) { return false; }, true); } ///////////////////////////////////////////////////////////// From 47ca0af60be43a9872c63da37f2d8eaab29604e9 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 17 Apr 2024 14:21:31 +0000 Subject: [PATCH 29/29] address review comments --- cpp/src/io/parquet/decode_fixed.cu | 6 ++---- cpp/src/io/parquet/page_data.cu | 6 ++---- cpp/src/io/parquet/page_data.cuh | 6 +++--- cpp/src/io/parquet/page_enc.cu | 34 +++++++++++++++--------------- 4 files changed, 24 insertions(+), 28 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index b7f3e15037d..f3332a23992 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -296,10 +296,8 @@ __device__ inline void gpuDecodeSplitValues(page_state_s* s, // TIME_MILLIS is the only duration type stored as int32: // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype gpuOutputByteStreamSplit(dst, src, num_values); - dst[4] = 0; - dst[5] = 0; - dst[6] = 0; - dst[7] = 0; + // zero out most significant bytes + memset(dst + 4, 0, 4); } else if (s->ts_scale) { gpuOutputSplitInt64Timestamp( reinterpret_cast(dst), src, num_values, s->ts_scale); diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 6d29bc38fe7..7207173b82f 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -177,10 +177,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // TIME_MILLIS is the only duration type stored as int32: // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype 
gpuOutputByteStreamSplit(dst, src, num_values); - dst[4] = 0; - dst[5] = 0; - dst[6] = 0; - dst[7] = 0; + // zero out most significant bytes + memset(dst + 4, 0, 4); } else if (s->ts_scale) { gpuOutputSplitInt64Timestamp( reinterpret_cast(dst), src, num_values, s->ts_scale); diff --git a/cpp/src/io/parquet/page_data.cuh b/cpp/src/io/parquet/page_data.cuh index 2c8686f3681..f182747650e 100644 --- a/cpp/src/io/parquet/page_data.cuh +++ b/cpp/src/io/parquet/page_data.cuh @@ -398,10 +398,10 @@ inline __device__ void gpuOutputGeneric( } /** - * Output a BYTE_STREAM_SPLIT value of length `byte_length`. + * Output a BYTE_STREAM_SPLIT value of type `T`. * - * Data is encoded as N streams of length M, forming an NxM sized matrix. Rows are streams, - * columns are individual values. + * Data is encoded as N == sizeof(T) streams of length M, forming an NxM sized matrix. + * Rows are streams, columns are individual values. * * @param dst pointer to output data * @param src pointer to first byte of input data in stream 0 diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 6096bd439fc..30859d7d6be 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -239,8 +239,10 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t) Encoding __device__ determine_encoding(PageType page_type, Type physical_type, bool use_dictionary, - bool write_v2_headers) + bool write_v2_headers, + bool is_split_stream) { + if (is_split_stream) { return Encoding::BYTE_STREAM_SPLIT; } // NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and // RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and // data pages (actual encoding is identical). 
@@ -1666,15 +1668,13 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) }(); if (t == 0) { - uint8_t* dst = s->cur; - s->rle_run = 0; - s->rle_pos = 0; - s->rle_numvals = 0; - s->rle_out = dst; - s->page.encoding = - is_split_stream ? Encoding::BYTE_STREAM_SPLIT - : determine_encoding( - s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); + uint8_t* dst = s->cur; + s->rle_run = 0; + s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = dst; + s->page.encoding = determine_encoding( + s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers, is_split_stream); s->page_start_val = row_to_value_idx(s->page.start_row, s->col); s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); } @@ -1895,13 +1895,13 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) ? s->ck.dict_rle_bits : -1; if (t == 0) { - uint8_t* dst = s->cur; - s->rle_run = 0; - s->rle_pos = 0; - s->rle_numvals = 0; - s->rle_out = dst; - s->page.encoding = - determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); + uint8_t* dst = s->cur; + s->rle_run = 0; + s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = dst; + s->page.encoding = determine_encoding( + s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers, false); if (dict_bits >= 0 && physical_type != BOOLEAN) { dst[0] = dict_bits; s->rle_out = dst + 1;