From 40d3a9f285e13801693f0ce9e03ff48791f5b0da Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 1 Oct 2024 14:19:25 +0200 Subject: [PATCH 01/13] Implement false positive/negative rates in scenario - Make it possible to set fp/fn rates in a scenario config file - Add rate to nodes .extras field --- malsim/scenario.py | 38 +++++++++++++++++++++++++++++++++++- malsim/sims/mal_simulator.py | 4 ++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/malsim/scenario.py b/malsim/scenario.py index e7009186..712285c0 100644 --- a/malsim/scenario.py +++ b/malsim/scenario.py @@ -41,7 +41,11 @@ allowed_fields = required_fields + [ 'rewards', 'attacker_entry_points', - 'observable_attack_steps' + 'observable_attack_steps', + 'false_positive_base_rate', + 'false_positive_rates', + 'false_negative_base_rate', + 'false_negative_rates', ] @@ -182,6 +186,35 @@ def apply_scenario_observability_rules( step.extras['observable'] = 0 +def apply_scenario_false_positive_and_negative_rates( + attack_graph: AttackGraph, scenario_conf: dict +): + """Apply false positive/negative rates to all nodes in the + AttackGraph either to the default value, from the base rate + or from the specifically set rates per attack step""" + + # Apply false positive/negative base rates for all nodes (default 0) + fp_baseline = scenario_conf.get('false_positive_base_rate', 0) + fn_baseline = scenario_conf.get('false_negative_base_rate', 0) + for node in attack_graph.nodes: + node.extras['false_positive_rate'] = fp_baseline + node.extras['false_negative_rate'] = fn_baseline + + # Apply false positive rates to specified attack nodes + fp_rates_per_attackstep = scenario_conf.get('false_positive_rates') + if fp_rates_per_attackstep: + for step_full_name, rate in fp_rates_per_attackstep.items(): + step = attack_graph.get_node_by_full_name(step_full_name) + step.extras['false_positive_rate'] = rate + + # Apply false negative rates to specified attack nodes + fn_rates_per_attackstep = scenario_conf.get('false_negative_rates') + if fn_rates_per_attackstep: + for step_full_name, rate in fn_rates_per_attackstep.items(): + step = attack_graph.get_node_by_full_name(step_full_name) + step.extras['false_negative_rate'] = rate + + def apply_scenario_attacker_entrypoints( attack_graph: AttackGraph, entry_points: dict ) -> None: @@ -280,6 +313,9 @@ def apply_scenario_to_attack_graph( observability_settings = scenario.get('observable_attack_steps') apply_scenario_observability_rules(attack_graph, observability_settings) + # Apply false positive and negative rates to attack graph + apply_scenario_false_positive_and_negative_rates(attack_graph, scenario) + def load_scenario(scenario_file: str) -> tuple[AttackGraph, dict]: """Load a scenario from a scenario file to an AttackGraph""" diff --git a/malsim/sims/mal_simulator.py b/malsim/sims/mal_simulator.py index dae8c82b..1fb667e8 100644 --- a/malsim/sims/mal_simulator.py +++ b/malsim/sims/mal_simulator.py @@ -5,8 +5,9 @@ import logging import functools from typing import Optional, TYPE_CHECKING -import numpy as np +from random import random +import numpy as np from gymnasium.spaces import MultiDiscrete, Box, Dict from pettingzoo import ParallelEnv @@ -975,7 +976,6 @@ def _reward_agents(self, performed_actions): # If a defender performed step, it will be penalized self.agents_dict[agent]["rewards"] -= node_reward - def _collect_agents_infos(self): """Collect agent info, this is used to determine the possible actions in the next iteration step. Then fill in all of the""" From 6bd9d0d1ea5541e37b9454533481645d6461b7a0 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 1 Oct 2024 14:26:35 +0200 Subject: [PATCH 02/13] Add fp/np rates info to the README --- README.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/README.md b/README.md index 1367ea56..1cef4870 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,28 @@ observable_attack_steps: # - phishing # ... + +# Optionally add false positive/negative rates to observations. +# +# False positive/negative `rate` is a number between 0.0 and 1.0. +# - A false positive rate of x for means that an inactive attack step +# will be observed as active at a rate of x in each observation +# - A false negative rate of x for means that an active attack step +# will be observed as inactive at a rate of x in each observation +# Default false positive/negative rate is 0, which is assumed if none are given. + +# A baseline that applies to all attack steps (default 0) +false_positive_base_rate: +false_negative_base_rate: + +# Applies false positive rates per attack step (default 0) +false_positive_rates: + : + +# Applies false negative rates per attack step (default 0) +false_negative_rates: + : + ``` Note: When defining attackers and entrypoints in a scenario, these override potential attackers in the model. From a744bafae43f43bd0f168a27a974edcff383d642 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Tue, 1 Oct 2024 14:26:58 +0200 Subject: [PATCH 03/13] Add test for applying false positive/negative rates in scenario file --- tests/test_scenario.py | 28 ++++++++++++++++ .../scenarios/traininglang_fp_fn_scenario.yml | 32 +++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 tests/testdata/scenarios/traininglang_fp_fn_scenario.yml diff --git a/tests/test_scenario.py b/tests/test_scenario.py index e919b834..117d5f0d 100644 --- a/tests/test_scenario.py +++ b/tests/test_scenario.py @@ -261,3 +261,31 @@ def test_apply_scenario_observability_faulty(): attack_graph, {'by_asset_name': {'OS App': ['nonExistingAttackStep']}} ) + + +def test_load_scenario_false_positive_negative_rate(): + """Load a scenario with observability settings given and + make sure observability is applied correctly""" + + # Load scenario with observability specifed + attack_graph, _ = load_scenario( + path_relative_to_tests( + './testdata/scenarios/traininglang_fp_fn_scenario.yml') + ) + + # Defined in scenario file + fp_base_rate = 0.1 + fn_base_rate = 0.1 + host_0_access_fp_rate = 0.2 + host_1_access_fn_rate = 0.3 + + for node in attack_graph.nodes: + if node.full_name == "Host:0:access": + assert node.extras['false_positive_rate'] == host_0_access_fp_rate + assert node.extras['false_negative_rate'] == fn_base_rate + elif node.full_name == "Host:1:access": + assert node.extras['false_positive_rate'] == fp_base_rate + assert node.extras['false_negative_rate'] == host_1_access_fn_rate + else: + assert node.extras['false_positive_rate'] == fp_base_rate + assert node.extras['false_negative_rate'] == fn_base_rate diff --git a/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml new file mode 100644 index 00000000..d2faaaab --- /dev/null +++ b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml @@ -0,0 +1,32 @@ +lang_file: ../langs/org.mal-lang.trainingLang-1.0.0.mar +model_file: ../models/traininglang_model.yml + +# Rewards for each attack step +rewards: + Host:0:notPresent: 2 + Host:0:access: 4 + Host:1:notPresent: 7 + Host:1:access: 5 + Data:2:notPresent: 8 + Data:2:read: 5 + Data:2:modify: 10 + +# Create an attacker with entrypoints +attacker_entry_points: + 'Attacker1': + - 'User:3:phishing' + - 'Host:0:connect' + +attacker_agent_class: 'BreadthFirstAttacker' +defender_agent_class: 'KeyboardAgent' + +# Baseline +false_positive_base_rate: 0.1 +false_negative_base_rate: 0.1 + +# Specific fpr/fnr for some steps +false_positive_rates: + 'Host:0:access': 0.2 + +false_negative_rates: + 'Host:1:access': 0.3 \ No newline at end of file From 151f4fbe72124c49e93795366dd269aed6916d38 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Wed, 2 Oct 2024 14:30:35 +0200 Subject: [PATCH 04/13] Add testdata --- tests/test_scenario.py | 8 +++++--- tests/testdata/scenarios/traininglang_fp_fn_scenario.yml | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/test_scenario.py b/tests/test_scenario.py index 117d5f0d..7705a2ac 100644 --- a/tests/test_scenario.py +++ b/tests/test_scenario.py @@ -277,14 +277,16 @@ def test_load_scenario_false_positive_negative_rate(): fp_base_rate = 0.1 fn_base_rate = 0.1 host_0_access_fp_rate = 0.2 - host_1_access_fn_rate = 0.3 + host_1_access_fp_rate = 0.3 + host_0_access_fn_rate = 0.4 + host_1_access_fn_rate = 0.5 for node in attack_graph.nodes: if node.full_name == "Host:0:access": assert node.extras['false_positive_rate'] == host_0_access_fp_rate - assert node.extras['false_negative_rate'] == fn_base_rate + assert node.extras['false_negative_rate'] == host_0_access_fn_rate elif node.full_name == "Host:1:access": - assert node.extras['false_positive_rate'] == fp_base_rate + assert node.extras['false_positive_rate'] == host_1_access_fp_rate assert node.extras['false_negative_rate'] == host_1_access_fn_rate else: assert node.extras['false_positive_rate'] == fp_base_rate diff --git a/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml index d2faaaab..ead0c763 100644 --- a/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml +++ b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml @@ -20,13 +20,15 @@ attacker_entry_points: attacker_agent_class: 'BreadthFirstAttacker' defender_agent_class: 'KeyboardAgent' -# Baseline +# Baseline applies to all attack steps false_positive_base_rate: 0.1 false_negative_base_rate: 0.1 -# Specific fpr/fnr for some steps +# Specific fpr/fnr for some specific steps false_positive_rates: 'Host:0:access': 0.2 + 'Host:1:access': 0.3 false_negative_rates: - 'Host:1:access': 0.3 \ No newline at end of file + 'Host:0:access': 0.4 + 'Host:1:access': 0.5 \ No newline at end of file From 301db4c179e451bac033902de8b4971d2b026eb1 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Mon, 14 Oct 2024 15:41:01 +0200 Subject: [PATCH 05/13] Apply false negatives in MalSimulator defense observation --- malsim/sims/mal_simulator.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/malsim/sims/mal_simulator.py b/malsim/sims/mal_simulator.py index 1fb667e8..8e3e38a1 100644 --- a/malsim/sims/mal_simulator.py +++ b/malsim/sims/mal_simulator.py @@ -5,7 +5,6 @@ import logging import functools from typing import Optional, TYPE_CHECKING -from random import random import numpy as np from gymnasium.spaces import MultiDiscrete, Box, Dict @@ -88,6 +87,7 @@ def __init__( self.attack_graph = attack_graph self.sim_settings = sim_settings self.max_iter = max_iter + self.rng = np.random.default_rng(kwargs.get('seed')) self.attack_graph_backup = copy.deepcopy(self.attack_graph) @@ -921,6 +921,9 @@ def _observe_defender( defender_agent, performed_actions: dict[str, list[int]] ): + """Update the observed_state of defender agent + based on performed_actions + """ obs_state = self.agents_dict[defender_agent]["observation"]\ ["observed_state"] @@ -933,7 +936,17 @@ def _observe_defender( # Only show the latest steps taken for _, actions in performed_actions.items(): for action in actions: - obs_state[action] = 1 + node_id = self._index_to_id[action] + node = self.attack_graph.get_node_by_id(node_id) + + if node.is_enabled_defense(): + # Defenses are never false negatives + obs_state[action] = 1 + else: + # Attacks can become false negatives + fn_rate = node.extras.get('false_negative_rate', 0) + make_fn = fn_rate and self.rng.random() < fn_rate + obs_state[action ] = 0 if make_fn else 1 def _observe_agents(self, performed_actions): """Collect agents observations""" From a8df8e5b1782d62ff942445980cd80fe0f584522 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Mon, 14 Oct 2024 15:41:35 +0200 Subject: [PATCH 06/13] Test that 100% false negative rate is applied --- tests/test_mal_simulator.py | 33 +++++++++++++++++++ .../scenarios/traininglang_fp_fn_scenario.yml | 3 +- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/tests/test_mal_simulator.py b/tests/test_mal_simulator.py index 0815c942..59f4ef13 100644 --- a/tests/test_mal_simulator.py +++ b/tests/test_mal_simulator.py @@ -956,6 +956,39 @@ def test_simulator_default_settings_defender_observation(): assert not node.is_compromised() and not node.is_enabled_defense() +def test_default_settings_defender_observation_false_negatives(): + """Test default MalSimulator with false negative rates""" + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml' + ) + sim.reset() + + attacker = sim.attack_graph.attackers[0] + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + # Get an uncompromised step + user_3_compromise = sim.attack_graph.get_node_by_full_name( + 'User:3:compromise') + assert attacker not in user_3_compromise.compromised_by + + # Let the attacker compromise User:3:compromise + actions = { + attacker_agent_id: (1, sim._id_to_index[user_3_compromise.id]), + defender_agent_id: (0, None) + } + + obs, _, _, _, _ = sim.step(actions) + defender_obs_state = obs[defender_agent_id]['observed_state'] + + # Verify that all states in obs match the state of the attack graph + node_index = sim._id_to_index[user_3_compromise.id] + # user_3_compromise is set to disabled even though compromised + # since it is false negative 100% of times + assert not defender_obs_state[node_index] + + def test_simulator_settings_defender_observation(): """Test MalSimulatorSettings only show last steps in obs""" diff --git a/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml index ead0c763..69e24041 100644 --- a/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml +++ b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml @@ -31,4 +31,5 @@ false_positive_rates: false_negative_rates: 'Host:0:access': 0.4 - 'Host:1:access': 0.5 \ No newline at end of file + 'Host:1:access': 0.5 + 'User:3:compromise': 1.0 # Always false negative \ No newline at end of file From e02b160ea473d003671bec25c6ce41a8b2835cbe Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Mon, 14 Oct 2024 16:57:08 +0200 Subject: [PATCH 07/13] Split up the default defender obs and the false pos/neg logic, Add support for false positives --- malsim/sims/mal_simulator.py | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/malsim/sims/mal_simulator.py b/malsim/sims/mal_simulator.py index 8e3e38a1..13b90a3d 100644 --- a/malsim/sims/mal_simulator.py +++ b/malsim/sims/mal_simulator.py @@ -672,6 +672,11 @@ def initialize(self, max_iter=ITERATIONS_LIMIT): # Initialize agents and record the entry point actions initial_actions = self._initialize_agents() + self._possible_false_positive_nodes = [ + n for n in self.attack_graph.nodes + if n.extras.get('false_positive_rate') + ] + observations, _, _, _, infos = ( self._observe_and_reward(initial_actions, [])) @@ -929,24 +934,33 @@ def _observe_defender( ["observed_state"] if not self.sim_settings.cumulative_defender_obs: - # Clear the state if we do not it to accumulate observations over - # time. + # Clear the state if we do not want to accumulate + # observations between steps. obs_state.fill(0) - # Only show the latest steps taken + # Enable latest activated steps in obs state for _, actions in performed_actions.items(): for action in actions: - node_id = self._index_to_id[action] - node = self.attack_graph.get_node_by_id(node_id) + obs_state[action] = 1 - if node.is_enabled_defense(): - # Defenses are never false negatives - obs_state[action] = 1 - else: - # Attacks can become false negatives + # Possibly add false negatives to observation + for _, actions in performed_actions.items(): + for action in actions: + node = self.attack_graph.get_node_by_id( + self._index_to_id[action]) + if node.type in ('or', 'and'): + # Enabled attack steps can become false negatives fn_rate = node.extras.get('false_negative_rate', 0) make_fn = fn_rate and self.rng.random() < fn_rate - obs_state[action ] = 0 if make_fn else 1 + obs_state[action] = 0 if make_fn else 1 + + # Possibly add false positives to observation + for node in self._possible_false_positive_nodes: + if node.type in ('or', 'and') and not node.is_compromised(): + step_index = self._id_to_index[node.id] + fp_rate = node.extras.get('false_positive_rate', 0) + make_fp = fp_rate and self.rng.random() < fp_rate + obs_state[step_index] = 1 if make_fp else 0 def _observe_agents(self, performed_actions): """Collect agents observations""" From 428864dfe0e9a17c68c8ebd10d81de2931d58904 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Mon, 14 Oct 2024 17:03:51 +0200 Subject: [PATCH 08/13] Remove base false positive/negative rates --- README.md | 4 ---- malsim/scenario.py | 9 --------- tests/test_scenario.py | 11 +++++++---- .../scenarios/traininglang_fp_fn_scenario.yml | 4 ---- 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 1cef4870..85d55f35 100644 --- a/README.md +++ b/README.md @@ -102,10 +102,6 @@ observable_attack_steps: # will be observed as inactive at a rate of x in each observation # Default false positive/negative rate is 0, which is assumed if none are given. -# A baseline that applies to all attack steps (default 0) -false_positive_base_rate: -false_negative_base_rate: - # Applies false positive rates per attack step (default 0) false_positive_rates: : diff --git a/malsim/scenario.py b/malsim/scenario.py index 712285c0..57258c7f 100644 --- a/malsim/scenario.py +++ b/malsim/scenario.py @@ -42,9 +42,7 @@ 'rewards', 'attacker_entry_points', 'observable_attack_steps', - 'false_positive_base_rate', 'false_positive_rates', - 'false_negative_base_rate', 'false_negative_rates', ] @@ -193,13 +191,6 @@ def apply_scenario_false_positive_and_negative_rates( AttackGraph either to the default value, from the base rate or from the specifically set rates per attack step""" - # Apply false positive/negative base rates for all nodes (default 0) - fp_baseline = scenario_conf.get('false_positive_base_rate', 0) - fn_baseline = scenario_conf.get('false_negative_base_rate', 0) - for node in attack_graph.nodes: - node.extras['false_positive_rate'] = fp_baseline - node.extras['false_negative_rate'] = fn_baseline - # Apply false positive rates to specified attack nodes fp_rates_per_attackstep = scenario_conf.get('false_positive_rates') if fp_rates_per_attackstep: diff --git a/tests/test_scenario.py b/tests/test_scenario.py index 7705a2ac..ef20c0ad 100644 --- a/tests/test_scenario.py +++ b/tests/test_scenario.py @@ -274,12 +274,11 @@ def test_load_scenario_false_positive_negative_rate(): ) # Defined in scenario file - fp_base_rate = 0.1 - fn_base_rate = 0.1 host_0_access_fp_rate = 0.2 host_1_access_fp_rate = 0.3 host_0_access_fn_rate = 0.4 host_1_access_fn_rate = 0.5 + user_3_compromise_fn_rate = 1.0 for node in attack_graph.nodes: if node.full_name == "Host:0:access": @@ -288,6 +287,10 @@ def test_load_scenario_false_positive_negative_rate(): elif node.full_name == "Host:1:access": assert node.extras['false_positive_rate'] == host_1_access_fp_rate assert node.extras['false_negative_rate'] == host_1_access_fn_rate + elif node.full_name == "User:3:compromise": + assert 'false_positive_rate' not in node.extras + assert node.extras['false_negative_rate'] \ + == user_3_compromise_fn_rate else: - assert node.extras['false_positive_rate'] == fp_base_rate - assert node.extras['false_negative_rate'] == fn_base_rate + assert 'false_positive_rate' not in node.extras + assert 'false_negative_rate' not in node.extras diff --git a/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml index 69e24041..fb83ca34 100644 --- a/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml +++ b/tests/testdata/scenarios/traininglang_fp_fn_scenario.yml @@ -20,10 +20,6 @@ attacker_entry_points: attacker_agent_class: 'BreadthFirstAttacker' defender_agent_class: 'KeyboardAgent' -# Baseline applies to all attack steps -false_positive_base_rate: 0.1 -false_negative_base_rate: 0.1 - # Specific fpr/fnr for some specific steps false_positive_rates: 'Host:0:access': 0.2 From 4dde3d9407d2d2b80d7a929d1327d44ead5f844a Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Mon, 14 Oct 2024 17:26:42 +0200 Subject: [PATCH 09/13] Test false positive --- tests/test_mal_simulator.py | 40 +++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/test_mal_simulator.py b/tests/test_mal_simulator.py index 59f4ef13..82ce23f6 100644 --- a/tests/test_mal_simulator.py +++ b/tests/test_mal_simulator.py @@ -971,6 +971,7 @@ def test_default_settings_defender_observation_false_negatives(): # Get an uncompromised step user_3_compromise = sim.attack_graph.get_node_by_full_name( 'User:3:compromise') + assert user_3_compromise.extras.get('false_negative_rate') == 1.0 assert attacker not in user_3_compromise.compromised_by # Let the attacker compromise User:3:compromise @@ -989,6 +990,45 @@ def test_default_settings_defender_observation_false_negatives(): assert not defender_obs_state[node_index] +def test_defender_observation_false_positives_negatives(): + """Test default MalSimulator with false negative and positive rates""" + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml' + ) + sim.reset() + + attacker = sim.attack_graph.attackers[0] + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + # Get an uncompromised step + user_3_compromise = sim.attack_graph.get_node_by_full_name( + 'User:3:compromise') + assert attacker not in user_3_compromise.compromised_by + + # Let the attacker compromise User:3:compromise + actions = { + attacker_agent_id: (1, sim._id_to_index[user_3_compromise.id]), + defender_agent_id: (0, None) + } + obs, _, _, _, _ = sim.step(actions) + defender_obs_state = obs[defender_agent_id]['observed_state'] + + host_1_access = sim.attack_graph.get_node_by_full_name('Host:1:access') + assert host_1_access.extras['false_positive_rate'] == 0.3 + + for i in range(100): + obs, _, _, _, _ = sim.step(actions) + host_1_access_index = sim._id_to_index[host_1_access.id] + if defender_obs_state[host_1_access_index]: + break + + if i == 99: + assert False, "False positive never happened" + + assert True, "False positive happened" + def test_simulator_settings_defender_observation(): """Test MalSimulatorSettings only show last steps in obs""" From 6c4699c5a90089752d894bb8676ceaf85163be47 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Thu, 17 Oct 2024 11:43:53 +0200 Subject: [PATCH 10/13] Reset rng with seed in MalSimulator.reset --- malsim/sims/mal_simulator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/malsim/sims/mal_simulator.py b/malsim/sims/mal_simulator.py index 13b90a3d..af634a7f 100644 --- a/malsim/sims/mal_simulator.py +++ b/malsim/sims/mal_simulator.py @@ -417,6 +417,7 @@ def reset( ): logger.info("Resetting simulator.") self.attack_graph = copy.deepcopy(self.attack_graph_backup) + self.rng = np.random.default_rng(seed) return self.initialize(self.max_iter) def log_mapping_tables(self): From 12c2cda3f2a8d7e51e8080ec134839f1aa377a51 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Thu, 17 Oct 2024 14:09:46 +0200 Subject: [PATCH 11/13] Add test to test deterministic simulation when seed is given --- tests/test_mal_simulator.py | 64 ++++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/tests/test_mal_simulator.py b/tests/test_mal_simulator.py index 82ce23f6..4e1a94b6 100644 --- a/tests/test_mal_simulator.py +++ b/tests/test_mal_simulator.py @@ -1,11 +1,12 @@ """Test MalSimulator class""" +import pytest + from maltoolbox.attackgraph import AttackGraph, Attacker from malsim.sims.mal_simulator import MalSimulator from malsim.scenario import load_scenario, create_simulator_from_scenario from malsim.sims import MalSimulatorSettings - def test_malsimulator(corelang_lang_graph, model): attack_graph = AttackGraph(corelang_lang_graph, model) MalSimulator(corelang_lang_graph, model, attack_graph) @@ -990,6 +991,67 @@ def test_default_settings_defender_observation_false_negatives(): assert not defender_obs_state[node_index] +def test_default_settings_defender_observation_false_negatives_seed(): + """Test default MalSimulator with false negative rates""" + + def run_simulation_with_seed( + seed, + expected_attacker_obs, + expected_defender_obs + ): + sim.reset(seed=seed) + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + # Get an uncompromised step + user_3_compromise = sim.attack_graph.get_node_by_full_name( + 'User:3:compromise') + + # Let the attacker compromise User:3:compromise + actions = { + attacker_agent_id: (1, sim._id_to_index[user_3_compromise.id]), + defender_agent_id: (0, None)} + + obs, _, _, _, _ = sim.step(actions) + + actual_attacker_obs_state = obs[attacker_agent_id]['observed_state'] + for index, state in enumerate(actual_attacker_obs_state): + assert state == expected_attacker_obs[index] + + actual_defender_obs_state = obs[defender_agent_id]['observed_state'] + for index, state in enumerate(actual_defender_obs_state): + assert state == expected_defender_obs[index] + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml', + ) + + expected_attacker_obs_100 = \ + [-1, 0, 1, 0, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, -1] + expected_defender_obs_100 = \ + [0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0] + seed = 100 + + # Make sure the seed makes the obs state deterministic + # in terms of false positives + for _ in range(100): + # running it many times with same seed + # yields same observations + run_simulation_with_seed( + seed, + expected_attacker_obs_100, + expected_defender_obs_100 + ) + + with pytest.raises(AssertionError): + # running it once with different seed + # yields assertion error (different observations) + run_simulation_with_seed( + 1337, + expected_attacker_obs_100, + expected_defender_obs_100 + ) + def test_defender_observation_false_positives_negatives(): """Test default MalSimulator with false negative and positive rates""" From 5494b3ab05684eaab8fbabca2cf000488c6ba581 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Mon, 21 Oct 2024 11:25:12 +0200 Subject: [PATCH 12/13] Test to make sure agents seed makes actions deterministic --- malsim/agents/searchers.py | 12 +---- tests/test_agents.py | 97 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 10 deletions(-) create mode 100644 tests/test_agents.py diff --git a/malsim/agents/searchers.py b/malsim/agents/searchers.py index 6b1f3a42..9265cb03 100644 --- a/malsim/agents/searchers.py +++ b/malsim/agents/searchers.py @@ -25,11 +25,7 @@ class BreadthFirstAttacker: def __init__(self, agent_config: dict) -> None: self.targets: Deque[int] = deque([]) self.current_target: int = None - seed = ( - agent_config["seed"] - if agent_config.get("seed", None) - else np.random.SeedSequence().entropy - ) + seed = agent_config.get("seed", np.random.SeedSequence().entropy) self.rng = ( np.random.default_rng(seed) if agent_config.get("randomize", False) @@ -85,11 +81,7 @@ class DepthFirstAttacker: def __init__(self, agent_config: dict) -> None: self.current_target = -1 self.targets: List[int] = [] - seed = ( - agent_config["seed"] - if agent_config.get("seed", None) - else np.random.SeedSequence().entropy - ) + seed = agent_config.get("seed", np.random.SeedSequence().entropy) self.rng = ( np.random.default_rng(seed) if agent_config.get("randomize", False) diff --git a/tests/test_agents.py b/tests/test_agents.py new file mode 100644 index 00000000..16c3c68a --- /dev/null +++ b/tests/test_agents.py @@ -0,0 +1,97 @@ +"""Test MalSimulator agents""" + +import pytest + +from malsim.sims.mal_simulator import MalSimulator +from malsim.scenario import load_scenario +from malsim.agents.searchers import BreadthFirstAttacker, DepthFirstAttacker + + +def run_simulation_n_steps_assert_attacker_actions( + sim, attacker_agent, seed, expected_actions, n_steps=10): + """Run simulation n steps with attacker agent taking actions""" + + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + obs, infos = sim.reset(seed=seed) + chosen_actions = [] + for _ in range(n_steps): + attacker_action = attacker_agent.compute_action_from_dict( + obs[attacker_agent_id], + infos[attacker_agent_id]['action_mask'] + ) + chosen_actions.append(attacker_action) + actions = { + attacker_agent_id: attacker_action, + defender_agent_id: (0, None)} + + obs, _, _, _, infos = sim.step(actions) + + for i, action in enumerate(chosen_actions): + assert action == expected_actions[i] + + +def test_bfs_attacker_actions_seed(): + """Make sure MalSimulator bfs agent with seed is deterministic""" + attack_graph, _ = load_scenario( + 'tests/testdata/scenarios/simple_scenario.yml') + + sim = MalSimulator( + attack_graph.lang_graph, attack_graph.model, attack_graph) + + attacker_agent_id = "attacker" + defender_agent_id = "defender" + sim.register_attacker(attacker_agent_id, 0) + sim.register_defender(defender_agent_id) + + seed = 1337 + # Seed should yield these actions every time + expected_actions_seed_1337 = [ + (1, 329), (1, 353), (1, 330), (1, 354), (1, 355), + (1, 356), (1, 357), (1, 331), (1, 358), (1, 283) + ] + for _ in range(10): + bfs_attacker = BreadthFirstAttacker({'randomize': True, 'seed': seed}) + run_simulation_n_steps_assert_attacker_actions( + sim, bfs_attacker, seed, expected_actions_seed_1337) + + # Different seed gives different actions + seed = 1857 + bfs_attacker = BreadthFirstAttacker({'randomize': True, 'seed': seed}) + with pytest.raises(AssertionError): + run_simulation_n_steps_assert_attacker_actions( + sim, bfs_attacker, seed, expected_actions_seed_1337) + + +def test_dfs_attacker_actions_seed(): + """Make sure MalSimulator dfs agent with seed is deterministic""" + + attack_graph, _ = load_scenario( + 'tests/testdata/scenarios/simple_scenario.yml') + + sim = MalSimulator( + attack_graph.lang_graph, attack_graph.model, attack_graph) + + attacker_agent_id = "attacker" + defender_agent_id = "defender" + sim.register_attacker(attacker_agent_id, 0) + sim.register_defender(defender_agent_id) + + seed = 1337 + # Seed should yield these actions every time + expected_actions_seed_1337 = [ + (1, 353), (1, 354), (1, 355), (1, 356), (1, 357), + (1, 358), (1, 460), (1, 461), (1, 400), (1, 401) + ] + for _ in range(10): + bfs_attacker = DepthFirstAttacker({'randomize': True, 'seed': seed}) + run_simulation_n_steps_assert_attacker_actions( + sim, bfs_attacker, seed, expected_actions_seed_1337) + + # Different seed gives different actions + seed = 1697 + bfs_attacker = DepthFirstAttacker({'randomize': True, 'seed': seed}) + with pytest.raises(AssertionError): + run_simulation_n_steps_assert_attacker_actions( + sim, bfs_attacker, seed, expected_actions_seed_1337) From f4b0f83549bfbde5c67b716f818cc26685334624 Mon Sep 17 00:00:00 2001 From: Joakim Loxdal Date: Mon, 21 Oct 2024 11:42:17 +0200 Subject: [PATCH 13/13] Add test that assert deterministic fp/fn observation when seed is given to both agent and simulator --- tests/test_mal_simulator.py | 122 +++++++++++++++++++++++++++++++++++- 1 file changed, 119 insertions(+), 3 deletions(-) diff --git a/tests/test_mal_simulator.py b/tests/test_mal_simulator.py index 4e1a94b6..05a5622c 100644 --- a/tests/test_mal_simulator.py +++ b/tests/test_mal_simulator.py @@ -6,6 +6,7 @@ from malsim.sims.mal_simulator import MalSimulator from malsim.scenario import load_scenario, create_simulator_from_scenario from malsim.sims import MalSimulatorSettings +from malsim.agents.searchers import BreadthFirstAttacker, DepthFirstAttacker def test_malsimulator(corelang_lang_graph, model): attack_graph = AttackGraph(corelang_lang_graph, model) @@ -994,7 +995,7 @@ def test_default_settings_defender_observation_false_negatives(): def test_default_settings_defender_observation_false_negatives_seed(): """Test default MalSimulator with false negative rates""" - def run_simulation_with_seed( + def run_simulation_with_seed_assert_obs( seed, expected_attacker_obs, expected_defender_obs @@ -1037,7 +1038,7 @@ def run_simulation_with_seed( for _ in range(100): # running it many times with same seed # yields same observations - run_simulation_with_seed( + run_simulation_with_seed_assert_obs( seed, expected_attacker_obs_100, expected_defender_obs_100 @@ -1046,12 +1047,127 @@ def run_simulation_with_seed( with pytest.raises(AssertionError): # running it once with different seed # yields assertion error (different observations) - run_simulation_with_seed( + run_simulation_with_seed_assert_obs( 1337, expected_attacker_obs_100, expected_defender_obs_100 ) + +def run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + attacker_agent_class, + expected_attacker_obs, + expected_defender_obs, + n_steps = 5 + ): + """Helper function""" + + obs, infos = sim.reset(seed=seed) + + attacker_agent_id = next(iter(sim.get_attacker_agents())) + defender_agent_id = next(iter(sim.get_defender_agents())) + + attacker_agent = attacker_agent_class( + {'randomize': True, 'seed': seed} + ) + + for _ in range(n_steps): + # Get an uncompromised step + attacker_action = attacker_agent.compute_action_from_dict( + obs[attacker_agent_id], + infos[attacker_agent_id]['action_mask'] + ) + + # Let the attacker compromise whatever they want + actions = { + attacker_agent_id: attacker_action, + defender_agent_id: (0, None)} + + obs, _, _, _, infos = sim.step(actions) + + actual_attacker_obs_state = obs[attacker_agent_id]['observed_state'] + for index, state in enumerate(actual_attacker_obs_state): + assert state == expected_attacker_obs[index] + + actual_defender_obs_state = obs[defender_agent_id]['observed_state'] + for index, state in enumerate(actual_defender_obs_state): + assert state == expected_defender_obs[index] + + +def test_sim_dfs_agent_seed_deterministic(): + """Make sure observations are deterministic when giving seed + to both dfs agent and simulator and false positives are set""" + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml') + + expected_dfs_attacker_obs_100 = \ + [-1, 1, 1, 1, -1, -1, -1, -1, -1, 1, 1, -1, 1, 1, 0] + expected_defender_obs_100 = \ + [0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0] + seed = 100 + + # Make sure the seed makes the obs state deterministic + for _ in range(10): + run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + DepthFirstAttacker, + expected_dfs_attacker_obs_100, + expected_defender_obs_100 + ) + + seed = 1337 + with pytest.raises(AssertionError): + # running it once with different seed + # yields assertion error (different observations) + run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + DepthFirstAttacker, + expected_dfs_attacker_obs_100, + expected_defender_obs_100 + ) + + +def test_sim_bfs_agent_seed_deterministic(): + """Make sure observations are deterministic when giving seed + to both bfs agent and simulator and false positives are set""" + + sim, _ = create_simulator_from_scenario( + 'tests/testdata/scenarios/traininglang_fp_fn_scenario.yml') + + expected_bfs_attacker_obs_100 = \ + [-1, 1, 1, 1, -1, -1, 0, -1, -1, 0, 1, -1, 1, 1, 1] + expected_defender_obs_100 = \ + [0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1] + seed = 100 + + # Make sure the seed makes the obs state deterministic + for _ in range(10): + run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + BreadthFirstAttacker, + expected_bfs_attacker_obs_100, + expected_defender_obs_100 + ) + + seed = 1337 + with pytest.raises(AssertionError): + # running it once with different seed + # yields assertion error (different observations) + run_simulation_n_steps_with_seed_assert_obs( + sim, + seed, + BreadthFirstAttacker, + expected_bfs_attacker_obs_100, + expected_defender_obs_100 + ) + + def test_defender_observation_false_positives_negatives(): """Test default MalSimulator with false negative and positive rates"""