Skip to content

Commit

Permalink
Merge pull request #35 from Llandy3d/multiprocess_redis_scrape_improv…
Browse files Browse the repository at this point in the history
…ement

Multiprocess redis scrape improvement
  • Loading branch information
Llandy3d authored May 14, 2023
2 parents 559c535 + 422d6df commit f08f57d
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 15 deletions.
6 changes: 6 additions & 0 deletions docs/backend.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ You don't have to use these values in your implementation, they just need to be
# initialization steps
```

!!! tip

A `Backend` can define a `_generate_samples` class method. When present, it is used to generate the metrics instead of calling the `get` method of every metric individually.

This gives backends the flexibility to provide a faster implementation; for example, the `MultiProcessRedisBackend` uses pipelines.

---

## Default Backend
Expand Down
4 changes: 4 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

- Support for `_generate_samples` in Backend
- MultiProcessRedisBackend:
- Improved scraping performance via pipelining

## 0.0.12

- Escape `help` & `label_values` during exposition
Expand Down
4 changes: 4 additions & 0 deletions pytheus/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def get_backend(metric: "_Metric", histogram_bucket: str | None = None) -> Backe
return BACKEND_CLASS(BACKEND_CONFIG, metric, histogram_bucket=histogram_bucket)


def get_backend_class() -> type[Backend]:
    """
    Return the class stored in the module-level ``BACKEND_CLASS``.

    The class itself is returned, not an instance of it.
    """
    return BACKEND_CLASS


class SingleProcessBackend:
"""Provides a single-process backend that uses a thread-safe, in-memory approach."""

Expand Down
68 changes: 61 additions & 7 deletions pytheus/backends/redis.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from contextvars import ContextVar
from typing import TYPE_CHECKING

import redis

if TYPE_CHECKING:
from pytheus.backends.base import BackendConfig
from pytheus.metrics import _Metric
from pytheus.metrics import Sample, _Metric
from pytheus.registry import Collector, Registry


EXPIRE_KEY_TIME = 3600 # 1 hour


pipeline_var: ContextVar[redis.client.Pipeline | None] = ContextVar("pipeline", default=None)


class MultiProcessRedisBackend:
"""
Provides a multi-process backend that uses Redis.
Expand Down Expand Up @@ -81,6 +86,47 @@ def _init_key(self) -> None:

self.CONNECTION_POOL.expire(self._key_name, EXPIRE_KEY_TIME)

@classmethod
def _initialize_pipeline(cls) -> None:
    """
    Create a pipeline from ``CONNECTION_POOL`` and store it in ``pipeline_var``.

    Both the presence of ``CONNECTION_POOL`` and the absence of an already stored
    pipeline are checked with assertions before the new pipeline is stored.
    """
    pool = cls.CONNECTION_POOL
    assert pool is not None
    # a pipeline that is already stored would be overwritten, so refuse to go on
    assert pipeline_var.get() is None
    pipeline_var.set(pool.pipeline())

@staticmethod
def _execute_and_cleanup_pipeline() -> list[float | bool | None]:
    """
    Execute the pipeline stored in ``pipeline_var`` and return its results.

    ``pipeline_var`` is reset to ``None`` before the pipeline is executed, so the
    variable is already cleared even when ``execute()`` raises.
    """
    active_pipeline = pipeline_var.get()
    assert active_pipeline is not None

    pipeline_var.set(None)
    results = active_pipeline.execute()
    return results

@classmethod
def _generate_samples(cls, registry: "Registry") -> dict["Collector", list["Sample"]]:
    """
    Collect every sample of ``registry`` through a single pipeline.

    While a pipeline is stored in ``pipeline_var``, ``get()`` issues its commands on
    that pipeline and returns a placeholder value, so the samples produced by
    ``collect()`` do not carry their real value yet. Once everything has been
    collected the pipeline is executed and its results are assigned back to the
    samples by position.

    Returns a dict mapping each collector to its list of samples with ``value`` set.
    """
    cls._initialize_pipeline()

    samples_dict: dict["Collector", list["Sample"]] = {}
    try:
        # collect samples that are not yet stored with the value
        for collector in registry.collect():
            # collecting also builds requests in the pipeline
            samples_dict[collector] = list(collector.collect())

        pipeline_data = cls._execute_and_cleanup_pipeline()
    finally:
        # never leave a pipeline behind when collecting or executing fails: a stale
        # pipeline would make the next `_initialize_pipeline` assertion fail and
        # every `get()` keep returning the placeholder value
        pipeline_var.set(None)

    # drop the boolean replies (presumably the results of the `expire` commands
    # queued by `get()` - confirm) and report a missing key (`None`) as 0.
    # NOTE(review): `in (True, False)` also matches the numbers 0 and 1 because they
    # compare equal to booleans; this looks safe only as long as values come back
    # from Redis as str/bytes - confirm.
    values = iter(
        0 if item is None else item for item in pipeline_data if item not in (True, False)
    )

    # assign correct values to the samples
    for samples in samples_dict.values():
        # `samples` has to stay the first argument: zip() stops as soon as it is
        # exhausted, before pulling a value that belongs to the next collector
        for sample, value in zip(samples, values):
            sample.value = value
    return samples_dict

def inc(self, value: float) -> None:
assert self.CONNECTION_POOL is not None
if self._labels_hash:
Expand Down Expand Up @@ -110,14 +156,22 @@ def set(self, value: float) -> None:

def get(self) -> float:
assert self.CONNECTION_POOL is not None
client = self.CONNECTION_POOL

pipeline = pipeline_var.get()
if pipeline:
client = pipeline

if self._labels_hash:
value = self.CONNECTION_POOL.hget(self._key_name, self._labels_hash)
value = client.hget(self._key_name, self._labels_hash)
else:
value = self.CONNECTION_POOL.get(self._key_name)
value = client.get(self._key_name)

if not value:
self._init_key()
client.expire(self._key_name, EXPIRE_KEY_TIME)

if pipeline:
return 0.0

self.CONNECTION_POOL.expire(self._key_name, EXPIRE_KEY_TIME)
return float(value)
# NOTE: get() directly is only used when collecting metrics & in tests so it makes sense
# to consider adding a method only for tests
return float(value) if value else 0.0
29 changes: 23 additions & 6 deletions pytheus/exposition.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
from typing import Callable

from pytheus.metrics import Labels
from pytheus.backends.base import get_backend_class
from pytheus.metrics import Labels, Sample
from pytheus.registry import REGISTRY, Collector, Registry

LINE_SEPARATOR = os.linesep
Expand All @@ -22,9 +23,20 @@ def generate_metrics(registry: Registry = REGISTRY) -> str:
"""
Returns the metrics from the registry in prometheus text format
"""
lines = (
generate_from_collector(collector, registry.prefix) for collector in registry.collect()
)

backend_class = get_backend_class()
if hasattr(backend_class, "_generate_samples"):
samples_dict = backend_class._generate_samples(registry)

lines = (
generate_from_collector(collector, registry.prefix, samples)
for collector, samples in samples_dict.items()
)
else:
lines = (
generate_from_collector(collector, registry.prefix) for collector in registry.collect()
)

output = LINE_SEPARATOR.join(lines)
output += "\n"
return output
Expand All @@ -49,7 +61,9 @@ def format_labels(labels: Labels | None) -> str:
return f"{{{LABEL_SEPARATOR.join(label_str)}}}"


def generate_from_collector(collector: Collector, prefix: str | None = None) -> str:
def generate_from_collector(
collector: Collector, prefix: str | None = None, samples: list[Sample] | None = None
) -> str:
"""
Returns the metrics from a given collector in prometheus text format
"""
Expand All @@ -58,7 +72,10 @@ def generate_from_collector(collector: Collector, prefix: str | None = None) ->
type_text = f"# TYPE {metric_name} {collector.type_}"
output = [help_text, type_text]

for sample in collector.collect():
# iterate over samples if passed directly else fallback to the collect() method
samples_list = samples if samples else collector.collect()

for sample in samples_list:
label_str = format_labels(sample.labels)
metric = f"{metric_name}{sample.suffix}{label_str} {sample.value}"
output.append(metric)
Expand Down
3 changes: 2 additions & 1 deletion pytheus/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ def collect(self) -> Iterable[Sample]:
sample for metric in self._labeled_metrics.values() for sample in metric.collect()
)
if self._default_labels and self._metric._can_observe:
labeled_metrics = itertools.chain(labeled_metrics, self._metric.collect())
# note: this chaining order is important for correct matching with redis pipeline
labeled_metrics = itertools.chain(self._metric.collect(), labeled_metrics)
return labeled_metrics
else:
return self._metric.collect()
Expand Down
6 changes: 6 additions & 0 deletions tests/backends/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
SingleProcessBackend,
_import_backend_class,
get_backend,
get_backend_class,
load_backend,
)
from pytheus.metrics import _Metric
Expand Down Expand Up @@ -102,6 +103,11 @@ def test_get_backend():
assert isinstance(backend_class, SingleProcessBackend)


def test_get_backend_class():
    """After `load_backend()`, the class returned is `SingleProcessBackend`."""
    load_backend()

    backend_class = get_backend_class()

    assert backend_class is SingleProcessBackend


class TestSingleProcessBackend:
@pytest.fixture
def single_process_backend(self):
Expand Down
38 changes: 38 additions & 0 deletions tests/backends/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,41 @@ def test_multiple_metrics_with_same_name_labeled_with_redis_key_name_do_not_over
assert second_collector_metrics_count == 2
assert counter_a.labels({"bob": "cat"})._metric_value_backend.get() == 0
assert counter_b.labels({"bob": "cat"})._metric_value_backend.get() == 1


@mock.patch("pytheus.backends.redis.pipeline_var")
def test_initialize_pipeline(pipeline_var_mock):
    """`_initialize_pipeline` stores a non-None value in `pipeline_var`."""
    # report that no pipeline is stored yet so the precondition assertion passes
    pipeline_var_mock.get.return_value = None

    MultiProcessRedisBackend._initialize_pipeline()

    set_mock = pipeline_var_mock.set
    assert set_mock.called
    stored_value = set_mock.call_args[0][0]
    assert stored_value is not None


@mock.patch("pytheus.backends.redis.pipeline_var")
def test_execute_and_cleanup_pipeline(pipeline_var_mock):
    """`_execute_and_cleanup_pipeline` resets `pipeline_var` and executes the pipeline."""
    fake_pipeline = mock.Mock()
    pipeline_var_mock.get.return_value = fake_pipeline

    MultiProcessRedisBackend._execute_and_cleanup_pipeline()

    set_mock = pipeline_var_mock.set
    assert set_mock.called
    # the variable is reset by storing None in it
    assert set_mock.call_args[0][0] is None
    assert fake_pipeline.execute.called


def test_generate_samples():
    """Each registered collector maps to the expected number of samples."""
    registry = CollectorRegistry()
    counter = Counter("name", "desc", registry=registry)
    histogram = Histogram("histogram", "desc", registry=registry)

    samples_by_collector = MultiProcessRedisBackend._generate_samples(registry)

    counter_samples = samples_by_collector[counter._collector]
    assert len(counter_samples) == 1
    histogram_samples = samples_by_collector[histogram._collector]
    assert len(histogram_samples) == 14


def test_generate_samples_with_labels():
    """A counter with default labels and two labeled children yields three samples."""
    registry = CollectorRegistry()
    counter = Counter(
        "name",
        "desc",
        required_labels=["bob"],
        default_labels={"bob": "c"},
        registry=registry,
    )
    for label_value in ("a", "b"):
        counter.labels({"bob": label_value})

    samples_by_collector = MultiProcessRedisBackend._generate_samples(registry)

    assert len(samples_by_collector[counter._collector]) == 3
9 changes: 8 additions & 1 deletion tests/test_exposition.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from unittest import mock

from pytheus.exposition import _escape_help, format_labels, generate_metrics
from pytheus.metrics import Counter, Histogram
from pytheus.registry import REGISTRY, CollectorRegistry
Expand Down Expand Up @@ -85,7 +87,7 @@ def test_generate_metrics_histogram_with_labels_and_default_labels(self):
histogram.observe(0.4)
metrics_text = generate_metrics()
assert metrics_text == (
'# HELP hello world\n# TYPE hello histogram\nhello_bucket{bob="a",le="0.005"} 0.0\nhello_bucket{bob="a",le="0.01"} 0.0\nhello_bucket{bob="a",le="0.025"} 0.0\nhello_bucket{bob="a",le="0.05"} 0.0\nhello_bucket{bob="a",le="0.1"} 0.0\nhello_bucket{bob="a",le="0.25"} 0.0\nhello_bucket{bob="a",le="0.5"} 1.0\nhello_bucket{bob="a",le="1"} 1.0\nhello_bucket{bob="a",le="2.5"} 1.0\nhello_bucket{bob="a",le="5"} 1.0\nhello_bucket{bob="a",le="10"} 1.0\nhello_bucket{bob="a",le="+Inf"} 1.0\nhello_sum{bob="a"} 0.4\nhello_count{bob="a"} 1.0\nhello_bucket{bob="default",le="0.005"} 0.0\nhello_bucket{bob="default",le="0.01"} 0.0\nhello_bucket{bob="default",le="0.025"} 0.0\nhello_bucket{bob="default",le="0.05"} 0.0\nhello_bucket{bob="default",le="0.1"} 0.0\nhello_bucket{bob="default",le="0.25"} 0.0\nhello_bucket{bob="default",le="0.5"} 0.0\nhello_bucket{bob="default",le="1"} 0.0\nhello_bucket{bob="default",le="2.5"} 0.0\nhello_bucket{bob="default",le="5"} 0.0\nhello_bucket{bob="default",le="10"} 0.0\nhello_bucket{bob="default",le="+Inf"} 0.0\nhello_sum{bob="default"} 0.0\nhello_count{bob="default"} 0.0\n'
'# HELP hello world\n# TYPE hello histogram\nhello_bucket{bob="default",le="0.005"} 0.0\nhello_bucket{bob="default",le="0.01"} 0.0\nhello_bucket{bob="default",le="0.025"} 0.0\nhello_bucket{bob="default",le="0.05"} 0.0\nhello_bucket{bob="default",le="0.1"} 0.0\nhello_bucket{bob="default",le="0.25"} 0.0\nhello_bucket{bob="default",le="0.5"} 0.0\nhello_bucket{bob="default",le="1"} 0.0\nhello_bucket{bob="default",le="2.5"} 0.0\nhello_bucket{bob="default",le="5"} 0.0\nhello_bucket{bob="default",le="10"} 0.0\nhello_bucket{bob="default",le="+Inf"} 0.0\nhello_sum{bob="default"} 0.0\nhello_count{bob="default"} 0.0\nhello_bucket{bob="a",le="0.005"} 0.0\nhello_bucket{bob="a",le="0.01"} 0.0\nhello_bucket{bob="a",le="0.025"} 0.0\nhello_bucket{bob="a",le="0.05"} 0.0\nhello_bucket{bob="a",le="0.1"} 0.0\nhello_bucket{bob="a",le="0.25"} 0.0\nhello_bucket{bob="a",le="0.5"} 1.0\nhello_bucket{bob="a",le="1"} 1.0\nhello_bucket{bob="a",le="2.5"} 1.0\nhello_bucket{bob="a",le="5"} 1.0\nhello_bucket{bob="a",le="10"} 1.0\nhello_bucket{bob="a",le="+Inf"} 1.0\nhello_sum{bob="a"} 0.4\nhello_count{bob="a"} 1.0\n'
)

def test_generate_metrics_respects_escaping(self):
Expand All @@ -102,6 +104,11 @@ def test_generate_metrics_respects_escaping(self):
""
)

@mock.patch("pytheus.exposition.get_backend_class")
def test_generate_metrics_calls_generate_samples(self, get_backend_mock):
    """`generate_metrics` calls `_generate_samples` on the class it gets back."""
    generate_metrics()

    backend_class_mock = get_backend_mock.return_value
    assert backend_class_mock._generate_samples.called

def test_format_labels_escapes_characters(self):
# \ -> \\
# " -> \"
Expand Down

0 comments on commit f08f57d

Please sign in to comment.