Skip to content

Commit

Permalink
[FEA] Support Link Prediction and Negative Sampling in cuGraph-DGL (#50)
Browse files Browse the repository at this point in the history
Supports the key functions needed for link prediction and negative sampling in cuGraph-DGL.

Depends on rapidsai/cugraph#4687
Closes rapidsai/cugraph#2753

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)

Approvers:
  - Tingyu Wang (https://github.com/tingyu66)

URL: #50
  • Loading branch information
alexbarghi-nv authored Feb 6, 2025
1 parent bc52d9d commit 8bf2012
Show file tree
Hide file tree
Showing 4 changed files with 618 additions and 49 deletions.
217 changes: 208 additions & 9 deletions python/cugraph-dgl/cugraph_dgl/graph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -106,6 +106,7 @@ def __init__(

self.__graph = None
self.__vertex_offsets = None
self.__edge_lookup_table = None
self.__handle = None
self.__is_multi_gpu = is_multi_gpu

Expand All @@ -127,6 +128,11 @@ def __init__(
def is_multi_gpu(self):
return self.__is_multi_gpu

def _clear_graph(self):
    """Invalidate all cached structures derived from the edge list.

    The CSR graph handle, the edge-id lookup table, and the per-type
    vertex offsets are each rebuilt lazily on next access, so resetting
    them to ``None`` is sufficient after any topology change.
    """
    self.__graph = self.__edge_lookup_table = self.__vertex_offsets = None

def to_canonical_etype(
self, etype: Union[str, Tuple[str, str, str]]
) -> Tuple[str, str, str]:
Expand All @@ -146,6 +152,20 @@ def to_canonical_etype(

raise ValueError("Unknown relation type " + etype)

def _to_numeric_etype(self, etype: Union[str, Tuple[str, str, str]]) -> int:
    """Return the integer id of an edge type.

    The id is the edge type's position in the sorted list of canonical
    edge-type keys. ``etype`` may be ``None`` only for graphs with a
    single edge type, in which case 0 is returned.
    """
    if etype is None:
        if len(self.canonical_etypes) > 1:
            raise ValueError("Edge type is required for heterogeneous graphs.")
        return 0

    canonical = self.to_canonical_etype(etype)
    sorted_keys = sorted(
        self.__edge_indices.keys(leaves_only=True, include_nested=True)
    )
    numeric_by_key = {key: idx for idx, key in enumerate(sorted_keys)}
    return numeric_by_key[canonical]

def add_nodes(
self,
global_num_nodes: int,
Expand Down Expand Up @@ -217,8 +237,7 @@ def add_nodes(
_cast_to_torch_tensor(feature_tensor), **self.__wg_kwargs
)

self.__graph = None
self.__vertex_offsets = None
self._clear_graph()

def __check_node_ids(self, ntype: str, ids: TensorType):
"""
Expand Down Expand Up @@ -309,8 +328,7 @@ def add_edges(

self.__num_edges_dict[dgl_can_edge_type] = int(num_edges)

self.__graph = None
self.__vertex_offsets = None
self._clear_graph()

def num_nodes(self, ntype: Optional[str] = None) -> int:
"""
Expand Down Expand Up @@ -537,7 +555,7 @@ def _graph(
self.__graph["direction"] != direction
or self.__graph["prob_attr"] != prob_attr
):
self.__graph = None
self._clear_graph()

if self.__graph is None:
src_col, dst_col = ("src", "dst") if direction == "out" else ("dst", "src")
Expand Down Expand Up @@ -620,9 +638,6 @@ def _get_n_emb(
)

try:
print(
u,
)
return self.__ndata_storage[ntype, emb_name].fetch(
_cast_to_torch_tensor(u), "cuda"
)
Expand Down Expand Up @@ -895,6 +910,190 @@ def all_edges(
else:
raise ValueError(f"Invalid form {form}")

@property
def _edge_lookup_table(self):
    """Lazily-built ``pylibcugraph.EdgeIdLookupTable`` for this graph.

    Reuses the cached graph handle when one exists; otherwise forces
    construction of an "out"-direction graph first.
    """
    if self.__edge_lookup_table is None:
        if self.__graph is None:
            graph_handle = self._graph("out")
        else:
            graph_handle = self.__graph["graph"]
        self.__edge_lookup_table = pylibcugraph.EdgeIdLookupTable(
            self._resource_handle,
            graph_handle,
        )

    return self.__edge_lookup_table

def find_edges(
    self, eid: "torch.Tensor", etype: Union[str, Tuple[str, str, str]] = None
) -> Tuple["torch.Tensor", "torch.Tensor"]:
    """
    Looks up and returns the appropriate src/dst pairs given a sequence of edge
    ids and an edge type.
    """

    can_etype = self.to_canonical_etype(etype)
    etype_id = self._to_numeric_etype(can_etype)
    result = self._edge_lookup_table.lookup_vertex_ids(
        cupy.asarray(eid), etype_id
    )

    # When the graph is stored in the "in" direction, edges are reversed
    # internally, so the source/destination columns are swapped back here.
    if self.__graph["direction"] == "out":
        src_name, dst_name = "sources", "destinations"
    else:
        src_name, dst_name = "destinations", "sources"

    # De-offset global vertex ids into per-type local id ranges.
    offsets = self._vertex_offsets
    src_ids = torch.as_tensor(result[src_name], device="cuda")
    dst_ids = torch.as_tensor(result[dst_name], device="cuda")
    return (
        src_ids - offsets[can_etype[0]],
        dst_ids - offsets[can_etype[2]],
    )

def global_uniform_negative_sampling(
    self,
    num_samples: int,
    exclude_self_loops: bool = True,
    replace: bool = False,
    etype: Optional[Union[str, Tuple[str, str, str]]] = None,
    redundancy: Optional[float] = None,
):
    """
    Performs negative sampling, which constructs a set of source and destination
    pairs that do not exist in this graph.

    Parameters
    ----------
    num_samples: int
        Target number of negative edges to generate. May generate less depending
        on whether the existing set of edges allows for it.
    exclude_self_loops: bool
        Whether to drop edges where the source and destination is the same.
        Defaults to True.
    replace: bool
        Whether to sample with replacement. Sampling with replacement is not
        supported by the cuGraph-DGL generator. Defaults to False.
    etype: str or tuple[str, str, str] (Optional)
        The edge type to generate negative edges for. Optional if there is
        only one edge type in the graph.
    redundancy: float (Optional)
        Not supported by the cuGraph-DGL generator.

    Returns
    -------
    tuple[torch.Tensor, torch.Tensor]
        The (src, dst) tensors of the sampled negative edges, with vertex
        ids local to the source/destination node types.
    """

    if redundancy:
        # Fixed typo in the warning message ("redudancy" -> "redundancy").
        warnings.warn("The 'redundancy' parameter is ignored by cuGraph-DGL.")
    if replace:
        raise NotImplementedError(
            "Negative sampling with replacement is not supported by cuGraph-DGL."
        )

    if len(self.ntypes) == 1:
        # Homogeneous graph: all vertices are candidates with uniform bias.
        # dtype/device match the heterogeneous branch so cupy.asarray()
        # below can consume the tensor via the CUDA array interface.
        vertices = torch.arange(self.num_nodes(), dtype=torch.int64, device="cuda")
        src_vertex_offset = 0
        dst_vertex_offset = 0
        src_bias = cupy.ones(len(vertices), dtype="float32")
        dst_bias = src_bias
        # Source and destination vertex sets coincide, so biases are shared.
        same_src_dst_type = True
    else:
        can_edge_type = self.to_canonical_etype(etype)
        same_src_dst_type = can_edge_type[0] == can_edge_type[2]
        src_vertex_offset = self._vertex_offsets[can_edge_type[0]]
        dst_vertex_offset = self._vertex_offsets[can_edge_type[2]]

        # Limit sampled vertices to those of the given edge type.
        if same_src_dst_type:
            vertices = torch.arange(
                src_vertex_offset,
                src_vertex_offset + self.num_nodes(can_edge_type[0]),
                dtype=torch.int64,
                device="cuda",
            )
            src_bias = cupy.ones(self.num_nodes(can_edge_type[0]), dtype="float32")
            dst_bias = src_bias

        else:
            vertices = torch.concat(
                [
                    torch.arange(
                        src_vertex_offset,
                        src_vertex_offset + self.num_nodes(can_edge_type[0]),
                        dtype=torch.int64,
                        device="cuda",
                    ),
                    torch.arange(
                        dst_vertex_offset,
                        dst_vertex_offset + self.num_nodes(can_edge_type[2]),
                        dtype=torch.int64,
                        device="cuda",
                    ),
                ]
            )

            # Zero bias masks out vertices of the wrong type on each side.
            src_bias = cupy.concatenate(
                [
                    cupy.ones(self.num_nodes(can_edge_type[0]), dtype="float32"),
                    cupy.zeros(self.num_nodes(can_edge_type[2]), dtype="float32"),
                ]
            )

            dst_bias = cupy.concatenate(
                [
                    cupy.zeros(self.num_nodes(can_edge_type[0]), dtype="float32"),
                    cupy.ones(self.num_nodes(can_edge_type[2]), dtype="float32"),
                ]
            )

    if self.is_multi_gpu:
        rank = torch.distributed.get_rank()
        world_size = torch.distributed.get_world_size()

        num_samples_global = torch.tensor([num_samples], device="cuda")
        torch.distributed.all_reduce(
            num_samples_global, op=torch.distributed.ReduceOp.SUM
        )
        num_samples_global = int(num_samples_global)

        vertices = torch.tensor_split(vertices, world_size)[rank]

        src_bias = cupy.array_split(src_bias, world_size)[rank]
        # BUGFIX: previously this tested `can_edge_type[0] == can_edge_type[2]`,
        # which raised NameError for homogeneous graphs (where `can_edge_type`
        # is never assigned). `same_src_dst_type` is set in both branches above.
        dst_bias = (
            src_bias
            if same_src_dst_type
            else cupy.array_split(dst_bias, world_size)[rank]
        )
    else:
        num_samples_global = num_samples

    # Reuse the cached graph only if it is stored in the "out" direction;
    # otherwise (re)build one, preserving any sampling-probability attribute.
    graph = (
        self.__graph["graph"]
        if self.__graph is not None and self.__graph["direction"] == "out"
        else self._graph(
            "out", None if self.__graph is None else self.__graph["prob_attr"]
        )
    )

    result_dict = pylibcugraph.negative_sampling(
        self._resource_handle,
        graph,
        num_samples_global,
        vertices=cupy.asarray(vertices),
        src_bias=src_bias,
        dst_bias=dst_bias,
        remove_duplicates=True,
        remove_false_negatives=True,
        exact_number_of_samples=True,
        do_expensive_check=False,
    )

    # TODO remove this workaround once the C API is updated to take a local number
    # of negatives (rapidsai/cugraph#4672)
    src_neg = (
        torch.as_tensor(result_dict["sources"], device="cuda")[:num_samples]
        - src_vertex_offset
    )
    dst_neg = (
        torch.as_tensor(result_dict["destinations"], device="cuda")[:num_samples]
        - dst_vertex_offset
    )

    if exclude_self_loops:
        f = src_neg != dst_neg
        return src_neg[f], dst_neg[f]
    else:
        return src_neg, dst_neg

@property
def ndata(self) -> HeteroNodeDataView:
"""
Expand Down
72 changes: 71 additions & 1 deletion python/cugraph-dgl/cugraph_dgl/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
# Copyright (c) 2021-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -15,12 +15,17 @@

import dgl
import torch
import numpy as np

import cugraph_dgl

from cugraph.testing.mg_utils import (
start_dask_client,
stop_dask_client,
)

from cugraph.datasets import karate


@pytest.fixture(scope="module")
def dask_client():
Expand Down Expand Up @@ -66,3 +71,68 @@ def dgl_graph_1():
src = torch.tensor([0, 1, 0, 2, 3, 0, 4, 0, 5, 0, 6, 7, 0, 8, 9])
dst = torch.tensor([1, 9, 2, 9, 9, 4, 9, 5, 9, 6, 9, 9, 8, 9, 0])
return dgl.graph((src, dst))


def create_karate_bipartite(multi_gpu: bool = False):
    """Build a small heterogeneous test graph from the karate dataset.

    Splits the karate-club vertices into two node types ("type1" holds
    the lower half of the id range, "type2" the upper half) and
    partitions the edges into the four resulting canonical edge types,
    re-offsetting vertex ids to be local to each node type.

    Parameters
    ----------
    multi_gpu: bool
        When True, node features and edges are split evenly across ranks
        (requires torch.distributed to be initialized).

    Returns
    -------
    tuple
        (graph, edges, (num_nodes_group_1, num_nodes_group_2)) where
        `edges` maps each canonical edge type to its *global* edge list.
    """
    df = karate.get_edgelist()
    df.src = df.src.astype("int64")
    df.dst = df.dst.astype("int64")

    graph = cugraph_dgl.Graph(is_multi_gpu=multi_gpu)
    total_num_nodes = max(df.src.max(), df.dst.max()) + 1

    num_nodes_group_1 = total_num_nodes // 2
    num_nodes_group_2 = total_num_nodes - num_nodes_group_1

    node_x_1 = np.random.random((num_nodes_group_1,))
    node_x_2 = np.random.random((num_nodes_group_2,))

    if multi_gpu:
        rank = torch.distributed.get_rank()
        world_size = torch.distributed.get_world_size()

        node_x_1 = np.array_split(node_x_1, world_size)[rank]
        node_x_2 = np.array_split(node_x_2, world_size)[rank]

    graph.add_nodes(num_nodes_group_1, {"x": node_x_1}, "type1")
    graph.add_nodes(num_nodes_group_2, {"x": node_x_2}, "type2")

    # BUGFIX: take explicit copies of the boolean-mask selections so the
    # in-place offset adjustments below mutate an owned frame instead of a
    # slice of `df` (avoids chained-assignment warnings / silent no-ops).
    edges = {}
    edges["type1", "e1", "type1"] = df[
        (df.src < num_nodes_group_1) & (df.dst < num_nodes_group_1)
    ].copy()
    edges["type1", "e2", "type2"] = df[
        (df.src < num_nodes_group_1) & (df.dst >= num_nodes_group_1)
    ].copy()
    edges["type2", "e3", "type1"] = df[
        (df.src >= num_nodes_group_1) & (df.dst < num_nodes_group_1)
    ].copy()
    edges["type2", "e4", "type2"] = df[
        (df.src >= num_nodes_group_1) & (df.dst >= num_nodes_group_1)
    ].copy()

    # Re-offset "type2" vertex ids so each type's ids start at 0.
    edges["type1", "e2", "type2"].dst -= num_nodes_group_1
    edges["type2", "e3", "type1"].src -= num_nodes_group_1
    edges["type2", "e4", "type2"].dst -= num_nodes_group_1
    edges["type2", "e4", "type2"].src -= num_nodes_group_1

    if multi_gpu:
        rank = torch.distributed.get_rank()
        world_size = torch.distributed.get_world_size()

        edges_local = {
            etype: edf.iloc[np.array_split(np.arange(len(edf)), world_size)[rank]]
            for etype, edf in edges.items()
        }
    else:
        edges_local = edges

    for etype, edf in edges_local.items():
        graph.add_edges(edf.src, edf.dst, etype=etype)

    return graph, edges, (num_nodes_group_1, num_nodes_group_2)


@pytest.fixture
def karate_bipartite():
    """Single-GPU bipartite karate graph: (graph, edges, group sizes)."""
    return create_karate_bipartite(multi_gpu=False)
Loading

0 comments on commit 8bf2012

Please sign in to comment.