Add runtime_metrics_async (#385)
george-zubrienko authored Jan 11, 2024
1 parent fcf0b25 commit a903863
Showing 3 changed files with 101 additions and 14 deletions.
2 changes: 0 additions & 2 deletions adapta/utils/_common.py
@@ -95,8 +95,6 @@ def convert_datadog_tags(tag_dict: Optional[Dict[str, str]]) -> Optional[List[str]]:
def operation_time():
    """
    Returns execution time for the context block.
-    :param operation: A method to measure execution time for.
-    :return: A tuple of (method_execution_time_ns, method_result)
    """
    result = namedtuple("OperationDuration", ["start", "end", "elapsed"])
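
For orientation, a minimal sketch of how operation_time is consumed (an illustration, not part of the commit): it is used as a context manager yielding an OperationDuration whose elapsed field is in nanoseconds, exactly as the new decorator below does. The sleep stands in for arbitrary work.

import time

from adapta.utils import operation_time

with operation_time() as duration:
    time.sleep(0.5)  # placeholder for the operation being timed

print(f"took {duration.elapsed / 1e9:.2f}s")  # elapsed is in nanoseconds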
43 changes: 43 additions & 0 deletions adapta/utils/decorators/_logging.py
@@ -1,7 +1,9 @@
""" Module for common decorator methods. """
from functools import wraps
from typing import Optional, Dict

from adapta.logs import SemanticLogger
+from adapta.logs._async_logger import _AsyncLogger
from adapta.logs.models import LogLevel
from adapta.metrics import MetricsProvider
from adapta.utils._common import operation_time
@@ -51,3 +53,44 @@ def inner_runtime_decorator(*args, **kwargs):
        return inner_runtime_decorator

    return outer_runtime_decorator


+def run_time_metrics_async(metric_name: str, tag_function_name: bool = False):
+    """
+    Decorator that records the runtime of the decorated coroutine to the logging source and the metrics provider.
+    :param metric_name: Name of the metric reported to the metrics sink; also used as the run type in log messages.
+    :param tag_function_name: Whether to add the decorated function's name as a tag on the metric. Defaults to False.
+    """
+
+    def outer(func):
+        @wraps(func)
+        async def inner_runtime_decorator(
+            metrics_provider: MetricsProvider,
+            logger: _AsyncLogger,
+            metric_tags: Optional[Dict[str, str]] = None,
+            **kwargs,
+        ):
+            metric_tags = (metric_tags or {}) | ({"function_name": str(func.__name__)} if tag_function_name else {})
+
+            logger.debug("Running {run_type} on method {method_name}", run_type=metric_name, method_name=func.__name__)
+
+            with operation_time() as ot:
+                result = await func(metrics_provider=metrics_provider, logger=logger, metric_tags=metric_tags, **kwargs)
+
+            metrics_provider.gauge(
+                metric_name=metric_name,
+                metric_value=round(ot.elapsed / 1e9, 2),
+                tags=metric_tags,
+            )
+
+            logger.debug(
+                "Method {method_name} finished in {elapsed:.2f}s seconds",
+                method_name=func.__name__,
+                elapsed=(ot.elapsed / 1e9),
+            )
+            return result
+
+        return inner_runtime_decorator
+
+    return outer
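
For orientation, a minimal usage sketch of the new decorator (an illustration, not part of the commit). PrintMetricsProvider, ExampleLogger, and sync_tables are hypothetical names; the create_async_logger call mirrors the test below, and any object exposing a compatible gauge() can stand in for a MetricsProvider.

import asyncio
from logging import StreamHandler

from adapta.logs import create_async_logger
from adapta.logs.models import LogLevel
from adapta.utils.decorators._logging import run_time_metrics_async


class PrintMetricsProvider:
    """Hypothetical stand-in: only the gauge() signature used by the decorator is needed."""

    def gauge(self, metric_name: str, metric_value: float, tags):
        print(f"{metric_name}={metric_value} tags={tags}")


@run_time_metrics_async(metric_name="nightly_sync", tag_function_name=True)
async def sync_tables(logger, **_kwargs):
    # The decorator times this coroutine and gauges the elapsed seconds, rounded to 2 decimals.
    await asyncio.sleep(0.2)


class ExampleLogger:
    pass


logger = create_async_logger(logger_type=ExampleLogger, log_handlers=[StreamHandler()], min_log_level=LogLevel.DEBUG)
asyncio.run(sync_tables(logger=logger, metrics_provider=PrintMetricsProvider()))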
70 changes: 58 additions & 12 deletions tests/test_utils.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import asyncio
import os
import pathlib
import sys
@@ -26,8 +27,10 @@
import pytest
from dataclasses_json import DataClassJsonMixin

-from adapta.logs import SemanticLogger
+from adapta.logs import SemanticLogger, create_async_logger
+from adapta.logs._async_logger import _AsyncLogger
from adapta.logs.models import LogLevel
+from adapta.metrics import MetricsProvider
from adapta.utils import (
    doze,
    operation_time,
@@ -39,6 +42,7 @@
    xmltree_to_dict_collection,
)
from adapta.utils.concurrent_task_runner import Executable, ConcurrentTaskRunner
+from adapta.utils.decorators._logging import run_time_metrics_async


@pytest.mark.parametrize("sleep_period,doze_interval", [(1, 50), (2, 10)])
@@ -273,6 +277,18 @@ def test_data_adapter(drop_missing: bool):
assert (result["D"] == 7).all()


+class AssertiveMetricProvider:
+    def __init__(self, run_type: str, tag_func_name: bool):
+        self._run_type = run_type
+        self._tag_func_name = tag_func_name
+
+    def gauge(self, metric_name: str, metric_value: float, tags: dict[str, str]):
+        """Dummy provider to assert passed values"""
+        assert metric_name == self._run_type
+        assert type(metric_value) == float
+        assert not self._tag_func_name or tags["function_name"] == "test_function"

@pytest.mark.parametrize("reporting_level", [LogLevel.DEBUG, LogLevel.INFO])
@pytest.mark.parametrize("loglevel", [LogLevel.DEBUG, LogLevel.INFO])
@pytest.mark.parametrize("tag_func_name", [True, False])
@@ -287,30 +303,23 @@ def test_runtime_decorator(caplog, reporting_level, loglevel, tag_func_name):
    :param reporting_level: Reporting level defining at what level decorator sends logs.
    :param loglevel: Loglevel that is tested.
    """
-    logger = SemanticLogger().add_log_source(
+    sem_logger = SemanticLogger().add_log_source(
        log_source_name="decorator_test",
        min_log_level=loglevel,
        log_handlers=[StreamHandler(sys.stdout)],
        is_default=True,
    )

-    class DummyMetricProvider:
-        def gauge(self, metric_name, metric_value, tags):
-            """Dummy provider to assert passed values"""
-            assert metric_name == run_type
-            assert type(metric_value) == float
-            assert not tag_func_name or tags["function_name"] == "test_function"
-
-    metrics_provider = DummyMetricProvider()
    run_type = "test_execution"
    print_from_func = "from_function_call"
+    metrics_provider = AssertiveMetricProvider(run_type=run_type, tag_func_name=tag_func_name)

    @run_time_metrics(metric_name=run_type, tag_function_name=True, log_level=reporting_level)
-    def test_function(logger, **_kwargs):
+    def test_function(logger: SemanticLogger, **_kwargs):
        logger.info(print_from_func)
        return True

-    test_function(logger=logger, metrics_provider=metrics_provider)
+    test_function(logger=sem_logger, metrics_provider=metrics_provider)
    if loglevel == LogLevel.DEBUG:
        assert "test_function" in caplog.text and run_type in caplog.text
        assert "finished in" in caplog.text and "s seconds" in caplog.text
@@ -330,6 +339,43 @@ def test_function(**_kwargs):
    test_function()


@pytest.mark.parametrize("tag_func_name", [True, False])
@pytest.mark.asyncio
async def test_runtime_decorator_async(caplog, tag_func_name: bool):
"""
Test that run_time_metrics_decorator reports correct information for every run of the algorithm.
Firstly tests that wrapped method executes even when no logger is passed
Secondly tests that wrapped method sends logs when logger is passed.
:param caplog: pytest fixture for testing logging.
:param reporting_level: Reporting level defining at what level decorator sends logs.
:param loglevel: Loglevel that is tested.
"""

class AsyncTest:
pass

async_logger = create_async_logger(
logger_type=AsyncTest, log_handlers=[StreamHandler()], min_log_level=LogLevel.DEBUG
)

run_type = "test_execution"
print_from_func = "from_function_call"
metrics_provider = AssertiveMetricProvider(run_type=run_type, tag_func_name=tag_func_name)

@run_time_metrics_async(metric_name=run_type, tag_function_name=True)
async def test_function(logger: _AsyncLogger, **_kwargs):
logger.info(print_from_func)
await asyncio.sleep(1.2)
return True

await test_function(logger=async_logger, metrics_provider=metrics_provider)
assert "test_function" in caplog.text and run_type in caplog.text
assert "finished in" in caplog.text and "s seconds" in caplog.text
assert print_from_func in caplog.text
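
Aside: the @pytest.mark.asyncio marker used here comes from the pytest-asyncio plugin, which must be installed for this coroutine test to run; without it, pytest does not await the coroutine and emits a warning instead of executing the test body.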


@pytest.mark.parametrize(
    "dataframe, expected_types, column_filter",
    [
