Skip to content

Commit

Permalink
add event to critical output error raised during opensearch bulk oper…
Browse files Browse the repository at this point in the history
…ations
  • Loading branch information
ekneg54 committed Sep 30, 2024
1 parent 04cbe6b commit be12243
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
19 changes: 11 additions & 8 deletions logprep/connector/opensearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,18 +290,21 @@ def _write_backlog(self):
def _bulk(self, client, actions, *args, **kwargs) -> Optional[List[dict]]:
failed = []
succeeded = []
for success, item in helpers.parallel_bulk(
client,
actions=actions,
chunk_size=self._config.chunk_size,
queue_size=self._config.queue_size,
raise_on_error=False,
raise_on_exception=False,
for index, data in enumerate(
helpers.parallel_bulk(
client,
actions=actions,
chunk_size=self._config.chunk_size,
queue_size=self._config.queue_size,
raise_on_error=False,
raise_on_exception=False,
)
):
success, item = data
if success:
succeeded.append(item)
else:
failed.append(item)
failed.append({"errors": item, "event": actions[index]})
if succeeded and logger.isEnabledFor(logging.DEBUG):
for item in succeeded:
logger.debug("Document indexed: %s", item)
Expand Down
10 changes: 8 additions & 2 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,14 @@ def enqueue_error(
event = {"event": str(item.event), "errors": str(item.errors)}
elif isinstance(item, (CriticalInputError, CriticalOutputError)):
if isinstance(item.raw_input, list):
error = str(item)
event = [{"event": i, "errors": error} for i in item.raw_input]
default_error = str(item)
event = [
{
"event": i["event"] if "event" in i else i,
"errors": (i["errors"] if "errors" in i else default_error),
}
for i in item.raw_input
]
else:
event = {"event": str(item.raw_input), "errors": str(item)}
else:
Expand Down

0 comments on commit be12243

Please sign in to comment.