Skip to content

Commit

Permalink
Merge pull request #114 from kytos-ng/fix/link_up_notify
Browse files Browse the repository at this point in the history
[Fix] Link up notifications `LINK_UP_TIMER`
  • Loading branch information
viniarck authored Dec 1, 2022
2 parents 7054ad7 + a7d0b0a commit 045ae57
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 55 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ Added
=====
- Publish event ``kytos/topology.current`` for topology reconciliation
- Subscribed to event ``kytos/topology.get`` to publish the current topology
- Added ``notified_up_at`` internal reserved metadata

Changed
=======
- Hooked ``link_status_hook_link_up_timer`` to update ``status`` accordingly.

Deprecated
==========
Expand All @@ -23,6 +25,7 @@ Removed

Fixed
=====
- Fixed link up to only notify when ``LINK_UP_TIMER`` has passed

Security
========
Expand Down
98 changes: 62 additions & 36 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# pylint: disable=wrong-import-order

import time
from collections import defaultdict
from datetime import timezone
from threading import Lock
from typing import List, Optional

Expand All @@ -14,7 +16,7 @@
from kytos.core import KytosEvent, KytosNApp, log, rest
from kytos.core.common import EntityStatus
from kytos.core.exceptions import KytosLinkCreationError
from kytos.core.helpers import listen_to
from kytos.core.helpers import listen_to, now
from kytos.core.interface import Interface
from kytos.core.link import Link
from kytos.core.switch import Switch
Expand All @@ -40,9 +42,11 @@ def setup(self):
self.link_up_timer = getattr(settings, 'LINK_UP_TIMER',
DEFAULT_LINK_UP_TIMER)

self._lock = Lock()
self._links_lock = Lock()
self._links_notify_lock = defaultdict(Lock)
self.topo_controller = self.get_topo_controller()
Link.register_status_func(f"{self.napp_id}_link_up_timer",
self.link_status_hook_link_up_timer)
self.topo_controller.bootstrap_indexes()
self.load_topology()

Expand Down Expand Up @@ -748,8 +752,41 @@ def handle_switch_maintenance_end(self, event):
interface.enable()
self.handle_link_up(interface)

def link_status_hook_link_up_timer(self, link) -> Optional[EntityStatus]:
"""Link status hook link up timer."""
tnow = time.time()
if (
link.is_active()
and link.is_enabled()
and "last_status_change" in link.metadata
and tnow - link.metadata['last_status_change'] < self.link_up_timer
):
return EntityStatus.DOWN
return None

def notify_link_up_if_status(self, link) -> None:
"""Tries to notify link up and topology changes based on its status
Currently, it needs to wait up to a timer."""
time.sleep(self.link_up_timer)
if link.status != EntityStatus.UP:
return
with self._links_notify_lock[link.id]:
notified_at = link.get_metadata("notified_up_at")
if (
notified_at
and (now() - notified_at.replace(tzinfo=timezone.utc)).seconds
< self.link_up_timer
):
return
key, notified_at = "notified_up_at", now()
link.update_metadata(key, now())
self.topo_controller.add_link_metadata(link.id, {key: notified_at})
self.notify_topology_update()
self.notify_link_status_change(link, reason="link up")

def handle_link_up(self, interface):
"""Notify a link is up."""
"""Handle link up for an interface."""
interface.activate()
self.topo_controller.activate_interface(interface.id)
self.notify_topology_update()
Expand All @@ -762,34 +799,14 @@ def handle_link_up(self, interface):
other_interface = link.endpoint_a
if other_interface.is_active() is False:
return
if link.is_active() is False:
link.update_metadata('last_status_change', time.time())
link.activate()

# As each run of this method uses a different thread,
# there is no risk this sleep will lock the NApp.
time.sleep(self.link_up_timer)

