diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 5be75350951..0cb5c382631 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -77,20 +77,6 @@ void rowgroup_char_counts(device_2dspan counts, counts, orc_columns, rowgroup_bounds, str_col_indexes); } -template -CUDF_KERNEL void __launch_bounds__(block_size) - initialize_dictionary_hash_maps_kernel(device_span dictionaries) -{ - auto const dict_map = dictionaries[blockIdx.x].map_slots; - auto const t = threadIdx.x; - for (size_type i = 0; i < dict_map.size(); i += block_size) { - if (t + i < dict_map.size()) { - new (&dict_map[t + i].first) map_type::atomic_key_type{KEY_SENTINEL}; - new (&dict_map[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL}; - } - } -} - struct equality_functor { column_device_view const& col; __device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const @@ -109,6 +95,9 @@ struct hash_functor { } }; +// Probing scheme to use for the hash map +using probing_scheme_type = cuco::linear_probing; + template CUDF_KERNEL void __launch_bounds__(block_size) populate_dictionary_hash_maps_kernel(device_2dspan dictionaries, @@ -121,26 +110,34 @@ CUDF_KERNEL void __launch_bounds__(block_size) auto const& col = columns[dict.column_idx]; // Make a view of the hash map - auto hash_map_mutable = map_type::device_mutable_view(dict.map_slots.data(), - dict.map_slots.size(), - cuco::empty_key{KEY_SENTINEL}, - cuco::empty_value{VALUE_SENTINEL}); auto const hash_fn = hash_functor{col}; auto const equality_fn = equality_functor{col}; + storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()}; + // Make a view of the hash map. + auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL}, + cuco::empty_value{VALUE_SENTINEL}, + equality_fn, + probing_scheme_type{hash_fn}, + cuco::thread_scope_block, + storage_ref}; + + // Create a map ref with `cuco::insert` operator + auto has_map_insert_ref = hash_map_ref.rebind_operators(cuco::insert); + auto const start_row = dict.start_row; auto const end_row = dict.start_row + dict.num_rows; size_type entry_count{0}; size_type char_count{0}; + // all threads should loop the same number of times for (thread_index_type cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) { auto const is_valid = cur_row < end_row and col.is_valid(cur_row); if (is_valid) { // insert element at cur_row to hash map and count successful insertions - auto const is_unique = - hash_map_mutable.insert(std::pair(cur_row, cur_row), hash_fn, equality_fn); + auto const is_unique = has_map_insert_ref.insert(cuco::pair{cur_row, cur_row}); if (is_unique) { ++entry_count; @@ -175,24 +172,23 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (not dict.is_enabled) { return; } auto const t = threadIdx.x; - auto map = map_type::device_view(dict.map_slots.data(), - dict.map_slots.size(), - cuco::empty_key{KEY_SENTINEL}, - cuco::empty_value{VALUE_SENTINEL}); - __shared__ cuda::atomic counter; using cuda::std::memory_order_relaxed; if (t == 0) { new (&counter) cuda::atomic{0}; } __syncthreads(); + for (size_type i = 0; i < dict.map_slots.size(); i += block_size) { if (t + i < dict.map_slots.size()) { - auto* slot = reinterpret_cast(map.begin_slot() + t + i); - auto key = slot->first; - if (key != KEY_SENTINEL) { - auto loc = counter.fetch_add(1, memory_order_relaxed); - dict.data[loc] = key; - slot->second = loc; + auto window = dict.map_slots.begin() + t + i; + // Collect all slots from each window. + for (auto& slot : *window) { + auto const key = slot.first; + if (key != KEY_SENTINEL) { + auto loc = counter.fetch_add(1, memory_order_relaxed); + dict.data[loc] = key; + slot.second = loc; + } } } } @@ -205,47 +201,42 @@ CUDF_KERNEL void __launch_bounds__(block_size) { auto const col_idx = blockIdx.x; auto const stripe_idx = blockIdx.y; + auto const t = threadIdx.x; auto const& dict = dictionaries[col_idx][stripe_idx]; auto const& col = columns[dict.column_idx]; if (not dict.is_enabled) { return; } - auto const t = threadIdx.x; + // Make a view of the hash map + auto const hash_fn = hash_functor{col}; + auto const equality_fn = equality_functor{col}; + + storage_ref_type const storage_ref{dict.map_slots.size(), dict.map_slots.data()}; + // Make a view of the hash map. + auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL}, + cuco::empty_value{VALUE_SENTINEL}, + equality_fn, + probing_scheme_type{hash_fn}, + cuco::thread_scope_block, + storage_ref}; + + // Create a map ref with `cuco::insert` operator + auto has_map_find_ref = hash_map_ref.rebind_operators(cuco::find); + auto const start_row = dict.start_row; auto const end_row = dict.start_row + dict.num_rows; - auto const map = map_type::device_view(dict.map_slots.data(), - dict.map_slots.size(), - cuco::empty_key{KEY_SENTINEL}, - cuco::empty_value{VALUE_SENTINEL}); - - thread_index_type cur_row = start_row + t; - while (cur_row < end_row) { + for (thread_index_type cur_row = start_row + t; cur_row < end_row; cur_row += block_size) { if (col.is_valid(cur_row)) { - auto const hash_fn = hash_functor{col}; - auto const equality_fn = equality_functor{col}; - auto const found_slot = map.find(cur_row, hash_fn, equality_fn); - cudf_assert(found_slot != map.end() && + auto const found_slot = has_map_find_ref.find(cur_row); + // Fail if we didn't find the previously inserted key. + cudf_assert(found_slot != has_map_find_ref.end() && "Unable to find value in map in dictionary index construction"); - if (found_slot != map.end()) { - // No need for atomic as this is not going to be modified by any other thread - auto const val_ptr = reinterpret_cast(&found_slot->second); - dict.index[cur_row] = *val_ptr; - } + dict.index[cur_row] = found_slot->second; } - cur_row += block_size; } } -void initialize_dictionary_hash_maps(device_2dspan dictionaries, - rmm::cuda_stream_view stream) -{ - if (dictionaries.count() == 0) { return; } - constexpr int block_size = 1024; - initialize_dictionary_hash_maps_kernel - <<>>(dictionaries.flat_view()); -} - void populate_dictionary_hash_maps(device_2dspan dictionaries, device_span columns, rmm::cuda_stream_view stream) diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 8c7ccf0527f..0949fafe9a4 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -21,6 +21,7 @@ #include "io/utilities/column_buffer.hpp" #include "orc.hpp" +#include #include #include #include @@ -40,19 +41,27 @@ namespace gpu { using cudf::detail::device_2dspan; using cudf::detail::host_2dspan; +using key_type = size_type; +using mapped_type = size_type; +using slot_type = cuco::pair; +auto constexpr map_cg_size = + 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset. + ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this. +auto constexpr window_size = + 1; ///< Number of concurrent slots (set for best performance) handled by each thread. +auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size + ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor. +using storage_type = cuco::aow_storage, + cudf::detail::cuco_allocator>; +using storage_ref_type = typename storage_type::ref_type; +using window_type = typename storage_type::window_type; +using slot_type = cuco::pair; + auto constexpr KEY_SENTINEL = size_type{-1}; auto constexpr VALUE_SENTINEL = size_type{-1}; -using map_type = cuco::legacy::static_map; - -/** - * @brief The alias of `map_type::pair_atomic_type` class. - * - * Declare this struct by trivial subclassing instead of type aliasing so we can have forward - * declaration of this struct somewhere else. - */ -struct slot_type : public map_type::slot_type {}; - struct CompressedStreamInfo { CompressedStreamInfo() = default; explicit constexpr CompressedStreamInfo(uint8_t const* compressed_data_, size_t compressed_size_) @@ -184,11 +193,11 @@ struct StripeStream { */ struct stripe_dictionary { // input - device_span map_slots; // hash map storage - uint32_t column_idx = 0; // column index - size_type start_row = 0; // first row in the stripe - size_type start_rowgroup = 0; // first rowgroup in the stripe - size_type num_rows = 0; // number of rows in the stripe + device_span map_slots; // hash map (windows) storage + uint32_t column_idx = 0; // column index + size_type start_row = 0; // first row in the stripe + size_type start_rowgroup = 0; // first rowgroup in the stripe + size_type num_rows = 0; // number of rows in the stripe // output device_span data; // index of elements in the column to include in the dictionary diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 60a64fb0ee6..b09062f700e 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -20,6 +20,7 @@ */ #include "io/comp/nvcomp_adapter.hpp" +#include "io/orc/orc_gpu.hpp" #include "io/statistics/column_statistics.cuh" #include "io/utilities/column_utils.cuh" #include "writer_impl.hpp" @@ -2110,7 +2111,9 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, bool sort_dictionaries, rmm::cuda_stream_view stream) { - std::vector>> hash_maps_storage( + // Variable to keep track of the current total map storage size + size_t total_map_storage_size = 0; + std::vector> hash_maps_storage_offsets( orc_table.string_column_indices.size()); for (auto col_idx : orc_table.string_column_indices) { auto& str_column = orc_table.column(col_idx); @@ -2119,14 +2122,21 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, stripe.size == 0 ? 0 : segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - segmentation.rowgroups[stripe.first][col_idx].begin; - hash_maps_storage[str_column.str_index()].emplace_back(stripe_num_rows * 1.43, stream); + hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size); + total_map_storage_size += stripe_num_rows * gpu::occupancy_factor; } + hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size); } hostdevice_2dvector stripe_dicts( orc_table.num_string_columns(), segmentation.num_stripes(), stream); if (stripe_dicts.count() == 0) return {std::move(stripe_dicts), {}, {}}; + // Create a single bulk storage to use for all sub-dictionaries + auto map_storage = std::make_unique( + total_map_storage_size, + cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}); + // Initialize stripe dictionaries for (auto col_idx : orc_table.string_column_indices) { auto& str_column = orc_table.column(col_idx); @@ -2137,7 +2147,9 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, auto const stripe_idx = stripe.id; auto& sd = stripe_dicts[str_col_idx][stripe_idx]; - sd.map_slots = hash_maps_storage[str_col_idx][stripe_idx]; + sd.map_slots = {map_storage->data() + hash_maps_storage_offsets[str_col_idx][stripe_idx], + hash_maps_storage_offsets[str_col_idx][stripe_idx + 1] - + hash_maps_storage_offsets[str_col_idx][stripe_idx]}; sd.column_idx = col_idx; sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin; sd.start_rowgroup = stripe.first; @@ -2150,7 +2162,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, } stripe_dicts.host_to_device_async(stream); - gpu::initialize_dictionary_hash_maps(stripe_dicts, stream); + map_storage->initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()}); gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); // Copy the entry counts and char counts from the device to the host stripe_dicts.device_to_host_sync(stream); @@ -2184,8 +2196,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, col_use_dictionary = true; } else { // Clear hash map storage as dictionary encoding is not used for this stripe - hash_maps_storage[str_col_idx][stripe_idx] = rmm::device_uvector(0, stream); - sd.map_slots = {}; + sd.map_slots = {}; } } // If any stripe uses dictionary encoding, allocate index storage for the whole column @@ -2203,7 +2214,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); // deallocate hash map storage, unused after this point - hash_maps_storage.clear(); + map_storage.reset(); // Clear map slots and attach order buffers auto dictionaries_flat = stripe_dicts.host_view().flat_view(); diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 1a2a9eac17d..b85ebf2fa1a 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -194,17 +194,12 @@ struct map_find_fn { val_idx += block_size) { // Find the key using a single thread for best performance for now. if (data_col.is_valid(val_idx)) { + auto const found_slot = map_find_ref.find(val_idx); + // Fail if we didn't find the previously inserted key. + cudf_assert(found_slot != map_find_ref.end() && + "Unable to find value in map in dictionary index construction"); // No need for atomic as this is not going to be modified by any other thread. - chunk->dict_index[val_idx - s_ck_start_val_idx] = [&]() { - auto const found_slot = map_find_ref.find(val_idx); - - // Fail if we didn't find the previously inserted key. - cudf_assert(found_slot != map_find_ref.end() && - "Unable to find value in map in dictionary index construction"); - - // Return the found value. - return found_slot->second; - }(); + chunk->dict_index[val_idx - s_ck_start_val_idx] = found_slot->second; } } } else {