diff --git a/Control_Toolkit_ASF_Template/Controllers/controller_barebone.py b/Control_Toolkit_ASF_Template/Controllers/controller_barebone.py index ecc1858..b25092c 100644 --- a/Control_Toolkit_ASF_Template/Controllers/controller_barebone.py +++ b/Control_Toolkit_ASF_Template/Controllers/controller_barebone.py @@ -9,7 +9,8 @@ import os from Control_Toolkit.Controllers import template_controller -from others.globals_and_utils import create_rng +from others.globals_and_utils import create_rng, update_attributes + # TODO: You can load and access config files here, like this: # config = yaml.load(open("config.yml", "r"), Loader=yaml.FullLoader) @@ -31,10 +32,11 @@ def configure(self): # u = 0.0 pass - def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): + def step(self, state: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): + # The controller has to adapt when environment-related attributes such as target positions change # Updated targets etc. are passed as a dictionary updated_attributes - self.update_attributes(updated_attributes) # After this call, updated attributes are available as self.<> + update_attributes(updated_attributes,self) # After this call, updated attributes are available as self.<> # TODO: Implement your controller here # Examples: diff --git a/Control_Toolkit_ASF_Template/Controllers/controller_do_mpc.py b/Control_Toolkit_ASF_Template/Controllers/controller_do_mpc.py index 1d2f7d7..c2431fa 100644 --- a/Control_Toolkit_ASF_Template/Controllers/controller_do_mpc.py +++ b/Control_Toolkit_ASF_Template/Controllers/controller_do_mpc.py @@ -9,7 +9,7 @@ from CartPole.cartpole_model import Q2u, cartpole_ode_namespace from CartPole.state_utilities import cartpole_state_vector_to_namespace from Control_Toolkit.Controllers import template_controller -from others.globals_and_utils import create_rng +from others.globals_and_utils import create_rng, update_attributes from SI_Toolkit.computation_library import NumpyLibrary, TensorType @@ -136,16 +136,16 @@ def tvp_fun(self, t_ind): return self.tvp_template - def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): - self.update_attributes(updated_attributes) + def step(self, state: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): + update_attributes(updated_attributes,self) - s = cartpole_state_vector_to_namespace(s) + state = cartpole_state_vector_to_namespace(state) - self.x0['s.position'] = s.position - self.x0['s.positionD'] = s.positionD + self.x0['s.position'] = state.position + self.x0['s.positionD'] = state.positionD - self.x0['s.angle'] = s.angle - self.x0['s.angleD'] = s.angleD + self.x0['s.angle'] = state.angle + self.x0['s.angleD'] = state.angleD self.tvp_template['_tvp', :, 'target_position'] = self.target_position diff --git a/Control_Toolkit_ASF_Template/Controllers/controller_do_mpc_discrete.py b/Control_Toolkit_ASF_Template/Controllers/controller_do_mpc_discrete.py index 11f40c5..5e23315 100644 --- a/Control_Toolkit_ASF_Template/Controllers/controller_do_mpc_discrete.py +++ b/Control_Toolkit_ASF_Template/Controllers/controller_do_mpc_discrete.py @@ -11,6 +11,7 @@ from CartPole.state_utilities import cartpole_state_vector_to_namespace from Control_Toolkit.Controllers import template_controller from SI_Toolkit.computation_library import NumpyLibrary, TensorType +from others.globals_and_utils import update_attributes def mpc_next_state(s, u, dt): @@ -147,16 +148,16 @@ def configure(self): def tvp_fun(self, t_ind): return self.tvp_template - def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): - self.update_attributes(updated_attributes) + def step(self, state: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): + update_attributes(updated_attributes,self) - s = cartpole_state_vector_to_namespace(s) + state = cartpole_state_vector_to_namespace(state) - self.x0['s.position'] = s.position - self.x0['s.positionD'] = s.positionD + self.x0['s.position'] = state.position + self.x0['s.positionD'] = state.positionD - self.x0['s.angle'] = s.angle - self.x0['s.angleD'] = s.angleD + self.x0['s.angle'] = state.angle + self.x0['s.angleD'] = state.angleD self.tvp_template['_tvp', :, 'target_position'] = self.target_position diff --git a/Control_Toolkit_ASF_Template/Controllers/controller_lqr.py b/Control_Toolkit_ASF_Template/Controllers/controller_lqr.py index 4060ad1..3b895a5 100644 --- a/Control_Toolkit_ASF_Template/Controllers/controller_lqr.py +++ b/Control_Toolkit_ASF_Template/Controllers/controller_lqr.py @@ -14,7 +14,7 @@ from CartPole.state_utilities import (ANGLE_IDX, ANGLED_IDX, POSITION_IDX, POSITIOND_IDX) from Control_Toolkit.Controllers import template_controller -from others.globals_and_utils import create_rng +from others.globals_and_utils import create_rng, update_attributes config = yaml.load(open("config.yml", "r"), Loader=yaml.FullLoader) actuator_noise = config["cartpole"]["actuator_noise"] @@ -80,11 +80,11 @@ def configure(self): self.X = X self.eigVals = eigVals - def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): - self.update_attributes(updated_attributes) + def step(self, state: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): + update_attributes(updated_attributes,self) state = np.array( - [[s[POSITION_IDX] - self.target_position], [s[POSITIOND_IDX]], [s[ANGLE_IDX]], [s[ANGLED_IDX]]]) + [[state[POSITION_IDX] - self.target_position], [state[POSITIOND_IDX]], [state[ANGLE_IDX]], [state[ANGLED_IDX]]]) Q = np.dot(-self.K, state).item() diff --git a/Control_Toolkit_ASF_Template/Cost_Functions/EnvironmentName/cost_function_barebone.py b/Control_Toolkit_ASF_Template/Cost_Functions/EnvironmentName/cost_function_barebone.py index d3355d2..edf60dc 100644 --- a/Control_Toolkit_ASF_Template/Cost_Functions/EnvironmentName/cost_function_barebone.py +++ b/Control_Toolkit_ASF_Template/Cost_Functions/EnvironmentName/cost_function_barebone.py @@ -9,7 +9,7 @@ # TODO: Load constants from the cost config file, like this: -config = yaml.load(open(os.path.join("Control_Toolkit_ASF", "config_cost_function.yml"), "r"), Loader=yaml.FullLoader) +config = yaml.load(open(os.path.join("Control_Toolkit_ASF", "config_cost_functions.yml"), "r"), Loader=yaml.FullLoader) # TODO: Rename parent folder from EnvironmentName to the actual name of you environment # TODO: Load constants like this: @@ -23,7 +23,7 @@ class cost_function_barebone(cost_function_base): """This class can contain arbitrary helper functions to compute the cost of a trajectory or inputs.""" MAX_COST = 0.0 # Define maximum value the cost can take. Used for shifting - + # Example: Cost for difference from upright position # def _E_pot_cost(self, angle): # """Compute penalty for not balancing pole upright (penalize large angles)""" @@ -39,8 +39,8 @@ def get_terminal_cost(self, terminal_states: TensorType): # return terminal_cost pass - # all stage costs together - def _get_stage_cost(self, states: TensorType, inputs: TensorType, previous_input: TensorType): + # all stage costs together. A 'stage' is one timestep of a rollout. + def get_stage_cost(self, states: TensorType, inputs: TensorType, previous_input: TensorType): # Shape of states: [batch_size, mpc_horizon, num_states] # TODO: Compute stage cost # return stage_cost diff --git a/Control_Toolkit_ASF_Template/config_controllers.yml b/Control_Toolkit_ASF_Template/config_controllers.yml index ed1d8f5..c7383e1 100644 --- a/Control_Toolkit_ASF_Template/config_controllers.yml +++ b/Control_Toolkit_ASF_Template/config_controllers.yml @@ -11,7 +11,6 @@ mpc: controller_logging: true do-mpc-discrete: mpc_horizon: 50 # steps - num_rollouts: 1 # Initial positions position_init: 0.0 positionD_init: 0.0 @@ -21,7 +20,6 @@ do-mpc-discrete: do-mpc: seed: null # If null, random seed based on datetime is used mpc_horizon: 50 # steps - num_rollouts: 1 p_Q: 0.00 # Perturbation factors: Change of output from optimal # Random change of cost function by factor p_position: 0.0 diff --git a/Controllers/__init__.py b/Controllers/__init__.py index 1598823..c078b49 100644 --- a/Controllers/__init__.py +++ b/Controllers/__init__.py @@ -1,20 +1,23 @@ import os from abc import ABC, abstractmethod -from typing import Tuple +from pathlib import Path +from typing import Tuple, Union import numpy as np import yaml -from Control_Toolkit.others.globals_and_utils import get_logger from SI_Toolkit.computation_library import (ComputationLibrary, NumpyLibrary, PyTorchLibrary, TensorFlowLibrary, TensorType) +from others.globals_and_utils import load_or_reload_config_if_modified -config_cost_function = yaml.load(open(os.path.join("Control_Toolkit_ASF", "config_cost_function.yml")), Loader=yaml.FullLoader) -logger = get_logger(__name__) +from Control_Toolkit.others.get_logger import get_logger +log = get_logger(__name__) + +config_cost_function = yaml.load(open(os.path.join("Control_Toolkit_ASF", "config_cost_functions.yml")), Loader=yaml.FullLoader) """ For a controller to be found and imported by CartPoleGUI/DataGenerator it must: -1. Be in Controller folder +1. Be in Controllers folder 2. Have a name starting with "controller_" 3. The name of the controller class must be the same as the name of the file. 4. It must have __init__ and step methods @@ -26,7 +29,7 @@ class template_controller(ABC): _has_optimizer = False # Define the computation library in your controller class or in the controller's configuration: - _computation_library: "type[ComputationLibrary]" = None + _computation_library: ComputationLibrary = None def __init__( self, @@ -38,14 +41,17 @@ def __init__( initial_environment_attributes: "dict[str, TensorType]", ): # Load controller config and select the entry for the current controller - config_controllers = yaml.load( - open(os.path.join("Control_Toolkit_ASF", "config_controllers.yml")), - Loader=yaml.FullLoader - ) + (config_controllers,_) = load_or_reload_config_if_modified(os.path.join("Control_Toolkit_ASF", "config_controllers.yml")) # ignore the _ changes return since this is initial call + f=os.path.join("Control_Toolkit_ASF", "config_controllers.yml") + fp=Path(f) + log.debug(f'loading controller config from "{fp.absolute()}"') + config_controllers = yaml.load(open(f), Loader=yaml.FullLoader) # self.controller_name is inferred from the class name, which is the class being instantiated # Example: If you create a controller_mpc, this controller_template.__init__ will be called # but the class name will be controller_mpc, not template_controller. - self.config_controller = dict(config_controllers[self.controller_name]) + config_key=self.controller_name + self.config_controller = config_controllers[config_key] + # add timestep .dt to all controllers here self.config_controller["dt"] = dt # Set computation library @@ -53,7 +59,7 @@ def __init__( if computation_library_name: # Assign computation library from config - logger.info(f"Found library {computation_library_name} for MPC controller.") + log.info(f"Found library {computation_library_name} for MPC controller.") if "tensorflow" in computation_library_name.lower(): self._computation_library = TensorFlowLibrary elif "pytorch" in computation_library_name.lower(): @@ -67,7 +73,7 @@ def __init__( if not issubclass(self.computation_library, ComputationLibrary): raise ValueError(f"{self.__class__.__name__} does not have a default computation library set. You have to define one in this controller's config.") else: - logger.info(f"No computation library specified in controller config. Using default {self.computation_library} for class.") + log.info(f"No computation library specified in controller config. Using default {self.computation_library} for class.") self.lib = self.computation_library # Shortcut to make easy using functions from computation library, this is also used by CompileAdaptive to recognize library # Environment-related parameters @@ -78,15 +84,20 @@ def __init__( self.num_control_inputs = num_control_inputs self.control_limits = control_limits self.action_low, self.action_high = self.control_limits - + + # todo these are special for cartpole but we would need a base cartpole controller class to put them there + # self.target_position=None + # self.target_equilibrium=None + # Set properties like target positions on this controller - for p, v in initial_environment_attributes.items(): - if type(v) in {np.ndarray, float, int, bool}: - data_type = getattr(v, "dtype", self.lib.float32) - data_type = self.lib.int32 if data_type == int else self.lib.float32 - v = self.lib.to_variable(v, data_type) - setattr(self, p, v) - + for property, new_value in initial_environment_attributes.items(): + setattr(self, property, self.computation_library.to_variable(new_value, self.computation_library.float32)) + + # set all controller config numerical values as float variables in the computation space, e.g. tensorflow, so they can be updqted during runtime + for property, value in self.config_controller.items(): + if value is float or value is int: + setattr(self, property, self.computation_library.to_variable(value, self.computation_library.float32)) + # Initialize control variable self.u = 0.0 @@ -106,21 +117,30 @@ def __init__( def configure(self, **kwargs): # In your controller, implement any additional initialization steps here pass - - def update_attributes(self, updated_attributes: "dict[str, TensorType]"): + + def update_attributes(self, updated_attributes: "dict[str,TensorType]"): for property, new_value in updated_attributes.items(): try: - # Assume the variable is an attribute type and assign - attr = getattr(self, property) + attr = getattr(self.variable_parameters, property) self.computation_library.assign(attr, self.lib.to_tensor(new_value, attr.dtype)) except: - setattr(self, property, new_value) - + setattr(self.variable_parameters, property, new_value) + @abstractmethod - def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): + def step(self, state: np.ndarray, time:float=None, updated_attributes: "dict[str, Union[TensorType,float]]" = dict()): + """ + Execute one timestep of control. + + :param state: the state array, dimensions are TODO add dimension to help users + :param time: the time now in seconds + :param updated_attributes: dict of updated attributes + + :returns: the next control action u e.g. a normed control input in the range [-1,1] TODO is this correct? + + """ ### Any computations in order to retrieve the current control. Such as: ## If the environment's target positions etc. change, copy the new attributes over to this controller so the cost function knows about it: - # self.update_attributes(updated_attributes) + # update_attributes(updated_attributes,self) ## Use some sort of optimization procedure to get your control, e.g. # u = self.optimizer.step(s, time) ## Use the following call to populate the self.logs dictionary with savevars, such as: @@ -132,7 +152,7 @@ def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorTy # Optionally: A method called after an experiment. # May be used to print some statistics about controller performance (e.g. number of iter. to converge) def controller_report(self): - logger.info("No controller report implemented for this controller. Stopping without report.") + log.info("No controller report implemented for this controller. Stopping without report.") pass # Optionally: reset the controller after an experiment @@ -143,10 +163,17 @@ def controller_report(self): def controller_reset(self): raise NotImplementedError - @property + @property # decorates the controller so it has the field .controller_name that gets its short name which is the key in the .yml config file def controller_name(self): + """ Generates standard name for this controller, but use this method like it were a field. + + :returns: the short name which is the key to the controller in the config_controller.yml file, e.g. 'cartpole_mppi' + + """ name = self.__class__.__name__ if name != "template_controller": + if 'controller_' not in name: + raise AttributeError(f'this controller named "{name}" does not contain "controller_". Controllers should start or contain "controller_" and the key in the config_controllers.yml file should follow the underscore') return name.replace("controller_", "").replace("_", "-").lower() else: raise AttributeError() @@ -156,7 +183,7 @@ def controller_data_for_csv(self): return {} @property - def computation_library(self) -> "type[ComputationLibrary]": + def computation_library(self) -> ComputationLibrary: if self._computation_library == None: raise NotImplementedError("Controller class needs to specify its computation library") return self._computation_library @@ -177,6 +204,8 @@ def get_outputs(self) -> "dict[str, np.ndarray]": } def update_logs(self, logging_values: "dict[str, TensorType]") -> None: + """ Appends controller logging information to memory in self.logs if self.controller_logging exists, according to self.save_vars + """ if self.controller_logging: for name, var in zip( self.save_vars, [logging_values.get(var_name, None) for var_name in self.save_vars] @@ -185,3 +214,13 @@ def update_logs(self, logging_values: "dict[str, TensorType]") -> None: self.logs[name].append( var.numpy().copy() if hasattr(var, "numpy") else var.copy() ) + + def keyboard_input(self, c): + """ process keyboard input character. Default implementation does nothing. + :param c: character + """ + pass + + def print_keyboard_help(self): + """ Print help for keybaord input""" + pass diff --git a/Controllers/controller_mpc.py b/Controllers/controller_mpc.py index d6cd4ae..056b6d8 100644 --- a/Controllers/controller_mpc.py +++ b/Controllers/controller_mpc.py @@ -1,5 +1,7 @@ import os from typing import Optional + +from Control_Toolkit_ASF.Cost_Functions.CartPole.cartpole_trajectory_generator import cartpole_trajectory_generator from SI_Toolkit.Predictors.predictor_wrapper import PredictorWrapper import numpy as np @@ -9,42 +11,61 @@ from Control_Toolkit.Optimizers import template_optimizer from SI_Toolkit.computation_library import TensorType -from Control_Toolkit.others.globals_and_utils import get_logger, import_optimizer_by_name +from Control_Toolkit.others.globals_and_utils import import_optimizer_by_name from torch import inference_mode +from others.globals_and_utils import load_or_reload_config_if_modified, update_attributes + +from Control_Toolkit.others.get_logger import get_logger +log = get_logger(__name__) + config_optimizers = yaml.load(open(os.path.join("Control_Toolkit_ASF", "config_optimizers.yml")), Loader=yaml.FullLoader) -config_cost_function = yaml.load(open(os.path.join("Control_Toolkit_ASF", "config_cost_function.yml")), Loader=yaml.FullLoader) -logger = get_logger(__name__) +config_cost_function = yaml.load(open(os.path.join("Control_Toolkit_ASF", "config_cost_functions.yml")), Loader=yaml.FullLoader) class controller_mpc(template_controller): + _has_optimizer = True - + + def __init__( + self, + dt: float, + environment_name: str, + num_states: int, + num_control_inputs: int, + control_limits: "Tuple[np.ndarray, np.ndarray]", + initial_environment_attributes: "dict[str, TensorType]", + ): + super().__init__(dt, environment_name, num_states, num_control_inputs, control_limits, + initial_environment_attributes) + self.cartpole_trajectory_generator:cartpole_trajectory_generator = cartpole_trajectory_generator() # TODO awkward to add this specific cost function generator here to generic controller_mpc + def configure(self, optimizer_name: Optional[str]=None, predictor_specification: Optional[str]=None): if optimizer_name in {None, ""}: optimizer_name = str(self.config_controller["optimizer"]) - logger.info(f"Using optimizer {optimizer_name} specified in controller config file") + log.info(f'Using optimizer \'{optimizer_name}\' specified in controller config file') if predictor_specification in {None, ""}: predictor_specification: Optional[str] = self.config_controller.get("predictor_specification", None) - logger.info(f"Using predictor {predictor_specification} specified in controller config file") - + log.info(f"Using predictor_specification='{predictor_specification}' specified in controller config file") + config_optimizer = config_optimizers[optimizer_name] - + # Create cost function cost_function_specification = self.config_controller.get("cost_function_specification", None) - self.cost_function = CostFunctionWrapper() - self.cost_function.configure(self, cost_function_specification=cost_function_specification) - + log.info(f'using cost_function_specification=\'{cost_function_specification}\' in config {self.config_controller}') + self.cost_function_wrapper = CostFunctionWrapper() + self.cost_function_wrapper.configure(self, cost_function_specification=cost_function_specification) + # Create predictor - self.predictor = PredictorWrapper() + self.predictor_wrapper = PredictorWrapper() # MPC Controller always has an optimizer Optimizer = import_optimizer_by_name(optimizer_name) self.optimizer: template_optimizer = Optimizer( - predictor=self.predictor, - cost_function=self.cost_function, + predictor=self.predictor_wrapper, + cost_function=self.cost_function_wrapper, num_states=self.num_states, num_control_inputs=self.num_control_inputs, control_limits=self.control_limits, @@ -56,7 +77,7 @@ def configure(self, optimizer_name: Optional[str]=None, predictor_specification: # Do this here. If the optimizer does not require any additional parameters, it will ignore them. self.optimizer.configure(dt=self.config_controller["dt"], predictor_specification=predictor_specification) - self.predictor.configure( + self.predictor_wrapper.configure( batch_size=self.optimizer.num_rollouts, horizon=self.optimizer.mpc_horizon, dt=self.config_controller["dt"], @@ -70,12 +91,67 @@ def configure(self, optimizer_name: Optional[str]=None, predictor_specification: self.step = self.step - def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): - self.update_attributes(updated_attributes) - u = self.optimizer.step(s, time) + def step(self, state: np.ndarray, time:float=None, updated_attributes: "dict[str, TensorType]" = {})->float: + """ Compute one step of control. + + :param state: the current state as 1d state vector + :param time: the current time in seconds + :param updated_attributes: a dict of values to update tensorflow assignment values in the cost function + + :returns: the control vector (for cartpole, a scalar cart acceleration) + + """ + # log.debug(f'step time={time:.3f}s') + + + # following gets target_position, target_equilibrium, and target_trajectory passed to tensorflow. The trajectory is passed in + # as updated_attributes, and is transferred to tensorflow by the update_attributes call + cost_function=self.cost_function_wrapper.cost_function + update_attributes(updated_attributes,cost_function) # update target_position and target_equilibrium in cost function to use + updated_attributes.clear() + self.cartpole_trajectory_generator.\ + generate_cartpole_trajectory(time=time, state=state, controller=self, cost_function=self.cost_function_wrapper.cost_function) + + # now we fill this dict with config file changes if there are any and update attributes in the controller, the cost function, and the optimizer + # detect any changes in config scalar values and pass to this controller or the cost function or optimizer + + # note that the cost function that has its attributes updated is the enclosed cost function of the wrapper! + # note the following mapping for config files + # config_controller.yml -> self (i.e. controller_mpc) + # config_cost_functions.yml -> self.cost_function_wrapper.cost_function + # config_optimizers.yml -> self.optimizer AND self.predictor_wrapper.predictor + for (objs,config) in (((self,),'config_controllers.yml'), ((self.cost_function_wrapper.cost_function,), 'config_cost_functions.yml'), ((self.optimizer,self.predictor_wrapper.predictor), 'config_optimizers.yml')): + (config,changes)=load_or_reload_config_if_modified(os.path.join('Control_Toolkit_ASF',config), search_path=['CartPoleSimulation']) # all the config files are currently assumed to be in Control_Toolkit_ASF folder + # process changes to configs using new returned change list + if not changes is None: + for k,v in changes.items(): + if isinstance(v, (int, float, str)): + updated_attributes[k]=v + for o in objs: # for each object in objs, update its attributes + update_attributes(updated_attributes,o) + log.debug(f'updated {objs} with scalar updated_attributes {updated_attributes}') + + # obtain the next control action from the optimizer + u = self.optimizer.step(state, time) + + # for analysis of model mismatch, obtain the prediction of next state with this control action + + self.update_logs(self.optimizer.logging_values) return u def controller_reset(self): self.optimizer.optimizer_reset() - \ No newline at end of file + self.cartpole_trajectory_generator.reset() + + def keyboard_input(self, c): + try: + self.cartpole_trajectory_generator.keyboard_input(c) + except AttributeError: + pass + + def print_keyboard_help(self): + try: + self.cartpole_trajectory_generator.print_keyboard_help() + except AttributeError: + pass diff --git a/Controllers/controller_neural_imitator_pytorch.py b/Controllers/controller_neural_imitator_pytorch.py index b37e764..0e96ad5 100644 --- a/Controllers/controller_neural_imitator_pytorch.py +++ b/Controllers/controller_neural_imitator_pytorch.py @@ -5,6 +5,7 @@ from Control_Toolkit.Controllers import template_controller from SI_Toolkit.load_and_normalize import normalize_numpy_array +from others.globals_and_utils import update_attributes try: from SI_Toolkit_ASF.predictors_customization import STATE_INDICES @@ -44,10 +45,10 @@ def configure(self): self.net.reset() self.net.eval() - def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): - self.update_attributes(updated_attributes) + def step(self, state: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): + update_attributes(updated_attributes,self) - net_input = s[ + net_input = state[ ..., [STATE_INDICES.get(key) for key in self.net_info.inputs[:-1]] ] # -1 is a fix to exclude target position net_input = np.append(net_input, self.target_position) diff --git a/Controllers/controller_neural_imitator_tf.py b/Controllers/controller_neural_imitator_tf.py index 9b5e6b5..a7235db 100644 --- a/Controllers/controller_neural_imitator_tf.py +++ b/Controllers/controller_neural_imitator_tf.py @@ -4,6 +4,8 @@ import numpy as np from Control_Toolkit.Controllers import template_controller +from SI_Toolkit.load_and_normalize import normalize_numpy_array +from others.globals_and_utils import update_attributes from SI_Toolkit.load_and_normalize import normalize_numpy_array, denormalize_numpy_array try: @@ -39,10 +41,10 @@ def configure(self): self.evaluate_net = CompileAdaptive(self._evaluate_net) - def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): - self.update_attributes(updated_attributes) + def step(self, state: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): + update_attributes(updated_attributes,self) - net_input = s[ + net_input = state[ ..., [STATE_INDICES.get(key) for key in self.net_info.inputs[:-1]] ] # -1 is a fix to exclude target position net_input = np.append(net_input, self.target_position) diff --git a/Cost_Functions/__init__.py b/Cost_Functions/__init__.py index 7bd952c..2a26ea6 100644 --- a/Cost_Functions/__init__.py +++ b/Cost_Functions/__init__.py @@ -1,22 +1,36 @@ +from typing import Optional + from SI_Toolkit.computation_library import ComputationLibrary, NumpyLibrary, PyTorchLibrary, TensorFlowLibrary, TensorType from Control_Toolkit.Controllers import template_controller -from Control_Toolkit.others.globals_and_utils import get_logger - -logger = get_logger(__name__) +from Control_Toolkit.others.get_logger import get_logger +log = get_logger(__name__) class cost_function_base: + """ Base cost function for all MPC systems + """ # Default: Class supports all libs to compute costs supported_computation_libraries = {NumpyLibrary, TensorFlowLibrary, PyTorchLibrary} # Define default values used for cost normalization MIN_COST = -1.0 MAX_COST = 0.0 COST_RANGE = MAX_COST - MIN_COST - - def __init__(self, controller: template_controller, ComputationLib: "type[ComputationLibrary]") -> None: - self.controller = controller + + def __init__(self, controller: template_controller, ComputationLib: "type[ComputationLibrary]", config:dict=None) -> None: + """ makes a new cost function + + :param controller: the controller + :param ComputationLib: the library, e.g. python, tensorflow + :param config: the dict of configuration for this cost function. The caller can modify the config to change behavior during runtime. + + """ + + self.lib:Optional[ComputationLibrary] = None + self.controller:template_controller = controller + self.config:dict=config self.set_computation_library(ComputationLib) - + log.info(f'constructed {self} with controller {controller} computation library {ComputationLib} and config {config}') + def get_terminal_cost(self, terminal_states: TensorType) -> TensorType: """Compute a batch of terminal costs for a batch of terminal states. @@ -28,7 +42,7 @@ def get_terminal_cost(self, terminal_states: TensorType) -> TensorType: # Default behavior: Return a zero cost scalar per sample of batch return self.lib.zeros_like(terminal_states)[:,:1] # Shape: (batch_size x 1) - def get_stage_cost(self, states: TensorType, inputs: TensorType, previous_input: TensorType) -> TensorType: + def get_stage_cost(self, states: TensorType, inputs: TensorType, previous_input: TensorType, time:float) -> TensorType: """Compute all stage costs of a batch of states and contol inputs. One "stage" is one step in the MPC horizon. Stage costs are shifted so that they are <= 0. Reason: reward = -cost is then >= 0 and therefore easier to interpret. @@ -45,14 +59,12 @@ def get_stage_cost(self, states: TensorType, inputs: TensorType, previous_input: stage_costs = self._get_stage_cost(states, inputs, previous_input) # Select all but last state of the horizon return stage_costs - self.MAX_COST # Could also normalize to [-1, 0]: - # (stage_costs - self.MIN_COST) / self.COST_RANGE - 1 - + # (stage_costs - self.MIN_COST) / self.COST_RANGE - 1 + def _get_stage_cost(self, states: TensorType, inputs: TensorType, previous_input: TensorType) -> TensorType: raise NotImplementedError("To be implemented in subclass.") - def get_trajectory_cost( - self, state_horizon: TensorType, inputs: TensorType, previous_input: TensorType = None - ) -> TensorType: + def get_trajectory_cost(self, state_horizon: TensorType, inputs: TensorType, previous_input: TensorType = None, time:float=None) -> TensorType: """Helper function which computes a batch of the summed cost of a trajectory. Can be overwritten in a subclass, e.g. if weighted sum is required. The batch dimension is used to compute for multiple rollouts in parallel. @@ -63,12 +75,20 @@ def get_trajectory_cost( :type inputs: TensorType :param previous_input: The most recent actually applied control, defaults to None :type previous_input: TensorType, optional + :param time: the time in seconds + :type time: float + :return: The summed cost of the trajectory. Has shape [batch_size]. :rtype: TensorType """ - stage_costs = self.get_stage_cost(state_horizon[:, :-1, :], inputs, previous_input) # Select all but last state of the horizon - terminal_cost = self.lib.reshape(self.get_terminal_cost(state_horizon[:, -1, :]), (-1, 1)) - total_cost = self.lib.mean(self.lib.concat([stage_costs, terminal_cost], 1), 1) # Average across the MPC horizon dimension + # Select all but last state of the horizon + # stages costs has dimension [num_rollouts, horizon, states] + stage_costs = self.get_stage_cost(state_horizon[:, :-1, :], inputs, previous_input) + # select last states of horizon for all rollouts + # compute the cost of these terminal states, result is ??? + # terminal_cost = self.lib.reshape(self.get_terminal_cost(state_horizon[:, -1, :]), (-1, 1)) + # total_cost = self.lost = self.lib.reshape(self.get_terminal_cost(state_horizon[:, -1, :]), (-1, 1)) + total_cost = self.lib.mean(stage_costs, 1) # Average across the MPC horizon dimension to leave a 1-d vector of num_rollouts dimension return total_cost def set_computation_library(self, ComputationLib: "type[ComputationLibrary]"): @@ -76,3 +96,4 @@ def set_computation_library(self, ComputationLib: "type[ComputationLibrary]"): if not ComputationLib in self.supported_computation_libraries: raise ValueError(f"The cost function {self.__class__.__name__} does not support {ComputationLib.__name__}") self.lib = ComputationLib + diff --git a/Cost_Functions/cost_function_wrapper.py b/Cost_Functions/cost_function_wrapper.py index 423b9cf..b484190 100644 --- a/Cost_Functions/cost_function_wrapper.py +++ b/Cost_Functions/cost_function_wrapper.py @@ -1,42 +1,54 @@ from importlib import import_module import os from SI_Toolkit.computation_library import TensorType -import yaml -from copy import deepcopy as dcp -from types import MappingProxyType from Control_Toolkit.Controllers import template_controller from Control_Toolkit.Cost_Functions import cost_function_base - -# cost_function config -cost_function_config = yaml.load( - open(os.path.join("Control_Toolkit_ASF", "config_cost_function.yml"), "r"), - Loader=yaml.FullLoader, -) - +from others.globals_and_utils import load_or_reload_config_if_modified, update_attributes +from Control_Toolkit.others.get_logger import get_logger +log=get_logger(__name__) class CostFunctionWrapper: """ Wrapper class for cost functions. It allows the creation of an instance with deferred specification which specific class containing cost functions is to be used. - + Usage: 1) Instantiate this wrapper in controller. 2) Call `configure` with a reference to the controller instance and the name of the cost function, once this is known to the controller. - - You can do both steps + + You can do both steps """ def __init__(self): self.cost_function = None - self.cost_function_name_default: str = cost_function_config[ - "cost_function_name_default" - ] + # cost_function config + (self.cost_function_config, _) = load_or_reload_config_if_modified( + os.path.join("Control_Toolkit_ASF", "config_cost_functions.yml"), search_path=['CartPoleSimulation']) # if run from physical-cartpole, need to find it relative to CartPoleSimulation submodule + + self.cost_function_name_default: str = self.cost_function_config.cost_function_name_default + self.lib = None # filled by configure(), needed to update TF variables + + log.debug(f'default cost function name is {self.cost_function_name_default}') + # cost_function config + (self.cost_function_config, _) = load_or_reload_config_if_modified( + os.path.join("Control_Toolkit_ASF", "config_cost_functions.yml"),search_path=['CartPoleSimulation']) + + self.cost_function_name_default: str = self.cost_function_config.cost_function_name_default + self.lib = None # filled by configure(), needed to update TF variables + + log.debug(f'default cost function name is {self.cost_function_name_default}') def configure( self, controller: template_controller, cost_function_specification: str=None, ): + """ + Configures the cost function. TODO This lazy constructor is needed why? + + :param controller: the controller that uses this cost function + :param cost_function_specification: the string name of the cost function class, to construct the class and find the config values in config_cost_functions.yml + """ """Configure this wrapper as part of a controller. :param controller: Reference to the controller instance this wrapper is used by @@ -59,6 +71,12 @@ def configure( cost_function_module, self.cost_function_name )(controller, computation_library) + self.lib=self.cost_function.lib + + config=self.cost_function_config['CartPole'][self.cost_function_name] # todo hardcoded 'CartPole' has to go, not sure how to determine it, maybe from module folder? + update_attributes(config, self.cost_function) + log.debug(f'configured controller {controller.__class__.__name__} with cost function {self.cost_function.__class__}') + def update_cost_function_name_from_specification( self, environment_name: str, cost_function_specification: str = None ): @@ -80,10 +98,9 @@ def get_stage_cost(self, states: TensorType, inputs: TensorType, previous_input: return self.cost_function.get_stage_cost(states, inputs, previous_input) def get_trajectory_cost( - self, state_horizon: TensorType, inputs: TensorType, previous_input: TensorType = None - ): + self, state_horizon: TensorType, inputs: TensorType, previous_input: TensorType = None, config:dict=None, time:float=None): """Refer to :func:`the base cost function `""" - return self.cost_function.get_trajectory_cost(state_horizon, inputs, previous_input) + return self.cost_function.get_trajectory_cost(state_horizon, inputs, previous_input, time=time) def copy(self): """ diff --git a/Optimizers/__init__.py b/Optimizers/__init__.py index aecd35d..91875ba 100644 --- a/Optimizers/__init__.py +++ b/Optimizers/__init__.py @@ -54,7 +54,7 @@ def configure(self, **kwargs): """Pass any additional arguments from the controller to the optimizer.""" pass - def step(self, s: np.ndarray, time=None): + def step(self, s: np.ndarray, time=None, updated_attributes: "dict[str, TensorType]" = {}): raise NotImplementedError("Implement this function in a subclass.") def optimizer_reset(self): diff --git a/Optimizers/optimizer_cem_gmm_tf.py b/Optimizers/optimizer_cem_gmm_tf.py index b371414..90d4b56 100644 --- a/Optimizers/optimizer_cem_gmm_tf.py +++ b/Optimizers/optimizer_cem_gmm_tf.py @@ -26,7 +26,7 @@ def __init__( mpc_horizon: int, cem_outer_it: int, cem_initial_action_stdev: float, - num_rollouts: int, + batch_size: int, cem_stdev_min: float, cem_best_k: int, optimizer_logging: bool, @@ -39,7 +39,7 @@ def __init__( control_limits=control_limits, optimizer_logging=optimizer_logging, seed=seed, - num_rollouts=num_rollouts, + batch_size=batch_size, mpc_horizon=mpc_horizon, computation_library=computation_library, ) @@ -66,7 +66,7 @@ def update_distribution(self, s: tf.Tensor, Q: tf.Tensor, traj_cost: tf.Tensor, #rollout the trajectories and get cost traj_cost, rollout_trajectory = self.predict_and_cost(s, Q) - rollout_trajectory = tf.ensure_shape(rollout_trajectory, [self.num_rollouts, self.mpc_horizon+1, self.num_states]) + rollout_trajectory = tf.ensure_shape(rollout_trajectory, [self.num_rollouts, self.mpc_horizon + 1, self.num_states]) #sort the costs and find best k costs sorted_cost = tf.argsort(traj_cost) @@ -105,7 +105,7 @@ def step(self, s: np.ndarray, time=None): s = np.tile(s, tf.constant([self.num_rollouts, 1])) s = tf.convert_to_tensor(s, dtype=tf.float32) Q = tf.zeros((self.num_rollouts, self.mpc_horizon, self.num_control_inputs), dtype=tf.float32) - rollout_trajectory = tf.zeros((self.num_rollouts, self.mpc_horizon+1, self.num_states), dtype=tf.float32) + rollout_trajectory = tf.zeros((self.num_rollouts, self.mpc_horizon + 1, self.num_states), dtype=tf.float32) traj_cost = tf.zeros((self.num_rollouts), dtype=tf.float32) for _ in range(0, self.cem_outer_it): diff --git a/Optimizers/optimizer_cem_grad_bharadhwaj_tf.py b/Optimizers/optimizer_cem_grad_bharadhwaj_tf.py index e56e999..f61a9c4 100644 --- a/Optimizers/optimizer_cem_grad_bharadhwaj_tf.py +++ b/Optimizers/optimizer_cem_grad_bharadhwaj_tf.py @@ -27,7 +27,7 @@ def __init__( seed: int, mpc_horizon: int, cem_outer_it: int, - num_rollouts: int, + batch_size: int, cem_initial_action_stdev: float, cem_stdev_min: float, cem_best_k: int, @@ -48,7 +48,7 @@ def __init__( control_limits=control_limits, optimizer_logging=optimizer_logging, seed=seed, - num_rollouts=num_rollouts, + batch_size=batch_size, mpc_horizon=mpc_horizon, computation_library=computation_library, ) diff --git a/Optimizers/optimizer_cem_naive_grad_tf.py b/Optimizers/optimizer_cem_naive_grad_tf.py index 9624994..9181928 100644 --- a/Optimizers/optimizer_cem_naive_grad_tf.py +++ b/Optimizers/optimizer_cem_naive_grad_tf.py @@ -26,7 +26,7 @@ def __init__( seed: int, mpc_horizon: int, cem_outer_it: int, - num_rollouts: int, + batch_size: int, cem_initial_action_stdev: float, cem_stdev_min: float, cem_best_k: int, @@ -42,7 +42,7 @@ def __init__( control_limits=control_limits, optimizer_logging=optimizer_logging, seed=seed, - num_rollouts=num_rollouts, + batch_size=batch_size, mpc_horizon=mpc_horizon, computation_library=computation_library, ) diff --git a/Optimizers/optimizer_cem_tf.py b/Optimizers/optimizer_cem_tf.py index 9c3a3ea..38ca38d 100644 --- a/Optimizers/optimizer_cem_tf.py +++ b/Optimizers/optimizer_cem_tf.py @@ -25,7 +25,7 @@ def __init__( mpc_horizon: int, cem_outer_it: int, cem_initial_action_stdev: float, - num_rollouts: int, + batch_size: int, cem_stdev_min: float, cem_best_k: int, warmup: bool, @@ -40,7 +40,7 @@ def __init__( control_limits=control_limits, optimizer_logging=optimizer_logging, seed=seed, - num_rollouts=num_rollouts, + batch_size=batch_size, mpc_horizon=mpc_horizon, computation_library=computation_library, ) @@ -65,13 +65,13 @@ def predict_and_cost(self, s, Q): @CompileTF def update_distribution(self, s: tf.Tensor, Q: tf.Tensor, traj_cost: tf.Tensor, rollout_trajectory: tf.Tensor, dist_mue: tf.Tensor, stdev: tf.Tensor, rng: tf.random.Generator): #generate random input sequence and clip to control limits - Q = tf.tile(dist_mue,(self.num_rollouts,1,1)) + tf.multiply(rng.normal( + Q = tf.tile(dist_mue, (self.num_rollouts, 1, 1)) + tf.multiply(rng.normal( shape=(self.num_rollouts, self.mpc_horizon, self.num_control_inputs), dtype=tf.float32), stdev) Q = tf.clip_by_value(Q, self.action_low, self.action_high) #rollout the trajectories and get cost traj_cost, rollout_trajectory = self.predict_and_cost(s, Q) - rollout_trajectory = tf.ensure_shape(rollout_trajectory, [self.num_rollouts, self.mpc_horizon+1, self.num_states]) + rollout_trajectory = tf.ensure_shape(rollout_trajectory, [self.num_rollouts, self.mpc_horizon + 1, self.num_states]) #sort the costs and find best k costs sorted_cost = tf.argsort(traj_cost) @@ -90,7 +90,7 @@ def step(self, s: np.ndarray, time=None): s = np.tile(s, tf.constant([self.num_rollouts, 1])) s = tf.convert_to_tensor(s, dtype=tf.float32) Q = tf.zeros((self.num_rollouts, self.mpc_horizon, self.num_control_inputs), dtype=tf.float32) - rollout_trajectory = tf.zeros((self.num_rollouts, self.mpc_horizon+1, self.num_states), dtype=tf.float32) + rollout_trajectory = tf.zeros((self.num_rollouts, self.mpc_horizon + 1, self.num_states), dtype=tf.float32) traj_cost = tf.zeros((self.num_rollouts), dtype=tf.float32) iterations = self.warmup_iterations if self.warmup and self.count == 0 else self.cem_outer_it diff --git a/Optimizers/optimizer_gradient_tf.py b/Optimizers/optimizer_gradient_tf.py index a70346e..ab9aa1d 100644 --- a/Optimizers/optimizer_gradient_tf.py +++ b/Optimizers/optimizer_gradient_tf.py @@ -23,7 +23,7 @@ def __init__( seed: int, mpc_horizon: int, gradient_steps: int, - num_rollouts: int, + batch_size: int, initial_action_stdev: float, learning_rate: float, adam_beta_1: float, @@ -43,7 +43,7 @@ def __init__( control_limits=control_limits, optimizer_logging=optimizer_logging, seed=seed, - num_rollouts=num_rollouts, + batch_size=batch_size, mpc_horizon=mpc_horizon, computation_library=computation_library, ) diff --git a/Optimizers/optimizer_mppi.py b/Optimizers/optimizer_mppi.py index 53cc18a..2420316 100644 --- a/Optimizers/optimizer_mppi.py +++ b/Optimizers/optimizer_mppi.py @@ -1,16 +1,35 @@ from typing import Tuple -from SI_Toolkit.computation_library import ComputationLibrary, NumpyLibrary, TensorFlowLibrary, PyTorchLibrary + +from Control_Toolkit.others.globals_and_utils import get_logger +from SI_Toolkit.computation_library import ComputationLibrary, NumpyLibrary, TensorFlowLibrary, PyTorchLibrary, \ + TensorType import numpy as np from Control_Toolkit.Cost_Functions.cost_function_wrapper import CostFunctionWrapper from Control_Toolkit.Optimizers import template_optimizer -from Control_Toolkit.others.globals_and_utils import CompileAdaptive from Control_Toolkit.others.Interpolator import Interpolator from SI_Toolkit.Predictors.predictor_wrapper import PredictorWrapper +from SI_Toolkit.Functions.TF.Compile import CompileAdaptive +log = get_logger(__name__) class optimizer_mppi(template_optimizer): + """ Model Predictive Path Integral optimizer, based on + + The equations and parameters are defined in the following: + + Williams, G., P. Drews, B. Goldfain, J. M. Rehg, and E. A. Theodorou. 2016. “Aggressive Driving with Model Predictive Path Integral Control.” In 2016 IEEE International Conference on Robotics and Automation (ICRA), 1433–40. https://doi.org/10.1109/ICRA.2016.7487277. + + A longer paper with all the math is + + Williams, Grady, Andrew Aldrich, and Evangelos A. Theodorou. 2017. “Model Predictive Path Integral Control: From Theory to Parallel Computation.” Journal of Guidance, Control, and Dynamics: A Publication of the American Institute of Aeronautics and Astronautics Devoted to the Technology of Dynamics and Control 40 (2): 344–57. https://doi.org/10.2514/1.G001921. + + The following paper uses a LWPR model to fly crazyfly drones using another type of MPPI with minibatches of rollouts + + Williams, Grady, Eric Rombokas, and Tom Daniel. 2015. “GPU Based Path Integral Control with Learned Dynamics.” arXiv [cs.RO]. arXiv. http://arxiv.org/abs/1503.00330. + + """ supported_computation_libraries = {NumpyLibrary, TensorFlowLibrary, PyTorchLibrary} def __init__( @@ -53,8 +72,8 @@ def __init__( :type cc_weight: float :param R: Weight of quadratic cost term. :type R: float - :param LBD: Positive parameter defining greediness of the optimizer in the weighted sum of rollouts. - If lambda is strongly positive, all trajectories get roughly the same weight even if one has much lower cost than the other. + :param LBD: Positive parameter defining greediness of the optimizer in the weighted sum of rollouts. + If lambda is strongly positive, all trajectories get roughly the same weight even if one has much lower cost than the other. As lambda approaches 0, the relative importance of the most low-cost trajectories when determining the weighted average increases. :type LBD: float :param mpc_horizon: Length of the MPC horizon in steps. dt is implicitly given by the predictor provided. @@ -90,10 +109,10 @@ def __init__( # MPPI parameters - self.cc_weight = cc_weight - self.R = self.lib.to_tensor(R, self.lib.float32) - self.LBD = LBD - self.NU = self.lib.to_tensor(NU, self.lib.float32) + self.cc_weight = self.lib.to_variable(cc_weight, self.lib.float32) + self.R = self.lib.to_variable(R, self.lib.float32) + self.LBD = self.lib.to_variable(LBD, self.lib.float32) + self.NU = self.lib.to_variable(NU, self.lib.float32) self._SQRTRHOINV = SQRTRHOINV self.update_internal_state = self.update_internal_state_of_RNN # FIXME: There is one unnecessary operation in this function in case it is not an RNN. @@ -105,6 +124,8 @@ def __init__( self.Interpolator = Interpolator(self.mpc_horizon, period_interpolation_inducing_points, self.num_control_inputs, self.lib) + # here the predictor, cost computer, and optimizer are compiled to native instrutions by tensorflow graphs and XLA JIT + self.predict_and_cost = CompileAdaptive(self._predict_and_cost) self.predict_optimal_trajectory = CompileAdaptive(self._predict_optimal_trajectory) @@ -132,13 +153,24 @@ def check_dimensions_s(self, s): return s #mppi correction - def mppi_correction_cost(self, u, delta_u): + def mppi_correction_cost(self, u, delta_u, time=None): return self.lib.sum(self.cc_weight * (0.5 * (1 - 1.0 / self.NU) * self.R * (delta_u ** 2) + self.R * u * delta_u + 0.5 * self.R * (u ** 2)), (1, 2)) #total cost of the trajectory - def get_mppi_trajectory_cost(self, state_horizon ,u, u_prev, delta_u): - total_cost = self.cost_function.get_trajectory_cost(state_horizon,u, u_prev) - total_mppi_cost = total_cost + self.mppi_correction_cost(u, delta_u) + def get_mppi_trajectory_cost(self, state_horizon ,u, u_prev, delta_u, time:float=None): + """ Compute the total trajectory costs for all the rollouts + + :param state_horizon: the states as [rollouts,timesteps,states] + :param u: the control as ??? TODO + :param u_prev: the previous control input + :param delta_u: change in control input, TODO passed in for efficiency? + :param time: the time in seconds + + :returns: the total mppi cost for each rollout, i.e. 1d-vector of costs per rollout + """ + total_cost = self.cost_function.get_trajectory_cost(state_horizon,u, u_prev,time=time) + mppi_correction_cost =self.mppi_correction_cost(u, delta_u, time=time) + total_mppi_cost = total_cost +mppi_correction_cost return total_mppi_cost def reward_weighted_average(self, S, delta_u): @@ -159,18 +191,28 @@ def inizialize_pertubation(self, random_gen): return delta_u - def _predict_and_cost(self, s, u_nom, random_gen, u_old): - s = self.lib.tile(s, (self.num_rollouts, 1)) + def _predict_and_cost(self, state:TensorType, u_nom:TensorType, random_gen, u_old:TensorType, time:float=None): + """ Predict dynamics and compute costs of trajectories + + :param state: the current state of system, dimensions are [rollouts, timesteps, states] + :param u_nom: the nominal control input + :param random_gen: the random generator + :param u_old: previous control input + :param time: time in seconds + + :returns: u, u_nom: the new control input TODO what are u and u_nom? + """ + state = self.lib.tile(state, (self.num_rollouts, 1)) # generate random input sequence and clip to control limits u_nom = self.lib.concat([u_nom[:, 1:, :], u_nom[:, -1:, :]], 1) delta_u = self.inizialize_pertubation(random_gen) u_run = self.lib.tile(u_nom, (self.num_rollouts, 1, 1))+delta_u u_run = self.lib.clip(u_run, self.action_low, self.action_high) - rollout_trajectory = self.predictor.predict_tf(s, u_run) - traj_cost = self.get_mppi_trajectory_cost(rollout_trajectory, u_run, u_old, delta_u) + rollout_trajectory = self.predictor.predict_tf(state, u_run, time=time) + traj_cost = self.get_mppi_trajectory_cost(rollout_trajectory, u_run, u_old, delta_u, time=time) u_nom = self.lib.clip(u_nom + self.reward_weighted_average(traj_cost, delta_u), self.action_low, self.action_high) u = u_nom[0, 0, :] - self.update_internal_state(s, u_nom) + self.update_internal_state(state, u_nom) return self.mppi_output(u, u_nom, rollout_trajectory, traj_cost, u_run) def update_internal_state_of_RNN(self, s, u_nom): @@ -183,15 +225,27 @@ def _predict_optimal_trajectory(self, s, u_nom): return optimal_trajectory #step function to find control - def step(self, s: np.ndarray, time=None): + def step(self, state: np.ndarray, time=None): + """ Does one timestep of control + + :param state: the current state + :param time: the current time in seconds + + :returns: u, the new control input to system + """ if self.optimizer_logging: - self.logging_values = {"s_logged": s.copy()} - s = self.lib.to_tensor(s, self.lib.float32) - s = self.check_dimensions_s(s) + self.logging_values = {"s_logged": state.copy()} + state = self.lib.to_tensor(state, self.lib.float32) + state = self.check_dimensions_s(state) + + tf_time=self.lib.to_tensor(time,self.lib.float32) # must pass scalar tensor for time to prevent recompiling tensorflow functions over and over - self.u, self.u_nom, self.rollout_trajectories, traj_cost, u_run = self.predict_and_cost(s, self.u_nom, self.rng, self.u) + self.u, self.u_nom, rollout_trajectory, traj_cost, u_run = self.predict_and_cost(state, self.u_nom, self.rng, self.u, time=tf_time) self.u = self.lib.to_numpy(self.lib.squeeze(self.u)) + # print(f'mean traj cost={np.mean(traj_cost.numpy()):.2f}') # todo debug + + if self.optimizer_logging: self.logging_values["Q_logged"] = self.lib.to_numpy(u_run) self.logging_values["J_logged"] = self.lib.to_numpy(traj_cost) @@ -199,7 +253,7 @@ def step(self, s: np.ndarray, time=None): self.logging_values["u_logged"] = self.u if False: - self.optimal_trajectory = self.lib.to_numpy(self.predict_optimal_trajectory(s, self.u_nom)) + self.optimal_trajectory = self.lib.to_numpy(self.predict_optimal_trajectory(state, self.u_nom)) return self.u diff --git a/Optimizers/optimizer_mppi_optimize_tf.py b/Optimizers/optimizer_mppi_optimize_tf.py index 180eee0..e27fc81 100644 --- a/Optimizers/optimizer_mppi_optimize_tf.py +++ b/Optimizers/optimizer_mppi_optimize_tf.py @@ -27,7 +27,7 @@ def __init__( R: float, LBD: float, mpc_horizon: int, - num_rollouts: int, + batch_size: int, NU: float, SQRTRHOINV: float, gradmax_clip: float, @@ -47,7 +47,7 @@ def __init__( control_limits=control_limits, optimizer_logging=optimizer_logging, seed=seed, - num_rollouts=num_rollouts, + batch_size=batch_size, mpc_horizon=mpc_horizon, computation_library=computation_library, ) diff --git a/Optimizers/optimizer_mppi_var_tf.py b/Optimizers/optimizer_mppi_var_tf.py index 3edd3c3..c6618e8 100644 --- a/Optimizers/optimizer_mppi_var_tf.py +++ b/Optimizers/optimizer_mppi_var_tf.py @@ -28,7 +28,7 @@ def __init__( R: float, LBD_mc: float, mpc_horizon: int, - num_rollouts: int, + batch_size: int, NU_mc: float, SQRTRHOINV_mc: float, LR: float, @@ -46,7 +46,7 @@ def __init__( control_limits=control_limits, optimizer_logging=optimizer_logging, seed=seed, - num_rollouts=num_rollouts, + batch_size=batch_size, mpc_horizon=mpc_horizon, computation_library=computation_library, ) diff --git a/Optimizers/optimizer_random_action_tf.py b/Optimizers/optimizer_random_action_tf.py index b7f18e2..43d1124 100644 --- a/Optimizers/optimizer_random_action_tf.py +++ b/Optimizers/optimizer_random_action_tf.py @@ -22,7 +22,7 @@ def __init__( computation_library: "type[ComputationLibrary]", seed: int, mpc_horizon: int, - num_rollouts: int, + batch_size: int, optimizer_logging: bool, ): super().__init__( @@ -33,7 +33,7 @@ def __init__( control_limits=control_limits, optimizer_logging=optimizer_logging, seed=seed, - num_rollouts=num_rollouts, + batch_size=batch_size, mpc_horizon=mpc_horizon, computation_library=computation_library, ) diff --git a/Optimizers/optimizer_rpgd_me_tf.py b/Optimizers/optimizer_rpgd_me_tf.py index 8fd70b5..e352781 100644 --- a/Optimizers/optimizer_rpgd_me_tf.py +++ b/Optimizers/optimizer_rpgd_me_tf.py @@ -6,11 +6,12 @@ import tensorflow_probability as tfp from Control_Toolkit.Cost_Functions.cost_function_wrapper import CostFunctionWrapper from Control_Toolkit.Optimizers import template_optimizer -from Control_Toolkit.others.globals_and_utils import CompileTF, get_logger +from Control_Toolkit.others.globals_and_utils import CompileTF from Control_Toolkit.others.Interpolator import Interpolator from SI_Toolkit.Predictors.predictor_wrapper import PredictorWrapper -logger = get_logger(__name__) +from Control_Toolkit.others.get_logger import get_logger +log = get_logger(__name__) class optimizer_rpgd_me_tf(template_optimizer): diff --git a/Optimizers/optimizer_rpgd_ml_tf.py b/Optimizers/optimizer_rpgd_ml_tf.py index 85baa8f..c966f99 100644 --- a/Optimizers/optimizer_rpgd_ml_tf.py +++ b/Optimizers/optimizer_rpgd_ml_tf.py @@ -6,11 +6,12 @@ import tensorflow_probability as tfp from Control_Toolkit.Cost_Functions.cost_function_wrapper import CostFunctionWrapper from Control_Toolkit.Optimizers import template_optimizer -from Control_Toolkit.others.globals_and_utils import CompileTF, get_logger +from Control_Toolkit.others.globals_and_utils import CompileTF from Control_Toolkit.others.Interpolator import Interpolator from SI_Toolkit.Predictors.predictor_wrapper import PredictorWrapper -logger = get_logger(__name__) +from Control_Toolkit.others.get_logger import get_logger +log = get_logger(__name__) class optimizer_rpgd_ml_tf(template_optimizer): @@ -258,7 +259,7 @@ def step(self, s: np.ndarray, time=None): tf.zeros(self.num_rollouts - self.opt_keep_k, dtype=tf.int32), tf.gather(self.trajectory_ages, best_idx, axis=0), ], axis=0) - + if len(adam_weights) > 0: wk1 = tf.concat( [ diff --git a/Optimizers/optimizer_rpgd_particle_tf.py b/Optimizers/optimizer_rpgd_particle_tf.py index a22fdf5..c7f388a 100644 --- a/Optimizers/optimizer_rpgd_particle_tf.py +++ b/Optimizers/optimizer_rpgd_particle_tf.py @@ -5,11 +5,12 @@ import tensorflow as tf from Control_Toolkit.Cost_Functions.cost_function_wrapper import CostFunctionWrapper from Control_Toolkit.Optimizers import template_optimizer -from Control_Toolkit.others.globals_and_utils import CompileTF, get_logger +from Control_Toolkit.others.globals_and_utils import CompileTF from Control_Toolkit.others.Interpolator import Interpolator from SI_Toolkit.Predictors.predictor_wrapper import PredictorWrapper -logger = get_logger(__name__) +from Control_Toolkit.others.get_logger import get_logger +log = get_logger(__name__) class optimizer_rpgd_particle_tf(template_optimizer): @@ -94,7 +95,7 @@ def __init__( self.theta_max = tf.repeat(tf.expand_dims(self.action_high, 1), 2, 1) else: raise ValueError(f"Unsupported sampling distribution {self.SAMPLING_DISTRIBUTION}") - + self.optimizer_reset() def predict_and_cost(self, s: tf.Tensor, Q: tf.Variable): @@ -160,7 +161,7 @@ def get_action(self, s: tf.Tensor, Q: tf.Variable): sorted_cost = tf.argsort(traj_cost) best_idx = sorted_cost[: self.opt_keep_k] - # Warmstart for next iteration + # Warmstart for next iteration Qn = tf.concat([Q[:, 1:, :], Q[:, -1:, :]], axis=1) return Qn, best_idx, traj_cost, rollout_trajectory @@ -172,9 +173,9 @@ def index_to_2d(n: int, c: Union[tf.Tensor, int]) -> Union[tf.Tensor, int]: [tf.math.mod(c, n), tf.math.floormod(c, n)], axis=1, ) - - - + + + @CompileTF def get_plans_to_resample(self, Qn: tf.Tensor, terminal_states: tf.Tensor, number_of_plans: int) -> tf.Tensor: """Find out which of the terminal states are in the least dense region. @@ -197,11 +198,11 @@ def get_plans_to_resample(self, Qn: tf.Tensor, terminal_states: tf.Tensor, numbe distances_min = tf.reduce_min(distances, axis=1) # indices_of_min = tf.where(distances == tf.repeat(distances_min[:, tf.newaxis], batch_size, axis=1)) # Sanity check: Is np.all(tf.gather_nd(distances, indices_of_min) == distances_min) == True? - + # Determine which of the plans to gather gather_indices = tf.argsort(distances_min, direction="DESCENDING")[:number_of_plans] - - return tf.gather(Qn, gather_indices, axis=0) + + return tf.gather(Qn, gather_indices, axis=0) def step(self, s: np.ndarray, time=None): if self.optimizer_logging: diff --git a/Optimizers/optimizer_rpgd_tf.py b/Optimizers/optimizer_rpgd_tf.py index 192a2e9..3e1a2cf 100644 --- a/Optimizers/optimizer_rpgd_tf.py +++ b/Optimizers/optimizer_rpgd_tf.py @@ -1,18 +1,21 @@ from typing import Tuple + from SI_Toolkit.computation_library import ComputationLibrary, TensorFlowLibrary import numpy as np import tensorflow as tf from Control_Toolkit.Cost_Functions.cost_function_wrapper import CostFunctionWrapper from Control_Toolkit.Optimizers import template_optimizer -from Control_Toolkit.others.globals_and_utils import CompileTF, get_logger +from SI_Toolkit.Functions.TF.Compile import CompileTF from Control_Toolkit.others.Interpolator import Interpolator from SI_Toolkit.Predictors.predictor_wrapper import PredictorWrapper -logger = get_logger(__name__) +from Control_Toolkit.others.get_logger import get_logger +log = get_logger(__name__) class optimizer_rpgd_tf(template_optimizer): + """ The tensorflow (tf) version of the Resampling Parallel Gradient Descent (RPGD) optimizer reported in ICRA paper """ supported_computation_libraries = {TensorFlowLibrary} def __init__( @@ -75,7 +78,7 @@ def __init__( self.Interpolator = Interpolator(self.mpc_horizon, period_interpolation_inducing_points, self.num_control_inputs, self.lib) - self.opt = tf.keras.optimizers.Adam( + self.opt:tf.keras.optimizers.Adam = tf.keras.optimizers.Adam( learning_rate=learning_rate, beta_1=adam_beta_1, beta_2=adam_beta_2, @@ -136,7 +139,7 @@ def grad_step( def get_action(self, s: tf.Tensor, Q: tf.Variable): # Rollout trajectories and retrieve cost traj_cost, rollout_trajectory = self.predict_and_cost(s, Q) - + # sort the costs and find best k costs sorted_cost = tf.argsort(traj_cost) best_idx = sorted_cost[: self.opt_keep_k] diff --git a/README.md b/README.md index 6d712ac..d7d2abe 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,8 @@ The toolkit provides a uniform interface to log values in the controller. These The `controller_mpc.step` method takes the `optimizer.logging_values` dictionary and copies it to its `controller_mpc.logs` dictionary in each step. The `template_controller` has two related attributes: `controller_logging` and `save_vars`. If the former is `true`, then the controller populates the fields of `save_vars` in the `template_controller.logs` dictionary with values if your controller calls `update_logs` within the `step` method. +The resulting dict (stored in RAM during runtime) is written out to disk on exit. + ## Examples of Application-Specific Controllers diff --git a/others/environment.py b/others/environment.py index 80c0e8a..9dbef5c 100644 --- a/others/environment.py +++ b/others/environment.py @@ -1,13 +1,11 @@ from typing import Any, Optional, Tuple, Union import numpy as np -import tensorflow as tf -import torch from SI_Toolkit.computation_library import ComputationLibrary, TensorType -from Control_Toolkit.others.globals_and_utils import get_logger from gymnasium.spaces import Box +from Control_Toolkit.others.get_logger import get_logger log = get_logger(__name__) @@ -132,6 +130,6 @@ def set_computation_library(self, ComputationLib: "type[ComputationLibrary]"): @property def logs(self): return getattr(self, "_logs", dict()) - + def set_logs(self, logs: "dict[str, Any]"): self._logs = logs diff --git a/others/get_logger.py b/others/get_logger.py new file mode 100644 index 0000000..6546b15 --- /dev/null +++ b/others/get_logger.py @@ -0,0 +1,54 @@ +import logging +# general logger for all control/si_toolkit users. Produces nice output format with live hyperlinks for pycharm users +# to use it, just call log=get_logger() at the top of your python file +# all these loggers share the same logger name 'Control_Toolkit' + +LOGGING_LEVEL = logging.DEBUG # usually INFO is good +class CustomFormatter(logging.Formatter): + """Logging Formatter to add colors and count warning / errors""" + # see https://stackoverflow.com/questions/384076/how-can-i-color-python-logging-output/7995762#7995762 + + # \x1b[ (ESC[) is the CSI introductory sequence for ANSI https://en.wikipedia.org/wiki/ANSI_escape_code + # The control sequence CSI n m, named Select Graphic Rendition (SGR), sets display attributes. + grey = "\x1b[2;37m" # 2 faint, 37 gray + yellow = "\x1b[33;21m" + cyan = "\x1b[0;36m" # 0 normal 36 cyan + green = "\x1b[31;21m" # dark green + red = "\x1b[31;21m" # bold red + bold_red = "\x1b[31;1m" + light_blue = "\x1b[1;36m" + blue = "\x1b[1;34m" + reset = "\x1b[0m" + # File "{file}", line {max(line, 1)}'.replace("\\", "/") + format = '[%(levelname)s]: %(asctime)s - %(name)s - %(message)s (File "%(pathname)s", line %(lineno)d, in %(funcName)s)' + + FORMATS = { + logging.DEBUG: grey + format + reset, + logging.INFO: cyan + format + reset, + logging.WARNING: red + format + reset, + logging.ERROR: bold_red + format + reset, + logging.CRITICAL: bold_red + format + reset + } + + def format(self, record): + log_fmt = self.FORMATS.get(record.levelno) + formatter = logging.Formatter(log_fmt) + return formatter.format(record).replace("\\", "/") #replace \ with / for pycharm links + + +def get_logger(name='ControlToolkit'): + """ Use get_logger to define a logger with useful color output and info and warning turned on according to the global LOGGING_LEVEL. + + :param name: ignored -- all loggers here have the name 'ControlToolkit' so that all can be affected uniformly + + :returns: the logger. + """ + # logging.basicConfig(stream=sys.stdout, level=logging.INFO) + logger = logging.getLogger('ControlToolkit') # tobi changed so all have same name so we can uniformly affect all of them + logger.setLevel(LOGGING_LEVEL) + # create console handler if this logger does not have handler yet + if len(logger.handlers)==0: + ch = logging.StreamHandler() + ch.setFormatter(CustomFormatter()) + logger.addHandler(ch) + return logger \ No newline at end of file diff --git a/others/globals_and_utils.py b/others/globals_and_utils.py index 13ac619..57a7914 100644 --- a/others/globals_and_utils.py +++ b/others/globals_and_utils.py @@ -1,5 +1,4 @@ import glob -import logging import os from datetime import datetime from importlib import import_module @@ -9,49 +8,8 @@ import tensorflow as tf import torch from numpy.random import SFC64, Generator -from SI_Toolkit.Functions.TF.Compile import CompileTF, CompileAdaptive from SI_Toolkit.computation_library import ComputationLibrary, NumpyLibrary, TensorFlowLibrary, PyTorchLibrary - -LOGGING_LEVEL = logging.INFO - - -class CustomFormatter(logging.Formatter): - """Logging Formatter to add colors and count warning / errors""" - - grey = "\x1b[38;21m" - yellow = "\x1b[33;21m" - red = "\x1b[31;21m" - bold_red = "\x1b[31;1m" - reset = "\x1b[0m" - format = ( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)" - ) - - FORMATS = { - logging.DEBUG: grey + format + reset, - logging.INFO: grey + format + reset, - logging.WARNING: yellow + format + reset, - logging.ERROR: red + format + reset, - logging.CRITICAL: bold_red + format + reset, - } - - def format(self, record): - log_fmt = self.FORMATS.get(record.levelno) - formatter = logging.Formatter(log_fmt) - return formatter.format(record) - - -def get_logger(name): - # logging.basicConfig(stream=sys.stdout, level=logging.INFO) - logger = logging.getLogger(name) - logger.setLevel(LOGGING_LEVEL) - # create console handler - ch = logging.StreamHandler() - ch.setFormatter(CustomFormatter()) - logger.addHandler(ch) - return logger - - +from Control_Toolkit.others.get_logger import get_logger log = get_logger(__name__) @@ -142,7 +100,7 @@ def import_controller_by_name(controller_name: str) -> type: assert len(controller_relative_paths) == 1, f"Controller {controller_full_name} must be in a unique location. {len(controller_relative_paths)} found." controller_relative_path = controller_relative_paths[0] - log.info(f"Importing controller from {controller_relative_path}") + log.debug(f"Importing controller from {controller_relative_path}") Controller: type = getattr(import_module(controller_relative_path.replace(".py", "").replace(os.sep, ".")), controller_full_name) return Controller @@ -201,7 +159,7 @@ def get_controller_name(controller_names=None, controller_name=None, controller_ try: controller_idx = controller_names.index(controller_name) except ValueError: - print('{} is not in list. \n In list are: {}'.format(controller_name, controller_names)) + log.error('{} is not in list. \n In list are: {}'.format(controller_name, controller_names)) return None else: controller_name = controller_names[controller_idx] @@ -239,4 +197,6 @@ def get_optimizer_name(optimizer_names=None, optimizer_name=None, optimizer_idx= else: optimizer_name = optimizer_names[optimizer_idx] - return optimizer_name, optimizer_idx \ No newline at end of file + return optimizer_name, optimizer_idx + + diff --git a/others/timer.py b/others/timer.py new file mode 100644 index 0000000..b83b6a9 --- /dev/null +++ b/others/timer.py @@ -0,0 +1,129 @@ +import atexit + +# a useful timer class to measure execution time statistics + +from engineering_notation import EngNumber as eng # only from pip +from matplotlib import pyplot as plt +import numpy as np +import time + +from get_logger import get_logger +log=get_logger(__name__) + +timers = {} +times = {} +class timer: + def __init__(self, timer_name='', delay=None, show_hist=False, numpy_file=None): + """ Make a timer() in a _with_ statement for a block of code. + The timer is started when the block is entered and stopped when exited. + + The accumulated timer statistics are automatically printed on exit with atexit (see end of this file). + + The timer _must_ be used in a with statement, e.g. + + while True: + timestr = time.strftime("%Y%m%d-%H%M") + with timer('overall consumer loop', numpy_file=f'{DATA_FOLDER}/consumer-frame-rate-{timestr}.npy', show_hist=True): + with timer('recieve UDP'): + # do something + + timers can be nested + + + :param timer_name: the str by which this timer is repeatedly called and which it is named when summary is printed on exit + :param delay: set this to a value to simply accumulate this externally determined interval + :param show_hist: whether to plot a histogram with pyplot + :param numpy_file: optional numpy file path + """ + self.timer_name = timer_name + self.show_hist = show_hist + self.numpy_file = numpy_file + self.delay=delay + + if self.timer_name not in timers.keys(): + timers[self.timer_name] = self + if self.timer_name not in times.keys(): + times[self.timer_name]=[] + + def __enter__(self): + if self.delay is None: + self.start = time.time() + return self + + def __exit__(self, *args): + if self.delay is None: + self.end = time.time() + self.interval = self.end - self.start # measured in seconds + else: + self.interval=self.delay + times[self.timer_name].append(self.interval) + + def print_timing_info(self, logger=None): + """ Prints the timing information accumulated for this timer + + :param logger: write to the supplied logger, otherwise use the built-in logger + """ + if len(times)==0: + log.error(f'Timer {self.timer_name} has no statistics; was it used without a "with" statement?') + return + a = np.array(times[self.timer_name]) + timing_mean = np.mean(a) # todo use built in print method for timer + timing_std = np.std(a) + timing_median = np.median(a) + timing_min = np.min(a) + timing_max = np.max(a) + s='{} n={}: {}s +/- {}s (median {}s, min {}s max {}s)'.format(self.timer_name, len(a), + eng(timing_mean), eng(timing_std), + eng(timing_median), eng(timing_min), + eng(timing_max)) + + if logger is not None: + logger.info(s) + else: + log.info(s) + +def print_timing_info(): + for k,v in times.items(): # k is the name, v is the list of times + a = np.array(v) + timing_mean = np.mean(a) + timing_std = np.std(a) + timing_median = np.median(a) + timing_min = np.min(a) + timing_max = np.max(a) + log.info('== Timing statistics from all Timer ==\n{} n={}: {}s +/- {}s (median {}s, min {}s max {}s)'.format(k, len(a), + eng(timing_mean), eng(timing_std), + eng(timing_median), eng(timing_min), + eng(timing_max))) + if timers[k].numpy_file is not None: + try: + log.info(f'saving timing data for {k} in numpy file {timers[k].numpy_file}') + log.info('there are {} times'.format(len(a))) + np.save(timers[k].numpy_file, a) + except Exception as e: + log.error(f'could not save numpy file {timers[k].numpy_file}; caught {e}') + + if timers[k].show_hist: + + def plot_loghist(x, bins): + hist, bins = np.histogram(x, bins=bins) # histogram x linearly + if len(bins)<2 or bins[0]<=0: + log.error(f'cannot plot histogram since bins={bins}') + return + logbins = np.logspace(np.log10(bins[0]), np.log10(bins[-1]), len(bins)) # use resulting bin ends to get log bins + plt.hist(x, bins=logbins) # now again histogram x, but with the log-spaced bins, and plot this histogram + plt.xscale('log') + + dt = np.clip(a,1e-6, None) + # logbins = np.logspace(np.log10(bins[0]), np.log10(bins[-1]), len(bins)) + try: + plot_loghist(dt,bins=100) + plt.xlabel('interval[ms]') + plt.ylabel('frequency') + plt.title(k) + plt.show() + except Exception as e: + log.error(f'could not plot histogram: got {e}') + + +# this will print all the timer values upon termination of any program that imported this file +atexit.register(print_timing_info) diff --git a/others/yes_or_no.py b/others/yes_or_no.py new file mode 100644 index 0000000..bf2aa32 --- /dev/null +++ b/others/yes_or_no.py @@ -0,0 +1,62 @@ +# useful utilies to ask question at console terminal with default answer and timeout + +import signal +import time + +def alarm_handler(signum, frame): + raise TimeoutError +def input_with_timeout(prompt, timeout=30): + """ get input with timeout + + :param prompt: the prompt to print + :param timeout: timeout in seconds, or None to disable + + :returns: the input + :raises: TimeoutError if times out + """ + # set signal handler + if timeout is not None: + signal.signal(signal.SIGALRM, alarm_handler) + signal.alarm(timeout) # produce SIGALRM in `timeout` seconds + try: + time.sleep(.5) # get input to be printed after logging + return input(prompt) + except TimeoutError as to: + raise to + finally: + if timeout is not None: + signal.alarm(0) # cancel alarm + +def yes_or_no(question, default='y', timeout=None): + """ Get y/n answer with default choice and optional timeout + + :param question: prompt + :param default: the default choice, i.e. 'y' or 'n' + :param timeout: the timeout in seconds, default is None + + :returns: True or False + """ + if default is not None and (default!='y' and default!='n'): + log.error(f'bad option for default: {default}') + quit(1) + y='Y' if default=='y' else 'y' + n='N' if default=='n' else 'n' + while "the answer is invalid": + try: + to_str='' if timeout is None or os.name=='nt' else f'(Timeout {default} in {timeout}s)' + if os.name=='nt': + log.warning('cannot use timeout signal on windows') + time.sleep(.1) # make the warning come out first + reply=str(input(f'{question} {to_str} ({y}/{n}): ')).lower().strip() + else: + reply = str(input_with_timeout(f'{question} {to_str} ({y}/{n}): ',timeout=timeout)).lower().strip() + except TimeoutError: + log.warning(f'timeout expired, returning default={default} answer') + reply='' + if len(reply)==0 or reply=='': + return True if default=='y' else False + elif reply[0].lower() == 'y': + return True + if reply[0].lower() == 'n': + return False + diff --git a/requirements.txt b/requirements.txt index 67fdbcd..dd6f2c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,7 @@ tensorflow_probability numpy torch torchvision +gym +dictdiffer +ruamel.yaml gymnasium \ No newline at end of file