last_status_change = link.get_metadata('last_status_change')
now = time.time()
if link.is_active() and \
now - last_status_change >= self.link_up_timer:
link.update_metadata('last_status_is_active', True)
self.topo_controller.activate_link(link.id, last_status_change,
last_status_is_active=True)
if link.status == EntityStatus.UP:
self.notify_topology_update()
self.notify_link_status_change(link, reason='link up')
else:
last_status_change = time.time()
metadata = {'last_status_change': last_status_change,
'last_status_is_active': True}
link.extend_metadata(metadata)
self.topo_controller.activate_link(link.id, last_status_change,
last_status_is_active=True)
if link.status == EntityStatus.UP:
self.notify_topology_update()
self.notify_link_status_change(link, reason='link up')
metadata = {
'last_status_change': time.time(),
'last_status_is_active': True
}
link.extend_metadata(metadata)
link.activate()
self.topo_controller.activate_link(link.id, **metadata)
self.notify_link_up_if_status(link)

@listen_to('.*.switch.interface.link_down')
def on_interface_link_down(self, event):
Expand Down Expand Up @@ -880,11 +897,20 @@ def add_links(self, event):
log.error(f'Error creating link: {err}.')
return

if created:
link.update_metadata('last_status_is_active', True)
self.notify_link_status_change(link, reason='link up')
self.notify_topology_update()
self.topo_controller.upsert_link(link.id, link.as_dict())
if not created:
return

self.notify_topology_update()
if not link.is_active():
return

metadata = {
'last_status_change': time.time(),
'last_status_is_active': True
}
link.extend_metadata(metadata)
self.topo_controller.upsert_link(link.id, link.as_dict())
self.notify_link_up_if_status(link)

@listen_to('.*.of_lldp.network_status.updated')
def on_lldp_status_updated(self, event):
Expand Down
83 changes: 64 additions & 19 deletions tests/unit/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import pytest
import json
import time
from datetime import timedelta
from unittest import TestCase
from unittest.mock import MagicMock, create_autospec, patch

from kytos.core.common import EntityStatus
from kytos.core.helpers import now
from kytos.core.interface import Interface
from kytos.core.link import Link
from kytos.core.switch import Switch
Expand Down Expand Up @@ -1199,29 +1201,32 @@ def test_interface_deleted(self, mock_handle_interface_link_down):
mock_handle_interface_link_down.assert_called()

