Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BYTE_STREAM_SPLIT support to Parquet #15311

Merged
merged 55 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
39defae
initial cut
etseidl Mar 7, 2024
a2bf4c5
checkpoint
etseidl Mar 8, 2024
fec05e9
leave room for new microkernels
etseidl Mar 8, 2024
fe10804
checkpoint
etseidl Mar 8, 2024
16e961a
checkpoint
etseidl Mar 8, 2024
9de287f
checkpoint
etseidl Mar 8, 2024
8b75a6d
formatting
etseidl Mar 8, 2024
760ca0c
int and float working
etseidl Mar 12, 2024
cfa51e3
get decimals working
etseidl Mar 12, 2024
832428d
add more tests
etseidl Mar 12, 2024
55ed69c
clean up some dead code
etseidl Mar 12, 2024
9316f6c
update comment
etseidl Mar 12, 2024
cba0a33
only update cur ptr on t0
etseidl Mar 13, 2024
6b991ac
Merge remote-tracking branch 'origin/branch-24.04' into byte_stream_s…
etseidl Mar 14, 2024
ee7919c
rework kernel_mask_for_page
etseidl Mar 14, 2024
d6f5569
fix setting encoding on list children
etseidl Mar 14, 2024
56365b5
add flat version of decoder
etseidl Mar 14, 2024
fb691bc
Merge branch 'rapidsai:branch-24.04' into byte_stream_split
etseidl Mar 14, 2024
b7723b9
Merge branch 'branch-24.04' into byte_stream_split
etseidl Mar 18, 2024
0730d69
Merge branch 'branch-24.04' into byte_stream_split
etseidl Mar 18, 2024
bd81391
Merge branch 'rapidsai:branch-24.04' into byte_stream_split
etseidl Mar 19, 2024
b4b318d
fix typo
etseidl Mar 19, 2024
cfba761
address some review comments
etseidl Mar 20, 2024
eb0e5a4
seed random number generator
etseidl Mar 20, 2024
dada972
Merge remote-tracking branch 'origin/branch-24.04' into byte_stream_s…
etseidl Mar 20, 2024
242d30f
Merge remote-tracking branch 'origin/branch-24.06' into byte_stream_s…
etseidl Mar 20, 2024
cb7aa06
add test that list columns get encoding set properly
etseidl Mar 20, 2024
150b654
Merge remote-tracking branch 'origin/branch-24.06' into byte_stream_s…
etseidl Mar 20, 2024
255aa81
address review comments
etseidl Mar 21, 2024
29d9353
set size of unused arrays to 0
etseidl Mar 21, 2024
37ef728
Merge remote-tracking branch 'origin/branch-24.06' into byte_stream_s…
etseidl Mar 21, 2024
03a902f
change more unused arrays to 0 size
etseidl Mar 22, 2024
a674ef9
use a single kernel for PLAIN and BYTE_STREAM_SPLIT
etseidl Mar 22, 2024
0d05ab4
use size of 1 for unused arrays
etseidl Mar 22, 2024
6e07ca2
fix errant newline
etseidl Mar 22, 2024
374dbf7
Merge branch 'branch-24.06' into byte_stream_split
etseidl Mar 22, 2024
9736c2b
Merge branch 'rapidsai:branch-24.06' into byte_stream_split
etseidl Mar 27, 2024
7cdf10b
finish merge
etseidl Mar 27, 2024
d331dc9
Merge branch 'branch-24.06' into byte_stream_split
etseidl Mar 28, 2024
776b95b
Merge branch 'branch-24.06' into byte_stream_split
etseidl Apr 1, 2024
52c1500
Merge branch 'rapidsai:branch-24.06' into byte_stream_split
etseidl Apr 2, 2024
1fffda8
Merge branch 'rapidsai:branch-24.06' into byte_stream_split
etseidl Apr 4, 2024
1d2f395
Merge branch 'branch-24.06' into byte_stream_split
etseidl Apr 6, 2024
dc766e6
Merge branch 'rapidsai:branch-24.06' into byte_stream_split
etseidl Apr 8, 2024
9141101
Merge branch 'branch-24.06' into byte_stream_split
etseidl Apr 9, 2024
6702bda
test more duration types and fix a small bug
etseidl Apr 9, 2024
69ac97d
Merge remote-tracking branch 'origin/branch-24.06' into byte_stream_s…
etseidl Apr 9, 2024
d58ef9a
Merge branch 'rapidsai:branch-24.06' into byte_stream_split
etseidl Apr 16, 2024
47ca0af
address review comments
etseidl Apr 17, 2024
2fd9dfd
Merge remote-tracking branch 'origin/branch-24.06' into byte_stream_s…
etseidl Apr 17, 2024
c329145
Merge branch 'branch-24.06' into byte_stream_split
vuule Apr 17, 2024
25d2674
Merge branch 'rapidsai:branch-24.06' into byte_stream_split
etseidl Apr 18, 2024
6198644
Merge branch 'branch-24.06' into byte_stream_split
vuule Apr 18, 2024
daeb886
Merge branch 'branch-24.06' into byte_stream_split
etseidl Apr 23, 2024
d147c70
Merge remote-tracking branch 'origin/branch-24.06' into byte_stream_s…
etseidl Apr 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ enum class column_encoding {
///< valid for BYTE_ARRAY columns)
DELTA_BYTE_ARRAY, ///< Use DELTA_BYTE_ARRAY encoding (only valid for
///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns)
BYTE_STREAM_SPLIT, ///< Use BYTE_STREAM_SPLIT encoding (valid for all fixed width types)
// ORC encodings:
DIRECT, ///< Use DIRECT encoding
DIRECT_V2, ///< Use DIRECT_V2 encoding
Expand Down
231 changes: 230 additions & 1 deletion cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,98 @@ __device__ inline void gpuDecodeValues(
}
}

