Skip to content

Commit

Permalink
Merge pull request #429 from kytos-ng/feat/qsize
Browse files Browse the repository at this point in the history
feat: parametrized timeout on KytosEventBuffer.put and exposing buffers queue size config
  • Loading branch information
viniarck authored Jan 25, 2024
2 parents 9b5c3c9 + eff1948 commit 7a2afa8
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ Added
- Added ``TAGRange`` class which is used when a ``UNI`` has a tag as a list of ranges.
- Added ``KytosTagError`` exception that cover other exceptions ``KytosTagtypeNotSupported``, ``KytosInvalidTagRanges``, ``KytosSetTagRangeError``, ``KytosTagsNotInTagRanges`` and ``KytosTagsAreNotAvailable`` all of which are related to TAGs.
- Added ``special_available_tags`` which stores `"untagged"` and `"any"` if they can be used from an Interface.
- Added ``maxsize_multiplier`` on ``event_buffer_conf``, which will multiply the ``maxsize`` value of the queue. By default, all KytosEventBuffer who use a bounded queue will have ``maxsize_multiplier: 2``. This default is reasonable to work out of the box with kytos-ng core NApps. But, if you have other NApps who tend to produce too many events you might want to either increase the size of the queue with and/or increase the number of max workers in the thread pool if the event handler is running on a thread pool. Typically, you'll want to first start adjusting the number of workers in the thread pool.
- Introduced a new ``meta`` on ``KytosBuffers``, which is meant for general core control events.

