diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 41ae897..cc36a5c 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -12,6 +12,7 @@ Added
- Publish event ``kytos/topology.current`` for topology reconciliation
- Subscribed to event ``kytos/topology.get`` to publish the current topology
- Added ``notified_up_at`` internal reserved metadata
+- Enabling/disabling a switch or an interface will send ``link_up`` and ``link_down`` notifications
Changed
=======
diff --git a/README.rst b/README.rst
index aa16dce..6e7843d 100644
--- a/README.rst
+++ b/README.rst
@@ -63,6 +63,7 @@ Subscribed
- ``kytos/.*.liveness.(up|down)``
- ``kytos/.*.liveness.disabled``
- ``kytos/topology.get``
+- ``kytos/topology.notify_link_up_if_status``
Published
@@ -196,3 +197,18 @@ Content:
:target: https://github.com/kytos-ng/topology
.. |Tag| image:: https://img.shields.io/github/tag/kytos-ng/topology.svg
:target: https://github.com/kytos-ng/topology/tags
+
+
+kytos/topology.notify_link_up_if_status
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+Event reporting that the link was changed to 'down'. It contains the link instance.
+
+Content:
+
+.. code-block:: python3
+
+ {
+ 'reason': 'link_enabled'
+ 'link':
+ }
diff --git a/main.py b/main.py
index 48e1121..eccf7c2 100644
--- a/main.py
+++ b/main.py
@@ -270,6 +270,7 @@ def enable_switch(self, dpid):
self.notify_switch_enabled(dpid)
self.notify_topology_update()
+ self.notify_switch_links_status(switch, "link enabled")
return jsonify("Operation successful"), 201
@rest('v3/switches//disable', methods=['POST'])
@@ -284,6 +285,7 @@ def disable_switch(self, dpid):
self.notify_switch_disabled(dpid)
self.notify_topology_update()
+ self.notify_switch_links_status(switch, "link disabled")
return jsonify("Operation successful"), 201
@rest('v3/switches//metadata')
@@ -358,12 +360,14 @@ def enable_interface(self, interface_enable_id=None, dpid=None):
interface = switch.interfaces[interface_number]
self.topo_controller.enable_interface(interface.id)
interface.enable()
+ self.notify_interface_link_status(interface, "link enabled")
except KeyError:
msg = f"Switch {dpid} interface {interface_number} not found"
return jsonify(msg), 404
else:
for interface in switch.interfaces.copy().values():
interface.enable()
+ self.notify_interface_link_status(interface, "link enabled")
self.topo_controller.upsert_switch(switch.id, switch.as_dict())
self.notify_topology_update()
return jsonify("Operation successful"), 200
@@ -386,12 +390,14 @@ def disable_interface(self, interface_disable_id=None, dpid=None):
interface = switch.interfaces[interface_number]
self.topo_controller.disable_interface(interface.id)
interface.disable()
+ self.notify_interface_link_status(interface, "link disabled")
except KeyError:
msg = f"Switch {dpid} interface {interface_number} not found"
return jsonify(msg), 404
else:
for interface in switch.interfaces.copy().values():
interface.disable()
+ self.notify_interface_link_status(interface, "link disabled")
self.topo_controller.upsert_switch(switch.id, switch.as_dict())
self.notify_topology_update()
return jsonify("Operation successful"), 200
@@ -765,7 +771,7 @@ def link_status_hook_link_up_timer(self, link) -> Optional[EntityStatus]:
return EntityStatus.DOWN
return None
- def notify_link_up_if_status(self, link) -> None:
+ def notify_link_up_if_status(self, link, reason="link up") -> None:
"""Tries to notify link up and topology changes based on its status
Currently, it needs to wait up to a timer."""
@@ -784,7 +790,7 @@ def notify_link_up_if_status(self, link) -> None:
link.update_metadata(key, now())
self.topo_controller.add_link_metadata(link.id, {key: notified_at})
self.notify_topology_update()
- self.notify_link_status_change(link, reason="link up")
+ self.notify_link_status_change(link, reason)
def handle_link_up(self, interface):
"""Handle link up for an interface."""
@@ -807,7 +813,7 @@ def handle_link_up(self, interface):
link.extend_metadata(metadata)
link.activate()
self.topo_controller.activate_link(link.id, **metadata)
- self.notify_link_up_if_status(link)
+ self.notify_link_up_if_status(link, "link up")
@listen_to('.*.switch.interface.link_down')
def on_interface_link_down(self, event):
@@ -911,7 +917,7 @@ def add_links(self, event):
}
link.extend_metadata(metadata)
self.topo_controller.upsert_link(link.id, link.as_dict())
- self.notify_link_up_if_status(link)
+ self.notify_link_up_if_status(link, "link up")
@listen_to('.*.of_lldp.network_status.updated')
def on_lldp_status_updated(self, event):
@@ -949,6 +955,19 @@ def notify_switch_enabled(self, dpid):
event = KytosEvent(name=name, content={'dpid': dpid})
self.controller.buffers.app.put(event)
+ def notify_switch_links_status(self, switch, reason):
+ """Send an event to notify the status of a link in a switch"""
+ with self._links_lock:
+ for link in self.links.values():
+ if switch in (link.endpoint_a.switch, link.endpoint_b.switch):
+ if reason == "link enabled":
+ name = 'kytos/topology.notify_link_up_if_status'
+ content = {'reason': reason, "link": link}
+ event = KytosEvent(name=name, content=content)
+ self.controller.buffers.app.put(event)
+ else:
+ self.notify_link_status_change(link, reason)
+
def notify_switch_disabled(self, dpid):
"""Send an event to notify that a switch is disabled."""
name = 'kytos/topology.switch.disabled'
@@ -962,6 +981,19 @@ def notify_topology_update(self):
self._get_topology()})
self.controller.buffers.app.put(event)
+ def notify_interface_link_status(self, interface, reason):
+ """Send an event to notify the status of a link from
+ an interface."""
+ link = self._get_link_from_interface(interface)
+ if link:
+ if reason == "link enabled":
+ name = 'kytos/topology.notify_link_up_if_status'
+ content = {'reason': reason, "link": link}
+ event = KytosEvent(name=name, content=content)
+ self.controller.buffers.app.put(event)
+ else:
+ self.notify_link_status_change(link, reason)
+
def notify_link_status_change(self, link, reason='not given'):
"""Send an event to notify about a status change on a link."""
name = 'kytos/topology.'
@@ -999,6 +1031,13 @@ def notify_metadata_changes(self, obj, action):
self.controller.buffers.app.put(event)
log.debug(f'Metadata from {obj.id} was {action}.')
+ @listen_to('kytos/topology.notify_link_up_if_status')
+ def on_notify_link_up_if_status(self, event):
+ """Tries to notify link up and topology changes"""
+ link = event.content["link"]
+ reason = event.content["reason"]
+ self.notify_link_up_if_status(link, reason)
+
@listen_to('.*.switch.port.created')
def on_notify_port_created(self, event):
"""Notify when a port is created."""
diff --git a/tests/integration/test_main.py b/tests/integration/test_main.py
index 5eba8f0..8305cf1 100644
--- a/tests/integration/test_main.py
+++ b/tests/integration/test_main.py
@@ -154,7 +154,8 @@ def test_get_event_listeners(self):
'.*.switch.interface.link_down',
'.*.switch.interface.link_up',
'.*.switch.(new|reconnected)',
- '.*.switch.port.created']
+ '.*.switch.port.created',
+ 'kytos/topology.notify_link_up_if_status']
self.assertCountEqual(expected_events, actual_events)
def test_verify_api_urls(self):
diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py
index dddfe4b..9884df9 100644
--- a/tests/unit/test_main.py
+++ b/tests/unit/test_main.py
@@ -87,7 +87,8 @@ def test_get_event_listeners(self):
'kytos/.*.liveness.(up|down)',
'kytos/.*.liveness.disabled',
'kytos/topology.get',
- '.*.switch.port.created']
+ '.*.switch.port.created',
+ 'kytos/topology.notify_link_up_if_status']
actual_events = self.napp.listeners()
self.assertCountEqual(expected_events, actual_events)
@@ -625,8 +626,9 @@ def test_fail_load_link(self, get_link_or_create_mock):
with self.assertRaises(RestoreError):
self.napp._load_link(link_attrs_fail)
+ @patch('napps.kytos.topology.main.Main.notify_switch_links_status')
@patch('napps.kytos.topology.main.Main.notify_topology_update')
- def test_enable_switch(self, mock_notify_topo):
+ def test_enable_switch(self, mock_notify_topo, mock_sw_l_status):
"""Test enable_switch."""
dpid = "00:00:00:00:00:00:00:01"
mock_switch = get_switch_mock(dpid)
@@ -639,6 +641,7 @@ def test_enable_switch(self, mock_notify_topo):
self.assertEqual(mock_switch.enable.call_count, 1)
self.napp.topo_controller.enable_switch.assert_called_once_with(dpid)
mock_notify_topo.assert_called()
+ mock_sw_l_status.assert_called()
# fail case
mock_switch.enable.call_count = 0
@@ -648,8 +651,9 @@ def test_enable_switch(self, mock_notify_topo):
self.assertEqual(response.status_code, 404, response.data)
self.assertEqual(mock_switch.enable.call_count, 0)
+ @patch('napps.kytos.topology.main.Main.notify_switch_links_status')
@patch('napps.kytos.topology.main.Main.notify_topology_update')
- def test_disable_switch(self, mock_notify_topo):
+ def test_disable_switch(self, mock_notify_topo, mock_sw_l_status):
"""Test disable_switch."""
dpid = "00:00:00:00:00:00:00:01"
mock_switch = get_switch_mock(dpid)
@@ -662,6 +666,7 @@ def test_disable_switch(self, mock_notify_topo):
self.assertEqual(mock_switch.disable.call_count, 1)
self.napp.topo_controller.disable_switch.assert_called_once_with(dpid)
mock_notify_topo.assert_called()
+ mock_sw_l_status.assert_called()
# fail case
mock_switch.disable.call_count = 0
@@ -1569,17 +1574,65 @@ def test_notify_link_up_if_status(
link = MagicMock(status=EntityStatus.UP)
link.get_metadata.return_value = now()
- assert not self.napp.notify_link_up_if_status(link)
+ assert not self.napp.notify_link_up_if_status(link, "link up")
link.update_metadata.assert_not_called()
mock_notify_topo.assert_not_called()
mock_notify_link.assert_not_called()
link = MagicMock(status=EntityStatus.UP)
link.get_metadata.return_value = now() - timedelta(seconds=60)
- assert not self.napp.notify_link_up_if_status(link)
+ assert not self.napp.notify_link_up_if_status(link, "link up")
link.update_metadata.assert_called()
self.napp.topo_controller.add_link_metadata.assert_called()
mock_notify_topo.assert_called()
mock_notify_link.assert_called()
assert mock_sleep.call_count == 2
+
+ @patch('napps.kytos.topology.main.Main.notify_link_status_change')
+ def test_notify_switch_links_status(self, mock_notify_link_status_change):
+ """Test switch links notification when switch status change"""
+ buffers_app_mock = MagicMock()
+ self.napp.controller.buffers.app = buffers_app_mock
+ dpid = "00:00:00:00:00:00:00:01"
+ mock_switch = get_switch_mock(dpid)
+ link1 = MagicMock()
+ link1.endpoint_a.switch = mock_switch
+ self.napp.links = {1: link1}
+
+ self.napp.notify_switch_links_status(mock_switch, "link enabled")
+ assert self.napp.controller.buffers.app.put.call_count == 1
+
+ self.napp.notify_switch_links_status(mock_switch, "link disabled")
+ assert self.napp.controller.buffers.app.put.call_count == 1
+ assert mock_notify_link_status_change.call_count == 1
+
+ # Without notification
+ link1.endpoint_a.switch = None
+ self.napp.notify_switch_links_status(mock_switch, "link enabled")
+ assert self.napp.controller.buffers.app.put.call_count == 1
+
+ @patch('napps.kytos.topology.main.Main.notify_link_status_change')
+ @patch('napps.kytos.topology.main.Main._get_link_from_interface')
+ def test_notify_interface_link_status(self, *args):
+ """Test interface links notification when enable"""
+ (mock_get_link_from_interface,
+ mock_notify_link_status_change) = args
+ buffers_app_mock = MagicMock()
+ self.napp.controller.buffers.app = buffers_app_mock
+ mock_link = MagicMock()
+ mock_get_link_from_interface.return_value = mock_link
+ self.napp.notify_interface_link_status(MagicMock(), "link enabled")
+ assert mock_get_link_from_interface.call_count == 1
+ assert self.napp.controller.buffers.app.put.call_count == 1
+
+ self.napp.notify_interface_link_status(MagicMock(), "link disabled")
+ assert mock_get_link_from_interface.call_count == 2
+ assert mock_notify_link_status_change.call_count == 1
+ assert self.napp.controller.buffers.app.put.call_count == 1
+
+ # Without notification
+ mock_get_link_from_interface.return_value = None
+ self.napp.notify_interface_link_status(MagicMock(), "link enabled")
+ assert mock_get_link_from_interface.call_count == 3
+ assert self.napp.controller.buffers.app.put.call_count == 1