Skip to content

Commit

Permalink
Merge pull request #1714 from mandiant/fix/issue-1697-1
Browse files Browse the repository at this point in the history
rule scoping tweaks
  • Loading branch information
williballenthin authored Aug 15, 2023
2 parents 476c7ff + 4411911 commit c001c88
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 50 deletions.
32 changes: 2 additions & 30 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import itertools
import contextlib
import collections
from enum import Enum
from typing import Any, Dict, List, Tuple, Callable, Optional
from pathlib import Path

Expand Down Expand Up @@ -80,8 +79,6 @@
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_RESULT,
STATIC_FORMATS,
DYNAMIC_FORMATS,
)
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import (
Expand Down Expand Up @@ -118,15 +115,6 @@
logger = logging.getLogger("capa")


class ExecutionContext(str, Enum):
STATIC = "static"
DYNAMIC = "dynamic"


STATIC_CONTEXT = ExecutionContext.STATIC
DYNAMIC_CONTEXT = ExecutionContext.DYNAMIC


@contextlib.contextmanager
def timing(msg: str):
t0 = time.time()
Expand Down Expand Up @@ -890,7 +878,6 @@ def get_rules(
rule_paths: List[RulePath],
cache_dir=None,
on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
analysis_context: Optional[ExecutionContext] = None,
) -> RuleSet:
"""
args:
Expand Down Expand Up @@ -929,14 +916,7 @@ def get_rules(
rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes)

# filter rules according to the execution context
if analysis_context is STATIC_CONTEXT:
ruleset = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.static)
elif analysis_context is DYNAMIC_CONTEXT:
ruleset = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.dynamic)
else:
# default: load all rules
ruleset = capa.rules.RuleSet(rules)
ruleset = capa.rules.RuleSet(rules)

capa.rules.cache.cache_ruleset(cache_dir, ruleset)

Expand Down Expand Up @@ -1465,15 +1445,7 @@ def main(argv: Optional[List[str]] = None):
else:
cache_dir = capa.rules.cache.get_default_cache_directory()

if format_ in STATIC_FORMATS:
analysis_context = STATIC_CONTEXT
elif format_ in DYNAMIC_FORMATS:
analysis_context = DYNAMIC_CONTEXT
else:
# freeze or result formats
analysis_context = None

rules = get_rules(args.rules, cache_dir=cache_dir, analysis_context=analysis_context)
rules = get_rules(args.rules, cache_dir=cache_dir)

logger.debug(
"successfully loaded %s rules",
Expand Down
31 changes: 13 additions & 18 deletions capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,27 @@ class Scope(str, Enum):

# these literals are used to check if the flavor
# of a rule is correct.
STATIC_SCOPES = (
STATIC_SCOPES = {
FILE_SCOPE,
GLOBAL_SCOPE,
FUNCTION_SCOPE,
BASIC_BLOCK_SCOPE,
INSTRUCTION_SCOPE,
)
DYNAMIC_SCOPES = (
}
DYNAMIC_SCOPES = {
FILE_SCOPE,
GLOBAL_SCOPE,
PROCESS_SCOPE,
THREAD_SCOPE,
CALL_SCOPE,
)
}


@dataclass
class Scopes:
# when None, the scope is not supported by a rule
static: Optional[str] = None
# when None, the scope is not supported by a rule
dynamic: Optional[str] = None

def __contains__(self, scope: Union[Scope, str]) -> bool:
Expand All @@ -135,6 +137,10 @@ def __repr__(self) -> str:
@classmethod
def from_dict(self, scopes: dict) -> "Scopes":
assert isinstance(scopes, dict)

# make local copy so we don't make changes outside of this routine
scopes = dict(scopes)

# mark non-specified scopes as invalid
if "static" not in scopes:
scopes["static"] = None
Expand All @@ -148,15 +154,10 @@ def from_dict(self, scopes: dict) -> "Scopes":
raise InvalidRule("invalid scopes value. At least one scope must be specified")

# check that all the specified scopes are valid
if scopes["static"] not in (
*STATIC_SCOPES,
None,
):
if scopes["static"] and scopes["static"] not in STATIC_SCOPES:
raise InvalidRule(f"{scopes['static']} is not a valid static scope")
if scopes["dynamic"] not in (
*DYNAMIC_SCOPES,
None,
):

if scopes["dynamic"] and scopes["dynamic"] not in DYNAMIC_SCOPES:
raise InvalidRule(f"{scopes['dynamic']} is not a valid dynamic scope")

return Scopes(static=scopes["static"], dynamic=scopes["dynamic"])
Expand Down Expand Up @@ -1262,7 +1263,6 @@ class RuleSet:
def __init__(
self,
rules: List[Rule],
rules_filter_func=None,
):
super().__init__()

Expand All @@ -1280,11 +1280,6 @@ def __init__(

ensure_rule_dependencies_are_met(rules)

if rules_filter_func:
# this allows for filtering the ruleset based on
# the execution context (static or dynamic)
rules = list(filter(rules_filter_func, rules))

if len(rules) == 0:
raise InvalidRuleSet("no rules selected")

Expand Down
4 changes: 2 additions & 2 deletions tests/test_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ def test_rules_flavor_filtering():
),
]

static_rules = capa.rules.RuleSet(rules.copy(), rules_filter_func=lambda rule: rule.scopes.static)
dynamic_rules = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.dynamic)
static_rules = capa.rules.RuleSet([r for r in rules if r.meta.scopes.static is not None])
dynamic_rules = capa.rules.RuleSet([r for r in rules if r.meta.scopes.dynamic is not None])

# only static rule
assert len(static_rules) == 1
Expand Down

0 comments on commit c001c88

Please sign in to comment.