From e066f2de97c5d87d6908b2478c14546a493fdbd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com> Date: Thu, 27 Jun 2024 11:31:03 +0200 Subject: [PATCH] remove `elasticsearch_output` connector (#620) * Update changelog * remove elasticsearch dependency * remove elasticsearch output * remove get_conn mock and use responses with ordered registry --- CHANGELOG.md | 2 + README.md | 2 +- doc/source/development/architecture/index.rst | 2 +- .../user_manual/configuration/output.rst | 9 +- logprep/connector/elasticsearch/__init__.py | 0 logprep/connector/elasticsearch/output.py | 493 ------------------ logprep/connector/opensearch/output.py | 414 ++++++++++++++- .../processor/domain_resolver/processor.py | 2 +- logprep/processor/pre_detector/rule.py | 2 +- logprep/processor/pseudonymizer/processor.py | 2 +- logprep/registry.py | 2 - logprep/util/configuration.py | 3 +- logprep/util/defaults.py | 1 - pyproject.toml | 2 - quickstart/docker-compose.yml | 2 +- .../connector/test_elasticsearch_output.py | 368 ------------- .../unit/connector/test_opensearch_output.py | 1 - tests/unit/util/test_getter.py | 46 +- tests/unit/util/test_logging.py | 1 - 19 files changed, 429 insertions(+), 925 deletions(-) delete mode 100644 logprep/connector/elasticsearch/__init__.py delete mode 100644 logprep/connector/elasticsearch/output.py delete mode 100644 tests/unit/connector/test_elasticsearch_output.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bc12ac8f..7979c0b93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ * This release limits the maximum python version to `3.12.3` because of the issue [#612](https://github.com/fkie-cad/Logprep/issues/612). * Remove `normalizer` processor, as it's functionality was replaced by the `grokker`, `timestamper` and `field_manager` processors +* Remove `elasticsearch_output` connector to reduce maintenance effort + ### Features diff --git a/README.md b/README.md index 075c1e382..8e33a94b9 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Logprep allows to collect, process and forward log messages from various data sources. Log messages are being read and written by so-called connectors. -Currently, connectors for Kafka, Opensearch, ElasticSearch, S3, HTTP and JSON(L) files exist. +Currently, connectors for Kafka, Opensearch, S3, HTTP and JSON(L) files exist. The log messages are processed in serial by a pipeline of processors, where each processor modifies an event that is being passed through. diff --git a/doc/source/development/architecture/index.rst b/doc/source/development/architecture/index.rst index 51a622bb9..8159a8510 100644 --- a/doc/source/development/architecture/index.rst +++ b/doc/source/development/architecture/index.rst @@ -73,7 +73,7 @@ The Ruletree diagramm shows how the matching rules for a given event are searche Output ====== -In this diagram, the last part about the backlog is specific for the Elasticsearch/ Opensearch Output. +In this diagram, the last part about the backlog is specific for the Opensearch Output. This was deemed to be important enough to be part of the diagram. .. raw:: html diff --git a/doc/source/user_manual/configuration/output.rst b/doc/source/user_manual/configuration/output.rst index 432140edc..8f6fe65c2 100644 --- a/doc/source/user_manual/configuration/output.rst +++ b/doc/source/user_manual/configuration/output.rst @@ -12,7 +12,7 @@ logprep only guaranties that one output has received data by calling the .. 
security-best-practice:: :title: Output Connectors :location: config.output..type - :suggested-value: + :suggested-value: Similar to the input connectors there is a list of available output connectors of which some are only meant for debugging, namely: :code:`ConsoleOutput` and :code:`JsonlOutput`. @@ -43,13 +43,6 @@ logprep only guaranties that one output has received data by calling the :inherited-members: :noindex: -.. automodule:: logprep.connector.elasticsearch.output -.. autoclass:: logprep.connector.elasticsearch.output.ElasticsearchOutput.Config - :members: - :undoc-members: - :inherited-members: - :noindex: - .. automodule:: logprep.connector.opensearch.output .. autoclass:: logprep.connector.opensearch.output.OpensearchOutput.Config :members: diff --git a/logprep/connector/elasticsearch/__init__.py b/logprep/connector/elasticsearch/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/logprep/connector/elasticsearch/output.py b/logprep/connector/elasticsearch/output.py deleted file mode 100644 index 2355c7130..000000000 --- a/logprep/connector/elasticsearch/output.py +++ /dev/null @@ -1,493 +0,0 @@ -""" -ElasticsearchOutput -=================== - -This section contains the connection settings for Elasticsearch, the default -index, the error index and a buffer size. Documents are sent in batches to Elasticsearch to reduce -the amount of times connections are created. - -The documents desired index is the field :code:`_index` in the document. It is deleted afterwards. -If you want to send documents to datastreams, you have to set the field :code:`_op_type: create` in -the document. - -Example -^^^^^^^ -.. code-block:: yaml - :linenos: - - output: - myelasticsearch_output: - type: elasticsearch_output - hosts: - - 127.0.0.1:9200 - default_index: default_index - error_index: error_index - message_backlog_size: 10000 - timeout: 10000 - max_retries: - user: - secret: - ca_cert: /path/to/cert.crt -""" - -import json -import logging -import re -import ssl -from functools import cached_property -from typing import List, Optional, Pattern, Tuple, Union - -import elasticsearch as search -from attr import define, field -from attrs import validators -from elasticsearch import ElasticsearchException, helpers -from opensearchpy import OpenSearchException -from urllib3.exceptions import TimeoutError # pylint: disable=redefined-builtin - -from logprep.abc.output import FatalOutputError, Output -from logprep.metrics.metrics import Metric -from logprep.util.helper import get_dict_size_in_byte -from logprep.util.time import TimeParser - -logger = logging.getLogger("ElasticsearchOutput") - - -class ElasticsearchOutput(Output): - """An Elasticsearch output connector.""" - - @define(kw_only=True, slots=False) - class Config(Output.Config): - """Elastic/Opensearch Output Config - - .. security-best-practice:: - :title: Output Connectors - ElasticsearchOutput - - It is suggested to enable a secure message transfer by setting :code:`user`, - :code:`secret` and a valid :code:`ca_cert`. - """ - - hosts: List[str] = field( - validator=validators.deep_iterable( - member_validator=validators.instance_of((str, type(None))), - iterable_validator=validators.instance_of(list), - ), - default=[], - ) - """Addresses of elasticsearch/opensearch servers. Can be a list of hosts or one single host - in the format HOST:PORT without specifying a schema. 
The schema is set automatically to - https if a certificate is being used.""" - default_index: str = field(validator=validators.instance_of(str)) - """Default index to write to if no index was set in the document or the document could not - be indexed. The document will be transformed into a string to prevent rejections by the - default index.""" - error_index: str = field(validator=validators.instance_of(str)) - """Index to write documents to that could not be processed.""" - message_backlog_size: int = field(validator=validators.instance_of(int)) - """Amount of documents to store before sending them.""" - maximum_message_size_mb: Optional[Union[float, int]] = field( - validator=validators.optional(validators.instance_of((float, int))), - converter=(lambda x: x * 10**6 if x else None), - default=None, - ) - """(Optional) Maximum estimated size of a document in MB before discarding it if it causes - an error.""" - timeout: int = field(validator=validators.instance_of(int), default=500) - """(Optional) Timeout for the connection (default is 500ms).""" - max_retries: int = field(validator=validators.instance_of(int), default=0) - """(Optional) Maximum number of retries for documents rejected with code 429 (default is 0). - Increases backoff time by 2 seconds per try, but never exceeds 600 seconds. When using - parallel_bulk in the opensearch connector then the backoff time starts with 1 second. With - each consecutive retry 500 to 1000 ms will be added to the delay, chosen randomly """ - user: Optional[str] = field(validator=validators.instance_of(str), default="") - """(Optional) User used for authentication.""" - secret: Optional[str] = field(validator=validators.instance_of(str), default="") - """(Optional) Secret used for authentication.""" - ca_cert: Optional[str] = field(validator=validators.instance_of(str), default="") - """(Optional) Path to a SSL ca certificate to verify the ssl context.""" - flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60) - """(Optional) Timeout after :code:`message_backlog` is flushed if - :code:`message_backlog_size` is not reached.""" - - __slots__ = ["_message_backlog", "_size_error_pattern"] - - _message_backlog: List - - _size_error_pattern: Pattern[str] - - def __init__(self, name: str, configuration: "ElasticsearchOutput.Config"): - super().__init__(name, configuration) - self._message_backlog = [] - self._size_error_pattern = re.compile( - r".*coordinating_operation_bytes=(?P\d+), " - r"max_coordinating_and_primary_bytes=(?P\d+).*" - ) - - @cached_property - def ssl_context(self) -> ssl.SSLContext: - """Returns the ssl context - - Returns - ------- - SSLContext - The ssl context - """ - return ( - ssl.create_default_context(cafile=self._config.ca_cert) - if self._config.ca_cert - else None - ) - - @property - def schema(self) -> str: - """Returns the shema. 
`https` if ssl config is set, else `http` - - Returns - ------- - str - the shema - """ - return "https" if self._config.ca_cert else "http" - - @property - def http_auth(self) -> tuple: - """Returns the credentials - - Returns - ------- - tuple - the credentials in format (user, secret) - """ - return ( - (self._config.user, self._config.secret) - if self._config.user and self._config.secret - else None - ) - - @cached_property - def _search_context(self) -> search.Elasticsearch: - """Returns a elasticsearch context - - Returns - ------- - elasticsearch.Elasticsearch - the eleasticsearch context - """ - return search.Elasticsearch( - self._config.hosts, - scheme=self.schema, - http_auth=self.http_auth, - ssl_context=self.ssl_context, - timeout=self._config.timeout, - ) - - @cached_property - def _replace_pattern(self): - return re.compile(r"%{\S+?}") - - def describe(self) -> str: - """Get name of Elasticsearch endpoint with the host. - - Returns - ------- - elasticsearch_output : ElasticsearchOutput - Acts as output connector for Elasticsearch. - - """ - base_description = super().describe() - return f"{base_description} - ElasticSearch Output: {self._config.hosts}" - - def setup(self): - super().setup() - flush_timeout = self._config.flush_timeout - self._schedule_task(task=self._write_backlog, seconds=flush_timeout) - try: - self._search_context.info() - except (ElasticsearchException, OpenSearchException, TimeoutError) as error: - raise FatalOutputError(self, error) from error - - def store(self, document: dict): - """Store a document in the index. - - Parameters - ---------- - document : dict - Document to store. - - Returns - ------- - Returns True to inform the pipeline to call the batch_finished_callback method in the - configured input - """ - if document.get("_index") is None: - document = self._build_failed_index_document(document, "Missing index in document") - - self._add_dates(document) - self.metrics.number_of_processed_events += 1 - self._message_backlog.append(document) - self._write_to_search_context() - - def store_custom(self, document: dict, target: str): - """Store document into backlog to be written into Elasticsearch with the target index. - - Only add to backlog instead of writing the batch and calling batch_finished_callback, - since store_custom can be called before the event has been fully processed. - Setting the offset or comiting before fully processing an event can lead to data loss if - Logprep terminates. - - Parameters - ---------- - document : dict - Document to be stored into the target index. - target : str - Index to store the document in. - Raises - ------ - CriticalOutputError - Raises if any error except a BufferError occurs while writing into elasticsearch. - - """ - document["_index"] = target - self._add_dates(document) - self.metrics.number_of_processed_events += 1 - self._message_backlog.append(document) - - def store_failed(self, error_message: str, document_received: dict, document_processed: dict): - """Write errors into error topic for documents that failed processing. - - Parameters - ---------- - error_message : str - Error message to write into Kafka document. - document_received : dict - Document as it was before processing. - document_processed : dict - Document after processing until an error occurred. 
- - """ - self.metrics.number_of_failed_events += 1 - error_document = { - "error": error_message, - "original": document_received, - "processed": document_processed, - "@timestamp": TimeParser.now().isoformat(), - "_index": self._config.error_index, - } - self._add_dates(error_document) - self._message_backlog.append(error_document) - self._write_to_search_context() - - def _build_failed_index_document(self, message_document: dict, reason: str): - document = { - "reason": reason, - "@timestamp": TimeParser.now().isoformat(), - "_index": self._config.default_index, - } - try: - document["message"] = json.dumps(message_document) - except TypeError: - document["message"] = str(message_document) - return document - - def _add_dates(self, document): - date_format_matches = self._replace_pattern.findall(document["_index"]) - if date_format_matches: - now = TimeParser.now() - for date_format_match in date_format_matches: - formatted_date = now.strftime(date_format_match[2:-1]) - document["_index"] = re.sub(date_format_match, formatted_date, document["_index"]) - - def _write_to_search_context(self): - """Writes documents from a buffer into Elasticsearch indices. - - Writes documents in a bulk if the document buffer limit has been reached. - This reduces connections to Elasticsearch. - The target index is determined per document by the value of the meta field '_index'. - A configured default index is used if '_index' hasn't been set. - - Returns - ------- - Returns True to inform the pipeline to call the batch_finished_callback method in the - configured input - """ - if len(self._message_backlog) >= self._config.message_backlog_size: - self._write_backlog() - - @Metric.measure_time() - def _write_backlog(self): - if not self._message_backlog: - return - - self._bulk( - self._search_context, - self._message_backlog, - max_retries=self._config.max_retries, - chunk_size=len(self._message_backlog), - ) - if self.input_connector and hasattr(self.input_connector, "batch_finished_callback"): - self.input_connector.batch_finished_callback() - self._message_backlog.clear() - - def _bulk(self, client, actions, *args, **kwargs): - try: - helpers.bulk(client, actions, *args, **kwargs) - except search.SerializationError as error: - self._handle_serialization_error(error) - except search.ConnectionError as error: - self._handle_connection_error(error) - except helpers.BulkIndexError as error: - self._handle_bulk_index_error(error) - except search.exceptions.TransportError as error: - self._handle_transport_error(error) - - def _handle_serialization_error(self, error: search.SerializationError): - """Handle serialization error for elasticsearch bulk indexing. - - If at least one document in a chunk can't be serialized, no events will be sent. - The chunk size is thus set to be the same size as the message backlog size. - Therefore, it won't result in duplicates once the data is resent. - - Parameters - ---------- - error : SerializationError - SerializationError for the error message. - - Raises - ------ - FatalOutputError - This causes a pipeline rebuild and gives an appropriate error log message. - - """ - raise FatalOutputError(self, f"{error.args[1]} in document {error.args[0]}") - - def _handle_connection_error(self, error: search.ConnectionError): - """Handle connection error for elasticsearch bulk indexing. - - No documents will be sent if there is no connection to begin with. - Therefore, it won't result in duplicates once the the data is resent. 
- If the connection is lost during indexing, duplicate documents could be sent. - - Parameters - ---------- - error : ConnectionError - ConnectionError for the error message. - - Raises - ------ - FatalOutputError - This causes a pipeline rebuild and gives an appropriate error log message. - - """ - raise FatalOutputError(self, error.error) - - def _handle_bulk_index_error(self, error: helpers.BulkIndexError): - """Handle bulk indexing error for elasticsearch bulk indexing. - - Documents that could not be sent to elastiscsearch due to index errors are collected and - sent into an error index that should always accept all documents. - This can lead to a rebuild of the pipeline if this causes another exception. - - Parameters - ---------- - error : BulkIndexError - BulkIndexError to collect IndexErrors from. - - """ - error_documents = [] - for bulk_error in error.errors: - _, error_info = bulk_error.popitem() - data = error_info.get("data") if "data" in error_info else None - error_type = error_info.get("error").get("type") - error_reason = error_info.get("error").get("reason") - reason = f"{error_type}: {error_reason}" - error_document = self._build_failed_index_document(data, reason) - self._add_dates(error_document) - error_documents.append(error_document) - self._bulk(self._search_context, error_documents) - - def _handle_transport_error(self, error: search.exceptions.TransportError): - """Handle transport error for elasticsearch bulk indexing. - - Discard messages that exceed the maximum size if they caused an error. - - Parameters - ---------- - error : TransportError - TransportError for the error message. - - """ - if self._config.maximum_message_size_mb is None: - raise FatalOutputError(self, error.error) - - if self._message_exceeds_max_size_error(error): - ( - messages_under_size_limit, - messages_over_size_limit, - ) = self._split_message_backlog_by_size_limit() - - if len(messages_over_size_limit) == 0: - raise FatalOutputError(self, error.error) - - error_documents = self._build_messages_for_large_error_documents( - messages_over_size_limit - ) - self._message_backlog = error_documents + messages_under_size_limit - self._bulk(self._search_context, self._message_backlog) - else: - raise FatalOutputError(self, error.error) - - def _message_exceeds_max_size_error(self, error): - if error.status_code == 429: - if error.error == "circuit_breaking_exception": - return True - - if error.error == "rejected_execution_exception": - reason = error.info.get("error", {}).get("reason", "") - match = self._size_error_pattern.match(reason) - if match and int(match.group("size")) >= int(match.group("max_size")): - return True - return False - - def _split_message_backlog_by_size_limit(self): - messages_under_size_limit = [] - messages_over_size_limit = [] - total_size = 0 - for message in self._message_backlog: - message_size = get_dict_size_in_byte(message) - if message_size < self._config.maximum_message_size_mb: - messages_under_size_limit.append(message) - total_size += message_size - else: - messages_over_size_limit.append((message, message_size)) - return messages_under_size_limit, messages_over_size_limit - - def _build_messages_for_large_error_documents( - self, messages_over_size_limit: List[Tuple[dict, int]] - ) -> List[dict]: - """Build error message for messages that were larger than the allowed size limit. - - Only a snipped of the message is stored in the error document, since the original message - was too large to be written in the first place. 
- - Parameters - ---------- - messages_over_size_limit : List[Tuple[dict, int]] - Messages that were too large with their corresponding sizes in byte. - - """ - error_documents = [] - for message, size in messages_over_size_limit: - error_message = ( - f"Discarded message that is larger than the allowed size limit " - f"({size / 10 ** 6} MB/{self._config.maximum_message_size_mb} MB)" - ) - logger.warning(error_message) - - error_document = { - "processed_snipped": f'{self._encoder.encode(message).decode("utf-8")[:1000]} ...', - "error": error_message, - "@timestamp": TimeParser.now().isoformat(), - "_index": self._config.error_index, - } - self._add_dates(error_document) - error_documents.append(error_document) - return error_documents diff --git a/logprep/connector/opensearch/output.py b/logprep/connector/opensearch/output.py index 7ea404347..effe3e1ea 100644 --- a/logprep/connector/opensearch/output.py +++ b/logprep/connector/opensearch/output.py @@ -2,12 +2,12 @@ OpensearchOutput ================ -This section contains the connection settings for Elasticsearch, the default -index, the error index and a buffer size. Documents are sent in batches to Elasticsearch to reduce +This section contains the connection settings for Opensearch, the default +index, the error index and a buffer size. Documents are sent in batches to Opensearch to reduce the amount of times connections are created. The documents desired index is the field :code:`_index` in the document. It is deleted afterwards. -If you want to send documents to datastreams, you have to set the field :code:`_op_type: create` in +If you want to send documents to data streams, you have to set the field :code:`_op_type: create` in the document. Example @@ -30,18 +30,24 @@ ca_cert: /path/to/cert.crt """ +import json import logging import random +import re +import ssl import time from functools import cached_property +from typing import List, Optional, Tuple, Union import opensearchpy as search from attrs import define, field, validators -from opensearchpy import helpers +from opensearchpy import OpenSearchException, helpers from opensearchpy.serializer import JSONSerializer -from logprep.abc.output import Output, FatalOutputError -from logprep.connector.elasticsearch.output import ElasticsearchOutput +from logprep.abc.output import FatalOutputError, Output +from logprep.metrics.metrics import Metric +from logprep.util.helper import get_dict_size_in_byte +from logprep.util.time import TimeParser logger = logging.getLogger("OpenSearchOutput") @@ -67,13 +73,12 @@ def loads(self, data): return self._decoder.decode(data) -class OpensearchOutput(ElasticsearchOutput): +class OpensearchOutput(Output): """An OpenSearch output connector.""" @define(kw_only=True, slots=False) - class Config(ElasticsearchOutput.Config): - """ - Config for OpensearchOutput. + class Config(Output.Config): + """Opensearch Output Config .. security-best-practice:: :title: Output Connectors - OpensearchOutput @@ -82,6 +87,48 @@ class Config(ElasticsearchOutput.Config): :code:`secret` and a valid :code:`ca_cert`. """ + hosts: List[str] = field( + validator=validators.deep_iterable( + member_validator=validators.instance_of((str, type(None))), + iterable_validator=validators.instance_of(list), + ), + default=[], + ) + """Addresses of opensearch/opensearch servers. Can be a list of hosts or one single host + in the format HOST:PORT without specifying a schema. 
The schema is set automatically to + https if a certificate is being used.""" + default_index: str = field(validator=validators.instance_of(str)) + """Default index to write to if no index was set in the document or the document could not + be indexed. The document will be transformed into a string to prevent rejections by the + default index.""" + error_index: str = field(validator=validators.instance_of(str)) + """Index to write documents to that could not be processed.""" + message_backlog_size: int = field(validator=validators.instance_of(int)) + """Amount of documents to store before sending them.""" + maximum_message_size_mb: Optional[Union[float, int]] = field( + validator=validators.optional(validators.instance_of((float, int))), + converter=(lambda x: x * 10**6 if x else None), + default=None, + ) + """(Optional) Maximum estimated size of a document in MB before discarding it if it causes + an error.""" + timeout: int = field(validator=validators.instance_of(int), default=500) + """(Optional) Timeout for the connection (default is 500ms).""" + max_retries: int = field(validator=validators.instance_of(int), default=0) + """(Optional) Maximum number of retries for documents rejected with code 429 (default is 0). + Increases backoff time by 2 seconds per try, but never exceeds 600 seconds. When using + parallel_bulk in the opensearch connector then the backoff time starts with 1 second. With + each consecutive retry 500 to 1000 ms will be added to the delay, chosen randomly """ + user: Optional[str] = field(validator=validators.instance_of(str), default="") + """(Optional) User used for authentication.""" + secret: Optional[str] = field(validator=validators.instance_of(str), default="") + """(Optional) Secret used for authentication.""" + ca_cert: Optional[str] = field(validator=validators.instance_of(str), default="") + """(Optional) Path to a SSL ca certificate to verify the ssl context.""" + flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60) + """(Optional) Timeout after :code:`message_backlog` is flushed if + :code:`message_backlog_size` is not reached.""" + parallel_bulk: bool = field(default=True, validator=validators.instance_of(bool)) """Configure if all events in the backlog should be send, in parallel, via multiple threads to Opensearch. (Default: :code:`True`)""" @@ -98,6 +145,53 @@ class Config(ElasticsearchOutput.Config): ) """Chunk size to use for bulk requests.""" + __slots__ = ["_message_backlog", "_size_error_pattern"] + + _message_backlog: List + + _size_error_pattern: re.Pattern[str] + + @cached_property + def ssl_context(self) -> ssl.SSLContext: + """Returns the ssl context + + Returns + ------- + SSLContext + The ssl context + """ + return ( + ssl.create_default_context(cafile=self._config.ca_cert) + if self._config.ca_cert + else None + ) + + @property + def schema(self) -> str: + """Returns the schema. 
`https` if ssl config is set, else `http` + + Returns + ------- + str + the schema + """ + return "https" if self._config.ca_cert else "http" + + @property + def http_auth(self) -> tuple: + """Returns the credentials + + Returns + ------- + tuple + the credentials in format (user, secret) + """ + return ( + (self._config.user, self._config.secret) + if self._config.user and self._config.secret + else None + ) + @cached_property def _search_context(self): return search.OpenSearch( @@ -109,18 +203,161 @@ def _search_context(self): serializer=MSGPECSerializer(self), ) + @cached_property + def _replace_pattern(self): + return re.compile(r"%{\S+?}") + + def __init__(self, name: str, configuration: "OpensearchOutput.Config"): + super().__init__(name, configuration) + self._message_backlog = [] + self._size_error_pattern = re.compile( + r".*coordinating_operation_bytes=(?P\d+), " + r"max_coordinating_and_primary_bytes=(?P\d+).*" + ) + + def setup(self): + super().setup() + flush_timeout = self._config.flush_timeout + self._schedule_task(task=self._write_backlog, seconds=flush_timeout) + try: + self._search_context.info() + except (OpenSearchException, TimeoutError) as error: + raise FatalOutputError(self, error) from error + def describe(self) -> str: - """Get name of Elasticsearch endpoint with the host. + """Get name of Opensearch endpoint with the host. Returns ------- opensearch_output : OpensearchOutput - Acts as output connector for Elasticsearch. + Acts as output connector for Opensearch. """ base_description = Output.describe(self) return f"{base_description} - Opensearch Output: {self._config.hosts}" + def store(self, document: dict): + """Store a document in the index. + + Parameters + ---------- + document : dict + Document to store. + + Returns + ------- + Returns True to inform the pipeline to call the batch_finished_callback method in the + configured input + """ + if document.get("_index") is None: + document = self._build_failed_index_document(document, "Missing index in document") + + self._add_dates(document) + self.metrics.number_of_processed_events += 1 + self._message_backlog.append(document) + self._write_to_search_context() + + def store_custom(self, document: dict, target: str): + """Store document into backlog to be written into Opensearch with the target index. + + Only add to backlog instead of writing the batch and calling batch_finished_callback, + since store_custom can be called before the event has been fully processed. + Setting the offset or committing before fully processing an event can lead to data loss if + Logprep terminates. + + Parameters + ---------- + document : dict + Document to be stored into the target index. + target : str + Index to store the document in. + Raises + ------ + CriticalOutputError + Raises if any error except a BufferError occurs while writing into opensearch. + + """ + document["_index"] = target + self._add_dates(document) + self.metrics.number_of_processed_events += 1 + self._message_backlog.append(document) + + def store_failed(self, error_message: str, document_received: dict, document_processed: dict): + """Write errors into error topic for documents that failed processing. + + Parameters + ---------- + error_message : str + Error message to write into Kafka document. + document_received : dict + Document as it was before processing. + document_processed : dict + Document after processing until an error occurred. 
+ + """ + self.metrics.number_of_failed_events += 1 + error_document = { + "error": error_message, + "original": document_received, + "processed": document_processed, + "@timestamp": TimeParser.now().isoformat(), + "_index": self._config.error_index, + } + self._add_dates(error_document) + self._message_backlog.append(error_document) + self._write_to_search_context() + + def _build_failed_index_document(self, message_document: dict, reason: str): + document = { + "reason": reason, + "@timestamp": TimeParser.now().isoformat(), + "_index": self._config.default_index, + } + try: + document["message"] = json.dumps(message_document) + except TypeError: + document["message"] = str(message_document) + return document + + def _add_dates(self, document): + date_format_matches = self._replace_pattern.findall(document["_index"]) + if date_format_matches: + now = TimeParser.now() + for date_format_match in date_format_matches: + formatted_date = now.strftime(date_format_match[2:-1]) + document["_index"] = re.sub(date_format_match, formatted_date, document["_index"]) + + def _write_to_search_context(self): + """Writes documents from a buffer into Opensearch indices. + + Writes documents in a bulk if the document buffer limit has been reached. + This reduces connections to Opensearch. + The target index is determined per document by the value of the meta field '_index'. + A configured default index is used if '_index' hasn't been set. + + Returns + ------- + Returns True to inform the pipeline to call the batch_finished_callback method in the + configured input + """ + if len(self._message_backlog) >= self._config.message_backlog_size: + self._write_backlog() + + @Metric.measure_time() + def _write_backlog(self): + if not self._message_backlog: + return + + self._bulk( + self._search_context, + self._message_backlog, + max_retries=self._config.max_retries, + chunk_size=len(self._message_backlog), + ) + if self.input_connector and hasattr(self.input_connector, "batch_finished_callback"): + self.input_connector.batch_finished_callback() + self._message_backlog.clear() + def _bulk(self, client, actions, *args, **kwargs): try: if self._config.parallel_bulk: @@ -164,3 +401,156 @@ def _parallel_bulk(self, client, actions, *args, **kwargs): raise FatalOutputError( self, "Opensearch too many requests, all parallel bulk retries failed" ) + + def _handle_serialization_error(self, error: search.SerializationError): + """Handle serialization error for opensearch bulk indexing. + + If at least one document in a chunk can't be serialized, no events will be sent. + The chunk size is thus set to be the same size as the message backlog size. + Therefore, it won't result in duplicates once the data is resent. + + Parameters + ---------- + error : SerializationError + SerializationError for the error message. + + Raises + ------ + FatalOutputError + This causes a pipeline rebuild and gives an appropriate error log message. + + """ + raise FatalOutputError(self, f"{error.args[1]} in document {error.args[0]}") + + def _handle_connection_error(self, error: search.ConnectionError): + """Handle connection error for opensearch bulk indexing. + + No documents will be sent if there is no connection to begin with. + Therefore, it won't result in duplicates once the the data is resent. + If the connection is lost during indexing, duplicate documents could be sent. + + Parameters + ---------- + error : ConnectionError + ConnectionError for the error message. 
+ + Raises + ------ + FatalOutputError + This causes a pipeline rebuild and gives an appropriate error log message. + + """ + raise FatalOutputError(self, error.error) + + def _handle_bulk_index_error(self, error: helpers.BulkIndexError): + """Handle bulk indexing error for opensearch bulk indexing. + + Documents that could not be sent to opensearch due to index errors are collected and + sent into an error index that should always accept all documents. + This can lead to a rebuild of the pipeline if this causes another exception. + + Parameters + ---------- + error : BulkIndexError + BulkIndexError to collect IndexErrors from. + + """ + error_documents = [] + for bulk_error in error.errors: + _, error_info = bulk_error.popitem() + data = error_info.get("data") if "data" in error_info else None + error_type = error_info.get("error").get("type") + error_reason = error_info.get("error").get("reason") + reason = f"{error_type}: {error_reason}" + error_document = self._build_failed_index_document(data, reason) + self._add_dates(error_document) + error_documents.append(error_document) + self._bulk(self._search_context, error_documents) + + def _handle_transport_error(self, error: search.exceptions.TransportError): + """Handle transport error for opensearch bulk indexing. + + Discard messages that exceed the maximum size if they caused an error. + + Parameters + ---------- + error : TransportError + TransportError for the error message. + + """ + if self._config.maximum_message_size_mb is None: + raise FatalOutputError(self, error.error) + + if self._message_exceeds_max_size_error(error): + ( + messages_under_size_limit, + messages_over_size_limit, + ) = self._split_message_backlog_by_size_limit() + + if len(messages_over_size_limit) == 0: + raise FatalOutputError(self, error.error) + + error_documents = self._build_messages_for_large_error_documents( + messages_over_size_limit + ) + self._message_backlog = error_documents + messages_under_size_limit + self._bulk(self._search_context, self._message_backlog) + else: + raise FatalOutputError(self, error.error) + + def _message_exceeds_max_size_error(self, error): + if error.status_code == 429: + if error.error == "circuit_breaking_exception": + return True + + if error.error == "rejected_execution_exception": + reason = error.info.get("error", {}).get("reason", "") + match = self._size_error_pattern.match(reason) + if match and int(match.group("size")) >= int(match.group("max_size")): + return True + return False + + def _build_messages_for_large_error_documents( + self, messages_over_size_limit: List[Tuple[dict, int]] + ) -> List[dict]: + """Build error message for messages that were larger than the allowed size limit. + + Only a snipped of the message is stored in the error document, since the original message + was too large to be written in the first place. + + Parameters + ---------- + messages_over_size_limit : List[Tuple[dict, int]] + Messages that were too large with their corresponding sizes in byte. 
+ + """ + error_documents = [] + for message, size in messages_over_size_limit: + error_message = ( + f"Discarded message that is larger than the allowed size limit " + f"({size / 10 ** 6} MB/{self._config.maximum_message_size_mb} MB)" + ) + logger.warning(error_message) + + error_document = { + "processed_snipped": f'{self._encoder.encode(message).decode("utf-8")[:1000]} ...', + "error": error_message, + "@timestamp": TimeParser.now().isoformat(), + "_index": self._config.error_index, + } + self._add_dates(error_document) + error_documents.append(error_document) + return error_documents + + def _split_message_backlog_by_size_limit(self): + messages_under_size_limit = [] + messages_over_size_limit = [] + total_size = 0 + for message in self._message_backlog: + message_size = get_dict_size_in_byte(message) + if message_size < self._config.maximum_message_size_mb: + messages_under_size_limit.append(message) + total_size += message_size + else: + messages_over_size_limit.append((message, message_size)) + return messages_under_size_limit, messages_over_size_limit diff --git a/logprep/processor/domain_resolver/processor.py b/logprep/processor/domain_resolver/processor.py index 137b31be3..243a11e29 100644 --- a/logprep/processor/domain_resolver/processor.py +++ b/logprep/processor/domain_resolver/processor.py @@ -84,7 +84,7 @@ class Config(Processor.Config): max_caching_days: int = field(validator=validators.instance_of(int)) """Number of days a domains is cached after the last time it appeared. This caching reduces the CPU load of Logprep (no demanding encryption must be performed - repeatedly) and the load on subsequent components (i.e. Logstash or Elasticsearch). + repeatedly) and the load on subsequent components (i.e. Logstash or Opensearch). Setting the caching days to Null deactivates the caching. In case the cache size has been exceeded (see `domain_resolver.max_cached_domains`),the oldest cached pseudonyms will be discarded first.Thus, it is possible that a domain is re-added to the cache before diff --git a/logprep/processor/pre_detector/rule.py b/logprep/processor/pre_detector/rule.py index 6072df6ff..78c96ffd4 100644 --- a/logprep/processor/pre_detector/rule.py +++ b/logprep/processor/pre_detector/rule.py @@ -64,7 +64,7 @@ } This generated extra output contains a corresponding :code:`rule_filter` in lucene notation, which -can be used to further investigate this rule in an existing OpenSearch or ElasticSearch. +can be used to further investigate this rule in an existing OpenSearch. Additionally, the optional field :code:`ip_fields` can be specified. It allows to specify a list of fields that can be compared to a list of IPs, diff --git a/logprep/processor/pseudonymizer/processor.py b/logprep/processor/pseudonymizer/processor.py index e084af194..d51316425 100644 --- a/logprep/processor/pseudonymizer/processor.py +++ b/logprep/processor/pseudonymizer/processor.py @@ -128,7 +128,7 @@ class Config(FieldManager.Config): million elements would require about 2.3 GB RAM. The cache is not persisted. Restarting Logprep does therefore clear the cache. This caching reduces the CPU load of Logprep (no demanding encryption must be performed - repeatedly) and the load on subsequent components (i.e. Logstash or Elasticsearch). + repeatedly) and the load on subsequent components (i.e. Logstash or Opensearch). In case the cache size has been exceeded, the least recently used entry is deleted. Has to be greater than 0. 
""" diff --git a/logprep/registry.py b/logprep/registry.py index c6090d7fd..c9329e52b 100644 --- a/logprep/registry.py +++ b/logprep/registry.py @@ -8,7 +8,6 @@ from logprep.connector.console.output import ConsoleOutput from logprep.connector.dummy.input import DummyInput from logprep.connector.dummy.output import DummyOutput -from logprep.connector.elasticsearch.output import ElasticsearchOutput from logprep.connector.file.input import FileInput from logprep.connector.http.input import HttpInput from logprep.connector.http.output import HttpOutput @@ -89,7 +88,6 @@ class Registry: "confluentkafka_input": ConfluentKafkaInput, "confluentkafka_output": ConfluentKafkaOutput, "console_output": ConsoleOutput, - "elasticsearch_output": ElasticsearchOutput, "jsonl_output": JsonlOutput, "opensearch_output": OpensearchOutput, "http_input": HttpInput, diff --git a/logprep/util/configuration.py b/logprep/util/configuration.py index d5c8423bf..f6f8f09c8 100644 --- a/logprep/util/configuration.py +++ b/logprep/util/configuration.py @@ -145,7 +145,7 @@ recommended to use these especially for sensitive information like usernames, password, secrets or hash salts. Examples where this could be useful would be the :code:`key` for the hmac calculation (see - `input` > `preprocessing`) or the :code:`user`/:code:`secret` for the elastic-/opensearch + `input` > `preprocessing`) or the :code:`user`/:code:`secret` for the opensearch connectors. The following config file will be valid by setting the given environment variables: @@ -380,7 +380,6 @@ class LoggerConfig: "root", "INFO" "filelock", "ERROR" "urllib3.connectionpool", "ERROR" - "elasticsearch", "ERROR" "opensearch", "ERROR" "uvicorn", "INFO" "uvicorn.access", "INFO" diff --git a/logprep/util/defaults.py b/logprep/util/defaults.py index fffdd6afe..33dbb5b29 100644 --- a/logprep/util/defaults.py +++ b/logprep/util/defaults.py @@ -50,7 +50,6 @@ class EXITCODES(Enum): "console": {"handlers": ["console"]}, "filelock": {"level": "ERROR"}, "urllib3.connectionpool": {"level": "ERROR"}, - "elasticsearch": {"level": "ERROR"}, "opensearch": {"level": "ERROR"}, }, "filters": {}, diff --git a/pyproject.toml b/pyproject.toml index 5ccb5031d..87fa227ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,6 @@ classifiers = [ "Topic :: System :: Logging", ] keywords = [ - "elasticsearch", "kafka", "etl", "sre", @@ -51,7 +50,6 @@ dependencies = [ "ciso8601", # fastest iso8601 datetime parser. 
can be removed after dropping support for python < 3.11 "colorama", "confluent-kafka>2", - "elasticsearch>=7,<8", "geoip2", "hyperscan>=0.7.0", "jsonref", diff --git a/quickstart/docker-compose.yml b/quickstart/docker-compose.yml index 886dfc64d..63f893e0d 100644 --- a/quickstart/docker-compose.yml +++ b/quickstart/docker-compose.yml @@ -1,6 +1,6 @@ version: "3.9" -# The following command must be executed after each restart on linux or elasticsearch exits with an error +# The following command must be executed after each restart on linux or opensearch exits with an error # sudo sysctl -w vm.max_map_count=262144 services: diff --git a/tests/unit/connector/test_elasticsearch_output.py b/tests/unit/connector/test_elasticsearch_output.py deleted file mode 100644 index ef8c7fdfa..000000000 --- a/tests/unit/connector/test_elasticsearch_output.py +++ /dev/null @@ -1,368 +0,0 @@ -# pylint: disable=missing-docstring -# pylint: disable=protected-access -# pylint: disable=wrong-import-position -# pylint: disable=wrong-import-order -# pylint: disable=attribute-defined-outside-init -# pylint: disable=too-many-arguments -import json -import re -from datetime import datetime -from math import isclose -from unittest import mock - -import elasticsearch as search -import pytest -from elasticsearch import ElasticsearchException as SearchException -from elasticsearch import helpers - -from logprep.abc.component import Component -from logprep.abc.output import FatalOutputError -from logprep.util.time import TimeParser -from tests.unit.connector.base import BaseOutputTestCase - - -class NotJsonSerializableMock: - pass - - -helpers.bulk = mock.MagicMock() - - -class TestElasticsearchOutput(BaseOutputTestCase): - CONFIG = { - "type": "elasticsearch_output", - "hosts": ["host:123"], - "default_index": "default_index", - "error_index": "error_index", - "message_backlog_size": 1, - "timeout": 5000, - } - - def test_store_sends_to_default_index(self): - self.object._config.message_backlog_size = 2 - event = {"field": "content"} - expected = { - "_index": "default_index", - "message": '{"field": "content"}', - "reason": "Missing index in document", - } - self.object.store(event) - - assert self.object._message_backlog[0].pop("@timestamp") - assert self.object._message_backlog[0] == expected - - def test_store_sends_event_to_expected_index_with_date_pattern_if_index_missing_in_event(self): - default_index = "default_index-%{%y-%m-%d}" - event = {"field": "content"} - - formatted_date = TimeParser.now().strftime("%y-%m-%d") - expected_index = re.sub(r"%{%y-%m-%d}", formatted_date, default_index) - expected = { - "_index": expected_index, - "message": '{"field": "content"}', - "reason": "Missing index in document", - } - self.object._config.default_index = default_index - self.object._config.message_backlog_size = 2 - self.object.store(event) - - assert self.object._message_backlog[0].pop("@timestamp") - assert self.object._message_backlog[0] == expected - - def test_store_custom_sends_event_to_expected_index(self): - custom_index = "custom_index" - event = {"field": "content"} - expected = {"field": "content", "_index": custom_index} - self.object._config.message_backlog_size = 2 - self.object.store_custom(event, custom_index) - assert self.object._message_backlog[0] == expected - - def test_store_failed(self): - error_index = "error_index" - event_received = {"field": "received"} - event = {"field": "content"} - error_message = "error message" - expected = { - "error": error_message, - "original": event_received, - 
"processed": event, - "_index": error_index, - "@timestamp": str(datetime.now()), - } - - self.object._config.message_backlog_size = 2 - self.object.store_failed(error_message, event_received, event) - - error_document = self.object._message_backlog[0] - # timestamp is compared to be approximately the same, - # since it is variable and then removed to compare the rest - error_time = datetime.timestamp(TimeParser.from_string(error_document["@timestamp"])) - expected_time = datetime.timestamp(TimeParser.from_string(error_document["@timestamp"])) - assert isclose(error_time, expected_time) - del error_document["@timestamp"] - del expected["@timestamp"] - - assert error_document == expected - - def test_create_es_output_settings_contains_expected_values(self): - expected = { - "reason": "A reason for failed indexing", - "_index": "default_index", - } - failed_document = self.object._build_failed_index_document( - {"invalid_json": NotJsonSerializableMock(), "something_valid": "im_valid!"}, - "A reason for failed indexing", - ) - assert "NotJsonSerializableMock" in failed_document.pop("message") - assert failed_document.pop("@timestamp") - assert failed_document == expected - - def test_build_failed_index_document(self): - expected = { - "reason": "A reason for failed indexing", - "message": '{"foo": "bar"}', - "_index": "default_index", - } - failed_document = self.object._build_failed_index_document( - {"foo": "bar"}, "A reason for failed indexing" - ) - assert failed_document.pop("@timestamp") - assert failed_document == expected - - @mock.patch( - "elasticsearch.helpers.bulk", - side_effect=search.SerializationError, - ) - def test_write_to_search_context_calls_handle_serialization_error_if_serialization_error( - self, _ - ): - self.object._config.message_backlog_size = 1 - self.object._handle_serialization_error = mock.MagicMock() - self.object._message_backlog.append({"dummy": "event"}) - self.object._write_to_search_context() - self.object._handle_serialization_error.assert_called() - - @mock.patch( - "elasticsearch.helpers.bulk", - side_effect=search.ConnectionError, - ) - def test_write_to_search_context_calls_handle_connection_error_if_connection_error(self, _): - self.object._config.message_backlog_size = 1 - self.object._handle_connection_error = mock.MagicMock() - self.object._message_backlog.append({"dummy": "event"}) - self.object._write_to_search_context() - self.object._handle_connection_error.assert_called() - - @mock.patch( - "elasticsearch.helpers.bulk", - side_effect=helpers.BulkIndexError, - ) - def test_write_to_search_context_calls_handle_bulk_index_error_if_bulk_index_error(self, _): - self.object._config.message_backlog_size = 1 - self.object._handle_bulk_index_error = mock.MagicMock() - self.object._message_backlog.append({"dummy": "event"}) - self.object._write_to_search_context() - self.object._handle_bulk_index_error.assert_called() - - @mock.patch("elasticsearch.helpers.bulk") - def test_handle_bulk_index_error_calls_bulk(self, fake_bulk): - mock_bulk_index_error = mock.MagicMock() - mock_bulk_index_error.errors = [ - { - "index": { - "data": {"my": "document"}, - "error": {"type": "myerrortype", "reason": "myreason"}, - } - } - ] - self.object._handle_bulk_index_error(mock_bulk_index_error) - fake_bulk.assert_called() - - @mock.patch("elasticsearch.helpers.bulk") - def test_handle_bulk_index_error_calls_bulk_with_error_documents(self, fake_bulk): - mock_bulk_index_error = mock.MagicMock() - mock_bulk_index_error.errors = [ - { - "index": { - "data": {"my": 
"document"}, - "error": {"type": "myerrortype", "reason": "myreason"}, - } - } - ] - self.object._handle_bulk_index_error(mock_bulk_index_error) - call_args = fake_bulk.call_args[0][1] - error_document = call_args[0] - assert "reason" in error_document - assert "@timestamp" in error_document - assert "_index" in error_document - assert "message" in error_document - assert error_document.get("reason") == "myerrortype: myreason" - assert error_document.get("message") == json.dumps({"my": "document"}) - - @pytest.mark.parametrize( - "status_code, error, error_info, messages, discarded_cnt, exception", - [ - ( - 429, - "circuit_breaking_exception", - {"anything": "anything"}, - [{"foo": "*" * 500}], - 1, - None, - ), - ( - 429, - "circuit_breaking_exception", - {"anything": "anything"}, - [{"foo": "bar"}, {"bar": "baz"}], - 0, - FatalOutputError, - ), - ( - 429, - "circuit_breaking_exception", - {"anything": "anything"}, - [{"foo": "*" * 500}, {"bar": "baz"}], - 1, - None, - ), - ( - 429, - "circuit_breaking_exception", - {"anything": "anything"}, - [{"foo": "*" * 500}, {"bar": "*" * 500}], - 2, - None, - ), - ( - 123, - "circuit_breaking_exception", - {"anything": "anything"}, - [{"foo": "*" * 500}], - 1, - FatalOutputError, - ), - ( - 429, - "wrong_exception", - {"anything": "anything"}, - [{"foo": "*" * 500}], - 1, - FatalOutputError, - ), - ( - 429, - "rejected_execution_exception", - {"invalid": "error"}, - [{"foo": "*" * 500}], - 1, - FatalOutputError, - ), - ( - 429, - "rejected_execution_exception", - {"error": {"reason": "wrong_reason"}}, - [{"foo": "*" * 500}], - 1, - FatalOutputError, - ), - ( - 429, - "rejected_execution_exception", - { - "error": { - "reason": "... " - "coordinating_operation_bytes=9, max_coordinating_and_primary_bytes=1 " - "..." - } - }, - [{"foo": "*" * 500}], - 1, - None, - ), - ( - 429, - "rejected_execution_exception", - { - "error": { - "reason": "... " - "coordinating_operation_bytes=1, max_coordinating_and_primary_bytes=9 " - "..." 
- } - }, - [{"foo": "*" * 500}], - 1, - FatalOutputError, - ), - ], - ) - def test_handle_transport_error_calls_bulk_with_error_documents( - self, status_code, error, error_info, messages, discarded_cnt, exception - ): - self.object._config.maximum_message_size_mb = 5 * 10**-4 - - mock_transport_error = search.exceptions.TransportError(status_code, error, error_info) - - if exception: - with pytest.raises(exception): - self.object._handle_transport_error(mock_transport_error) - else: - self.object._message_backlog = messages - self.object._handle_transport_error(mock_transport_error) - above_limit = [] - under_limit = [] - for message in self.object._message_backlog: - if message.get("_index") == "error_index": - assert message.get("error", "").startswith( - "Discarded message that is larger than the allowed size limit" - ) - assert len(message.get("processed_snipped", "")) <= 1000 - assert message.get("processed_snipped", "").endswith(" ...") - above_limit.append(message) - else: - under_limit.append(message) - assert len(above_limit) == discarded_cnt - assert len(above_limit) + len(under_limit) == len(self.object._message_backlog) - - def test_handle_connection_error_raises_fatal_output_error(self): - with pytest.raises(FatalOutputError): - self.object._handle_connection_error(mock.MagicMock()) - - def test_handle_serialization_error_raises_fatal_output_error(self): - with pytest.raises(FatalOutputError): - self.object._handle_serialization_error(mock.MagicMock()) - - def test_setup_raises_fatal_output_error_if_elastic_error_is_raised(self): - self.object._search_context.info = mock.MagicMock() - self.object._search_context.info.side_effect = SearchException - with pytest.raises(FatalOutputError): - self.object.setup() - - def test_setup_registers_flush_timout_tasks(self): - job_count = len(Component._scheduler.jobs) - with pytest.raises(FatalOutputError): - self.object.setup() - assert len(Component._scheduler.jobs) == job_count + 1 - - def test_message_backlog_is_not_written_if_message_backlog_size_not_reached(self): - self.object._config.message_backlog_size = 2 - assert len(self.object._message_backlog) == 0 - with mock.patch( - "logprep.connector.elasticsearch.output.ElasticsearchOutput._write_backlog" - ) as mock_write_backlog: - self.object.store({"test": "event"}) - mock_write_backlog.assert_not_called() - - def test_message_backlog_is_cleared_after_it_was_written(self): - self.object._config.message_backlog_size = 1 - self.object.store({"event": "test_event"}) - assert len(self.object._message_backlog) == 0 - - @mock.patch( - "logprep.connector.elasticsearch.output.ElasticsearchOutput._search_context", - new=mock.MagicMock(), - ) - @mock.patch("inspect.getmembers", return_value=[("mock_prop", lambda: None)]) - def test_setup_populates_cached_properties(self, mock_getmembers): - self.object.setup() - mock_getmembers.assert_called_with(self.object) diff --git a/tests/unit/connector/test_opensearch_output.py b/tests/unit/connector/test_opensearch_output.py index 1621f2721..3e086a426 100644 --- a/tests/unit/connector/test_opensearch_output.py +++ b/tests/unit/connector/test_opensearch_output.py @@ -353,7 +353,6 @@ def test_setup_raises_fatal_output_error_if_opensearch_error_is_raised(self): self.object.setup() def test_setup_registers_flush_timout_tasks(self): - self.object._config.hosts = ["opensearch:9092"] job_count = len(Component._scheduler.jobs) with pytest.raises(FatalOutputError): self.object.setup() diff --git a/tests/unit/util/test_getter.py 
b/tests/unit/util/test_getter.py index 6badf6ac5..c64d51d39 100644 --- a/tests/unit/util/test_getter.py +++ b/tests/unit/util/test_getter.py @@ -14,6 +14,7 @@ import requests.exceptions import responses from responses import matchers +from responses.registries import OrderedRegistry from ruamel.yaml import YAML from logprep.util.credentials import Credentials, CredentialsEnvNotFoundError @@ -357,40 +358,27 @@ def test_raises_on_try_to_set_credentials_from_url_string(self): "https://oauth:ajhsdfpoweiurjdfs239487@the.target.url/targetfile" ) - @mock.patch("urllib3.connectionpool.HTTPConnectionPool._get_conn") - def test_raises_requestexception_after_3_retries(self, getconn_mock): - getconn_mock.return_value.getresponse.side_effect = [ - mock.MagicMock(status=500), # one initial request and three retries - mock.MagicMock(status=502), - mock.MagicMock(status=500), - mock.MagicMock(status=500), - mock.MagicMock(status=500), # fourth is not considered because of raise - ] + # pylint: disable=unexpected-keyword-arg,no-value-for-parameter + @responses.activate(registry=OrderedRegistry) + def test_raises_requestexception_after_3_retries(self): + responses.get("https://does-not-matter/bar", status=500) + responses.get("https://does-not-matter/bar", status=500) # 1st retry + responses.get("https://does-not-matter/bar", status=502) # 2nd retry + responses.get("https://does-not-matter/bar", status=500) # 3rd retry and exception + responses.get("https://does-not-matter/bar", status=200) # works again http_getter = GetterFactory.from_string("https://does-not-matter/bar") with pytest.raises(requests.exceptions.RequestException, match="Max retries exceed"): http_getter.get() - assert getconn_mock.return_value.request.mock_calls == [ - # one initial request and three retries - mock.call("GET", "/bar", body=None, headers=mock.ANY), - mock.call("GET", "/bar", body=None, headers=mock.ANY), - mock.call("GET", "/bar", body=None, headers=mock.ANY), - mock.call("GET", "/bar", body=None, headers=mock.ANY), - ] - - @mock.patch("urllib3.connectionpool.HTTPConnectionPool._get_conn") - def test_get_does_one_successful_request_after_two_failed(self, getconn_mock): - getconn_mock.return_value.getresponse.side_effect = [ - mock.MagicMock(status=500), - mock.MagicMock(status=502), - mock.MagicMock(status=200), - ] + http_getter.get() + + @responses.activate(registry=OrderedRegistry) + def test_get_does_one_successful_request_after_two_failed(self): + responses.get("https://does-not-matter/bar", status=500) + responses.get("https://does-not-matter/bar", status=500) # 1st retry + responses.get("https://does-not-matter/bar", status=502) # 2nd retry + responses.get("https://does-not-matter/bar", status=200) # works again http_getter = GetterFactory.from_string("https://does-not-matter/bar") http_getter.get() - assert getconn_mock.return_value.request.mock_calls == [ - mock.call("GET", "/bar", body=None, headers=mock.ANY), - mock.call("GET", "/bar", body=None, headers=mock.ANY), - mock.call("GET", "/bar", body=None, headers=mock.ANY), - ] def test_credentials_returns_credential_object_if_no_credentials(self): http_getter = GetterFactory.from_string("https://does-not-matter/bar") diff --git a/tests/unit/util/test_logging.py b/tests/unit/util/test_logging.py index fc229052b..007ac5352 100644 --- a/tests/unit/util/test_logging.py +++ b/tests/unit/util/test_logging.py @@ -42,7 +42,6 @@ def test_queuhandler_uses_multiprocessing_queue(self): [ ("filelock", "ERROR"), ("urllib3.connectionpool", "ERROR"), - ("elasticsearch", "ERROR"), 
("opensearch", "ERROR"), ], )
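
For operators whose pipeline configuration still references the removed connector, the switch is essentially a rename: `OpensearchOutput.Config` keeps the same fields (`hosts`, `default_index`, `error_index`, `message_backlog_size`, `timeout`, `max_retries`, `user`, `secret`, `ca_cert`, `flush_timeout`) and adds Opensearch-specific bulk tuning such as `parallel_bulk`. The sketch below mirrors the example block of the removed module docstring with only the `type` changed; the connector name `myopensearch_output`, the host, the index names and the certificate path are placeholders, not values taken from this patch.

.. code-block:: yaml
   :linenos:

   output:
     myopensearch_output:
       type: opensearch_output        # was: elasticsearch_output
       hosts:
         - 127.0.0.1:9200
       default_index: default_index   # date patterns such as default_index-%{%y-%m-%d} keep working
       error_index: error_index
       message_backlog_size: 10000
       timeout: 10000
       max_retries:
       user:
       secret:
       ca_cert: /path/to/cert.crt
       # parallel_bulk: true          # Opensearch-only option, defaults to true

Events themselves need no changes: the `_index` field and `%{...}` date patterns are resolved by the same `_add_dates` logic that this patch moves from the Elasticsearch connector into `OpensearchOutput`.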
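
The new module docstring keeps the note that documents destined for data streams must carry `_op_type: create`. As a concrete illustration (the data stream name and payload field are invented for this example), an event handed to `store()` could look like the sketch below; `_index` and `_op_type` act as routing metadata for the bulk request, while the remaining fields are indexed as payload and `_index` is deleted afterwards, as described above.

.. code-block:: yaml

   _index: logs-logprep-default   # placeholder data stream name
   _op_type: create               # required when the target is a data stream
   message: example log line      # payload, indexed unchanged

Without `_op_type: create` the bulk helper defaults to a plain index action, which data streams typically reject, so the field has to be set by whichever component produces the event.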