diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 258706baf..5e5ffd751 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -17,11 +17,10 @@ from ctypes import c_bool from functools import cached_property, partial from importlib.metadata import version -from multiprocessing import Lock, Value, current_process -from typing import Any, Generator, List, Optional, Tuple +from multiprocessing import Value, current_process +from typing import Any, Generator, List, Tuple import attrs -import msgspec from logprep.abc.component import Component from logprep.abc.input import ( @@ -69,30 +68,20 @@ class PipelineResult: pipeline: list[Processor] """The pipeline that processed the event""" - @property - def has_processing_errors(self) -> bool: - """Check if any of the results has processing errors.""" - return any(result.errors for result in self) - - @property - def has_processing_warnings(self) -> bool: - """Check if any of the results has processing errors.""" - return any(result.warnings for result in self) - - @property + @cached_property def errors(self) -> List[ProcessingError]: """Return all processing errors.""" - return itertools.chain(*[result.errors for result in self]) + return list(itertools.chain(*[result.errors for result in self])) - @property + @cached_property def warnings(self) -> List[ProcessingWarning]: """Return all processing warnings.""" - return itertools.chain(*[result.warnings for result in self]) + return list(itertools.chain(*[result.warnings for result in self])) - @property + @cached_property def data(self) -> List[Tuple[dict, dict]]: """Return all extra data.""" - return itertools.chain(*[result.data for result in self]) + return list(itertools.chain(*[result.data for result in self])) def __attrs_post_init__(self): self.results = list( @@ -152,9 +141,6 @@ class Metrics(Component.Metrics): _continue_iterating: Value """ a flag to signal if iterating continues """ - _lock: Lock - """ the lock for the pipeline process """ - pipeline_index: int """ the index of this pipeline """ @@ -214,19 +200,13 @@ def _input(self) -> Input: ) return Factory.create(input_connector_config) - def __init__( - self, config: Configuration, pipeline_index: int = None, lock: Lock = None - ) -> None: + def __init__(self, config: Configuration, pipeline_index: int = None) -> None: self.logger = logging.getLogger("Pipeline") self.logger.name = f"Pipeline{pipeline_index}" self._logprep_config = config self._timeout = config.timeout self._continue_iterating = Value(c_bool) - - self._lock = lock self.pipeline_index = pipeline_index - self._encoder = msgspec.msgpack.Encoder() - self._decoder = msgspec.msgpack.Decoder() if self._logprep_config.profile_pipelines: self.run = partial(PipelineProfiler.profile_function, self.run) @@ -263,10 +243,9 @@ def run(self) -> None: # pylint: disable=method-hidden self._continue_iterating.value = True assert self._input, "Pipeline should not be run without input connector" assert self._output, "Pipeline should not be run without output connector" - with self._lock: - with warnings.catch_warnings(): - warnings.simplefilter("default") - self._setup() + with warnings.catch_warnings(): + warnings.simplefilter("default") + self._setup() self.logger.debug("Start iterating") while self._continue_iterating.value: self.process_pipeline() @@ -281,9 +260,9 @@ def process_pipeline(self) -> PipelineResult: if not event: return None, None result: PipelineResult = self.process_event(event) - if result.has_processing_warnings: + if result.warnings: self.logger.warning(",".join((str(warning) for warning in result.warnings))) - if result.has_processing_errors: + if result.errors: self.logger.error(",".join((str(error) for error in result.errors))) self._store_failed_event(result.errors, result.event_received, event) return diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index a0cf6181f..bba5ac540 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -64,7 +64,6 @@ def __init__(self, configuration: Configuration): self._pipelines: list[multiprocessing.Process] = [] self._configuration = configuration - self._lock = multiprocessing.Lock() prometheus_config = self._configuration.metrics if prometheus_config.enabled: self.prometheus_exporter = PrometheusExporter(prometheus_config) @@ -164,7 +163,7 @@ def restart(self): self.prometheus_exporter.run() def _create_pipeline(self, index) -> multiprocessing.Process: - pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock) + pipeline = Pipeline(pipeline_index=index, config=self._configuration) logger.info("Created new pipeline") process = multiprocessing.Process(target=pipeline.run, daemon=True) process.stop = pipeline.stop