remove MultiprocessLogHandler #455

Merged: 3 commits, Oct 11, 2023

10 changes: 2 additions & 8 deletions logprep/abc/processor.py
@@ -291,15 +291,9 @@ def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: List[str]):
                 self._generic_tree.add_rule(rule, self._logger)
         if self._logger.isEnabledFor(DEBUG):  # pragma: no cover
             number_specific_rules = self._specific_tree.metrics.number_of_rules
-            self._logger.debug(
-                f"{self.describe()} loaded {number_specific_rules} "
-                f"specific rules ({current_process().name})"
-            )
+            self._logger.debug(f"{self.describe()} loaded {number_specific_rules} specific rules")
             number_generic_rules = self._generic_tree.metrics.number_of_rules
-            self._logger.debug(
-                f"{self.describe()} loaded {number_generic_rules} generic rules "
-                f"generic rules ({current_process().name})"
-            )
+            self._logger.debug(f"{self.describe()} loaded {number_generic_rules} generic rules")

     @staticmethod
     def _field_exists(event: dict, dotted_field: str) -> bool:
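
Note: the `({current_process().name})` suffixes can be dropped without losing information, because every `logging.LogRecord` already carries a `processName` attribute that a formatter on the consuming side can render. A minimal sketch, with an illustrative format string that is not part of this PR:

import logging

# %(processName)s renders the record's built-in process name, replacing the
# hand-rolled current_process().name suffix removed in the diff above.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(processName)s %(name)s %(levelname)s: %(message)s"))
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.warning("loaded 12 specific rules")
# -> MainProcess demo WARNING: loaded 12 specific rules
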
81 changes: 23 additions & 58 deletions logprep/framework/pipeline.py
@@ -5,13 +5,15 @@

 """
 import copy
+import logging
+import logging.handlers
+import multiprocessing

 # pylint: disable=logging-fstring-interpolation
 import queue
 import warnings
 from ctypes import c_bool, c_ulonglong
 from functools import cached_property
-from logging import INFO, NOTSET, Handler, Logger
 from multiprocessing import Lock, Process, Value, current_process
 from typing import Any, List, Tuple

@@ -42,34 +44,21 @@
 from logprep.metrics.metric import Metric, calculate_new_average
 from logprep.metrics.metric_exposer import MetricExposer
 from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning
-from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
 from logprep.util.pipeline_profiler import PipelineProfiler
 from logprep.util.prometheus_exporter import PrometheusStatsExporter
 from logprep.util.time_measurement import TimeMeasurement


 class PipelineError(BaseException):
     """Base class for Pipeline related exceptions."""


-class MustProvideALogHandlerError(PipelineError):
-    """Raise if no log handler was provided."""
-
-
-class MustProvideAnMPLogHandlerError(BaseException):
-    """Raise if no multiprocessing log handler was provided."""
-
-
 class SharedCounter:
     """A shared counter for multiprocessing pipelines."""

     def __init__(self):
         self._val = Value(c_ulonglong, 0)
         self._printed = Value(c_bool, False)
         self._lock = None
-        self._logger = None
         self._period = None
         self.scheduler = Scheduler()
+        self._logger = logging.getLogger("Logprep SharedCounter")

     def _init_timer(self, period: float):
         if self._period is None:
Expand All @@ -79,17 +68,9 @@ def _init_timer(self, period: float):
self.scheduler.every(int(self._period)).seconds.do(self.print_value)
self.scheduler.every(int(self._period + 1)).seconds.do(self.reset_printed)

def _create_logger(self, log_handler: Handler):
if self._logger is None:
logger = Logger("Processing Counter", level=log_handler.level)
for handler in logger.handlers:
logger.removeHandler(handler)
logger.addHandler(log_handler)
self._logger = logger

def setup(self, print_processed_period: float, log_handler: Handler, lock: Lock):
def setup(self, print_processed_period: float, lock: Lock):
"""Setup shared counter for multiprocessing pipeline."""
self._create_logger(log_handler)

self._init_timer(print_processed_period)
self._lock = lock

@@ -194,7 +175,7 @@ def update_mean_processing_time_per_event(self, new_sample):
     _logprep_config: dict
     """ the logprep configuration dict """

-    _log_handler: Handler
+    _log_queue: multiprocessing.Queue
     """ the handler for the logs """

     _continue_iterating: Value
@@ -220,25 +201,25 @@ def __init__(
         config: dict,
         pipeline_index: int = None,
         counter: "SharedCounter" = None,
-        log_handler: Handler = None,
+        log_queue: multiprocessing.Queue = None,
         lock: Lock = None,
         shared_dict: dict = None,
         used_server_ports: dict = None,
         prometheus_exporter: PrometheusStatsExporter = None,
     ) -> None:
-        if log_handler and not isinstance(log_handler, Handler):
-            raise MustProvideALogHandlerError
+        self._log_queue = log_queue
+        self.logger = logging.getLogger(f"Logprep Pipeline {pipeline_index}")
+        self.logger.addHandler(logging.handlers.QueueHandler(log_queue))
         self._logprep_config = config
         self._timeout = config.get("timeout")
-        self._log_handler = log_handler
         self._continue_iterating = Value(c_bool)

         self._lock = lock
         self._shared_dict = shared_dict
         self._processing_counter = counter
         if self._processing_counter:
             print_processed_period = self._logprep_config.get("print_processed_period", 300)
-            self._processing_counter.setup(print_processed_period, log_handler, lock)
+            self._processing_counter.setup(print_processed_period, lock)
         self._used_server_ports = used_server_ports
         self._prometheus_exporter = prometheus_exporter
         self.pipeline_index = pipeline_index
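
Note: the new `__init__` wiring makes each pipeline a pure producer: its named logger gets a `QueueHandler`, so records are serialized onto the shared queue instead of being written locally. A minimal sketch of that producer side, assuming an illustrative worker function that is not part of this PR:

import logging
import logging.handlers
import multiprocessing

def pipeline_worker(log_queue: multiprocessing.Queue, pipeline_index: int) -> None:
    # Mirrors the __init__ above: attach a QueueHandler to a per-pipeline logger.
    logger = logging.getLogger(f"Logprep Pipeline {pipeline_index}")
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.DEBUG)
    logger.debug("Start iterating")  # enqueued here, emitted by the listener elsewhere
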
@@ -285,7 +266,7 @@ def metrics(self) -> PipelineMetrics:
     def _pipeline(self) -> tuple:
         self.logger.debug(f"Building '{self._process_name}'")
         pipeline = [self._create_processor(entry) for entry in self._logprep_config.get("pipeline")]
-        self.logger.debug(f"Finished building pipeline ({self._process_name})")
+        self.logger.debug("Finished building pipeline")
         return pipeline

     @cached_property
@@ -313,28 +294,14 @@ def _input(self) -> Input:
         )
         return Factory.create(input_connector_config, self.logger)

-    @cached_property
-    def logger(self) -> Logger:
-        """the pipeline logger"""
-        if self._log_handler is None:
-            return Logger("Pipeline")
-        if self._log_handler.level == NOTSET:
-            self._log_handler.level = INFO
-        logger = Logger("Pipeline", level=self._log_handler.level)
-        for handler in logger.handlers:
-            logger.removeHandler(handler)
-        logger.addHandler(self._log_handler)
-
-        return logger
-
     @_handle_pipeline_error
     def _setup(self):
-        self.logger.debug(f"Creating connectors ({self._process_name})")
+        self.logger.debug("Creating connectors")
         for _, output in self._output.items():
             output.input_connector = self._input
         self.logger.debug(
             f"Created connectors -> input: '{self._input.describe()}',"
-            f" output -> '{[output.describe() for _, output in self._output.items()]}' ({self._process_name})"
+            f" output -> '{[output.describe() for _, output in self._output.items()]}'"
         )
         self._input.pipeline_index = self.pipeline_index
         self._input.setup()
@@ -345,10 +312,10 @@ def _setup(self):
             while self._input.server.config.port in self._used_server_ports:
                 self._input.server.config.port += 1
             self._used_server_ports.update({self._input.server.config.port: self._process_name})
-        self.logger.debug(f"Finished creating connectors ({self._process_name})")
-        self.logger.info(f"Start building pipeline ({self._process_name})")
+        self.logger.debug("Finished creating connectors")
+        self.logger.info("Start building pipeline")
         _ = self._pipeline
-        self.logger.info(f"Finished building pipeline ({self._process_name})")
+        self.logger.info("Finished building pipeline")

     def _create_processor(self, entry: dict) -> "Processor":
         processor_name = list(entry.keys())[0]
@@ -357,7 +324,7 @@ def _create_processor(self, entry: dict) -> "Processor":
         processor.setup()
         if self.metrics:
             self.metrics.pipeline.append(processor.metrics)
-        self.logger.debug(f"Created '{processor}' processor ({self._process_name})")
+        self.logger.debug(f"Created '{processor}' processor")
         return processor

     def run(self) -> None:
@@ -369,7 +336,7 @@ def run(self) -> None:
         with warnings.catch_warnings():
             warnings.simplefilter("default")
             self._setup()
-        self.logger.debug(f"Start iterating ({self._process_name})")
+        self.logger.debug("Start iterating")
         if hasattr(self._input, "server"):
             with self._input.server.run_in_thread():
                 while self._iterate():
@@ -490,6 +457,7 @@ def _drain_input_queues(self) -> None:

     def stop(self) -> None:
         """Stop processing processors in the Pipeline."""
+        self.logger.debug(f"Stopping pipeline ({self._process_name})")
         with self._continue_iterating.get_lock():
             self._continue_iterating.value = False

@@ -503,23 +471,20 @@ def __init__(
         self,
         pipeline_index: int,
         config: dict,
-        log_handler: Handler,
+        log_queue: multiprocessing.Queue,
         lock: Lock,
         shared_dict: dict,
         used_server_ports: dict,
         prometheus_exporter: PrometheusStatsExporter = None,
     ) -> None:
-        if not isinstance(log_handler, MultiprocessingLogHandler):
-            raise MustProvideAnMPLogHandlerError
-
         self._profile = config.get("profile_pipelines", False)

         Pipeline.__init__(
             self,
             pipeline_index=pipeline_index,
             config=config,
             counter=self.processed_counter,
-            log_handler=log_handler,
+            log_queue=log_queue,
             lock=lock,
             shared_dict=shared_dict,
             used_server_ports=used_server_ports,
36 changes: 14 additions & 22 deletions logprep/framework/pipeline_manager.py
@@ -1,16 +1,16 @@
 """This module contains functionality to manage pipelines via multi-processing."""
 # pylint: disable=logging-fstring-interpolation

-from logging import DEBUG, Logger
-from multiprocessing import Lock, Manager
-from queue import Empty
+import logging
+import logging.handlers
+import multiprocessing

 from logprep.framework.pipeline import MultiprocessingPipeline
 from logprep.util.configuration import Configuration
-from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
 from logprep.util.prometheus_exporter import PrometheusStatsExporter


-class PipelineManagerError(BaseException):
+class PipelineManagerError(Exception):
     """Base class for pipeline related exceptions."""


@@ -24,23 +24,25 @@ def __init__(self, what_failed: str):
 class PipelineManager:
     """Manage pipelines via multi-processing."""

-    def __init__(self, logger: Logger):
-        self._logger = logger
+    def __init__(self):
         self.prometheus_exporter = None
-        self._log_handler = MultiprocessingLogHandler(self._logger.level)
+        self._logger = logging.getLogger("Logprep PipelineManager")
+        self.log_queue = multiprocessing.Queue(-1)
+        self._queue_listener = logging.handlers.QueueListener(self.log_queue)
+        self._queue_listener.start()

         self._pipelines = []
         self._configuration = None

-        self._lock = Lock()
+        self._lock = multiprocessing.Lock()
         self._shared_dict = None
         self._used_server_ports = None

     def set_configuration(self, configuration: Configuration):
         """set the verified config"""
         self._configuration = configuration

-        manager = Manager()
+        manager = multiprocessing.Manager()
         self._shared_dict = manager.dict()
         self._used_server_ports = manager.dict()
         for idx in range(configuration.get("process_count", 1)):
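
Note: the manager is now the single consumer: `QueueListener` starts a background thread that blocks on the queue and dispatches each record to its handlers. An end-to-end sketch of the pattern, assuming a `StreamHandler` sink so records become visible (the diff above constructs the listener without explicit sink handlers):

import logging
import logging.handlers
import multiprocessing

def worker(log_queue: multiprocessing.Queue) -> None:
    # Producer side: records are enqueued, never written in the child.
    logger = logging.getLogger("worker")
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.INFO)
    logger.info("logged in the child, emitted in the parent")

if __name__ == "__main__":
    log_queue = multiprocessing.Queue(-1)  # -1 means no size limit, as above
    listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
    listener.start()
    process = multiprocessing.Process(target=worker, args=(log_queue,))
    process.start()
    process.join()
    listener.stop()  # flushes pending records and joins the listener thread
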
@@ -58,8 +60,7 @@ def get_count(self) -> int:
         The pipeline count will be incrementally changed until it reaches this value.

         """
-        if self._logger.isEnabledFor(DEBUG):  # pragma: no cover
-            self._logger.debug(f"Getting pipeline count: {len(self._pipelines)}")
+        self._logger.debug(f"Getting pipeline count: {len(self._pipelines)}")
         return len(self._pipelines)

     def set_count(self, count: int):
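
Note: dropping the `isEnabledFor(DEBUG)` guard is safe for filtering, since `Logger.debug()` performs the same level check internally; the guard only avoided evaluating the f-string eagerly. A small illustration:

import logging

logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)
# The f-string is still built, but Logger.debug() discards the record
# because DEBUG is below the effective level.
logger.debug(f"Getting pipeline count: {len([])}")
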
@@ -101,15 +102,6 @@ def restart_failed_pipeline(self):
         self.set_count(self._configuration.get("process_count"))
         self._logger.warning(f"Restarted {len(failed_pipelines)} failed pipeline(s)")

-    def handle_logs_into_logger(self, logger: Logger, timeout: float):
-        """Handle logs."""
-        try:
-            logger.handle(self._log_handler.get(timeout))
-            while True:
-                logger.handle(self._log_handler.get(0.0))
-        except Empty:
-            pass
-
     def stop(self):
         """Stop processing any pipelines by reducing the pipeline count to zero."""
         self._decrease_to_count(0)
@@ -122,7 +114,7 @@ def _create_pipeline(self, index) -> MultiprocessingPipeline:
         return MultiprocessingPipeline(
             pipeline_index=index,
             config=self._configuration,
-            log_handler=self._log_handler,
+            log_queue=self.log_queue,
             lock=self._lock,
             shared_dict=self._shared_dict,
             used_server_ports=self._used_server_ports,
1 change: 0 additions & 1 deletion logprep/run_logprep.py
@@ -93,7 +93,6 @@ def _run_logprep(arguments, logger: logging.Logger):
     runner = None
     try:
         runner = Runner.get_runner()
-        runner.set_logger(logger)
         runner.load_configuration(arguments.config)
         logger.debug("Configuration loaded")
         runner.start()
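
Note: removing `runner.set_logger(logger)` reflects the same design choice as the rest of this PR: components fetch their own named logger rather than having one injected. `logging.getLogger` returns a process-wide singleton per name, so no explicit hand-off is needed. A small illustration (the logger name is illustrative):

import logging

a = logging.getLogger("Logprep Runner")
b = logging.getLogger("Logprep Runner")
assert a is b  # same object everywhere in the process, no injection required
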