diff --git a/sigma/cli/convert.py b/sigma/cli/convert.py index cf74baa..8841631 100644 --- a/sigma/cli/convert.py +++ b/sigma/cli/convert.py @@ -1,12 +1,14 @@ import json import pathlib import textwrap +import os from typing import Sequence - +import pathlib import click from sigma.cli.rules import load_rules from sigma.conversion.base import Backend +from sigma.collection import SigmaCollection from sigma.exceptions import ( SigmaError, SigmaPipelineNotAllowedForBackendError, @@ -21,6 +23,14 @@ pipeline_list = list(pipeline_resolver.pipelines.keys()) +def ensure_dir_exists(ctx, param, value: pathlib.Path): + if value is None: + return None + + value.mkdir(parents=True, exist_ok=True) + return value + + class KeyValueParamType(click.ParamType): """ key=value type for backend-specific options. @@ -106,7 +116,7 @@ def fail(self, message: str, param, ctx): @click.option( "--correlation-method", "-c", - help="Select method for generation of correlation queries. If not given the default method of the backend is used." + help="Select method for generation of correlation queries. If not given the default method of the backend is used.", ) @click.option( "--filter", @@ -125,7 +135,7 @@ def fail(self, message: str, param, ctx): "--skip-unsupported/--fail-unsupported", "-s/", default=False, - help="Skip conversion of rules that can't be handled by the backend", + help="Skip conversion of rules that can't be handled by the backend.", ) @click.option( "--output", @@ -135,6 +145,25 @@ def fail(self, message: str, param, ctx): show_default=True, help="Write result to specified file. '-' writes to standard output.", ) +@click.option( + "--output-dir", + "-od", + type=click.Path( + file_okay=False, dir_okay=True, writable=True, exists=False, resolve_path=False + ), + default=None, + show_default=True, + help="Write result in INDIVIDUAL files for each rule in specified directory.", + callback=ensure_dir_exists, +) +@click.option( + "--nesting-level", + "-nl", + type=int, + default=1, + show_default=True, + help="To be used in combination with --output-dir. \n While writing results in individual files for each rule in the specified directory, the original hierarchical structure of the rule files is conserved for the specified levels.", +) @click.option( "--encoding", "-e", @@ -171,6 +200,7 @@ def fail(self, message: str, param, ctx): type=click.BOOL, help="Verbose output.", ) + def convert( target, pipeline, @@ -187,6 +217,8 @@ def convert( input, file_pattern, verbose, + output_dir, + nesting_level, ): """ Convert Sigma rules into queries. INPUT can be multiple files or directories. This command automatically recurses @@ -221,7 +253,6 @@ def convert( k: (v[0] if len(v) == 1 else v) # if there's only one item, return it. for k, v in backend_options.items() } - # Initialize processing pipeline and backend backend_class = backends[target] try: @@ -267,7 +298,6 @@ def convert( f"Parameter '{param}' is not supported by backend '{target}'.", param_hint="backend_option", ) - if format not in backends[target].formats.keys(): raise click.BadParameter( f"Output format '{format}' is not supported by backend '{target}'. Run " @@ -275,7 +305,7 @@ def convert( + " to list all available formats of the target.", param_hint="format", ) - + if correlation_method is not None: correlation_methods = backend.correlation_methods if correlation_methods is None: @@ -286,7 +316,9 @@ def convert( elif correlation_method not in correlation_methods.keys(): raise click.BadParameter( f"Correlation method '{correlation_method}' is not supported by backend '{target}'. Run " - + click.style(f"sigma list correlation-methods {target}", bold=True, fg="green") + + click.style( + f"sigma list correlation-methods {target}", bold=True, fg="green" + ) + " to list all available correlation methods of the target.", param_hint="correlation_method", ) @@ -294,6 +326,43 @@ def convert( try: rule_collection = load_rules(input + filter, file_pattern) result = backend.convert(rule_collection, format, correlation_method) + if output_dir: + writes_successful = True + + # Collect all Paths for Rules + all_paths: list[pathlib.Path] = [] + for dir_path in input: + all_paths.extend( + list( + SigmaCollection.resolve_paths( + [dir_path], + recursion_pattern="**/" + file_pattern, + ) + ) + ) + for index, path_of_input in enumerate(all_paths): + original_path_part_to_keep = pathlib.Path( + *path_of_input.parts[-nesting_level:] + ) + + try: + out_path: pathlib.Path = output_dir / original_path_part_to_keep + ensure_dir_exists(ctx=None, param=None, value=out_path.parent) + out_path.open("w", encoding="utf-8").write(result[index]) + except Exception as ex: + click.echo( + f"Could not write translated rules into output-dir {output_dir}: \n {ex}" + ) + writes_successful = False + if writes_successful: + click.echo( + f"Written {len(result)} translated rule(s) into {len(all_paths)} individual files in specified output-dir '{output_dir}'" + ) + else: + click.echo( + f"Could not write {len(result)} translated rule(s) into {len(all_paths)} individual files in specified output-dir '{output_dir}'" + ) + if isinstance(result, str): # String result click.echo(bytes(result, encoding), output) elif isinstance(result, bytes): # Bytes result: only allow to write it to file. @@ -331,16 +400,21 @@ def convert( ) except SigmaError as e: if verbose: - click.echo('Error while converting') + click.echo("Error while converting") raise e else: raise click.ClickException("Error while converting: " + str(e)) except NotImplementedError as e: if verbose: - click.echo('Feature required for conversion of Sigma rule is not supported by backend') + click.echo( + "Feature required for conversion of Sigma rule is not supported by backend" + ) raise e else: - raise click.ClickException("Feature required for conversion of Sigma rule is not supported by backend: " + str(e)) + raise click.ClickException( + "Feature required for conversion of Sigma rule is not supported by backend: " + + str(e) + ) if len(backend.errors) > 0: click.echo("\nIgnored errors:", err=True) diff --git a/tests/files/nested_rules/group_one/sigma_rule.yml b/tests/files/nested_rules/group_one/sigma_rule.yml new file mode 100644 index 0000000..3fbf217 --- /dev/null +++ b/tests/files/nested_rules/group_one/sigma_rule.yml @@ -0,0 +1,16 @@ +title: Test rule +id: 5013332f-8a70-4e04-bcc1-06a98a2cca2e +description: it is a valid rule +status: stable +level: high +date: 2023-12-09 +logsource: + category: process_creation + product: windows +detection: + selection: + ParentImage|endswith: '\httpd.exe' + Image|endswith: '\cmd.exe' + condition: selection +tags: + - attack.t1505.003 diff --git a/tests/files/nested_rules/group_two/another_sigma_rule.yml b/tests/files/nested_rules/group_two/another_sigma_rule.yml new file mode 100644 index 0000000..b48b09d --- /dev/null +++ b/tests/files/nested_rules/group_two/another_sigma_rule.yml @@ -0,0 +1,15 @@ +title: Another Test rule +id: 5013332f-8a70-4e04-bcc1-06a98a2cca3f +description: it is a valid rule +status: stable +level: high +date: 2023-12-09 +logsource: + category: process_creation + product: windows +detection: + selection: + ParentImage|endswith: '\abc.exe' + condition: selection +tags: + - attack.t1505.003 diff --git a/tests/files/valid/another_sigma_rule.yml b/tests/files/valid/another_sigma_rule.yml new file mode 100644 index 0000000..b48b09d --- /dev/null +++ b/tests/files/valid/another_sigma_rule.yml @@ -0,0 +1,15 @@ +title: Another Test rule +id: 5013332f-8a70-4e04-bcc1-06a98a2cca3f +description: it is a valid rule +status: stable +level: high +date: 2023-12-09 +logsource: + category: process_creation + product: windows +detection: + selection: + ParentImage|endswith: '\abc.exe' + condition: selection +tags: + - attack.t1505.003 diff --git a/tests/files/valid/correlation_rule.yml b/tests/files/valid/correlation_rule.yml new file mode 100644 index 0000000..8af9f8e --- /dev/null +++ b/tests/files/valid/correlation_rule.yml @@ -0,0 +1,26 @@ +title: Failed logon +name: failed_logon +id: bb450e7c-f1b7-4479-9c9c-e7503d728de1 +status: test +logsource: + product: windows + service: security +detection: + selection: + EventID: 4625 + condition: selection +--- +title: Multiple failed logons for a single user (possible brute force attack) +status: test +id: 887a603d-b9b9-4f53-a0e3-d24f15038e1e +correlation: + generate: true + type: event_count + rules: + - failed_logon + group-by: + - TargetUserName + - TargetDomainName + timespan: 5m + condition: + gte: 10 diff --git a/tests/files/valid/sigma_rule.yml b/tests/files/valid/sigma_rule.yml index 11d9a29..3fbf217 100644 --- a/tests/files/valid/sigma_rule.yml +++ b/tests/files/valid/sigma_rule.yml @@ -13,4 +13,4 @@ detection: Image|endswith: '\cmd.exe' condition: selection tags: - - attack.t1505.003 \ No newline at end of file + - attack.t1505.003 diff --git a/tests/test_convert.py b/tests/test_convert.py index 6fa5855..9b2836f 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -1,8 +1,10 @@ +import shutil from click.testing import CliRunner import pytest from sigma.cli.convert import convert import sigma.backends.test.backend - +import os +import pathlib def test_convert_help(): cli = CliRunner() @@ -77,7 +79,8 @@ def test_convert_output_list_of_dict_indent(): def test_convert_output_str(): cli = CliRunner() result = cli.invoke( - convert, ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"] + convert, + ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"], ) assert "ParentImage" in result.stdout @@ -237,19 +240,105 @@ def test_convert_output_backend_option_list(): ) assert '[123, "test"]' in result.stdout + def test_convert_correlation_method_without_backend_correlation_support(monkeypatch): - monkeypatch.setattr(sigma.backends.test.backend.TextQueryTestBackend, "correlation_methods", None) + monkeypatch.setattr( + sigma.backends.test.backend.TextQueryTestBackend, "correlation_methods", None + ) cli = CliRunner() result = cli.invoke( - convert, ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"] + convert, + ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"], ) assert result.exit_code != 0 assert "Backend 'text_query_test' does not support correlation" in result.stdout + def test_convert_invalid_correlation_method(): cli = CliRunner() result = cli.invoke( - convert, ["-t", "text_query_test", "-f", "str", "-c", "invalid", "tests/files/valid"] + convert, + ["-t", "text_query_test", "-f", "str", "-c", "invalid", "tests/files/valid"], ) assert result.exit_code != 0 - assert "Correlation method 'invalid' is not supported" in result.stdout \ No newline at end of file + assert "Correlation method 'invalid' is not supported" in result.stdout + + +def test_convert_correlation_rule_to_output_dir(tmp_path: pathlib.Path): + """Tests if the correct output directory is created and that multiple translated rules are stored in individual files within one single directory.""" + cli = CliRunner() + + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "tests/files/valid/correlation_rule.yml", + "--output-dir", + tmp_path, + ], + ) + assert result.exit_code == 0 + assert tmp_path.exists(), f"{tmp_path} was not created" + assert tmp_path.is_dir(), f"{tmp_path} is no directory" + assert (tmp_path / "correlation_rule.yml").exists(), "rule file was not created" + + +def test_convert_write_to_output_dir(tmp_path: pathlib.Path): + """Tests if the correct output directory is created and that multiple translated rules are stored in individual files within one single directory.""" + cli = CliRunner() + + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "tests/files/valid/", + "--output-dir", + tmp_path, + ], + ) + assert result.exit_code == 0 + assert tmp_path.exists(), f"{tmp_path} was not created" + assert tmp_path.is_dir(), f"{tmp_path} is no directory" + assert (tmp_path / "sigma_rule.yml").exists(), "rule file was not created" + + +def test_convert_write_to_output_dir_two_nesting_level(tmp_path: pathlib.Path): + """Tests if the correct output directory is created and that multiple translated rules are stored in individual files, keeping the original directory structure. + (e.g. group_one/sigma_rule.yml, group_two/another_sigma_rule.yml) + """ + cli = CliRunner() + + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "tests/files/nested_rules", + "--output-dir", + tmp_path, + "--nesting-level", + "2", + ], + ) + assert result.exit_code == 0 + assert tmp_path.exists(), f"general output directory {tmp_path} was not created" + dir_rule_files_group_one = tmp_path / "group_one" + dir_rule_files_group_two = tmp_path / "group_two" + assert ( + dir_rule_files_group_one.exists() + ), f"sub-directory {dir_rule_files_group_one} was not created" + assert ( + dir_rule_files_group_two.exists() + ), f"sub-directory {dir_rule_files_group_two} was not created" + + path_rule_file_one = dir_rule_files_group_one / "sigma_rule.yml" + path_rule_file_two = dir_rule_files_group_two / "another_sigma_rule.yml" + + assert ( + path_rule_file_one.exists() + ), f"rule file {path_rule_file_one} was not created" + assert ( + path_rule_file_two.exists() + ), f"rule file {path_rule_file_two} was not created"