Skip to content

Commit

Permalink
add way to detect format of timestamp for normalization
Browse files Browse the repository at this point in the history
  • Loading branch information
djkhl committed Aug 13, 2024
1 parent 0c62428 commit 7ee1b48
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
28 changes: 19 additions & 9 deletions logprep/processor/pre_detector/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
.. automodule:: logprep.processor.pre_detector.rule
"""

from datetime import datetime
from datetime import datetime, timezone
from functools import cached_property
from uuid import uuid4

Expand All @@ -40,7 +40,7 @@
from logprep.processor.pre_detector.ip_alerter import IPAlerter
from logprep.processor.pre_detector.rule import PreDetectorRule
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.time import TimeParser
from logprep.util.time import TimeParser, TimeParserException


class PreDetector(Processor):
Expand Down Expand Up @@ -101,6 +101,21 @@ def is_normalized_timestamp(self, timestamp: str):
except ValueError:
return False

def detect_format_and_normalize_timestamp(self, timestamp):
    """Detect the source format of a timestamp and return it normalized.

    If ``self.is_normalized_timestamp`` already accepts the value, it is
    returned unchanged. Otherwise each candidate source format is tried in
    order via ``TimeParser.parse_datetime`` and the first successful parse is
    returned as an ISO 8601 string (``datetime.isoformat()``).

    ``timezone.utc`` is passed as the third argument to ``parse_datetime``
    (presumably the source timezone -- confirm against ``TimeParser``).

    :param timestamp: timestamp value taken from the event
    :return: the normalized timestamp string; if no candidate format can
        parse the value, the original ``timestamp`` is returned unchanged
        instead of an implicit ``None``, so the caller never replaces a
        present timestamp with ``None``
    """
    # The normalization check does not depend on the candidate format,
    # so it is evaluated once instead of on every loop iteration.
    if self.is_normalized_timestamp(timestamp):
        return timestamp

    # Order matters: the explicit strptime pattern is probed before "UNIX".
    formats = (
        "%Y%m%d%H%M%S",
        "UNIX",
    )
    for form in formats:
        try:
            return TimeParser.parse_datetime(timestamp, form, timezone.utc).isoformat()
        except TimeParserException:
            # This format did not match; try the next candidate.
            continue

    # No known format matched: keep the value as-is rather than dropping it.
    return timestamp

def _apply_rules(self, event, rule):
if not (
self._ip_alerter.has_ip_fields(rule)
Expand All @@ -112,13 +127,8 @@ def _apply_rules(self, event, rule):
timestamp = get_dotted_field_value(event, "@timestamp")

if timestamp is not None:
if self.is_normalized_timestamp(timestamp):
detection["@timestamp"] = timestamp
else:
# need to find out how to get every format not just unix..
timestamp = TimeParser.parse_datetime(timestamp, "UNIX", "UTC")
result = timestamp.isoformat()
detection["@timestamp"] = result
timestamp = self.detect_format_and_normalize_timestamp(timestamp)
detection["@timestamp"] = timestamp

def _get_detection_result(self, event: dict, rule: PreDetectorRule):
pre_detection_id = get_dotted_field_value(event, "pre_detection_id")
Expand Down
22 changes: 17 additions & 5 deletions tests/unit/processor/pre_detector/test_pre_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import re
from copy import deepcopy

import pytest

from tests.unit.processor.base import BaseProcessorTestCase


Expand Down Expand Up @@ -329,16 +331,26 @@ def _assert_equality_of_results(

def test_adds_timestamp_to_extra_data_if_provided_by_event(self):
document = {
"@timestamp": "2024-08-12T12:13:04Z",
"@timestamp": "20240812121304",
"winlog": {"event_id": 123, "event_data": {"ServiceName": "VERY BAD"}},
}
detection_results = self.object.process(document)
assert detection_results.data[0][0].get("@timestamp") == "2024-08-12T12:13:04Z"
assert detection_results.data[0][0].get("@timestamp") == "2024-08-12T12:13:04+00:00"

def test_timestamp_is_normalised(self):
@pytest.mark.parametrize(
"testcase, timestamp",
[
("UNIX timestamp", "1723464784"),
("format from format list", "20240812121304"),
("already normalized timestamp", "2024-08-12T12:13:04+00:00"),
],
)
def test_timestamp_is_normalized(self, testcase, timestamp):
document = {
"@timestamp": "1723464784",
"@timestamp": timestamp,
"winlog": {"event_id": 123, "event_data": {"ServiceName": "VERY BAD"}},
}
detection_results = self.object.process(document)
assert detection_results.data[0][0].get("@timestamp") == "2024-08-12T12:13:04+00:00"
assert (
detection_results.data[0][0].get("@timestamp") == "2024-08-12T12:13:04+00:00"
), testcase

0 comments on commit 7ee1b48

Please sign in to comment.