remove MultiprocessLogHandler
ekneg54 committed Oct 9, 2023
1 parent 02448cd commit e7c4540
Showing 13 changed files with 101 additions and 430 deletions.
10 changes: 2 additions & 8 deletions logprep/abc/processor.py
@@ -275,15 +275,9 @@ def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: L
self._generic_tree.add_rule(rule, self._logger)
if self._logger.isEnabledFor(DEBUG): # pragma: no cover
number_specific_rules = self._specific_tree.metrics.number_of_rules
self._logger.debug(
f"{self.describe()} loaded {number_specific_rules} "
f"specific rules ({current_process().name})"
)
self._logger.debug(f"{self.describe()} loaded {number_specific_rules} specific rules")
number_generic_rules = self._generic_tree.metrics.number_of_rules
self._logger.debug(
f"{self.describe()} loaded {number_generic_rules} generic rules "
f"generic rules ({current_process().name})"
)
self._logger.debug(f"{self.describe()} loaded {number_generic_rules} generic rules")

@staticmethod
def _field_exists(event: dict, dotted_field: str) -> bool:
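Why the "({current_process().name})" suffix can go: each pipeline process now owns a logger whose name encodes its identity, so the process context travels in the log record's name instead of being interpolated into every message. A minimal sketch of the idea (hypothetical values, not the project's exact code):

import logging

# The logger name itself identifies the emitting pipeline process,
# e.g. "Logprep Pipeline 3", so individual messages can stay short;
# %(name)s in the handler's format string shows the origin.
logger = logging.getLogger("Logprep Pipeline 3")
logger.debug("loaded 12 specific rules")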
81 changes: 23 additions & 58 deletions logprep/framework/pipeline.py
@@ -5,13 +5,15 @@
"""
import copy
import logging
import logging.handlers
import multiprocessing

# pylint: disable=logging-fstring-interpolation
import queue
import warnings
from ctypes import c_bool, c_ulonglong
from functools import cached_property
from logging import INFO, NOTSET, Handler, Logger
from multiprocessing import Lock, Process, Value, current_process
from typing import Any, List, Tuple

@@ -42,34 +44,21 @@
from logprep.metrics.metric import Metric, calculate_new_average
from logprep.metrics.metric_exposer import MetricExposer
from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning
from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
from logprep.util.pipeline_profiler import PipelineProfiler
from logprep.util.prometheus_exporter import PrometheusStatsExporter
from logprep.util.time_measurement import TimeMeasurement


class PipelineError(BaseException):
"""Base class for Pipeline related exceptions."""


class MustProvideALogHandlerError(PipelineError):
"""Raise if no log handler was provided."""


class MustProvideAnMPLogHandlerError(BaseException):
"""Raise if no multiprocessing log handler was provided."""


class SharedCounter:
"""A shared counter for multiprocessing pipelines."""

def __init__(self):
self._val = Value(c_ulonglong, 0)
self._printed = Value(c_bool, False)
self._lock = None
self._logger = None
self._period = None
self.scheduler = Scheduler()
self._logger = logging.getLogger("Logprep SharedCounter")

def _init_timer(self, period: float):
if self._period is None:
@@ -79,17 +68,9 @@ def _init_timer(self, period: float):
self.scheduler.every(int(self._period)).seconds.do(self.print_value)
self.scheduler.every(int(self._period + 1)).seconds.do(self.reset_printed)

def _create_logger(self, log_handler: Handler):
if self._logger is None:
logger = Logger("Processing Counter", level=log_handler.level)
for handler in logger.handlers:
logger.removeHandler(handler)
logger.addHandler(log_handler)
self._logger = logger

def setup(self, print_processed_period: float, log_handler: Handler, lock: Lock):
def setup(self, print_processed_period: float, lock: Lock):
"""Setup shared counter for multiprocessing pipeline."""
self._create_logger(log_handler)

self._init_timer(print_processed_period)
self._lock = lock

@@ -194,7 +175,7 @@ def update_mean_processing_time_per_event(self, new_sample):
_logprep_config: dict
""" the logprep configuration dict """

_log_handler: Handler
_log_queue: multiprocessing.Queue
""" the handler for the logs """

_continue_iterating: Value
@@ -220,25 +201,25 @@ def __init__(
config: dict,
pipeline_index: int = None,
counter: "SharedCounter" = None,
log_handler: Handler = None,
log_queue: multiprocessing.Queue = None,
lock: Lock = None,
shared_dict: dict = None,
used_server_ports: dict = None,
prometheus_exporter: PrometheusStatsExporter = None,
) -> None:
if log_handler and not isinstance(log_handler, Handler):
raise MustProvideALogHandlerError
self._log_queue = log_queue
self.logger = logging.getLogger(f"Logprep Pipeline {pipeline_index}")
self.logger.addHandler(logging.handlers.QueueHandler(log_queue))
self._logprep_config = config
self._timeout = config.get("timeout")
self._log_handler = log_handler
self._continue_iterating = Value(c_bool)

self._lock = lock
self._shared_dict = shared_dict
self._processing_counter = counter
if self._processing_counter:
print_processed_period = self._logprep_config.get("print_processed_period", 300)
self._processing_counter.setup(print_processed_period, log_handler, lock)
self._processing_counter.setup(print_processed_period, lock)
self._used_server_ports = used_server_ports
self._prometheus_exporter = prometheus_exporter
self.pipeline_index = pipeline_index
@@ -285,7 +266,7 @@ def metrics(self) -> PipelineMetrics:
def _pipeline(self) -> tuple:
self.logger.debug(f"Building '{self._process_name}'")
pipeline = [self._create_processor(entry) for entry in self._logprep_config.get("pipeline")]
self.logger.debug(f"Finished building pipeline ({self._process_name})")
self.logger.debug("Finished building pipeline")
return pipeline

@cached_property
@@ -313,28 +294,14 @@ def _input(self) -> Input:
)
return Factory.create(input_connector_config, self.logger)

@cached_property
def logger(self) -> Logger:
"""the pipeline logger"""
if self._log_handler is None:
return Logger("Pipeline")
if self._log_handler.level == NOTSET:
self._log_handler.level = INFO
logger = Logger("Pipeline", level=self._log_handler.level)
for handler in logger.handlers:
logger.removeHandler(handler)
logger.addHandler(self._log_handler)

return logger

@_handle_pipeline_error
def _setup(self):
self.logger.debug(f"Creating connectors ({self._process_name})")
self.logger.debug("Creating connectors")
for _, output in self._output.items():
output.input_connector = self._input
self.logger.debug(
f"Created connectors -> input: '{self._input.describe()}',"
f" output -> '{[output.describe() for _, output in self._output.items()]}' ({self._process_name})"
f" output -> '{[output.describe() for _, output in self._output.items()]}'"
)
self._input.pipeline_index = self.pipeline_index
self._input.setup()
@@ -345,10 +312,10 @@ def _setup(self):
while self._input.server.config.port in self._used_server_ports:
self._input.server.config.port += 1
self._used_server_ports.update({self._input.server.config.port: self._process_name})
self.logger.debug(f"Finished creating connectors ({self._process_name})")
self.logger.info(f"Start building pipeline ({self._process_name})")
self.logger.debug("Finished creating connectors")
self.logger.info("Start building pipeline")
_ = self._pipeline
self.logger.info(f"Finished building pipeline ({self._process_name})")
self.logger.info("Finished building pipeline")

def _create_processor(self, entry: dict) -> "Processor":
processor_name = list(entry.keys())[0]
@@ -357,7 +324,7 @@ def _create_processor(self, entry: dict) -> "Processor":
processor.setup()
if self.metrics:
self.metrics.pipeline.append(processor.metrics)
self.logger.debug(f"Created '{processor}' processor ({self._process_name})")
self.logger.debug(f"Created '{processor}' processor")
return processor

def run(self) -> None:
@@ -369,7 +336,7 @@ def run(self) -> None:
with warnings.catch_warnings():
warnings.simplefilter("default")
self._setup()
self.logger.debug(f"Start iterating ({self._process_name})")
self.logger.debug("Start iterating")
if hasattr(self._input, "server"):
with self._input.server.run_in_thread():
while self._iterate():
@@ -490,6 +457,7 @@ def _drain_input_queues(self) -> None:

def stop(self) -> None:
"""Stop processing processors in the Pipeline."""
self.logger.debug(f"Stopping pipeline ({self._process_name})")
with self._continue_iterating.get_lock():
self._continue_iterating.value = False

@@ -503,23 +471,20 @@ def __init__(
self,
pipeline_index: int,
config: dict,
log_handler: Handler,
log_queue: multiprocessing.Queue,
lock: Lock,
shared_dict: dict,
used_server_ports: dict,
prometheus_exporter: PrometheusStatsExporter = None,
) -> None:
if not isinstance(log_handler, MultiprocessingLogHandler):
raise MustProvideAnMPLogHandlerError

self._profile = config.get("profile_pipelines", False)

Pipeline.__init__(
self,
pipeline_index=pipeline_index,
config=config,
counter=self.processed_counter,
log_handler=log_handler,
log_queue=log_queue,
lock=lock,
shared_dict=shared_dict,
used_server_ports=used_server_ports,
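For context: the commit swaps the custom MultiprocessingLogHandler for the standard-library queue-based logging pattern. On the worker side, each pipeline process attaches a logging.handlers.QueueHandler that ships records to a shared multiprocessing.Queue. A minimal sketch under that assumption (simplified, not the project's exact wiring):

import logging
import logging.handlers
import multiprocessing

def run_pipeline(log_queue: multiprocessing.Queue, pipeline_index: int) -> None:
    # Child process: records are pickled onto the shared queue instead of
    # being formatted locally, so real handlers only live in the main process.
    logger = logging.getLogger(f"Logprep Pipeline {pipeline_index}")
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.info("Start building pipeline")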
38 changes: 16 additions & 22 deletions logprep/framework/pipeline_manager.py
@@ -1,16 +1,16 @@
"""This module contains functionality to manage pipelines via multi-processing."""
# pylint: disable=logging-fstring-interpolation

from logging import DEBUG, Logger
from multiprocessing import Lock, Manager
from queue import Empty
import logging
import logging.handlers
import multiprocessing

from logprep.framework.pipeline import MultiprocessingPipeline
from logprep.util.configuration import Configuration
from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
from logprep.util.prometheus_exporter import PrometheusStatsExporter


class PipelineManagerError(BaseException):
class PipelineManagerError(Exception):
"""Base class for pipeline related exceptions."""


@@ -24,23 +24,25 @@ def __init__(self, what_failed: str):
class PipelineManager:
"""Manage pipelines via multi-processing."""

def __init__(self, logger: Logger):
self._logger = logger
def __init__(self):
self.prometheus_exporter = None
self._log_handler = MultiprocessingLogHandler(self._logger.level)
self._logger = logging.getLogger("Logprep PipelineManager")
self.log_queue = multiprocessing.Queue(-1)
self._queue_listener = logging.handlers.QueueListener(self.log_queue)
self._queue_listener.start()

self._pipelines = []
self._configuration = None

self._lock = Lock()
self._lock = multiprocessing.Lock()
self._shared_dict = None
self._used_server_ports = None

def set_configuration(self, configuration: Configuration):
"""set the verified config"""
self._configuration = configuration

manager = Manager()
manager = multiprocessing.Manager()
self._shared_dict = manager.dict()
self._used_server_ports = manager.dict()
for idx in range(configuration.get("process_count", 1)):
@@ -58,8 +60,7 @@ def get_count(self) -> int:
The pipeline count will be incrementally changed until it reaches this value.
"""
if self._logger.isEnabledFor(DEBUG): # pragma: no cover
self._logger.debug(f"Getting pipeline count: {len(self._pipelines)}")
self._logger.debug(f"Getting pipeline count: {len(self._pipelines)}")
return len(self._pipelines)

def set_count(self, count: int):
Expand Down Expand Up @@ -101,18 +102,11 @@ def restart_failed_pipeline(self):
self.set_count(self._configuration.get("process_count"))
self._logger.warning(f"Restarted {len(failed_pipelines)} failed pipeline(s)")

def handle_logs_into_logger(self, logger: Logger, timeout: float):
"""Handle logs."""
try:
logger.handle(self._log_handler.get(timeout))
while True:
logger.handle(self._log_handler.get(0.0))
except Empty:
pass

def stop(self):
"""Stop processing any pipelines by reducing the pipeline count to zero."""
self._decrease_to_count(0)
while len(self._pipelines) > 0:
self._logger.debug(f"Waiting for {len(self._pipelines)} pipelines to stop")

def _create_pipeline(self, index) -> MultiprocessingPipeline:
if self._configuration is None:
@@ -122,7 +116,7 @@ def _create_pipeline(self, index) -> MultiprocessingPipeline:
return MultiprocessingPipeline(
pipeline_index=index,
config=self._configuration,
log_handler=self._log_handler,
log_queue=self.log_queue,
lock=self._lock,
shared_dict=self._shared_dict,
used_server_ports=self._used_server_ports,
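On the manager side, a logging.handlers.QueueListener drains that queue in the main process and hands each record to real handlers. A minimal sketch of the counterpart (the StreamHandler is an illustrative stand-in; the diff above starts the listener without explicit handlers):

import logging
import logging.handlers
import multiprocessing

log_queue = multiprocessing.Queue(-1)  # no size bound, as in the diff above
listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
listener.start()   # consumes records in a background thread of the main process
# ... hand log_queue to each MultiprocessingPipeline worker ...
listener.stop()    # flushes remaining queued records on shutdown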
12 changes: 2 additions & 10 deletions logprep/run_logprep.py
@@ -6,7 +6,8 @@
import sys
import warnings
from argparse import ArgumentParser
from logging import ERROR, Logger, getLogger
from logging import ERROR, Logger, basicConfig, getLogger
from logging.handlers import SysLogHandler
from os.path import basename
from pathlib import Path

@@ -24,14 +25,6 @@
from logprep.util.schema_and_rule_checker import SchemaAndRuleChecker
from logprep.util.time_measurement import TimeMeasurement

from logging import (
getLogger,
basicConfig,
Logger,
)
from logging.handlers import SysLogHandler


warnings.simplefilter("always", DeprecationWarning)
logging.captureWarnings(True)

@@ -102,7 +95,6 @@ def _run_logprep(arguments, logger: Logger):
runner = None
try:
runner = Runner.get_runner()
runner.set_logger(logger)
runner.load_configuration(arguments.config)
logger.debug("Configuration loaded")
runner.start()
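With the per-component plumbing such as runner.set_logger(logger) gone, log configuration can happen once at startup; the imports added above (basicConfig, SysLogHandler) point that way. A plausible sketch assuming that centralized setup (format string and syslog address are illustrative, not taken from the diff):

from logging import INFO, basicConfig, getLogger
from logging.handlers import SysLogHandler

# Configure the root logger once; records forwarded by the QueueListener
# and from the main process both end up here.
basicConfig(level=INFO, format="%(asctime)-15s %(name)-12s %(levelname)-8s: %(message)s")
logger = getLogger("Logprep")
logger.addHandler(SysLogHandler(address="/dev/log"))  # illustrative target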