diff --git a/opfgym/opf_env.py b/opfgym/opf_env.py
index ece4125..ee18234 100644
--- a/opfgym/opf_env.py
+++ b/opfgym/opf_env.py
@@ -136,12 +136,13 @@ def __init__(self,
         if reward_function is None:
             # Default reward
             self.reward_function = opfgym.reward.Summation(
-                **reward_function_params)
+                env=self, **reward_function_params)
         elif isinstance(reward_function, str):
             # Load by string (e.g. 'Summation' or 'summation')
             reward_class = opfgym.util.load_class_from_module(
                 reward_function, 'opfgym.reward')
-            self.reward_function = reward_class(**reward_function_params)
+            self.reward_function = reward_class(
+                env=self, **reward_function_params)
         elif isinstance(reward_function, opfgym.RewardFunction):
             # User-defined reward function
             self.reward_function = reward_function
diff --git a/opfgym/reward.py b/opfgym/reward.py
index ade593a..e758881 100644
--- a/opfgym/reward.py
+++ b/opfgym/reward.py
@@ -1,5 +1,6 @@
 import abc
+import copy
 
 import numpy as np
 
 
@@ -9,37 +10,35 @@ def __init__(self,
                  penalty_weight: float = 0.5,
                  clip_range: tuple[float, float] = None,
                  reward_scaling: str = None,
-                 reward_scaling_params: dict = None,
+                 scaling_params: dict = None,
                  env = None):
         self.penalty_weight = penalty_weight
         self.clip_range = clip_range
-        self.prepare_reward_scaling(reward_scaling, reward_scaling_params, env)
+        self.scaling_params = self.prepare_reward_scaling(
+            reward_scaling, scaling_params, env)
 
     def prepare_reward_scaling(self,
                                reward_scaling: str,
-                               reward_scaling_params: dict,
+                               scaling_params: dict,
                                env) -> None:
         """ Prepare the reward scaling parameters for later use. """
-        reward_scaling_params = reward_scaling_params or {}
-        if reward_scaling_params == 'auto' or (
-                'num_samples' in reward_scaling_params) or (
-                not reward_scaling_params and reward_scaling):
-            scaling_params = estimate_reward_distribution(env, **reward_scaling_params)
-        else:
-            scaling_params = reward_scaling_params
+        if not isinstance(reward_scaling, str):
+            return {'penalty_factor': 1, 'penalty_bias': 0,
+                    'objective_factor': 1, 'objective_bias': 0}
 
-        if not reward_scaling:
-            scaling_params.update({'penalty_factor': 1, 'penalty_bias': 0,
-                                   'objective_factor': 1, 'objective_bias': 0})
-        elif reward_scaling == 'minmax11':
-            scaling_params.update(calculate_minmax_11_params(**scaling_params))
-        elif reward_scaling == 'minmax01':
-            scaling_params.update(calculate_minmax_01_params(**scaling_params))
-        elif reward_scaling == 'normalization':
-            scaling_params.update(calculate_normalization_params(**scaling_params))
-        else:
-            raise NotImplementedError('This reward scaling does not exist!')
+        scaling_params = scaling_params or {}
+        user_scaling_params = copy.copy(scaling_params)
+
+        reward_scaler = select_reward_scaler(reward_scaling)
+        try:
+            scaling_params.update(reward_scaler(**scaling_params))
+        except:
+            scaling_params = estimate_reward_distribution(env, **scaling_params)
+            scaling_params.update(reward_scaler(**scaling_params))
+
+        # If the user defined some values, use these values instead
+        scaling_params.update(user_scaling_params)
 
         # Error handling if there were no constraint violations
         if np.isnan(scaling_params['penalty_bias']):
@@ -47,7 +46,17 @@ def prepare_reward_scaling(self,
         if np.isinf(scaling_params['penalty_factor']):
             scaling_params['penalty_factor'] = 1
 
-        self.scaling_params = scaling_params
+        return scaling_params
+
+    def get_reward_scaler(self, reward_scaling: str):
+        if reward_scaling == 'minmax11':
+            return calculate_minmax11_params
+        elif reward_scaling == 'minmax01':
+            return calculate_minmax01_params
+        elif reward_scaling == 'normalization':
+            return calculate_normalization_params
+        else:
+            raise NotImplementedError('This reward scaling does not exist!')
 
     def __call__(self, objective: float, penalty: float, valid: bool) -> float:
         objective = self.adjust_objective(objective, valid)
@@ -97,6 +106,17 @@ def adjust_objective(self, objective: float, valid: bool) -> float:
         return objective
 
 
+def select_reward_scaler(reward_scaling: str):
+    if reward_scaling == 'minmax11':
+        return calculate_minmax11_params
+    elif reward_scaling == 'minmax01':
+        return calculate_minmax01_params
+    elif reward_scaling == 'normalization':
+        return calculate_normalization_params
+    else:
+        raise NotImplementedError('This reward scaling does not exist!')
+
+
 def calculate_normalization_params(
         std_objective: float,
         mean_objective: float,
@@ -116,7 +136,7 @@ def calculate_normalization_params(
     return params
 
 
-def calculate_minmax_01_params(
+def calculate_minmax01_params(
         min_objective: float,
         max_objective: float,
         min_penalty: float,
@@ -137,7 +157,7 @@ def calculate_minmax_01_params(
     return params
 
 
-def calculate_minmax_11_params(
+def calculate_minmax11_params(
         min_objective: float,
         max_objective: float,
         min_penalty: float,
diff --git a/tests/test_reward.py b/tests/test_reward.py
index f96da0b..18ea392 100644
--- a/tests/test_reward.py
+++ b/tests/test_reward.py
@@ -21,10 +21,10 @@ def test_reward_class():
     assert reward_fct.compute_total_reward(penalty=0.4, objective=0.5) == 0.9
 
     # Test reward minmax scaling in range [-1, 1]
-    reward_scaling_params = {'min_objective': 2.0, 'max_objective': 10.0,
+    scaling_params = {'min_objective': 2.0, 'max_objective': 10.0,
                              'min_penalty': 0.0, 'max_penalty': 5.0}
     reward_fct = reward.Summation(reward_scaling='minmax11',
-                                  reward_scaling_params=reward_scaling_params)
+                                  scaling_params=scaling_params)
     assert reward_fct.scale_objective(6.0) == 0.0
     assert reward_fct.scale_objective(2.0) == -1.0
    assert reward_fct.scale_objective(10.0) == 1.0
@@ -33,10 +33,10 @@
     assert reward_fct.scale_penalty(5.0) == 1.0
 
     # Test reward minmax scaling in range [0, 1]
-    reward_scaling_params = {'min_objective': 2.0, 'max_objective': 10.0,
+    scaling_params = {'min_objective': 2.0, 'max_objective': 10.0,
                              'min_penalty': 0.0, 'max_penalty': 5.0}
     reward_fct = reward.Summation(reward_scaling='minmax01',
-                                  reward_scaling_params=reward_scaling_params)
+                                  scaling_params=scaling_params)
     assert reward_fct.scale_objective(6.0) == 0.5
     assert reward_fct.scale_objective(2.0) == 0.0
     assert reward_fct.scale_objective(10.0) == 1.0
@@ -45,10 +45,10 @@
     assert reward_fct.scale_penalty(5.0) == 1.0
 
     # Test reward normalization scaling
-    reward_scaling_params = {'std_objective': 2.0, 'mean_objective': 6.0,
+    scaling_params = {'std_objective': 2.0, 'mean_objective': 6.0,
                              'std_penalty': 1.0, 'mean_penalty': 2.5}
     reward_fct = reward.Summation(reward_scaling='normalization',
-                                  reward_scaling_params=reward_scaling_params)
+                                  scaling_params=scaling_params)
     assert reward_fct.scale_objective(6.0) == 0.0
     assert reward_fct.scale_objective(2.0) == -2.0
     assert reward_fct.scale_objective(8.0) == 1.0
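
For reference (not part of the patch): a minimal usage sketch of the renamed scaling_params keyword, based only on the updated tests above. The import style and the parameter values are illustrative assumptions; passing env= is only needed when the scaling statistics should be estimated from an environment, which is what the opf_env.py hunk now wires up via env=self.

    from opfgym import reward  # assumed import path, mirroring tests/test_reward.py

    # User-supplied min/max statistics (illustrative values from the tests).
    scaling_params = {'min_objective': 2.0, 'max_objective': 10.0,
                      'min_penalty': 0.0, 'max_penalty': 5.0}

    # 'minmax11' scales objective and penalty into the range [-1, 1].
    reward_fct = reward.Summation(reward_scaling='minmax11',
                                  scaling_params=scaling_params)

    assert reward_fct.scale_objective(6.0) == 0.0  # midpoint of [2, 10] maps to 0
    assert reward_fct.scale_penalty(5.0) == 1.0    # maximum penalty maps to 1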