Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apply black formatter #54

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sigma/cli/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def analyze_attack(
}
json.dump(layer, output, indent=2)


@analyze_group.command(name="logsource", help="Create stats about logsources.")
@click.option(
"--file-pattern",
Expand Down
8 changes: 5 additions & 3 deletions sigma/cli/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ def check(
# Need to split SigmaValidationIssue __str__
rules = ", ".join(
[
str(rule.source)
if rule.source is not None
else str(rule.id) or rule.title
(
str(rule.source)
if rule.source is not None
else str(rule.id) or rule.title
)
for rule in issue.rules
]
)
Expand Down
21 changes: 14 additions & 7 deletions sigma/cli/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def fail(self, message: str, param, ctx):
@click.option(
"--correlation-method",
"-c",
help="Select method for generation of correlation queries. If not given the default method of the backend is used."
help="Select method for generation of correlation queries. If not given the default method of the backend is used.",
)
@click.option(
"--filter",
Expand Down Expand Up @@ -212,7 +212,7 @@ def convert(
)
)

# Merge backend options: multiple occurences of a key result in array of values
# Merge backend options: multiple occurrences of a key result in array of values
backend_options = dict()
for option in backend_option:
for k, v in option.items():
Expand Down Expand Up @@ -275,7 +275,7 @@ def convert(
+ " to list all available formats of the target.",
param_hint="format",
)

if correlation_method is not None:
correlation_methods = backend.correlation_methods
if correlation_methods is None:
Expand All @@ -286,7 +286,9 @@ def convert(
elif correlation_method not in correlation_methods.keys():
raise click.BadParameter(
f"Correlation method '{correlation_method}' is not supported by backend '{target}'. Run "
+ click.style(f"sigma list correlation-methods {target}", bold=True, fg="green")
+ click.style(
f"sigma list correlation-methods {target}", bold=True, fg="green"
)
+ " to list all available correlation methods of the target.",
param_hint="correlation_method",
)
Expand Down Expand Up @@ -331,16 +333,21 @@ def convert(
)
except SigmaError as e:
if verbose:
click.echo('Error while converting')
click.echo("Error while converting")
raise e
else:
raise click.ClickException("Error while converting: " + str(e))
except NotImplementedError as e:
if verbose:
click.echo('Feature required for conversion of Sigma rule is not supported by backend')
click.echo(
"Feature required for conversion of Sigma rule is not supported by backend"
)
raise e
else:
raise click.ClickException("Feature required for conversion of Sigma rule is not supported by backend: " + str(e))
raise click.ClickException(
"Feature required for conversion of Sigma rule is not supported by backend: "
+ str(e)
)

if len(backend.errors) > 0:
click.echo("\nIgnored errors:", err=True)
Expand Down
4 changes: 3 additions & 1 deletion sigma/cli/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ def list_formats(backend):
table.align = "l"
click.echo(table.get_string())


@list_group.command(
name="correlation-methods", help="List correlation methods supported by specified backend."
name="correlation-methods",
help="List correlation methods supported by specified backend.",
)
@click.argument(
"backend",
Expand Down
6 changes: 2 additions & 4 deletions sigma/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
click.echo(
click.style(
"It is strogly recommended to install Sigma CLI with pipx (https://pypa.github.io/pipx/) to ensure a clean environment.",
"It is strongly recommended to install Sigma CLI with pipx (https://pypa.github.io/pipx/) to ensure a clean environment.",
fg="green",
)
)
Expand All @@ -37,9 +37,7 @@
from .pysigma import check_pysigma_command


CONTEXT_SETTINGS={
"help_option_names": ['-h', '--help']
}
CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}


@click.group(context_settings=CONTEXT_SETTINGS)
Expand Down
28 changes: 23 additions & 5 deletions sigma/cli/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,14 @@ def list_plugins(plugin_type: str, plugin_state: str, compatible: bool, search:
norm_search = search.lower()

table = PrettyTable()
table.field_names = ("Identifier", "Type", "State", "Description", "Compatible?", "Capabilities")
table.field_names = (
"Identifier",
"Type",
"State",
"Description",
"Compatible?",
"Capabilities",
)
table.add_rows(
[
(
Expand Down Expand Up @@ -108,7 +115,10 @@ def show_plugin(uuid: bool, plugin_identifier: str):
("Description", fill(plugin.description, width=60)),
("Required pySigma version", plugin.pysigma_version),
("Compatible?", plugin.is_compatible()),
("Capabilities", "\n".join(str(capability) for capability in plugin.capabilities)),
(
"Capabilities",
"\n".join(str(capability) for capability in plugin.capabilities),
),
("Project URL", plugin.project_url),
("Report Issue URL", plugin.report_issue_url),
]
Expand All @@ -133,7 +143,10 @@ def show_plugin(uuid: bool, plugin_identifier: str):
)
@click.argument("plugin-identifiers", nargs=-1)
def install_plugin(
uuid: bool, compatibility_check: bool, check_pysigma: bool, plugin_identifiers: List[str]
uuid: bool,
compatibility_check: bool,
check_pysigma: bool,
plugin_identifiers: List[str],
):
for plugin_identifier in plugin_identifiers:
plugin = get_plugin(uuid, plugin_identifier)
Expand All @@ -142,12 +155,17 @@ def install_plugin(
click.echo(f"Successfully installed plugin '{plugin_identifier}'")
else:
raise click.exceptions.ClickException(
"Plugin not compatible with installed pySigma version! " + click.style("Use '--force-install' or its shortcut '-f' to install anyway.", fg="green")
"Plugin not compatible with installed pySigma version! "
+ click.style(
"Use '--force-install' or its shortcut '-f' to install anyway.",
fg="green",
)
)

if check_pysigma:
check_pysigma_command()


@plugin_group.command(name="upgrade", help="Upgrade installed plugin.")
@click.option(
"--compatibility-check/--no-compatibility-check",
Expand Down
54 changes: 39 additions & 15 deletions sigma/cli/pysigma.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@
import click
from packaging.specifiers import SpecifierSet


def get_pysigma_requirement():
requires = importlib.metadata.requires("sigma-cli")
return [
r
for r in requires
if r.startswith("pysigma ")
][0]
return [r for r in requires if r.startswith("pysigma ")][0]


def check_pysigma_version():
"""Check if the installed version of pysigma is compatible with the version required by sigma-cli."""
requires_pysgima = get_pysigma_requirement()
version_specifier = SpecifierSet(requires_pysgima.split(" ")[1][1:-1])
return importlib.metadata.version("pysigma") in version_specifier


@click.command(
name="check-pysigma",
help="Check if the installed version of pysigma is compatible with the version required by sigma-cli."
help="Check if the installed version of pysigma is compatible with the version required by sigma-cli.",
)
@click.option(
"--quiet/--no-quiet",
Expand All @@ -31,21 +30,43 @@ def check_pysigma_version():
def check_pysigma_command(quiet):
check_pysigma(quiet)


def check_pysigma(quiet=False):
"""Check the version of pySigma against the required version of sigma-cli and reinstall on user prompt if
necessary."""
if check_pysigma_version():
if not quiet:
click.echo(click.style("pySigma version is compatible with sigma-cli", fg="green"))
click.echo(
click.style("pySigma version is compatible with sigma-cli", fg="green")
)
else:
click.echo(click.style("The currently installed pySigma version is not compatible with sigma-cli!", fg="red"))
click.echo(click.style("Installed pySigma version: ", fg="yellow") + click.style(importlib.metadata.version("pysigma"), bold=True, fg="yellow"))
click.echo(click.style("Required pySigma version: ", fg="yellow") + click.style(get_pysigma_requirement(), bold=True, fg="yellow"))
click.echo("Usually the reason for this is that a backend with a different required pySigma version was installed.")
click.echo("Because pySigma development aims to keep the API stable, it is likely that the backend is still working with the pySigma version required by sigma-cli.")
click.echo(
click.style(
"The currently installed pySigma version is not compatible with sigma-cli!",
fg="red",
)
)
click.echo(
click.style("Installed pySigma version: ", fg="yellow")
+ click.style(importlib.metadata.version("pysigma"), bold=True, fg="yellow")
)
click.echo(
click.style("Required pySigma version: ", fg="yellow")
+ click.style(get_pysigma_requirement(), bold=True, fg="yellow")
)
click.echo(
"Usually the reason for this is that a backend with a different required pySigma version was installed."
)
click.echo(
"Because pySigma development aims to keep the API stable, it is likely that the backend is still working with the pySigma version required by sigma-cli."
)
click.echo("You have now two options:")
click.echo("✅ Reinstall pySigma to match the pySigma version required by the CLI. This can break the functionality of the backend!")
click.echo("❌ Ignore this warning and continue using the CLI. This can lead to unexpected behaviour or break other plugins that rely on current features.")
click.echo(
"✅ Reinstall pySigma to match the pySigma version required by the CLI. This can break the functionality of the backend!"
)
click.echo(
"❌ Ignore this warning and continue using the CLI. This can lead to unexpected behavior or break other plugins that rely on current features."
)
if click.confirm("Do you want to reinstall pySigma now?"):
subprocess.run(
[
Expand All @@ -61,4 +82,7 @@ def check_pysigma(quiet=False):
)
click.echo("pySigma successfully reinstalled")
else:
click.echo("Incompatible pySigma version was keeped. You can rerun the check with: " + click.style("sigma check-pysigma", fg="green"))
click.echo(
"Incompatible pySigma version was keeped. You can rerun the check with: "
+ click.style("sigma check-pysigma", fg="green")
)
28 changes: 16 additions & 12 deletions sigma/cli/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,29 @@ def load_rules(input, file_pattern):

for path in list(input):
if path == Path("-"):
rule_collection = SigmaCollection.merge([
rule_collection,
SigmaCollection.from_yaml(click.get_text_stream("stdin"))
])
rule_collection = SigmaCollection.merge(
[
rule_collection,
SigmaCollection.from_yaml(click.get_text_stream("stdin")),
]
)
else:
rule_paths = SigmaCollection.resolve_paths(
[path],
recursion_pattern="**/" + file_pattern,
)
with click.progressbar(
list(rule_paths), label="Parsing Sigma rules", file=stderr
list(rule_paths), label="Parsing Sigma rules", file=stderr
) as progress_rule_paths:
rule_collection = SigmaCollection.merge([
rule_collection,
SigmaCollection.load_ruleset(
progress_rule_paths,
collect_errors=True,
)
])
rule_collection = SigmaCollection.merge(
[
rule_collection,
SigmaCollection.load_ruleset(
progress_rule_paths,
collect_errors=True,
),
]
)

rule_collection.resolve_rule_references()

Expand Down
5 changes: 2 additions & 3 deletions tests/test_analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,5 @@ def test_logsource_get_rulelevel_mapping(sigma_rules):
def test_logsource_create_logsourcestats(sigma_rules):
ret = create_logsourcestats(sigma_rules)

assert 'test' in ret
assert ret['test'].get("Overall") == len(sigma_rules)

assert "test" in ret
assert ret["test"].get("Overall") == len(sigma_rules)
17 changes: 12 additions & 5 deletions tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def test_convert_output_list_of_dict_indent():
def test_convert_output_str():
cli = CliRunner()
result = cli.invoke(
convert, ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"]
convert,
["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"],
)
assert "ParentImage" in result.stdout

Expand Down Expand Up @@ -247,19 +248,25 @@ def test_convert_output_backend_option_list():
)
assert '[123, "test"]' in result.stdout


def test_convert_correlation_method_without_backend_correlation_support(monkeypatch):
monkeypatch.setattr(sigma.backends.test.backend.TextQueryTestBackend, "correlation_methods", None)
monkeypatch.setattr(
sigma.backends.test.backend.TextQueryTestBackend, "correlation_methods", None
)
cli = CliRunner()
result = cli.invoke(
convert, ["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"]
convert,
["-t", "text_query_test", "-f", "str", "-c", "test", "tests/files/valid"],
)
assert result.exit_code != 0
assert "Backend 'text_query_test' does not support correlation" in result.stdout


def test_convert_invalid_correlation_method():
cli = CliRunner()
result = cli.invoke(
convert, ["-t", "text_query_test", "-f", "str", "-c", "invalid", "tests/files/valid"]
convert,
["-t", "text_query_test", "-f", "str", "-c", "invalid", "tests/files/valid"],
)
assert result.exit_code != 0
assert "Correlation method 'invalid' is not supported" in result.stdout
assert "Correlation method 'invalid' is not supported" in result.stdout
Loading
Loading