Skip to content

Commit

Permalink
[Core] Add metrics for ocean core
Browse files Browse the repository at this point in the history
1. Add a decorator that can be used to "metricify" a function; it accepts a phase as input.
2. Automatically record each function's duration.
3. `increment_field` method for handling counters.
4. `increment_status` method for handling requests.
5. Populate the event context with a `MetricAggregator` to track metrics.
  • Loading branch information
Ivan Kalinovski authored and Ivan Kalinovski committed Dec 23, 2024
1 parent 40d2ff4 commit 69bb09b
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 4 deletions.
53 changes: 52 additions & 1 deletion port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Any
import asyncio
from urllib.parse import quote_plus

import httpx
Expand All @@ -11,7 +11,9 @@
handle_status_code,
PORT_HTTP_MAX_CONNECTIONS_LIMIT,
)
from port_ocean.context import event
from port_ocean.core.models import Entity
from port_ocean.helpers.metric import MetricFieldType, MetricType, metric


class EntityClientMixin:
Expand All @@ -23,6 +25,7 @@ def __init__(self, auth: PortAuthentication, client: httpx.AsyncClient):
# requests that are not related to entities.
self.semaphore = asyncio.Semaphore(round(0.9 * PORT_HTTP_MAX_CONNECTIONS_LIMIT))

