Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/pacing #480

Merged
merged 23 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ All notable changes to the kytos project will be documented in this file.
UNRELEASED - Under development
******************************

Added
=====
- Added in the ``pacer`` to the kytos controller. The ``pacer`` can be used by NApps to pace specific actions, at a specified ``pace`` using a specified ``strategy``. For more info see EP0038. NOTE: The only available strategy at this time is ``fixed_window``.

Changed
=======
- Updated python environment installation from 3.9 to 3.11
Expand Down
4 changes: 4 additions & 0 deletions kytos/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from kytos.core.napps.base import NApp
from kytos.core.napps.manager import NAppsManager
from kytos.core.napps.napp_dir_listener import NAppDirListener
from kytos.core.pacing import Pacer
from kytos.core.queue_monitor import QueueMonitorWindow
from kytos.core.switch import Switch

Expand Down Expand Up @@ -164,6 +165,9 @@ def __init__(self, options=None, loop: AbstractEventLoop = None):
sys.path.append(os.path.join(self.options.napps, os.pardir))
sys.excepthook = exc_handler

#: Pacer for controlling the rate which actions can be executed
self.pacer = Pacer("memory://")

def start_auth(self):
"""Initialize Auth() and its services"""
self.auth = Auth(self)
Expand Down
179 changes: 179 additions & 0 deletions kytos/core/pacing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Provides utilities for pacing actions."""
import asyncio
import logging
import time

import limits.aio.strategies
import limits.strategies
from limits import RateLimitItem, parse
from limits.storage import storage_from_string

LOG = logging.getLogger(__name__)


available_strategies = {
"fixed_window": (
limits.strategies.FixedWindowRateLimiter,
limits.aio.strategies.FixedWindowRateLimiter,
),
# "elastic_window": (
# limits.strategies.FixedWindowElasticExpiryRateLimiter,
# limits.aio.strategies.FixedWindowElasticExpiryRateLimiter,
# ),
}


class NoSuchActionError(BaseException):
"""
Exception for trying to use actions that aren't configured.

Not intended to be caught by NApps.
"""


class Pacer:
"""Class for controlling the rate at which actions are executed."""
sync_strategies: dict[str, limits.strategies.RateLimiter]
viniarck marked this conversation as resolved.
Show resolved Hide resolved
async_strategies: dict[str, limits.aio.strategies.RateLimiter]
pace_config: dict[str, tuple[str, RateLimitItem]]

def __init__(self, storage_uri):
# Initialize dicts
self.sync_strategies = {}
self.async_strategies = {}
self.pace_config = {}

# Acquire storage
sync_storage = storage_from_string(storage_uri)
async_storage = storage_from_string(f"async+{storage_uri}")

# Populate strategies
for strat_name, strat_pair in available_strategies.items():
sync_strat_type, async_strat_type = strat_pair
self.sync_strategies[strat_name] = sync_strat_type(sync_storage)
self.async_strategies[strat_name] = async_strat_type(async_storage)

def inject_config(self, config: dict[str, dict]):
"""
Inject settings for pacing
"""
# Regenerate update dict
next_config = {
key: (
value.get('strategy', 'fixed_window'),
parse(value['pace'])
viniarck marked this conversation as resolved.
Show resolved Hide resolved
)
for key, value in config.items()
}

# Validate
for action, (strat, _) in next_config.items():
if strat not in available_strategies:
raise ValueError(
f"Strategy ({strat}) for action ({action}) not valid"
)
LOG.info("Added pace for action %s", action)

# Apply
self.pace_config.update(
next_config
)

async def ahit(self, action_name: str, *keys):
"""
Asynchronous variant of `hit`.

This can be called from the serving thread safely.
"""
if action_name not in self.pace_config:
raise NoSuchActionError(
f"`{action_name}` has not been configured yet"
)
strat, pace = self.pace_config[action_name]
identifiers = pace, action_name, *keys
strategy = self.async_strategies[strat]
while not await strategy.hit(*identifiers):
window_reset, _ = await strategy.get_window_stats(
*identifiers
)
sleep_time = window_reset - time.time()

await asyncio.sleep(sleep_time)

def hit(self, action_name: str, *keys):
"""
Pace execution, based on the pacing config for the given `action_name`.
Keys can be included to allow multiple objects
to be be paced separately on the same action.

