Skip to content

Commit

Permalink
better opensearch error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 14, 2024
1 parent fc81731 commit 04a2373
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 34 deletions.
38 changes: 10 additions & 28 deletions logprep/connector/opensearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,11 @@ class Config(Output.Config):
"""Default op_type for indexing documents. Default is 'index',
Consider using 'create' for data streams or to prevent overwriting existing documents."""

__slots__ = ["_message_backlog", "_failed", "_succeeded"]
__slots__ = ["_message_backlog", "_failed"]

_failed: List
"""Temporary list of failed messages."""

_succeeded: List
"""Temporary list of succeeded messages."""

_message_backlog: List
"""List of messages to be sent to Opensearch."""

Expand Down Expand Up @@ -203,28 +200,16 @@ def _search_context(self):
verify_certs=True, # default is True, but we want to be explicit
timeout=self._config.timeout,
serializer=MSGPECSerializer(self),
pool_maxsize=20,
max_retries=self._config.max_retries,
pool_maxsize=10,
# default for connection pooling is 10 see:
# https://github.com/opensearch-project/opensearch-py/blob/main/guides/connection_classes.md
)

@cached_property
def _replace_pattern(self):
return re.compile(r"%{\S+?}")

@cached_property
def _size_error_pattern(self):
return re.compile(
r".*coordinating_operation_bytes=(?P<size>\d+), "
r"max_coordinating_and_primary_bytes=(?P<max_size>\d+).*"
)

def __init__(self, name: str, configuration: "OpensearchOutput.Config"):
super().__init__(name, configuration)
self._message_backlog = []
self._failed = []
self._succeeded = []

def setup(self):
super().setup()
Expand Down Expand Up @@ -299,7 +284,6 @@ def _write_backlog(self):
finally:
self._message_backlog.clear()
self._failed.clear()
self._succeeded.clear()

def _bulk(self, client, actions, *args, **kwargs) -> Optional[List[dict]]:
"""Bulk index documents into Opensearch.
Expand All @@ -313,17 +297,15 @@ def _bulk(self, client, actions, *args, **kwargs) -> Optional[List[dict]]:
"raise_on_error": False,
"raise_on_exception": False,
}
succeeded, failed = self._succeeded, self._failed
for index, result in enumerate(helpers.parallel_bulk(client, actions, **kwargs)):
failed = self._failed
for result in helpers.parallel_bulk(client, actions, **kwargs):
success, item = result
if logger.isEnabledFor(logging.DEBUG):
if success:
succeeded.append(item)
if not success:
failed.append({"errors": item, "event": actions[index]})
if succeeded and logger.isEnabledFor(logging.DEBUG):
for item in succeeded:
logger.debug("Document indexed: %s", item)
if success:
continue
error_result = item.get(self._config.default_op_type)
data = error_result.get("data", "None")
error = error_result.get("error", "None")
failed.append({"errors": error, "event": data})
if failed:
raise CriticalOutputError(self, "failed to index", failed)

Expand Down
8 changes: 2 additions & 6 deletions tests/unit/connector/test_opensearch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,14 @@ def test_write_backlog_clears_message_backlog_on_failure(self):
self.object._write_backlog()
assert len(self.object._message_backlog) == 0, "Message backlog should be cleared"

def test_write_backlog_clears_failed_and_succeeded_on_success(self):
def test_write_backlog_clears_failed_on_success(self):
self.object._message_backlog = [{"some": "event"}]
self.object._succeeded = [{"some": "event"}]
self.object._write_backlog()
assert len(self.object._failed) == 0, "temporary failed backlog should be cleared"
assert len(self.object._succeeded) == 0, "temporary succeeded backlog should be cleared"

def test_write_backlog_clears_failed_and_succeeded_on_failure(self):
def test_write_backlog_clears_failed_on_failure(self):
self.object._message_backlog = [{"some": "event"}]
self.object._failed = [{"some": "event"}]
self.object._succeeded = [{"some": "event"}]
with pytest.raises(CriticalOutputError):
self.object._write_backlog()
assert len(self.object._failed) == 0, "temporary failed backlog should be cleared"
assert len(self.object._succeeded) == 0, "temporary succeeded backlog should be cleared"

0 comments on commit 04a2373

Please sign in to comment.