Skip to content

Commit

Permalink
make parallel_bulk optional (#504)
Browse files Browse the repository at this point in the history
* make parallel_bulk optional
---------

Co-authored-by: dtrai2 <[email protected]>
  • Loading branch information
ekneg54 and dtrai2 authored Jan 8, 2024
1 parent d13e040 commit 588a140
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

### Features

* add option to Opensearch Output Connector to use the parallel bulk implementation (default is `True`)

### Improvements

### Bugfix
Expand Down
4 changes: 2 additions & 2 deletions logprep/connector/elasticsearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ def _write_backlog(self):
max_retries=self._config.max_retries,
chunk_size=len(self._message_backlog),
)
if self.input_connector and hasattr(self.input_connector, "batch_finished_callback"):
self.input_connector.batch_finished_callback()
self._message_backlog.clear()

def _bulk(self, client, actions, *args, **kwargs):
Expand All @@ -324,8 +326,6 @@ def _bulk(self, client, actions, *args, **kwargs):
self._handle_bulk_index_error(error)
except search.exceptions.TransportError as error:
self._handle_transport_error(error)
if self.input_connector and hasattr(self.input_connector, "batch_finished_callback"):
self.input_connector.batch_finished_callback()

def _handle_serialization_error(self, error: search.SerializationError):
"""Handle serialization error for elasticsearch bulk indexing.
Expand Down
35 changes: 21 additions & 14 deletions logprep/connector/opensearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class OpensearchOutput(ElasticsearchOutput):
class Config(ElasticsearchOutput.Config):
"""Config for OpensearchOutput."""

parallel_bulk: bool = field(default=True, validator=validators.instance_of(bool))
"""Configure if all events in the backlog should be send, in parallel, via multiple threads
to Opensearch. (Default: :code:`True`)"""
thread_count: int = field(
default=4, validator=[validators.instance_of(int), validators.gt(1)]
)
Expand Down Expand Up @@ -112,18 +115,10 @@ def describe(self) -> str:

def _bulk(self, client, actions, *args, **kwargs):
try:
for success, item in helpers.parallel_bulk(
client,
actions=actions,
chunk_size=self._config.chunk_size,
queue_size=self._config.queue_size,
raise_on_error=True,
raise_on_exception=True,
):
if not success:
result = item[list(item.keys())[0]]
if "error" in result:
raise result.get("error")
if self._config.parallel_bulk:
self._parallel_bulk(client, actions, *args, **kwargs)
return
helpers.bulk(client, actions, *args, **kwargs)
except search.SerializationError as error:
self._handle_serialization_error(error)
except search.ConnectionError as error:
Expand All @@ -132,5 +127,17 @@ def _bulk(self, client, actions, *args, **kwargs):
self._handle_bulk_index_error(error)
except search.exceptions.TransportError as error:
self._handle_transport_error(error)
if self.input_connector:
self.input_connector.batch_finished_callback()

def _parallel_bulk(self, client, actions, *args, **kwargs):
    """Send ``actions`` to Opensearch via the threaded ``parallel_bulk`` helper.

    ``raise_on_error`` / ``raise_on_exception`` make the helper raise bulk and
    transport errors itself; this loop additionally surfaces errors that are
    only reported per item in the result stream.

    :param client: Opensearch client used for the bulk request.
    :param actions: iterable of bulk actions to index.
    """
    result_stream = helpers.parallel_bulk(
        client,
        actions=actions,
        chunk_size=self._config.chunk_size,
        queue_size=self._config.queue_size,
        raise_on_error=True,
        raise_on_exception=True,
    )
    for success, item in result_stream:
        if success:
            continue
        # each item maps the action name (e.g. "index") to its result dict
        result = next(iter(item.values()))
        if "error" in result:
            # NOTE(review): assumes the reported "error" value is an
            # exception instance — if it is a plain dict this raise would
            # fail with a TypeError; confirm against the client's response
            # schema.
            raise result.get("error")
6 changes: 2 additions & 4 deletions quickstart/exampledata/config/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,11 @@ output:
- 127.0.0.1:9200
default_index: processed
error_index: errors
message_backlog_size: 16000
message_backlog_size: 10000
timeout: 10000
flush_timeout: 60
max_retries: 3
thread_count: 16
queue_size: 32
chunk_size: 500
parallel_bulk: false
user: admin
secret: admin
kafka:
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/connector/test_opensearch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class NotJsonSerializableMock:
in_ci = os.environ.get("GITHUB_ACTIONS") == "true"

helpers.parallel_bulk = mock.MagicMock()
helpers.bulk = mock.MagicMock()


class TestOpenSearchOutput(BaseOutputTestCase):
Expand Down Expand Up @@ -348,6 +349,7 @@ def test_setup_raises_fatal_output_error_if_opensearch_error_is_raised(self):
self.object.setup()

def test_setup_registers_flush_timout_tasks(self):
# this test fails if opensearch is running on localhost
job_count = len(Component._scheduler.jobs)
with pytest.raises(FatalOutputError):
self.object.setup()
Expand Down

0 comments on commit 588a140

Please sign in to comment.