Extract Prom. metrics to separate module
catileptic committed Jun 4, 2024
1 parent d0e4679 commit d879e08
Showing 3 changed files with 79 additions and 117 deletions.
52 changes: 52 additions & 0 deletions servicelayer/metrics.py
@@ -0,0 +1,52 @@
from prometheus_client import (
    start_http_server,
    Counter,
    Histogram,
    REGISTRY,
    GC_COLLECTOR,
    PROCESS_COLLECTOR,
)

REGISTRY.unregister(GC_COLLECTOR)
REGISTRY.unregister(PROCESS_COLLECTOR)

TASKS_STARTED = Counter(
    "servicelayer_tasks_started_total",
    "Number of tasks that a worker started processing",
    ["stage"],
)

TASKS_SUCCEEDED = Counter(
    "servicelayer_tasks_succeeded_total",
    "Number of successfully processed tasks",
    ["stage", "retries"],
)

TASKS_FAILED = Counter(
    "servicelayer_tasks_failed_total",
    "Number of failed tasks",
    ["stage", "retries", "failed_permanently"],
)

TASK_DURATION = Histogram(
    "servicelayer_task_duration_seconds",
    "Task duration in seconds",
    ["stage"],
    # The bucket sizes are a rough guess right now, we might want to adjust
    # them later based on observed runtimes
    buckets=[
        0.25,
        0.5,
        1,
        5,
        15,
        30,
        60,
        60 * 15,
        60 * 30,
        60 * 60,
        60 * 60 * 2,
        60 * 60 * 6,
        60 * 60 * 24,
    ],
)
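
For context, a minimal usage sketch, not part of this commit: it mirrors the instrumentation pattern used in taskqueue.py and worker.py below, and the process_task helper, its run callable, and the stage value are illustrative assumptions.

from timeit import default_timer

from servicelayer import metrics


def process_task(stage, run):
    # Illustrative helper: record start, duration and outcome of one unit
    # of work against the shared metrics defined above.
    metrics.TASKS_STARTED.labels(stage=stage).inc()
    start_time = default_timer()
    try:
        run()
    except Exception:
        metrics.TASKS_FAILED.labels(
            stage=stage, retries=0, failed_permanently=True
        ).inc()
        raise
    duration = max(0, default_timer() - start_time)
    metrics.TASK_DURATION.labels(stage=stage).observe(duration)
    metrics.TASKS_SUCCEEDED.labels(stage=stage, retries=0).inc()
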
75 changes: 17 additions & 58 deletions servicelayer/taskqueue.py
@@ -20,19 +20,11 @@
import pika
from banal import ensure_list

from prometheus_client import (
    start_http_server,
    Counter,
    Histogram,
    REGISTRY,
    GC_COLLECTOR,
    PROCESS_COLLECTOR,
)

from servicelayer.cache import get_redis, make_key
from servicelayer.util import pack_now, unpack_int
from servicelayer import settings
from servicelayer.util import service_retries, backoff
from servicelayer import metrics

log = logging.getLogger(__name__)
local = threading.local()
@@ -42,50 +34,6 @@

TIMEOUT = 5

REGISTRY.unregister(GC_COLLECTOR)
REGISTRY.unregister(PROCESS_COLLECTOR)

TASKS_STARTED = Counter(
    "servicelayer_tasks_started_total",
    "Number of tasks that a worker started processing",
    ["stage"],
)

TASKS_SUCCEEDED = Counter(
    "servicelayer_tasks_succeeded_total",
    "Number of successfully processed tasks",
    ["stage", "retries"],
)

TASKS_FAILED = Counter(
    "servicelayer_tasks_failed_total",
    "Number of failed tasks",
    ["stage", "retries", "failed_permanently"],
)

TASK_DURATION = Histogram(
    "servicelayer_task_duration_seconds",
    "Task duration in seconds",
    ["stage"],
    # The bucket sizes are a rough guess right now, we might want to adjust
    # them later based on observed runtimes
    buckets=[
        0.25,
        0.5,
        1,
        5,
        15,
        30,
        60,
        60 * 15,
        60 * 30,
        60 * 60,
        60 * 60 * 2,
        60 * 60 * 6,
        60 * 60 * 24,
    ],
)


@dataclass
class Task:
@@ -440,6 +388,17 @@ def __init__(
version=None,
prefetch_count_mapping=defaultdict(lambda: 1),
):
if settings.SENTRY_DSN:
import sentry_sdk

sentry_sdk.init(
dsn=settings.SENTRY_DSN,
traces_sample_rate=0,
release=settings.SENTRY_RELEASE,
environment=settings.SENTRY_ENVIRONMENT,
send_default_pii=False,
)

self.conn = conn or get_redis()
self.num_threads = num_threads
self.queues = ensure_list(queues)
@@ -523,7 +482,7 @@ def handle(self, task: Task, channel):
if dataset.should_execute(task.task_id):
task_retry_count = task.get_retry_count(self.conn)
if task_retry_count:
TASKS_FAILED.labels(
metrics.TASKS_FAILED.labels(
stage=task.operation,
retries=task_retry_count,
failed_permanently=False,
@@ -538,7 +497,7 @@ def handle(self, task: Task, channel):
task.increment_retry_count(self.conn)

# Emit Prometheus metrics
TASKS_STARTED.labels(stage=task.operation).inc()
metrics.TASKS_STARTED.labels(stage=task.operation).inc()
start_time = default_timer()
log.info(
f"Dispatching task {task.task_id} from job {task.job_id}"
@@ -549,8 +508,8 @@ def handle(self, task: Task, channel):

# Emit Prometheus metrics
duration = max(0, default_timer() - start_time)
TASK_DURATION.labels(stage=task.operation).observe(duration)
TASKS_SUCCEEDED.labels(
metrics.TASK_DURATION.labels(stage=task.operation).observe(duration)
metrics.TASKS_SUCCEEDED.labels(
stage=task.operation, retries=task_retry_count
).inc()
else:
@@ -562,7 +521,7 @@ def handle(self, task: Task, channel):
# In this case, a task ID was found neither in the
# list of Pending, nor the list of Running tasks
# in Redis. It was never attempted.
TASKS_FAILED.labels(
metrics.TASKS_FAILED.labels(
stage=task.operation,
retries=0,
failed_permanently=True,
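
A hedged test sketch of how the failure counter, now referenced through the shared metrics module, can be checked against the default registry; the test function, the "index" stage, and the retry count are illustrative assumptions, not part of the commit.

from prometheus_client import REGISTRY

from servicelayer import metrics


def test_tasks_failed_labels():
    # Illustrative only: prometheus_client stringifies label values, so the
    # int 2 and the bool False are stored as "2" and "False".
    labels = {"stage": "index", "retries": "2", "failed_permanently": "False"}
    before = REGISTRY.get_sample_value(
        "servicelayer_tasks_failed_total", labels
    ) or 0.0
    metrics.TASKS_FAILED.labels(
        stage="index", retries=2, failed_permanently=False
    ).inc()
    after = REGISTRY.get_sample_value("servicelayer_tasks_failed_total", labels)
    assert after == before + 1.0
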
69 changes: 10 additions & 59 deletions servicelayer/worker.py
@@ -6,19 +6,12 @@
from banal import ensure_list
from abc import ABC, abstractmethod

from prometheus_client import (
    start_http_server,
    Counter,
    Histogram,
    REGISTRY,
    GC_COLLECTOR,
    PROCESS_COLLECTOR,
)

from servicelayer import settings
from servicelayer.jobs import Stage
from servicelayer.cache import get_redis
from servicelayer.util import unpack_int
from servicelayer import metrics


log = logging.getLogger(__name__)

@@ -29,50 +22,6 @@
INTERVAL = 2
TASK_FETCH_RETRY = 60 / INTERVAL

REGISTRY.unregister(GC_COLLECTOR)
REGISTRY.unregister(PROCESS_COLLECTOR)

TASKS_STARTED = Counter(
    "servicelayer_tasks_started_total",
    "Number of tasks that a worker started processing",
    ["stage"],
)

TASKS_SUCCEEDED = Counter(
    "servicelayer_tasks_succeeded_total",
    "Number of successfully processed tasks",
    ["stage", "retries"],
)

TASKS_FAILED = Counter(
    "servicelayer_tasks_failed_total",
    "Number of failed tasks",
    ["stage", "retries", "failed_permanently"],
)

TASK_DURATION = Histogram(
    "servicelayer_task_duration_seconds",
    "Task duration in seconds",
    ["stage"],
    # The bucket sizes are a rough guess right now, we might want to adjust
    # them later based on observed runtimes
    buckets=[
        0.25,
        0.5,
        1,
        5,
        15,
        30,
        60,
        60 * 15,
        60 * 30,
        60 * 60,
        60 * 60 * 2,
        60 * 60 * 6,
        60 * 60 * 24,
    ],
)


class Worker(ABC):
"""Workers of all microservices, unite!"""
@@ -108,12 +57,14 @@ def handle_safe(self, task):
retries = unpack_int(task.context.get("retries"))

try:
TASKS_STARTED.labels(stage=task.stage.stage).inc()
metrics.TASKS_STARTED.labels(stage=task.stage.stage).inc()
start_time = default_timer()
self.handle(task)
duration = max(0, default_timer() - start_time)
TASK_DURATION.labels(stage=task.stage.stage).observe(duration)
TASKS_SUCCEEDED.labels(stage=task.stage.stage, retries=retries).inc()
metrics.TASK_DURATION.labels(stage=task.stage.stage).observe(duration)
metrics.TASKS_SUCCEEDED.labels(
stage=task.stage.stage, retries=retries
).inc()
except SystemExit as exc:
self.exit_code = exc.code
self.retry(task)
@@ -140,7 +91,7 @@ def run_prometheus_server(self):
def run_server():
port = settings.PROMETHEUS_PORT
log.info(f"Running Prometheus metrics server on port {port}")
start_http_server(port)
metrics.start_http_server(port)

thread = Thread(target=run_server)
thread.start()
@@ -153,7 +104,7 @@ def retry(self, task):
log.warning(
f"Queueing failed task for retry #{retry_count}/{settings.WORKER_RETRY}..." # noqa
)
TASKS_FAILED.labels(
metrics.TASKS_FAILED.labels(
stage=task.stage.stage,
retries=retries,
failed_permanently=False,
@@ -164,7 +115,7 @@
log.warning(
f"Failed task, exhausted retry count of {settings.WORKER_RETRY}"
)
TASKS_FAILED.labels(
metrics.TASKS_FAILED.labels(
stage=task.stage.stage,
retries=retries,
failed_permanently=True,
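
And a short hedged sketch of how the exposed metrics can be inspected without going through the HTTP server started by run_prometheus_server; the "ingest" stage and the observed duration are arbitrary example values, not part of the commit.

from prometheus_client import REGISTRY, generate_latest

from servicelayer import metrics

# Record one observation so labelled samples appear in the exposition text.
metrics.TASK_DURATION.labels(stage="ingest").observe(1.5)

output = generate_latest(REGISTRY).decode("utf-8")
assert "servicelayer_task_duration_seconds_bucket" in output
assert 'stage="ingest"' in output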