This should not be called from the same thread serving
the pacing.
"""
if action_name not in self.pace_config:
raise NoSuchActionError(
f"`{action_name}` has not been configured yet"
)
strat, pace = self.pace_config[action_name]
identifiers = pace, action_name, *keys
strategy = self.sync_strategies[strat]
while not strategy.hit(*identifiers):
window_reset, _ = strategy.get_window_stats(
*identifiers
)
sleep_time = window_reset - time.time()

if sleep_time <= 0:
continue

time.sleep(sleep_time)


class PacerWrapper:
"""
Applies a namespace to various operations related to pacing.
"""
namespace: str
pacer: Pacer

def __init__(self, namespace: str, pacer: Pacer):
self.namespace = namespace
self.pacer = pacer

def inject_config(self, napp_config: dict):
"""
Inject namespace specific settings for pacing
"""
self.pacer.inject_config(
{
self._localized_key(key): value
for key, value in napp_config.items()
}
)

def hit(self, action_name: str, *keys):
"""
Asynchronous variant of `hit`.

This can be called from the serving thread safely.
"""
return self.pacer.hit(
self._localized_key(action_name),
*keys
)

async def ahit(self, action_name: str, *keys):
"""
Pace execution, based on the pacing config for the given `action_name`.
Keys can be included to allow multiple objects
to be be paced separately on the same action.

This should not be called from the same thread serving
the pacing.
"""
return await self.pacer.ahit(
self._localized_key(action_name),
*keys
)

def _localized_key(self, key):
return f"{self.namespace}.{key}"
21 changes: 20 additions & 1 deletion requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ decorator==5.1.1
# -r requirements/run.txt
# ipython
# kytos
deprecated==1.2.14
# via
# -r requirements/run.txt
# kytos
# limits
dill==0.3.8
# via pylint
distlib==0.3.8
Expand Down Expand Up @@ -157,6 +162,11 @@ idna==3.6
# requests
imagesize==1.4.1
# via sphinx
importlib-resources==6.4.0
# via
# -r requirements/run.txt
# kytos
# limits
iniconfig==2.0.0
# via pytest
ipython==8.22.1
Expand Down Expand Up @@ -221,6 +231,10 @@ lazy-object-proxy==1.10.0
# -r requirements/run.txt
# kytos
# openapi-spec-validator
limits==3.11.0
# via
# -r requirements/run.txt
# kytos
livereload==2.6.3
# via sphinx-autobuild
lockfile==0.12.2
Expand Down Expand Up @@ -262,10 +276,13 @@ openapi-spec-validator==0.7.1
# -r requirements/run.txt
# kytos
# openapi-core
packaging==23.2
packaging==24.0
# via
# -r requirements/run.txt
# black
# build
# kytos
# limits
# pyproject-api
# pytest
# sphinx
Expand Down Expand Up @@ -478,6 +495,7 @@ typing-extensions==4.10.0
# -r requirements/run.txt
# janus
# kytos
# limits
# pydantic
# pydantic-core
urllib3==1.26.18
Expand Down Expand Up @@ -528,6 +546,7 @@ wheel==0.40.0
wrapt==1.14.1
# via
# -r requirements/run.txt
# deprecated
# elastic-apm
# kytos
yala==3.2.0
Expand Down
1 change: 1 addition & 0 deletions requirements/run.in
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ httpx==0.27.0
starlette[full]==0.31.1
uvicorn[standard]==0.27.1
asgiref==3.7.2
limits==3.11.0
13 changes: 12 additions & 1 deletion requirements/run.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ click==8.1.7
# via uvicorn
decorator==5.1.1
# via ipython
deprecated==1.2.14
# via limits
dnspython==2.6.1
# via
# -r requirements/run.in
Expand Down Expand Up @@ -66,6 +68,8 @@ idna==3.6
# email-validator
# httpx
# requests
importlib-resources==6.4.0
# via limits
ipython==8.22.1
# via -r requirements/run.in
isodate==0.6.1
Expand Down Expand Up @@ -95,6 +99,8 @@ jsonschema-specifications==2023.7.1
# openapi-schema-validator
lazy-object-proxy==1.10.0
# via openapi-spec-validator
limits==3.11.0
# via -r requirements/run.in
lockfile==0.12.2
# via
# -r requirements/run.in
Expand All @@ -113,6 +119,8 @@ openapi-schema-validator==0.6.2
# openapi-spec-validator
openapi-spec-validator==0.7.1
# via openapi-core
packaging==24.0
# via limits
parse==1.20.1
# via openapi-core
parso==0.8.3
Expand Down Expand Up @@ -191,6 +199,7 @@ traitlets==5.14.1
typing-extensions==4.10.0
# via
# janus
# limits
# pydantic
# pydantic-core
urllib3==1.26.18
Expand All @@ -212,7 +221,9 @@ websockets==12.0
werkzeug==2.0.3
# via openapi-core
wrapt==1.14.1
# via elastic-apm
# via
# deprecated
# elastic-apm

# The following packages are considered to be unsafe in a requirements file:
# setuptools
Loading
Loading