@patch('napps.kytos.topology.main.Main._get_link_from_interface')
@patch('napps.kytos.topology.main.Main.notify_link_up_if_status')
@patch('napps.kytos.topology.main.Main.notify_topology_update')
@patch('napps.kytos.topology.main.Main.notify_link_status_change')
def test_interface_link_up(self, *args):
"""Test interface link_up."""
(mock_status_change, mock_topology_update,
(mock_notify_topology_update,
mock_notify_link_up_if_status,
mock_link_from_interface) = args

now = time.time()
tnow = time.time()
mock_interface_a = create_autospec(Interface)
mock_interface_a.is_active.return_value = False
mock_interface_b = create_autospec(Interface)
mock_interface_b.is_active.return_value = True
mock_link = create_autospec(Link)
mock_link.get_metadata.return_value = now
mock_link.get_metadata.return_value = tnow
mock_link.is_active.side_effect = [False, True]
mock_link.endpoint_a = mock_interface_a
mock_link.endpoint_b = mock_interface_b
mock_link_from_interface.return_value = mock_link
mock_link.status = EntityStatus.UP
self.napp.link_up_timer = 1
self.napp.handle_interface_link_up(mock_interface_a)
mock_topology_update.assert_called()
mock_status_change.assert_called()
mock_notify_topology_update.assert_called()
mock_link.extend_metadata.assert_called()
mock_link.activate.assert_called()
self.napp.topo_controller.activate_link.assert_called()
mock_notify_link_up_if_status.assert_called()

@patch('napps.kytos.topology.main.Main._get_link_from_interface')
@patch('napps.kytos.topology.main.Main.notify_topology_update')
Expand Down Expand Up @@ -1282,11 +1287,12 @@ def test_handle_link_down_not_active(self, *args):
assert self.napp.topo_controller.deactivate_interface.call_count == 1

@patch('napps.kytos.topology.main.Main._get_link_from_interface')
@patch('napps.kytos.topology.main.Main.notify_link_up_if_status')
@patch('napps.kytos.topology.main.Main.notify_topology_update')
@patch('napps.kytos.topology.main.Main.notify_link_status_change')
def test_handle_link_up(self, *args):
"""Test handle link up."""
(mock_status_change, mock_topology_update,
(mock_notify_topology_update,
mock_notify_link_up_if_status,
mock_link_from_interface) = args

mock_interface = create_autospec(Interface)
Expand All @@ -1296,8 +1302,8 @@ def test_handle_link_up(self, *args):
self.napp.handle_link_up(mock_interface)
assert self.napp.topo_controller.activate_link.call_count == 1
mock_interface.activate.assert_called()
assert mock_topology_update.call_count == 2
mock_status_change.assert_called()
assert mock_notify_link_up_if_status.call_count == 1
mock_notify_topology_update.assert_called()

@patch('time.sleep')
@patch('napps.kytos.topology.main.Main._get_link_from_interface')
Expand All @@ -1319,14 +1325,12 @@ def test_handle_link_up_intf_down(self, *args):
assert mock_topology_update.call_count == 1
mock_status_change.assert_not_called()

@patch('napps.kytos.topology.main.Main.notify_link_status_change')
@patch('napps.kytos.topology.main.Main._get_link_or_create')
@patch('napps.kytos.topology.main.Main.notify_topology_update')
@patch('napps.kytos.topology.main.Main.notify_link_up_if_status')
def test_add_links(self, *args):
"""Test add_links."""
(mock_notify_topology_update,
mock_get_link_or_create,
mock_link_status_change) = args
(mock_notify_link_up_if_status,
mock_get_link_or_create) = args

mock_link = MagicMock()
mock_get_link_or_create.return_value = (mock_link, True)
Expand All @@ -1336,12 +1340,11 @@ def test_add_links(self, *args):
mock_event.content = {"interface_a": mock_intf_a,
"interface_b": mock_intf_b}
self.napp.add_links(mock_event)
mock_link.update_metadata.assert_called()
mock_link.extend_metadata.assert_called()
mock_get_link_or_create.assert_called()
mock_notify_topology_update.assert_called()
mock_notify_link_up_if_status.assert_called()
mock_intf_a.update_link.assert_called()
mock_intf_b.update_link.assert_called()
mock_link_status_change.assert_called()
mock_link.endpoint_a = mock_intf_a
mock_link.endpoint_b = mock_intf_b

Expand Down Expand Up @@ -1536,3 +1539,45 @@ def test_handle_switch_maintenance_end(self, handle_link_up_mock):
event.content = content
self.napp.handle_switch_maintenance_end(event)
self.assertEqual(handle_link_up_mock.call_count, 5)

def test_link_status_hook_link_up_timer(self) -> None:
"""Test status hook link up timer."""
last_change = time.time() - self.napp.link_up_timer + 5
link = MagicMock(metadata={"last_status_change": last_change})
link.is_active.return_value = True
link.is_enabled.return_value = True
res = self.napp.link_status_hook_link_up_timer(link)
assert res == EntityStatus.DOWN

last_change = time.time() - self.napp.link_up_timer
link.metadata["last_status_change"] = last_change
res = self.napp.link_status_hook_link_up_timer(link)
assert res is None

@patch('napps.kytos.topology.main.Main.notify_link_status_change')
@patch('napps.kytos.topology.main.Main.notify_topology_update')
@patch('time.sleep')
def test_notify_link_up_if_status(
self,
mock_sleep,
mock_notify_topo,
mock_notify_link,
) -> None:
"""Test notify link up if status."""

link = MagicMock(status=EntityStatus.UP)
link.get_metadata.return_value = now()
assert not self.napp.notify_link_up_if_status(link)
link.update_metadata.assert_not_called()
mock_notify_topo.assert_not_called()
mock_notify_link.assert_not_called()

link = MagicMock(status=EntityStatus.UP)
link.get_metadata.return_value = now() - timedelta(seconds=60)
assert not self.napp.notify_link_up_if_status(link)
link.update_metadata.assert_called()
self.napp.topo_controller.add_link_metadata.assert_called()
mock_notify_topo.assert_called()
mock_notify_link.assert_called()

assert mock_sleep.call_count == 2

0 comments on commit 045ae57

Please sign in to comment.