From 0b1ac041eaf1cab73aee35de67e9f952f9b035cb Mon Sep 17 00:00:00 2001 From: Jake-Moss Date: Wed, 13 Mar 2024 20:01:34 +1000 Subject: [PATCH] Move NetworkGraphIndices dataclass, add node mapping, extend API untested --- aequilibrae/paths/graph.py | 41 +++++- .../paths/results/assignment_results.py | 21 +-- aequilibrae/paths/route_choice.py | 126 +++++++++++++++++- aequilibrae/paths/route_choice_set.pyx | 2 +- 4 files changed, 163 insertions(+), 27 deletions(-) diff --git a/aequilibrae/paths/graph.py b/aequilibrae/paths/graph.py index f28834151..a8ee6a2ce 100644 --- a/aequilibrae/paths/graph.py +++ b/aequilibrae/paths/graph.py @@ -4,6 +4,7 @@ from datetime import datetime from os.path import join from typing import List, Tuple, Optional +import dataclasses import numpy as np import pandas as pd @@ -12,6 +13,26 @@ from aequilibrae.context import get_logger +@dataclasses.dataclass +class NetworkGraphIndices: + network_ab_idx: np.array + network_ba_idx: np.array + graph_ab_idx: np.array + graph_ba_idx: np.array + + +def _get_graph_to_network_mapping(lids, direcs): + num_uncompressed_links = int(np.unique(lids).shape[0]) + indexing = np.zeros(int(lids.max()) + 1, np.uint64) + indexing[np.unique(lids)[:]] = np.arange(num_uncompressed_links) + + graph_ab_idx = direcs > 0 + graph_ba_idx = direcs < 0 + network_ab_idx = indexing[lids[graph_ab_idx]] + network_ba_idx = indexing[lids[graph_ba_idx]] + return NetworkGraphIndices(network_ab_idx, network_ba_idx, graph_ab_idx, graph_ba_idx) + + class GraphBase(ABC): # noqa: B024 """ Graph class. @@ -173,6 +194,7 @@ def prepare_graph(self, centroids: Optional[np.ndarray]) -> None: # The cache property should be recalculated when the graph has been re-prepared self.compressed_link_network_mapping_idx = None self.compressed_link_network_mapping_data = None + self.network_compressed_node_mapping = None def __build_compressed_graph(self): build_compressed_graph(self) @@ -535,8 +557,13 @@ def create_compressed_link_network_mapping(self): if ( self.compressed_link_network_mapping_idx is not None and self.compressed_link_network_mapping_data is not None + and self.network_compressed_node_mapping is not None ): - return self.compressed_link_network_mapping_idx, self.compressed_link_network_mapping_data + return ( + self.compressed_link_network_mapping_idx, + self.compressed_link_network_mapping_data, + self.network_compressed_node_mapping, + ) # This method requires that graph.graph is sorted on the a_node IDs, since that's done already we don't # bother redoing sorting it. This method would be faster using a Cython module but it's a one time compute @@ -548,6 +575,8 @@ def create_compressed_link_network_mapping(self): idx = np.zeros(self.compact_num_links + 1, dtype=np.uint32) data = np.zeros(len(filtered), dtype=np.uint32) + node_mapping = np.full(self.num_nodes, -1) + i = 0 for compressed_id, df in gb: idx[compressed_id] = i @@ -559,7 +588,8 @@ def create_compressed_link_network_mapping(self): # we do this assuming the `a` array is sorted. j = 0 # Find the missing a_node, this is the starting of the chain. We cannot rely on the node ordering to do a simple lookup - x = a[np.isin(a, b, invert=True, assume_unique=True)][0] + + a_node = x = a[np.isin(a, b, invert=True, assume_unique=True)][0] while True: tmp = a.searchsorted(x) if tmp < len(a) and a[tmp] == x: @@ -569,14 +599,19 @@ def create_compressed_link_network_mapping(self): break j += 1 + b_node = x + node_mapping[a_node] = self.compact_graph["a_node"].iat[compressed_id] + node_mapping[b_node] = self.compact_graph["b_node"].iat[compressed_id] + i += len(values) idx[-1] = i self.compressed_link_network_mapping_idx = idx self.compressed_link_network_mapping_data = data + self.network_compressed_node_mapping = node_mapping - return idx, data + return idx, data, node_mapping class Graph(GraphBase): diff --git a/aequilibrae/paths/results/assignment_results.py b/aequilibrae/paths/results/assignment_results.py index 220b7033a..1c6da335a 100644 --- a/aequilibrae/paths/results/assignment_results.py +++ b/aequilibrae/paths/results/assignment_results.py @@ -1,10 +1,9 @@ -import dataclasses import multiprocessing as mp from abc import ABC, abstractmethod import numpy as np from aequilibrae.matrix import AequilibraeMatrix, AequilibraeData -from aequilibrae.paths.graph import Graph, TransitGraph, GraphBase +from aequilibrae.paths.graph import Graph, TransitGraph, GraphBase, _get_graph_to_network_mapping from aequilibrae.parameters import Parameters from aequilibrae import global_logger from pathlib import Path @@ -22,14 +21,6 @@ """ -@dataclasses.dataclass -class NetworkGraphIndices: - network_ab_idx: np.array - network_ba_idx: np.array - graph_ab_idx: np.array - graph_ba_idx: np.array - - class AssignmentResultsBase(ABC): """Assignment results base class for traffic and transit assignments.""" @@ -249,15 +240,7 @@ def total_flows(self) -> None: sum_axis1(self.total_link_loads, self.link_loads, self.cores) def get_graph_to_network_mapping(self): - num_uncompressed_links = int(np.unique(self.lids).shape[0]) - indexing = np.zeros(int(self.lids.max()) + 1, np.uint64) - indexing[np.unique(self.lids)[:]] = np.arange(num_uncompressed_links) - - graph_ab_idx = self.direcs > 0 - graph_ba_idx = self.direcs < 0 - network_ab_idx = indexing[self.lids[graph_ab_idx]] - network_ba_idx = indexing[self.lids[graph_ba_idx]] - return NetworkGraphIndices(network_ab_idx, network_ba_idx, graph_ab_idx, graph_ba_idx) + return _get_graph_to_network_mapping(self.lids, self.direcs) def get_load_results(self) -> AequilibraeData: """ diff --git a/aequilibrae/paths/route_choice.py b/aequilibrae/paths/route_choice.py index 9de098905..467ba3bd5 100644 --- a/aequilibrae/paths/route_choice.py +++ b/aequilibrae/paths/route_choice.py @@ -1,11 +1,13 @@ import numpy as np import socket from aequilibrae.context import get_active_project -from aequilibrae.paths.graph import Graph +from aequilibrae.paths.graph import Graph, _get_graph_to_network_mapping from aequilibrae.paths.route_choice_set import RouteChoiceSet -from typing import Optional +from aequilibrae.matrix import AequilibraeMatrix, AequilibraeData +from typing import Optional, Union, Tuple, List import pyarrow as pa import pathlib +import itertools import logging @@ -21,7 +23,7 @@ class RouteChoice: "max_depth": 0, } - def __init__(self, graph: Graph, project=None): + def __init__(self, graph: Graph, matrix: AequilibraeMatrix, project=None): self.paramaters = self.default_paramaters.copy() proj = project or get_active_project(must_exist=False) @@ -31,7 +33,8 @@ def __init__(self, graph: Graph, project=None): self.cores: int = 0 self.graph = graph - self.__rc = RouteChoiceSet(graph) + self.matrix = matrix + self.__rc = None self.schema = RouteChoiceSet.schema self.psl_schema = RouteChoiceSet.psl_schema @@ -41,6 +44,8 @@ def __init__(self, graph: Graph, project=None): self.results: Optional[pa.Table] = None self.where: Optional[pathlib.Path] = None + self.nodes = Optional[Union[List[int], List[Tuple[int, int]]]] = None + def set_algorithm(self, algorithm: str): """ Chooses the assignment algorithm. @@ -113,6 +118,7 @@ def set_save_path_files(self, save_it: bool) -> None: **save_it** (:obj:`bool`): Boolean to indicate whether paths should be saved """ self.save_path_files = save_it + raise NotImplementedError() def set_save_routes(self, where: Optional[str] = None) -> None: """ @@ -126,6 +132,42 @@ def set_save_routes(self, where: Optional[str] = None) -> None: """ self.where = pathlib.Path(where) if where is not None else None + def prepare(self, nodes: Union[List[int], List[Tuple[int, int]]]): + """ + Prepare OD pairs for batch computation. + + :Arguments: + **nodes** (:obj:`Union[list[int], list[tuple[int, int]]]`): List of node IDs to operate on. If a 1D list is + provided, OD pairs are taken to be all pair permutations of the list. If a list of pairs is provided + OD pairs are taken as is. All node IDs must be present in the compressed graph. To make a node ID + always appear in the compressed graph add it as a centroid. Duplicates will be dropped on execution. + """ + if len(nodes) == 0: + raise ValueError("`nodes` list-like empty.") + + if isinstance(nodes[0], tuple): + # Selection of OD pairs + if any(len(x) != 2 for x in nodes): + raise ValueError("`nodes` list contains non-pair elements") + self.nodes = nodes + + elif isinstance(nodes[0], int): + self.nodes = list(itertools.permutations(nodes, r=2)) + + def execute_single(self, origin: int, destination: int): + if self.__rc is None: + self.__rc = RouteChoiceSet(self.graph) + + return self.__rc.run(origin, destination, **self.paramaters) + + def execute(self, path_size_logit: bool = False): + if self.__rc is None: + self.__rc = RouteChoiceSet(self.graph) + + return self.__rc.batched( + self.nodes, bfsle=self.algorithm == "bfsle", path_size_logit=path_size_logit, **self.paramaters + ) + def info(self) -> dict: """Returns information for the transit assignment procedure @@ -173,3 +215,79 @@ def results(self): ) return self.results + + def get_load_results( + self, which: str = "uncompressed" + ) -> Union[Tuple[AequilibraeData, AequilibraeData], Tuple[AequilibraeData]]: + """ + Translates the link loading results from the graph format into the network format. + + :Returns: + **dataset** (:obj:`tuple[AequilibraeData]`): Tuple of uncompressed and compressed AequilibraE data with the link loading results. + """ + + if not isinstance(which, str) or which not in ["uncompressed", "compressed", "both"]: + raise ValueError("`which` argumnet must be one of ['uncompressed', 'compressed', 'both']") + + compressed = which == "both" or which == "compressed" + uncompressed = which == "both" or which == "uncompressed" + + fields = self.matrix.names + + tmp = self.__rc.link_loading(self.matrix, self.save_path_files) + if isinstance(tmp, dict): + self.link_loads = {k: v[0] for k, v in tmp.items()} + self.compact_link_loads = {k: v[1] for k, v in tmp.items()} + else: + self.link_loads = {fields[0]: tmp[0]} + self.compact_link_loads = {fields[0]: tmp[1]} + + # Get a mapping from the compressed graph to/from the network graph + m = _get_graph_to_network_mapping(self.graph.graph.link_id.values, self.graph.graph.direction.values) + + # Create a data store with a row for each uncompressed link + + if uncompressed: + uncompressed_res = AequilibraeData.empty( + memory_mode=True, + entries=self.graph.num_links, + field_names=fields, + data_types=[np.float64] * len(fields), + fill=np.nan, + index=self.graph.graph.link_id.values, + ) + + for k, v in self.link_loads: + # Directional Flows + uncompressed_res.data[k + "_ab"][m.network_ab_idx] = np.nan_to_num(v[m.graph_ab_idx]) + uncompressed_res.data[k + "_ba"][m.network_ba_idx] = np.nan_to_num(v[m.graph_ba_idx]) + + # Tot Flow + uncompressed_res.data[k + "_tot"] = np.nan_to_num(uncompressed_res.data[k + "_ab"]) + np.nan_to_num( + uncompressed_res.data[k + "_ba"] + ) + + if compressed: + compressed_res = AequilibraeData.empty( + memory_mode=True, + entries=self.graph.compact_num_links, + field_names=fields, + data_types=[np.float64] * len(fields), + fill=np.nan, + index=self.graph.compact_graph.id.values, + ) + + for k, v in self.compact_link_loads: + # Directional Flows + compressed_res.data[k + "_ab"][m.network_ab_idx] = np.nan_to_num(v[m.graph_ab_idx]) + compressed_res.data[k + "_ba"][m.network_ba_idx] = np.nan_to_num(v[m.graph_ba_idx]) + + # Tot Flow + compressed_res.data[k + "_tot"] = np.nan_to_num(compressed_res.data[k + "_ab"]) + np.nan_to_num( + compressed_res.data[k + "_ba"] + ) + + return ((uncompressed_res,) if uncompressed else ()) + ((compressed_res,) if compressed else ()) + + def get_select_link_results(self) -> AequilibraeData: + raise NotImplementedError() diff --git a/aequilibrae/paths/route_choice_set.pyx b/aequilibrae/paths/route_choice_set.pyx index d6ed083f0..eeccbaabd 100644 --- a/aequilibrae/paths/route_choice_set.pyx +++ b/aequilibrae/paths/route_choice_set.pyx @@ -143,7 +143,7 @@ cdef class RouteChoiceSet: self.zones = graph.num_zones self.block_flows_through_centroids = graph.block_centroid_flows - self.mapping_idx, self.mapping_data = graph.create_compressed_link_network_mapping() + self.mapping_idx, self.mapping_data, _ = graph.create_compressed_link_network_mapping() def __dealloc__(self): """