From 74cd71af7e0954ee3db297c58714dd2b3f654822 Mon Sep 17 00:00:00 2001 From: Mat0vu <73690594+Mat0vu@users.noreply.github.com> Date: Tue, 14 Jan 2025 08:29:33 +0100 Subject: [PATCH 1/4] Enable Creation of Individual Files for Translated Rules (#1) * enable specifying output-dir * add test for output creation * add test for nesting-level --------- Co-authored-by: Jan Aigner --- sigma/cli/convert.py | 111 ++++++++++++++++-- .../nested_rules/group_one/sigma_rule.yml | 16 +++ .../group_two/another_sigma_rule.yml | 15 +++ tests/files/valid/another_sigma_rule.yml | 15 +++ tests/files/valid/sigma_rule.yml | 2 +- tests/test_convert.py | 87 +++++++++++++- 6 files changed, 231 insertions(+), 15 deletions(-) create mode 100644 tests/files/nested_rules/group_one/sigma_rule.yml create mode 100644 tests/files/nested_rules/group_two/another_sigma_rule.yml create mode 100644 tests/files/valid/another_sigma_rule.yml diff --git a/sigma/cli/convert.py b/sigma/cli/convert.py index cf74baa..c0fd4d4 100644 --- a/sigma/cli/convert.py +++ b/sigma/cli/convert.py @@ -1,12 +1,14 @@ import json import pathlib import textwrap +import os from typing import Sequence import click from sigma.cli.rules import load_rules from sigma.conversion.base import Backend +from sigma.collection import SigmaCollection from sigma.exceptions import ( SigmaError, SigmaPipelineNotAllowedForBackendError, @@ -21,6 +23,30 @@ pipeline_list = list(pipeline_resolver.pipelines.keys()) +def ensure_dir_exists(ctx, param, value): + if value is None: + return value + # Split the path into its components + path_parts = value.split(os.sep) + current_path = "" + + for part in path_parts: + current_path = os.path.join(current_path, part) + + # Check if the current path exists + if not os.path.exists(current_path): + # If it doesn't exist, create it + click.echo(f"Creating specified output directory '{current_path}'") + os.makedirs(current_path, exist_ok=True) + elif not os.path.isdir(current_path): + # If it exists but is not a directory, raise an error + raise NotADirectoryError( + f"Cannot create directory '{current_path}' because a file with the same name exists." + ) + + return value + + class KeyValueParamType(click.ParamType): """ key=value type for backend-specific options. @@ -106,7 +132,7 @@ def fail(self, message: str, param, ctx): @click.option( "--correlation-method", "-c", - help="Select method for generation of correlation queries. If not given the default method of the backend is used." + help="Select method for generation of correlation queries. If not given the default method of the backend is used.", ) @click.option( "--filter", @@ -125,7 +151,7 @@ def fail(self, message: str, param, ctx): "--skip-unsupported/--fail-unsupported", "-s/", default=False, - help="Skip conversion of rules that can't be handled by the backend", + help="Skip conversion of rules that can't be handled by the backend.", ) @click.option( "--output", @@ -135,6 +161,25 @@ def fail(self, message: str, param, ctx): show_default=True, help="Write result to specified file. '-' writes to standard output.", ) +@click.option( + "--output-dir", + "-od", + type=click.Path( + file_okay=False, dir_okay=True, writable=True, exists=False, resolve_path=False + ), + default=None, + show_default=True, + help="Write result in INDIVIDUAL files for each rule in specified directory.", + callback=ensure_dir_exists, +) +@click.option( + "--nesting-level", + "-nl", + type=int, + default=1, + show_default=True, + help="Write result in INDIVIDUAL files for each rule in specified directory.", +) @click.option( "--encoding", "-e", @@ -171,6 +216,7 @@ def fail(self, message: str, param, ctx): type=click.BOOL, help="Verbose output.", ) + def convert( target, pipeline, @@ -187,6 +233,8 @@ def convert( input, file_pattern, verbose, + output_dir, + nesting_level, ): """ Convert Sigma rules into queries. INPUT can be multiple files or directories. This command automatically recurses @@ -221,7 +269,6 @@ def convert( k: (v[0] if len(v) == 1 else v) # if there's only one item, return it. for k, v in backend_options.items() } - # Initialize processing pipeline and backend backend_class = backends[target] try: @@ -267,7 +314,6 @@ def convert( f"Parameter '{param}' is not supported by backend '{target}'.", param_hint="backend_option", ) - if format not in backends[target].formats.keys(): raise click.BadParameter( f"Output format '{format}' is not supported by backend '{target}'. Run " @@ -275,7 +321,7 @@ def convert( + " to list all available formats of the target.", param_hint="format", ) - + if correlation_method is not None: correlation_methods = backend.correlation_methods if correlation_methods is None: @@ -286,7 +332,9 @@ def convert( elif correlation_method not in correlation_methods.keys(): raise click.BadParameter( f"Correlation method '{correlation_method}' is not supported by backend '{target}'. Run " - + click.style(f"sigma list correlation-methods {target}", bold=True, fg="green") + + click.style( + f"sigma list correlation-methods {target}", bold=True, fg="green" + ) + " to list all available correlation methods of the target.", param_hint="correlation_method", ) @@ -294,6 +342,46 @@ def convert( try: rule_collection = load_rules(input + filter, file_pattern) result = backend.convert(rule_collection, format, correlation_method) + if output_dir: + writes_successful = True + + # Collect all Paths for Rules + all_paths = [] + for dir_path in input: + all_paths.extend( + list( + SigmaCollection.resolve_paths( + [dir_path], + recursion_pattern="**/" + file_pattern, + ) + ) + ) + for index, path_of_input in enumerate(all_paths): + original_path_part_to_keep = os.path.sep.join( + path_of_input.parts[-nesting_level:] + ) + + try: + out_path = os.path.join(output_dir, original_path_part_to_keep) + ensure_dir_exists( + ctx=None, param=None, value=os.path.dirname(out_path) + ) + with open(out_path, "w", encoding="utf-8") as f: + f.write(result[index]) + except Exception as ex: + click.echo( + f"Could not write translated rules into output-dir {output_dir}: \n {ex}" + ) + writes_successful = False + if writes_successful: + click.echo( + f"Written {len(result)} translated rule(s) into individual files in specified output-dir '{output_dir}'" + ) + else: + click.echo( + f"Could not write {len(result)} translated rule(s) into individual files in specified output-dir '{output_dir}'" + ) + if isinstance(result, str): # String result click.echo(bytes(result, encoding), output) elif isinstance(result, bytes): # Bytes result: only allow to write it to file. @@ -331,16 +419,21 @@ def convert( ) except SigmaError as e: if verbose: - click.echo('Error while converting') + click.echo("Error while converting") raise e else: raise click.ClickException("Error while converting: " + str(e)) except NotImplementedError as e: if verbose: - click.echo('Feature required for conversion of Sigma rule is not supported by backend') + click.echo( + "Feature required for conversion of Sigma rule is not supported by backend" + ) raise e else: - raise click.ClickException("Feature required for conversion of Sigma rule is not supported by backend: " + str(e)) + raise click.ClickException( + "Feature required for conversion of Sigma rule is not supported by backend: " + + str(e) + ) if len(backend.errors) > 0: click.echo("\nIgnored errors:", err=True) diff --git a/tests/files/nested_rules/group_one/sigma_rule.yml b/tests/files/nested_rules/group_one/sigma_rule.yml new file mode 100644 index 0000000..3fbf217 --- /dev/null +++ b/tests/files/nested_rules/group_one/sigma_rule.yml @@ -0,0 +1,16 @@ +title: Test rule +id: 5013332f-8a70-4e04-bcc1-06a98a2cca2e +description: it is a valid rule +status: stable +level: high +date: 2023-12-09 +logsource: + category: process_creation + product: windows +detection: + selection: + ParentImage|endswith: '\httpd.exe' + Image|endswith: '\cmd.exe' + condition: selection +tags: + - attack.t1505.003 diff --git a/tests/files/nested_rules/group_two/another_sigma_rule.yml b/tests/files/nested_rules/group_two/another_sigma_rule.yml new file mode 100644 index 0000000..b48b09d --- /dev/null +++ b/tests/files/nested_rules/group_two/another_sigma_rule.yml @@ -0,0 +1,15 @@ +title: Another Test rule +id: 5013332f-8a70-4e04-bcc1-06a98a2cca3f +description: it is a valid rule +status: stable +level: high +date: 2023-12-09 +logsource: + category: process_creation + product: windows +detection: + selection: + ParentImage|endswith: '\abc.exe' + condition: selection +tags: + - attack.t1505.003 diff --git a/tests/files/valid/another_sigma_rule.yml b/tests/files/valid/another_sigma_rule.yml new file mode 100644 index 0000000..b48b09d --- /dev/null +++ b/tests/files/valid/another_sigma_rule.yml @@ -0,0 +1,15 @@ +title: Another Test rule +id: 5013332f-8a70-4e04-bcc1-06a98a2cca3f +description: it is a valid rule +status: stable +level: high +date: 2023-12-09 +logsource: + category: process_creation + product: windows +detection: + selection: + ParentImage|endswith: '\abc.exe' + condition: selection +tags: + - attack.t1505.003 diff --git a/tests/files/valid/sigma_rule.yml b/tests/files/valid/sigma_rule.yml index 11d9a29..3fbf217 100644 --- a/tests/files/valid/sigma_rule.yml +++ b/tests/files/valid/sigma_rule.yml @@ -13,4 +13,4 @@ detection: Image|endswith: '\cmd.exe' condition: selection tags: - - attack.t1505.003 \ No newline at end of file + - attack.t1505.003 diff --git a/tests/test_convert.py b/tests/test_convert.py index 6fa5855..d5b1ff5 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -1,7 +1,9 @@ +import shutil from click.testing import CliRunner import pytest from sigma.cli.convert import convert import sigma.backends.test.backend +import os def test_convert_help(): @@ -77,7 +79,8 @@ def test_convert_output_list_of_dict_indent(): def test_convert_output_str(): cli = CliRunner() result = cli.invoke( - convert, ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"] + convert, + ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"], ) assert "ParentImage" in result.stdout @@ -237,19 +240,93 @@ def test_convert_output_backend_option_list(): ) assert '[123, "test"]' in result.stdout + def test_convert_correlation_method_without_backend_correlation_support(monkeypatch): - monkeypatch.setattr(sigma.backends.test.backend.TextQueryTestBackend, "correlation_methods", None) + monkeypatch.setattr( + sigma.backends.test.backend.TextQueryTestBackend, "correlation_methods", None + ) cli = CliRunner() result = cli.invoke( - convert, ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"] + convert, + ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"], ) assert result.exit_code != 0 assert "Backend 'text_query_test' does not support correlation" in result.stdout + def test_convert_invalid_correlation_method(): cli = CliRunner() result = cli.invoke( - convert, ["-t", "text_query_test", "-f", "str", "-c", "invalid", "tests/files/valid"] + convert, + ["-t", "text_query_test", "-f", "str", "-c", "invalid", "tests/files/valid"], ) assert result.exit_code != 0 - assert "Correlation method 'invalid' is not supported" in result.stdout \ No newline at end of file + assert "Correlation method 'invalid' is not supported" in result.stdout + + +def test_convert_write_to_output_dir(): + """Tests if the correct output directory is created and that multiple translated rules are stored in individual files within one single directory.""" + cli = CliRunner() + + output_dir = "output_directory" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "tests/files/valid/", + "--output-dir", + output_dir, + ], + ) + assert result.exit_code == 0 + assert os.path.exists(output_dir), f"{output_dir} was not created" + assert os.path.exists( + os.path.join(output_dir, "sigma_rule.yml") + ), "rule file was not created" + shutil.rmtree(output_dir, ignore_errors=True) + + +def test_convert_write_to_output_dir_two_nesting_level(): + """Tests if the correct output directory is created and that multiple translated rules are stored in individual files, keeping the original directory structure. + (e.g. group_one/sigma_rule.yml, group_two/another_sigma_rule.yml) + """ + cli = CliRunner() + + output_dir = "output_directory" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "tests/files/nested_rules", + "--output-dir", + output_dir, + "--nesting-level", + "2", + ], + ) + assert result.exit_code == 0 + assert os.path.exists( + output_dir + ), f"general output directory {output_dir} was not created" + dir_rule_files_group_one = os.path.join(output_dir, "group_one") + dir_rule_files_group_two = os.path.join(output_dir, "group_two") + assert os.path.exists( + dir_rule_files_group_one + ), f"sub-directory {dir_rule_files_group_one} was not created" + assert os.path.exists( + dir_rule_files_group_two + ), f"sub-directory {dir_rule_files_group_two} was not created" + + path_rule_file_one = os.path.join(dir_rule_files_group_one, "sigma_rule.yml") + path_rule_file_two = os.path.join( + dir_rule_files_group_two, "another_sigma_rule.yml" + ) + assert os.path.exists( + path_rule_file_one + ), f"rule file {path_rule_file_one} was not created" + assert os.path.exists( + path_rule_file_two + ), f"rule file {path_rule_file_two} was not created" + shutil.rmtree(output_dir, ignore_errors=True) From 9753993ed5a362ff48205b7610bd599fb668a949 Mon Sep 17 00:00:00 2001 From: Mat0vu Date: Tue, 14 Jan 2025 08:44:02 +0100 Subject: [PATCH 2/4] improve description --- sigma/cli/convert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sigma/cli/convert.py b/sigma/cli/convert.py index c0fd4d4..e281689 100644 --- a/sigma/cli/convert.py +++ b/sigma/cli/convert.py @@ -178,7 +178,7 @@ def fail(self, message: str, param, ctx): type=int, default=1, show_default=True, - help="Write result in INDIVIDUAL files for each rule in specified directory.", + help="To be used in combination with --output-dir. \n While writing results in individual files for each rule in the specified directory, the original hierarchical structure of the rule files is conserved for the specified levels.", ) @click.option( "--encoding", From 3dc9b65c7ba4a4e9e00bc13ea4576e52b880d279 Mon Sep 17 00:00:00 2001 From: Mat0vu Date: Fri, 24 Jan 2025 10:23:13 +0100 Subject: [PATCH 3/4] change path handling, add correlation rule test --- sigma/cli/convert.py | 43 +++++---------- tests/files/valid/correlation_rule.yml | 24 +++++++++ tests/test_convert.py | 72 +++++++++++++++----------- 3 files changed, 78 insertions(+), 61 deletions(-) create mode 100644 tests/files/valid/correlation_rule.yml diff --git a/sigma/cli/convert.py b/sigma/cli/convert.py index e281689..8841631 100644 --- a/sigma/cli/convert.py +++ b/sigma/cli/convert.py @@ -3,7 +3,7 @@ import textwrap import os from typing import Sequence - +import pathlib import click from sigma.cli.rules import load_rules @@ -23,27 +23,11 @@ pipeline_list = list(pipeline_resolver.pipelines.keys()) -def ensure_dir_exists(ctx, param, value): +def ensure_dir_exists(ctx, param, value: pathlib.Path): if value is None: - return value - # Split the path into its components - path_parts = value.split(os.sep) - current_path = "" - - for part in path_parts: - current_path = os.path.join(current_path, part) - - # Check if the current path exists - if not os.path.exists(current_path): - # If it doesn't exist, create it - click.echo(f"Creating specified output directory '{current_path}'") - os.makedirs(current_path, exist_ok=True) - elif not os.path.isdir(current_path): - # If it exists but is not a directory, raise an error - raise NotADirectoryError( - f"Cannot create directory '{current_path}' because a file with the same name exists." - ) + return None + value.mkdir(parents=True, exist_ok=True) return value @@ -346,7 +330,7 @@ def convert( writes_successful = True # Collect all Paths for Rules - all_paths = [] + all_paths: list[pathlib.Path] = [] for dir_path in input: all_paths.extend( list( @@ -357,17 +341,14 @@ def convert( ) ) for index, path_of_input in enumerate(all_paths): - original_path_part_to_keep = os.path.sep.join( - path_of_input.parts[-nesting_level:] + original_path_part_to_keep = pathlib.Path( + *path_of_input.parts[-nesting_level:] ) try: - out_path = os.path.join(output_dir, original_path_part_to_keep) - ensure_dir_exists( - ctx=None, param=None, value=os.path.dirname(out_path) - ) - with open(out_path, "w", encoding="utf-8") as f: - f.write(result[index]) + out_path: pathlib.Path = output_dir / original_path_part_to_keep + ensure_dir_exists(ctx=None, param=None, value=out_path.parent) + out_path.open("w", encoding="utf-8").write(result[index]) except Exception as ex: click.echo( f"Could not write translated rules into output-dir {output_dir}: \n {ex}" @@ -375,11 +356,11 @@ def convert( writes_successful = False if writes_successful: click.echo( - f"Written {len(result)} translated rule(s) into individual files in specified output-dir '{output_dir}'" + f"Written {len(result)} translated rule(s) into {len(all_paths)} individual files in specified output-dir '{output_dir}'" ) else: click.echo( - f"Could not write {len(result)} translated rule(s) into individual files in specified output-dir '{output_dir}'" + f"Could not write {len(result)} translated rule(s) into {len(all_paths)} individual files in specified output-dir '{output_dir}'" ) if isinstance(result, str): # String result diff --git a/tests/files/valid/correlation_rule.yml b/tests/files/valid/correlation_rule.yml new file mode 100644 index 0000000..b7139d2 --- /dev/null +++ b/tests/files/valid/correlation_rule.yml @@ -0,0 +1,24 @@ +title: Failed logon +name: failed_logon +status: test +logsource: + product: windows + service: security +detection: + selection: + EventID: 4625 + condition: selection +--- +title: Multiple failed logons for a single user (possible brute force attack) +status: test +correlation: + generate: true + type: event_count + rules: + - failed_logon + group-by: + - TargetUserName + - TargetDomainName + timespan: 5m + condition: + gte: 10 diff --git a/tests/test_convert.py b/tests/test_convert.py index d5b1ff5..9b2836f 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -4,7 +4,7 @@ from sigma.cli.convert import convert import sigma.backends.test.backend import os - +import pathlib def test_convert_help(): cli = CliRunner() @@ -264,11 +264,30 @@ def test_convert_invalid_correlation_method(): assert "Correlation method 'invalid' is not supported" in result.stdout -def test_convert_write_to_output_dir(): +def test_convert_correlation_rule_to_output_dir(tmp_path: pathlib.Path): + """Tests if the correct output directory is created and that multiple translated rules are stored in individual files within one single directory.""" + cli = CliRunner() + + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "tests/files/valid/correlation_rule.yml", + "--output-dir", + tmp_path, + ], + ) + assert result.exit_code == 0 + assert tmp_path.exists(), f"{tmp_path} was not created" + assert tmp_path.is_dir(), f"{tmp_path} is no directory" + assert (tmp_path / "correlation_rule.yml").exists(), "rule file was not created" + + +def test_convert_write_to_output_dir(tmp_path: pathlib.Path): """Tests if the correct output directory is created and that multiple translated rules are stored in individual files within one single directory.""" cli = CliRunner() - output_dir = "output_directory" result = cli.invoke( convert, [ @@ -276,24 +295,21 @@ def test_convert_write_to_output_dir(): "text_query_test", "tests/files/valid/", "--output-dir", - output_dir, + tmp_path, ], ) assert result.exit_code == 0 - assert os.path.exists(output_dir), f"{output_dir} was not created" - assert os.path.exists( - os.path.join(output_dir, "sigma_rule.yml") - ), "rule file was not created" - shutil.rmtree(output_dir, ignore_errors=True) + assert tmp_path.exists(), f"{tmp_path} was not created" + assert tmp_path.is_dir(), f"{tmp_path} is no directory" + assert (tmp_path / "sigma_rule.yml").exists(), "rule file was not created" -def test_convert_write_to_output_dir_two_nesting_level(): +def test_convert_write_to_output_dir_two_nesting_level(tmp_path: pathlib.Path): """Tests if the correct output directory is created and that multiple translated rules are stored in individual files, keeping the original directory structure. (e.g. group_one/sigma_rule.yml, group_two/another_sigma_rule.yml) """ cli = CliRunner() - output_dir = "output_directory" result = cli.invoke( convert, [ @@ -301,32 +317,28 @@ def test_convert_write_to_output_dir_two_nesting_level(): "text_query_test", "tests/files/nested_rules", "--output-dir", - output_dir, + tmp_path, "--nesting-level", "2", ], ) assert result.exit_code == 0 - assert os.path.exists( - output_dir - ), f"general output directory {output_dir} was not created" - dir_rule_files_group_one = os.path.join(output_dir, "group_one") - dir_rule_files_group_two = os.path.join(output_dir, "group_two") - assert os.path.exists( - dir_rule_files_group_one + assert tmp_path.exists(), f"general output directory {tmp_path} was not created" + dir_rule_files_group_one = tmp_path / "group_one" + dir_rule_files_group_two = tmp_path / "group_two" + assert ( + dir_rule_files_group_one.exists() ), f"sub-directory {dir_rule_files_group_one} was not created" - assert os.path.exists( - dir_rule_files_group_two + assert ( + dir_rule_files_group_two.exists() ), f"sub-directory {dir_rule_files_group_two} was not created" - path_rule_file_one = os.path.join(dir_rule_files_group_one, "sigma_rule.yml") - path_rule_file_two = os.path.join( - dir_rule_files_group_two, "another_sigma_rule.yml" - ) - assert os.path.exists( - path_rule_file_one + path_rule_file_one = dir_rule_files_group_one / "sigma_rule.yml" + path_rule_file_two = dir_rule_files_group_two / "another_sigma_rule.yml" + + assert ( + path_rule_file_one.exists() ), f"rule file {path_rule_file_one} was not created" - assert os.path.exists( - path_rule_file_two + assert ( + path_rule_file_two.exists() ), f"rule file {path_rule_file_two} was not created" - shutil.rmtree(output_dir, ignore_errors=True) From 8b00b500c16d105e57edf6b0643c006c25b13a0d Mon Sep 17 00:00:00 2001 From: Mat0vu Date: Fri, 24 Jan 2025 11:28:08 +0100 Subject: [PATCH 4/4] add uuids to correlation rule --- tests/files/valid/correlation_rule.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/files/valid/correlation_rule.yml b/tests/files/valid/correlation_rule.yml index b7139d2..8af9f8e 100644 --- a/tests/files/valid/correlation_rule.yml +++ b/tests/files/valid/correlation_rule.yml @@ -1,5 +1,6 @@ title: Failed logon name: failed_logon +id: bb450e7c-f1b7-4479-9c9c-e7503d728de1 status: test logsource: product: windows @@ -11,6 +12,7 @@ detection: --- title: Multiple failed logons for a single user (possible brute force attack) status: test +id: 887a603d-b9b9-4f53-a0e3-d24f15038e1e correlation: generate: true type: event_count