Changed
=======
- Parametrized default ``maxTimeMS`` when creating an index via ``Mongo.boostrap_index`` via environment variable ``MONGO_IDX_TIMEOUTMS=30000``. The retries parameters reuse the same environment variables ``MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT=3``, ``MONGO_AUTO_RETRY_WAIT_RANDOM_MIN=0.1``, ``MONGO_AUTO_RETRY_WAIT_RANDOM_MAX=1`` that NApps controllers have been using.
- ``kytosd`` process will exit if a NApp raises an exception during its ``setup()`` execution.
- Change format for ``Interface.available_tags`` to ``dict[str, list[list[int]]]``. Storing ``tag_types`` as keys and a list of ranges for ``available_tags`` as values.
- ``Interface.use_tags`` and ``Interface.make_tags_available`` are compatible with list of ranges.
- ``KytosEventBuffer`'s ``put`` method timeout now accepts a timeout argument. NApps who publish events during ``setup()`` should set ``timeout=1`` without handling the exception, so if within 1 second the event can't be put in the queue, then the NApp will fail to start, which would imply that during initialization either the queue size is too small or the NApp is misbehaving sending too many events.

Fixed
=====
Expand Down
8 changes: 6 additions & 2 deletions kytos/core/buffers/buffers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Kytos Buffer Classes, based on Python Queue."""
import logging
from typing import Optional

from janus import Queue

Expand All @@ -22,7 +23,7 @@ def __init__(self, name, queue: Queue = None):
self._queue = queue if queue is not None else Queue()
self._reject_new_events = False

def put(self, event):
def put(self, event, timeout: Optional[float] = None):
"""Insert an event in KytosEventBuffer if reject_new_events is False.
Reject new events is True when a kytos/core.shutdown message was
Expand All @@ -31,9 +32,12 @@ def put(self, event):
Args:
event (:class:`~kytos.core.events.KytosEvent`):
KytosEvent sent to queue.
timeout: Block if necessary until a free slot is available.
If 'timeout' is a non-negative number, it blocks at most 'timeout'
seconds and raises an Full exception if no free slot was available.
"""
if not self._reject_new_events:
self._queue.sync_q.put(event)
self._queue.sync_q.put(event, timeout=timeout)
LOG.debug('[buffer: %s] Added: %s', self.name, event.name)

if event.name == "kytos/core.shutdown":
Expand Down
3 changes: 2 additions & 1 deletion kytos/core/buffers/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def process_queue(config: dict) -> Queue:
"""
queue_type = queue_classes[config.get('type', 'queue')]
queue_size = config.get('maxsize', 0)
queue_size_multiplier = config.get('maxsize_multiplier', 1)
if isinstance(queue_size, str):
if queue_size.startswith('threadpool_'):
threadpool = queue_size[len('threadpool_'):]
Expand All @@ -27,7 +28,7 @@ def process_queue(config: dict) -> Queue:
'Expected int or str formatted '
'as "threadpool_{threadpool_name}"'
)
return queue_type(maxsize=queue_size)
return queue_type(maxsize=queue_size * queue_size_multiplier)


extension_processors = {}
Expand Down
5 changes: 5 additions & 0 deletions kytos/core/buffers/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def __init__(self):
:attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
sent to NApps.
:attr:`meta`: :class:`~kytos.core.buffers.KytosEventBuffer` with
core related events sent to NApps. This is meant for general core
control events.
"""

self._pool_max_workers = get_thread_pool_max_workers()
Expand All @@ -54,6 +58,7 @@ def __init__(self):
"app",
queue=Queue(maxsize=self._get_maxsize("app")),
)
self.meta = KytosEventBuffer("meta")

buffer_conf = KytosConfig().options['daemon'].event_buffer_conf

Expand Down
38 changes: 27 additions & 11 deletions kytos/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,34 @@ def parse_args(self):
'apm': '',
'connection_timeout': 130,
'debug': False,
'event_buffer_conf': {
'msg_out': {
'queue': {
'type': 'priority',
'maxsize': 'threadpool_sb',
},
"event_buffer_conf": {
"msg_out": {
"queue": {
"type": "priority",
"maxsize": "threadpool_sb",
"maxsize_multiplier": 2,
}
},
'msg_in': {
'queue': {
'type': 'priority',
'maxsize': 'threadpool_sb',
},
"msg_in": {
"queue": {
"type": "priority",
"maxsize": "threadpool_sb",
"maxsize_multiplier": 2,
}
},
"raw": {
"queue": {
"type": "queue",
"maxsize": "threadpool_sb",
"maxsize_multiplier": 2,
}
},
"app": {
"queue": {
"type": "queue",
"maxsize": "threadpool_app",
"maxsize_multiplier": 2,
}
},
},
}
Expand Down
30 changes: 27 additions & 3 deletions kytos/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import threading
import traceback
from asyncio import AbstractEventLoop
from collections import Counter, defaultdict
from importlib import import_module
from importlib import reload as reload_module
from importlib.util import module_from_spec, spec_from_file_location
Expand Down Expand Up @@ -266,7 +267,8 @@ def start(self, restart=False):
except Exception as exc:
exc_fmt = traceback.format_exc(chain=True)
message = f"Kytos couldn't start because of {str(exc)} {exc_fmt}"
sys.exit(message)
counter = self._full_queue_counter()
sys.exit(self._try_to_fmt_traceback_msg(message, counter))

def create_pidfile(self):
"""Create a pidfile."""
Expand Down Expand Up @@ -370,6 +372,8 @@ def start_controller(self):
self._tasks.append(task)
task = self.loop.create_task(self.event_handler("app"))
self._tasks.append(task)
task = self.loop.create_task(self.event_handler("meta"))
self._tasks.append(task)

self.started_at = now()

Expand Down Expand Up @@ -569,7 +573,7 @@ async def publish_connection_error(self, event):
f"kytos/core.{event.destination.protocol.name}.connection.error"
error_msg = f"Connection state: {event.destination.state}"
event.content["exception"] = error_msg
await self.buffers.app.aput(event)
await self.buffers.conn.aput(event)

async def msg_out_event_handler(self):
"""Handle msg_out events.
Expand Down Expand Up @@ -679,7 +683,7 @@ def get_switch_or_create(self, dpid, connection=None):
if old_connection is not connection:
self.remove_connection(old_connection)

self.buffers.app.put(event)
self.buffers.conn.put(event)

return switch

Expand Down Expand Up @@ -935,3 +939,23 @@ def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
for napp in self.napps:
self.reload_napp(*napp)
return JSONResponse('reloaded')

def _full_queue_counter(self) -> Counter:
"""Generate full queue stats counter."""
buffer_counter = defaultdict(Counter)
for buffer in self.buffers.get_all_buffers():
if not buffer.full():
continue
while not buffer.empty():
event = buffer.get()
buffer_counter[buffer.name][event.name] += 1
return buffer_counter

def _try_to_fmt_traceback_msg(self, message: str, counter: Counter) -> str:
"""Try to fmt traceback message."""
if counter:
counter = dict(counter)
message = (
f"{message}\nFull KytosEventBuffers counters: {counter}"
)
return message
2 changes: 1 addition & 1 deletion kytos/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def __str__(self):
class KytosNAppSetupException(KytosNAppException):
"""KytosNAppSetupException. """

def __init__(self, message="KytosNAppSetupException") -> None:
def __init__(self, message="KytosNAppSetupException"):
"""KytosNAppSetupException."""
super().__init__(message=message)

Expand Down
2 changes: 1 addition & 1 deletion kytos/core/napps/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def notify_loaded(self):
"""Inform this NApp has been loaded."""
name = f'{self.username}/{self.name}.loaded'
event = KytosEvent(name=name, content={})
self.controller.buffers.app.put(event)
self.controller.buffers.meta.put(event)

# all listeners receive event
def _shutdown_handler(self, event): # pylint: disable=unused-argument
Expand Down
20 changes: 18 additions & 2 deletions kytos/templates/kytos.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,29 @@ event_buffer_conf =
"msg_out": {
"queue": {
"type": "priority",
"maxsize": "threadpool_sb"
"maxsize": "threadpool_sb",
"maxsize_multiplier": 2
}
},
"msg_in": {
"queue": {
"type": "priority",
"maxsize": "threadpool_sb"
"maxsize": "threadpool_sb",
"maxsize_multiplier": 2
}
},
"raw": {
"queue": {
"type": "queue",
"maxsize": "threadpool_sb",
"maxsize_multiplier": 2
}
},
"app": {
"queue": {
"type": "queue",
"maxsize": "threadpool_app",
"maxsize_multiplier": 2
}
}
}
Expand Down
52 changes: 50 additions & 2 deletions tests/unit/test_core/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
import sys
import tempfile
import warnings
from collections import Counter
from copy import copy
from unittest import TestCase
from unittest.mock import MagicMock, Mock, call, patch
from unittest.mock import AsyncMock, MagicMock, Mock, call, patch

import pytest
from janus import Queue
from pyof.foundation.exceptions import PackException

from kytos.core import Controller
from kytos.core.auth import Auth
from kytos.core.buffers import KytosBuffers
from kytos.core.buffers import KytosBuffers, KytosEventBuffer
from kytos.core.config import KytosConfig
from kytos.core.events import KytosEvent
from kytos.core.exceptions import KytosNAppSetupException
Expand Down Expand Up @@ -362,22 +364,30 @@ def test_get_switch_or_create__exists(self):
dpid = '00:00:00:00:00:00:00:01'
switch = MagicMock(dpid=dpid)
self.controller.switches = {dpid: switch}
self.controller.buffers.conn = MagicMock()

connection = MagicMock()
resp_switch = self.controller.get_switch_or_create(dpid, connection)

self.assertEqual(resp_switch, switch)
self.controller.buffers.conn.put.assert_called()
ev_name = "kytos/core.switch.reconnected"
assert self.controller.buffers.conn.put.call_args[0][0].name == ev_name

def test_get_switch_or_create__not_exists(self):
"""Test status_api method when switch does not exist."""
self.controller.switches = {}
self.controller.buffers.conn = MagicMock()

dpid = '00:00:00:00:00:00:00:01'
connection = MagicMock()
switch = self.controller.get_switch_or_create(dpid, connection)

expected_switches = {'00:00:00:00:00:00:00:01': switch}
self.assertEqual(self.controller.switches, expected_switches)
self.controller.buffers.conn.put.assert_called()
ev_name = "kytos/core.switch.new"
assert self.controller.buffers.conn.put.call_args[0][0].name == ev_name

def test_create_or_update_connection(self):
"""Test create_or_update_connection method."""
Expand Down Expand Up @@ -677,6 +687,23 @@ def test_init_attrs(self):
assert self.controller.auth
assert self.controller.dead_letter

def test_try_to_fmt_traceback_msg(self) -> None:
"""Test test_try_to_fmt_traceback_msg."""
counter = Counter(range(5))
msg = "some traceback msg"
fmt_msg = self.controller._try_to_fmt_traceback_msg(msg, counter)
assert msg in fmt_msg
assert "counters" in fmt_msg

def test_config_default_maxsize_multiplier(self) -> None:
"""Test KytosConfig default maxsize multiplier."""
event_buffer_conf = self.controller.options.event_buffer_conf
assert event_buffer_conf
queues = event_buffer_conf.values()
assert queues
for queue in queues:
assert queue["queue"]["maxsize_multiplier"] == 2


class TestControllerAsync:

Expand Down Expand Up @@ -797,3 +824,24 @@ async def test_configuration_endpoint(self, controller, api_client):
resp = await api_client.get("kytos/core/config")
assert resp.status_code == 200
assert expected == resp.json()

async def test_publish_connection_error(self, controller):
"""Test publish_connection_error."""
controller.buffers.conn.aput = AsyncMock()
await controller.publish_connection_error(MagicMock())
controller.buffers.conn.aput.assert_called()

async def test_full_queue_counter(self, controller) -> None:
"""Test full queue counter."""
maxsize = 2
queue = Queue(maxsize=maxsize)
buffer = KytosEventBuffer("app", queue)
for i in range(maxsize):
await buffer.aput(KytosEvent(str(i)))
assert buffer.full()
controller._buffers.get_all_buffers.return_value = [buffer]
counter = controller._full_queue_counter()
assert counter
assert len(counter["app"]) == maxsize
queue.close()
await queue.wait_closed()

0 comments on commit 7a2afa8

Please sign in to comment.