diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index 035199d..86081ea 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -54,6 +54,7 @@ def __init__( if not uri.lower().startswith(("http://", "https://")): uri = "http://" + uri + self.json_packer = JsonRecordPacker(pack_descriptors=False) self.queue: queue.Queue[Record | StopIteration] = queue.Queue() self.event = threading.Event() @@ -64,8 +65,6 @@ def __init__( api_key=api_key, ) - self.json_packer = JsonRecordPacker() - self.thread = threading.Thread(target=self.streaming_bulk_thread) self.thread.start() self.exception: Exception | None = None @@ -90,7 +89,7 @@ def excepthook(self, exc: threading.ExceptHookArgs, *args, **kwargs) -> None: def record_to_document(self, record: Record, index: str) -> dict: """Convert a record to a Elasticsearch compatible document dictionary""" - rdict = record._asdict() + rdict = self.json_packer.pack_obj(record) # Store record metadata under `_record_metadata`. rdict_meta = { diff --git a/flow/record/jsonpacker.py b/flow/record/jsonpacker.py index 06264c8..6678893 100644 --- a/flow/record/jsonpacker.py +++ b/flow/record/jsonpacker.py @@ -51,6 +51,10 @@ def pack_obj(self, obj): if field_type == "boolean" and isinstance(serial[field_name], int): serial[field_name] = bool(serial[field_name]) + # Flatten command type + elif field_type == "command" and isinstance(serial[field_name], fieldtypes.command): + serial[field_name] = serial[field_name]._join() + return serial if isinstance(obj, RecordDescriptor): serial = { diff --git a/tests/test_elastic_adapter.py b/tests/test_elastic_adapter.py index c70012d..d6aaa85 100644 --- a/tests/test_elastic_adapter.py +++ b/tests/test_elastic_adapter.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import json import pytest @@ -10,18 +12,50 @@ [ ("string", "field_one"), ("string", "field_two"), + ("string", "field_three"), + ], +) + +AnotherRecord = RecordDescriptor( + "my/record", + [ + ("command", "field_one"), + ("boolean", "field_two"), + ("bytes", "field_three"), ], ) @pytest.mark.parametrize( - "record", + "record, expected_output", [ - MyRecord("first", "record"), - MyRecord("second", "record"), + ( + MyRecord("first", "record", "!"), + { + "field_one": "first", + "field_two": "record", + "field_three": "!", + }, + ), + ( + MyRecord("second", "record", "!"), + { + "field_one": "second", + "field_two": "record", + "field_three": "!", + }, + ), + ( + AnotherRecord("/bin/bash -c 'echo hello'", False, b"\x01\x02\x03\x04"), + { + "field_one": "/bin/bash -c 'echo hello'", + "field_two": False, + "field_three": "AQIDBA==", + }, + ), ], ) -def test_elastic_writer_metadata(record): +def test_elastic_writer_metadata(record: MyRecord | AnotherRecord, expected_output: dict) -> None: options = { "_meta_foo": "some value", "_meta_bar": "another value", @@ -34,8 +68,7 @@ def test_elastic_writer_metadata(record): "_index": "some-index", "_source": json.dumps( { - "field_one": record.field_one, - "field_two": record.field_two, + **expected_output, "_record_metadata": { "descriptor": { "name": "my/record", diff --git a/tests/test_json_packer.py b/tests/test_json_packer.py index acd2edb..8d0877d 100644 --- a/tests/test_json_packer.py +++ b/tests/test_json_packer.py @@ -110,3 +110,19 @@ def test_record_pack_surrogateescape() -> None: # pack the json string back to a record and make sure it is the same as before assert packer.unpack(data) == record + + +def test_record_pack_command_type() -> None: + TestRecord = RecordDescriptor( + "test/record_with_commands", + [ + ("command", "win_command"), + ("command", "nix_command"), + ], + ) + + record = TestRecord(win_command="foo.exe /H /E /L /O", nix_command="/bin/bash -c 'echo hello'") + packer = JsonRecordPacker() + data = packer.pack(record) + + assert data.startswith('{"win_command": "foo.exe /H /E /L /O", "nix_command": "/bin/bash -c \'echo hello\'", ')