
Commit

add opensearch options
ekneg54 committed Oct 2, 2024
1 parent 81552c5 commit 2f1aab3
Showing 3 changed files with 21 additions and 92 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,8 @@
* removes the option to use synchronous `bulk` or `parallel_bulk` operation in favor of `parallel_bulk` in `opensearch_output`
* reimplement error handling by introducing the option to configure an error output
* if no error output is configured, failed events will be dropped
* remove the retry mechanism from `opensearch_output` connector to reduce implementation complexity
* all documents are now stored as failed in the error output, if one is configured

### Features

@@ -17,6 +19,9 @@
* initially run health checks on setup for every configured component
* make `imagePullPolicy` configurable for helm chart deployments
* adds ability to configure error output
* adds option `default_op_type` to `opensearch_output` connector to set the default operation for indexing documents (default: `index`)
* adds option `max_chunk_bytes` to `opensearch_output` connector to set the maximum size of a bulk request in bytes (default: 100 MB, see the sketch below)


### Improvements

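For orientation, here is a minimal sketch (not logprep code) of what the two new options correspond to at the opensearchpy level: `default_op_type` maps to the per-action `_op_type` of a bulk action, and `max_chunk_bytes` caps the byte size of each bulk request issued by the `parallel_bulk` helper. Client setup, index name, and documents are illustrative placeholders.

```python
from opensearchpy import OpenSearch, helpers

# Illustrative client; host and credentials are placeholders.
client = OpenSearch(hosts=["https://localhost:9200"])

documents = [{"message": "event 1"}, {"message": "event 2"}]

# Every bulk action carries an "_op_type"; "index" is the value the new
# `default_op_type` option defaults to.
actions = (
    {"_index": "processed", "_op_type": "index", "_source": doc}
    for doc in documents
)

# `max_chunk_bytes` caps the size of a single bulk request;
# 100 * 1024 * 1024 mirrors the new 100MB default.
for ok, item in helpers.parallel_bulk(
    client,
    actions,
    chunk_size=500,
    max_chunk_bytes=100 * 1024 * 1024,
    raise_on_error=False,
):
    if not ok:
        print("failed to index:", item)
```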
95 changes: 16 additions & 79 deletions logprep/connector/opensearch/output.py
@@ -41,9 +41,8 @@
from opensearchpy import OpenSearchException, helpers
from opensearchpy.serializer import JSONSerializer

from logprep.abc.output import CriticalOutputError, FatalOutputError, Output
from logprep.abc.output import CriticalOutputError, Output
from logprep.metrics.metrics import Metric
from logprep.util.helper import get_dict_size_in_byte

logger = logging.getLogger("OpenSearchOutput")

@@ -99,20 +98,8 @@ class Config(Output.Config):
default index."""
message_backlog_size: int = field(validator=validators.instance_of(int))
"""Amount of documents to store before sending them."""
maximum_message_size_mb: Optional[Union[float, int]] = field(
validator=validators.optional(validators.instance_of((float, int))),
converter=(lambda x: x * 10**6 if x else None),
default=None,
)
"""(Optional) Maximum estimated size of a document in MB before discarding it if it causes
an error."""
timeout: int = field(validator=validators.instance_of(int), default=500)
"""(Optional) Timeout for the connection (default is 500ms)."""
max_retries: int = field(validator=validators.instance_of(int), default=0)
"""(Optional) Maximum number of retries for documents rejected with code 429 (default is 0).
Increases backoff time by 2 seconds per try, but never exceeds 600 seconds. When using
parallel_bulk in the opensearch connector then the backoff time starts with 1 second. With
each consecutive retry 500 to 1000 ms will be added to the delay, chosen randomly """
user: Optional[str] = field(validator=validators.instance_of(str), default="")
"""(Optional) User used for authentication."""
secret: Optional[str] = field(validator=validators.instance_of(str), default="")
@@ -134,6 +121,9 @@ class Config(Output.Config):
default=500, validator=[validators.instance_of(int), validators.gt(1)]
)
"""Chunk size to use for bulk requests."""
max_chunk_bytes: int = field(
default=100 * 1024 * 1024, validator=[validators.instance_of(int), validators.gt(1)]
)
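"""Maximum size of a bulk request in bytes (default is 100MB)."""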
desired_cluster_status: list = field(
default=["green"], validator=validators.instance_of(list)
)
@@ -291,7 +281,6 @@ def _write_backlog(self):
self._bulk(
self._search_context,
self._message_backlog,
max_retries=self._config.max_retries,
chunk_size=len(self._message_backlog),
)
except CriticalOutputError as error:
@@ -302,17 +291,19 @@ def _write_backlog(self):
self._succeeded.clear()

def _bulk(self, client, actions, *args, **kwargs) -> Optional[List[dict]]:
"""Bulk index documents into Opensearch.
uses the parallel_bulk function from the opensearchpy library.
all args are passed to :code:`streaming_bulk` function.
"""
kwargs |= {
"max_chunk_bytes": self._config.max_chunk_bytes,
"chunk_size": self._config.chunk_size,
"queue_size": self._config.queue_size,
"raise_on_error": False,
"raise_on_exception": False,
}
succeeded, failed = self._succeeded, self._failed
for index, result in enumerate(
helpers.parallel_bulk(
client,
actions=actions,
chunk_size=self._config.chunk_size,
queue_size=self._config.queue_size,
raise_on_error=False,
raise_on_exception=False,
)
):
for index, result in enumerate(helpers.parallel_bulk(client, actions, **kwargs)):
success, item = result
if logger.isEnabledFor(logging.DEBUG):
if success:
@@ -325,60 +316,6 @@ def _bulk(self, client, actions, *args, **kwargs) -> Optional[List[dict]]:
if failed:
raise CriticalOutputError(self, "failed to index", failed)

def _handle_transport_error(self, error: search.exceptions.TransportError):
"""Handle transport error for opensearch bulk indexing.
Discard messages that exceed the maximum size if they caused an error.
Parameters
----------
error : TransportError
TransportError for the error message.
"""
if self._message_exceeds_max_size_error(error):
(
messages_under_size_limit,
messages_over_size_limit,
) = self._split_message_backlog_by_size_limit()

if len(messages_over_size_limit) == 0:
raise FatalOutputError(self, error.error)

error_documents = self._build_messages_for_large_error_documents(
messages_over_size_limit
)
self._message_backlog = error_documents + messages_under_size_limit
self._bulk(self._search_context, self._message_backlog)
else:
raise FatalOutputError(self, error.error)

def _message_exceeds_max_size_error(self, error):
if error.status_code != 429:
return False
if error.error == "circuit_breaking_exception":
return True

if error.error == "rejected_execution_exception":
reason = error.info.get("error", {}).get("reason", "")
match = self._size_error_pattern.match(reason)
if match and int(match.group("size")) >= int(match.group("max_size")):
return True
return False

def _split_message_backlog_by_size_limit(self):
messages_under_size_limit = []
messages_over_size_limit = []
total_size = 0
for message in self._message_backlog:
message_size = get_dict_size_in_byte(message)
if message_size < self._config.maximum_message_size_mb:
messages_under_size_limit.append(message)
total_size += message_size
else:
messages_over_size_limit.append((message, message_size))
return messages_under_size_limit, messages_over_size_limit

def health(self) -> bool:
"""Check the health of the component."""
try:
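The simplified `_bulk` in `logprep/connector/opensearch/output.py` above relies on opensearchpy reporting per-item outcomes instead of raising mid-stream, which is what allows the retry logic to be dropped. A compact, standalone sketch of that pattern, independent of logprep (the client and the `send_to_error_output` callback are illustrative stand-ins for the configured error output):

```python
from typing import Callable, Iterable, List

from opensearchpy import OpenSearch, helpers


def bulk_without_retries(
    client: OpenSearch,
    actions: Iterable[dict],
    send_to_error_output: Callable[[List[dict]], None],
) -> None:
    """Index the given actions once; hand failed items to an error handler instead of retrying."""
    failed: List[dict] = []
    results = helpers.parallel_bulk(
        client,
        actions,
        raise_on_error=False,      # report per-item failures as results ...
        raise_on_exception=False,  # ... instead of raising mid-stream
    )
    for ok, item in results:
        if not ok:
            failed.append(item)
    if failed:
        # No retry mechanism: everything that failed is handed off in one pass.
        send_to_error_output(failed)
```

Compared to a retry loop with backoff, this keeps the indexing path single-pass and leaves re-ingestion to whatever consumes the error output.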
13 changes: 0 additions & 13 deletions logprep/util/helper.py
@@ -304,19 +304,6 @@ def get_source_fields_dict(event, rule):
return source_field_dict


def get_dict_size_in_byte(dictionary: dict) -> int:
"""returns the size of a nested dictionary in bytes"""
size = sys.getsizeof(dictionary)
if isinstance(dictionary, dict):
keys_size = sum(map(get_dict_size_in_byte, dictionary.keys()))
values_size = sum(map(get_dict_size_in_byte, dictionary.values()))
return size + keys_size + values_size
if isinstance(dictionary, list):
elements_size = sum(map(get_dict_size_in_byte, dictionary))
return size + elements_size
return size


def get_versions_string(config: "Configuration" = None) -> str:
"""
Prints the version and exits. If a configuration was found then its version

