Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Statement construction for mixed-scope rules #1671

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Add support for flavor-based rule scopes @yelhamer
- Add ProcessesAddress and ThreadAddress #1612 @yelhamer
- Add dynamic capability extraction @yelhamer
- Add support for mixed-scope rules @yelhamer

### Breaking Changes

Expand Down
7 changes: 7 additions & 0 deletions capa/features/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,13 @@ def evaluate(self, ctx, **kwargs):
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
# input formats for which main() selects the static execution context
# when deciding which rules to load.
STATIC_FORMATS = (
FORMAT_SC32,
FORMAT_SC64,
FORMAT_PE,
FORMAT_ELF,
FORMAT_DOTNET,
)
# input formats for which main() selects the dynamic execution context.
DYNAMIC_FORMATS = (FORMAT_CAPE,)
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
Expand Down
21 changes: 21 additions & 0 deletions capa/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import json
import inspect
import logging
import weakref
import functools
import contextlib
import importlib.util
from typing import NoReturn
Expand Down Expand Up @@ -130,6 +132,25 @@ def new_print(*args, **kwargs):
inspect.builtins.print = old_print # type: ignore


def weak_lru(maxsize=128, typed=False):
    """
    LRU Cache decorator that keeps a weak reference to 'self'

    Intended for instance methods: the cache key is built from
    `weakref.ref(self)` plus the remaining arguments, so the cache itself
    does not hold a strong reference to the instance.

    args:
      maxsize: forwarded to `functools.lru_cache`.
      typed: forwarded to `functools.lru_cache`.

    returns:
      a decorator; the decorated method additionally exposes the underlying
      cache's `cache_info()` and `cache_clear()` helpers.

    note: `self` and all other arguments must be hashable, and `self` must
    support weak references.
    """

    def wrapper(func):
        @functools.lru_cache(maxsize=maxsize, typed=typed)
        def _func(_self, *args, **kwargs):
            # `_self` is a weakref.ref; this body only runs on a cache miss,
            # while `inner` still holds a strong reference to the instance,
            # so dereferencing it here yields the live object.
            return func(_self(), *args, **kwargs)

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            return _func(weakref.ref(self), *args, **kwargs)

        # forward the cache management helpers that functools.lru_cache
        # provides, so callers can inspect or reset the cache.
        inner.cache_info = _func.cache_info  # type: ignore
        inner.cache_clear = _func.cache_clear  # type: ignore

        return inner

    return wrapper
yelhamer marked this conversation as resolved.
Show resolved Hide resolved


def log_unsupported_format_error():
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE or ELF file.")
Expand Down
32 changes: 30 additions & 2 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import itertools
import contextlib
import collections
from enum import Enum
from typing import Any, Dict, List, Tuple, Callable, Optional
from pathlib import Path

Expand Down Expand Up @@ -78,6 +79,8 @@
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_RESULT,
STATIC_FORMATS,
DYNAMIC_FORMATS,
)
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import (
Expand Down Expand Up @@ -113,6 +116,15 @@
logger = logging.getLogger("capa")


class ExecutionContext(str, Enum):
    """
    The kind of analysis being performed.

    Members are also `str` instances, so they compare equal to their
    plain string values ("static" / "dynamic").
    """

    STATIC = "static"
    DYNAMIC = "dynamic"


# module-level aliases for the two enum members
STATIC_CONTEXT = ExecutionContext.STATIC
DYNAMIC_CONTEXT = ExecutionContext.DYNAMIC


@contextlib.contextmanager
def timing(msg: str):
t0 = time.time()
Expand Down Expand Up @@ -823,6 +835,7 @@ def get_rules(
rule_paths: List[RulePath],
cache_dir=None,
on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
analysis_context: Optional[ExecutionContext] = None,
) -> RuleSet:
"""
args:
Expand Down Expand Up @@ -861,7 +874,14 @@ def get_rules(
rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes)

ruleset = capa.rules.RuleSet(rules)
# filter rules according to the execution context
if analysis_context is STATIC_CONTEXT:
ruleset = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.static)
elif analysis_context is DYNAMIC_CONTEXT:
ruleset = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.dynamic)
else:
# default: load all rules
ruleset = capa.rules.RuleSet(rules)

capa.rules.cache.cache_ruleset(cache_dir, ruleset)

Expand Down Expand Up @@ -1382,7 +1402,15 @@ def main(argv: Optional[List[str]] = None):
else:
cache_dir = capa.rules.cache.get_default_cache_directory()

rules = get_rules(args.rules, cache_dir=cache_dir)
if format_ in STATIC_FORMATS:
analysis_context = STATIC_CONTEXT
elif format_ in DYNAMIC_FORMATS:
analysis_context = DYNAMIC_CONTEXT
else:
# freeze or result formats
analysis_context = None

rules = get_rules(args.rules, cache_dir=cache_dir, analysis_context=analysis_context)

logger.debug(
"successfully loaded %s rules",
Expand Down
Loading
Loading