diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 58637438..2c830997 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -15,6 +15,7 @@ Changed - Updated python environment installation from 3.9 to 3.11 - Updated test dependencies - MongoDB version has been updated to 7.0 +- Queue buffers event handlers will now handle and log broad exceptions keeping the task execution alive General Information =================== diff --git a/kytos/core/controller.py b/kytos/core/controller.py index a9200e07..05beb055 100644 --- a/kytos/core/controller.py +++ b/kytos/core/controller.py @@ -583,17 +583,22 @@ def notify_listeners(self, event): else: listener(event) + # pylint: disable=broad-exception-caught async def event_handler(self, buffer_name: str): """Default event handler that gets from an event buffer.""" event_buffer = getattr(self.buffers, buffer_name) self.log.info(f"Event handler {buffer_name} started") while True: - event = await event_buffer.aget() - self.notify_listeners(event) + try: + event = await event_buffer.aget() + self.notify_listeners(event) - if event.name == "kytos/core.shutdown": - self.log.debug(f"Event handler {buffer_name} stopped") - break + if event.name == "kytos/core.shutdown": + self.log.debug(f"Event handler {buffer_name} stopped") + break + except Exception as exc: + self.log.exception(f"Unhandled exception on {buffer_name}", + exc_info=exc) async def publish_connection_error(self, event): """Publish connection error event. @@ -607,6 +612,7 @@ async def publish_connection_error(self, event): event.content["exception"] = error_msg await self.buffers.conn.aput(event) + # pylint: disable=broad-exception-caught async def msg_out_event_handler(self): """Handle msg_out events. @@ -615,15 +621,15 @@ async def msg_out_event_handler(self): """ self.log.info("Event handler msg_out started") while True: - triggered_event = await self.buffers.msg_out.aget() + try: + triggered_event = await self.buffers.msg_out.aget() - if triggered_event.name == "kytos/core.shutdown": - self.log.debug("Message Out Event handler stopped") - break + if triggered_event.name == "kytos/core.shutdown": + self.log.debug("Message Out Event handler stopped") + break - message = triggered_event.content['message'] - destination = triggered_event.destination - try: + message = triggered_event.content['message'] + destination = triggered_event.destination if (destination and not destination.state == ConnectionState.FINISHED): packet = message.pack() @@ -644,6 +650,9 @@ async def msg_out_event_handler(self): f"Discarding message: {message}, event: {triggered_event} " f"because of PackException {err}" ) + except Exception as exc: + self.log.exception("Unhandled exception on msg_out", + exc_info=exc) def get_interface_by_id(self, interface_id): """Find a Interface with interface_id. diff --git a/tests/unit/test_core/test_controller.py b/tests/unit/test_core/test_controller.py index bda65fe4..bdf79df2 100644 --- a/tests/unit/test_core/test_controller.py +++ b/tests/unit/test_core/test_controller.py @@ -818,6 +818,21 @@ async def test_msg_out_event_handler_pack_exc(self, controller): await controller.msg_out_event_handler() assert controller.log.error.call_count == 1 + async def test_msg_out_event_handler_broad_exc(self, controller): + """Test msg_out_event_handler async broad exception.""" + controller._buffers = KytosBuffers() + dst, msg = MagicMock(), MagicMock() + dst.state = 0 + msg.pack.side_effect = ValueError("some error") + event_1 = KytosEvent('kytos/core.any', + content={'message': msg, 'destination': dst}) + event_2 = KytosEvent('kytos/core.shutdown') + + await controller.buffers.msg_out._queue.async_q.put(event_1) + await controller.buffers.msg_out._queue.async_q.put(event_2) + await controller.msg_out_event_handler() + assert controller.log.exception.call_count == 1 + async def test_app_event_handler(self, controller): """Test app_event_handler async method by handling a shutdown event.""" controller._buffers = KytosBuffers() @@ -827,6 +842,19 @@ async def test_app_event_handler(self, controller): await controller.event_handler("app") controller.notify_listeners.assert_called_with(event) + async def test_app_event_handler_exc(self, controller): + """Test app_event_handler async method exc.""" + controller._buffers = KytosBuffers() + event1 = KytosEvent("kytos/core.any", content={"message": ""}) + event2 = KytosEvent("kytos/core.shutdown") + controller.notify_listeners = MagicMock() + controller.notify_listeners.side_effect = [ValueError("some error"), 1] + await controller.buffers.app._queue.async_q.put(event1) + await controller.buffers.app._queue.async_q.put(event2) + await controller.event_handler("app") + assert controller.log.exception.call_count == 1 + controller.notify_listeners.assert_called_with(event2) + async def test_configuration_endpoint(self, controller, api_client): """Should return the attribute options as json.""" expected = vars(controller.options)