diff --git a/cpp/src/community/k_truss_impl.cuh b/cpp/src/community/k_truss_impl.cuh index 84aebc00bb1..77c858a2020 100644 --- a/cpp/src/community/k_truss_impl.cuh +++ b/cpp/src/community/k_truss_impl.cuh @@ -17,9 +17,9 @@ #include "prims/edge_bucket.cuh" #include "prims/extract_transform_e.cuh" -#include "prims/per_v_pair_dst_nbr_intersection.cuh" #include "prims/extract_transform_v_frontier_outgoing_e.cuh" #include "prims/fill_edge_property.cuh" +#include "prims/per_v_pair_dst_nbr_intersection.cuh" #include "prims/transform_e.cuh" #include "prims/transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh" #include "prims/update_edge_src_dst_property.cuh" @@ -41,18 +41,17 @@ #include #include #include + #include using namespace std::chrono; namespace cugraph { template -void order_edge_based_on_dodg( - raft::handle_t const& handle, - graph_view_t & graph_view, - raft::device_span edgelist_srcs, - raft::device_span edgelist_dsts - ) +void order_edge_based_on_dodg(raft::handle_t const& handle, + graph_view_t& graph_view, + raft::device_span edgelist_srcs, + raft::device_span edgelist_dsts) { // FIXME: Use global comm for debugging purposes @@ -67,123 +66,105 @@ void order_edge_based_on_dodg( auto vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts(); rmm::device_uvector d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(), - handle.get_stream()); + handle.get_stream()); raft::update_device(d_vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.size(), - handle.get_stream()); - - auto func = cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ - raft::device_span(d_vertex_partition_range_lasts.data(), - d_vertex_partition_range_lasts.size()), - comm_size, - major_comm_size, - minor_comm_size}; + vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.size(), + handle.get_stream()); + auto func = cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ + raft::device_span(d_vertex_partition_range_lasts.data(), + d_vertex_partition_range_lasts.size()), + comm_size, + major_comm_size, + minor_comm_size}; rmm::device_uvector cp_edgelist_srcs(edgelist_srcs.size(), handle.get_stream()); rmm::device_uvector cp_edgelist_dsts(edgelist_srcs.size(), handle.get_stream()); - thrust::copy( - handle.get_thrust_policy(), - thrust::make_zip_iterator(edgelist_srcs.begin(), edgelist_srcs.begin()), - thrust::make_zip_iterator(edgelist_dsts.end(), edgelist_dsts.end()), - thrust::make_zip_iterator(cp_edgelist_srcs.begin(), cp_edgelist_dsts.begin())); + thrust::copy(handle.get_thrust_policy(), + thrust::make_zip_iterator(edgelist_srcs.begin(), edgelist_srcs.begin()), + thrust::make_zip_iterator(edgelist_dsts.end(), edgelist_dsts.end()), + thrust::make_zip_iterator(cp_edgelist_srcs.begin(), cp_edgelist_dsts.begin())); auto d_tx_counts = cugraph::groupby_and_count( thrust::make_zip_iterator(edgelist_srcs.begin(), edgelist_dsts.begin()), thrust::make_zip_iterator(edgelist_srcs.end(), edgelist_dsts.end()), - [func]__device__(auto val) { - return func(val); - }, - comm_size, - std::numeric_limits::max(), - handle.get_stream()); - + [func] __device__(auto val) { return func(val); }, + comm_size, + std::numeric_limits::max(), + handle.get_stream()); std::vector h_tx_counts(d_tx_counts.size()); - raft::update_host(h_tx_counts.data(), - d_tx_counts.data(), - d_tx_counts.size(), - handle.get_stream()); - + raft::update_host( + h_tx_counts.data(), d_tx_counts.data(), d_tx_counts.size(), handle.get_stream()); + 
rmm::device_uvector srcs(0, handle.get_stream()); rmm::device_uvector dsts(0, handle.get_stream()); std::vector rx_counts{}; std::tie(srcs, rx_counts) = shuffle_values(handle.get_comms(), edgelist_srcs.begin(), h_tx_counts, handle.get_stream()); - + std::tie(dsts, std::ignore) = shuffle_values(handle.get_comms(), edgelist_dsts.begin(), h_tx_counts, handle.get_stream()); - - //rmm::device_uvector edge_exists(0, handle.get_stream()); + // rmm::device_uvector edge_exists(0, handle.get_stream()); + + auto edge_exists = + graph_view.has_edge(handle, + raft::device_span(srcs.data(), srcs.size()), + raft::device_span(dsts.data(), dsts.size())); - auto edge_exists = graph_view.has_edge( - handle, - raft::device_span(srcs.data(), srcs.size()), - raft::device_span(dsts.data(), dsts.size()) - ); - // Send the result back std::tie(edge_exists, std::ignore) = - shuffle_values(handle.get_comms(), edge_exists.begin(), rx_counts, handle.get_stream()); - + shuffle_values(handle.get_comms(), edge_exists.begin(), rx_counts, handle.get_stream()); + // The 'edge_exists' array is ordered based on 'cp_edgelist_srcs' where the edges were grouped, // however it needs to match 'edgelist_srcs', hence re-order 'edge_exists' accordingly. - thrust::sort_by_key( - handle.get_thrust_policy(), - thrust::make_zip_iterator( - cp_edgelist_srcs.begin(), - cp_edgelist_dsts.begin()), - thrust::make_zip_iterator( - cp_edgelist_srcs.end(), - cp_edgelist_dsts.end()), - edge_exists.begin()); - - thrust::transform( - handle.get_thrust_policy(), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(edgelist_srcs.size()), - edge_exists.begin(), - [ - edge_exists = edge_exists.data(), - edgelist_first = thrust::make_zip_iterator(edgelist_srcs.begin(), edgelist_dsts.begin()), - cp_edgelist_first = thrust::make_zip_iterator(cp_edgelist_srcs.begin(), cp_edgelist_dsts.begin()), - cp_edgelist_last = thrust::make_zip_iterator(cp_edgelist_srcs.end(), cp_edgelist_dsts.end()) - ] __device__(auto idx) { - auto src = thrust::get<0>(edgelist_first[idx]); - auto dst = thrust::get<1>(edgelist_first[idx]); - - auto itr_pair = thrust::lower_bound( - thrust::seq, cp_edgelist_first, cp_edgelist_last, thrust::make_tuple(src, dst)); - - auto idx_pair = thrust::distance(cp_edgelist_first, itr_pair); - - return edge_exists[idx_pair]; - } - ); - - // Match DODG edges + thrust::sort_by_key(handle.get_thrust_policy(), + thrust::make_zip_iterator(cp_edgelist_srcs.begin(), cp_edgelist_dsts.begin()), + thrust::make_zip_iterator(cp_edgelist_srcs.end(), cp_edgelist_dsts.end()), + edge_exists.begin()); + thrust::transform( handle.get_thrust_policy(), thrust::make_counting_iterator(0), thrust::make_counting_iterator(edgelist_srcs.size()), - thrust::make_zip_iterator(edgelist_srcs.begin(), edgelist_dsts.begin()), - [ - edge_exists = edge_exists.data(), - edgelist_first = thrust::make_zip_iterator(edgelist_srcs.begin(), edgelist_dsts.begin()) - ] __device__(auto idx) { + edge_exists.begin(), + [edge_exists = edge_exists.data(), + edgelist_first = thrust::make_zip_iterator(edgelist_srcs.begin(), edgelist_dsts.begin()), + cp_edgelist_first = + thrust::make_zip_iterator(cp_edgelist_srcs.begin(), cp_edgelist_dsts.begin()), + cp_edgelist_last = thrust::make_zip_iterator(cp_edgelist_srcs.end(), + cp_edgelist_dsts.end())] __device__(auto idx) { auto src = thrust::get<0>(edgelist_first[idx]); auto dst = thrust::get<1>(edgelist_first[idx]); - return edge_exists[idx] ?
thrust::make_tuple(src, dst) : thrust::make_tuple(dst, src); - } - ); + auto itr_pair = thrust::lower_bound( + thrust::seq, cp_edgelist_first, cp_edgelist_last, thrust::make_tuple(src, dst)); + + auto idx_pair = thrust::distance(cp_edgelist_first, itr_pair); + return edge_exists[idx_pair]; + }); + + // Match DODG edges + thrust::transform(handle.get_thrust_policy(), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(edgelist_srcs.size()), + thrust::make_zip_iterator(edgelist_srcs.begin(), edgelist_dsts.begin()), + [edge_exists = edge_exists.data(), + edgelist_first = thrust::make_zip_iterator( + edgelist_srcs.begin(), edgelist_dsts.begin())] __device__(auto idx) { + auto src = thrust::get<0>(edgelist_first[idx]); + auto dst = thrust::get<1>(edgelist_first[idx]); + + return edge_exists[idx] ? thrust::make_tuple(src, dst) + : thrust::make_tuple(dst, src); + }); } template @@ -192,8 +173,7 @@ struct extract_weak_edges { __device__ thrust::optional> operator()( vertex_t src, vertex_t dst, thrust::nullopt_t, thrust::nullopt_t, edge_t count) const { - - return ((count < k - 2) && (count > 0)) + return ((count < k - 2) && (count > 0)) ? thrust::optional>{thrust::make_tuple(src, dst)} : thrust::nullopt; } @@ -207,10 +187,9 @@ struct extract_triangles_from_weak_edges { raft::device_span weak_srcs{}; raft::device_span weak_dsts{}; - __device__ thrust::tuple - operator()(edge_t i) const + __device__ thrust::tuple operator()( + edge_t i) const { - auto itr = thrust::upper_bound( thrust::seq, intersection_offsets.begin() + 1, intersection_offsets.end(), i); auto idx = thrust::distance(intersection_offsets.begin() + 1, itr); @@ -223,10 +202,12 @@ struct extract_triangles_from_weak_edges { // Extract (q, r) edges auto edge_q_r = thrust::make_tuple(weak_dsts[chunk_start + idx], intersection_indices[i]); - return thrust::make_tuple( - thrust::get<0>(edge_p_q), thrust::get<1>(edge_p_q), - thrust::get<0>(edge_p_r), thrust::get<1>(edge_p_r), - thrust::get<0>(edge_q_r), thrust::get<1>(edge_q_r)); + return thrust::make_tuple(thrust::get<0>(edge_p_q), + thrust::get<1>(edge_p_q), + thrust::get<0>(edge_p_r), + thrust::get<1>(edge_p_r), + thrust::get<0>(edge_q_r), + thrust::get<1>(edge_q_r)); } }; @@ -303,7 +284,8 @@ k_truss(raft::handle_t const& handle, std::optional> modified_graph{std::nullopt}; std::optional> modified_graph_view{std::nullopt}; - std::optional> undirected_graph_view{std::nullopt}; + std::optional> undirected_graph_view{ + std::nullopt}; std::optional> renumber_map{std::nullopt}; std::optional, weight_t>> edge_weight{std::nullopt}; @@ -311,7 +293,8 @@ k_truss(raft::handle_t const& handle, cugraph::edge_bucket_t edgelist_dodg(handle); - cugraph::edge_property_t, bool> dodg_mask(handle, graph_view); + cugraph::edge_property_t, bool> dodg_mask( + handle, graph_view); // Ideally, leverage the undirected graph derived from k-core undirected_graph_view = graph_view; @@ -410,7 +393,6 @@ k_truss(raft::handle_t const& handle, // 3. Keep only the edges from a low-degree vertex to a high-degree vertex. { - auto cur_graph_view = modified_graph_view ? 
*modified_graph_view : graph_view; auto vertex_partition_range_lasts = @@ -452,501 +434,466 @@ k_truss(raft::handle_t const& handle, extract_low_to_high_degree_edges_t{}); } - cugraph::fill_edge_property(handle, cur_graph_view, dodg_mask.mutable_view(), bool{false}); // Masking edges not part of the DODG - edgelist_dodg.insert(srcs.begin(), - srcs.end(), - dsts.begin()); - + edgelist_dodg.insert(srcs.begin(), srcs.end(), dsts.begin()); + cugraph::transform_e( - handle, - cur_graph_view, - edgelist_dodg, - cugraph::edge_src_dummy_property_t{}.view(), - cugraph::edge_dst_dummy_property_t{}.view(), - cugraph::edge_dummy_property_t{}.view(), - [] __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, thrust::nullopt_t) { - - return true; - }, - dodg_mask.mutable_view(), - false); - + handle, + cur_graph_view, + edgelist_dodg, + cugraph::edge_src_dummy_property_t{}.view(), + cugraph::edge_dst_dummy_property_t{}.view(), + cugraph::edge_dummy_property_t{}.view(), + [] __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, thrust::nullopt_t) { + return true; + }, + dodg_mask.mutable_view(), + false); + edgelist_dodg.clear(); } // 4. Compute triangle count using nbr_intersection and unroll weak edges { - auto cur_graph_view = modified_graph_view ? *modified_graph_view : graph_view; - cugraph::edge_property_t weak_edges_mask(handle, cur_graph_view); + cugraph::edge_property_t weak_edges_mask(handle, + cur_graph_view); cugraph::fill_edge_property(handle, cur_graph_view, weak_edges_mask.mutable_view(), bool{true}); - + // Attach mask cur_graph_view.attach_edge_mask(dodg_mask.view()); auto edge_triangle_counts = - edge_triangle_count(handle, cur_graph_view, false); + edge_triangle_count(handle, cur_graph_view, false); cugraph::edge_bucket_t edgelist_weak(handle); cugraph::edge_bucket_t edges_to_decrement_count(handle); - size_t prev_chunk_size = 0; // FIXME: Add support for chunking - + size_t prev_chunk_size = 0; // FIXME: Add support for chunking while (true) { - - cur_graph_view.clear_edge_mask(); - cur_graph_view.attach_edge_mask(dodg_mask.view()); - - // Extract weak edges - auto [weak_edgelist_srcs, weak_edgelist_dsts] = - extract_transform_e(handle, - cur_graph_view, - edge_src_dummy_property_t{}.view(), - edge_dst_dummy_property_t{}.view(), - edge_triangle_counts.view(), - extract_weak_edges{k}); - - auto weak_edgelist_first = thrust::make_zip_iterator( - weak_edgelist_srcs.begin(), weak_edgelist_dsts.begin()); - auto weak_edgelist_last = thrust::make_zip_iterator( - weak_edgelist_srcs.end(), weak_edgelist_dsts.end()); - - // Perform nbr_intersection of the weak edges from the undirected - // graph view - cur_graph_view.clear_edge_mask(); - - // Attach the weak edge mask - cur_graph_view.attach_edge_mask(weak_edges_mask.view()); - - auto [intersection_offsets, intersection_indices] = \ - per_v_pair_dst_nbr_intersection( - handle, - cur_graph_view, - weak_edgelist_first, - weak_edgelist_last, - false); - - // Identify (p, q) edges, and form edges (p, q), (p, r) and (q, r) - // To avoid overcompensation, redirect all edges in the triangle to follow this unique - // pattern: (p, q) then (q, r) then (p, r) - - auto triangles_from_weak_edges = - allocate_dataframe_buffer>( - intersection_indices.size(), - handle.get_stream()); - - // Form (p, q) edges - // Extract triangle from weak - thrust::tabulate( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(triangles_from_weak_edges), - get_dataframe_buffer_end(triangles_from_weak_edges), - 
extract_triangles_from_weak_edges{ - prev_chunk_size, - raft::device_span(intersection_offsets.data(), - intersection_offsets.size()), - raft::device_span(intersection_indices.data(), - intersection_indices.size()), - raft::device_span(weak_edgelist_srcs.data(), weak_edgelist_srcs.size()), - raft::device_span(weak_edgelist_dsts.data(), weak_edgelist_dsts.size()) + cur_graph_view.clear_edge_mask(); + cur_graph_view.attach_edge_mask(dodg_mask.view()); + + // Extract weak edges + auto [weak_edgelist_srcs, weak_edgelist_dsts] = + extract_transform_e(handle, + cur_graph_view, + edge_src_dummy_property_t{}.view(), + edge_dst_dummy_property_t{}.view(), + edge_triangle_counts.view(), + extract_weak_edges{k}); + + auto weak_edgelist_first = + thrust::make_zip_iterator(weak_edgelist_srcs.begin(), weak_edgelist_dsts.begin()); + auto weak_edgelist_last = + thrust::make_zip_iterator(weak_edgelist_srcs.end(), weak_edgelist_dsts.end()); + + // Perform nbr_intersection of the weak edges from the undirected + // graph view + cur_graph_view.clear_edge_mask(); + + // Attach the weak edge mask + cur_graph_view.attach_edge_mask(weak_edges_mask.view()); + + auto [intersection_offsets, intersection_indices] = per_v_pair_dst_nbr_intersection( + handle, cur_graph_view, weak_edgelist_first, weak_edgelist_last, false); + + // Identify (p, q) edges, and form edges (p, q), (p, r) and (q, r) + // To avoid overcompensation, redirect all edges in the triangle to follow this unique + // pattern: (p, q) then (q, r) then (p, r) + + auto triangles_from_weak_edges = allocate_dataframe_buffer< + thrust::tuple>( + intersection_indices.size(), handle.get_stream()); + + // Form (p, q) edges + // Extract triangle from weak + thrust::tabulate( + handle.get_thrust_policy(), + get_dataframe_buffer_begin(triangles_from_weak_edges), + get_dataframe_buffer_end(triangles_from_weak_edges), + extract_triangles_from_weak_edges{ + prev_chunk_size, + raft::device_span(intersection_offsets.data(), intersection_offsets.size()), + raft::device_span(intersection_indices.data(), + intersection_indices.size()), + raft::device_span(weak_edgelist_srcs.data(), weak_edgelist_srcs.size()), + raft::device_span(weak_edgelist_dsts.data(), weak_edgelist_dsts.size())}); + + cur_graph_view.clear_edge_mask(); + // Check for edge existance on the directed graph view + cur_graph_view.attach_edge_mask(dodg_mask.view()); + + rmm::device_uvector edge_exists(0, handle.get_stream()); + + // Handling (p, r) edges + if constexpr (multi_gpu) { + // (p, q) edges are owned by the current GPU while (p, r) and (q, r) + // can be owned by different GPUs + // Ordering (p, r) edges based on the DODG + order_edge_based_on_dodg( + handle, + cur_graph_view, + raft::device_span(std::get<2>(triangles_from_weak_edges).data(), + std::get<2>(triangles_from_weak_edges).size()), + raft::device_span(std::get<3>(triangles_from_weak_edges).data(), + std::get<3>(triangles_from_weak_edges).size())); + + } else { + edge_exists = cur_graph_view.has_edge( + handle, + raft::device_span(std::get<2>(triangles_from_weak_edges).data(), + intersection_indices.size()), + raft::device_span(std::get<3>(triangles_from_weak_edges).data(), + intersection_indices.size())); + } + + // Handling (q, r) edges + + if constexpr (multi_gpu) { + // (p, q) edges are owned by the current GPU while (p, r) and (q, r) + // can be owned by different GPUs + // Ordering (q, r) edges based on the DODG + order_edge_based_on_dodg( + handle, + cur_graph_view, + raft::device_span(std::get<4>(triangles_from_weak_edges).data(), + 
std::get<4>(triangles_from_weak_edges).size()), + raft::device_span(std::get<5>(triangles_from_weak_edges).data(), + std::get<5>(triangles_from_weak_edges).size())); + + } else { + edge_exists = cur_graph_view.has_edge( + handle, + raft::device_span(std::get<4>(triangles_from_weak_edges).data(), + intersection_indices.size()), + raft::device_span(std::get<5>(triangles_from_weak_edges).data(), + intersection_indices.size())); + } + + // re-order triangles + // To avoid overcompensation, redirect all edges in the triangle to follow this unique + // pattern: (p, q) then (q, r) then (p, r) + thrust::transform( + handle.get_thrust_policy(), + get_dataframe_buffer_begin(triangles_from_weak_edges), + get_dataframe_buffer_end(triangles_from_weak_edges), + get_dataframe_buffer_begin(triangles_from_weak_edges), + [] __device__(auto triangle) { + auto edge_p_q = thrust::make_tuple(thrust::get<0>(triangle), thrust::get<1>(triangle)); + auto edge_p_r = thrust::make_tuple(thrust::get<2>(triangle), thrust::get<3>(triangle)); + auto edge_q_r = thrust::make_tuple(thrust::get<4>(triangle), thrust::get<5>(triangle)); + + if (thrust::get<1>(edge_p_q) == thrust::get<1>(edge_q_r)) { + if (thrust::get<0>(edge_p_q) == thrust::get<0>(edge_p_r)) { + triangle = thrust::make_tuple(thrust::get<0>(edge_p_r), + thrust::get<1>(edge_p_r), + thrust::get<0>(edge_q_r), + thrust::get<1>(edge_q_r), + thrust::get<0>(edge_p_q), + thrust::get<1>(edge_p_q)); + + } else { + triangle = thrust::make_tuple(thrust::get<0>(edge_p_r), + thrust::get<1>(edge_p_r), + thrust::get<0>(edge_p_q), + thrust::get<1>(edge_p_q), + thrust::get<0>(edge_q_r), + thrust::get<1>(edge_q_r)); + } + } else if (thrust::get<1>(edge_p_q) == thrust::get<0>(edge_q_r)) { + triangle = thrust::make_tuple(thrust::get<0>(edge_p_q), + thrust::get<1>(edge_p_q), + thrust::get<0>(edge_q_r), + thrust::get<1>(edge_q_r), + thrust::get<0>(edge_p_r), + thrust::get<1>(edge_p_r)); } - ); - - cur_graph_view.clear_edge_mask(); - // Check for edge existance on the directed graph view - cur_graph_view.attach_edge_mask(dodg_mask.view()); - - rmm::device_uvector edge_exists(0, handle.get_stream()); - - // Handling (p, r) edges - if constexpr (multi_gpu) { - // (p, q) edges are owned by the current GPU while (p, r) and (q, r) - // can be owned by different GPUs - // Ordering (p, r) edges based on the DODG - order_edge_based_on_dodg( - handle, - cur_graph_view, - raft::device_span( - std::get<2>(triangles_from_weak_edges).data(), - std::get<2>(triangles_from_weak_edges).size()), - raft::device_span( - std::get<3>(triangles_from_weak_edges).data(), - std::get<3>(triangles_from_weak_edges).size()) - ); - - } else { - edge_exists = cur_graph_view.has_edge( - handle, - raft::device_span( - std::get<2>(triangles_from_weak_edges).data(), intersection_indices.size()), - raft::device_span( - std::get<3>(triangles_from_weak_edges).data(), intersection_indices.size()) - ); - } - - // Handling (q, r) edges - - if constexpr (multi_gpu) { - // (p, q) edges are owned by the current GPU while (p, r) and (q, r) - // can be owned by different GPUs - // Ordering (q, r) edges based on the DODG - order_edge_based_on_dodg( - handle, - cur_graph_view, - raft::device_span( - std::get<4>(triangles_from_weak_edges).data(), - std::get<4>(triangles_from_weak_edges).size()), - raft::device_span( - std::get<5>(triangles_from_weak_edges).data(), - std::get<5>(triangles_from_weak_edges).size()) - ); - - } else { - edge_exists = cur_graph_view.has_edge( - handle, - raft::device_span( - 
std::get<4>(triangles_from_weak_edges).data(), intersection_indices.size()), - raft::device_span( - std::get<5>(triangles_from_weak_edges).data(), intersection_indices.size()) - ); - } - - // re-order triangles - // To avoid overcompensation, redirect all edges in the triangle to follow this unique - // pattern: (p, q) then (q, r) then (p, r) - thrust::transform( - handle.get_thrust_policy(), + + return triangle; + }); + + thrust::sort(handle.get_thrust_policy(), + get_dataframe_buffer_begin(triangles_from_weak_edges), + get_dataframe_buffer_end(triangles_from_weak_edges)); + + auto unique_triangle_end = + thrust::unique(handle.get_thrust_policy(), + get_dataframe_buffer_begin(triangles_from_weak_edges), + get_dataframe_buffer_end(triangles_from_weak_edges)); + + auto num_unique_triangles = thrust::distance( + get_dataframe_buffer_begin(triangles_from_weak_edges), unique_triangle_end); + + resize_dataframe_buffer(triangles_from_weak_edges, num_unique_triangles, handle.get_stream()); + + if constexpr (multi_gpu) { + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + auto vertex_partition_range_lasts = cur_graph_view.vertex_partition_range_lasts(); + + rmm::device_uvector d_vertex_partition_range_lasts( + vertex_partition_range_lasts.size(), handle.get_stream()); + + raft::update_device(d_vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.size(), + handle.get_stream()); + + // FIXME: put the redundant code above in a function + std::tie(triangles_from_weak_edges, std::ignore) = groupby_gpu_id_and_shuffle_values( + handle.get_comms(), get_dataframe_buffer_begin(triangles_from_weak_edges), get_dataframe_buffer_end(triangles_from_weak_edges), - get_dataframe_buffer_begin(triangles_from_weak_edges), - [] __device__(auto triangle) { - auto edge_p_q = thrust::make_tuple(thrust::get<0>(triangle), thrust::get<1>(triangle)); - auto edge_p_r = thrust::make_tuple(thrust::get<2>(triangle), thrust::get<3>(triangle)); - auto edge_q_r = thrust::make_tuple(thrust::get<4>(triangle), thrust::get<5>(triangle)); - - if (thrust::get<1>(edge_p_q) == thrust::get<1>(edge_q_r)) { - if (thrust::get<0>(edge_p_q) == thrust::get<0>(edge_p_r)) { - triangle = thrust::make_tuple( - thrust::get<0>(edge_p_r), thrust::get<1>(edge_p_r), - thrust::get<0>(edge_q_r), thrust::get<1>(edge_q_r), - thrust::get<0>(edge_p_q), thrust::get<1>(edge_p_q) - ); - - } else { - triangle = thrust::make_tuple( - thrust::get<0>(edge_p_r), thrust::get<1>(edge_p_r), - thrust::get<0>(edge_p_q), thrust::get<1>(edge_p_q), - thrust::get<0>(edge_q_r), thrust::get<1>(edge_q_r) - ); - } - } else if (thrust::get<1>(edge_p_q) == thrust::get<0>(edge_q_r)) { - triangle = thrust::make_tuple( - thrust::get<0>(edge_p_q), thrust::get<1>(edge_p_q), - thrust::get<0>(edge_q_r), thrust::get<1>(edge_q_r), - thrust::get<0>(edge_p_r), thrust::get<1>(edge_p_r) - ); - - } - - return triangle; - } - ); - thrust::sort( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(triangles_from_weak_edges), - get_dataframe_buffer_end(triangles_from_weak_edges)); - - auto unique_triangle_end = thrust::unique( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(triangles_from_weak_edges), - 
get_dataframe_buffer_end(triangles_from_weak_edges)); - - auto num_unique_triangles = thrust::distance(get_dataframe_buffer_begin(triangles_from_weak_edges), unique_triangle_end); - - resize_dataframe_buffer(triangles_from_weak_edges, num_unique_triangles, handle.get_stream()); - - if constexpr (multi_gpu) { - - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); - auto const major_comm_size = major_comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - auto vertex_partition_range_lasts = cur_graph_view.vertex_partition_range_lasts(); - - rmm::device_uvector d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(), - handle.get_stream()); - - raft::update_device(d_vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.size(), - handle.get_stream()); - - // FIXME: put the redundant code above in a function - std::tie(triangles_from_weak_edges, std::ignore) = - groupby_gpu_id_and_shuffle_values( - handle.get_comms(), - get_dataframe_buffer_begin(triangles_from_weak_edges), - get_dataframe_buffer_end(triangles_from_weak_edges), - - [key_func = - cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ - raft::device_span(d_vertex_partition_range_lasts.data(), - d_vertex_partition_range_lasts.size()), - comm_size, - major_comm_size, - minor_comm_size}] __device__(auto val) {return key_func(thrust::get<0>(val), thrust::get<1>(val));}, - - handle.get_stream() - ); - - unique_triangle_end = thrust::unique( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(triangles_from_weak_edges), - get_dataframe_buffer_end(triangles_from_weak_edges)); - - num_unique_triangles = thrust::distance( - get_dataframe_buffer_begin(triangles_from_weak_edges), unique_triangle_end); - resize_dataframe_buffer(triangles_from_weak_edges, num_unique_triangles, handle.get_stream()); - - } - - auto edgelist_to_update_count = - allocate_dataframe_buffer>(3* num_unique_triangles, - handle.get_stream()); - - // Flatten the triangle to a list of egdes. 
- thrust::transform( - handle.get_thrust_policy(), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(size_dataframe_buffer(edgelist_to_update_count)), - get_dataframe_buffer_begin(edgelist_to_update_count), - [ - num_unique_triangles, - triangles_from_weak_edges = get_dataframe_buffer_begin(triangles_from_weak_edges) - ] __device__(auto idx) { - auto idx_triangle = idx % num_unique_triangles; - auto idx_vertex_in_triangle = idx / num_unique_triangles; - auto triangle = (triangles_from_weak_edges + idx_triangle).get_iterator_tuple(); - vertex_t src; - vertex_t dst; - - if (idx_vertex_in_triangle == 0) { - src = *(thrust::get<0>(triangle)); - dst = *(thrust::get<1>(triangle)); - } + [key_func = + cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ + raft::device_span(d_vertex_partition_range_lasts.data(), + d_vertex_partition_range_lasts.size()), + comm_size, + major_comm_size, + minor_comm_size}] __device__(auto val) { + return key_func(thrust::get<0>(val), thrust::get<1>(val)); + }, - if (idx_vertex_in_triangle == 1) { - src = *(thrust::get<2>(triangle)); - dst = *(thrust::get<3>(triangle)); - } + handle.get_stream()); + + unique_triangle_end = thrust::unique(handle.get_thrust_policy(), + get_dataframe_buffer_begin(triangles_from_weak_edges), + get_dataframe_buffer_end(triangles_from_weak_edges)); + + num_unique_triangles = thrust::distance( + get_dataframe_buffer_begin(triangles_from_weak_edges), unique_triangle_end); + resize_dataframe_buffer( + triangles_from_weak_edges, num_unique_triangles, handle.get_stream()); + } + + auto edgelist_to_update_count = allocate_dataframe_buffer>( + 3 * num_unique_triangles, handle.get_stream()); + + // Flatten the triangle to a list of egdes. + thrust::transform( + handle.get_thrust_policy(), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(size_dataframe_buffer(edgelist_to_update_count)), + get_dataframe_buffer_begin(edgelist_to_update_count), + [num_unique_triangles, + triangles_from_weak_edges = + get_dataframe_buffer_begin(triangles_from_weak_edges)] __device__(auto idx) { + auto idx_triangle = idx % num_unique_triangles; + auto idx_vertex_in_triangle = idx / num_unique_triangles; + auto triangle = (triangles_from_weak_edges + idx_triangle).get_iterator_tuple(); + vertex_t src; + vertex_t dst; + + if (idx_vertex_in_triangle == 0) { + src = *(thrust::get<0>(triangle)); + dst = *(thrust::get<1>(triangle)); + } - if (idx_vertex_in_triangle == 2) { - src = *(thrust::get<4>(triangle)); - dst = *(thrust::get<5>(triangle)); - } - - return thrust::make_tuple(src, dst); + if (idx_vertex_in_triangle == 1) { + src = *(thrust::get<2>(triangle)); + dst = *(thrust::get<3>(triangle)); } - ); - thrust::sort( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(edgelist_to_update_count), - get_dataframe_buffer_end(edgelist_to_update_count)); - - auto unique_pair_count = thrust::unique_count( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(edgelist_to_update_count), - get_dataframe_buffer_end(edgelist_to_update_count)); - - // If multi-GPU, shuffle and reduce - if constexpr (multi_gpu) { - - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); - auto const major_comm_size = major_comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - auto vertex_partition_range_lasts = 
cur_graph_view.vertex_partition_range_lasts(); - - rmm::device_uvector d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(), - handle.get_stream()); - raft::update_device(d_vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.size(), - handle.get_stream()); - - auto my_rank = handle.get_comms().get_rank(); - - std::tie(edgelist_to_update_count, std::ignore) = - groupby_gpu_id_and_shuffle_values( - handle.get_comms(), - get_dataframe_buffer_begin(edgelist_to_update_count), - get_dataframe_buffer_end(edgelist_to_update_count), - - [key_func = - cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ - raft::device_span(d_vertex_partition_range_lasts.data(), - d_vertex_partition_range_lasts.size()), - comm_size, - major_comm_size, - minor_comm_size}] __device__(auto val) {return key_func(thrust::get<0>(val), thrust::get<1>(val));}, - - handle.get_stream() - ); - } - - thrust::sort( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(edgelist_to_update_count), - get_dataframe_buffer_end(edgelist_to_update_count)); - - unique_pair_count = thrust::unique_count(handle.get_thrust_policy(), - get_dataframe_buffer_begin(edgelist_to_update_count), - get_dataframe_buffer_end(edgelist_to_update_count)); - - auto vertex_pair_buffer_unique = allocate_dataframe_buffer>( - unique_pair_count, handle.get_stream()); - - rmm::device_uvector decrease_count(unique_pair_count, handle.get_stream()); - - thrust::reduce_by_key(handle.get_thrust_policy(), - get_dataframe_buffer_begin(edgelist_to_update_count), - get_dataframe_buffer_end(edgelist_to_update_count), - thrust::make_constant_iterator(size_t{1}), - get_dataframe_buffer_begin(vertex_pair_buffer_unique), - decrease_count.begin(), - thrust::equal_to>{}); - - // Update count of weak edges - edges_to_decrement_count.clear(); - - edges_to_decrement_count.insert(std::get<0>(vertex_pair_buffer_unique).begin(), - std::get<0>(vertex_pair_buffer_unique).end(), - std::get<1>(vertex_pair_buffer_unique).begin()); - - cugraph::transform_e( - handle, - cur_graph_view, - edges_to_decrement_count, - cugraph::edge_src_dummy_property_t{}.view(), - cugraph::edge_dst_dummy_property_t{}.view(), - edge_triangle_counts.view(), - [ - edge_buffer_first = thrust::make_zip_iterator(std::get<0>(vertex_pair_buffer_unique).begin(), std::get<1>(vertex_pair_buffer_unique).begin()), - edge_buffer_last = thrust::make_zip_iterator(std::get<0>(vertex_pair_buffer_unique).end(), std::get<1>(vertex_pair_buffer_unique).end()), - decrease_count = raft::device_span(decrease_count.data(), decrease_count.size()) - ] - __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, edge_t count) { - - auto itr_pair = thrust::lower_bound( - thrust::seq, edge_buffer_first, edge_buffer_last, thrust::make_tuple(src, dst)); - - auto idx_pair = thrust::distance(edge_buffer_first, itr_pair); - - count -= decrease_count[idx_pair]; - - return count; + if (idx_vertex_in_triangle == 2) { + src = *(thrust::get<4>(triangle)); + dst = *(thrust::get<5>(triangle)); + } + + return thrust::make_tuple(src, dst); + }); + + thrust::sort(handle.get_thrust_policy(), + get_dataframe_buffer_begin(edgelist_to_update_count), + get_dataframe_buffer_end(edgelist_to_update_count)); + + auto unique_pair_count = + thrust::unique_count(handle.get_thrust_policy(), + get_dataframe_buffer_begin(edgelist_to_update_count), + get_dataframe_buffer_end(edgelist_to_update_count)); + + // If multi-GPU, shuffle and reduce + if constexpr (multi_gpu) { + auto& comm 
= handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + auto vertex_partition_range_lasts = cur_graph_view.vertex_partition_range_lasts(); + + rmm::device_uvector d_vertex_partition_range_lasts( + vertex_partition_range_lasts.size(), handle.get_stream()); + raft::update_device(d_vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.size(), + handle.get_stream()); + + auto my_rank = handle.get_comms().get_rank(); + + std::tie(edgelist_to_update_count, std::ignore) = groupby_gpu_id_and_shuffle_values( + handle.get_comms(), + get_dataframe_buffer_begin(edgelist_to_update_count), + get_dataframe_buffer_end(edgelist_to_update_count), + + [key_func = + cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ + raft::device_span(d_vertex_partition_range_lasts.data(), + d_vertex_partition_range_lasts.size()), + comm_size, + major_comm_size, + minor_comm_size}] __device__(auto val) { + return key_func(thrust::get<0>(val), thrust::get<1>(val)); }, - edge_triangle_counts.mutable_view(), - false); - - edgelist_weak.clear(); - - thrust::sort( - handle.get_thrust_policy(), - thrust::make_zip_iterator(weak_edgelist_srcs.begin(), weak_edgelist_dsts.begin()), - thrust::make_zip_iterator(weak_edgelist_srcs.end(), weak_edgelist_dsts.end()) - ); - - edgelist_weak.insert(weak_edgelist_srcs.begin(), - weak_edgelist_srcs.end(), - weak_edgelist_dsts.begin()); - - // Get undirected graph view - cur_graph_view.clear_edge_mask(); - cur_graph_view.attach_edge_mask(weak_edges_mask.view()); - - auto prev_number_of_edges = cur_graph_view.compute_number_of_edges(handle); - - cugraph::transform_e( - handle, - cur_graph_view, - edgelist_weak, - cugraph::edge_src_dummy_property_t{}.view(), - cugraph::edge_dst_dummy_property_t{}.view(), - cugraph::edge_dummy_property_t{}.view(), - [] __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, thrust::nullopt_t) { - - return false; - }, - weak_edges_mask.mutable_view(), - false); - - edgelist_weak.clear(); - thrust::sort( - handle.get_thrust_policy(), - thrust::make_zip_iterator(weak_edgelist_dsts.begin(), weak_edgelist_srcs.begin()), - thrust::make_zip_iterator(weak_edgelist_dsts.end(), weak_edgelist_srcs.end()) - ); - - edgelist_weak.insert(weak_edgelist_dsts.begin(), - weak_edgelist_dsts.end(), - weak_edgelist_srcs.begin()); - - cugraph::transform_e( - handle, - cur_graph_view, - edgelist_weak, - cugraph::edge_src_dummy_property_t{}.view(), - cugraph::edge_dst_dummy_property_t{}.view(), - cugraph::edge_dummy_property_t{}.view(), - [] __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, thrust::nullopt_t) { - - return false; - }, - weak_edges_mask.mutable_view(), - false); - - cur_graph_view.attach_edge_mask(weak_edges_mask.view()); - - if (prev_number_of_edges == cur_graph_view.compute_number_of_edges(handle)) { break; } - + + handle.get_stream()); + } + + thrust::sort(handle.get_thrust_policy(), + get_dataframe_buffer_begin(edgelist_to_update_count), + get_dataframe_buffer_end(edgelist_to_update_count)); + + unique_pair_count = thrust::unique_count(handle.get_thrust_policy(), + get_dataframe_buffer_begin(edgelist_to_update_count), + get_dataframe_buffer_end(edgelist_to_update_count)); + + auto 
vertex_pair_buffer_unique = allocate_dataframe_buffer>( + unique_pair_count, handle.get_stream()); + + rmm::device_uvector decrease_count(unique_pair_count, handle.get_stream()); + + thrust::reduce_by_key(handle.get_thrust_policy(), + get_dataframe_buffer_begin(edgelist_to_update_count), + get_dataframe_buffer_end(edgelist_to_update_count), + thrust::make_constant_iterator(size_t{1}), + get_dataframe_buffer_begin(vertex_pair_buffer_unique), + decrease_count.begin(), + thrust::equal_to>{}); + + // Update count of weak edges + edges_to_decrement_count.clear(); + + edges_to_decrement_count.insert(std::get<0>(vertex_pair_buffer_unique).begin(), + std::get<0>(vertex_pair_buffer_unique).end(), + std::get<1>(vertex_pair_buffer_unique).begin()); + + cugraph::transform_e( + handle, + cur_graph_view, + edges_to_decrement_count, + cugraph::edge_src_dummy_property_t{}.view(), + cugraph::edge_dst_dummy_property_t{}.view(), + edge_triangle_counts.view(), + [edge_buffer_first = + thrust::make_zip_iterator(std::get<0>(vertex_pair_buffer_unique).begin(), + std::get<1>(vertex_pair_buffer_unique).begin()), + edge_buffer_last = thrust::make_zip_iterator(std::get<0>(vertex_pair_buffer_unique).end(), + std::get<1>(vertex_pair_buffer_unique).end()), + decrease_count = raft::device_span( + decrease_count.data(), decrease_count.size())] __device__(auto src, + auto dst, + thrust::nullopt_t, + thrust::nullopt_t, + edge_t count) { + auto itr_pair = thrust::lower_bound( + thrust::seq, edge_buffer_first, edge_buffer_last, thrust::make_tuple(src, dst)); + + auto idx_pair = thrust::distance(edge_buffer_first, itr_pair); + + count -= decrease_count[idx_pair]; + + return count; + }, + edge_triangle_counts.mutable_view(), + false); + + edgelist_weak.clear(); + + thrust::sort( + handle.get_thrust_policy(), + thrust::make_zip_iterator(weak_edgelist_srcs.begin(), weak_edgelist_dsts.begin()), + thrust::make_zip_iterator(weak_edgelist_srcs.end(), weak_edgelist_dsts.end())); + + edgelist_weak.insert( + weak_edgelist_srcs.begin(), weak_edgelist_srcs.end(), weak_edgelist_dsts.begin()); + + // Get undirected graph view + cur_graph_view.clear_edge_mask(); + cur_graph_view.attach_edge_mask(weak_edges_mask.view()); + + auto prev_number_of_edges = cur_graph_view.compute_number_of_edges(handle); + + cugraph::transform_e( + handle, + cur_graph_view, + edgelist_weak, + cugraph::edge_src_dummy_property_t{}.view(), + cugraph::edge_dst_dummy_property_t{}.view(), + cugraph::edge_dummy_property_t{}.view(), + [] __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, thrust::nullopt_t) { + return false; + }, + weak_edges_mask.mutable_view(), + false); + + edgelist_weak.clear(); + thrust::sort( + handle.get_thrust_policy(), + thrust::make_zip_iterator(weak_edgelist_dsts.begin(), weak_edgelist_srcs.begin()), + thrust::make_zip_iterator(weak_edgelist_dsts.end(), weak_edgelist_srcs.end())); + + edgelist_weak.insert( + weak_edgelist_dsts.begin(), weak_edgelist_dsts.end(), weak_edgelist_srcs.begin()); + + cugraph::transform_e( + handle, + cur_graph_view, + edgelist_weak, + cugraph::edge_src_dummy_property_t{}.view(), + cugraph::edge_dst_dummy_property_t{}.view(), + cugraph::edge_dummy_property_t{}.view(), + [] __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, thrust::nullopt_t) { + return false; + }, + weak_edges_mask.mutable_view(), + false); + + cur_graph_view.attach_edge_mask(weak_edges_mask.view()); + + if (prev_number_of_edges == cur_graph_view.compute_number_of_edges(handle)) { break; } } - + 
cur_graph_view.clear_edge_mask(); cur_graph_view.attach_edge_mask(dodg_mask.view()); - + cugraph::transform_e( - handle, - cur_graph_view, - cugraph::edge_src_dummy_property_t{}.view(), - cugraph::edge_dst_dummy_property_t{}.view(), - edge_triangle_counts.view(), - [] __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, auto count) { - return count == 0 ? false : true; - }, - dodg_mask.mutable_view(), - true); - + handle, + cur_graph_view, + cugraph::edge_src_dummy_property_t{}.view(), + cugraph::edge_dst_dummy_property_t{}.view(), + edge_triangle_counts.view(), + [] __device__(auto src, auto dst, thrust::nullopt_t, thrust::nullopt_t, auto count) { + return count == 0 ? false : true; + }, + dodg_mask.mutable_view(), + true); + rmm::device_uvector edgelist_srcs(0, handle.get_stream()); rmm::device_uvector edgelist_dsts(0, handle.get_stream()); std::optional> edgelist_wgts{std::nullopt}; - std::tie(edgelist_srcs, edgelist_dsts, edgelist_wgts, std::ignore, std::ignore) = decompress_to_edgelist( handle, @@ -954,19 +901,17 @@ k_truss(raft::handle_t const& handle, edge_weight_view, std::optional>{std::nullopt}, std::optional>{std::nullopt}, - renumber_map - ? std::make_optional( - raft::device_span((*renumber_map).data(), (*renumber_map).size())): - std::nullopt - ); - + renumber_map ? std::make_optional(raft::device_span((*renumber_map).data(), + (*renumber_map).size())) + : std::nullopt); + std::tie(edgelist_srcs, edgelist_dsts, edgelist_wgts) = symmetrize_edgelist(handle, std::move(edgelist_srcs), std::move(edgelist_dsts), std::move(edgelist_wgts), false); - + return std::make_tuple( std::move(edgelist_srcs), std::move(edgelist_dsts), std::move(edgelist_wgts)); }
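
Illustrative sketch only, not part of the patch above: the unrolling loop in this diff rests on two small pieces of logic, (a) an edge is "weak" when its triangle count is positive but below k - 2, so it cannot survive in a k-truss, and (b) every triangle found from a weak edge is rewritten into the single (p, q), (q, r), (p, r) pattern before its edges' counts are decremented, so no triangle is compensated twice. The host-only C++ below restates those two pieces with std::tuple in place of the thrust/cugraph device code; the names is_weak_edge and canonicalize_triangle are invented for this sketch and do not exist in cugraph.

#include <cassert>
#include <cstdint>
#include <tuple>

using vertex_t = int32_t;
using edge_t   = int32_t;

// An edge is "weak" when it closes at least one triangle but fewer than k - 2 of them
// (mirrors the predicate in extract_weak_edges above).
bool is_weak_edge(edge_t triangle_count, edge_t k)
{
  return triangle_count > 0 && triangle_count < k - 2;
}

using triangle_t = std::tuple<vertex_t, vertex_t, vertex_t, vertex_t, vertex_t, vertex_t>;

// Re-arrange the three edges of a triangle into the unique (p, q), (q, r), (p, r) pattern
// (mirrors the re-ordering lambda applied to triangles_from_weak_edges in the diff).
triangle_t canonicalize_triangle(triangle_t t)
{
  auto e_pq = std::make_tuple(std::get<0>(t), std::get<1>(t));
  auto e_pr = std::make_tuple(std::get<2>(t), std::get<3>(t));
  auto e_qr = std::make_tuple(std::get<4>(t), std::get<5>(t));

  if (std::get<1>(e_pq) == std::get<1>(e_qr)) {
    if (std::get<0>(e_pq) == std::get<0>(e_pr)) { return std::tuple_cat(e_pr, e_qr, e_pq); }
    return std::tuple_cat(e_pr, e_pq, e_qr);
  }
  if (std::get<1>(e_pq) == std::get<0>(e_qr)) { return std::tuple_cat(e_pq, e_qr, e_pr); }
  return t;  // already follows the (p, q), (q, r), (p, r) pattern
}

int main()
{
  assert(is_weak_edge(2, 5));   // 2 triangles < k - 2 = 3, so the edge is weak
  assert(!is_weak_edge(3, 5));  // enough support to remain in a 5-truss so far

  // Triangle p=0, q=1, r=2 stored as (p,q), (p,r), (q,r) is rewritten to (p,q), (q,r), (p,r).
  assert(canonicalize_triangle(triangle_t{0, 1, 0, 2, 1, 2}) == triangle_t{0, 1, 1, 2, 0, 2});
  return 0;
}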