diff --git a/graph.py b/graph.py index 219c1e94..871e91b3 100644 --- a/graph.py +++ b/graph.py @@ -1,5 +1,7 @@ """Module Graph of kytos/pathfinder Kytos Network Application.""" +from itertools import combinations + from kytos.core import log try: @@ -9,20 +11,20 @@ PACKAGE = 'networkx>=2.2' log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'") -from itertools import combinations class Filter: + """Class responsible for removing items with disqualifying values.""" + def __init__(self, filter_type, filter_function): self._filter_type = filter_type - self._filter_fun = filter_function + self._filter_function = filter_function - def run(self,value, items): + def run(self, value, items): + """Filter out items.""" if isinstance(value, self._filter_type): - fun0 = self._filter_fun(value) - return filter(fun0, items) - else: - raise TypeError(f"Expected type: {self._filter_type}") + return filter(self._filter_function(value), items) + raise TypeError(f"Expected type: {self._filter_type}") class KytosGraph: @@ -30,26 +32,30 @@ class KytosGraph: def __init__(self): self.graph = nx.Graph() - self._filter_fun_dict = {} - def filterLEQ(metric):# Lower values are better - return lambda x: (lambda y: y[2].get(metric,x) <= x) - def filterGEQ(metric):# Higher values are better - return lambda x: (lambda y: y[2].get(metric,x) >= x) - def filterEEQ(metric):# Equivalence - return lambda x: (lambda y: y[2].get(metric,x) == x) - - - self._filter_fun_dict["ownership"] = Filter(str,filterEEQ("ownership")) - self._filter_fun_dict["bandwidth"] = Filter((int,float),filterGEQ("bandwidth")) - self._filter_fun_dict["priority"] = Filter((int,float),filterGEQ("priority")) - self._filter_fun_dict["reliability"] = Filter((int,float),filterGEQ("reliability")) - self._filter_fun_dict["utilization"] = Filter((int,float),filterLEQ("utilization")) - self._filter_fun_dict["delay"] = Filter((int,float),filterLEQ("delay")) - self._path_fun = nx.all_shortest_paths - - - def set_path_fun(self, path_fun): - self._path_fun = path_fun + self._filter_functions = {} + + def filter_leq(metric): # Lower values are better + return lambda x: (lambda y: y[2].get(metric, x) <= x) + + def filter_geq(metric): # Higher values are better + return lambda x: (lambda y: y[2].get(metric, x) >= x) + + def filter_eeq(metric): # Equivalence + return lambda x: (lambda y: y[2].get(metric, x) == x) + + self._filter_functions["ownership"] = Filter( + str, filter_eeq("ownership")) + self._filter_functions["bandwidth"] = Filter( + (int, float), filter_geq("bandwidth")) + self._filter_functions["priority"] = Filter( + (int, float), filter_geq("priority")) + self._filter_functions["reliability"] = Filter( + (int, float), filter_geq("reliability")) + self._filter_functions["utilization"] = Filter( + (int, float), filter_leq("utilization")) + self._filter_functions["delay"] = Filter( + (int, float), filter_leq("delay")) + self._path_function = nx.all_shortest_paths def clear(self): """Remove all nodes and links registered.""" @@ -86,9 +92,22 @@ def update_links(self, links): endpoint_b = link.endpoint_b.id self.graph[endpoint_a][endpoint_b][key] = value - def get_metadata_from_link(self, endpoint_a, endpoint_b): + self._set_default_metadata(keys) + + def _set_default_metadata(self, keys): + """Set metadata to all links. + + Set the value to zero for inexistent metadata in a link to make those + irrelevant in pathfinding. + """ + for key in keys: + for endpoint_a, endpoint_b in self.graph.edges: + if key not in self.graph[endpoint_a][endpoint_b]: + self.graph[endpoint_a][endpoint_b][key] = 0 + + def get_link_metadata(self, endpoint_a, endpoint_b): """Return the metadata of a link.""" - return self.graph.edges[endpoint_a, endpoint_b] + return self.graph.get_edge_data(endpoint_a, endpoint_b) @staticmethod def _remove_switch_hops(circuit): @@ -100,44 +119,45 @@ def _remove_switch_hops(circuit): def shortest_paths(self, source, destination, parameter=None): """Calculate the shortest paths and return them.""" try: - paths = list(self._path_fun(self.graph, - source, destination, parameter)) + paths = list(nx.shortest_simple_paths(self.graph, + source, destination, + parameter)) except (NodeNotFound, NetworkXNoPath): return [] return paths - def constrained_flexible_paths(self, source, destination, metrics, flexible_metrics, flexible = None): - default_edge_list = self.graph.edges(data=True) - default_edge_list = self._filter_edges(default_edge_list,**metrics) - default_edge_list = list(default_edge_list) - length = len(flexible_metrics) - if flexible is None: - flexible = length - flexible = max(0,flexible) - flexible = min(length,flexible) + def constrained_flexible_paths(self, source, destination, + minimum_hits=None, **metrics): + """Calculate the constrained shortest paths with flexibility.""" + base = metrics.get("base", {}) + flexible = metrics.get("flexible", {}) + first_pass_links = list(self._filter_links(self.graph.edges(data=True), + **base)) + length = len(flexible) + if minimum_hits is None: + minimum_hits = length + minimum_hits = min(length, max(0, minimum_hits)) results = [] - stop = False - for i in range(0,flexible+1): - if stop: - break - y = combinations(flexible_metrics.items(),length-i) - for x in y: - tempDict = {} - for k,v in x: - tempDict[k] = v - edges = self._filter_edges(default_edge_list,**tempDict) - edges = ((u,v) for u,v,d in edges) - res0 = self._constrained_shortest_paths(source,destination,edges) - if res0 != []: - results.append({"paths":res0, "metrics":{**metrics, **tempDict}}) - stop = True + paths = [] + i = 0 + while (paths == [] and i in range(0, minimum_hits+1)): + for combo in combinations(flexible.items(), length-i): + additional = dict(combo) + paths = self._constrained_shortest_paths( + source, destination, + self._filter_links(first_pass_links, + metadata=False, **additional)) + if paths != []: + results.append( + {"paths": paths, "metrics": {**base, **additional}}) + i = i + 1 return results - def _constrained_shortest_paths(self, source, destination, edges): + def _constrained_shortest_paths(self, source, destination, links): paths = [] try: - paths = list(self._path_fun(self.graph.edge_subgraph(edges), - source, destination)) + paths = list(self._path_function(self.graph.edge_subgraph(links), + source, destination)) except NetworkXNoPath: pass except NodeNotFound: @@ -146,12 +166,14 @@ def _constrained_shortest_paths(self, source, destination, edges): paths = [[source]] return paths - def _filter_edges(self, edges, **metrics): + def _filter_links(self, links, metadata=True, **metrics): for metric, value in metrics.items(): - fil = self._filter_fun_dict.get(metric, None) - if fil != None: + filter_ = self._filter_functions.get(metric, None) + if filter_ is not None: try: - edges = fil.run(value,edges) + links = filter_.run(value, links) except TypeError as err: - raise TypeError(f"Error in {metric} filter: {err}") - return edges + raise TypeError(f"Error in {metric} value: {err}") + if not metadata: + links = ((u, v) for u, v, d in links) + return links diff --git a/main.py b/main.py index f3618a6f..85ac18c2 100644 --- a/main.py +++ b/main.py @@ -89,38 +89,24 @@ def shortest_path(self): paths = self._filter_paths(paths, desired, undesired) return jsonify({'paths': paths}) - @rest('v3/', methods=['POST']) + @rest('v2/best-constrained-paths', methods=['POST']) def shortest_constrained_path(self): """Get the set of shortest paths between the source and destination.""" data = request.get_json() source = data.get('source') destination = data.get('destination') - flexible = data.get('flexible', 0) - metrics = data.get('metrics',{}) - try: - paths = self.graph.constrained_flexible_paths(source, destination,{},metrics,flexible) - return jsonify(paths) - except TypeError as err: - return jsonify({"error":err}) - - - @rest('v4/', methods=['POST']) - def shortest_constrained_path2(self): - """Get the set of shortest paths between the source and destination.""" - data = request.get_json() - - source = data.get('source') - destination = data.get('destination') - metrics = data.get('metrics',{}) - flexible_metrics = data.get('flexibleMetrics', {}) + base_metrics = data.get('base_metrics', {}) + fle_metrics = data.get('flexible_metrics', {}) + minimum_hits = data.get('minimum_flexible_hits') try: paths = self.graph.constrained_flexible_paths(source, destination, - metrics, flexible_metrics) + minimum_hits, + base=base_metrics, + flexible=fle_metrics) return jsonify(paths) except TypeError as err: - return jsonify({"error":err}) - + return jsonify({"error": str(err)}), 400 @listen_to('kytos.topology.updated') def update_topology(self, event): diff --git a/tests/helpers.py b/tests/helpers.py index 2626d86f..72de5bf7 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -37,3 +37,107 @@ def get_topology_mock(): switch_b.dpid: switch_b, switch_c.dpid: switch_c} return topology + + +def get_topology_with_metadata_mock(): + """Create a topology with metadata.""" + switches = {} + interfaces = {} + links = {} + i = 0 + + switches_to_interface_counts = {"S1": 2, "S2": 2, "S3": 6, "S4": 2, + "S5": 6, "S6": 5, "S7": 2, "S8": 8, + "S9": 4, "S10": 3, "S11": 3, + "User1": 4, "User2": 2, + "User3": 2, "User4": 3} + + links_to_interfaces = [["S1:1", "S2:1"], ["S1:2", "User1:1"], + ["S2:2", "User4:1"], ["S3:1", "S5:1"], + ["S3:2", "S7:1"], ["S3:3", "S8:1"], + ["S3:4", "S11:1"], + ["S3:5", "User3:1"], ["S3:6", "User4:2"], + ["S4:1", "S5:2"], ["S4:2", "User1:2"], + ["S5:3", "S6:1"], + ["S5:4", "S6:2"], ["S5:5", "S8:2"], + ["S5:6", "User1:3"], ["S6:3", "S9:1"], + ["S6:4", "S9:2"], ["S6:5", "S10:1"], + ["S7:2", "S8:3"], + ["S8:4", "S9:3"], ["S8:5", "S9:4"], + ["S8:6", "S10:2"], + ["S8:7", "S11:2"], ["S8:8", "User3:2"], + ["S10:3", "User2:1"], ["S11:3", "User2:2"], + ["User1:4", "User4:3"]] + + links_to_metadata = [ + {"reliability": 5, "bandwidth": 100, "delay": 105}, + {"reliability": 5, "bandwidth": 100, "delay": 1}, + {"reliability": 5, "bandwidth": 100, "delay": 10}, + {"reliability": 5, "bandwidth": 10, "delay": 112}, + {"reliability": 5, "bandwidth": 100, "delay": 1}, + {"reliability": 5, "bandwidth": 100, "delay": 1}, + {"reliability": 3, "bandwidth": 100, "delay": 6}, + {"reliability": 5, "bandwidth": 100, "delay": 1}, + {"reliability": 5, "bandwidth": 100, "delay": 10}, + {"reliability": 1, "bandwidth": 100, "delay": 30, "ownership": "A"}, + {"reliability": 3, "bandwidth": 100, "delay": 110, "ownership": "A"}, + {"reliability": 1, "bandwidth": 100, "delay": 40}, + {"reliability": 3, "bandwidth": 100, "delay": 40, "ownership": "A"}, + {"reliability": 5, "bandwidth": 100, "delay": 112}, + {"reliability": 3, "bandwidth": 100, "delay": 60}, + {"reliability": 3, "bandwidth": 100, "delay": 60}, + {"reliability": 5, "bandwidth": 100, "delay": 62}, + {"bandwidth": 100, "delay": 108, "ownership": "A"}, + {"reliability": 5, "bandwidth": 100, "delay": 1}, + {"reliability": 3, "bandwidth": 100, "delay": 32}, + {"reliability": 3, "bandwidth": 100, "delay": 110}, + {"reliability": 5, "bandwidth": 100, "ownership": "A"}, + {"reliability": 3, "bandwidth": 100, "delay": 7}, + {"reliability": 5, "bandwidth": 100, "delay": 1}, + {"reliability": 3, "bandwidth": 100, "delay": 10, "ownership": "A"}, + {"reliability": 3, "bandwidth": 100, "delay": 6}, + {"reliability": 5, "bandwidth": 10, "delay": 105}] + + for switch in switches_to_interface_counts: + switches[switch] = get_switch_mock(switch) + + for key, value in switches_to_interface_counts.items(): + switches[key].interfaces = {} + for interface in _get_interfaces(value, switches[key]): + switches[key].interfaces[interface.id] = interface + interfaces[interface.id] = interface + + for interfaces_str in links_to_interfaces: + interface_a = interfaces[interfaces_str[0]] + interface_b = interfaces[interfaces_str[1]] + links[str(i)] = get_link_mock(interface_a, interface_b) + links[str(i)].metadata = links_to_metadata[i] + i += 1 + + topology = MagicMock() + topology.links = links + topology.switches = switches + return topology + + +def _get_interfaces(count, switch): + """Add a new interface to the list of interfaces.""" + for i in range(1, count + 1): + yield get_interface_mock("", i, switch) + + +def get_filter_links_fake(links, metadata=True, **metrics): + """Get test links with optional metadata.""" + # pylint: disable=unused-argument + filtered_links = ["a", "b", "c"] + filtered_links_without_metadata = filtered_links[0:2] + if not metadata: + return filtered_links_without_metadata + return filtered_links + +# pylint: enable=unused-argument + + +def get_test_filter_function(): + """Get minimum filter function.""" + return lambda x: (lambda y: y >= x) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 00000000..759025eb --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1 @@ +"""kytos/pathfinder subsystem tests.""" diff --git a/tests/integration/test_results.py b/tests/integration/test_results.py new file mode 100644 index 00000000..efe0176e --- /dev/null +++ b/tests/integration/test_results.py @@ -0,0 +1,68 @@ +"""Module to test the KytosGraph in graph.py.""" +from unittest import TestCase + +from kytos.core.interface import Interface +from kytos.core.link import Link +from kytos.core.switch import Switch + +# module under test +from graph import KytosGraph + + +class TestResults(TestCase): + """Tests for the graph class.""" + + def setUp(self): + """Setup for most tests""" + switches, links = self.generate_topology() + self.graph = KytosGraph() + self.graph.clear() + self.graph.update_nodes(switches) + self.graph.update_links(links) + + def get_path(self, source, destination): + """Return the shortest path""" + results = self.graph.shortest_paths(source, destination) + return results + + def get_path_constrained(self, source, destination, minimum_hits=None, + **metrics): + """Return the constrained shortest path""" + return self.graph.constrained_flexible_paths(source, destination, + minimum_hits, + **metrics) + + @staticmethod + def generate_topology(): + """Generates a predetermined topology""" + switches = {} + links = {} + return (switches, links) + + @staticmethod + def create_switch(name, switches): + '''Add a new switch to the list of switches''' + switches[name] = Switch(name) + + @staticmethod + def add_interfaces(count, switch, interfaces): + '''Add a new interface to the list of interfaces''' + for i in range(1, count + 1): + str1 = "{}:{}".format(switch.dpid, i) + interface = Interface(str1, i, switch) + interfaces[str1] = interface + switch.update_interface(interface) + + @staticmethod + def create_link(interface_a, interface_b, interfaces, links): + '''Add a new link between two interfaces into the list of links''' + compounded = "{}|{}".format(interface_a, interface_b) + final_name = compounded + links[final_name] = Link( + interfaces[interface_a], interfaces[interface_b]) + + @staticmethod + def add_metadata_to_link(interface_a, interface_b, metrics, links): + '''Add metadata to an existing link in the list of links''' + compounded = "{}|{}".format(interface_a, interface_b) + links[compounded].extend_metadata(metrics) diff --git a/tests/test_graph2.py b/tests/integration/test_results_edges.py similarity index 61% rename from tests/test_graph2.py rename to tests/integration/test_results_edges.py index 4a8e111c..4d479b6e 100644 --- a/tests/test_graph2.py +++ b/tests/integration/test_results_edges.py @@ -1,46 +1,42 @@ """Module to test the KytosGraph in graph.py.""" -from unittest import TestCase -from unittest.mock import Mock from itertools import combinations -import networkx as nx +# Core modules to import +from kytos.core.link import Link # module under test -from graph import KytosGraph +from tests.integration.test_results import TestResults -from tests.test_graph import TestKytosGraph -# Core modules to import -from kytos.core.switch import Switch -from kytos.core.interface import Interface -from kytos.core.link import Link +class TestResultsEdges(TestResults): + """Tests for the graph class. -class TestGraph2(TestKytosGraph): + Tests to see if reflexive searches and impossible searches + show correct results. + """ def test_path1(self): """Tests paths between all users using unconstrained path alogrithm.""" - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path(point_a,point_b) + results = self.get_path(point_a, point_b) self.assertNotEqual(results, []) def test_path2(self): """Tests paths between all users using constrained path algorithm, with no constraints set.""" - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path_constrained(point_a,point_b) + results = self.get_path_constrained(point_a, point_b) self.assertNotEqual(results, []) def test_path3(self): """Tests paths between all users using constrained path algorithm, with the ownership constraint set to B.""" - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path_constrained(point_a, point_b, 0, ownership = "B") + results = self.get_path_constrained( + point_a, point_b, base=dict(ownership="B")) for result in results: for path in result["paths"]: self.assertNotIn("S4:1", path) @@ -59,10 +55,10 @@ def test_path3(self): def test_path4(self): """Tests paths between all users using constrained path algorithm, with the reliability constraint set to 3.""" - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path_constrained(point_a, point_b, 0, reliability = 3) + results = self.get_path_constrained( + point_a, point_b, base=dict(reliability=3)) for result in results: for path in result["paths"]: self.assertNotIn("S4:1", path) @@ -73,10 +69,10 @@ def test_path4(self): def test_path5(self): """Tests paths between all users using constrained path algorithm, with the bandwidth contraint set to 100.""" - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path_constrained(point_a, point_b, 0, bandwidth = 100) + results = self.get_path_constrained( + point_a, point_b, base=dict(bandwidth=100)) for result in results: for path in result["paths"]: self.assertNotIn("S3:1", path) @@ -87,10 +83,10 @@ def test_path5(self): def test_path6(self): """Tests paths between all users using constrained path algorithm, with the delay constraint set to 50.""" - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path_constrained(point_a, point_b, 0, delay = 50) + results = self.get_path_constrained( + point_a, point_b, base=dict(delay=50)) for result in results: for path in result["paths"]: self.assertNotIn("S1:1", path) @@ -113,15 +109,18 @@ def test_path6(self): self.assertNotIn("S9:4", path) self.assertNotIn("User1:4", path) self.assertNotIn("User4:3", path) - + def test_path7(self): """Tests paths between all users using constrained path algorithm, - with the delay constraint set to 50, the bandwidth constraint set to 100, - the reliability contraint set to 3, and the ownership constraint set to 'B' """ - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + with the delay constraint set to 50, the bandwidth constraint set + to 100, the reliability contraint set to 3, and the ownership + constraint set to 'B' """ + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path_constrained(point_a, point_b, 0, delay = 50, bandwidth = 100, reliability = 3, ownership = "B") + results = self.get_path_constrained( + point_a, point_b, base=dict(delay=50, bandwidth=100, + reliability=3, + ownership="B")) for result in results: for path in result["paths"]: # delay = 50 checks @@ -177,14 +176,17 @@ def test_path7(self): def test_path8(self): """Tests paths between all users using constrained path algorithm, - with the delay constraint set to 50, the bandwidth constraint set to 100, - the reliability contraint set to 3, and the ownership constraint set to 'B' - + with the delay constraint set to 50, the bandwidth constraint + set to 100, the reliability contraint set to 3, and the ownership + constraint set to 'B' + Tests conducted with flexibility enabled""" - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path_constrained(point_a, point_b, 4, delay = 50, bandwidth = 100, reliability = 3, ownership = "B") + results = self.get_path_constrained( + point_a, point_b, flexible=dict(delay=50, bandwidth=100, + reliability=3, + ownership="B")) for result in results: # delay = 50 checks if "delay" in result["metrics"]: @@ -244,14 +246,18 @@ def test_path8(self): def test_path9(self): """Tests paths between all users using constrained path algorithm, - with the delay constraint set to 50, the bandwidth constraint set to 100, - the reliability contraint set to 3, and the ownership constraint set to 'B' - + with the delay constraint set to 50, the bandwidth constraint + set to 100, the reliability contraint set to 3, and the ownership + constraint set to 'B' + Tests conducted with all but ownership flexible""" - self.setup() - combos = combinations(["User1","User2","User3","User4"],2) + combos = combinations(["User1", "User2", "User3", "User4"], 2) for point_a, point_b in combos: - results = self.get_path_constrained2(point_a, point_b, {"ownership":"B"}, {"delay": 50, "bandwidth":100, "reliability": 3}) + results = self.get_path_constrained(point_a, point_b, + base={"ownership": "B"}, + flexible={"delay": 50, + "bandwidth": 100, + "reliability": 3}) for result in results: # delay = 50 checks if "delay" in result["metrics"]: @@ -311,144 +317,192 @@ def test_path9(self): def test_path10(self): """Tests that TypeError is generated by get_path_constrained - + Tests with ownership using an int type rather than string""" - self.setup() with self.assertRaises(TypeError): - self.get_path_constrained2("User1", "User2", {"ownership":1}) - + self.get_path_constrained( + "User1", "User2", base={"ownership": 1}) @staticmethod - def generateTopology(): + def generate_topology(): """Generates a predetermined topology""" switches = {} interfaces = {} links = {} - TestKytosGraph.createSwitch("S1", switches) - TestKytosGraph.addInterfaces(2, switches["S1"], interfaces) + TestResults.create_switch("S1", switches) + TestResults.add_interfaces(2, switches["S1"], interfaces) + + TestResults.create_switch("S2", switches) + TestResults.add_interfaces(2, switches["S2"], interfaces) - TestKytosGraph.createSwitch("S2", switches) - TestKytosGraph.addInterfaces(2, switches["S2"], interfaces) + TestResults.create_switch("S3", switches) + TestResults.add_interfaces(6, switches["S3"], interfaces) - TestKytosGraph.createSwitch("S3", switches) - TestKytosGraph.addInterfaces(6, switches["S3"], interfaces) + TestResults.create_switch("S4", switches) + TestResults.add_interfaces(2, switches["S4"], interfaces) - TestKytosGraph.createSwitch("S4", switches) - TestKytosGraph.addInterfaces(2, switches["S4"], interfaces) + TestResults.create_switch("S5", switches) + TestResults.add_interfaces(6, switches["S5"], interfaces) - TestKytosGraph.createSwitch("S5", switches) - TestKytosGraph.addInterfaces(6, switches["S5"], interfaces) + TestResults.create_switch("S6", switches) + TestResults.add_interfaces(5, switches["S6"], interfaces) - TestKytosGraph.createSwitch("S6", switches) - TestKytosGraph.addInterfaces(5, switches["S6"], interfaces) + TestResults.create_switch("S7", switches) + TestResults.add_interfaces(2, switches["S7"], interfaces) - TestKytosGraph.createSwitch("S7", switches) - TestKytosGraph.addInterfaces(2, switches["S7"], interfaces) + TestResults.create_switch("S8", switches) + TestResults.add_interfaces(8, switches["S8"], interfaces) - TestKytosGraph.createSwitch("S8", switches) - TestKytosGraph.addInterfaces(8, switches["S8"], interfaces) + TestResults.create_switch("S9", switches) + TestResults.add_interfaces(4, switches["S9"], interfaces) - TestKytosGraph.createSwitch("S9", switches) - TestKytosGraph.addInterfaces(4, switches["S9"], interfaces) + TestResults.create_switch("S10", switches) + TestResults.add_interfaces(3, switches["S10"], interfaces) - TestKytosGraph.createSwitch("S10", switches) - TestKytosGraph.addInterfaces(3, switches["S10"], interfaces) + TestResults.create_switch("S11", switches) + TestResults.add_interfaces(3, switches["S11"], interfaces) - TestKytosGraph.createSwitch("S11", switches) - TestKytosGraph.addInterfaces(3, switches["S11"], interfaces) + TestResults.create_switch("User1", switches) + TestResults.add_interfaces(4, switches["User1"], interfaces) - TestKytosGraph.createSwitch("User1", switches) - TestKytosGraph.addInterfaces(4, switches["User1"], interfaces) + TestResults.create_switch("User2", switches) + TestResults.add_interfaces(2, switches["User2"], interfaces) - TestKytosGraph.createSwitch("User2", switches) - TestKytosGraph.addInterfaces(2, switches["User2"], interfaces) + TestResults.create_switch("User3", switches) + TestResults.add_interfaces(2, switches["User3"], interfaces) - TestKytosGraph.createSwitch("User3", switches) - TestKytosGraph.addInterfaces(2, switches["User3"], interfaces) + TestResults.create_switch("User4", switches) + TestResults.add_interfaces(3, switches["User4"], interfaces) - TestKytosGraph.createSwitch("User4", switches) - TestKytosGraph.addInterfaces(3, switches["User4"], interfaces) + TestResultsEdges._fill_links(links, interfaces) + TestResultsEdges._add_metadata_to_links(links) + return (switches, links) + + @staticmethod + def _add_metadata_to_links(links): + links["S1:1<->S2:1"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 105}) + links["S1:2<->User1:1"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 1}) + links["S2:2<->User4:1"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 10}) + links["S3:1<->S5:1"].extend_metadata( + {"reliability": 5, "bandwidth": 10, "delay": 112}) + links["S3:2<->S7:1"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 1}) + links["S3:3<->S8:1"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 1}) + links["S3:4<->S11:1"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 6}) + links["S3:5<->User3:1"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 1}) + links["S3:6<->User4:2"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 10}) + links["S4:1<->S5:2"].extend_metadata( + {"reliability": 1, "bandwidth": 100, "delay": 30, + "ownership": "A"}) + links["S4:2<->User1:2"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 110, + "ownership": "A"}) + links["S5:3<->S6:1"].extend_metadata( + {"reliability": 1, "bandwidth": 100, "delay": 40}) + links["S5:4<->S6:2"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 40, + "ownership": "A"}) + links["S5:5<->S8:2"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 112}) + links["S5:6<->User1:3"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 60}) + links["S6:3<->S9:1"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 60}) + links["S6:4<->S9:2"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 62}) + links["S6:5<->S10:1"].extend_metadata( + {"bandwidth": 100, "delay": 108, "ownership": "A"}) + links["S7:2<->S8:3"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 1}) + links["S8:4<->S9:3"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 32}) + links["S8:5<->S9:4"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 110}) + links["S8:6<->S10:2"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "ownership": "A"}) + links["S8:7<->S11:2"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 7}) + links["S8:8<->User3:2"].extend_metadata( + {"reliability": 5, "bandwidth": 100, "delay": 1}) + links["S10:3<->User2:1"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 10, + "ownership": "A"}) + links["S11:3<->User2:2"].extend_metadata( + {"reliability": 3, "bandwidth": 100, "delay": 6}) + links["User1:4<->User4:3"].extend_metadata( + {"reliability": 5, "bandwidth": 10, "delay": 105}) + + @staticmethod + def _fill_links(links, interfaces): links["S1:1<->S2:1"] = Link(interfaces["S1:1"], interfaces["S2:1"]) - links["S1:1<->S2:1"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 105}) - links["S1:2<->User1:1"] = Link(interfaces["S1:2"], interfaces["User1:1"]) - links["S1:2<->User1:1"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 1}) + links["S1:2<->User1:1"] = Link(interfaces["S1:2"], + interfaces["User1:1"]) - links["S2:2<->User4:1"] = Link(interfaces["S2:2"], interfaces["User4:1"]) - links["S2:2<->User4:1"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 10}) + links["S2:2<->User4:1"] = Link(interfaces["S2:2"], + interfaces["User4:1"]) links["S3:1<->S5:1"] = Link(interfaces["S3:1"], interfaces["S5:1"]) - links["S3:1<->S5:1"].extend_metadata({"reliability": 5, "bandwidth": 10, "delay": 112}) links["S3:2<->S7:1"] = Link(interfaces["S3:2"], interfaces["S7:1"]) - links["S3:2<->S7:1"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 1}) links["S3:3<->S8:1"] = Link(interfaces["S3:3"], interfaces["S8:1"]) - links["S3:3<->S8:1"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 1}) links["S3:4<->S11:1"] = Link(interfaces["S3:4"], interfaces["S11:1"]) - links["S3:4<->S11:1"].extend_metadata({"reliability": 3, "bandwidth": 100, "delay": 6}) - links["S3:5<->User3:1"] = Link(interfaces["S3:5"], interfaces["User3:1"]) - links["S3:5<->User3:1"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 1}) + links["S3:5<->User3:1"] = Link(interfaces["S3:5"], + interfaces["User3:1"]) - links["S3:6<->User4:2"] = Link(interfaces["S3:6"], interfaces["User4:2"]) - links["S3:6<->User4:2"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 10}) + links["S3:6<->User4:2"] = Link(interfaces["S3:6"], + interfaces["User4:2"]) links["S4:1<->S5:2"] = Link(interfaces["S4:1"], interfaces["S5:2"]) - links["S4:1<->S5:2"].extend_metadata({"reliability": 1, "bandwidth": 100, "delay": 30, "ownership": "A"}) - links["S4:2<->User1:2"] = Link(interfaces["S4:2"], interfaces["User1:2"]) - links["S4:2<->User1:2"].extend_metadata({"reliability": 3, "bandwidth": 100, "delay": 110, "ownership": "A"}) + links["S4:2<->User1:2"] = Link(interfaces["S4:2"], + interfaces["User1:2"]) links["S5:3<->S6:1"] = Link(interfaces["S5:3"], interfaces["S6:1"]) - links["S5:3<->S6:1"].extend_metadata({"reliability": 1, "bandwidth": 100, "delay": 40}) links["S5:4<->S6:2"] = Link(interfaces["S5:4"], interfaces["S6:2"]) - links["S5:4<->S6:2"].extend_metadata({"reliability": 3, "bandwidth": 100, "delay": 40, "ownership": "A"}) links["S5:5<->S8:2"] = Link(interfaces["S5:5"], interfaces["S8:2"]) - links["S5:5<->S8:2"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 112}) - links["S5:6<->User1:3"] = Link(interfaces["S5:6"], interfaces["User1:3"]) - links["S5:6<->User1:3"].extend_metadata({"reliability": 3, "bandwidth": 100, "delay": 60}) + links["S5:6<->User1:3"] = Link(interfaces["S5:6"], + interfaces["User1:3"]) links["S6:3<->S9:1"] = Link(interfaces["S6:3"], interfaces["S9:1"]) - links["S6:3<->S9:1"].extend_metadata({"reliability": 3,"bandwidth": 100, "delay":60}) links["S6:4<->S9:2"] = Link(interfaces["S6:4"], interfaces["S9:2"]) - links["S6:4<->S9:2"].extend_metadata({"reliability": 5,"bandwidth": 100, "delay":62}) links["S6:5<->S10:1"] = Link(interfaces["S6:5"], interfaces["S10:1"]) - links["S6:5<->S10:1"].extend_metadata({"bandwidth": 100, "delay":108, "ownership":"A"}) links["S7:2<->S8:3"] = Link(interfaces["S7:2"], interfaces["S8:3"]) - links["S7:2<->S8:3"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 1}) links["S8:4<->S9:3"] = Link(interfaces["S8:4"], interfaces["S9:3"]) - links["S8:4<->S9:3"].extend_metadata({"reliability": 3,"bandwidth": 100, "delay":32}) links["S8:5<->S9:4"] = Link(interfaces["S8:5"], interfaces["S9:4"]) - links["S8:5<->S9:4"].extend_metadata({"reliability": 3,"bandwidth": 100, "delay":110}) links["S8:6<->S10:2"] = Link(interfaces["S8:6"], interfaces["S10:2"]) - links["S8:6<->S10:2"].extend_metadata({"reliability": 5, "bandwidth": 100, "ownership":"A"}) links["S8:7<->S11:2"] = Link(interfaces["S8:7"], interfaces["S11:2"]) - links["S8:7<->S11:2"].extend_metadata({"reliability": 3, "bandwidth": 100, "delay": 7}) - - links["S8:8<->User3:2"] = Link(interfaces["S8:8"], interfaces["User3:2"]) - links["S8:8<->User3:2"].extend_metadata({"reliability": 5, "bandwidth": 100, "delay": 1}) - links["S10:3<->User2:1"] = Link(interfaces["S10:3"], interfaces["User2:1"]) - links["S10:3<->User2:1"].extend_metadata({"reliability": 3, "bandwidth": 100, "delay": 10, "ownership":"A"}) + links["S8:8<->User3:2"] = Link(interfaces["S8:8"], + interfaces["User3:2"]) - links["S11:3<->User2:2"] = Link(interfaces["S11:3"], interfaces["User2:2"]) - links["S11:3<->User2:2"].extend_metadata({"reliability": 3, "bandwidth": 100, "delay": 6}) + links["S10:3<->User2:1"] = Link(interfaces["S10:3"], + interfaces["User2:1"]) - links["User1:4<->User4:3"] = Link(interfaces["User1:4"], interfaces["User4:3"]) - links["User1:4<->User4:3"].extend_metadata({"reliability": 5, "bandwidth": 10, "delay": 105}) + links["S11:3<->User2:2"] = Link(interfaces["S11:3"], + interfaces["User2:2"]) - return (switches,links) + links["User1:4<->User4:3"] = Link(interfaces["User1:4"], + interfaces["User4:3"]) diff --git a/tests/integration/test_results_metadata.py b/tests/integration/test_results_metadata.py new file mode 100644 index 00000000..b86b168a --- /dev/null +++ b/tests/integration/test_results_metadata.py @@ -0,0 +1,165 @@ +"""Module to test the KytosGraph in graph.py.""" + +# module under test +from tests.integration.test_results import TestResults + + +class TestResultsMetadata(TestResults): + """Tests for the graph class. + + Tests if the metadata in search result edges have passing values. + """ + + def test_path1(self): + """Tests to see if the edges used in the paths of the result set + do not have poor reliability""" + reliabilities = [] + requirements = {"reliability": 3} + poor_reliability = 1 + + result = self.get_path_constrained("User1", "User2", base=requirements) + + if result: + for path in result[0]["paths"]: + for i in range(1, len(path)): + endpoint_a = path[i - 1] + endpoint_b = path[i] + meta_data = self.graph.get_link_metadata( + endpoint_a, endpoint_b) + if meta_data and "reliability" in meta_data.keys(): + reliabilities.append(meta_data["reliability"]) + + self.assertNotIn(poor_reliability, reliabilities) + + def test_path2(self): + """Tests to see if the edges used in the paths from User 1 to User 2 + have less than 30 delay.""" + delays = [] + requirements = {"delay": 29} + + result = self.get_path_constrained( + "User1", "User2", base=requirements) + + if result: + for path in result[0]["paths"]: + for i in range(1, len(path)): + endpoint_a = path[i - 1] + endpoint_b = path[i] + meta_data = self.graph.get_link_metadata( + endpoint_a, endpoint_b) + if meta_data and "delay" in meta_data.keys(): + delays.append(meta_data["delay"]) + + for delay in delays: + self.assertEqual(delay > requirements["delay"], False) + + def test_path3(self): + """Tests to see if the edges used in the paths from User 1 to User 2 + have at least 20 bandwidth.""" + bandwidths = [] + requirements = {"bandwidth": 20} + + result = self.get_path_constrained( + "User1", "User2", base=requirements) + + if result: + for path in result[0]["paths"]: + for i in range(1, len(path)): + endpoint_a = path[i - 1] + endpoint_b = path[i] + meta_data = self.graph.get_link_metadata( + endpoint_a, endpoint_b) + if meta_data and "bandwidth" in meta_data.keys(): + bandwidths.append(meta_data["bandwidth"]) + + for bandwidth in bandwidths: + self.assertEqual(bandwidth < requirements["bandwidth"], False) + + def test_path4(self): + """Tests to see if the edges used in the paths from User 1 to User 2 + have at least 20 bandwidth and under 30 delay.""" + bandwidths = [] + delays = [] + requirements = {"bandwidth": 20, "delay": 29} + + result = self.get_path_constrained( + "User1", "User2", base=requirements) + + if result: + for path in result[0]["paths"]: + for i in range(1, len(path)): + endpoint_a = path[i - 1] + endpoint_b = path[i] + meta_data = self.graph.get_link_metadata( + endpoint_a, endpoint_b) + if meta_data and "bandwidth" in meta_data.keys(): + bandwidths.append(meta_data["bandwidth"]) + elif meta_data and "delay" in meta_data.keys(): + delays.append(meta_data["delay"]) + + for bandwidth in bandwidths: + self.assertEqual(bandwidth < requirements["bandwidth"], False) + + for delay in delays: + self.assertEqual(delay > requirements["delay"], False) + + @staticmethod + def generate_topology(): + """Generates a predetermined topology""" + switches = {} + interfaces = {} + links = {} + + TestResults.create_switch("User1", switches) + TestResults.add_interfaces(3, switches["User1"], interfaces) + + TestResults.create_switch("S2", switches) + TestResults.add_interfaces(2, switches["S2"], interfaces) + + TestResults.create_switch("User2", switches) + TestResults.add_interfaces(3, switches["User2"], interfaces) + + TestResults.create_switch("S4", switches) + TestResults.add_interfaces(4, switches["S4"], interfaces) + + TestResults.create_switch("S5", switches) + TestResults.add_interfaces(2, switches["S5"], interfaces) + + TestResults.create_link("User1:1", "S2:1", interfaces, links) + TestResults.create_link("User1:2", "S5:1", interfaces, links) + TestResults.create_link("User1:3", "S4:1", interfaces, links) + TestResults.create_link("S2:2", "User2:1", interfaces, links) + TestResults.create_link("User2:2", "S4:2", interfaces, links) + TestResults.create_link("S5:2", "S4:3", interfaces, links) + TestResults.create_link("User2:3", "S4:4", interfaces, links) + + TestResults.add_metadata_to_link( + "User1:1", "S2:1", { + "reliability": 3, "ownership": "B", "delay": 30, + "bandwidth": 20}, links) + TestResults.add_metadata_to_link( + "User1:2", "S5:1", { + "reliability": 1, "ownership": "A", "delay": 5, + "bandwidth": 50}, links) + TestResults.add_metadata_to_link( + "User1:3", "S4:1", { + "reliability": 3, "ownership": "A", "delay": 60, + "bandwidth": 10}, links) + TestResults.add_metadata_to_link( + "S2:2", "User2:1", { + "reliability": 3, "ownership": "B", "delay": 30, + "bandwidth": 20}, links) + TestResults.add_metadata_to_link( + "User2:2", "S4:2", { + "reliability": 3, "ownership": "B", "delay": 30, + "bandwidth": 10}, links) + TestResults.add_metadata_to_link( + "S5:2", "S4:3", { + "reliability": 1, "ownership": "A", "delay": 10, + "bandwidth": 50}, links) + TestResults.add_metadata_to_link( + "User2:3", "S4:4", { + "reliability": 3, "ownership": "A", "delay": 29, + "bandwidth": 20}, links) + + return (switches, links) diff --git a/tests/integration/test_results_simple.py b/tests/integration/test_results_simple.py new file mode 100644 index 00000000..cc592497 --- /dev/null +++ b/tests/integration/test_results_simple.py @@ -0,0 +1,78 @@ +"""Module to test the KytosGraph in graph.py.""" +from kytos.core.link import Link + +# module under test +from tests.integration.test_results import TestResults + + +class TestResultsSimple(TestResults): + """Tests for the graph class. + + Tests if the paths returned have only legal edges. + """ + + def test_path1(self): + """Tests a simple, possible path""" + results = self.get_path_constrained("S1", "S2") + self.assertNotEqual(results, []) + + def test_path2(self): + """Tests a simple, impossible path""" + results = self.get_path_constrained("S1", "S4") + self.assertEqual(results, []) + + def test_path3(self): + """Tests a path to self again""" + results = self.get_path_constrained("S1", "S1") + self.assertNotEqual(results, []) + + def test_path4(self): + """Tests constrained path to self again""" + results = self.get_path_constrained( + "S5", "S5", base={"ownership": "blue"}) + for result in results: + self.assertNotEqual([], result["paths"]) + self.assertIn(['S5'], result["paths"]) + + def test_path5(self): + """Tests constrained path to self again""" + results = self.get_path_constrained( + "S5", "S5", flexible={"priority": 5}) + for result in results: + self.assertNotEqual([], result["paths"]) + self.assertIn(['S5'], result["paths"]) + + @staticmethod + def generate_topology(): + """Generates a predetermined topology""" + switches = {} + interfaces = {} + links = {} + + TestResults.create_switch("S1", switches) + TestResults.add_interfaces(2, switches["S1"], interfaces) + + TestResults.create_switch("S2", switches) + TestResults.add_interfaces(3, switches["S2"], interfaces) + + TestResults.create_switch("S3", switches) + TestResults.add_interfaces(2, switches["S3"], interfaces) + + TestResults.create_switch("S4", switches) + TestResults.add_interfaces(2, switches["S4"], interfaces) + + TestResults.create_switch("S5", switches) + + links["S1:1<->S2:1"] = Link(interfaces["S1:1"], interfaces["S2:1"]) + links["S1:1<->S2:1"].extend_metadata( + {"bandwidth": 50, "ownership": "red"}) + + links["S3:1<->S2:2"] = Link(interfaces["S3:1"], interfaces["S2:2"]) + links["S3:1<->S2:2"].extend_metadata( + {"bandwidth": 51, "ownership": "blue"}) + + links["S1:2<->S3:2"] = Link(interfaces["S1:2"], interfaces["S3:2"]) + links["S1:2<->S3:2"].extend_metadata( + {"bandwidth": 49, "ownership": "blue"}) + + return (switches, links) diff --git a/tests/test_graph.py b/tests/test_graph.py deleted file mode 100644 index 97bc74ce..00000000 --- a/tests/test_graph.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Module to test the KytosGraph in graph.py.""" -from unittest import TestCase -from unittest.mock import Mock - -import networkx as nx - -# module under test -from graph import KytosGraph - -# Core modules to import -from kytos.core.switch import Switch -from kytos.core.interface import Interface -from kytos.core.link import Link - -class TestKytosGraph(TestCase): - - def setup(self): - """Setup for most tests""" - switches, links = self.generateTopology() - self.graph = KytosGraph() - self.graph.clear() - self.graph.update_nodes(switches) - self.graph.update_links(links) - self.graph.set_path_fun(nx.shortest_simple_paths) - - def get_path(self, source, destination): - # print(f"Attempting path between {source} and {destination}.") - results = self.graph.shortest_paths(source,destination) - # print(f"Results: {results}") - return results - - def get_path_constrained(self, source, destination, flexible = 0, **metrics): - # print(f"Attempting path between {source} and {destination}.") - # print(f"Filtering with the following metrics: {metrics}") - # print(f"Flexible is set to {flexible}") - results = self.graph.constrained_flexible_paths(source, destination,{},metrics,flexible) - # print(f"Results: {results}") - return results - - def get_path_constrained2(self, source, destination, metrics, flexible_metrics): - return self.graph.constrained_flexible_paths(source, destination, metrics, flexible_metrics) - - def test_setup(self): - """Provides information on default test setup""" - self.setup() - # print("Nodes in graph") - # for node in self.graph.graph.nodes: - # print(node) - # print("Edges in graph") - # for edge in self.graph.graph.edges(data=True): - # print(edge) - - @staticmethod - def generateTopology(): - """Generates a predetermined topology""" - switches = {} - interfaces = {} - links = {} - return (switches,links) - - @staticmethod - def createSwitch(name,switches): - switches[name] = Switch(name) - # print("Creating Switch: ", name) - - @staticmethod - def addInterfaces(count,switch,interfaces): - for x in range(1,count + 1): - str1 = "{}:{}".format(switch.dpid,x) - # print("Creating Interface: ", str1) - iFace = Interface(str1,x,switch) - interfaces[str1] = iFace - switch.update_interface(iFace) - diff --git a/tests/test_graph1.py b/tests/test_graph1.py deleted file mode 100644 index 75b376db..00000000 --- a/tests/test_graph1.py +++ /dev/null @@ -1,133 +0,0 @@ -"""Module to test the KytosGraph in graph.py.""" -from unittest import TestCase -from unittest.mock import Mock - -import networkx as nx - -# module under test -from graph import KytosGraph - -from tests.test_graph import TestKytosGraph - -# Core modules to import -from kytos.core.switch import Switch -from kytos.core.interface import Interface -from kytos.core.link import Link - -class TestGraph1(TestKytosGraph): - - def test_path1(self): - """Tests a simple, definetly possible path""" - self.setup() - results = self.get_path("S1","S2") - self.assertNotEqual(results, []) - - def test_constrained_path1(self): - """Tests a simple, definetly possible path""" - self.setup() - results = self.get_path_constrained("S1","S2") - self.assertNotEqual(results, []) - - def test_path2(self): - """Tests a simple, impossible path""" - self.setup() - results = self.get_path("S1","S4") - self.assertEqual(results, []) - - def test_constrained_path2(self): - """Tests a simple, impossible path""" - self.setup() - results = self.get_path_constrained("S1","S4") - self.assertEqual(results, []) - - def test_path3(self): - """Tests a path to self""" - self.setup() - results = self.get_path("S4","S4") - self.assertNotEqual(results, []) - - def test_constrained_path3(self): - """Tests a path to self""" - self.setup() - results = self.get_path_constrained("S4","S4") - self.assertNotEqual(results, []) - - def test_path4(self): - """Tests a path to self again""" - self.setup() - results = self.get_path("S1","S1") - self.assertNotEqual(results, []) - - def test_constrained_path4(self): - """Tests a path to self again""" - self.setup() - results = self.get_path_constrained("S1","S1") - self.assertNotEqual(results, []) - - def test_constrained_path5(self): - """Tests constrained path""" - self.setup() - results = self.get_path_constrained("S1","S3", 0, bandwidth = 50) - for result in results: - self.assertNotIn(['S1', 'S1:2', 'S3:2', 'S3'], result["paths"]) - - def test_constrained_path6(self): - """Tests constrained path""" - self.setup() - results = self.get_path_constrained("S1","S2", 0, ownership = "red") - for result in results: - self.assertNotIn(['S1', 'S1:2', 'S3:2', 'S3', 'S3:1', 'S2:2', 'S2'],result["paths"]) - - def test_constrained_path7(self): - """Tests constrained path""" - self.setup() - results = self.get_path_constrained("S1","S2", 0, ownership = "blue") - for result in results: - self.assertNotIn(['S1', 'S1:1', 'S2:1', 'S2'],result["paths"]) - - def test_constrained_path8(self): - """Tests constrained path, to self AGAIN""" - self.setup() - results = self.get_path_constrained("S5","S5", 0, ownership = "blue") - for result in results: - self.assertNotEqual([],result["paths"]) - self.assertIn(['S5'],result["paths"]) - - def test_constrained_path9(self): - """Tests constrained path""" - self.setup() - results = self.get_path_constrained("S1","S2", 1, ownership = "blue") - - @staticmethod - def generateTopology(): - """Generates a predetermined topology""" - switches = {} - interfaces = {} - - TestKytosGraph.createSwitch("S1",switches) - TestKytosGraph.addInterfaces(2, switches["S1"], interfaces) - - TestKytosGraph.createSwitch("S2",switches) - TestKytosGraph.addInterfaces(3, switches["S2"], interfaces) - - TestKytosGraph.createSwitch("S3",switches) - TestKytosGraph.addInterfaces(2, switches["S3"], interfaces) - - TestKytosGraph.createSwitch("S4",switches) - TestKytosGraph.addInterfaces(2, switches["S4"], interfaces) - - TestKytosGraph.createSwitch("S5",switches) - - links = {} - - links["S1:1<->S2:1"] = Link(interfaces["S1:1"], interfaces["S2:1"]) - links["S1:1<->S2:1"].extend_metadata({"bandwidth":50,"ownership":"red"}) - - links["S3:1<->S2:2"] = Link(interfaces["S3:1"], interfaces["S2:2"]) - links["S3:1<->S2:2"].extend_metadata({"bandwidth":51,"ownership":"blue"}) - - links["S1:2<->S3:2"] = Link(interfaces["S1:2"], interfaces["S3:2"]) - links["S1:2<->S3:2"].extend_metadata({"bandwidth":49,"ownership":"blue"}) - - return (switches,links) - diff --git a/tests/test_graph3.py b/tests/test_graph3.py deleted file mode 100644 index e605a4f2..00000000 --- a/tests/test_graph3.py +++ /dev/null @@ -1,196 +0,0 @@ -"""Module to test the KytosGraph in graph.py.""" -from unittest import TestCase -from unittest.mock import Mock - -import networkx as nx - -# module under test -from graph import KytosGraph - -from tests.test_graph import TestKytosGraph - -# Core modules to import -from kytos.core.switch import Switch -from kytos.core.interface import Interface -from kytos.core.link import Link - -class TestGraph3(TestKytosGraph): - - def test_path9(self): - """Tests to see if an illegal path is not in the set of paths that use only edges owned by A.""" - #Arrange - self.test_setup() - illegal_path = ['User1', 'User1:1', 'S2:1', 'S2', 'S2:2', 'User2:1', 'User2'] - #Act - result = self.get_path_constrained("User1", "User2", 0, ownership = 'A') - #Assert - self.assertNotIn(illegal_path, result[0]["paths"]) - - def test_path10(self): - """Tests to see if the edges used in the paths of the result set do not have poor reliability""" - #Arrange - self.test_setup() - reliabilities = [] - poor_reliability = 1 - key = "reliability" - - #Act - result = self.get_path_constrained("User1", "User2", 0, reliability = 3) - - if result: - for path in result[0]["paths"]: - for i in range(1, len(path)): - endpoint_a = path[i-1] - endpoint_b = path[i] - meta_data = self.graph.get_metadata_from_link(endpoint_a, endpoint_b) - if meta_data and key in meta_data.keys(): - reliabilities.append(meta_data[key]) - - #Assert - self.assertNotIn(poor_reliability, reliabilities) - - def test_path11(self): - """Tests to see if the edges used in the paths from User 1 to User 2 have less than 30 delay.""" - #Arrange - self.test_setup() - delays = [] - delay_cap = 29 - key = "delay" - has_bad_delay = False - - #Act - result = self.get_path_constrained("User1", "User2", 0, delay = delay_cap) - - if result: - for path in result[0]["paths"]: - for i in range(1, len(path)): - endpoint_a = path[i-1] - endpoint_b = path[i] - meta_data = self.graph.get_metadata_from_link(endpoint_a, endpoint_b) - if meta_data and key in meta_data.keys(): - delays.append(meta_data[key]) - - #Assert - for delay in delays: - has_bad_delay = delay > delay_cap - self.assertEqual(has_bad_delay, False) - - def test_path12(self): - """Tests to see if the edges used in the paths from User 1 to User 2 have at least 20 bandwidth.""" - #Arrange - self.test_setup() - bandwidths = [] - bandwidth_floor = 20 - key = "bandwidth" - has_bad_bandwidth = False - - #Act - result = self.get_path_constrained("User1", "User2", 0, bandwidth = bandwidth_floor) - - if result: - for path in result[0]["paths"]: - for i in range(1, len(path)): - endpoint_a = path[i-1] - endpoint_b = path[i] - meta_data = self.graph.get_metadata_from_link(endpoint_a, endpoint_b) - if meta_data and key in meta_data.keys(): - bandwidths.append(meta_data[key]) - - #Assert - for bandwidth in bandwidths: - has_bad_bandwidth = bandwidth < bandwidth_floor - self.assertEqual(has_bad_bandwidth, False) - - def test_path13(self): - """Tests to see if the edges used in the paths from User 1 to User 2 have at least 20 bandwidth - and under 30 delay.""" - #Arrange - self.test_setup() - bandwidths = [] - delays = [] - bandwidth_floor = 20 - key_a = "bandwidth" - delay_cap = 29 - key_b = "delay" - has_bad_bandwidth = False - has_bad_delay = False - - #Act - result = self.get_path_constrained("User1", "User2", 0, bandwidth = bandwidth_floor, delay = delay_cap) - - if result: - for path in result[0]["paths"]: - for i in range(1, len(path)): - endpoint_a = path[i-1] - endpoint_b = path[i] - meta_data = self.graph.get_metadata_from_link(endpoint_a, endpoint_b) - if meta_data and key_a in meta_data.keys(): - bandwidths.append(meta_data[key_a]) - elif meta_data and key_b in meta_data.keys(): - delays.append(meta_data[key_b]) - - #Assert - for bandwidth in bandwidths: - has_bad_bandwidth = bandwidth < bandwidth_floor - self.assertEqual(has_bad_bandwidth, False) - - for delay in delays: - has_bad_delay = delay > delay_cap - self.assertEqual(has_bad_delay, False) - - @staticmethod - def createLink(interface_a, interface_b, interfaces, links): - compounded = "{}|{}".format(interface_a, interface_b) - final_name = compounded - links[final_name] = Link(interfaces[interface_a], interfaces[interface_b]) - - @staticmethod - def addMetadataToLink(interface_a, interface_b, metrics, links): - compounded = "{}|{}".format(interface_a, interface_b) - links[compounded].extend_metadata(metrics) - - @staticmethod - def generateTopology(): - switches = {} - interfaces = {} - links = {} - - TestKytosGraph.createSwitch("User1", switches) - TestKytosGraph.addInterfaces(3, switches["User1"], interfaces) - - TestKytosGraph.createSwitch("S2", switches) - TestKytosGraph.addInterfaces(2, switches["S2"], interfaces) - - TestKytosGraph.createSwitch("User2", switches) - TestKytosGraph.addInterfaces(3, switches["User2"], interfaces) - - TestKytosGraph.createSwitch("S4", switches) - TestKytosGraph.addInterfaces(4, switches["S4"], interfaces) - - TestKytosGraph.createSwitch("S5", switches) - TestKytosGraph.addInterfaces(2, switches["S5"], interfaces) - - TestGraph3.createLink("User1:1", "S2:1", interfaces, links) - TestGraph3.createLink("User1:2", "S5:1", interfaces, links) - TestGraph3.createLink("User1:3", "S4:1", interfaces, links) - TestGraph3.createLink("S2:2", "User2:1", interfaces, links) - TestGraph3.createLink("User2:2", "S4:2", interfaces, links) - TestGraph3.createLink("S5:2", "S4:3", interfaces, links) - TestGraph3.createLink("User2:3", "S4:4", interfaces, links) - - TestGraph3.addMetadataToLink("User1:1", "S2:1", {"reliability": 3, "ownership": "B", - "delay": 30, "bandwidth": 20}, links) - TestGraph3.addMetadataToLink("User1:2", "S5:1", {"reliability": 1, "ownership": "A", - "delay": 5, "bandwidth": 50}, links) - TestGraph3.addMetadataToLink("User1:3", "S4:1", {"reliability": 3, "ownership": "A", - "delay": 60, "bandwidth": 10}, links) - TestGraph3.addMetadataToLink("S2:2", "User2:1", {"reliability": 3, "ownership": "B", - "delay": 30, "bandwidth": 20}, links) - TestGraph3.addMetadataToLink("User2:2", "S4:2", {"reliability": 3, "ownership": "B", - "delay": 30, "bandwidth": 10}, links) - TestGraph3.addMetadataToLink("S5:2", "S4:3", {"reliability": 1, "ownership": "A", - "delay": 10, "bandwidth": 50}, links) - TestGraph3.addMetadataToLink("User2:3", "S4:4", {"reliability": 3, "ownership": "A", - "delay": 29, "bandwidth": 20}, links) - - return (switches, links) diff --git a/tests/unit/test_filter.py b/tests/unit/test_filter.py new file mode 100644 index 00000000..aa01aac2 --- /dev/null +++ b/tests/unit/test_filter.py @@ -0,0 +1,30 @@ +"""Test filter methods""" +from unittest import TestCase + +from napps.kytos.pathfinder.graph import Filter +from tests.helpers import get_test_filter_function + + +class TestFilter(TestCase): + """Tests for the Main class.""" + + def setUp(self): + """Execute steps before each test.""" + filter_type = (int, float) + filter_function = get_test_filter_function() + self.filter = Filter(filter_type, filter_function) + + def test_run_success_case(self): + """Test filtering with valid minimum type.""" + items = [8, 9, 10, 11, 12] + minimum = 10 + result = self.filter.run(minimum, items) + expected_result = [10, 11, 12] + self.assertEqual(list(result), expected_result) + + def test_run_failure_case(self): + """Test filtering with invalid minimum type.""" + items = [8, 9, 10, 11, 12] + minimum = "apple" + with self.assertRaises(TypeError): + self.filter.run(minimum, items) diff --git a/tests/unit/test_graph.py b/tests/unit/test_graph.py index d61e7a00..2bccc05b 100644 --- a/tests/unit/test_graph.py +++ b/tests/unit/test_graph.py @@ -1,12 +1,14 @@ """Test Graph methods.""" from unittest import TestCase -from unittest.mock import call, patch +from unittest.mock import MagicMock, call, patch from napps.kytos.pathfinder.graph import KytosGraph -from tests.helpers import get_topology_mock - +from tests.helpers import (get_filter_links_fake, get_topology_mock, + get_topology_with_metadata_mock) # pylint: disable=arguments-differ, protected-access + + class TestGraph(TestCase): """Tests for the Main class.""" @@ -82,3 +84,45 @@ def test_shortest_paths(self, mock_shortest_simple_paths): mock_shortest_simple_paths.assert_called_with(self.kytos_graph.graph, source, dest, None) self.assertEqual(shortest_paths, ["any"]) + + @patch('graph.combinations', autospec=True) + def test_constrained_flexible_paths(self, mock_combinations): + """Test shortest constrained paths.""" + source, dest = "00:00:00:00:00:00:00:01:1", "00:00:00:00:00:00:00:02:2" + minimum_hits = 1 + base_metrics = {"apple": 1} + flexible_metrics = {"pear": 2} + mock_combinations.return_value = [(('pear', 2),)] + filtered_links = ["a", "b", "c"] + filtered_links_without_metadata = filtered_links[0:2] + constrained_shortest_paths = [["path1"], ["path2"]] + + self.kytos_graph._constrained_shortest_paths = MagicMock( + return_value=constrained_shortest_paths) + self.kytos_graph._filter_links = MagicMock( + side_effect=get_filter_links_fake) + shortest_paths = self.kytos_graph.constrained_flexible_paths( + source, dest, minimum_hits, base=base_metrics, + flexible=flexible_metrics) + + self.kytos_graph._constrained_shortest_paths.assert_has_calls([ + call(source, dest, filtered_links_without_metadata)]) + self.kytos_graph._filter_links.assert_called() + self.assertEqual(shortest_paths, [ + {"paths": constrained_shortest_paths, "metrics": + {"apple": 1, "pear": 2}}]) + + def test_get_link_metadata(self): + """Test metadata retrieval.""" + topology = get_topology_with_metadata_mock() + self.kytos_graph.update_nodes(topology.switches) + self.kytos_graph.update_links(topology.links) + endpoint_a = "S1:1" + endpoint_b = "S2:1" + metadata = {"reliability": 5, "bandwidth": 100, "delay": 105} + self.kytos_graph.get_link_metadata = MagicMock(return_value=metadata) + + result = self.kytos_graph.get_link_metadata( + endpoint_a, endpoint_b) + + self.assertEqual(result, metadata) diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 47a4c827..d2c27e45 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -52,6 +52,33 @@ def test_shortest_path(self, mock_shortest_paths): self.assertEqual(response.json, expected_response) self.assertEqual(response.status_code, 200) + @patch('napps.kytos.pathfinder.graph.KytosGraph.' + + 'constrained_flexible_paths', autospec=True) + def test_shortest_constrained_path(self, mock_constrained_flexible_paths): + """Test constrained flexible paths.""" + source = "00:00:00:00:00:00:00:01:1" + destination = "00:00:00:00:00:00:00:02:1" + path = [source, destination] + base_metrics = {"ownership": "bob"} + fle_metrics = {"delay": 30} + metrics = {**base_metrics, **fle_metrics} + mock_constrained_flexible_paths.return_value = [ + {"paths": [path], "metrics": metrics}] + + api = get_test_client(self.napp.controller, self.napp) + url = "http://127.0.0.1:8181/api/kytos/pathfinder/v2/" +\ + "best-constrained-paths" + data = {"source": "00:00:00:00:00:00:00:01:1", + "destination": "00:00:00:00:00:00:00:02:1", + "base_metrics": {"ownership": "bob"}, + "flexible_metrics": {"delay": 30}, + "minimum_flexible_hits": 1} + response = api.open(url, method='POST', json=data) + expected_response = [{"metrics": metrics, "paths": [path]}] + + self.assertEqual(response.json, expected_response) + self.assertEqual(response.status_code, 200) + def test_filter_paths(self): """Test filter paths.""" self.napp._topology = get_topology_mock()