template <typename state_buf>
__device__ inline void gpuDecodeSplitValues(page_state_s* s,
state_buf* const sb,
int start,
int end)
{
using cudf::detail::warp_size;
constexpr int num_warps = decode_block_size / warp_size;
constexpr int max_batch_size = num_warps * warp_size;

auto const t = threadIdx.x;

PageNestingDecodeInfo* nesting_info_base = s->nesting_info;
int const dtype = s->col.physical_type;
auto const data_len = thrust::distance(s->data_start, s->data_end);
auto const num_values = data_len / s->dtype_len_in;

// decode values
int pos = start;
while (pos < end) {
int const batch_size = min(max_batch_size, end - pos);

int const target_pos = pos + batch_size;
int const src_pos = pos + t;

// the position in the output column/buffer
int dst_pos = sb->nz_idx[rolling_index<state_buf::nz_buf_size>(src_pos)] - s->first_row;

// target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values
// before first_row) in the flat hierarchy case.
if (src_pos < target_pos && dst_pos >= 0) {
// nesting level that is storing actual leaf values
int const leaf_level_index = s->col.max_nesting_depth - 1;

uint32_t dtype_len = s->dtype_len;
uint8_t const* src = s->data_start + src_pos;
uint8_t* dst =
nesting_info_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
auto const is_decimal =
s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL;

// Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader
if (is_decimal) {
switch (dtype) {
case INT32: gpuOutputByteStreamSplit<int32_t>(dst, src, num_values); break;
case INT64: gpuOutputByteStreamSplit<int64_t>(dst, src, num_values); break;
case FIXED_LEN_BYTE_ARRAY:
if (s->dtype_len_in <= sizeof(int32_t)) {
gpuOutputSplitFixedLenByteArrayAsInt(
reinterpret_cast<int32_t*>(dst), src, num_values, s->dtype_len_in);
break;
} else if (s->dtype_len_in <= sizeof(int64_t)) {
gpuOutputSplitFixedLenByteArrayAsInt(
reinterpret_cast<int64_t*>(dst), src, num_values, s->dtype_len_in);
break;
} else if (s->dtype_len_in <= sizeof(__int128_t)) {
gpuOutputSplitFixedLenByteArrayAsInt(
reinterpret_cast<__int128_t*>(dst), src, num_values, s->dtype_len_in);
break;
}
// unsupported decimal precision
[[fallthrough]];

default: s->set_error_code(decode_error::UNSUPPORTED_ENCODING);
}
} else if (dtype_len == 8) {
if (s->dtype_len_in == 4) {
// Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS
// TIME_MILLIS is the only duration type stored as int32:
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype
gpuOutputByteStreamSplit<int32_t>(dst, src, num_values);
dst[4] = 0;
dst[5] = 0;
dst[6] = 0;
dst[7] = 0;
etseidl marked this conversation as resolved.
Show resolved Hide resolved
} else if (s->ts_scale) {
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
gpuOutputSplitInt64Timestamp(
reinterpret_cast<int64_t*>(dst), src, num_values, s->ts_scale);
} else {
gpuOutputByteStreamSplit<int64_t>(dst, src, num_values);
}
} else if (dtype_len == 4) {
gpuOutputByteStreamSplit<int32_t>(dst, src, num_values);
} else {
s->set_error_code(decode_error::UNSUPPORTED_ENCODING);
}
}

pos += batch_size;
}
}

