Skip to content

Commit

Permalink
Merge pull request #485 from kytos-ng/feat/msg_handlers_handle_exc
Browse files Browse the repository at this point in the history
feat: handled broad exception on event handler looped execution
  • Loading branch information
viniarck committed Jul 24, 2024
2 parents 76a3667 + a65854a commit 0d5476f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Changed
- Updated python environment installation from 3.9 to 3.11
- Updated test dependencies
- MongoDB version has been updated to 7.0
- Queue buffers event handlers will now handle and log broad exceptions keeping the task execution alive

General Information
===================
Expand Down
33 changes: 21 additions & 12 deletions kytos/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,17 +583,22 @@ def notify_listeners(self, event):
else:
listener(event)

# pylint: disable=broad-exception-caught
async def event_handler(self, buffer_name: str):
"""Default event handler that gets from an event buffer."""
event_buffer = getattr(self.buffers, buffer_name)
self.log.info(f"Event handler {buffer_name} started")
while True:
event = await event_buffer.aget()
self.notify_listeners(event)
try:
event = await event_buffer.aget()
self.notify_listeners(event)

if event.name == "kytos/core.shutdown":
self.log.debug(f"Event handler {buffer_name} stopped")
break
if event.name == "kytos/core.shutdown":
self.log.debug(f"Event handler {buffer_name} stopped")
break
except Exception as exc:
self.log.exception(f"Unhandled exception on {buffer_name}",
exc_info=exc)

async def publish_connection_error(self, event):
"""Publish connection error event.
Expand All @@ -607,6 +612,7 @@ async def publish_connection_error(self, event):
event.content["exception"] = error_msg
await self.buffers.conn.aput(event)

# pylint: disable=broad-exception-caught
async def msg_out_event_handler(self):
"""Handle msg_out events.
Expand All @@ -615,15 +621,15 @@ async def msg_out_event_handler(self):
"""
self.log.info("Event handler msg_out started")
while True:
triggered_event = await self.buffers.msg_out.aget()
try:
triggered_event = await self.buffers.msg_out.aget()

if triggered_event.name == "kytos/core.shutdown":
self.log.debug("Message Out Event handler stopped")
break
if triggered_event.name == "kytos/core.shutdown":
self.log.debug("Message Out Event handler stopped")
break

message = triggered_event.content['message']
destination = triggered_event.destination
try:
message = triggered_event.content['message']
destination = triggered_event.destination
if (destination and
not destination.state == ConnectionState.FINISHED):
packet = message.pack()
Expand All @@ -644,6 +650,9 @@ async def msg_out_event_handler(self):
f"Discarding message: {message}, event: {triggered_event} "
f"because of PackException {err}"
)
except Exception as exc:
self.log.exception("Unhandled exception on msg_out",
exc_info=exc)

def get_interface_by_id(self, interface_id):
"""Find a Interface with interface_id.
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/test_core/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,21 @@ async def test_msg_out_event_handler_pack_exc(self, controller):
await controller.msg_out_event_handler()
assert controller.log.error.call_count == 1

async def test_msg_out_event_handler_broad_exc(self, controller):
"""Test msg_out_event_handler async broad exception."""
controller._buffers = KytosBuffers()
dst, msg = MagicMock(), MagicMock()
dst.state = 0
msg.pack.side_effect = ValueError("some error")
event_1 = KytosEvent('kytos/core.any',
content={'message': msg, 'destination': dst})
event_2 = KytosEvent('kytos/core.shutdown')

await controller.buffers.msg_out._queue.async_q.put(event_1)
await controller.buffers.msg_out._queue.async_q.put(event_2)
await controller.msg_out_event_handler()
assert controller.log.exception.call_count == 1

async def test_app_event_handler(self, controller):
"""Test app_event_handler async method by handling a shutdown event."""
controller._buffers = KytosBuffers()
Expand All @@ -827,6 +842,19 @@ async def test_app_event_handler(self, controller):
await controller.event_handler("app")
controller.notify_listeners.assert_called_with(event)

async def test_app_event_handler_exc(self, controller):
"""Test app_event_handler async method exc."""
controller._buffers = KytosBuffers()
event1 = KytosEvent("kytos/core.any", content={"message": ""})
event2 = KytosEvent("kytos/core.shutdown")
controller.notify_listeners = MagicMock()
controller.notify_listeners.side_effect = [ValueError("some error"), 1]
await controller.buffers.app._queue.async_q.put(event1)
await controller.buffers.app._queue.async_q.put(event2)
await controller.event_handler("app")
assert controller.log.exception.call_count == 1
controller.notify_listeners.assert_called_with(event2)

async def test_configuration_endpoint(self, controller, api_client):
"""Should return the attribute options as json."""
expected = vars(controller.options)
Expand Down

0 comments on commit 0d5476f

Please sign in to comment.