Skip to content

Commit

Permalink
feat: Emit target metrics (#2486)
Browse files Browse the repository at this point in the history
* feat: Emit some target metrics

* Remove unused Metric

* Use PID tag in all metrics

* Update tests

* Update docs

* Add batch processing timer

* Add docs
  • Loading branch information
edgarrmondragon authored Aug 13, 2024
1 parent 64f9e64 commit b6fa56a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 7 deletions.
12 changes: 9 additions & 3 deletions docs/implementation/metrics.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# Tap Metrics
# Tap and Target Metrics

Metrics logging is specified in the
[Singer Spec](https://hub.meltano.com/singer/spec#metrics). The SDK will automatically
emit metrics for `record_count`, `http_request_duration` and `sync_duration`.
[Singer Spec](https://hub.meltano.com/singer/spec#metrics).

The SDK will automatically emit the following metrics:

- `record_count`: The number of records processed by the tap or target.
- `http_request_duration`: The duration of HTTP requests made by the tap.
- `sync_duration`: The duration of the sync operation.
- `batch_processing_time`: The duration of processing a batch of records.

## Customization options

Expand Down
9 changes: 8 additions & 1 deletion singer_sdk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Tag(str, enum.Enum):
JOB_TYPE = "job_type"
HTTP_STATUS_CODE = "http_status_code"
STATUS = "status"
PID = "pid"


class Metric(str, enum.Enum):
Expand All @@ -58,6 +59,7 @@ class Metric(str, enum.Enum):
HTTP_REQUEST_COUNT = "http_request_count"
JOB_DURATION = "job_duration"
SYNC_DURATION = "sync_duration"
BATCH_PROCESSING_TIME = "batch_processing_time"


@dataclass
Expand Down Expand Up @@ -116,6 +118,7 @@ def __init__(self, metric: Metric, tags: dict | None = None) -> None:
"""
self.metric = metric
self.tags = tags or {}
self.tags[Tag.PID] = os.getpid()
self.logger = get_metrics_logger()

@property
Expand Down Expand Up @@ -182,6 +185,10 @@ def __init__(
self.log_interval = log_interval
self.last_log_time = time()

def exit(self) -> None:
    """Exit the counter context.

    Delegates to ``_pop``, which logs the current counter value and
    resets it. Exposed as a public method so callers can flush the
    counter without going through the context-manager protocol
    (``__exit__`` also routes through this method).
    """
    self._pop()

def __enter__(self) -> Counter:
"""Enter the counter context.
Expand All @@ -204,7 +211,7 @@ def __exit__(
exc_val: The exception value.
exc_tb: The exception traceback.
"""
self._pop()
self.exit()

def _pop(self) -> None:
"""Log and reset the counter."""
Expand Down
27 changes: 27 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import jsonschema.validators
from typing_extensions import override

from singer_sdk import metrics
from singer_sdk._singerlib.json import deserialize_json
from singer_sdk.exceptions import (
InvalidJSONSchema,
Expand Down Expand Up @@ -193,6 +194,31 @@ def __init__(
)

self._validator: BaseJSONSchemaValidator | None = self.get_validator()
self._record_counter: metrics.Counter = metrics.record_counter(self.stream_name)
self._batch_timer = metrics.Timer(
metrics.Metric.BATCH_PROCESSING_TIME,
tags={
metrics.Tag.STREAM: self.stream_name,
},
)

@property
def record_counter_metric(self) -> metrics.Counter:
    """Get the record counter for this sink.

    The counter is created in ``__init__`` via
    ``metrics.record_counter(self.stream_name)`` and is incremented by
    the target once per record processed by this sink.

    Returns:
        The :class:`metrics.Counter` instance for the record count metric.
    """
    return self._record_counter

@property
def batch_processing_timer(self) -> metrics.Timer:
    """Get the batch processing timer for this sink.

    The timer is created in ``__init__`` for the
    ``BATCH_PROCESSING_TIME`` metric, tagged with this sink's stream
    name; the target uses it as a context manager around
    ``process_batch`` when draining the sink.

    Returns:
        The :class:`metrics.Timer` instance for the batch processing metric.
    """
    return self._batch_timer

@cached_property
def validate_schema(self) -> bool:
Expand Down Expand Up @@ -685,6 +711,7 @@ def clean_up(self) -> None:
should not be relied on, it's recommended to use a uuid as well.
"""
self.logger.info("Cleaning up %s", self.stream_name)
self.record_counter_metric.exit()

def process_batch_files(
self,
Expand Down
4 changes: 3 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ def _process_record_message(self, message_dict: dict) -> None:

sink.tally_record_read()
sink.process_record(transformed_record, context)
sink.record_counter_metric.increment()
sink._after_process_record(context) # noqa: SLF001

if sink.is_full:
Expand Down Expand Up @@ -510,7 +511,8 @@ def drain_one(self, sink: Sink) -> None: # noqa: PLR6301
return

draining_status = sink.start_drain()
sink.process_batch(draining_status)
with sink.batch_processing_timer:
sink.process_batch(draining_status)
sink.mark_drained()

def _drain_all(self, sink_list: list[Sink], parallelism: int) -> None:
Expand Down
14 changes: 12 additions & 2 deletions tests/core/test_metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import os

import pytest
import time_machine
Expand All @@ -18,6 +19,8 @@ def __str__(self) -> str:


def test_meter():
pid = os.getpid()

class _MyMeter(metrics.Meter):
def __enter__(self):
return self
Expand All @@ -27,18 +30,22 @@ def __exit__(self, exc_type, exc_val, exc_tb):

meter = _MyMeter(metrics.Metric.RECORD_COUNT)

assert meter.tags == {}
assert meter.tags == {metrics.Tag.PID: pid}

stream_context = {"parent_id": 1}
meter.context = stream_context
assert meter.tags == {metrics.Tag.CONTEXT: stream_context}
assert meter.tags == {
metrics.Tag.CONTEXT: stream_context,
metrics.Tag.PID: pid,
}

meter.context = None
assert metrics.Tag.CONTEXT not in meter.tags


def test_record_counter(caplog: pytest.LogCaptureFixture):
caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
pid = os.getpid()
custom_object = CustomObject("test", 1)

with metrics.record_counter(
Expand Down Expand Up @@ -68,6 +75,7 @@ def test_record_counter(caplog: pytest.LogCaptureFixture):
assert point.tags == {
metrics.Tag.STREAM: "test_stream",
metrics.Tag.ENDPOINT: "test_endpoint",
metrics.Tag.PID: pid,
"custom_tag": "pytest",
"custom_obj": custom_object,
}
Expand All @@ -79,6 +87,7 @@ def test_record_counter(caplog: pytest.LogCaptureFixture):

def test_sync_timer(caplog: pytest.LogCaptureFixture):
caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
pid = os.getpid()
traveler = time_machine.travel(0, tick=False)
traveler.start()

Expand All @@ -100,6 +109,7 @@ def test_sync_timer(caplog: pytest.LogCaptureFixture):
assert point.tags == {
metrics.Tag.STREAM: "test_stream",
metrics.Tag.STATUS: "succeeded",
metrics.Tag.PID: pid,
"custom_tag": "pytest",
}

Expand Down

0 comments on commit b6fa56a

Please sign in to comment.