// is the page marked nullable or not
__device__ inline bool is_nullable(page_state_s* s)
{
Expand Down Expand Up @@ -495,6 +587,123 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

/**
* @brief Kernel for computing fixed width non dictionary column data stored in the pages
*
* This function will write the page data and the page data's validity to the
* output specified in the page's column chunk. If necessary, additional
* conversion will be performed to translate from the Parquet datatype to
* desired output datatype.
*
* @param pages List of pages
* @param chunks List of column chunks
* @param min_row Row index to start reading at
* @param num_rows Maximum number of rows to read
* @param error_code Error code to set if an error is encountered
*/
template <typename level_t>
CUDF_KERNEL void __launch_bounds__(decode_block_size)
gpuDecodeSplitPageDataFlat(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
kernel_error::pointer error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16) page_state_buffers_s<rolling_buf_size, // size of nz_idx buffer
1, // unused in this kernel
1> // unused in this kernel
vuule marked this conversation as resolved.
Show resolved Hide resolved
state_buffers;

page_state_s* const s = &state_g;
auto* const sb = &state_buffers;
int const page_idx = blockIdx.x;
int const t = threadIdx.x;
PageInfo* pp = &pages[page_idx];

if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT))) {
return;
}

// must come after the kernel mask check
[[maybe_unused]] null_count_back_copier _{s, t};

if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT},
page_processing_stage::DECODE)) {
return;
}

// the level stream decoders
__shared__ rle_run<level_t> def_runs[rle_run_buffer_size];
rle_stream<level_t, decode_block_size, rolling_buf_size> def_decoder{def_runs};

// if we have no work to do (eg, in a skip_rows/num_rows case) in this page.
if (s->num_rows == 0) { return; }

bool const nullable = is_nullable(s);
bool const nullable_with_nulls = nullable && has_nulls(s);

// initialize the stream decoders (requires values computed in setupLocalPageInfo)
level_t* const def = reinterpret_cast<level_t*>(pp->lvl_decode_buf[level_type::DEFINITION]);
if (nullable_with_nulls) {
def_decoder.init(s->col.level_bits[level_type::DEFINITION],
s->abs_lvl_start[level_type::DEFINITION],
s->abs_lvl_end[level_type::DEFINITION],
def,
s->page.num_input_values);
}
__syncthreads();

// We use two counters in the loop below: processed_count and valid_count.
// - processed_count: number of rows out of num_input_values that we have decoded so far.
// the definition stream returns the number of total rows it has processed in each call
// to decode_next and we accumulate in process_count.
// - valid_count: number of non-null rows we have decoded so far. In each iteration of the
// loop below, we look at the number of valid items (which could be all for non-nullable),
// and valid_count is that running count.
int processed_count = 0;
int valid_count = 0;
// the core loop. decode batches of level stream data using rle_stream objects
// and pass the results to gpuDecodeValues
while (s->error == 0 && processed_count < s->page.num_input_values) {
int next_valid_count;

// only need to process definition levels if this is a nullable column
if (nullable) {
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();
} else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
}

next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(
processed_count, s, sb, def, t, nullable_with_nulls);
}
// if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
// this function call entirely since all it will ever generate is a mapping of (i -> i) for
// nz_idx. gpuDecodeValues would be the only work that happens.
else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
processed_count, s, sb, nullptr, t, false);
}
__syncthreads();

// decode the values themselves
gpuDecodeSplitValues(s, sb, valid_count, next_valid_count);
__syncthreads();

valid_count = next_valid_count;
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace

void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span<PageInfo> pages,
Expand Down Expand Up @@ -528,7 +737,7 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span<PageInfo> pa
// dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block
// 1 full warp, and 1 warp of 1 thread
dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block
dim3 dim_grid(pages.size(), 1); // 1 thread block per pags => # blocks
dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks

if (level_type_size == 1) {
gpuDecodePageDataFixedDict<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
Expand All @@ -539,4 +748,24 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span<PageInfo> pa
}
}

void __host__ DecodeSplitPageDataFlat(cudf::detail::hostdevice_span<PageInfo> pages,
cudf::detail::hostdevice_span<ColumnChunkDesc const> chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block
dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks

if (level_type_size == 1) {
gpuDecodeSplitPageDataFlat<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeSplitPageDataFlat<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

} // namespace cudf::io::parquet::detail
Loading
Loading