diff --git a/.gitignore b/.gitignore index 927fb084..4b747366 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ **/data_dir/ run/datasets/data/ +run/results/ +run/runs_*/ **/__pycache__/ **/.ipynb_checkpoints -.idea/ \ No newline at end of file +.idea/ +.vscode/settings.json diff --git a/ROLAND_README.md b/ROLAND_README.md new file mode 100644 index 00000000..933d02d2 --- /dev/null +++ b/ROLAND_README.md @@ -0,0 +1,106 @@ +# ROLAND: Graph Neural Networks for Dynamic Graphs +This repository contains code associated with the ROLAND project and more. +You can firstly walk through the *how-to* sections to run experiments on existing +public datasets. +After understanding how to run and analyze experiments, you can read through the *development topics* to run our + + + + +## How to Download Datasets +Most of datasets are used in our paper can be found at `https://snap.stanford.edu/data/index.html`. + +```bash +# Or Use your own dataset directory. +mkdir ./all_datasets/ +cd ./all_datasets +wget 'https://snap.stanford.edu/data/soc-sign-bitcoinotc.csv.gz' +wget 'https://snap.stanford.edu/data/soc-sign-bitcoinalpha.csv.gz' +wget 'https://snap.stanford.edu/data/as-733.tar.gz' +wget 'https://snap.stanford.edu/data/CollegeMsg.txt.gz' +wget 'https://snap.stanford.edu/data/soc-redditHyperlinks-body.tsv' +wget 'https://snap.stanford.edu/data/soc-redditHyperlinks-title.tsv' +wget 'http://snap.stanford.edu/data/web-redditEmbeddings-subreddits.csv' + +# Unzip files +gunzip CollegeMsg.txt.gz +gunzip soc-sign-bitcoinalpha.csv.gz +gunzip soc-sign-bitcoinotc.csv.gz +tar xf ./as-733.tar.gz + +# Rename files, this step is required by our loader. +# You can leave the web-redditEmbeddings-subreddits.csv file unchanged. +mv ./soc-sign-bitcoinotc.csv ./bitcoinotc.csv +mv ./soc-sign-bitcoinalpha.csv ./bitcoinalpha.csv + +mv ./soc-redditHyperlinks-body.tsv ./reddit-body.tsv +mv ./soc-redditHyperlinks-title.tsv ./reddit-title.tsv +``` +You should expect 740 files, including the zipped `as-733.tar.gz`, by checking `ls | wc -l`. +The total disk space required is approximately 950MiB. +## How to Run Single Experiments from Our Paper +**WARNING**: for each `yaml` file in `./run/configs/ROLAND`, you need to update the `dataset.dir` field to the correct path of datasets downloaded above. + +The ROLAND project focuses on link-predictions for homogenous dynamic graphs. +Here we demonstrate example runs using + +To run link-prediction task on `CollegeMsg.txt` dataset with default settings: +```bash +cd ./run +python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_ucimsg.yaml --repeat 1 +``` +For other datasets: +```bash +python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_btcalpha.yaml --repeat 1 + +python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_btcotc.yaml --repeat 1 + +python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_ucimsg.yaml --repeat 1 + +python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_reddittitle.yaml --repeat 1 + +python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_redditbody.yaml --repeat 1 +``` +The `--repeat` argument controls for number of random seeds used for each experiment. For example, setting `--repeat 3` runs each single experiments for three times with three different random seeds. + +To explore training result: +```bash +cd ./run +tensorboard --logdir=./runs_live_update --port=6006 +``` +**WARNING** The x-axis of plots in tensorboard is **not** epochs, they are snapshot IDs (e.g., the $i^{th}$ day or the $i^{th}$ week) instead. + + + +## How to Run Grid Search / Batch Experiments +To run grid search / batch experiments, one needs a `main.py` file, a `base_config.yaml`, and a `grid.txt` file. The main and config files are the same as in the single experiment setup above. +If one wants to do link-prediction on `CollegeMsg.txt` dataset with configurations from `configs/ROLAND/roland_gru_ucimsg.yaml`, in addition, she wants to try out (1) *different numbers of GNN message passing layers* and (2) *different learning rates*. +In this case, one can use the following grid file: +```text +# grid.txt, lines starting with # are comments. +gnn.layers_mp mp [2,3,4,5] +optim.base_lr lr [0.003,0.01,0.03] +``` +**WARNING**: the format of each line is crucial: `NAME_IN_YAMLSHORT_ALIASLIST_OF_VALUES`, and there should **not** be any space in the list of values. + +The `grid.txt` above will generate $4\times 3=12$ different configurations by modifying `gnn.layers_mp` and `gnn.layers_mp` to the respective levels in base config file `roland_gru_ucimsg.yaml`. + +Please see `./run/grids/ROLAND/example_grid.txt` for a complete example of grid search text file. + +To run the experiment using `example_grid.txt`: +```bash +bash ./run_roland_batch.sh +``` +## How to Export Tensorboard Results to CSV +We provide a simple script to aggregate results from a batch of tensorboard files, please feel free to look into `tabulate_events.py` and modify it. +```bash +# Usage: python3 ./tabulate_events.py +python3 ./tabulate_events.py ./live_update ./out.csv +``` + +## Development Topic: Use Your Own Dataset +We provided two examples of constructing your own datasets, please refer to +(1) `./graphgym/contrib/loader/roland_template.py` and (2) `./graphgym/contrib/loader/roland_template_hetero.py` for examples of building loaders. diff --git a/graphgym/contrib/config/roland.py b/graphgym/contrib/config/roland.py new file mode 100644 index 00000000..af6da3fc --- /dev/null +++ b/graphgym/contrib/config/roland.py @@ -0,0 +1,196 @@ +from yacs.config import CfgNode as CN + +from graphgym.register import register_config + + +def set_cfg_roland(cfg): + """ + This function sets the default config value for customized options + :return: customized configuration use by the experiment. + """ + + # ----------------------------------------------------------------------- # + # Customized options + # ----------------------------------------------------------------------- # + + # Use to identify experiments, tensorboard will be saved to this path. + # Options: any string. + cfg.remark = '' + + # ----------------------------------------------------------------------- # + # Additional GNN options. + # ----------------------------------------------------------------------- # + # Method to update node embedding from old node embedding and new node features. + # Options: {'moving_average', 'mlp', 'gru'} + cfg.gnn.embed_update_method = 'moving_average' + + # How many layers to use in the MLP updater. + # Options: integers >= 1. + # NOTE: there is a known issue when set to 1, use >= 2 for now. + # Only effective when cfg.gnn.embed_update_method == 'mlp'. + cfg.gnn.mlp_update_layers = 2 + + # What kind of skip-connection to use. + # Options: {'none', 'identity', 'affine'}. + cfg.gnn.skip_connection = 'none' + + # The bath size while making link prediction, useful when number of negative + # edges is huge, use a smaller number depends on GPU memroy size.. + cfg.gnn.link_pred_batch_size = 500000 + # ----------------------------------------------------------------------- # + # Meta-Learning options. + # ----------------------------------------------------------------------- # + # For meta-learning. + cfg.meta = CN() + # Whether to do meta-learning via initialization moving average. + # Options: {True, False} + cfg.meta.is_meta = False + + # Weight used in moving average for model parameters. + # After fine-tuning the model in period t and get model M[t], + # Set W_init = (1-alpha) * W_init + alpha * M[t]. + # For the next period, use W_init as the initialization for fine-tune + # Set cfg.meta.alpha = 1.0 to recover the original algorithm. + # Options: float between 0.0 and 1.0. + cfg.meta.alpha = 0.9 + + # ----------------------------------------------------------------------- # + # Additional GNN options. + # ----------------------------------------------------------------------- # + # How many snapshots for the truncated back-propagation. + # Set to a very large integer to use full-back-prop-through-time + # Options: integers >= 1. + cfg.train.tbptt_freq = 10 + + # Early stopping tolerance in live-update. + # Options: integers >= 1. + cfg.train.internal_validation_tolerance = 5 + + # Computing MRR is slow in the baseline setting. + # Only start to compute MRR in the test set range after certain time. + # Options: integers >= 0. + cfg.train.start_compute_mrr = 0 + + # ----------------------------------------------------------------------- # + # Additional dataset options. + # ----------------------------------------------------------------------- # + + # How to handle node features in AS-733 dataset. + # Options: ['one', 'one_hot_id', 'one_hot_degree_global'] + cfg.dataset.AS_node_feature = 'one' + + # Method used to sample negative edges for edge_label_index. + # Options: + # 'uniform': all non-existing edges have same probability of being sampled + # as negative edges. + # 'src': non-existing edges from high-degree nodes are more likely to be + # sampled as negative edges. + # 'dest': non-existing edges pointed to high-degree nodes are more likely + # to be sampled as negative edges. + cfg.dataset.negative_sample_weight = 'uniform' + + # Whether to load dataset as heterogeneous graphs. + # Options: {True, False}. + cfg.dataset.is_hetero = False + + # whether to look for and load cached graph. By default (load_cache=False) + # the loader loads the raw tsv file from disk and + cfg.dataset.load_cache = False + + cfg.dataset.premade_datasets = 'fresh' + + cfg.dataset.include_node_features = False + + # 'chronological_temporal' or 'default'. + # 'chronological_temporal': only for temporal graphs, for example, + # the first 80% snapshots are for training, then subsequent 10% snapshots + # are for validation and the last 10% snapshots are for testing. + cfg.dataset.split_method = 'default' + + # In the case of live-update, whether to predict all edges at time t+1. + cfg.dataset.link_pred_all_edges = False + # ----------------------------------------------------------------------- # + # Customized options: `transaction` for ROLAND dynamic graphs. + # ----------------------------------------------------------------------- # + + # example argument group + cfg.transaction = CN() + + # whether use snapshot + cfg.transaction.snapshot = False + + # snapshot split method 1: number of snapshots + # split dataset into fixed number of snapshots. + cfg.transaction.snapshot_num = 100 + + # snapshot split method 2: snapshot frequency + # e.g., one snapshot contains transactions within 1 day. + cfg.transaction.snapshot_freq = 'D' + + cfg.transaction.check_snapshot = False + + # how to use transaction history + # full or rolling + cfg.transaction.history = 'full' + + # type of loss: supervised / meta + cfg.transaction.loss = 'meta' + + # feature dim for int edge features + cfg.transaction.feature_int_dim = 32 + cfg.transaction.feature_edge_int_num = [50, 8, 252, 252, 3, 3] + cfg.transaction.feature_node_int_num = [0] + + # feature dim for amount (float) edge feature + cfg.transaction.feature_amount_dim = 64 + + # feature dim for time (float) edge feature + cfg.transaction.feature_time_dim = 64 + + # + cfg.transaction.node_feature = 'raw' + + # how many days look into the future + cfg.transaction.horizon = 1 + + # prediction mode for the task; 'before' or 'after' + cfg.transaction.pred_mode = 'before' + + # number of periods to be captured. + # set to a list of integers if wish to use pre-defined periodicity. + # e.g., [1,7,28,31,...] etc. + cfg.transaction.time_enc_periods = [1] + + # if 'enc_before_diff': attention weight = diff(enc(t1), enc(t2)) + # if 'diff_before_enc': attention weight = enc(t1 - t2) + cfg.transaction.time_enc_mode = 'enc_before_diff' + + # how to compute the keep ratio while updating the recurrent GNN. + # the update ratio (for each node) is a function of its degree in [0, t) + # and its degree in snapshot t. + cfg.transaction.keep_ratio = 'linear' + + # ----------------------------------------------------------------------- # + # Customized options: metrics. + # ----------------------------------------------------------------------- # + + cfg.metric = CN() + # How many negative edges for each node to compute rank-based evaluation + # metrics such as MRR and recall at K. + # E.g., if multiplier = 1000 and a node has 3 positive edges, then we + # compute the MRR using 1000 randomly generated negative edges + # + 3 existing positive edges. + # Use 100 ~ 1000 for fast and reliable results. + cfg.metric.mrr_num_negative_edges = 1000 + + # how to compute MRR. + # available: f = 'min', 'max', 'mean'. + # Step 1: get the p* = f(scores of positive edges) + # Step 2: compute the rank r of p* among all negative edges. + # Step 3: RR = 1 / rank. + # Step 4: average over all users. + # expected MRR(min) <= MRR(mean) <= MRR(max). + cfg.metric.mrr_method = 'max' + + +register_config('roland', set_cfg_roland) diff --git a/graphgym/contrib/feature_encoder/roland.py b/graphgym/contrib/feature_encoder/roland.py new file mode 100644 index 00000000..5a3b4898 --- /dev/null +++ b/graphgym/contrib/feature_encoder/roland.py @@ -0,0 +1,142 @@ +import deepsnap +import torch +import torch.nn as nn +from graphgym.config import cfg +from graphgym.register import register_edge_encoder, register_node_encoder + + +class TransactionEdgeEncoder(torch.nn.Module): + """A module that encodes edge features in the transaction graph. + + Example: + TransactionEdgeEncoder( + (embedding_list): ModuleList( + (0): Embedding(50, 32) # The first integral edge feature has 50 unique values. + # convert this integral feature to 32 dimensional embedding. + (1): Embedding(8, 32) + (2): Embedding(252, 32) + (3): Embedding(252, 32) + ) + (linear_amount): Linear(in_features=1, out_features=64, bias=True) + (linear_time): Linear(in_features=1, out_features=64, bias=True) + ) + + Initial edge feature dimension = 6 + Final edge embedding dimension = 32 + 32 + 32 + 32 + 64 + 64 = 256 + """ + + def __init__(self, emb_dim: int): + # emb_dim is not used here. + super(TransactionEdgeEncoder, self).__init__() + + self.embedding_list = torch.nn.ModuleList() + # Note: feature_edge_int_num[i] = len(torch.unique(graph.edge_feature[:, i])) + # where i-th edge features are integral. + for num in cfg.transaction.feature_edge_int_num: + emb = torch.nn.Embedding(num, cfg.transaction.feature_int_dim) + torch.nn.init.xavier_uniform_(emb.weight.data) + self.embedding_list.append(emb) + + # Embed non-integral features. + self.linear_amount = nn.Linear(1, cfg.transaction.feature_amount_dim) + self.linear_time = nn.Linear(1, cfg.transaction.feature_time_dim) + # update edge_dim + cfg.dataset.edge_dim = len(cfg.transaction.feature_edge_int_num) \ + * cfg.transaction.feature_int_dim \ + + cfg.transaction.feature_amount_dim \ + + cfg.transaction.feature_time_dim + + def forward(self, batch: deepsnap.batch.Batch) -> deepsnap.batch.Batch: + edge_embedding = [] + for i in range(len(self.embedding_list)): + edge_embedding.append( + self.embedding_list[i](batch.edge_feature[:, i].long()) + ) + # By default, edge_feature[:, -2] contains edge amount, + # edge_feature[:, -1] contains edge time. + edge_embedding.append( + self.linear_amount(batch.edge_feature[:, -2].view(-1, 1)) + ) + edge_embedding.append( + self.linear_time(batch.edge_feature[:, -1].view(-1, 1)) + ) + batch.edge_feature = torch.cat(edge_embedding, dim=1) + return batch + + +register_edge_encoder('roland', TransactionEdgeEncoder) + + +class TransactionNodeEncoder(torch.nn.Module): + """A module that encodes node features in the transaction graph. + + Parameters: + num_classes - the number of classes for the embedding mapping to learn + + Example: + 3 unique values for the first integral node feature. + 3 unique values for the second integral node feature. + + cfg.transaction.feature_node_int_num = [3, 3] + cfg.transaction.feature_int_dim = 32 + + TransactionNodeEncoder( + (embedding_list): ModuleList( + (0): Embedding(3, 32) # embed the first node feature to 32-dimensional space. + (1): Embedding(3, 32) # embed the second node feature to 32-dimensional space. + ) + ) + + Initial node feature dimension = 2 + Final node embedding dimension = 32 + 32 = 256 + """ + + def __init__(self, emb_dim: int, num_classes=None): + super(TransactionNodeEncoder, self).__init__() + self.embedding_list = torch.nn.ModuleList() + for i, num in enumerate(cfg.transaction.feature_node_int_num): + emb = torch.nn.Embedding(num, cfg.transaction.feature_int_dim) + torch.nn.init.xavier_uniform_(emb.weight.data) + self.embedding_list.append(emb) + # update encoder_dim + cfg.dataset.encoder_dim = len(cfg.transaction.feature_node_int_num) \ + * cfg.transaction.feature_int_dim + + def forward(self, batch: deepsnap.batch.Batch) -> deepsnap.batch.Batch: + node_embedding = [] + for i in range(len(self.embedding_list)): + node_embedding.append( + self.embedding_list[i](batch.node_feature[:, i].long()) + ) + batch.node_feature = torch.cat(node_embedding, dim=1) + return batch + + +register_node_encoder('roland', TransactionNodeEncoder) + + +class LinearEdgeEncoder(torch.nn.Module): + """ + Basic edge encoder for temporal graphs, this encoder does not assume edge dim, + this encoder uses linear layers to contract/expand raw edge features to + dimension cfg.transaction.feature_amount_dim + feature_time_dim for consistency. + """ + def __init__(self, emb_dim: int): + # emb_dim is not used here. + super(LinearEdgeEncoder, self).__init__() + # For consistency, for non-transaction datasets with only timestamp, + # we use the feature amount dimension + time dimension to generate + # the same dimension as transaction datasets. + # TODO: change to feature_time_dim only for better naming? + expected_dim = cfg.transaction.feature_amount_dim \ + + cfg.transaction.feature_time_dim + + self.linear = nn.Linear(cfg.dataset.edge_dim, expected_dim) + cfg.dataset.edge_dim = expected_dim + + def forward(self, batch: deepsnap.batch.Batch) -> deepsnap.batch.Batch: + batch.edge_feature = self.linear(batch.edge_feature) + return batch + + +register_edge_encoder('roland_general', LinearEdgeEncoder) diff --git a/graphgym/contrib/head/scalable_link_pred.py b/graphgym/contrib/head/scalable_link_pred.py new file mode 100644 index 00000000..6ea875a6 --- /dev/null +++ b/graphgym/contrib/head/scalable_link_pred.py @@ -0,0 +1,108 @@ +""" +An improved version of graphgym.models.head.GNNEdgeHead. This head handles +large link prediction tasks by splitting them into chunks to avoid OOM errors. +This is particular useful for computing MRR when a large amount of memory is +needed. + +(Not implemented yet) Alternatively, one may implement head for MRR by all +prediction task to CPU, by doing so, we need sepearate heads for training and +inference (training requires everything including head to be on GPU). +""" +import torch +import torch.nn as nn +from graphgym.config import cfg +from graphgym.models.layer import MLP +from graphgym.register import register_head + + +class ScalableLinkPred(nn.Module): + def __init__(self, dim_in: int, dim_out: int): + # Use dim_in for graph conv, since link prediction dim_out could be + # binary + # E.g. if decoder='dot', link probability is dot product between + # node embeddings, of dimension dim_in + super(ScalableLinkPred, self).__init__() + # module to decode edges from node embeddings + + if cfg.model.edge_decoding == 'concat': + # Only use node features. + self.layer_post_mp = MLP(dim_in * 2, dim_out, + num_layers=cfg.gnn.layers_post_mp, + bias=True) + # requires parameter + self.decode_module = lambda v1, v2: \ + self.layer_post_mp(torch.cat((v1, v2), dim=-1)) + elif cfg.model.edge_decoding == 'edgeconcat': + # Use both node and edge features. + self.layer_post_mp = MLP(dim_in * 2 + cfg.dataset.edge_dim, dim_out, + num_layers=cfg.gnn.layers_post_mp, + bias=True) + # requires parameter + self.decode_module = lambda v1, v2, edge: \ + self.layer_post_mp(torch.cat((v1, v2, edge), dim=-1)) + else: + if dim_out > 1: + raise ValueError( + 'Binary edge decoding ({})is used for multi-class ' + 'edge/link prediction.'.format(cfg.model.edge_decoding)) + self.layer_post_mp = MLP(dim_in, dim_in, + num_layers=cfg.gnn.layers_post_mp, + bias=True) + if cfg.model.edge_decoding == 'dot': + self.decode_module = lambda v1, v2: torch.sum(v1 * v2, dim=-1) + elif cfg.model.edge_decoding == 'cosine_similarity': + self.decode_module = nn.CosineSimilarity(dim=-1) + else: + raise ValueError('Unknown edge decoding {}.'.format( + cfg.model.edge_decoding)) + + def _apply_index(self, batch): + return batch.node_feature[batch.edge_label_index], \ + batch.edge_label + + def forward_pred(self, batch): + predict_batch_size = cfg.gnn.link_pred_batch_size + num_pred = len(batch.edge_label) + label = batch.edge_label + if num_pred >= predict_batch_size: + # for large prediction tasks, split into chunks. + num_chunks = num_pred // predict_batch_size + 1 + edge_label_index_chunks = torch.chunk( + batch.edge_label_index, num_chunks, dim=1) + gathered_pred = list() + + for edge_label_index in edge_label_index_chunks: + pred = batch.node_feature[edge_label_index] + # node features of the source node of each edge. + nodes_first = pred[0] + nodes_second = pred[1] + if cfg.model.edge_decoding == 'edgeconcat': + raise NotImplementedError + else: + pred = self.decode_module(nodes_first, nodes_second) + gathered_pred.append(pred) + + pred = torch.cat(gathered_pred) + else: + pred, label = self._apply_index(batch) + # node features of the source node of each edge. + nodes_first = pred[0] + nodes_second = pred[1] + if cfg.model.edge_decoding == 'edgeconcat': + edge_feature = torch.index_select( + batch.edge_feature, 0, batch.edge_split_index) + pred = self.decode_module( + nodes_first, nodes_second, edge_feature) + else: + pred = self.decode_module(nodes_first, nodes_second) + return pred, label + + def forward(self, batch): + if cfg.model.edge_decoding != 'concat' and \ + cfg.model.edge_decoding != 'edgeconcat': + batch = self.layer_post_mp(batch) + pred, label = self.forward_pred(batch) + return pred, label + + +register_head('scalable_link_pred', ScalableLinkPred) diff --git a/graphgym/contrib/layer/residual_edge_conv.py b/graphgym/contrib/layer/residual_edge_conv.py new file mode 100644 index 00000000..dc7a7ec0 --- /dev/null +++ b/graphgym/contrib/layer/residual_edge_conv.py @@ -0,0 +1,137 @@ +import torch +import torch.nn as nn +from torch.nn import Parameter +from torch_geometric.nn.conv import MessagePassing +from torch_geometric.nn.inits import zeros +from torch_geometric.utils import add_remaining_self_loops +from torch_scatter import scatter_add + +from graphgym.config import cfg +from graphgym.register import register_layer + + +class ResidualEdgeConvLayer(MessagePassing): + """ + A general GNN layer with arbitrary edge features and self residual + connections. + """ + + def __init__(self, in_channels: int, out_channels: int, + improved: bool = False, cached: bool = False, bias: bool = True, + **kwargs): + super(ResidualEdgeConvLayer, self).__init__(aggr=cfg.gnn.agg, **kwargs) + + self.in_channels = in_channels + self.out_channels = out_channels + self.improved = improved + self.cached = cached + self.normalize = cfg.gnn.normalize_adj + self.msg_direction = cfg.gnn.msg_direction + + if self.msg_direction == 'single': + self.linear_msg = nn.Linear(in_channels + cfg.dataset.edge_dim, + out_channels, bias=False) + elif self.msg_direction == 'both': + self.linear_msg = nn.Linear(in_channels * 2 + cfg.dataset.edge_dim, + out_channels, bias=False) + else: + raise ValueError + + if cfg.gnn.skip_connection == 'affine': + self.linear_skip = nn.Linear(in_channels, out_channels, bias=True) + elif cfg.gnn.skip_connection == 'identity': + assert self.in_channels == self.out_channels + + if bias: + self.bias = Parameter(torch.Tensor(out_channels)) + else: + self.register_parameter('bias', None) + + self.reset_parameters() + + def reset_parameters(self): + zeros(self.bias) + self.cached_result = None + self.cached_num_edges = None + + @staticmethod + def norm(edge_index, num_nodes, edge_weight=None, improved=False, + dtype=None): + if edge_weight is None: + edge_weight = torch.ones((edge_index.size(1),), dtype=dtype, + device=edge_index.device) + + fill_value = 1 if not improved else 2 + edge_index, edge_weight = add_remaining_self_loops( + edge_index, edge_weight, fill_value, num_nodes) + + row, col = edge_index + deg = scatter_add(edge_weight, row, dim=0, dim_size=num_nodes) + deg_inv_sqrt = deg.pow(-0.5) + deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0 + + return edge_index, deg_inv_sqrt[row] * edge_weight * deg_inv_sqrt[col] + + def forward(self, x, edge_index, edge_weight=None, edge_feature=None): + if self.cached and self.cached_result is not None: + if edge_index.size(1) != self.cached_num_edges: + raise RuntimeError( + 'Cached {} number of edges, but found {}. Please ' + 'disable the caching behavior of this layer by removing ' + 'the `cached=True` argument in its constructor.'.format( + self.cached_num_edges, edge_index.size(1))) + + if not self.cached or self.cached_result is None: + self.cached_num_edges = edge_index.size(1) + if self.normalize: + edge_index, norm = self.norm(edge_index, x.size(self.node_dim), + edge_weight, self.improved, + x.dtype) + else: + norm = edge_weight + self.cached_result = edge_index, norm + + edge_index, norm = self.cached_result + if cfg.gnn.skip_connection == 'affine': + skip_x = self.linear_skip(x) + elif cfg.gnn.skip_connection == 'identity': + skip_x = x + else: + skip_x = 0.0 + return self.propagate(edge_index, x=x, norm=norm, + edge_feature=edge_feature) + skip_x + + def message(self, x_i, x_j, norm, edge_feature): + if self.msg_direction == 'both': + x_j = torch.cat((x_i, x_j, edge_feature), dim=-1) + elif self.msg_direction == 'single': + x_j = torch.cat((x_j, edge_feature), dim=-1) + else: + raise ValueError + x_j = self.linear_msg(x_j) + return norm.view(-1, 1) * x_j if norm is not None else x_j + + def update(self, aggr_out): + if self.bias is not None: + aggr_out = aggr_out + self.bias + return aggr_out + + def __repr__(self): + return '{}({}, {})'.format(self.__class__.__name__, self.in_channels, + self.out_channels) + + +class ResidualEdgeConv(nn.Module): + '''Wrapper for residual edge conv layer''' + + def __init__(self, dim_in, dim_out, bias=False, **kwargs): + super(ResidualEdgeConv, self).__init__() + self.model = ResidualEdgeConvLayer(dim_in, dim_out, bias=bias) + + def forward(self, batch): + batch.node_feature = self.model(batch.node_feature, batch.edge_index, + edge_feature=batch.edge_feature) + return batch + + +register_layer('residual_edge_conv', ResidualEdgeConv) diff --git a/graphgym/contrib/loader/dynamic_graph_utils.py b/graphgym/contrib/loader/dynamic_graph_utils.py new file mode 100644 index 00000000..448401ea --- /dev/null +++ b/graphgym/contrib/loader/dynamic_graph_utils.py @@ -0,0 +1,103 @@ +""" +Helper functions and utilities for dynamic graphs. + +Mar. 31, 2021. +""" +from typing import List + +import numpy as np +import pandas as pd +import torch +from deepsnap.graph import Graph + + +def make_graph_snapshot(g_all: Graph, + snapshot_freq: str, + is_hetero: bool=False) -> List[Graph]: + """ + Constructs a list of graph snapshots based from g_all using g_all.edge_time + and provided snapshot_freq (frequency on calendar). + + Args: + g_all: the entire graph object, g_all must have a edge_time attribute, + g_all.edge_time consists of unix timestamp of edge time. + snapshot_freq: snapshot frequency, must be one of + 'D': daily, 'W': weekly, and 'M': monthly. + is_hetero: whether the graph is heterogeneous. + + Return: + A list of graph object, each graph snapshot has edge level information + (edge_feature, edge_time, etc) of only edges in that time period. + However, every graph snapshot has the same and full node level + information (node_feature, node_type, etc). + """ + # Arg check. + if not hasattr(g_all, 'edge_time'): + raise KeyError('Temporal graph needs to have edge_time attribute.') + + if snapshot_freq.upper() not in ['D', 'W', 'M']: + raise ValueError(f'Unsupported snapshot freq: {snapshot_freq}.') + + snapshot_freq = snapshot_freq.upper() + t = g_all.edge_time.numpy().astype(np.int64) # all timestamps. + + period_split = pd.DataFrame( + {'Timestamp': t, 'TransactionTime': pd.to_datetime(t, unit='s')}, + index=range(len(g_all.edge_time)) + ) + + freq_map = {'D': '%j', # day of year. + 'W': '%W', # week of year. + 'M': '%m'} # month of year. + + period_split['Year'] = period_split['TransactionTime'].dt.strftime( + '%Y').astype(int) + + period_split['SubYearFlag'] = period_split['TransactionTime'].dt.strftime( + freq_map[snapshot_freq]).astype(int) + + period2id = period_split.groupby(['Year', 'SubYearFlag']).indices + # e.g., dictionary w/ key = (2021, 3) and val = array(edge IDs). + + periods = sorted(list(period2id.keys())) # ascending order. + # alternatively, sorted(..., key=lambda x: x[0] + x[1]/1000). + snapshot_list = list() + for p in periods: + # unique IDs of edges in this period. + period_members = period2id[p] + + g_incr = Graph( + node_feature=g_all.node_feature, + edge_feature=g_all.edge_feature[period_members, :], + edge_index=g_all.edge_index[:, period_members], + edge_time=g_all.edge_time[period_members], + directed=g_all.directed, + list_n_type=g_all.list_n_type if is_hetero else None, + list_e_type=g_all.list_e_type if is_hetero else None, + ) + if is_hetero and hasattr(g_all, 'node_type'): + g_incr.node_type = g_all.node_type + g_incr.edge_type = g_all.edge_type[period_members] + snapshot_list.append(g_incr) + return snapshot_list + + +def make_graph_snapshot_by_seconds(g_all: Graph, + freq_sec: int) -> List[Graph]: + """ + Split the entire graph into snapshots by frequency in terms of seconds. + """ + split_criterion = g_all.edge_time // freq_sec + groups = torch.sort(torch.unique(split_criterion))[0] + snapshot_list = list() + for t in groups: + period_members = (split_criterion == t) + g_incr = Graph( + node_feature=g_all.node_feature, + edge_feature=g_all.edge_feature[period_members, :], + edge_index=g_all.edge_index[:, period_members], + edge_time=g_all.edge_time[period_members], + directed=g_all.directed + ) + snapshot_list.append(g_incr) + return snapshot_list diff --git a/graphgym/contrib/loader/roland.py b/graphgym/contrib/loader/roland.py new file mode 100644 index 00000000..ffa8d94d --- /dev/null +++ b/graphgym/contrib/loader/roland.py @@ -0,0 +1,550 @@ +""" +One single loader for the roland project. +""" +import os +from datetime import datetime +from typing import List + +import dask.dataframe as dd +import graphgym.contrib.loader.dynamic_graph_utils as utils +import numpy as np +import pandas as pd +import torch +from dask_ml.preprocessing import OrdinalEncoder as DaskOrdinalEncoder +from deepsnap.graph import Graph +from graphgym.config import cfg +from graphgym.register import register_loader +from sklearn.preprocessing import MinMaxScaler +from sklearn.preprocessing import OrdinalEncoder as SkOrdinalEncoder +from tqdm import tqdm + +# ============================================================================= +# AS-733 Dataset. +# ============================================================================= + + +def load_AS_dataset(dataset_dir: str) -> Graph: + all_files = [x for x in sorted(os.listdir(dataset_dir)) + if (x.startswith('as') and x.endswith('.txt'))] + assert len(all_files) == 733 + assert all(x.endswith('.txt') for x in all_files) + + def file2timestamp(file_name: str) -> int: + t = file_name.strip('.txt').strip('as') + ts = int(datetime.strptime(t, '%Y%m%d').timestamp()) + return ts + + edge_index_lst, edge_time_lst = list(), list() + all_files = sorted(all_files) + + for graph_file in tqdm(all_files): + today = file2timestamp(graph_file) + graph_file = os.path.join(dataset_dir, graph_file) + + src, dst = list(), list() + with open(graph_file, 'r') as f: + for line in f.readlines(): + if line.startswith('#'): + continue + line = line.strip('\n') + v1, v2 = line.split('\t') + src.append(int(v1)) + dst.append(int(v2)) + + edge_index = np.stack((src, dst)) + edge_index_lst.append(edge_index) + + edge_time = np.ones(edge_index.shape[1]) * today + edge_time_lst.append(edge_time) + + edge_index_raw = np.concatenate(edge_index_lst, axis=1).astype(int) + + num_nodes = len(np.unique(edge_index_raw)) + + # encode node indices to consecutive integers. + node_indices = np.sort(np.unique(edge_index_raw)) + enc = SkOrdinalEncoder(categories=[node_indices, node_indices]) + edge_index = enc.fit_transform(edge_index_raw.transpose()).transpose() + edge_index = torch.Tensor(edge_index).long() + edge_time = torch.Tensor(np.concatenate(edge_time_lst)) + + # Use scaled datetime as edge_feature. + scale = edge_time.max() - edge_time.min() + base = edge_time.min() + scaled_edge_time = 2 * (edge_time.clone() - base) / scale + + assert cfg.dataset.AS_node_feature in ['one', 'one_hot_id', + 'one_hot_degree_global'] + + if cfg.dataset.AS_node_feature == 'one': + node_feature = torch.ones(num_nodes, 1) + elif cfg.dataset.AS_node_feature == 'one_hot_id': + # One hot encoding the node ID. + node_feature = torch.Tensor(np.eye(num_nodes)) + elif cfg.dataset.AS_node_feature == 'one_hot_degree_global': + # undirected graph, use only out degree. + _, node_degree = torch.unique(edge_index[0], sorted=True, + return_counts=True) + node_feature = np.zeros((num_nodes, node_degree.max() + 1)) + node_feature[np.arange(num_nodes), node_degree] = 1 + # 1 ~ 63748 degrees, but only 710 possible levels, exclude all zero + # columns. + non_zero_cols = (node_feature.sum(axis=0) > 0) + node_feature = node_feature[:, non_zero_cols] + node_feature = torch.Tensor(node_feature) + else: + raise NotImplementedError + + g_all = Graph( + node_feature=node_feature, + edge_feature=scaled_edge_time.reshape(-1, 1), + edge_index=edge_index, + edge_time=edge_time, + directed=True + ) + + return g_all + + +# ============================================================================= +# BSI-SVT Dataset +# ============================================================================= +# Required for all graphs. +SRC_NODE: str = 'Payer' +DST_NODE: str = 'Payee' +TIMESTAMP: str = 'Timestamp' +AMOUNT: str = 'AmountEUR' + +# Categorical columns are SRC_NODE+var and DST_NODE+var. +# columns: SRC_NODE + NODE_CATE_VARS, DST_NODE + NODE_CATE_VARS, EDGE_CATE_VARS +# will be encoded using ordinal encoder. +# Note that '' corresponds to columns SRC_NODE and DST_NODE. +NODE_CATE_VARS: List[str] = ['', 'Bank', 'Country', 'Region', 'Skd', 'SkdL1', + 'SkdL2', 'Skis', 'SkisL1', 'SkisL2'] +EDGE_CATE_VARS: List[str] = ['# System', 'Currency'] + +# contents of graph.edge_feature +EDGE_FEATURE_COLS: List[str] = [AMOUNT, 'TimestampScaled'] +# contents of graph.node_feature +NODE_FEATURE_LIST: List[str] = ['Bank', 'Country', 'Region', 'SkdL1', 'SkisL1'] + +# Required for heterogeneous graphs only. +# Node and edge features used to define node and edge type in hete GNN. +NODE_TYPE_DEFN: List[str] = ['Country'] +EDGE_TYPE_DEFN: List[str] = ['# System', 'AmountLevel'] + + +# Required for graphs with node features only. + +def get_node_feature(df: pd.DataFrame) -> pd.DataFrame: + """Extract node features from a transaction dataset. + """ + temp = list() + for p in [SRC_NODE, DST_NODE]: + # require ['Payer', 'PayerBank', 'PayerCountry', ...] + cols = [p] + [p + var for var in NODE_FEATURE_LIST] + relevant = df[cols].copy() + # rename to ['Company', 'Bank', 'Country', ...] + relevant.columns = ['Company'] + NODE_FEATURE_LIST + temp.append(relevant) + df_char = pd.concat(temp, axis=0) + + # get company's information based on its first occurrence. + df_char = df_char.groupby('Company').first() + return df_char[NODE_FEATURE_LIST] + + +def construct_additional_features(df: pd.DataFrame) -> pd.DataFrame: + """ + Constructs additional features of the transaction dataset. + """ + for p in ('Payer', 'Payee'): + # %% Location of companies. + mask = (df[p + 'Country'] != 'SI') + out_of_country = np.empty(len(df), dtype=object) + out_of_country[mask] = 'OutOfCountry' + out_of_country[~mask] = 'InCountry' + df[p + 'OutOfCountry'] = out_of_country + + mask = (df['PayerCountry'] != df['PayeeCountry']) + missing_mask = np.logical_or(df['PayerCountry'] == 'missing', + df['PayeeCountry'] == 'missing') + cross_country = np.empty(len(df), dtype=object) + cross_country[mask] = 'CrossCountry' + cross_country[~mask] = 'WithinCountry' + cross_country[missing_mask] = 'Missing' + df['CrossCountry'] = cross_country + + amount_level = np.empty(len(df), dtype=object) + mask_small = df['AmountEUR'] < 500 + mask_medium = np.logical_and(df['AmountEUR'] >= 500, + df['AmountEUR'] < 1000) + mask_large = df['AmountEUR'] >= 1000 + amount_level[mask_small] = '$<500' + amount_level[mask_medium] = '500<=$<1k' + amount_level[mask_large] = '$>=1k' + + df['AmountLevel'] = amount_level + return df + + +def load_bsi_dataset(dataset_dir: str, is_hetero: bool = False) -> Graph: + """ + Loads a single graph object from tsv file. + + Args: + dataset_dir: the path of tsv file to be loaded. + is_hetero: whether to load heterogeneous graph. + + Returns: + graph: a (homogenous) deepsnap graph object. + """ + # Load dataset using dask for fast parallel loading. + df_trans = dd.read_csv(dataset_dir, sep='\t', low_memory=False) + df_trans = df_trans.fillna('missing') + df_trans = df_trans.compute() + if is_hetero: + df_trans = construct_additional_features(df_trans) + df_trans.reset_index(drop=True, inplace=True) # necessary for dask. + + # a unique values of node-level categorical variables. + node_cat_uniques = dict() # Dict[str, np.ndarray of str] + for var in NODE_CATE_VARS: # for each node level categorical variable. + # get unique values of this categorical variable. + relevant = df_trans[[SRC_NODE + var, DST_NODE + var]] + unique_var = pd.unique(relevant.to_numpy().ravel()) + node_cat_uniques[var] = np.sort(unique_var) + # convert corresponding columns into pandas categorical variables. + cate_type = pd.api.types.CategoricalDtype( + categories=node_cat_uniques[var], ordered=True) + for p in ['Payer', 'Payee']: + df_trans[p + var] = df_trans[p + var].astype(cate_type) + + # Convert edge level categorical variables. + for var in EDGE_CATE_VARS: + unique_var = np.sort(pd.unique(df_trans[[var]].to_numpy().ravel())) + cate_type = pd.api.types.CategoricalDtype(categories=unique_var, + ordered=True) + df_trans[var] = df_trans[var].astype(cate_type) + + # Encoding categorical variables, the dask_ml.OrdinalEncoder only modify + # and encode columns of categorical dtype. + enc = DaskOrdinalEncoder() + df_encoded = enc.fit_transform(df_trans) + df_encoded.reset_index(drop=True, inplace=True) + print('Columns encoded to ordinal:') + print(list(enc.categorical_columns_)) + + # Scaling transaction amounts. + scaler = MinMaxScaler((0, 2)) + df_encoded[AMOUNT] = scaler.fit_transform( + df_encoded[AMOUNT].values.reshape(-1, 1)) + + # Scaling timestamps. + time_scaler = MinMaxScaler((0, 2)) + df_encoded['TimestampScaled'] = time_scaler.fit_transform( + df_encoded[TIMESTAMP].values.reshape(-1, 1)) + + # Prepare for output. + edge_feature = torch.Tensor(df_encoded[EDGE_FEATURE_COLS].values) + + feature_edge_int_num = [int(torch.max(edge_feature[:, i])) + 1 + for i in range(len(EDGE_FEATURE_COLS) - 2)] + cfg.transaction.feature_edge_int_num = feature_edge_int_num + print('feature_edge_int_num', feature_edge_int_num) + + edge_index = torch.Tensor( + df_encoded[[SRC_NODE, DST_NODE]].values.transpose()).long() # (2, E) + num_nodes = torch.max(edge_index) + 1 + assert num_nodes == len(node_cat_uniques['']) + + df_node_info = get_node_feature(df_encoded) + print(df_node_info.shape) + node_feature = torch.Tensor(df_node_info.astype(float).values) + + cfg.transaction.feature_node_int_num = [ + int(torch.max(node_feature[:, i])) + 1 + for i in range(len(NODE_FEATURE_LIST)) + ] + + print('feature_node_int_num: ', + [int(torch.max(node_feature[:, i])) + 1 + for i in range(len(NODE_FEATURE_LIST))]) + + edge_time = torch.FloatTensor(df_encoded[TIMESTAMP].values) + + graph = Graph( + node_feature=node_feature, + edge_feature=edge_feature, + edge_index=edge_index, + edge_time=edge_time, + directed=True + ) + + if is_hetero: + # Construct node type signatures. E.g., 'USA--CA' for country + region. + df_node_info['NodeType'] = df_node_info[NODE_TYPE_DEFN[0]].astype(str) + for var in NODE_TYPE_DEFN[1:]: + df_node_info['NodeType'] += ('--' + df_node_info[var].astype(str)) + + node_type_enc = SkOrdinalEncoder() + # The sklearn ordinal encoder transforms numpy array instead. + node_type_int = node_type_enc.fit_transform( + df_node_info['NodeType'].values.reshape(-1, 1)) + node_type_int = torch.FloatTensor(node_type_int) + + # Construct edge type signatures. + df_trans['EdgeType'] = df_trans[EDGE_TYPE_DEFN[0]].astype(str) + for var in EDGE_TYPE_DEFN[1:]: + df_trans['EdgeType'] += ('--' + df_trans[var].astype(str)) + + edge_type_enc = SkOrdinalEncoder() + edge_type_int = edge_type_enc.fit_transform( + df_trans['EdgeType'].values.reshape(-1, 1)) + edge_type_int = torch.FloatTensor(edge_type_int) + + graph.node_type = node_type_int.reshape(-1,).long() + graph.edge_type = edge_type_int.reshape(-1,).long() + + # add a list of unique types for reference. + graph.list_n_type = node_type_int.unique().long() + graph.list_e_type = edge_type_int.unique().long() + + return graph + +# ============================================================================= +# Bitcoin Dataset. +# ============================================================================= + + +def load_bitcoin_dataset(dataset_dir: str) -> Graph: + df_trans = pd.read_csv(dataset_dir, sep=',', header=None, index_col=None) + df_trans.columns = ['SOURCE', 'TARGET', 'RATING', 'TIME'] + # NOTE: 'SOURCE' and 'TARGET' are not consecutive. + num_nodes = len( + pd.unique(df_trans[['SOURCE', 'TARGET']].to_numpy().ravel())) + + # bitcoin OTC contains decimal numbers, round them. + df_trans['TIME'] = df_trans['TIME'].astype(np.int).astype(np.float) + assert not np.any(pd.isna(df_trans).values) + + time_scaler = MinMaxScaler((0, 2)) + df_trans['TimestampScaled'] = time_scaler.fit_transform( + df_trans['TIME'].values.reshape(-1, 1)) + + edge_feature = torch.Tensor( + df_trans[['RATING', 'TimestampScaled']].values) # (E, edge_dim) + + node_indices = np.sort( + pd.unique(df_trans[['SOURCE', 'TARGET']].to_numpy().ravel())) + enc = SkOrdinalEncoder(categories=[node_indices, node_indices]) + raw_edges = df_trans[['SOURCE', 'TARGET']].values + edge_index = enc.fit_transform(raw_edges).transpose() + edge_index = torch.LongTensor(edge_index) + + # num_nodes = torch.max(edge_index) + 1 + # Use dummy node features. + node_feature = torch.ones(num_nodes, 1).float() + + edge_time = torch.FloatTensor(df_trans['TIME'].values) + + graph = Graph( + node_feature=node_feature, + edge_feature=edge_feature, + edge_index=edge_index, + edge_time=edge_time, + directed=True + ) + return graph + + +# ============================================================================= +# Reddit Dataset. +# ============================================================================= + + +def load_reddit_dataset(dataset_dir: str) -> Graph: + df_trans = dd.read_csv(dataset_dir, sep='\t', low_memory=False) + df_trans = df_trans.compute() + assert not np.any(pd.isna(df_trans).values) + df_trans.reset_index(drop=True, inplace=True) # required for dask. + + # Encode src and dst node IDs. + # get unique values of src and dst. + unique_subreddits = pd.unique( + df_trans[['SOURCE_SUBREDDIT', 'TARGET_SUBREDDIT']].to_numpy().ravel()) + unique_subreddits = np.sort(unique_subreddits) + cate_type = pd.api.types.CategoricalDtype(categories=unique_subreddits, + ordered=True) + df_trans['SOURCE_SUBREDDIT'] = df_trans['SOURCE_SUBREDDIT'].astype( + cate_type) + df_trans['TARGET_SUBREDDIT'] = df_trans['TARGET_SUBREDDIT'].astype( + cate_type) + enc = DaskOrdinalEncoder(columns=['SOURCE_SUBREDDIT', 'TARGET_SUBREDDIT']) + df_encoded = enc.fit_transform(df_trans) + df_encoded.reset_index(drop=True, inplace=True) + + # Add node feature from the embedding dataset. + node_embedding_dir = os.path.join(cfg.dataset.dir, + 'web-redditEmbeddings-subreddits.csv') + + # index: subreddit name, values: embedding. + df_node = pd.read_csv(node_embedding_dir, header=None, index_col=0) + + # ordinal encoding follows order in unique_subreddits. + # df_encoded['SOURCE_SUBREDDIT'] contains encoded integral values. + # unique_subreddits[df_encoded['SOURCE_SUBREDDIT']] + # tries to reverse encoded_integer --> original subreddit name. + # check if recovered sub-reddit name matched the raw data. + for col in ['SOURCE_SUBREDDIT', 'TARGET_SUBREDDIT']: + assert all(unique_subreddits[df_encoded[col]] == df_trans[col]) + + num_nodes = len(cate_type.categories) + node_feature = torch.ones(size=(num_nodes, 300)) + # for nodes without precomputed embedding, use the average value. + node_feature = node_feature * np.mean(df_node.values) + + # cate_type.categories[i] is encoded to i, by construction. + for i, subreddit in enumerate(cate_type.categories): + if subreddit in df_node.index: + embedding = df_node.loc[subreddit] + node_feature[i, :] = torch.Tensor(embedding.values) + + # Original format: df['TIMESTAMP'][0] = '2013-12-31 16:39:18' + # Convert to unix timestamp (integers). + df_encoded['TIMESTAMP'] = pd.to_datetime(df_encoded['TIMESTAMP'], + format='%Y-%m-%d %H:%M:%S') + df_encoded['TIMESTAMP'] = (df_encoded['TIMESTAMP'] - pd.Timestamp( + '1970-01-01')) // pd.Timedelta('1s') # now integers. + + # Scale edge time. + time_scaler = MinMaxScaler((0, 2)) + df_encoded['TimestampScaled'] = time_scaler.fit_transform( + df_encoded['TIMESTAMP'].values.reshape(-1, 1)) + + # Link sentimental representation (86-dimension). + # comma-separated string: '3.1,5.1,0.0,...' + senti_str_lst = df_encoded['PROPERTIES'].values + edge_senti_embedding = [x.split(',') for x in senti_str_lst] + edge_senti_embedding = np.array(edge_senti_embedding).astype(np.float32) + # (E, 86) + + ef = df_encoded[['TimestampScaled', 'LINK_SENTIMENT']].values + edge_feature = np.concatenate([ef, edge_senti_embedding], axis=1) + edge_feature = torch.Tensor(edge_feature).float() # (E, 88) + + edge_index = torch.Tensor( + df_encoded[['SOURCE_SUBREDDIT', + 'TARGET_SUBREDDIT']].values.transpose()).long() # (2, E) + num_nodes = torch.max(edge_index) + 1 + + edge_time = torch.FloatTensor(df_encoded['TIMESTAMP'].values) + + graph = Graph( + node_feature=node_feature, + edge_feature=edge_feature, + edge_index=edge_index, + edge_time=edge_time, + directed=True + ) + + return graph + + +# ============================================================================= +# College Message Dataset. +# ============================================================================= + + +def load_college_message_dataset(dataset_dir: str) -> Graph: + df_trans = pd.read_csv(dataset_dir, sep=' ', header=None) + df_trans.columns = ['SRC', 'DST', 'TIMESTAMP'] + assert not np.any(pd.isna(df_trans).values) + df_trans.reset_index(drop=True, inplace=True) + + # Node IDs of this dataset start from 1, re-index to 0-based. + df_trans['SRC'] -= 1 + df_trans['DST'] -= 1 + + print('num of edges:', len(df_trans)) + print('num of nodes:', np.max(df_trans[['SRC', 'DST']].values) + 1) + + time_scaler = MinMaxScaler((0, 2)) + df_trans['TimestampScaled'] = time_scaler.fit_transform( + df_trans['TIMESTAMP'].values.reshape(-1, 1)) + + edge_feature = torch.Tensor( + df_trans[['TimestampScaled']].values).view(-1, 1) + edge_index = torch.Tensor( + df_trans[['SRC', 'DST']].values.transpose()).long() # (2, E) + num_nodes = torch.max(edge_index) + 1 + + node_feature = torch.ones(num_nodes, 1) + + edge_time = torch.FloatTensor(df_trans['TIMESTAMP'].values) + + graph = Graph( + node_feature=node_feature, + edge_feature=edge_feature, + edge_index=edge_index, + edge_time=edge_time, + directed=True + ) + + return graph + + +def load_roland_dataset(format: str, name: str, dataset_dir: str + ) -> List[Graph]: + if format == 'roland': + # Load the entire graph from specified dataset. + if name in ['AS-733']: + g_all = load_AS_dataset(os.path.join(dataset_dir, name)) + elif name in ['bsi_svt_2008.tsv']: + # NOTE: only BSI dataset supports hetero graph. + g_all = load_bsi_dataset(os.path.join(dataset_dir, name), + is_hetero=cfg.dataset.is_hetero) + elif name in ['bitcoinotc.csv', 'bitcoinalpha.csv']: + g_all = load_bitcoin_dataset(os.path.join(dataset_dir, name)) + elif name in ['reddit-body.tsv', 'reddit-title.tsv']: + g_all = load_reddit_dataset(os.path.join(dataset_dir, name)) + elif name in ['CollegeMsg.txt']: + g_all = load_college_message_dataset( + os.path.join(dataset_dir, name)) + else: + raise ValueError(f'Unsupported filename') + + # Make the graph snapshots. + snapshot_freq = cfg.transaction.snapshot_freq + if snapshot_freq.upper() in ['D', 'W', 'M']: + # Split snapshot using calendar frequency. + snapshot_list = utils.make_graph_snapshot(g_all, + snapshot_freq, + cfg.dataset.is_hetero) + elif snapshot_freq.endswith('s'): + # Split using frequency in terms of seconds. + assert snapshot_freq.endswith('s') + snapshot_freq = int(snapshot_freq.strip('s')) + assert not cfg.dataset.is_hetero, 'Hetero graph is not supported.' + snapshot_list = utils.make_graph_snapshot_by_seconds(g_all, + snapshot_freq) + else: + raise ValueError(f'Unsupported frequency type: {snapshot_freq}') + + num_nodes = g_all.edge_index.max() + 1 + + for g_snapshot in snapshot_list: + g_snapshot.node_states = [0 for _ in range(cfg.gnn.layers_mp)] + g_snapshot.node_cells = [0 for _ in range(cfg.gnn.layers_mp)] + g_snapshot.node_degree_existing = torch.zeros(num_nodes) + + # Filter small snapshots. + filtered_graphs = list() + for g in snapshot_list: + if g.num_edges >= 10: + filtered_graphs.append(g) + + return filtered_graphs + + +register_loader('roland', load_roland_dataset) diff --git a/graphgym/contrib/loader/roland_template.py b/graphgym/contrib/loader/roland_template.py new file mode 100644 index 00000000..b576029e --- /dev/null +++ b/graphgym/contrib/loader/roland_template.py @@ -0,0 +1,69 @@ +""" +A generic loader for the roland project, modify this template to build +loaders for other financial transaction datasets and dynamic graphs. +NOTE: this script is the trimmed version for homogenous graphs only. +Mar. 22, 2021. +# Search for TODO in this file. +""" +import os +from typing import List + +import deepsnap +import graphgym.contrib.loader.dynamic_graph_utils as utils +import torch +from deepsnap.graph import Graph +from graphgym.config import cfg +from graphgym.register import register_loader + + +def load_single_dataset(dataset_dir: str) -> Graph: + # TODO: Load your data from dataset_dir here. + # Example: + num_nodes = 500 + num_node_feature = 16 + num_edges = 10000 + num_edge_feature = 32 + node_feature = torch.rand((num_nodes, num_node_feature)) + edge_feature = torch.rand((num_edges, num_edge_feature)) + edge_index = torch.randint(0, num_nodes - 1, (2, num_edges)) + # edge time should be unix timestmap integers. + # random generate timestamps from 2021-05-01 to 2021-06-01 + edge_time = torch.randint(1619852450, 1622530850, (num_edges,)).sort()[0] + + graph = Graph( + node_feature=node_feature, + edge_feature=edge_feature, + edge_index=edge_index, + edge_time=edge_time, + directed=True + ) + + return graph + + +def load_generic_dataset(format: str, name: str, dataset_dir: str + ) -> List[deepsnap.graph.Graph]: + """Load the dataset as a list of graph snapshots. + + Args: + format (str): format of dataset. + name (str): file name of dataset. + dataset_dir (str): path of dataset, do NOT include the file name, use + the parent directory of dataset file. + + Returns: + List[deepsnap.graph.Graph]: a list of graph snapshots. + """ + # TODO: change the format name. + if format == 'YOUR_FORMAT_NAME_HERE': + dataset_dir = os.path.join(dataset_dir, name) + g_all = load_single_dataset(dataset_dir) + snapshot_list = utils.make_graph_snapshot( + g_all, + snapshot_freq=cfg.transaction.snapshot_freq, + is_hetero=cfg.dataset.is_hetero) + return snapshot_list + + +# TODO: don't forget to register the loader. +register_loader('YOUR_LOADER_NAME_HERE', load_generic_dataset) diff --git a/graphgym/contrib/loader/roland_template_hetero.py b/graphgym/contrib/loader/roland_template_hetero.py new file mode 100644 index 00000000..fe202a33 --- /dev/null +++ b/graphgym/contrib/loader/roland_template_hetero.py @@ -0,0 +1,85 @@ +""" +A generic loader for the roland project, modify this template to build +loaders for other financial transaction datasets and dynamic graphs. +NOTE: this script is the trimmed version for homogenous graphs only. +Mar. 22, 2021. +# Search for TODO in this file. +""" +import os +from typing import List + +import deepsnap +import graphgym.contrib.loader.dynamic_graph_utils as utils +import torch +from deepsnap.graph import Graph +from graphgym.config import cfg +from graphgym.register import register_loader + + +def load_single_hetero_dataset(dataset_dir: str) -> Graph: + # TODO: Load your data from dataset_dir here. + # Example: + num_nodes = 500 + num_node_feature = 16 + num_edges = 10000 + num_edge_feature = 32 + node_feature = torch.rand((num_nodes, num_node_feature)) + edge_feature = torch.rand((num_edges, num_edge_feature)) + edge_index = torch.randint(0, num_nodes - 1, (2, num_edges)) + # edge time should be unix timestmap integers. + # random generate timestamps from 2021-05-01 to 2021-06-01 + edge_time = torch.randint(1619852450, 1622530850, (num_edges,)).sort()[0] + + graph = Graph( + node_feature=node_feature, + edge_feature=edge_feature, + edge_index=edge_index, + edge_time=edge_time, + directed=True + ) + + # TODO: additional operations required for heterogeneous graphs. + # Assume there are 3 types of edges. + num_edge_types = 3 + edge_type_int = torch.randint(0, num_edge_types - 1, (num_edges,)).float() + # Assume there are 5 types of nodes. + num_node_types = 5 + node_type_int = torch.randint(0, num_node_types - 1, (num_nodes,)).float() + + graph.node_type = node_type_int.reshape(-1,).long() + graph.edge_type = edge_type_int.reshape(-1,).long() + + # add a list of unique types for reference. + graph.list_n_type = node_type_int.unique().long() + graph.list_e_type = edge_type_int.unique().long() + + return graph + + +def load_generic_dataset(format: str, name: str, dataset_dir: str + ) -> List[deepsnap.graph.Graph]: + """Load the dataset as a list of graph snapshots. + + Args: + format (str): format of dataset. + name (str): file name of dataset. + dataset_dir (str): path of dataset, do NOT include the file name, use + the parent directory of dataset file. + + Returns: + List[deepsnap.graph.Graph]: a list of graph snapshots. + """ + # TODO: change the format name. + if format == 'YOUR_HETERO_FORMAT_NAME_HERE': + assert cfg.dataset.is_hetero + dataset_dir = os.path.join(dataset_dir, name) + g_all = load_single_hetero_dataset(dataset_dir) + snapshot_list = utils.make_graph_snapshot( + g_all, + snapshot_freq=cfg.transaction.snapshot_freq, + is_hetero=cfg.dataset.is_hetero) + return snapshot_list + + +# TODO: don't forget to register the loader. +register_loader('YOUR_HETERO_LOADER_NAME_HERE', load_generic_dataset) diff --git a/graphgym/contrib/network/gnn_recurrent.py b/graphgym/contrib/network/gnn_recurrent.py new file mode 100644 index 00000000..85667d63 --- /dev/null +++ b/graphgym/contrib/network/gnn_recurrent.py @@ -0,0 +1,109 @@ +import graphgym.register as register +import torch.nn as nn +import torch.nn.functional as F +from graphgym.config import cfg +from graphgym.contrib.stage import * +from graphgym.init import init_weights +from graphgym.models.feature_augment import Preprocess +from graphgym.models.feature_encoder import (edge_encoder_dict, + node_encoder_dict) +from graphgym.models.head import head_dict +from graphgym.models.layer import (BatchNorm1dEdge, BatchNorm1dNode, + GeneralMultiLayer, layer_dict) +from graphgym.models.layer_recurrent import GeneralRecurrentLayer +from graphgym.register import register_network + + +def GNNLayer(dim_in: int, dim_out: int, has_act: bool = True, layer_id: int = 0): + # General constructor for GNN layer. + return GeneralRecurrentLayer(cfg.gnn.layer_type, dim_in, dim_out, + has_act, layer_id=layer_id) + + +def GNNPreMP(dim_in, dim_out): + r'''Constructs preprocessing layers: dim_in --> dim_out --> dim_out --> ... --> dim_out''' + return GeneralMultiLayer('linear', cfg.gnn.layers_pre_mp, + dim_in, dim_out, dim_inner=dim_out, + final_act=True) + + +class GNNStackStage(nn.Module): + def __init__(self, dim_in, dim_out, num_layers): + super(GNNStackStage, self).__init__() + for i in range(num_layers): + d_in = dim_in if i == 0 else dim_out + layer = GNNLayer(d_in, dim_out, layer_id=i) + self.add_module('layer{}'.format(i), layer) + self.dim_out = dim_out + + def forward(self, batch): + for layer in self.children(): + batch = layer(batch) + if cfg.gnn.l2norm: + batch.node_feature = F.normalize(batch.node_feature, p=2, dim=-1) + return batch + + +stage_dict = { + 'stack': GNNStackStage, +} + +stage_dict = {**register.stage_dict, **stage_dict} + + +class GNNRecurrent(nn.Module): + r'''The General GNN model''' + + def __init__(self, dim_in, dim_out, **kwargs): + r'''Initializes the GNN model. + + Args: + dim_in, dim_out: dimensions of in and out channels. + Parameters: + node_encoding_classes - For integer features, gives the number + of possible integer features to map. + ''' + super(GNNRecurrent, self).__init__() + # Stage: inter-layer connections. + GNNStage = stage_dict[cfg.gnn.stage_type] + # Head: prediction head, the final layer. + GNNHead = head_dict[cfg.dataset.task] + + if cfg.dataset.node_encoder: + # Encode integer node features via nn.Embeddings + NodeEncoder = node_encoder_dict[cfg.dataset.node_encoder_name] + self.node_encoder = NodeEncoder(cfg.dataset.encoder_dim) + if cfg.dataset.node_encoder_bn: + self.node_encoder_bn = BatchNorm1dNode(cfg.dataset.encoder_dim) + # Update dim_in to reflect the new dimension fo the node features + dim_in = cfg.dataset.encoder_dim + + if cfg.dataset.edge_encoder: + # Encode integer edge features via nn.Embeddings + EdgeEncoder = edge_encoder_dict[cfg.dataset.edge_encoder_name] + self.edge_encoder = EdgeEncoder(cfg.dataset.encoder_dim) + if cfg.dataset.edge_encoder_bn: + self.edge_encoder_bn = BatchNorm1dEdge(cfg.dataset.edge_dim) + + self.preprocess = Preprocess(dim_in) + d_in = self.preprocess.dim_out + + if cfg.gnn.layers_pre_mp > 0: + self.pre_mp = GNNPreMP(d_in, cfg.gnn.dim_inner) + d_in = cfg.gnn.dim_inner + if cfg.gnn.layers_mp >= 1: + self.mp = GNNStage(dim_in=d_in, + dim_out=cfg.gnn.dim_inner, + num_layers=cfg.gnn.layers_mp) + d_in = self.mp.dim_out + self.post_mp = GNNHead(dim_in=d_in, dim_out=dim_out) + + self.apply(init_weights) + + def forward(self, batch): + for module in self.children(): + batch = module(batch) + return batch + + +register_network('gnn_recurrent', GNNRecurrent) diff --git a/graphgym/contrib/train/train_live_update.py b/graphgym/contrib/train/train_live_update.py new file mode 100644 index 00000000..aa8a91fd --- /dev/null +++ b/graphgym/contrib/train/train_live_update.py @@ -0,0 +1,271 @@ +""" +The ROLAND training pipeline with live-update. +""" +import copy +import datetime +import logging +from typing import Dict, List, Optional, Tuple + +import deepsnap +import numpy as np +import torch +from graphgym.checkpoint import clean_ckpt +from graphgym.config import cfg +from graphgym.contrib.train import train_utils +from graphgym.loss import compute_loss +from graphgym.optimizer import create_optimizer, create_scheduler +from graphgym.register import register_train +from graphgym.utils.io import makedirs_rm_exist +from graphgym.utils.stats import node_degree +from torch.utils.tensorboard import SummaryWriter +from tqdm import tqdm + + +@torch.no_grad() +def get_task_batch(dataset: deepsnap.dataset.GraphDataset, + today: int, tomorrow: int, + prev_node_states: Optional[Dict[str, List[torch.Tensor]]] + ) -> deepsnap.graph.Graph: + """ + Construct batch required for the task (today, tomorrow). + For current implementation, we use tomorrow = today + 1. + As defined in batch's get_item method (used to get edge_label and + get_label_index), edge_label and edge_label_index returned would be + different everytime get_task_batch() is called. + + Moreover, copy node-memories (node_states and node_cells) to the batch. + + Lastly, this method moves the created task batch to the appropriate device. + """ + assert today < tomorrow < len(dataset) + # Get edges for message passing and prediction task. + batch = dataset[today].clone() + batch.edge_label = dataset[tomorrow].edge_label.clone() + batch.edge_label_index = dataset[tomorrow].edge_label_index.clone() + + # Copy previous memory to the batch. + if prev_node_states is not None: + for key, val in prev_node_states.items(): + copied = [x.detach().clone() for x in val] + setattr(batch, key, copied) + + batch = train_utils.move_batch_to_device(batch, cfg.device) + return batch + + +@torch.no_grad() +def update_node_states(model, dataset, task: Tuple[int, int], + prev_node_states: Optional[ + Dict[str, List[torch.Tensor]]] + ) -> Dict[str, List[torch.Tensor]]: + """Perform the provided task and keep track of the latest node_states. + + Example: task = (t, t+1), + the prev_node_states contains node embeddings at time (t-1). + the model perform task (t, t+1): + Input: (node embedding at t - 1, edges at t). + Output: possible transactions at t+1. + the model also generates node embeddings at t. + + after doing task (t, t+1), node_states contains information + from snapshot t. + """ + today, tomorrow = task + batch = get_task_batch(dataset, today, tomorrow, prev_node_states).clone() + # Let the model modify batch.node_states (and batch.node_cells). + # This operation does not track gradient, so should not affect back-prop. + _, _ = model(batch) # Inplace modification on batch. + # Collect the updated node states. + out = dict() + out['node_states'] = [x.detach().clone() for x in batch.node_states] + # If node cells are also used. + if isinstance(batch.node_cells[0], torch.Tensor): + out['node_cells'] = [x.detach().clone() for x in batch.node_cells] + + return out + + +def train_step(model, optimizer, scheduler, dataset, + task: Tuple[int, int], + prev_node_states: Optional[Dict[str, torch.Tensor]] + ) -> Dict[str, float]: + """ + After receiving ground truth from a particular task, update the model by + performing back-propagation. + For example, on day t, the ground truth of task (t-1, t) has been revealed, + train the model using G[t-1] for message passing and label[t] as target. + """ + optimizer.zero_grad() + torch.cuda.empty_cache() + + today, tomorrow = task + model.train() + batch = get_task_batch(dataset, today, tomorrow, prev_node_states).clone() + + pred, true = model(batch) + loss, pred_score = compute_loss(pred, true) + loss.backward() + optimizer.step() + + scheduler.step() + return {'loss': loss.item()} + + +@torch.no_grad() +def evaluate_step(model, dataset, task: Tuple[int, int], + prev_node_states: Optional[Dict[str, List[torch.Tensor]]], + fast: bool = False) -> Dict[str, float]: + """ + Evaluate model's performance on task = (today, tomorrow) + where today and tomorrow are integers indexing snapshots. + """ + today, tomorrow = task + model.eval() + batch = get_task_batch(dataset, today, tomorrow, prev_node_states).clone() + + pred, true = model(batch) + loss, pred_score = compute_loss(pred, true) + + if fast: + # skip MRR calculation for internal validation. + return {'loss': loss.item()} + + mrr_batch = get_task_batch(dataset, today, tomorrow, + prev_node_states).clone() + + mrr = train_utils.compute_MRR( + mrr_batch, + model, + num_neg_per_node=cfg.metric.mrr_num_negative_edges, + method=cfg.metric.mrr_method) + + return {'loss': loss.item(), 'mrr': mrr} + + +def train_live_update(loggers, loaders, model, optimizer, scheduler, datasets, + **kwargs): + + for dataset in datasets: + # Sometimes edge degree info is already included in dataset. + if not hasattr(dataset[0], 'keep_ratio'): + train_utils.precompute_edge_degree_info(dataset) + + num_splits = len(loggers) # train/val/test splits. + # range for today in (today, tomorrow) task pairs. + task_range = range(len(datasets[0]) - cfg.transaction.horizon) + + t = datetime.datetime.now().strftime('%b%d_%H-%M-%S') + + # directory to store tensorboard files of this run. + out_dir = cfg.out_dir.replace('/', '\\') + # dir to store all run outputs for the entire batch. + run_dir = 'runs_' + cfg.remark + + print(f'Tensorboard directory: {out_dir}') + # If tensorboard directory exists, this config is in the re-run phase + # of run_batch, replace logs of previous runs with the new one. + makedirs_rm_exist(f'./{run_dir}/{out_dir}') + writer = SummaryWriter(f'./{run_dir}/{out_dir}') + + # save a copy of configuration for later identifications. + with open(f'./{run_dir}/{out_dir}/config.yaml', 'w') as f: + cfg.dump(stream=f) + + prev_node_states = None # no previous state on day 0. + # {'node_states': [Tensor, Tensor], 'node_cells: [Tensor, Tensor]} + + model_meta = None # the state_dict() object of the meta-model. + + # TODO: How to incorporate logger? + + for t in tqdm(task_range, desc='Snapshot'): + # current task: t --> t+1. + # (1) Evaluate model's performance on this task, at this time, the + # model has seen no information on t+1, this evaluation is fair. + # TODO: modify here to predict on all edges? + for i in range(1, num_splits): + # Validation and test edges. + perf = evaluate_step(model, datasets[i], (t, t + 1), + prev_node_states) + + writer.add_scalars('val' if i == 1 else 'test', perf, t) + + # (2) Reveal the ground truth of task (t, t+1) and update the model + # to prepare for the next task. + del optimizer, scheduler # use new optimizers. + optimizer = create_optimizer(model.parameters()) + scheduler = create_scheduler(optimizer) + + # best model's validation loss, training epochs, and state_dict. + best_model = {'val_loss': np.inf, 'train_epoch': 0, 'state': None} + # keep track of how long we have NOT update the best model. + best_model_unchanged = 0 + # after not updating the best model for `tol` epochs, stop. + tol = cfg.train.internal_validation_tolerance + + # internal training loop (intra-snapshot cross-validation). + # choose the best model using current validation set, prepare for + # next task. + + if cfg.meta.is_meta and (model_meta is not None): + # For meta-learning, start fine-tuning from the meta-model. + model.load_state_dict(copy.deepcopy(model_meta)) + + # Internal training loop. + for i in tqdm(range(cfg.optim.max_epoch + 1), desc='live update', + leave=False): + # Start with the un-trained model (i = 0), evaluate the model. + internal_val_perf = evaluate_step(model, datasets[1], + (t, t + 1), + prev_node_states, fast=True) + val_loss = internal_val_perf['loss'] + + if val_loss < best_model['val_loss']: + # replace the best model with the current model. + best_model = {'val_loss': val_loss, 'train_epoch': i, + 'state': copy.deepcopy(model.state_dict())} + best_model_unchanged = 0 + else: + # the current best model has dominated for these epochs. + best_model_unchanged += 1 + + # if (i >= 2 * tol) and (best_model_unchanged >= tol): + if best_model_unchanged >= tol: + # If the best model has not been updated for a while, stop. + break + else: + # Otherwise, keep training. + train_perf = train_step(model, optimizer, scheduler, + datasets[0], (t, t + 1), + prev_node_states) + writer.add_scalars('train', train_perf, t) + + writer.add_scalar('internal_best_val', best_model['val_loss'], t) + writer.add_scalar('best epoch', best_model['train_epoch'], t) + + # (3) Actually perform the update on training set to get node_states + # contains information up to time t. + # Use the best model selected from intra-snapshot cross-validation. + model.load_state_dict(best_model['state']) + + if cfg.meta.is_meta: # update meta-learning's initialization weights. + if model_meta is None: # for the first task. + model_meta = copy.deepcopy(best_model['state']) + else: # for subsequent task, update init. + # (1-alpha)*model_meta + alpha*best_model. + model_meta = train_utils.average_state_dict(model_meta, + best_model['state'], + cfg.meta.alpha) + + prev_node_states = update_node_states(model, datasets[0], (t, t + 1), + prev_node_states) + + writer.close() + + if cfg.train.ckpt_clean: + clean_ckpt() + + logging.info('Task done, results saved in {}'.format(cfg.out_dir)) + + +register_train('live_update', train_live_update) diff --git a/graphgym/contrib/train/train_utils.py b/graphgym/contrib/train/train_utils.py new file mode 100644 index 00000000..5784147e --- /dev/null +++ b/graphgym/contrib/train/train_utils.py @@ -0,0 +1,410 @@ +""" +Metrics, other utility, and helper functions. +""" +# TODO: proof-read this file. +# TODO: remove comments. +import copy + +import deepsnap +import numpy as np +import torch +from graphgym.config import cfg +from graphgym.loss import compute_loss +from graphgym.utils.stats import node_degree +from torch_scatter import scatter_max, scatter_mean, scatter_min + + +@torch.no_grad() +def average_state_dict(dict1: dict, dict2: dict, weight: float) -> dict: + """ + Average two model.state_dict() objects, + ut = (1-w)*dict1 + w*dict2 + when dict1, dict2 are model_dicts, this method updates the meta-model. + """ + assert 0 <= weight <= 1 + d1 = copy.deepcopy(dict1) + d2 = copy.deepcopy(dict2) + out = dict() + for key in d1.keys(): + assert isinstance(d1[key], torch.Tensor) + param1 = d1[key].detach().clone() + assert isinstance(d2[key], torch.Tensor) + param2 = d2[key].detach().clone() + out[key] = (1 - weight) * param1 + weight * param2 + return out + + +def get_keep_ratio(existing: torch.Tensor, + new: torch.Tensor, + mode: str = 'linear') -> torch.Tensor: + """ + Get the keep ratio for individual nodes to update node embeddings. + Specifically: + state[v,t] = state[v,t-1]*keep_ratio + new_feature[v,t]*(1-keep_ratio) + + Args: + existing: a tensor of nodes' degrees in G[0], G[1], ..., G[t-1]. + new: a tensor of nodes' degrees in G[t]. + mode: how to compute the keep_ratio. + + Returns: + A tensor with shape (num_nodes,) valued in [0, 1]. + """ + if mode == 'constant': + # This scheme is equivalent to exponential decaying. + ratio = torch.ones_like(existing) + # node observed for the first time, keep_ratio = 0. + ratio[torch.logical_and(existing == 0, new > 0)] = 0 + # take convex combination of old and new embeddings. + # 1/2 can be changed to other values. + ratio[torch.logical_and(existing > 0, new > 0)] = 1 / 2 + # inactive nodes have keep ratio 1, embeddings don't change. + elif mode == 'linear': + # The original method proposed by Jiaxuan. + ratio = existing / (existing + new + 1e-6) + # Following methods aim to shrink the weight of existing + # degrees, help to ensure non-trivial embedding update when the graph + # is large and history is long. + elif mode == 'log': + ratio = torch.log(existing + 1) / ( + torch.log(existing + 1) + new + 1e-6) + elif mode == 'sqrt': + ratio = torch.sqrt(existing) / (torch.sqrt(existing) + new + 1e-6) + else: + raise NotImplementedError(f'Mode {mode} is not supported.') + return ratio + + +@torch.no_grad() +def precompute_edge_degree_info(dataset: deepsnap.dataset.GraphDataset): + """Pre-computes edge_degree_existing, edge_degree_new and keep ratio + at each snapshot. Inplace modifications. + """ + # Assume all graph snapshots have the same number of nodes. + num_nodes = dataset[0].node_feature.shape[0] + for t in range(len(dataset)): + if t == 0: + # No previous edges for any nodes. + dataset[t].node_degree_existing = torch.zeros(num_nodes) + else: + # degree[ int: + """Computes how much memory a batch has consumed.""" + total_byte = 0 + for k, v in batch.__dict__.items(): + if isinstance(v, torch.Tensor): + total_byte += v.element_size() * v.nelement() + elif isinstance(v, list): # for node_states. + for sub_v in v: + if isinstance(sub_v, torch.Tensor): + total_byte += sub_v.element_size() * sub_v.nelement() + + return total_byte / (1024 ** 2) # MiB. + + +def move_batch_to_device(batch: deepsnap.graph.Graph, + device: str) -> deepsnap.graph.Graph: + """Moves and collects everything in the batch to the target device.""" + device = torch.device(device) + # This handles node_feature, edge_feature, etc. + batch = batch.to(device) + + for layer in range(len(batch.node_states)): + if torch.is_tensor(batch.node_states[layer]): + batch.node_states[layer] = batch.node_states[layer].to(device) + + if hasattr(batch, 'node_cells'): + # node_cells exist only for LSTM type RNNs. + for layer in range(len(batch.node_cells)): + if torch.is_tensor(batch.node_cells[layer]): + batch.node_cells[layer] = batch.node_cells[layer].to(device) + + return batch + + +def edge_index_difference(edge_include: torch.LongTensor, + edge_except: torch.LongTensor, + num_nodes: int) -> torch.LongTensor: + """Set difference operator, return edges in edge_all but not + in edge_except. + + Args: + edge_all (torch.LongTensor): (2, E1) tensor of edge indices. + edge_except (torch.LongTensor): (2, E2) tensor of edge indices to be + excluded from edge_all. + num_nodes (int): total number of nodes. + + Returns: + torch.LongTensor: Edge indices in edge_include but not in edge_except. + """ + # flatten (i, j) edge representations. + idx_include = edge_include[0] * num_nodes + edge_include[1] + idx_except = edge_except[0] * num_nodes + edge_except[1] + # filter out edges in idx_except. + mask = torch.from_numpy(np.isin(idx_include, idx_except)).to(torch.bool) + idx_kept = idx_include[~mask] + i = idx_kept // num_nodes + j = idx_kept % num_nodes + return torch.stack([i, j], dim=0).long() + + +def gen_negative_edges(edge_index: torch.LongTensor, + num_neg_per_node: int, + num_nodes: int) -> torch.LongTensor: + """Generates a fixed number of negative edges for each node. + + Args: + edge_index (torch.LongTensor): (2, E) array of positive edges. + num_neg_per_node (int): 'approximate' number of negative edges generated + for each source node in edge_index. + num_nodes (int): total number of nodes. + + Returns: + torch.LongTensor: approximate num_nodes * num_neg_per_node + negative edges. + """ + src_lst = torch.unique(edge_index[0]) # get unique senders. + num_neg_per_node = int(1.2 * num_neg_per_node) # add some redundancy. + i = src_lst.repeat_interleave(num_neg_per_node) + j = torch.Tensor(np.random.choice(num_nodes, len(i), replace=True)) + # candidates for negative edges, X candidates from each src. + candidates = torch.stack([i, j], dim=0).long() + # filter out positive edges in candidate. + neg_edge_index = edge_index_difference(candidates, edge_index.to('cpu'), + num_nodes) + return neg_edge_index + + +@torch.no_grad() +def fast_batch_mrr(edge_label_index: torch.Tensor, + edge_label: torch.Tensor, + pred_score: torch.Tensor, + num_neg_per_node: int, + num_nodes: int, + method: str) -> float: + """ + A vectorized implementation to compute average rank-based metrics over + all source nodes. + + Args: + edge_label_index: + edge_label: + pred_score: P(edge i is positive) from the model. + num_neg_per_node: number of negative edges per node. + num_nodes: total number of nodes in the graph. + + Returns: + the MRR for all nodes. + """ + # A list of source nodes to consider. + src_lst = torch.unique(edge_label_index[0], sorted=True) + num_users = len(src_lst) + + edge_pos = edge_label_index[:, edge_label == 1] + edge_neg = edge_label_index[:, edge_label == 0] + + # By construction, negative edge index should be sorted by their src nodes. + assert torch.all(edge_neg[0].sort()[0] == edge_neg[0]) + + # Prediction scores of all positive and negative edges. + p_pos = pred_score[edge_label == 1] + p_neg = pred_score[edge_label == 0] + + # For each player src, compute the highest score among all positive edges + # from src. + # We want to compute the rank of this edge. + # Construct an interval of model's performance. + if method == 'mean': + best_p_pos = scatter_mean(src=p_pos, index=edge_pos[0], + dim_size=num_nodes) + elif method == 'min': + best_p_pos, _ = scatter_min(src=p_pos, index=edge_pos[0], + dim_size=num_nodes) + elif method == 'max': + # The default setting, consider the rank of the most confident edge. + best_p_pos, _ = scatter_max(src=p_pos, index=edge_pos[0], + dim_size=num_nodes) + else: + raise ValueError(f'Unrecognized method: {method}.') + # best_p_pos has shape (num_nodes), for nodes not in src_lst has value 0. + best_p_pos_by_user = best_p_pos[src_lst] + + uni, counts = torch.unique(edge_neg[0], sorted=True, return_counts=True) + # note: edge_neg (src, dst) are sorted by src. + # find index of first occurrence of each src in edge_neg[0]. + # neg edges[0], [1,1,...1, 2, 2, ... 2, 3, ..] + first_occ_idx = torch.cumsum(counts, dim=0) - counts + add = torch.arange(num_neg_per_node, device=first_occ_idx.device) + + # take the first 100 negative edges from each src. + score_idx = first_occ_idx.view(-1, 1) + add.view(1, -1) + + assert torch.all(edge_neg[0][score_idx].float().std(axis=1) == 0) + + p_neg_by_user = p_neg[score_idx] # (num_users, num_neg_per_node) + compare = (p_neg_by_user >= best_p_pos_by_user.view(num_users, 1)).float() + assert compare.shape == (num_users, num_neg_per_node) + # compare[i, j], for node i, the j-th negative edge's score > p_best. + + # counts 1 + how many negative edge from src has higher score than p_best. + # if there's no such negative edge, rank is 1. + rank_by_user = compare.sum(axis=1) + 1 # (num_users,) + assert rank_by_user.shape == (num_users,) + + mrr = float(torch.mean(1 / rank_by_user)) + return mrr + + +def get_row_MRR(probs, true_classes): + existing_mask = true_classes == 1 + # descending in probability for all edge predictions. + ordered_indices = np.flip(probs.argsort()) + # indicators of positive/negative, in prob desc order. + ordered_existing_mask = existing_mask[ordered_indices] + # [1, 2, ... ][ordered_existing_mask] + # prob rank of positive edges. + existing_ranks = np.arange(1, true_classes.shape[0] + 1, + dtype=np.float)[ordered_existing_mask] + # average 1/rank of positive edges. + MRR = (1 / existing_ranks).sum() / existing_ranks.shape[0] + return MRR + + +@torch.no_grad() +def report_MRR_all(eval_batch: deepsnap.graph.Graph, + model: torch.nn.Module) -> float: + # Get positive edge indices. + edge_index = eval_batch.edge_label_index[:, eval_batch.edge_label == 1] + edge_index = edge_index.to('cpu') + num_nodes = eval_batch.num_nodes + src_of_pos_edges = torch.unique(edge_index[0]).numpy() + + all_edges_idx = np.arange(num_nodes) + all_edges_idx = np.array(np.meshgrid(all_edges_idx, + all_edges_idx)).reshape(2, -1) + all_edges_idx = torch.LongTensor(all_edges_idx) + # Get all O(N^2) negative edges. + neg_edge_index = edge_index_difference( + all_edges_idx, edge_index, num_nodes) + # Only keep negative edges share src node with some positive edges. + mask = np.isin(neg_edge_index[0], src_of_pos_edges) + neg_edge_index = neg_edge_index[:, mask] + + new_edge_label_index = torch.cat((edge_index, neg_edge_index), + dim=1).long() + new_edge_label = torch.cat((torch.ones(edge_index.shape[1]), + torch.zeros(neg_edge_index.shape[1]) + ), dim=0).long() + + # Construct evaluation samples. + eval_batch.edge_label_index = new_edge_label_index + eval_batch.edge_label = new_edge_label + + eval_batch.to(torch.device(cfg.device)) + # move state to gpu + for layer in range(len(eval_batch.node_states)): + if torch.is_tensor(eval_batch.node_states[layer]): + eval_batch.node_states[layer] = eval_batch.node_states[layer].to( + torch.device(cfg.device)) + pred, true = model(eval_batch) + loss, pred_score = compute_loss(pred, true) + + probs = pred_score.cpu().numpy().squeeze() + true = true.cpu().numpy() + + xi = new_edge_label_index[0].cpu().numpy() + xj = new_edge_label_index[1].cpu().numpy() + # pred_matrix = coo_matrix((probs, (xi, xj))).toarray() + # true_matrix = coo_matrix((true, (xi, xj))).toarray() + + row_MRRs = [] + for src in src_of_pos_edges: + mask = np.argwhere(xi == src) + pred_row = probs.take(mask).squeeze() + true_row = true.take(mask).squeeze() + row_MRRs.append(get_row_MRR(pred_row, true_row)) + + avg_MRR = torch.tensor(row_MRRs).mean() + return float(avg_MRR) + + +def compute_MRR(eval_batch: deepsnap.graph.Graph, + model: torch.nn.Module, + num_neg_per_node: int, + method: str) -> float: + """Computes the MRR score on the evaluation batch. + + Args: + eval_batch (deepsnap.graph.Graph): a graph snapshot. + model (torch.nn.Module): a GNN model for this graph snapshot + num_neg_per_node (int): how many negative edges per node required for + computing the MRR score. + For example, if num_neg_per_node = 1000, this method firstly + sample 1,000 negative edges for each source node, and compute the + average rank of positive edges from each source node among these + 1,000 sampled negative edges. + Setting num_neg_per_node = -1 to use all possible negative edges. + method (str): {'min', 'mean', 'max', 'all'} + All methods firstly compute MRR for each source node, and then + average MRRs over all source nodes. + For each source node v, + let P denote scores of all positive edges from v, the rank() + operator computes the rank among all negative edges from v. + 'min' computes 1/rank(min(P)) + 'mean' computes 1/rank(mean(P)) + 'max' computes 1/rank(max(P)) + 'all' computes mean[1/rank(x) for x in P] + """ + if method == 'all': + # NOTE: this method requires iterating over all nodes, which is slow. + assert num_neg_per_node == -1 + return report_MRR_all(eval_batch, model) + else: + assert num_neg_per_node > 0 + # Sample negative edges for each node. + edge_index = eval_batch.edge_label_index[:, eval_batch.edge_label == 1] + edge_index = edge_index.to('cpu') + + neg_edge_index = gen_negative_edges(edge_index, num_neg_per_node, + num_nodes=eval_batch.num_nodes) + + new_edge_label_index = torch.cat((edge_index, neg_edge_index), + dim=1).long() + new_edge_label = torch.cat((torch.ones(edge_index.shape[1]), + torch.zeros(neg_edge_index.shape[1]) + ), dim=0).long() + + # Construct evaluation samples. + eval_batch.edge_label_index = new_edge_label_index + eval_batch.edge_label = new_edge_label + + eval_batch.to(torch.device(cfg.device)) + # move state to gpu + for layer in range(len(eval_batch.node_states)): + if torch.is_tensor(eval_batch.node_states[layer]): + eval_batch.node_states[layer] = eval_batch.node_states[layer].to( + torch.device(cfg.device)) + pred, true = model(eval_batch) + loss, pred_score = compute_loss(pred, true) + + mrr = fast_batch_mrr(eval_batch.edge_label_index, + eval_batch.edge_label, + pred_score, + num_neg_per_node, + eval_batch.num_nodes, + method) + return mrr diff --git a/graphgym/loader.py b/graphgym/loader.py index c4e810ac..402e8062 100644 --- a/graphgym/loader.py +++ b/graphgym/loader.py @@ -166,12 +166,23 @@ def transform_before_split(dataset): return dataset -def transform_after_split(datasets): +def transform_after_split(datasets, dataset): ''' Dataset transformation after train/val/test split :param dataset: A list of DeepSNAP dataset objects :return: A list of transformed DeepSNAP dataset objects ''' + if cfg.dataset.link_pred_all_edges: + for t in range(len(datasets[2])): + g = datasets[2].graphs[t] + neg = g.negative_sampling(dataset[t].edge_index, + dataset[t].num_nodes, + dataset[t].edge_index.shape[1]) + pos = dataset[t].edge_index + g.edge_label_index = torch.cat((neg, pos), dim=1) + g.edge_label = torch.cat((torch.zeros(neg.shape[1]), + torch.ones(pos.shape[1]))) + if cfg.dataset.transform == 'ego': for split_dataset in datasets: split_dataset.apply_transform(ego_nets, @@ -190,18 +201,18 @@ def transform_after_split(datasets): def create_dataset(): - ## Load dataset + # Load dataset time1 = time.time() if cfg.dataset.format == 'OGB': graphs, splits = load_dataset() else: graphs = load_dataset() - ## Filter graphs + # Filter graphs time2 = time.time() min_node = filter_graphs() - ## Create whole dataset + # Create whole dataset dataset = GraphDataset( graphs, task=cfg.dataset.task, @@ -211,10 +222,10 @@ def create_dataset(): resample_disjoint=cfg.dataset.resample_disjoint, minimum_node_per_graph=min_node) - ## Transform the whole dataset + # Transform the whole dataset dataset = transform_before_split(dataset) - ## Split dataset + # Split dataset time3 = time.time() # Use custom data splits if cfg.dataset.format == 'OGB': @@ -232,14 +243,14 @@ def create_dataset(): for i in range(1, len(datasets)): dataset.edge_negative_sampling_ratio = 1 - ## Transform each split dataset + # Transform each split dataset time4 = time.time() - datasets = transform_after_split(datasets) + datasets = transform_after_split(datasets, dataset) time5 = time.time() logging.info('Load: {:.4}s, Before split: {:.4}s, ' 'Split: {:.4}s, After split: {:.4}s'.format( - time2 - time1, time3 - time2, time4 - time3, time5 - time4)) + time2 - time1, time3 - time2, time4 - time3, time5 - time4)) return datasets diff --git a/graphgym/models/layer_recurrent.py b/graphgym/models/layer_recurrent.py new file mode 100644 index 00000000..c64f039c --- /dev/null +++ b/graphgym/models/layer_recurrent.py @@ -0,0 +1,89 @@ +""" +This file contains wrapper layers and constructors for dynamic/recurrent GNNs. +""" +from graphgym.register import register_layer +import torch +import torch.nn as nn +import torch.nn.functional as F +from graphgym.config import cfg +from graphgym.models.act import act_dict +from graphgym.models.layer import layer_dict +from graphgym.models.update import update_dict + + +class GeneralRecurrentLayer(nn.Module): + """ + The recurrent graph layer for snapshot-based dynamic graphs. + This layer requires + (1): a GNN block for message passing. + (2): a node embedding/state update module. + + This layer updates node embedding as the following: + h[l, t] = Update(h[l, t-1], GNN(h[l-1, t])). + + This layer corresponds to a particular l-th layer in multi-layer setting, + the layer id is specified by 'id' in '__init__'. + """ + def __init__(self, name: str, dim_in: int, dim_out: int, has_act: bool=True, + has_bn: bool=True, has_l2norm: bool=False, layer_id: int=0, + **kwargs): + """ + Args: + name (str): The name of GNN layer to use for message-passing. + dim_in (int): Dimension of input node feature. + dim_out (int): Dimension of updated embedding. + has_act (bool, optional): Whether to after message passing. + Defaults to True. + has_bn (bool, optional): Whether add batch normalization for + node embedding. Defaults to True. + has_l2norm (bool, optional): Whether to add L2-normalization for + message passing result. Defaults to False. + layer_id (int, optional): The layer id in multi-layer setting. + Defaults to 0. + """ + super(GeneralRecurrentLayer, self).__init__() + self.has_l2norm = has_l2norm + if layer_id < 0: + raise ValueError(f'layer_id must be non-negative, got {layer_id}.') + self.layer_id = layer_id + has_bn = has_bn and cfg.gnn.batchnorm + self.dim_in = dim_in + self.dim_out = dim_out + # Construct the internal GNN layer. + self.layer = layer_dict[name](dim_in, dim_out, + bias=not has_bn, **kwargs) + layer_wrapper = [] + if has_bn: + layer_wrapper.append(nn.BatchNorm1d( + dim_out, eps=cfg.bn.eps, momentum=cfg.bn.mom)) + if cfg.gnn.dropout > 0: + layer_wrapper.append(nn.Dropout( + p=cfg.gnn.dropout, inplace=cfg.mem.inplace)) + if has_act: + layer_wrapper.append(act_dict[cfg.gnn.act]) + self.post_layer = nn.Sequential(*layer_wrapper) + + self.update = update_dict[cfg.gnn.embed_update_method](self.dim_in, + self.dim_out, + self.layer_id) + + def _init_hidden_state(self, batch): + # Initialize all node-states to zero. + if not isinstance(batch.node_states[self.layer_id], torch.Tensor): + batch.node_states[self.layer_id] = torch.zeros( + batch.node_feature.shape[0], self.dim_out).to( + batch.node_feature.device) + + def forward(self, batch): + # Message passing. + batch = self.layer(batch) + batch.node_feature = self.post_layer(batch.node_feature) + if self.has_l2norm: + batch.node_feature = F.normalize(batch.node_feature, p=2, dim=1) + + self._init_hidden_state(batch) + # Compute output from updater block. + batch = self.update(batch) + # batch.node_states[self.layer_id] = node_states_new + batch.node_feature = batch.node_states[self.layer_id] + return batch diff --git a/graphgym/models/update.py b/graphgym/models/update.py new file mode 100644 index 00000000..0d3a6520 --- /dev/null +++ b/graphgym/models/update.py @@ -0,0 +1,108 @@ +"""Embedding update modules for dynamic graphs.""" +import graphgym.register as register +import torch +import torch.nn as nn +from graphgym.models.layer import MLP + + +class MovingAverageUpdater(nn.Module): + """ + Moving average updater for node embeddings, + let h[l, t] denote all nodes' embedding at the l-th layer at snapshot t. + + h[l,t] = KeepRatio * h[l,t-1] + (1-KeepRatio) * h[l-1,t] + + where the precomputed KeepRatio at current snapshot t is node-specific, + which depends on the node's degree in all snapshots before t and nodes's + degree in snapshot at time t. + """ + + def __init__(self, dim_in: int, dim_out: int, layer_id: int) -> None: + self.layer_id = layer_id + super(MovingAverageUpdater, self).__init__() + + def forward(self, batch): + # TODO: check if boardcasting is correct. + H_prev = batch.node_states[self.layer_id] + X = batch.node_feature + H_new = H_prev * batch.keep_ratio + X * (1.0 - batch.keep_ratio) + batch.node_states[self.layer_id] = H_new + return batch + + +class MLPUpdater(nn.Module): + """ + Node embedding update block using simple MLP. + + h[l,t] = MLP(concat(h[l,t-1],h[l-1,t])) + """ + + def __init__(self, dim_in: int, dim_out: int, layer_id: int, + num_layers: int): + """ + Args: + dim_in (int): dimension of h[l-1, t]. + dim_out (int): dimension of h[l, t-1], node embedding dimension of + the current layer level. + layer_id (int): the index of current layer in multi-layer setting. + num_layers (int): number of layers in MLP. + """ + super(MLPUpdater, self).__init__() + self.layer_id = layer_id + # FIXME: + # assert num_layers > 1, 'There is a problem with layer=1 now, pending fix.' + self.mlp = MLP(dim_in=dim_in + dim_out, dim_out=dim_out, + num_layers=num_layers) + + def forward(self, batch): + H_prev = batch.node_states[self.layer_id] + X = batch.node_feature + concat = torch.cat((H_prev, X), axis=1) + H_new = self.mlp(concat) + batch.node_states[self.layer_id] = H_new + return batch + + +class GRUUpdater(nn.Module): + """ + Node embedding update block using standard GRU. + + h[l,t] = GRU(h[l,t-1], h[l-1,t]) + """ + def __init__(self, dim_in: int, dim_out: int, layer_id: int): + # dim_in (dim of X): dimension of input node_feature. + # dim_out (dim of H): dimension of previous and current hidden states. + # forward(X, H) --> H. + super(GRUUpdater, self).__init__() + self.layer_id = layer_id + self.GRU_Z = nn.Sequential( + nn.Linear(dim_in + dim_out, dim_out, bias=True), + nn.Sigmoid()) + # reset gate. + self.GRU_R = nn.Sequential( + nn.Linear(dim_in + dim_out, dim_out, bias=True), + nn.Sigmoid()) + # new embedding gate. + self.GRU_H_Tilde = nn.Sequential( + nn.Linear(dim_in + dim_out, dim_out, bias=True), + nn.Tanh()) + + def forward(self, batch): + H_prev = batch.node_states[self.layer_id] + X = batch.node_feature + Z = self.GRU_Z(torch.cat([X, H_prev], dim=1)) + R = self.GRU_R(torch.cat([X, H_prev], dim=1)) + H_tilde = self.GRU_H_Tilde(torch.cat([X, R * H_prev], dim=1)) + H_gru = Z * H_prev + (1 - Z) * H_tilde + batch.node_states[self.layer_id] = H_gru + return batch + + +update_dict = { + 'moving_average': MovingAverageUpdater, + 'mlp': MLPUpdater, + 'gru': GRUUpdater +} + +# merge additional update modules in register.update_dict. +update_dict = {**register.update_dict, **update_dict} diff --git a/graphgym/register.py b/graphgym/register.py index 32d64b6d..cfa95ade 100644 --- a/graphgym/register.py +++ b/graphgym/register.py @@ -33,6 +33,10 @@ def register_head(key, module): def register_layer(key, module): register(key, module, layer_dict) +update_dict = {} +def register_update(key, module): + register(key, module, update_dict) + pooling_dict = {} def register_pooling(key, module): register(key, module, pooling_dict) diff --git a/graphgym/utils/stats.py b/graphgym/utils/stats.py new file mode 100644 index 00000000..4b97b3b6 --- /dev/null +++ b/graphgym/utils/stats.py @@ -0,0 +1,14 @@ +import torch + + +def node_degree(edge_index, n=None, mode='in'): + if mode == 'in': + index = edge_index[0, :] + elif mode == 'out': + index = edge_index[1, :] + else: + index = edge_index.flatten() + n = edge_index.max() + 1 if n is None else n + degree = torch.zeros(n) + ones = torch.ones(index.shape[0]) + return degree.scatter_add_(0, index, ones) diff --git a/requirements.txt b/requirements.txt index f2111231..0c1a0d8c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,11 +4,15 @@ torch torch-scatter torch-geometric deepsnap +dask_ml +dask[complete] ogb numpy -pandas +pandas>=1.0 scipy scikit-learn matplotlib seaborn -notebook \ No newline at end of file +notebook +tensorboard +tqdm \ No newline at end of file diff --git a/run/configs/ROLAND/roland_gru_as733.yaml b/run/configs/ROLAND/roland_gru_as733.yaml new file mode 100644 index 00000000..16901c0a --- /dev/null +++ b/run/configs/ROLAND/roland_gru_as733.yaml @@ -0,0 +1,70 @@ +remark: live_update +out_dir: results +device: auto +metric: + mrr_method: max + mrr_num_negative_edges: 1000 +dataset: + format: roland + name: AS-733 + is_hetero: False + dir: /home/tianyudu/Data/all_datasets + task: link_pred + task_type: classification + transductive: True + split: [0.8, 0.1, 0.1] + augment_feature: [] + augment_feature_dims: [0] + edge_encoder: True + edge_encoder_name: roland_general + edge_dim: 1 + node_encoder: False + link_pred_all_edges: False +transaction: + keep_ratio: linear + snapshot: True + snapshot_freq: D + check_snapshot: False + history: rolling + horizon: 1 + pred_mode: at + loss: supervised + feature_int_dim: 16 + feature_edge_int_num: [] + feature_node_int_num: [] + feature_amount_dim: 16 + feature_time_dim: 16 +train: + batch_size: 32 + eval_period: 20 + ckpt_period: 400 + mode: live_update + internal_validation_tolerance: 5 +meta: + is_meta: True + alpha: 0.5 +model: + type: gnn_recurrent + loss_fun: cross_entropy + edge_decoding: concat +gnn: + embed_update_method: gru + layers_pre_mp: 2 + layers_mp: 2 + layers_post_mp: 2 + dim_inner: 128 + mlp_update_layers: 2 + layer_type: residual_edge_conv + skip_connection: affine + stage_type: stack + batchnorm: True + act: prelu + dropout: 0.0 + agg: add + att_heads: 1 + normalize_adj: False + msg_direction: both +optim: + optimizer: adam + base_lr: 0.03 + max_epoch: 100 \ No newline at end of file diff --git a/run/configs/ROLAND/roland_gru_btcalpha.yaml b/run/configs/ROLAND/roland_gru_btcalpha.yaml new file mode 100644 index 00000000..5b5ed5e7 --- /dev/null +++ b/run/configs/ROLAND/roland_gru_btcalpha.yaml @@ -0,0 +1,70 @@ +remark: live_update +out_dir: results +device: auto +metric: + mrr_method: max + mrr_num_negative_edges: 1000 +dataset: + format: roland + name: bitcoinalpha.csv + is_hetero: False + dir: /home/tianyudu/Data/all_datasets + task: link_pred + task_type: classification + transductive: True + split: [0.8, 0.1, 0.1] + augment_feature: [] + augment_feature_dims: [0] + edge_encoder: True + edge_encoder_name: roland_general + edge_dim: 2 + node_encoder: False + link_pred_all_edges: False +transaction: + keep_ratio: linear + snapshot: True + snapshot_freq: W + check_snapshot: False + history: rolling + horizon: 1 + pred_mode: at + loss: supervised + feature_int_dim: 16 + feature_edge_int_num: [] + feature_node_int_num: [] + feature_amount_dim: 16 + feature_time_dim: 16 +train: + batch_size: 32 + eval_period: 20 + ckpt_period: 400 + mode: live_update + internal_validation_tolerance: 5 +meta: + is_meta: True + alpha: 0.8 +model: + type: gnn_recurrent + loss_fun: cross_entropy + edge_decoding: concat +gnn: + embed_update_method: gru + layers_pre_mp: 2 + layers_mp: 2 + layers_post_mp: 2 + dim_inner: 64 + mlp_update_layers: 2 + layer_type: residual_edge_conv + skip_connection: affine + stage_type: stack + batchnorm: False + act: prelu + dropout: 0.0 + agg: add + att_heads: 1 + normalize_adj: False + msg_direction: both +optim: + optimizer: adam + base_lr: 0.003 + max_epoch: 100 \ No newline at end of file diff --git a/run/configs/ROLAND/roland_gru_btcotc.yaml b/run/configs/ROLAND/roland_gru_btcotc.yaml new file mode 100644 index 00000000..fafc3cb1 --- /dev/null +++ b/run/configs/ROLAND/roland_gru_btcotc.yaml @@ -0,0 +1,70 @@ +remark: live_update +out_dir: results +device: auto +metric: + mrr_method: max + mrr_num_negative_edges: 1000 +dataset: + format: roland + name: bitcoinotc.csv + is_hetero: False + dir: /home/tianyudu/Data/all_datasets + task: link_pred + task_type: classification + transductive: True + split: [0.8, 0.1, 0.1] + augment_feature: [] + augment_feature_dims: [0] + edge_encoder: True + edge_encoder_name: roland_general + edge_dim: 2 + node_encoder: False + link_pred_all_edges: False +transaction: + keep_ratio: linear + snapshot: True + snapshot_freq: W + check_snapshot: False + history: rolling + horizon: 1 + pred_mode: at + loss: supervised + feature_int_dim: 16 + feature_edge_int_num: [] + feature_node_int_num: [] + feature_amount_dim: 16 + feature_time_dim: 16 +train: + batch_size: 32 + eval_period: 20 + ckpt_period: 400 + mode: live_update + internal_validation_tolerance: 5 +meta: + is_meta: True + alpha: 0.9 +model: + type: gnn_recurrent + loss_fun: cross_entropy + edge_decoding: concat +gnn: + embed_update_method: gru + layers_pre_mp: 2 + layers_mp: 4 + layers_post_mp: 2 + dim_inner: 64 + mlp_update_layers: 2 + layer_type: residual_edge_conv + skip_connection: affine + stage_type: stack + batchnorm: False + act: prelu + dropout: 0.0 + agg: add + att_heads: 1 + normalize_adj: False + msg_direction: both +optim: + optimizer: adam + base_lr: 0.003 + max_epoch: 100 \ No newline at end of file diff --git a/run/configs/ROLAND/roland_gru_redditbody.yaml b/run/configs/ROLAND/roland_gru_redditbody.yaml new file mode 100644 index 00000000..ee2da59a --- /dev/null +++ b/run/configs/ROLAND/roland_gru_redditbody.yaml @@ -0,0 +1,70 @@ +remark: live_update +out_dir: results +device: auto +metric: + mrr_method: max + mrr_num_negative_edges: 1000 +dataset: + format: roland + name: reddit-body.tsv + is_hetero: False + dir: /home/tianyudu/Data/all_datasets + task: link_pred + task_type: classification + transductive: True + split: [0.8, 0.1, 0.1] + augment_feature: [] + augment_feature_dims: [0] + edge_encoder: True + edge_encoder_name: roland_general + edge_dim: 88 + node_encoder: False + link_pred_all_edges: False +transaction: + keep_ratio: linear + snapshot: True + snapshot_freq: W + check_snapshot: False + history: rolling + horizon: 1 + pred_mode: at + loss: supervised + feature_int_dim: 16 + feature_edge_int_num: [] + feature_node_int_num: [] + feature_amount_dim: 16 + feature_time_dim: 16 +train: + batch_size: 32 + eval_period: 20 + ckpt_period: 400 + mode: live_update + internal_validation_tolerance: 5 +meta: + is_meta: True + alpha: 0.5 +model: + type: gnn_recurrent + loss_fun: cross_entropy + edge_decoding: concat +gnn: + embed_update_method: gru + layers_pre_mp: 2 + layers_mp: 2 + layers_post_mp: 2 + dim_inner: 64 + mlp_update_layers: 2 + layer_type: residual_edge_conv + skip_connection: affine + stage_type: stack + batchnorm: True + act: prelu + dropout: 0.0 + agg: add + att_heads: 1 + normalize_adj: False + msg_direction: both +optim: + optimizer: adam + base_lr: 0.003 + max_epoch: 100 \ No newline at end of file diff --git a/run/configs/ROLAND/roland_gru_reddittitle.yaml b/run/configs/ROLAND/roland_gru_reddittitle.yaml new file mode 100644 index 00000000..e48519fd --- /dev/null +++ b/run/configs/ROLAND/roland_gru_reddittitle.yaml @@ -0,0 +1,70 @@ +remark: live_update +out_dir: results +device: auto +metric: + mrr_method: max + mrr_num_negative_edges: 1000 +dataset: + format: roland + name: reddit-title.tsv + is_hetero: False + dir: /home/tianyudu/Data/all_datasets + task: link_pred + task_type: classification + transductive: True + split: [0.8, 0.1, 0.1] + augment_feature: [] + augment_feature_dims: [0] + edge_encoder: True + edge_encoder_name: roland_general + edge_dim: 88 + node_encoder: False + link_pred_all_edges: False +transaction: + keep_ratio: linear + snapshot: True + snapshot_freq: W + check_snapshot: False + history: rolling + horizon: 1 + pred_mode: at + loss: supervised + feature_int_dim: 16 + feature_edge_int_num: [] + feature_node_int_num: [] + feature_amount_dim: 16 + feature_time_dim: 16 +train: + batch_size: 32 + eval_period: 20 + ckpt_period: 400 + mode: live_update + internal_validation_tolerance: 5 +meta: + is_meta: True + alpha: 0.1 +model: + type: gnn_recurrent + loss_fun: cross_entropy + edge_decoding: concat +gnn: + embed_update_method: gru + layers_pre_mp: 2 + layers_mp: 6 + layers_post_mp: 2 + dim_inner: 128 + mlp_update_layers: 2 + layer_type: residual_edge_conv + skip_connection: affine + stage_type: stack + batchnorm: True + act: prelu + dropout: 0.0 + agg: add + att_heads: 1 + normalize_adj: False + msg_direction: both +optim: + optimizer: adam + base_lr: 0.003 + max_epoch: 100 \ No newline at end of file diff --git a/run/configs/ROLAND/roland_gru_ucimsg.yaml b/run/configs/ROLAND/roland_gru_ucimsg.yaml new file mode 100644 index 00000000..441cb0c9 --- /dev/null +++ b/run/configs/ROLAND/roland_gru_ucimsg.yaml @@ -0,0 +1,70 @@ +remark: live_update +out_dir: results +device: auto +metric: + mrr_method: max + mrr_num_negative_edges: 1000 +dataset: + format: roland + name: CollegeMsg.txt + is_hetero: False + dir: /home/tianyudu/Data/all_datasets + task: link_pred + task_type: classification + transductive: True + split: [0.8, 0.1, 0.1] + augment_feature: [] + augment_feature_dims: [0] + edge_encoder: True + edge_encoder_name: roland_general + edge_dim: 1 + node_encoder: False + link_pred_all_edges: False +transaction: + keep_ratio: linear + snapshot: True + snapshot_freq: W + check_snapshot: False + history: rolling + horizon: 1 + pred_mode: at + loss: supervised + feature_int_dim: 16 + feature_edge_int_num: [] + feature_node_int_num: [] + feature_amount_dim: 16 + feature_time_dim: 16 +train: + batch_size: 32 + eval_period: 20 + ckpt_period: 400 + mode: live_update + internal_validation_tolerance: 5 +meta: + is_meta: True + alpha: 0.5 +model: + type: gnn_recurrent + loss_fun: cross_entropy + edge_decoding: concat +gnn: + embed_update_method: gru + layers_pre_mp: 2 + layers_mp: 8 + layers_post_mp: 2 + dim_inner: 64 + mlp_update_layers: 2 + layer_type: residual_edge_conv + skip_connection: affine + stage_type: stack + batchnorm: True + act: prelu + dropout: 0.0 + agg: add + att_heads: 1 + normalize_adj: False + msg_direction: both +optim: + optimizer: adam + base_lr: 0.01 + max_epoch: 100 \ No newline at end of file diff --git a/run/configs/ROLAND/roland_hetero.yaml b/run/configs/ROLAND/roland_hetero.yaml new file mode 100644 index 00000000..23bf9feb --- /dev/null +++ b/run/configs/ROLAND/roland_hetero.yaml @@ -0,0 +1,70 @@ +remark: live_update +out_dir: results +device: auto +metric: + mrr_method: max + mrr_num_negative_edges: 1000 +dataset: + format: roland + name: bsi_svt_2008.tsv + is_hetero: True + dir: /home/tianyudu/Data/all_datasets + task: link_pred # edge, node. + task_type: classification + transductive: True + split: [0.8, 0.1, 0.1] + augment_feature: [] + augment_feature_dims: [0] + edge_encoder: True + edge_encoder_name: roland_general + edge_dim: 1 + node_encoder: False + link_pred_all_edges: False +transaction: + keep_ratio: linear + snapshot: True + snapshot_freq: W + check_snapshot: False + history: rolling + horizon: 1 + pred_mode: at + loss: supervised + feature_int_dim: 16 + feature_edge_int_num: [] + feature_node_int_num: [] + feature_amount_dim: 16 # * + feature_time_dim: 16 # * +train: + batch_size: 32 + eval_period: 20 + ckpt_period: 400 + mode: live_update + internal_validation_tolerance: 5 # * +meta: + is_meta: True + alpha: 0.5 # * +model: + type: gnn_recurrent + loss_fun: cross_entropy + edge_decoding: concat +gnn: + embed_update_method: gru # * + layers_pre_mp: 2 # * + layers_mp: 8 # * + layers_post_mp: 2 # * + dim_inner: 64 # * + mlp_update_layers: 2 # * + layer_type: residual_edge_conv # * + skip_connection: affine # * + stage_type: stack + batchnorm: True # * + act: prelu + dropout: 0.0 + agg: add # * + att_heads: 1 + normalize_adj: False + msg_direction: both # * +optim: + optimizer: adam + base_lr: 0.01 # * + max_epoch: 100 \ No newline at end of file diff --git a/run/configs/ROLAND/roland_mlp_bsisvt.yaml b/run/configs/ROLAND/roland_mlp_bsisvt.yaml new file mode 100644 index 00000000..35949287 --- /dev/null +++ b/run/configs/ROLAND/roland_mlp_bsisvt.yaml @@ -0,0 +1,71 @@ +remark: live_update +out_dir: results +device: auto +metric: + mrr_method: max + mrr_num_negative_edges: 1000 +dataset: + format: roland + name: bsi_svt_2008.tsv + is_hetero: False + dir: /home/tianyudu/Data/all_datasets + task: link_pred + task_type: classification + transductive: True + split: [0.8, 0.1, 0.1] + augment_feature: [] + augment_feature_dims: [0] + edge_encoder: True + edge_encoder_name: roland + edge_dim: 2 + node_encoder: True + node_encoder_name: roland + link_pred_all_edges: False +transaction: + keep_ratio: linear + snapshot: True + snapshot_freq: W + check_snapshot: False + history: rolling + horizon: 1 + pred_mode: at + loss: supervised + feature_int_dim: 16 + feature_edge_int_num: [] + feature_node_int_num: [1018, 33, 13, 23, 5] + feature_amount_dim: 16 + feature_time_dim: 16 +train: + batch_size: 32 + eval_period: 20 + ckpt_period: 400 + mode: live_update + internal_validation_tolerance: 5 +meta: + is_meta: True + alpha: 0.4 +model: + type: gnn_recurrent + loss_fun: cross_entropy + edge_decoding: concat +gnn: + embed_update_method: gru + layers_pre_mp: 2 + layers_mp: 4 + layers_post_mp: 2 + dim_inner: 128 + mlp_update_layers: 2 + layer_type: residual_edge_conv + skip_connection: affine + stage_type: stack + batchnorm: True + act: prelu + dropout: 0.0 + agg: add + att_heads: 1 + normalize_adj: False + msg_direction: both +optim: + optimizer: adam + base_lr: 0.003 + max_epoch: 100 \ No newline at end of file diff --git a/run/grids/ROLAND/example_grid.txt b/run/grids/ROLAND/example_grid.txt new file mode 100644 index 00000000..d2c3291f --- /dev/null +++ b/run/grids/ROLAND/example_grid.txt @@ -0,0 +1,7 @@ +meta.is_meta is_meta [True] +meta.alpha alpha [0.2,0.4,0.6,0.8,1.0] +gnn.skip_connection skip ['affine','identity','none'] +gnn.embed_update_method update ['gru'] +gnn.layers_mp mp [2,3] +gnn.batchnorm bn [True] +optim.base_lr lr [0.003,0.01,0.03] \ No newline at end of file diff --git a/run/main_dynamic.py b/run/main_dynamic.py new file mode 100644 index 00000000..cb52e1a3 --- /dev/null +++ b/run/main_dynamic.py @@ -0,0 +1,71 @@ +import logging +import os +import random +import warnings +from datetime import datetime +from itertools import product + +import numpy as np +import torch +from graphgym.cmd_args import parse_args +from graphgym.config import (assert_cfg, cfg, dump_cfg, get_parent_dir, + update_out_dir) +from graphgym.contrib.train import * +from graphgym.loader import create_dataset, create_loader +from graphgym.logger import create_logger, setup_printing +from graphgym.model_builder import create_model +from graphgym.optimizer import create_optimizer, create_scheduler +from graphgym.register import train_dict +from graphgym.train import train +from graphgym.utils.agg_runs import agg_runs +from graphgym.utils.comp_budget import params_count +from graphgym.utils.device import auto_select_device + +os.environ['MPLCONFIGDIR'] = "/tmp" + + +if __name__ == '__main__': + # Load cmd line args + args = parse_args() + # Repeat for different random seeds + for i in range(args.repeat): + # Load config file + cfg.merge_from_file(args.cfg_file) + cfg.merge_from_list(args.opts) + assert_cfg(cfg) + # Set Pytorch environment + torch.set_num_threads(cfg.num_threads) + out_dir_parent = cfg.out_dir + cfg.seed = i + 1 + random.seed(cfg.seed) + np.random.seed(cfg.seed) + torch.manual_seed(cfg.seed) + update_out_dir(out_dir_parent, args.cfg_file) + dump_cfg(cfg) + setup_printing() + auto_select_device() + + # Set learning environment + datasets = create_dataset() + + cfg.dataset.num_nodes = datasets[0][0].num_nodes + loaders = create_loader(datasets) + meters = create_logger(datasets, loaders) + + model = create_model(datasets) + # breakpoint() + optimizer = create_optimizer(model.parameters()) + scheduler = create_scheduler(optimizer) + # Print model info + logging.info(model) + logging.info(cfg) + cfg.params = params_count(model) + logging.info('Num parameters: {}'.format(cfg.params)) + # Start training + if cfg.train.mode == 'live_update': + train_dict[cfg.train.mode]( + meters, loaders, model, optimizer, scheduler, datasets=datasets) + + # When being launched in batch mode, mark a yaml as done + if args.mark_done: + os.rename(args.cfg_file, '{}_done'.format(args.cfg_file)) diff --git a/run/run_roland_batch.sh b/run/run_roland_batch.sh new file mode 100644 index 00000000..398fa830 --- /dev/null +++ b/run/run_roland_batch.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +CONFIG=roland_gru_ucimsg +GRID=example_grid +REPEAT=3 +MAX_JOBS=10 +SLEEP=1 + +python configs_gen.py --config configs/ROLAND/${CONFIG}.yaml \ + --grid grids/ROLAND/${GRID}.txt \ + --out_dir configs +# run batch of configs +# Args: config_dir, num of repeats, max jobs running, sleep time +bash parallel.sh configs/${CONFIG}_grid_${GRID} $REPEAT $MAX_JOBS $SLEEP +# rerun missed / stopped experiments +bash parallel.sh configs/${CONFIG}_grid_${GRID} $REPEAT $MAX_JOBS $SLEEP +# rerun missed / stopped experiments +bash parallel.sh configs/${CONFIG}_grid_${GRID} $REPEAT $MAX_JOBS $SLEEP + +# aggregate results for the batch +python agg_batch.py --dir results/${CONFIG}_grid_${GRID} diff --git a/run/run_roland_single.sh b/run/run_roland_single.sh new file mode 100644 index 00000000..b10c5533 --- /dev/null +++ b/run/run_roland_single.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +# python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_btcalpha.yaml --repeat 1 + +# python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_btcotc.yaml --repeat 1 + +# python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_ucimsg.yaml --repeat 1 + +# python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_reddittitle.yaml --repeat 1 + +# python3 main_dynamic.py --cfg configs/ROLAND/roland_gru_redditbody.yaml --repeat 1 + +python3 main_dynamic.py --cfg configs/ROLAND/roland_mlp_bsisvt.yaml --repeat 1 \ No newline at end of file diff --git a/run/tabulate_events.py b/run/tabulate_events.py new file mode 100644 index 00000000..42e640cb --- /dev/null +++ b/run/tabulate_events.py @@ -0,0 +1,116 @@ +""" +A simple utility that generates performance report for different model on +different datasets. + +This script works for live-update scheme only, use graphgym's native analyze +tools for rolling/fixed-split scheme. +""" +import os +import sys +from typing import List + +import numpy as np +import pandas as pd +import yaml +from tensorboard.backend.event_processing.event_accumulator import \ + EventAccumulator +from tqdm import tqdm + + +def squeeze_dict(old_dict: dict) -> dict: + """Squeezes nested dictionary keys. + Example: old_dict['key1'] = {'key2': 'hello'}. + will generate new_dict['key1.key2'] = 'hello'. + """ + new_dict = dict() + for k1 in old_dict.keys(): + if isinstance(old_dict[k1], dict): + for k2 in old_dict[k1].keys(): + new_key = k1 + '.' + k2 + new_dict[new_key] = old_dict[k1][k2] + else: + new_dict[k1] = old_dict[k1] + return new_dict + + +def tabulate_events(logdir: str, variables: List[str]) -> pd.DataFrame: + """ + Generates a pandas dataframe which contains experiment (runs) as its rows, + the returned dataframe contains columns: + (1) File name/path of that run. + (2) Fields required in `variables' from corresponding config.yaml. + (3) Test and validation set performance (MRR and Recall at k). + """ + all_runs = list() + count = 0 # count number of experiment runs processed. + + for run_dir in tqdm(os.listdir(logdir)): + if run_dir.startswith('.'): + # Ignore hidden files. + continue + + if not os.path.isdir(os.path.join(logdir, run_dir)): + # Ignore other things such as generated tables. + print(run_dir) + continue + + count += 1 + + config_dir = os.path.join(logdir, run_dir, 'config.yaml') + with open(config_dir) as file: + config = yaml.full_load(file) + config = squeeze_dict(config) + + current_run = {'run': run_dir} + for var in variables: + # record required variables in config.yaml. + current_run[var] = config[var] + + # for metric in ['test_mrr', 'test_rck1', 'test_rck3', 'test_rck10', + # 'test_loss', + # 'val_mrr', 'val_rck1', 'val_rck3', 'val_rck10', + # 'val_loss']: + for metric in ['test_mrr']: + event_path = os.path.join(logdir, run_dir, metric) + # print(f'Processing event file {event_path}') + + ea = EventAccumulator(event_path).Reload() + + tag_values = [] + steps = [] + + x = 'test' if metric.startswith('test') else 'val' + for event in ea.Scalars(x): + # Each (value, step) corresponds to a (value, snapshot). + tag_values.append(event.value) + steps.append(event.step) + + current_run['average_' + metric] = np.mean(tag_values) + # current_run: one row in the aggregated dataset. + all_runs.append(current_run) + print(f'exported {count} experiments.') + return pd.DataFrame(all_runs) + + +if __name__ == '__main__': + # 1. directory of baseline experiment set. + # 2. directory of fine-tuning experiment, our model + all datasets. + # 3. directory of output tables and files. + path, out_dir = sys.argv[1], sys.argv[2] + # fields from config.yaml to be included as columns, + # doesn't hurt to add more columns. + variables = ['dataset.format', 'dataset.name', + 'dataset.AS_node_feature', + 'gnn.layer_type', 'gnn.batchnorm', 'gnn.layers_mp', + 'gnn.layers_post_mp', + 'gnn.layers_pre_mp', + 'gnn.skip_connection', 'gnn.embed_update_method', + 'optim.base_lr', + 'transaction.feature_int_dim', + 'gnn.agg', 'train.mode', + 'gnn.msg_direction', + 'train.internal_validation_tolerance', 'gnn.dim_inner', + 'meta.is_meta', 'meta.method', 'meta.alpha', + 'transaction.snapshot_freq', 'gnn.embed_update_method'] + df = tabulate_events(path, variables) + df.to_csv(out_dir)