@metric(MetricType.LOAD)
async def upsert_entity(
self,
entity: Entity,
Expand Down Expand Up @@ -52,6 +55,9 @@ async def upsert_entity(
)

if response.is_error:
await event.event._metric_aggregator.increment_field(
MetricFieldType.ERROR_COUNT
)
logger.error(
f"Error {'Validating' if validation_only else 'Upserting'} "
f"entity: {entity.identifier} of "
Expand All @@ -67,6 +73,14 @@ async def upsert_entity(
# Happens when upsert fails and search identifier is defined.
# We return None to ignore the entity later in the delete process
if result_entity.is_using_search_identifier:
if not response.is_error:
(
await event.event._metric_aggregator.increment_field(
MetricFieldType.ERROR_COUNT
)
if event.event._metric_aggregator
else None
)
return None

# In order to save memory we'll keep only the identifier, blueprint and relations of the
Expand All @@ -83,6 +97,21 @@ async def upsert_entity(
for key, relation in result_entity.relations.items()
}

(
await event.event._metric_aggregator.increment_field(
MetricFieldType.OBJECT_COUNT
)
if event.event._metric_aggregator
else None
)
(
await event.event._metric_aggregator.increment_field(
MetricFieldType.UPSERTED
)
if event.event._metric_aggregator
else None
)

return reduced_entity

async def batch_upsert_entities(
Expand Down Expand Up @@ -116,6 +145,7 @@ async def batch_upsert_entities(

return entity_results

@metric(MetricType.LOAD)
async def delete_entity(
self,
entity: Entity,
Expand All @@ -138,6 +168,13 @@ async def delete_entity(
)

if response.is_error:
(
await event.event._metric_aggregator.increment_field(
MetricFieldType.ERROR_COUNT
)
if event.event._metric_aggregator
else None
)
if response.status_code == 404:
logger.info(
f"Failed to delete entity: {entity.identifier} of blueprint: {entity.blueprint},"
Expand All @@ -151,6 +188,20 @@ async def delete_entity(
)

handle_status_code(response, should_raise)
(
await event.event._metric_aggregator.increment_field(
MetricFieldType.OBJECT_COUNT
)
if event.event._metric_aggregator
else None
)
(
await event.event._metric_aggregator.increment_field(
MetricFieldType.DELETED
)
if event.event._metric_aggregator
else None
)

async def batch_delete_entities(
self,
Expand Down
18 changes: 17 additions & 1 deletion port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from uuid import uuid4

from loguru import logger
from port_ocean.helpers.metric import MetricAggregator
from pydispatch import dispatcher # type: ignore
from werkzeug.local import LocalStack, LocalProxy

Expand All @@ -38,6 +39,7 @@ class EventType:
START = "start"
RESYNC = "resync"
HTTP_REQUEST = "http_request"
METRIC = "metric"


@dataclass
Expand All @@ -50,6 +52,7 @@ class EventContext:
_parent_event: Optional["EventContext"] = None
_event_id: str = field(default_factory=lambda: str(uuid4()))
_on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list)
_metric_aggregator: Optional["MetricAggregator"] = None

def on_abort(self, func: AbortCallbackFunction) -> None:
self._on_abort_callbacks.append(func)
Expand Down Expand Up @@ -131,23 +134,36 @@ async def event_context(
parent_attributes = parent.attributes if parent else {}

attributes = {**parent_attributes, **(attributes or {})}

aggregator = (
parent._metric_aggregator
if parent and parent._metric_aggregator
else MetricAggregator()
)

new_event = EventContext(
event_type,
trigger_type=trigger_type,
attributes=attributes,
_parent_event=parent,
# inherit port app config from parent event, so it can be used in nested events
_port_app_config=parent.port_app_config if parent else None,
_metric_aggregator=aggregator,
)
_event_context_stack.push(new_event)

def _handle_event(triggering_event_id: int) -> None:
async def _handle_event(triggering_event_id: int) -> None:
if (
new_event.event_type == EventType.RESYNC
and new_event.id != triggering_event_id
):
logger.warning("ABORTING RESYNC EVENT DUE TO NEWER RESYNC REQUEST")
new_event.abort()
(
await new_event._metric_aggregator.flush()
if new_event._metric_aggregator
else None
)

dispatcher.connect(_handle_event, event_type)
dispatcher.send(event_type, triggering_event_id=event.id)
Expand Down
38 changes: 36 additions & 2 deletions port_ocean/core/handlers/entity_processor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

from loguru import logger

from port_ocean.context import event
from port_ocean.core.handlers.base import BaseHandler
from port_ocean.core.handlers.port_app_config.models import ResourceConfig
from port_ocean.core.ocean_types import (
RAW_ITEM,
CalculationResult,
EntitySelectorDiff,
)
from port_ocean.helpers.metric import MetricFieldType, MetricType, metric


class BaseEntityProcessor(BaseHandler):
Expand All @@ -30,6 +32,7 @@ async def _parse_items(
) -> CalculationResult:
pass

@metric(MetricType.TRANSFORM)
async def parse_items(
self,
mapping: ResourceConfig,
Expand All @@ -51,7 +54,38 @@ async def parse_items(
with logger.contextualize(kind=mapping.kind):
if not raw_data:
return CalculationResult(EntitySelectorDiff([], []), [])

return await self._parse_items(
result = await self._parse_items(
mapping, raw_data, parse_all, send_raw_data_examples_amount
)

(
await event.event._metric_aggregator.increment_field(
MetricFieldType.INPUT_COUNT, len(raw_data)
)
if event.event._metric_aggregator
else None
)
(
await event.event._metric_aggregator.increment_field(
MetricFieldType.OBJECT_COUNT,
len(result.entity_selector_diff.passed),
)
if event.event._metric_aggregator
else None
)
(
await event.event._metric_aggregator.increment_field(
MetricFieldType.FAILED, len(result.entity_selector_diff.failed)
)
if event.event._metric_aggregator
else None
)
(
await event.event._metric_aggregator.increment_field(
MetricFieldType.ERROR_COUNT, len(result.errors)
)
if event.event._metric_aggregator
else None
)

return result
6 changes: 6 additions & 0 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from port_ocean.core.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.exceptions.core import OceanAbortException
from port_ocean.helpers.metric import MetricFieldType, MetricType, metric

SEND_RAW_DATA_EXAMPLES_AMOUNT = 5

Expand Down Expand Up @@ -83,6 +84,7 @@ def _collect_resync_functions(

return fns

@metric(MetricType.EXTRACT)
async def _execute_resync_tasks(
self,
fns: list[Callable[[str], Awaitable[RAW_RESULT]]],
Expand Down Expand Up @@ -112,6 +114,9 @@ async def _execute_resync_tasks(
f"Triggered {len(tasks)} tasks for {resource_config.kind}, failed: {len(errors)}"
)

await event._metric_aggregator.increment_field(MetricFieldType.OBJECT_COUNT,len(results))if event._metric_aggregator else None
await event._metric_aggregator.increment_field(MetricFieldType.ERROR_COUNT,len(errors))if event._metric_aggregator else None

return results, errors

async def _calculate_raw(
Expand Down Expand Up @@ -483,6 +488,7 @@ async def sync_raw_all(

logger.error(message, exc_info=error_group)
else:
await event._metric_aggregator.flush()
logger.info(
f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
)
Expand Down
Loading

0 comments on commit 69bb09b

Please sign in to comment.