Remove MetricFileTarget (#456)
ekneg54 authored Oct 9, 2023
1 parent e5cbb34 commit 02448cd
Showing 19 changed files with 141 additions and 971 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,11 @@
## Upcoming Changes

## next release

### Breaking

* removed metric file target

### Features

* add a preprocessor to enrich by systems env variables
29 changes: 3 additions & 26 deletions doc/source/user_manual/configuration/metrics.rst
@@ -90,27 +90,10 @@ Time Measurement is deactivated by default.

If only the general metrics are activated then the metric for the time measurement will be 0.

targets
port
^^^^^^^

List of targets where the statistics should be exported to. At the moment only :code:`file` and
:code:`prometheus` are allowed. Those can be further configured with the following options:

**file**

| **path** *(String)*
| Path to the log file.
| **rollover_interval** *(Integer, value > 0)*
| Defines after how many seconds the log file should be rotated.
| **backup_count** *(Integer, value > 0)*
| Defines how many rotating log files should exist simultaneously.
**prometheus**

| **port** *(Integer, value > 0)*
| Port which should be used to start the default prometheus exporter webservers
Port which should be used to start the default prometheus exporter webservers. (default: 8000)

Example
-------
@@ -126,13 +109,7 @@ Example
measure_time:
enabled: true
append_to_event: false
targets:
- prometheus:
port: 8000
- file:
path: ./logs/status.json
rollover_interval: 86400
backup_count: 10
port: 8000
Metrics Overview
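For orientation, here is a minimal sketch of the flattened metrics section after this change, written as the parsed Python dict the pipeline manager receives rather than as YAML; the exact keys and values are illustrative, with defaults taken from the code changed later in this commit.

# Hedged sketch: the metrics configuration with the file target removed.
# Values are illustrative; defaults mirror the code changed in this commit.
metrics_config = {
    "enabled": True,      # gates creation of the Prometheus exporter
    "period": 180,        # exposure interval in seconds (MetricExposer default)
    "cumulative": True,
    "port": 8000,         # replaces the former nested "targets" list
}

# The former file target options (path, rollover_interval, backup_count) are gone;
# only the Prometheus port remains.
print(metrics_config.get("port", 8000))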
17 changes: 9 additions & 8 deletions logprep/framework/pipeline.py
@@ -25,11 +25,11 @@
from logprep.abc.connector import Connector
from logprep.abc.input import (
CriticalInputError,
CriticalInputParsingError,
FatalInputError,
Input,
SourceDisconnectedError,
WarningInputError,
CriticalInputParsingError,
)
from logprep.abc.output import (
CriticalOutputError,
@@ -39,11 +39,12 @@
)
from logprep.abc.processor import Processor
from logprep.factory import Factory
from logprep.metrics.metric import Metric, MetricTargets, calculate_new_average
from logprep.metrics.metric import Metric, calculate_new_average
from logprep.metrics.metric_exposer import MetricExposer
from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning
from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
from logprep.util.pipeline_profiler import PipelineProfiler
from logprep.util.prometheus_exporter import PrometheusStatsExporter
from logprep.util.time_measurement import TimeMeasurement


@@ -223,7 +224,7 @@ def __init__(
lock: Lock = None,
shared_dict: dict = None,
used_server_ports: dict = None,
metric_targets: MetricTargets = None,
prometheus_exporter: PrometheusStatsExporter = None,
) -> None:
if log_handler and not isinstance(log_handler, Handler):
raise MustProvideALogHandlerError
@@ -239,7 +240,7 @@ def __init__(
print_processed_period = self._logprep_config.get("print_processed_period", 300)
self._processing_counter.setup(print_processed_period, log_handler, lock)
self._used_server_ports = used_server_ports
self._metric_targets = metric_targets
self._prometheus_exporter = prometheus_exporter
self.pipeline_index = pipeline_index
self._encoder = msgspec.msgpack.Encoder()
self._decoder = msgspec.msgpack.Decoder()
@@ -263,7 +264,7 @@ def _metric_labels(self) -> dict:
def _metrics_exposer(self) -> MetricExposer:
return MetricExposer(
self._logprep_config.get("metrics", {}),
self._metric_targets,
self._prometheus_exporter,
self._shared_dict,
self._lock,
self.logger,
@@ -272,7 +273,7 @@ def _metrics_exposer(self) -> MetricExposer:
@cached_property
def metrics(self) -> PipelineMetrics:
"""The pipeline metrics object"""
if self._metric_targets is None:
if self._prometheus_exporter is None:
return None
return self.PipelineMetrics(
input=self._input.metrics,
@@ -506,7 +507,7 @@ def __init__(
lock: Lock,
shared_dict: dict,
used_server_ports: dict,
metric_targets: MetricTargets = None,
prometheus_exporter: PrometheusStatsExporter = None,
) -> None:
if not isinstance(log_handler, MultiprocessingLogHandler):
raise MustProvideAnMPLogHandlerError
@@ -522,7 +523,7 @@ def __init__(
lock=lock,
shared_dict=shared_dict,
used_server_ports=used_server_ports,
metric_targets=metric_targets,
prometheus_exporter=prometheus_exporter,
)

self._continue_iterating = Value(c_bool)
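A behavioural sketch of the gating introduced above, using a stand-in class instead of the real Pipeline: without a PrometheusStatsExporter the metrics property simply stays None, which replaces the old check against MetricTargets.

from functools import cached_property


class PipelineSketch:
    """Stand-in illustrating the exporter gating; not the real Pipeline class."""

    def __init__(self, prometheus_exporter=None):
        self._prometheus_exporter = prometheus_exporter

    @cached_property
    def metrics(self):
        if self._prometheus_exporter is None:
            return None  # no exporter configured, so no metrics tracking
        return {"pipeline": "PipelineMetrics placeholder"}  # real code builds PipelineMetrics(...)


assert PipelineSketch().metrics is None
assert PipelineSketch(prometheus_exporter=object()).metrics is not None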
26 changes: 13 additions & 13 deletions logprep/framework/pipeline_manager.py
@@ -1,13 +1,13 @@
"""This module contains functionality to manage pipelines via multi-processing."""

from logging import Logger, DEBUG
from multiprocessing import Manager, Lock
from logging import DEBUG, Logger
from multiprocessing import Lock, Manager
from queue import Empty

from logprep.framework.pipeline import MultiprocessingPipeline
from logprep.metrics.metric import MetricTargets
from logprep.util.configuration import Configuration
from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
from logprep.util.prometheus_exporter import PrometheusStatsExporter


class PipelineManagerError(BaseException):
@@ -24,10 +24,9 @@ def __init__(self, what_failed: str):
class PipelineManager:
"""Manage pipelines via multi-processing."""

def __init__(self, logger: Logger, metric_targets: MetricTargets):
def __init__(self, logger: Logger):
self._logger = logger
self.metric_targets = metric_targets

self.prometheus_exporter = None
self._log_handler = MultiprocessingLogHandler(self._logger.level)

self._pipelines = []
@@ -44,8 +43,11 @@ def set_configuration(self, configuration: Configuration):
manager = Manager()
self._shared_dict = manager.dict()
self._used_server_ports = manager.dict()
for idx in range(configuration["process_count"]):
for idx in range(configuration.get("process_count", 1)):
self._shared_dict[idx] = None
prometheus_config = configuration.get("metrics", {})
if prometheus_config.get("enabled", False):
self.prometheus_exporter = PrometheusStatsExporter(prometheus_config, self._logger)

def get_count(self) -> int:
"""Get the pipeline count.
@@ -91,11 +93,9 @@ def restart_failed_pipeline(self):
failed_pipelines = [pipeline for pipeline in self._pipelines if not pipeline.is_alive()]
for failed_pipeline in failed_pipelines:
self._pipelines.remove(failed_pipeline)

if self.metric_targets and self.metric_targets.prometheus_target:
self.metric_targets.prometheus_target.prometheus_exporter.remove_metrics_from_process(
failed_pipeline.pid
)
if self.prometheus_exporter is None:
continue
self.prometheus_exporter.remove_metrics_from_process(failed_pipeline.pid)

if failed_pipelines:
self.set_count(self._configuration.get("process_count"))
@@ -126,5 +126,5 @@ def _create_pipeline(self, index) -> MultiprocessingPipeline:
lock=self._lock,
shared_dict=self._shared_dict,
used_server_ports=self._used_server_ports,
metric_targets=self.metric_targets,
prometheus_exporter=self.prometheus_exporter,
)
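The new wiring in set_configuration and restart_failed_pipeline boils down to: build one PrometheusStatsExporter when the metrics section is enabled, otherwise keep None and skip metric cleanup. A self-contained sketch with a stub exporter follows; the two-argument constructor mirrors the call shown above, everything else is an assumption.

import logging


class ExporterStub:
    """Stand-in for logprep's PrometheusStatsExporter (assumed interface)."""

    def __init__(self, configuration: dict, logger: logging.Logger):
        self.configuration = configuration
        self.logger = logger


def build_prometheus_exporter(configuration: dict, logger: logging.Logger):
    """Mirror of the new set_configuration logic: create an exporter only if metrics are enabled."""
    prometheus_config = configuration.get("metrics", {})
    if prometheus_config.get("enabled", False):
        return ExporterStub(prometheus_config, logger)
    return None


logger = logging.getLogger("logprep")
assert build_prometheus_exporter({"metrics": {"enabled": True, "port": 8000}}, logger) is not None
assert build_prometheus_exporter({}, logger) is None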
6 changes: 1 addition & 5 deletions logprep/metrics/metric.py
@@ -1,9 +1,5 @@
"""This module tracks, calculates, exposes and resets logprep metrics"""
from collections import namedtuple

from attr import define, asdict

MetricTargets = namedtuple("MetricTargets", "file_target prometheus_target")
from attr import asdict, define


def is_public(attribute, _):
41 changes: 30 additions & 11 deletions logprep/metrics/metric_exposer.py
@@ -5,13 +5,21 @@

import numpy as np

from logprep.metrics.metric_targets import split_key_label_string

def split_key_label_string(key_label_string):
"""Splits the key label string into separate variables"""
if ";" not in key_label_string:
return key_label_string, {}
key, labels = key_label_string.split(";")
labels = labels.split(",")
labels = [label.split(":") for label in labels]
return key, dict(labels)


class MetricExposer:
"""The MetricExposer collects all metrics and exposes them via configured outputs"""

def __init__(self, config, metric_targets, shared_dict, lock, logger):
def __init__(self, config, prometheus_exporter, shared_dict, lock, logger):
self._shared_dict = shared_dict
self._print_period = config.get("period", 180)
self._cumulative = config.get("cumulative", True)
@@ -20,12 +28,7 @@ def __init__(self, config, metric_targets, shared_dict, lock, logger):
self._logger = logger
self._first_metrics_exposed = False
self._timer = Value(c_double, time() + self._print_period)

self.output_targets = []
if metric_targets and metric_targets.file_target:
self.output_targets.append(metric_targets.file_target)
if metric_targets and metric_targets.prometheus_target:
self.output_targets.append(metric_targets.prometheus_target)
self._prometheus_exporter = prometheus_exporter

def expose(self, metrics):
"""
@@ -35,7 +38,7 @@ def expose(self, metrics):
pipeline, or in an independent form, where each multiprocessing pipeline will be exposed
directly.
"""
if not self.output_targets:
if not self._prometheus_exporter:
return

if self._time_to_expose():
@@ -117,5 +120,21 @@ def _send_to_output(self, metrics):
Passes the metric object to the configured outputs such that they
can transform and expose them
"""
for output in self.output_targets:
output.expose(metrics)
for key_labels, value in metrics.items():
key, labels = split_key_label_string(key_labels)
if key not in self._prometheus_exporter.metrics.keys():
label_names = []
if labels:
label_names = labels.keys()
self._prometheus_exporter.create_new_metric_exporter(key, label_names)

if labels:
self._prometheus_exporter.metrics[key].labels(**labels).set(value)
else:
self._prometheus_exporter.metrics[key].set(value)

interval = self._prometheus_exporter.configuration["period"]
labels = {
"component": "logprep",
}
self._prometheus_exporter.tracking_interval.labels(**labels).set(interval)
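A quick usage sketch of the split_key_label_string helper now defined in this module; the metric and label names below are made up for illustration. With the file target gone, _send_to_output feeds each resulting key, label dict, and value straight into the Prometheus gauges instead of looping over a list of output targets.

def split_key_label_string(key_label_string):
    """Splits the key label string into separate variables"""
    if ";" not in key_label_string:
        return key_label_string, {}
    key, labels = key_label_string.split(";")
    labels = labels.split(",")
    labels = [label.split(":") for label in labels]
    return key, dict(labels)


print(split_key_label_string("logprep_processed_events"))
# -> ('logprep_processed_events', {})
print(split_key_label_string("logprep_processed_events;component:pipeline,pipeline:pipeline-1"))
# -> ('logprep_processed_events', {'component': 'pipeline', 'pipeline': 'pipeline-1'})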