diff --git a/README.md b/README.md index 1367ea56..85d55f35 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,24 @@ observable_attack_steps: # - phishing # ... + +# Optionally add false positive/negative rates to observations. +# +# False positive/negative `rate` is a number between 0.0 and 1.0. +# - A false positive rate of x for means that an inactive attack step +# will be observed as active at a rate of x in each observation +# - A false negative rate of x for means that an active attack step +# will be observed as inactive at a rate of x in each observation +# Default false positive/negative rate is 0, which is assumed if none are given. + +# Applies false positive rates per attack step (default 0) +false_positive_rates: + : + +# Applies false negative rates per attack step (default 0) +false_negative_rates: + : + ``` Note: When defining attackers and entrypoints in a scenario, these override potential attackers in the model. diff --git a/malsim/agents/searchers.py b/malsim/agents/searchers.py index 6b1f3a42..9265cb03 100644 --- a/malsim/agents/searchers.py +++ b/malsim/agents/searchers.py @@ -25,11 +25,7 @@ class BreadthFirstAttacker: def __init__(self, agent_config: dict) -> None: self.targets: Deque[int] = deque([]) self.current_target: int = None - seed = ( - agent_config["seed"] - if agent_config.get("seed", None) - else np.random.SeedSequence().entropy - ) + seed = agent_config.get("seed", np.random.SeedSequence().entropy) self.rng = ( np.random.default_rng(seed) if agent_config.get("randomize", False) @@ -85,11 +81,7 @@ class DepthFirstAttacker: def __init__(self, agent_config: dict) -> None: self.current_target = -1 self.targets: List[int] = [] - seed = ( - agent_config["seed"] - if agent_config.get("seed", None) - else np.random.SeedSequence().entropy - ) + seed = agent_config.get("seed", np.random.SeedSequence().entropy) self.rng = ( np.random.default_rng(seed) if agent_config.get("randomize", False) diff --git a/malsim/scenario.py b/malsim/scenario.py index e7009186..57258c7f 100644 --- a/malsim/scenario.py +++ b/malsim/scenario.py @@ -41,7 +41,9 @@ allowed_fields = required_fields + [ 'rewards', 'attacker_entry_points', - 'observable_attack_steps' + 'observable_attack_steps', + 'false_positive_rates', + 'false_negative_rates', ] @@ -182,6 +184,28 @@ def apply_scenario_observability_rules( step.extras['observable'] = 0 +def apply_scenario_false_positive_and_negative_rates( + attack_graph: AttackGraph, scenario_conf: dict +): + """Apply false positive/negative rates to all nodes in the + AttackGraph either to the default value, from the base rate + or from the specifically set rates per attack step""" + + # Apply false positive rates to specified attack nodes + fp_rates_per_attackstep = scenario_conf.get('false_positive_rates') + if fp_rates_per_attackstep: + for step_full_name, rate in fp_rates_per_attackstep.items(): + step = attack_graph.get_node_by_full_name(step_full_name) + step.extras['false_positive_rate'] = rate + + # Apply false negative rates to specified attack nodes + fn_rates_per_attackstep = scenario_conf.get('false_negative_rates') + if fn_rates_per_attackstep: + for step_full_name, rate in fn_rates_per_attackstep.items(): + step = attack_graph.get_node_by_full_name(step_full_name) + step.extras['false_negative_rate'] = rate + + def apply_scenario_attacker_entrypoints( attack_graph: AttackGraph, entry_points: dict ) -> None: @@ -280,6 +304,9 @@ def apply_scenario_to_attack_graph( observability_settings = scenario.get('observable_attack_steps') apply_scenario_observability_rules(attack_graph, observability_settings) + # Apply false positive and negative rates to attack graph + apply_scenario_false_positive_and_negative_rates(attack_graph, scenario) + def load_scenario(scenario_file: str) -> tuple[AttackGraph, dict]: """Load a scenario from a scenario file to an AttackGraph""" diff --git a/malsim/sims/mal_simulator.py b/malsim/sims/mal_simulator.py index dae8c82b..af634a7f 100644 --- a/malsim/sims/mal_simulator.py +++ b/malsim/sims/mal_simulator.py @@ -5,8 +5,8 @@ import logging import functools from typing import Optional, TYPE_CHECKING -import numpy as np +import numpy as np from gymnasium.spaces import MultiDiscrete, Box, Dict from pettingzoo import ParallelEnv @@ -87,6 +87,7 @@ def __init__( self.attack_graph = attack_graph self.sim_settings = sim_settings self.max_iter = max_iter + self.rng = np.random.default_rng(kwargs.get('seed')) self.attack_graph_backup = copy.deepcopy(self.attack_graph) @@ -416,6 +417,7 @@ def reset( ): logger.info("Resetting simulator.") self.attack_graph = copy.deepcopy(self.attack_graph_backup) + self.rng = np.random.default_rng(seed) return self.initialize(self.max_iter) def log_mapping_tables(self): @@ -671,6 +673,11 @@ def initialize(self, max_iter=ITERATIONS_LIMIT): # Initialize agents and record the entry point actions initial_actions = self._initialize_agents() + self._possible_false_positive_nodes = [ + n for n in self.attack_graph.nodes + if n.extras.get('false_positive_rate') + ] + observations, _, _, _, infos = ( self._observe_and_reward(initial_actions, [])) @@ -920,20 +927,42 @@ def _observe_defender( defender_agent, performed_actions: dict[str, list[int]] ): + """Update the observed_state of defender agent + based on performed_actions + """ obs_state = self.agents_dict[defender_agent]["observation"]\ ["observed_state"] if not self.sim_settings.cumulative_defender_obs: - # Clear the state if we do not it to accumulate observations over - # time. + # Clear the state if we do not want to accumulate + # observations between steps. obs_state.fill(0) - # Only show the latest steps taken + # Enable latest activated steps in obs state for _, actions in performed_actions.items(): for action in actions: obs_state[action] = 1 + # Possibly add false negatives to observation + for _, actions in performed_actions.items(): + for action in actions: + node = self.attack_graph.get_node_by_id( + self._index_to_id[action]) + if node.type in ('or', 'and'): + # Enabled attack steps can become false negatives + fn_rate = node.extras.get('false_negative_rate', 0) + make_fn = fn_rate and self.rng.random() < fn_rate + obs_state[action] = 0 if make_fn else 1 + + # Possibly add false positives to observation + for node in self._possible_false_positive_nodes: + if node.type in ('or', 'and') and not node.is_compromised(): + step_index = self._id_to_index[node.id] + fp_rate = node.extras.get('false_positive_rate', 0) + make_fp = fp_rate and self.rng.random() < fp_rate + obs_state[step_index] = 1 if make_fp else 0 + def _observe_agents(self, performed_actions): """Collect agents observations""" @@ -975,7 +1004,6 @@ def _reward_agents(self, performed_actions): # If a defender performed step, it will be penalized self.agents_dict[agent]["rewards"] -= node_reward - def _collect_agents_infos(self): """Collect agent info, this is used to determine the possible actions in the next iteration step. Then fill in all of the""" diff --git a/tests/test_agents.py b/tests/test_agents.py new file mode 100644 index 00000000..16c3c68a --- /dev/null +++ b/tests/test_agents.py @@ -0,0 +1,97 @@ +"""Test MalSimulator agents""" + +import pytest + +from malsim.sims.mal_simulator import MalSimulator +from malsim.scenario import load_scenario +from malsim.agents.searchers import BreadthFirstAttacker, DepthFirstAttacker + + +def run_simulation_n_steps_assert_attacker_actions( + sim, attacker_agent, seed, expected_actions, n_steps=10): + """Run simulation n steps with attacker agent taking actions""" + + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + obs, infos = sim.reset(seed=seed) + chosen_actions = [] + for _ in range(n_steps): + attacker_action = attacker_agent.compute_action_from_dict( + obs[attacker_agent_id], + infos[attacker_agent_id]['action_mask'] + ) + chosen_actions.append(attacker_action) + actions = { + attacker_agent_id: attacker_action, + defender_agent_id: (0, None)} + + obs, _, _, _, infos = sim.step(actions) + + for i, action in enumerate(chosen_actions): + assert action == expected_actions[i] + + +def test_bfs_attacker_actions_seed(): + """Make sure MalSimulator bfs agent with seed is deterministic""" + attack_graph, _ = load_scenario( + 'tests/testdata/scenarios/simple_scenario.yml') + + sim = MalSimulator( + attack_graph.lang_graph, attack_graph.model, attack_graph) + + attacker_agent_id = "attacker" + defender_agent_id = "defender" + sim.register_attacker(attacker_agent_id, 0) + sim.register_defender(defender_agent_id) + + seed = 1337 + # Seed should yield these actions every time + expected_actions_seed_1337 = [ + (1, 329), (1, 353), (1, 330), (1, 354), (1, 355), + (1, 356), (1, 357), (1, 331), (1, 358), (1, 283) + ] + for _ in range(10): + bfs_attacker = BreadthFirstAttacker({'randomize': True, 'seed': seed}) + run_simulation_n_steps_assert_attacker_actions( + sim, bfs_attacker, seed, expected_actions_seed_1337) + + # Different seed gives different actions + seed = 1857 + bfs_attacker = BreadthFirstAttacker({'randomize': True, 'seed': seed}) + with pytest.raises(AssertionError): + run_simulation_n_steps_assert_attacker_actions( + sim, bfs_attacker, seed, expected_actions_seed_1337) + + +def test_dfs_attacker_actions_seed(): + """Make sure MalSimulator dfs agent with seed is deterministic""" + + attack_graph, _ = load_scenario( + 'tests/testdata/scenarios/simple_scenario.yml') + + sim = MalSimulator( + attack_graph.lang_graph, attack_graph.model, attack_graph) + + attacker_agent_id = "attacker" + defender_agent_id = "defender" + sim.register_attacker(attacker_agent_id, 0) + sim.register_defender(defender_agent_id) + + seed = 1337 + # Seed should yield these actions every time + expected_actions_seed_1337 = [ + (1, 353), (1, 354), (1, 355), (1, 356), (1, 357), + (1, 358), (1, 460), (1, 461), (1, 400), (1, 401) + ] + for _ in range(10): + bfs_attacker = DepthFirstAttacker({'randomize': True, 'seed': seed}) + run_simulation_n_steps_assert_attacker_actions( + sim, bfs_attacker, seed, expected_actions_seed_1337) + + # Different seed gives different actions + seed = 1697 + bfs_attacker = DepthFirstAttacker({'randomize': True, 'seed': seed}) + with pytest.raises(AssertionError): + run_simulation_n_steps_assert_attacker_actions( + sim, bfs_attacker, seed, expected_actions_seed_1337) diff --git a/tests/test_mal_simulator.py b/tests/test_mal_simulator.py index 0815c942..05a5622c 100644 --- a/tests/test_mal_simulator.py +++ b/tests/test_mal_simulator.py @@ -1,10 +1,12 @@ """Test MalSimulator class""" +import pytest + from maltoolbox.attackgraph import AttackGraph, Attacker from malsim.sims.mal_simulator import MalSimulator from malsim.scenario import load_scenario, create_simulator_from_scenario from malsim.sims import MalSimulatorSettings - +from malsim.agents.searchers import BreadthFirstAttacker, DepthFirstAttacker def test_malsimulator(corelang_lang_graph, model): attack_graph = AttackGraph(corelang_lang_graph, model) @@ -956,6 +958,255 @@ def test_simulator_default_settings_defender_observation(): assert not node.is_compromised() and not node.is_enabled_defense() +def test_default_settings_defender_observation_false_negatives(): + """Test default MalSimulator with false negative rates""" + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml' + ) + sim.reset() + + attacker = sim.attack_graph.attackers[0] + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + # Get an uncompromised step + user_3_compromise = sim.attack_graph.get_node_by_full_name( + 'User:3:compromise') + assert user_3_compromise.extras.get('false_negative_rate') == 1.0 + assert attacker not in user_3_compromise.compromised_by + + # Let the attacker compromise User:3:compromise + actions = { + attacker_agent_id: (1, sim._id_to_index[user_3_compromise.id]), + defender_agent_id: (0, None) + } + + obs, _, _, _, _ = sim.step(actions) + defender_obs_state = obs[defender_agent_id]['observed_state'] + + # Verify that all states in obs match the state of the attack graph + node_index = sim._id_to_index[user_3_compromise.id] + # user_3_compromise is set to disabled even though compromised + # since it is false negative 100% of times + assert not defender_obs_state[node_index] + + +def test_default_settings_defender_observation_false_negatives_seed(): + """Test default MalSimulator with false negative rates""" + + def run_simulation_with_seed_assert_obs( + seed, + expected_attacker_obs, + expected_defender_obs + ): + sim.reset(seed=seed) + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + # Get an uncompromised step + user_3_compromise = sim.attack_graph.get_node_by_full_name( + 'User:3:compromise') + + # Let the attacker compromise User:3:compromise + actions = { + attacker_agent_id: (1, sim._id_to_index[user_3_compromise.id]), + defender_agent_id: (0, None)} + + obs, _, _, _, _ = sim.step(actions) + + actual_attacker_obs_state = obs[attacker_agent_id]['observed_state'] + for index, state in enumerate(actual_attacker_obs_state): + assert state == expected_attacker_obs[index] + + actual_defender_obs_state = obs[defender_agent_id]['observed_state'] + for index, state in enumerate(actual_defender_obs_state): + assert state == expected_defender_obs[index] + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml', + ) + + expected_attacker_obs_100 = \ + [-1, 0, 1, 0, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, -1] + expected_defender_obs_100 = \ + [0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0] + seed = 100 + + # Make sure the seed makes the obs state deterministic + # in terms of false positives + for _ in range(100): + # running it many times with same seed + # yields same observations + run_simulation_with_seed_assert_obs( + seed, + expected_attacker_obs_100, + expected_defender_obs_100 + ) + + with pytest.raises(AssertionError): + # running it once with different seed + # yields assertion error (different observations) + run_simulation_with_seed_assert_obs( + 1337, + expected_attacker_obs_100, + expected_defender_obs_100 + ) + + +def run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + attacker_agent_class, + expected_attacker_obs, + expected_defender_obs, + n_steps = 5 + ): + """Helper function""" + + obs, infos = sim.reset(seed=seed) + + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + attacker_agent = attacker_agent_class( + {'randomize': True, 'seed': seed} + ) + + for _ in range(n_steps): + # Get an uncompromised step + attacker_action = attacker_agent.compute_action_from_dict( + obs[attacker_agent_id], + infos[attacker_agent_id]['action_mask'] + ) + + # Let the attacker compromise whatever they want + actions = { + attacker_agent_id: attacker_action, + defender_agent_id: (0, None)} + + obs, _, _, _, infos = sim.step(actions) + + actual_attacker_obs_state = obs[attacker_agent_id]['observed_state'] + for index, state in enumerate(actual_attacker_obs_state): + assert state == expected_attacker_obs[index] + + actual_defender_obs_state = obs[defender_agent_id]['observed_state'] + for index, state in enumerate(actual_defender_obs_state): + assert state == expected_defender_obs[index] + + +def test_sim_dfs_agent_seed_deterministic(): + """Make sure observations are deterministic when giving seed + to both dfs agent and simulator and false positives are set""" + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml') + + expected_dfs_attacker_obs_100 = \ + [-1, 1, 1, 1, -1, -1, -1, -1, -1, 1, 1, -1, 1, 1, 0] + expected_defender_obs_100 = \ + [0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0] + seed = 100 + + # Make sure the seed makes the obs state deterministic + for _ in range(10): + run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + DepthFirstAttacker, + expected_dfs_attacker_obs_100, + expected_defender_obs_100 + ) + + seed = 1337 + with pytest.raises(AssertionError): + # running it once with different seed + # yields assertion error (different observations) + run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + DepthFirstAttacker, + expected_dfs_attacker_obs_100, + expected_defender_obs_100 + ) + + +def test_sim_bfs_agent_seed_deterministic(): + """Make sure observations are deterministic when giving seed + to both bfs agent and simulator and false positives are set""" + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml') + + expected_bfs_attacker_obs_100 = \ + [-1, 1, 1, 1, -1, -1, 0, -1, -1, 0, 1, -1, 1, 1, 1] + expected_defender_obs_100 = \ + [0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1] + seed = 100 + + # Make sure the seed makes the obs state deterministic + for _ in range(10): + run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + BreadthFirstAttacker, + expected_bfs_attacker_obs_100, + expected_defender_obs_100 + ) + + seed = 1337 + with pytest.raises(AssertionError): + # running it once with different seed + # yields assertion error (different observations) + run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + BreadthFirstAttacker, + expected_bfs_attacker_obs_100, + expected_defender_obs_100 + ) + + +def test_defender_observation_false_positives_negatives(): + """Test default MalSimulator with false negative and positive rates""" + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml' + ) + sim.reset() + + attacker = sim.attack_graph.attackers[0] + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + # Get an uncompromised step + user_3_compromise = sim.attack_graph.get_node_by_full_name( + 'User:3:compromise') + assert attacker not in user_3_compromise.compromised_by + + # Let the attacker compromise User:3:compromise + actions = { + attacker_agent_id: (1, sim._id_to_index[user_3_compromise.id]), + defender_agent_id: (0, None) + } + obs, _, _, _, _ = sim.step(actions) + defender_obs_state = obs[defender_agent_id]['observed_state'] + + host_1_access = sim.attack_graph.get_node_by_full_name('Host:1:access') + assert host_1_access.extras['false_positive_rate'] == 0.3 + + for i in range(100): + obs, _, _, _, _ = sim.step(actions) + host_1_access_index = sim._id_to_index[host_1_access.id] + if defender_obs_state[host_1_access_index]: + break + + if i == 99: + assert False, "False positive never happened" + + assert True, "False positive happened" + def test_simulator_settings_defender_observation(): """Test MalSimulatorSettings only show last steps in obs""" diff --git a/tests/test_scenario.py b/tests/test_scenario.py index e919b834..ef20c0ad 100644 --- a/tests/test_scenario.py +++ b/tests/test_scenario.py @@ -261,3 +261,36 @@ def test_apply_scenario_observability_faulty(): attack_graph, {'by_asset_name': {'OS App': ['nonExistingAttackStep']}} ) + + +def test_load_scenario_false_positive_negative_rate(): + """Load a scenario with observability settings given and + make sure observability is applied correctly""" + + # Load scenario with observability specifed + attack_graph, _ = load_scenario( + path_relative_to_tests( + './testdata/scenarios/traininglang_fp_fn_scenario.yml') + ) + + # Defined in scenario file + host_0_access_fp_rate = 0.2 + host_1_access_fp_rate = 0.3 + host_0_access_fn_rate = 0.4 + host_1_access_fn_rate = 0.5 + user_3_compromise_fn_rate = 1.0 + + for node in attack_graph.nodes: + if node.full_name == "Host:0:access": + assert node.extras['false_positive_rate'] == host_0_access_fp_rate + assert node.extras['false_negative_rate'] == host_0_access_fn_rate + elif node.full_name == "Host:1:access": + assert node.extras['false_positive_rate'] == host_1_access_fp_rate + assert node.extras['false_negative_rate'] == host_1_access_fn_rate + elif node.full_name == "User:3:compromise": + assert 'false_positive_rate' not in node.extras + assert node.extras['false_negative_rate'] \ + == user_3_compromise_fn_rate + else: + assert 'false_positive_rate' not in node.extras + assert 'false_negative_rate' not in node.extras diff --git a/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml new file mode 100644 index 00000000..fb83ca34 --- /dev/null +++ b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml @@ -0,0 +1,31 @@ +lang_file: ../langs/org.mal-lang.trainingLang-1.0.0.mar +model_file: ../models/traininglang_model.yml + +# Rewards for each attack step +rewards: + Host:0:notPresent: 2 + Host:0:access: 4 + Host:1:notPresent: 7 + Host:1:access: 5 + Data:2:notPresent: 8 + Data:2:read: 5 + Data:2:modify: 10 + +# Create an attacker with entrypoints +attacker_entry_points: + 'Attacker1': + - 'User:3:phishing' + - 'Host:0:connect' + +attacker_agent_class: 'BreadthFirstAttacker' +defender_agent_class: 'KeyboardAgent' + +# Specific fpr/fnr for some specific steps +false_positive_rates: + 'Host:0:access': 0.2 + 'Host:1:access': 0.3 + +false_negative_rates: + 'Host:0:access': 0.4 + 'Host:1:access': 0.5 + 'User:3:compromise': 1.0 # Always false negative \ No newline at end of file