From 93a6b96eb3a24dfbebb6d0c0a0f6083166956258 Mon Sep 17 00:00:00 2001 From: edxu96 Date: Thu, 14 Jan 2021 07:56:29 +0100 Subject: [PATCH] Ignore edge contraction for now (#39) --- tests/test_conversion.py | 73 ---------------------- tests/test_graph.py | 20 +++++- vsec/__init__.py | 5 +- vsec/conversion.py | 130 --------------------------------------- vsec/geometry.py | 83 ------------------------- vsec/graph.py | 42 ++++++++++++- vsec/utils.py | 24 ++++++++ 7 files changed, 86 insertions(+), 291 deletions(-) delete mode 100644 tests/test_conversion.py delete mode 100644 vsec/conversion.py delete mode 100644 vsec/geometry.py create mode 100644 vsec/utils.py diff --git a/tests/test_conversion.py b/tests/test_conversion.py deleted file mode 100644 index c820ac6..0000000 --- a/tests/test_conversion.py +++ /dev/null @@ -1,73 +0,0 @@ -"""Test functions in ``conversion.py``.""" -import networkx as nx -from pandas.core.frame import DataFrame -import pytest as pt -from vsec.conversion import contract, split, split_vertex_df -from vsec.geometry import GeoGraph - -# There are only two edges left, but sequences might be different. -EDGES = {("n1", "n2n3"), ("n2n3", "n1"), ("n2n3", "n4"), ("n4", "n2n3")} - -# Arguments used to split multiple vertices in ``case_grid``. -ATTR = "voltage" -NAMING = lambda x: (x + "_hv", x + "_lv") -IS_FIRST = lambda x: x == 10 - -# Two columns in dataframe for resulted new vertices. -COLUMNS = {"first", "second"} - - -@pt.mark.usefixtures("case_simple") -def test_contract(case_simple: nx.Graph): - """Check if edge in a graph contracted and its terminals renamed. - - Args: - case_simple: a simple graph with 3 edges. - """ - res = contract(case_simple, "contraction") - print(res.nodes) - assert type(res) is GeoGraph - assert set(res.edges).difference(EDGES) == set() - - -@pt.mark.usefixtures("case_grid", "vertices_grid") -def test_split(case_grid: GeoGraph, vertices_grid: DataFrame): - """Check if all 10-0.4 kV transformers can be split. - - Note: - - The result is not tested thoroughly. - - The only 60-10 kV transformer is not split here. - - Args: - case_grid: a case with 207 edges. - vertices_grid: vertices in ``case_grid`` and their **type** - attributes. - """ - # Only split 10-0.4 kV transformers. - vertices_split = vertices_grid.index[vertices_grid["type"] == "STAT1004"] - - graph, vertex_df = split(case_grid, vertices_split, NAMING, ATTR, IS_FIRST) - - assert isinstance(nx.to_pandas_edgelist(graph), DataFrame) - assert nx.is_connected(graph) - assert isinstance(vertex_df, DataFrame) - assert vertex_df.index.name == "original" - assert set(vertex_df.columns) == COLUMNS - assert vertex_df.shape == (34, 2) - - -@pt.mark.usefixtures("vertices_grid") -def test_split_vertex_df(vertices_grid: DataFrame): - """Check if can gather split vertices and resulted new vertices. - - Args: - vertices_grid: vertices in ``case_grid`` and their **type** - attributes. - """ - # Only split 10-0.4 kV transformers. - vertices_split = vertices_grid.index[vertices_grid["type"] == "STAT1004"] - - res = split_vertex_df(vertices_split, NAMING) - assert set(res.columns) == COLUMNS - assert res.shape == (34, 2) - assert res.index.name == "original" diff --git a/tests/test_graph.py b/tests/test_graph.py index 2d5178a..44aa303 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -3,6 +3,7 @@ from pandas.core.frame import DataFrame import pytest as pt from vsec.graph import COLUMNS, Graph +from vsec.utils import join_terminal_labels ATTR = "level" IS_FIRST = lambda x: x == "high" @@ -13,11 +14,13 @@ ("f", "g_lv"), ("g_hv", "g_lv"), } +# There are only two edges left, but sequences might be different. +EDGES = {("n1", "n2n3"), ("n2n3", "n1"), ("n2n3", "n4"), ("n4", "n2n3")} @pt.mark.usefixtures("case_readme") def test_split(case_readme: nx.DiGraph): - """Check basic features of ``split`` method in ``GeoGraph``. + """Check basic features after one vertex splitting. Note: ``split`` method is not tested thoroughly here. @@ -52,3 +55,18 @@ def test_split(case_readme: nx.DiGraph): assert res.find_vertices_component("g_lv") == {"g_lv", "d", "f"} assert res.vertices_new == {"g_hv", "g_lv"} + + +@pt.mark.skip(reason="not sure about edge contraction yet") +@pt.mark.usefixtures("case_simple") +def test_contract(case_simple: nx.Graph): + """Check if edge in a graph contracted and its terminals renamed. + + Args: + case_simple: a simple graph with 3 edges. + """ + g = Graph(case_simple) + res = g.contract("contraction", join_terminal_labels) + print(res.nodes) + assert type(res) is Graph + assert set(res.edges).difference(EDGES) == set() diff --git a/vsec/__init__.py b/vsec/__init__.py index 5c84fe9..f975e0f 100644 --- a/vsec/__init__.py +++ b/vsec/__init__.py @@ -1,10 +1,9 @@ """Main module for ``vsec``.""" -from vsec.conversion import contract, split from vsec.graph import Graph +from vsec.utils import join_terminal_labels __version__ = "0.1.0" __all__ = [ - "contract", - "split", "Graph", + "join_terminal_labels", ] diff --git a/vsec/conversion.py b/vsec/conversion.py deleted file mode 100644 index 24281f3..0000000 --- a/vsec/conversion.py +++ /dev/null @@ -1,130 +0,0 @@ -"""Functions to convert two different graphs. - -When to specify which vertices should be split, it is not recommended to -use some vertex attribute. Multiple sets of vertices can be passed. -Different arguments can be used each time accordingly. -""" -from typing import Callable, Optional, Set, Tuple, Union - -from loguru import logger -import networkx as nx -from networkx.algorithms.minors import contracted_edge -from networkx.relabel import relabel_nodes -import pandas as pd -from pandas.core.frame import DataFrame -from vsec.geometry import GeoGraph -from vsec.graph import WeightGraph - -COLUMNS = ["first", "second"] - - -def join_terminal_labels(edge: Tuple[str, str]) -> str: - """Join labels of two terminals of a contracted edge. - - Args: - edge: an edge to be contracted. - - Returns: - Label of the new node. - """ - if not (isinstance(edge[0], str) and isinstance(edge[0], str)): - res = str(edge[0]) + str(edge[1]) - logger.warning("Some label is not string, which might be problematic.") - else: - res = edge[0] + edge[1] - return res - - -def contract( - graph: Union[WeightGraph, nx.Graph], - attr: str, - naming: Optional[Callable[[Tuple[str, str]], str]] = join_terminal_labels, -) -> GeoGraph: - """Contract edges with ``attr`` being true in ``WeightGraph``. - - Args: - graph: a weighted graph with some edges corresponding to - geographical points. - attr: some bool edge attribute indicating contraction if true. - naming: a function to name the new node based on labels of two - terminals of the contracted edge. - - Returns: - The corresponding planar geometric graph. - """ - for u, v, attributes in graph.edges(data=True): - if attributes[attr]: - edge_contracted = (u, v) - graph = contracted_edge(graph, edge_contracted, self_loops=False) - logger.debug(f"Edge {edge_contracted} has been contracted.") - - # Find which node is kept. - if u in graph.nodes: - mapping = {u: naming(edge_contracted)} - else: - mapping = {v: naming(edge_contracted)} - - # Rename the kept node. - relabel_nodes(graph, mapping, copy=False) - - return GeoGraph(graph) - - -def split( - graph: GeoGraph, - vertices: Set[str], - naming: Callable[[str], Tuple[str, str]], - attr: str, - is_first: Callable[[str], Union[bool, None]], -) -> Tuple[WeightGraph, DataFrame]: - """Split multiple vertices of a planar geometric graph. - - Args: - graph: a planar geometric graph with multiple vertex to be - split. - vertices: vertices ought to be modelled as edges. - naming: how two resulted vertices should be named. - attr: edge attribute used as input in ``is_first``. - is_first: how to choose between resulted vertices. When None - is returned, an error will be logged. - - Returns: - Resulted weighted graph, and correspondence between split - vertices & resulted new vertices. - """ - vertex_dict = {} - - for vertex in vertices: - vertex_dict[vertex] = graph.split(vertex, naming, attr, is_first) - - vertex_df = pd.DataFrame.from_dict( - vertex_dict, orient="index", columns=COLUMNS, - ) - vertex_df.index.name = "original" - - return graph, vertex_df - - -def split_vertex_df( - vertices: Set[str], naming: Callable[[str], Tuple[str, str]], -) -> DataFrame: - """Gather split vertices and resulted new vertices. - - Warning: - This should be used when vertex splitting is known to be - correct, because there is no validation. - - Args: - vertices: vertices ought to be modelled as edges. - naming: how two resulted vertices should be named. - - Returns: - Correspondence between split vertices & resulted new vertices. - """ - vertex_dict = {} - for vertex in vertices: - vertex_dict[vertex] = naming(vertex) - - res = pd.DataFrame.from_dict(vertex_dict, orient="index", columns=COLUMNS,) - res.index.name = "original" - return res diff --git a/vsec/geometry.py b/vsec/geometry.py deleted file mode 100644 index b1e5f9d..0000000 --- a/vsec/geometry.py +++ /dev/null @@ -1,83 +0,0 @@ -"""Classes and functions to build planar geometric graph. - -Geographically, some edges might look like points. They can be -contracted to nodes. -""" -from typing import Callable, Optional, Tuple, Union - -from loguru import logger -import networkx as nx -from vsec.graph import WeightGraph - - -class GeoGraph(WeightGraph): - """Planar geometric graph with geographical information. - - Note: - ``GeoGraph`` is based on ``WeightGraph`` class, so add their - common features like ``df_edges`` and ``complete_edge_attr`` - in ``WeightGraph``. - """ - - def __init__(self, g: Optional[nx.Graph] = None): - """Init an empty directed graph or existing directed graph. - - Args: - g: an existing directed graph. Default to be None. - """ - if not g: - super().__init__() - else: - super().__init__(g) - - def split( - self, - vertex: str, - naming: Callable[[str], Tuple[str, str]], - attr: str, - is_first: Callable[[str], Union[bool, None]], - ) -> Tuple[str, str]: - """Split a vertex and handle new vertices and associated edges. - - Args: - vertex: a vertex ought to be modelled as an edge. - naming: how two resulted vertices should be named. - attr: edge attribute used as input in ``is_first``. - is_first: how to choose between resulted vertices. When None - is returned, an error will be logged. - - Returns: - Two resulted vertices. - """ - edges_asso = list(self.edges(nbunch=vertex, data=True)) - vertices = naming(vertex) - - # Rename terminals of associated edges. - for u, v, attributes in edges_asso: - if is_first(attributes[attr]) is None: - vertex_new = None - elif is_first(attributes[attr]): - vertex_new = vertices[0] - else: - vertex_new = vertices[1] - - if vertex_new: - self.remove_edge(u, v) - if u == vertex: - self.add_edge(vertex_new, v, **attributes) - else: - self.add_edge(u, vertex_new, **attributes) - else: - logger.critical( - f"Unable to determine new terminal of edge ({u}, {v}) " - f"with attributes {attributes}." - ) - vertices = None - break - - # Add resulted new edge. - if vertices: - self.add_edge(vertices[0], vertices[1], split_=True) - self.remove_node(vertex) - - return vertices diff --git a/vsec/graph.py b/vsec/graph.py index 7c2c518..d14fdd4 100644 --- a/vsec/graph.py +++ b/vsec/graph.py @@ -1,8 +1,14 @@ -"""A class for two operations at the same time.""" +"""A class for two operations at the same time. + +Any modification after the initiation should be avoided, or many methods +and properties will not work as expected. +""" from typing import Callable, Optional, Set, Tuple, Union from loguru import logger import networkx as nx +from networkx.algorithms.minors import contracted_edge +from networkx.relabel import relabel_nodes import pandas as pd from pandas.core.frame import DataFrame @@ -283,3 +289,37 @@ def is_connected_graph(self) -> bool: True if this undirected graph is connected. """ return nx.is_connected(self.to_undirected()) + + def contract( + self, attr: str, naming: Optional[Callable[[Tuple[str, str]], str]], + ) -> nx.Graph: + """Contract edges with ``attr`` being true in ``Graph``. + + Args: + attr: some bool edge attribute indicating contraction if + true. + naming: a function to name the new node based on labels of + two terminals of the contracted edge. + + Returns: + A graph with less edge(s). + """ + graph = nx.Graph(self) + for u, v, attributes in self.edges(data=True): + if attributes[attr]: + edge_contracted = (u, v) + graph = contracted_edge( + graph, edge_contracted, self_loops=False + ) + logger.debug(f"Edge {edge_contracted} has been contracted.") + + # Find which node is kept. + if u in graph.nodes: + mapping = {u: naming(edge_contracted)} + else: + mapping = {v: naming(edge_contracted)} + + # Rename the kept node. + relabel_nodes(graph, mapping, copy=False) + + return graph diff --git a/vsec/utils.py b/vsec/utils.py new file mode 100644 index 0000000..8071213 --- /dev/null +++ b/vsec/utils.py @@ -0,0 +1,24 @@ +"""Utility functions to handle associated dataframes.""" +from typing import Tuple + +from loguru import logger + + +def join_terminal_labels(edge: Tuple[str, str]) -> str: + """Join labels of two terminals of a contracted edge. + + Args: + edge: an edge to be contracted. + + Returns: + Label of the new node. + """ + if not (isinstance(edge[0], str) and isinstance(edge[0], str)): + res = str(edge[0]) + str(edge[1]) + logger.warning("Some label is not string, which might be problematic.") + else: + res = edge[0] + edge[1] + return res + + +# TODO: (#26) function to rename dataframes associated with edges.