diff --git a/CMakeLists.txt b/CMakeLists.txt index b1d5b9449..b0ccf4215 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.10) # Set the build version -set(BUILD_VERSION 1.4.12) +set(BUILD_VERSION 1.4.13) # Add definition of Jiminy version for C++ headers add_definitions("-DJIMINY_VERSION=\"${BUILD_VERSION}\"") diff --git a/core/include/jiminy/core/Macro.h b/core/include/jiminy/core/Macro.h index 7bec60326..ce121016f 100644 --- a/core/include/jiminy/core/Macro.h +++ b/core/include/jiminy/core/Macro.h @@ -165,7 +165,14 @@ namespace jiminy #define FILE_LINE __FILE__ ":" STRINGIFY(__LINE__) #define PRINT_ERROR(...) \ - std::cout << "In " FILE_LINE ": In " << BOOST_CURRENT_FUNCTION << ":\n\033[1;31merror:\033[0m " << to_string(__VA_ARGS__) << std::endl; + std::cout << "In " FILE_LINE ": In " << BOOST_CURRENT_FUNCTION << ":\n\033[1;32merror:\033[0m " << to_string(__VA_ARGS__) << std::endl; + + #ifdef NDEBUG + #define PRINT_WARNING(...) + #else + #define PRINT_WARNING(...) \ + std::cout << "In " FILE_LINE ": In " << BOOST_CURRENT_FUNCTION << ":\n\033[1;93mwarning:\033[0m " << to_string(__VA_ARGS__) << std::endl; + #endif } #endif // JIMINY_MACRO_H diff --git a/core/src/engine/EngineMultiRobot.cc b/core/src/engine/EngineMultiRobot.cc index cb62d32b0..1c09365b8 100644 --- a/core/src/engine/EngineMultiRobot.cc +++ b/core/src/engine/EngineMultiRobot.cc @@ -1127,48 +1127,24 @@ namespace jiminy } } - if (stepperUpdatePeriod_ > EPS) + // Update the controller command if necessary (only for finite update frequency) + if (stepperUpdatePeriod_ > EPS && engineOptions_->stepper.controllerUpdatePeriod > EPS) { - // Update the sensor data if necessary (only for finite update frequency) - if (engineOptions_->stepper.sensorsUpdatePeriod > EPS) + float64_t const & controllerUpdatePeriod = engineOptions_->stepper.controllerUpdatePeriod; + float64_t dtNextControllerUpdatePeriod = controllerUpdatePeriod - std::fmod(t, controllerUpdatePeriod); + if (dtNextControllerUpdatePeriod <= SIMULATION_MIN_TIMESTEP / 2.0 + || controllerUpdatePeriod - dtNextControllerUpdatePeriod < SIMULATION_MIN_TIMESTEP / 2.0) { - float64_t const & sensorsUpdatePeriod = engineOptions_->stepper.sensorsUpdatePeriod; - float64_t dtNextSensorsUpdatePeriod = sensorsUpdatePeriod - std::fmod(t, sensorsUpdatePeriod); - if (dtNextSensorsUpdatePeriod <= SIMULATION_MIN_TIMESTEP / 2.0 - || sensorsUpdatePeriod - dtNextSensorsUpdatePeriod < SIMULATION_MIN_TIMESTEP / 2.0) + auto systemIt = systems_.begin(); + auto systemDataIt = systemsDataHolder_.begin(); + for ( ; systemIt != systems_.end(); ++systemIt, ++systemDataIt) { - auto systemIt = systems_.begin(); - auto systemDataIt = systemsDataHolder_.begin(); - for ( ; systemIt != systems_.end(); ++systemIt, ++systemDataIt) - { - vectorN_t const & q = systemDataIt->state.q; - vectorN_t const & v = systemDataIt->state.v; - vectorN_t const & a = systemDataIt->state.a; - vectorN_t const & uMotor = systemDataIt->state.uMotor; - systemIt->robot->setSensorsData(t, q, v, a, uMotor); - } - } - } - - // Update the controller command if necessary (only for finite update frequency) - if (engineOptions_->stepper.controllerUpdatePeriod > EPS) - { - float64_t const & controllerUpdatePeriod = engineOptions_->stepper.controllerUpdatePeriod; - float64_t dtNextControllerUpdatePeriod = controllerUpdatePeriod - std::fmod(t, controllerUpdatePeriod); - if (dtNextControllerUpdatePeriod <= SIMULATION_MIN_TIMESTEP / 2.0 - || controllerUpdatePeriod - dtNextControllerUpdatePeriod < SIMULATION_MIN_TIMESTEP / 2.0) - { - auto systemIt = systems_.begin(); - auto systemDataIt = systemsDataHolder_.begin(); - for ( ; systemIt != systems_.end(); ++systemIt, ++systemDataIt) - { - vectorN_t const & q = systemDataIt->state.q; - vectorN_t const & v = systemDataIt->state.v; - vectorN_t & uCommand = systemDataIt->state.uCommand; - computeCommand(*systemIt, t, q, v, uCommand); - } - hasDynamicsChanged = true; + vectorN_t const & q = systemDataIt->state.q; + vectorN_t const & v = systemDataIt->state.v; + vectorN_t & uCommand = systemDataIt->state.uCommand; + computeCommand(*systemIt, t, q, v, uCommand); } + hasDynamicsChanged = true; } } @@ -1400,6 +1376,29 @@ namespace jiminy } } + // Update sensors data if necessary, namely if time-continuous or breakpoint + float64_t const & sensorsUpdatePeriod = engineOptions_->stepper.sensorsUpdatePeriod; + bool mustUpdateSensors = sensorsUpdatePeriod < SIMULATION_MIN_TIMESTEP; + if (!mustUpdateSensors) + { + float64_t dtNextSensorsUpdatePeriod = sensorsUpdatePeriod - std::fmod(t, sensorsUpdatePeriod); + mustUpdateSensors = (dtNextSensorsUpdatePeriod <= SIMULATION_MIN_TIMESTEP / 2.0 + || sensorsUpdatePeriod - dtNextSensorsUpdatePeriod < SIMULATION_MIN_TIMESTEP / 2.0); + } + if (mustUpdateSensors) + { + auto systemIt = systems_.begin(); + auto systemDataIt = systemsDataHolder_.begin(); + for ( ; systemIt != systems_.end(); ++systemIt, ++systemDataIt) + { + vectorN_t const & q = systemDataIt->state.q; + vectorN_t const & v = systemDataIt->state.v; + vectorN_t const & a = systemDataIt->state.a; + vectorN_t const & uMotor = systemDataIt->state.uMotor; + systemIt->robot->setSensorsData(t, q, v, a, uMotor); + } + } + if (sucessiveIterFailed > engineOptions_->stepper.successiveIterFailedMax) { PRINT_ERROR("Too many successive iteration failures. Probably something is " diff --git a/core/src/robot/Model.cc b/core/src/robot/Model.cc index 3d423507c..1bd9ad1d7 100644 --- a/core/src/robot/Model.cc +++ b/core/src/robot/Model.cc @@ -1069,7 +1069,7 @@ namespace jiminy } catch (std::logic_error const & e) { - std::cout << "hpp-fcl not built with qhull. Impossible to convert meshes to convex hulls." << std::endl; + PRINT_WARNING("hpp-fcl not built with qhull. Impossible to convert meshes to convex hulls."); } // Instantiate ground FCL box geometry, wrapped as a pinocchio collision geometry. diff --git a/examples/tutorial.ipynb b/examples/tutorial.ipynb index 87e89ce3a..7d0bfd2e4 100644 --- a/examples/tutorial.ipynb +++ b/examples/tutorial.ipynb @@ -64,7 +64,6 @@ "\n", "import jiminy_py.core as jiminy # The main module of jiminy - this is what gives access to the Robot and Engine class.\n", "from jiminy_py.simulator import Simulator\n", - "from jiminy_py.controller import BaseJiminyController\n", "\n", "\n", "data_root_path = \"../data/toys_models/simple_pendulum\"\n", diff --git a/python/gym_jiminy/common/gym_jiminy/common/block_bases.py b/python/gym_jiminy/common/gym_jiminy/common/block_bases.py index f0ee0feb5..2ea57cf3b 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/block_bases.py +++ b/python/gym_jiminy/common/gym_jiminy/common/block_bases.py @@ -8,15 +8,14 @@ - the base controller block - the base observer block """ -from typing import Optional, Any, Union +from typing import Optional, Any, List import gym import jiminy_py.core as jiminy -from jiminy_py.simulator import Simulator -from .utils import FieldDictRecursive, SpaceDictRecursive -from .generic_bases import ControlInterface, ObserveInterface +from .utils import FieldDictNested, SpaceDictNested +from .generic_bases import ControllerInterface, ObserverInterface from .env_bases import BaseJiminyEnv @@ -27,51 +26,54 @@ class BlockInterface: any number of subsequent blocks, or directly to a `BaseJiminyEnv` environment. """ - env: Optional[BaseJiminyEnv] observation_space: Optional[gym.Space] action_space: Optional[gym.Space] def __init__(self, + env: BaseJiminyEnv, update_ratio: int = 1, **kwargs: Any) -> None: """Initialize the block interface. It only allocates some attributes. + :param env: Environment to ultimately control, ie completely unwrapped. :param update_ratio: Ratio between the update period of the high-level controller and the one of the subsequent lower-level controller. :param kwargs: Extra keyword arguments that may be useful for mixing multiple inheritance through multiple inheritance. """ - # Define some attributes - self.env = None - self.observation_space = None - self.action_space = None - # Backup some user arguments + self.env = env self.update_ratio = update_ratio # Call super to allow mixing interfaces through multiple inheritance super().__init__(**kwargs) # type: ignore[call-arg] - @property - def robot(self) -> jiminy.Robot: - """Get low-level Jiminy robot of the associated environment. + # Refresh the observation and action spaces + self._refresh_observation_space() + self._refresh_action_space() + + # Assertion(s) for type checker + assert (self.observation_space is not None and + self.action_space is not None) + + def __getattr__(self, name: str) -> Any: + """Fallback attribute getter. + + It enables to get access to the attribute and methods of the low-level + Jiminy engine directly, without having to do it through `env`. """ - if self.env is None: - raise RuntimeError("Associated environment undefined.") - return self.env.robot + return getattr(self.env, name) - @property - def simulator(self) -> Simulator: - """Get low-level simulator of the associated environment. + def __dir__(self) -> List[str]: + """Attribute lookup. + + It is mainly used by autocomplete feature of Ipython. It is overloaded + to get consistent autocompletion wrt `getattr`. """ - if self.env is None: - raise RuntimeError("Associated environment undefined.") - if self.env.simulator is None: - raise RuntimeError("Associated environment not initialized.") - return self.env.simulator + return super().__dir__() + self.env.__dir__() # type: ignore[operator] @property def system_state(self) -> jiminy.SystemState: @@ -79,48 +81,20 @@ def system_state(self) -> jiminy.SystemState: """ return self.simulator.engine.system_state - def reset(self, env: Union[gym.Wrapper, BaseJiminyEnv]) -> None: - """Reset the block for a given environment, eventually already wrapped. - - .. note:: - The environment itself is not necessarily directly connected to - this block since it may actually be connected to another block - instead. - - .. warning:: - This method that must not be overloaded. `_setup` is the - unique entry-point to customize the block's initialization. - - :param env: Environment. - """ - # Backup the unwrapped environment - if isinstance(env, gym.Wrapper): - # Make sure the environment actually derive for BaseJiminyEnv - assert isinstance(env.unwrapped, BaseJiminyEnv), ( - "env.unwrapped must derived from `BaseJiminyEnv`.") - self.env = env.unwrapped - else: - self.env = env - - # Configure the block - self._setup() - - # Refresh the observation and action spaces - self._refresh_observation_space() - self._refresh_action_space() - # methods to override: # ---------------------------- def _setup(self) -> None: - """Configure the block. + """Reset the internal state of the block. + + .. note:: + The environment itself is not necessarily directly connected to + this block since it may actually be connected through another block + instead. .. note:: - Note that the environment `env` has already been fully initialized - at this point, so that each of its internal buffers is up-to-date, - but the simulation is not running yet. As a result, it is still - possible to update the configuration of the simulator, and for - example, to register some extra variables to monitor the internal + It is possible to update the configuration of the simulator, for + example to register some extra variables to monitor the internal state of the block. """ @@ -131,10 +105,6 @@ def _refresh_observation_space(self) -> None: The observation space refers to the output of system once connected with another block. For example, for a controller, it is the action from the next block. - - .. note:: - This method is called right after `_setup`, so that both the - environment and this block should be already initialized. """ raise NotImplementedError @@ -145,15 +115,11 @@ def _refresh_action_space(self) -> None: The action space refers to the input of the block. It does not have to be an actual action. For example, for an observer, it is the observation from the previous block. - - .. note:: - This method is called right after `_setup`, so that both the - environment and this block should be already initialized. """ raise NotImplementedError -class BaseControllerBlock(BlockInterface, ControlInterface): +class BaseControllerBlock(ControllerInterface, BlockInterface): r"""Base class to implement controller that can be used compute targets to apply to the robot of a `BaseJiminyEnv` environment, through any number of lower-level controllers. @@ -208,31 +174,21 @@ def _refresh_observation_space(self) -> None: it, then using `BaseObserverBlock` or `BlockInterface` directly is probably the way to go. """ - # Assertion(s) for type checker - assert self.env is not None - self.observation_space = self.env.action_space - def reset(self, env: Union[gym.Wrapper, BaseJiminyEnv]) -> None: - """Reset the controller for a given environment. - - In addition to the base implementation, it also set 'control_dt' - dynamically, based on the environment 'control_dt'. - - :param env: Environment to control, eventually already wrapped. - """ - # Call base implementation - super().reset(env) - - # Assertion(s) for type checker - assert self.env is not None + # methods to override: + # ---------------------------- + def _setup(self) -> None: + # Compute the update period self.control_dt = self.env.control_dt * self.update_ratio - # methods to override: - # ---------------------------- + # Make sure the controller period is lower than environment timestep + assert self.control_dt <= self.env.step_dt, ( + "The controller update period must be lower than or equal to the " + "environment simulation timestep.") - def get_fieldnames(self) -> FieldDictRecursive: + def get_fieldnames(self) -> FieldDictNested: """Get mapping between each scalar element of the action space of the controller and the associated fieldname for logging. @@ -292,7 +248,7 @@ def get_fieldnames(self) -> FieldDictRecursive: """ -class BaseObserverBlock(BlockInterface, ObserveInterface): +class BaseObserverBlock(ObserverInterface, BlockInterface): r"""Base class to implement observe that can be used compute observation features of a `BaseJiminyEnv` environment, through any number of lower-level observer. @@ -337,30 +293,23 @@ def _refresh_action_space(self) -> None: it, then using `BaseControllerBlock` or `BlockInterface` directly is probably the way to go. """ - # Assertion(s) for type checker - assert self.env is not None - self.action_space = self.env.observation_space - def reset(self, env: Union[gym.Wrapper, BaseJiminyEnv]) -> None: - """Reset the observer for a given environment. - - In addition to the base implementation, it also set 'observe_dt' - dynamically, based on the environment 'observe_dt'. - - :param env: Environment to observe, eventually already wrapped. - """ - # Call base implementation - super().reset(env) - - # Assertion(s) for type checker - assert self.env is not None + # methods to override: + # ---------------------------- + def _setup(self) -> None: + # Compute the update period self.observe_dt = self.env.observe_dt * self.update_ratio + # Make sure the controller period is lower than environment timestep + assert self.observe_dt <= self.env.step_dt, ( + "The observer update period must be lower than or equal to the " + "environment simulation timestep.") + def compute_observation(self, # type: ignore[override] - measure: SpaceDictRecursive - ) -> SpaceDictRecursive: + measure: SpaceDictNested + ) -> SpaceDictNested: """Compute observed features based on the current simulation state and lower-level measure. diff --git a/python/gym_jiminy/common/gym_jiminy/common/control_impl.py b/python/gym_jiminy/common/gym_jiminy/common/control_impl.py index 6207235a5..8d70114d8 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/control_impl.py +++ b/python/gym_jiminy/common/gym_jiminy/common/control_impl.py @@ -1,15 +1,16 @@ """ TODO: Write documentation. """ from collections import OrderedDict -from typing import Union, Optional, Sequence, Any +from typing import Union, Any import numpy as np import gym from jiminy_py.core import EncoderSensor as encoder +from .env_bases import BaseJiminyEnv from .block_bases import BaseControllerBlock -from .utils import SpaceDictRecursive, FieldDictRecursive +from .utils import SpaceDictNested, FieldDictNested class PDController(BaseControllerBlock): @@ -20,6 +21,7 @@ class PDController(BaseControllerBlock): intermediary controllers. """ def __init__(self, + env: BaseJiminyEnv, update_ratio: int = 1, pid_kp: Union[float, np.ndarray] = 0.0, pid_kd: Union[float, np.ndarray] = 0.0, @@ -32,17 +34,33 @@ def __init__(self, :param kwargs: Used arguments to allow automatic pipeline wrapper generation. """ - # Initialize the controller - super().__init__(update_ratio) - # Backup some user arguments self.pid_kp = pid_kp self.pid_kd = pid_kd - # Low-level controller buffers - self.motor_to_encoder: Optional[Sequence[int]] = None - self._q_target = None - self._v_target = None + # Define the mapping from motors to encoders + encoder_joints = [] + for name in env.robot.sensors_names[encoder.type]: + sensor = env.robot.get_sensor(encoder.type, name) + encoder_joints.append(sensor.joint_name) + + self.motor_to_encoder = [] + for name in env.robot.motors_names: + motor = env.robot.get_motor(name) + motor_joint = motor.joint_name + encoder_found = False + for i, encoder_joint in enumerate(encoder_joints): + if motor_joint == encoder_joint: + self.motor_to_encoder.append(i) + encoder_found = True + break + if not encoder_found: + raise RuntimeError( + f"No encoder sensor associated with motor '{name}'. Every " + "actuated joint must have an encoder sensor attached.") + + # Initialize the controller + super().__init__(env, update_ratio) def _refresh_action_space(self) -> None: """Configure the action space of the controller. @@ -50,9 +68,6 @@ def _refresh_action_space(self) -> None: The action spaces corresponds to the position and velocity of motors instead of the torque. """ - # Assertion(s) for type checker - assert self.env is not None - # Extract the position and velocity bounds for the observation space sensors_space = self.env._get_sensors_space() encoders_space = sensors_space[encoder.type] @@ -72,33 +87,7 @@ def _refresh_action_space(self) -> None: (encoder.fieldnames[1], gym.spaces.Box( low=vel_low, high=vel_high, dtype=np.float64))]) - def _setup(self) -> None: - """Configure the controller. - - It updates the mapping from motors to encoders indices. - """ - # Refresh the mapping between the motors and encoders - encoder_joints = [] - for name in self.robot.sensors_names[encoder.type]: - sensor = self.robot.get_sensor(encoder.type, name) - encoder_joints.append(sensor.joint_name) - - self.motor_to_encoder = [] - for name in self.robot.motors_names: - motor = self.robot.get_motor(name) - motor_joint = motor.joint_name - encoder_found = False - for i, encoder_joint in enumerate(encoder_joints): - if motor_joint == encoder_joint: - self.motor_to_encoder.append(i) - encoder_found = True - break - if not encoder_found: - raise RuntimeError( - f"No encoder sensor associated with motor '{name}'. Every " - "actuated joint must have an encoder sensor attached.") - - def get_fieldnames(self) -> FieldDictRecursive: + def get_fieldnames(self) -> FieldDictNested: pos_fieldnames = [f"targetPosition{name}" for name in self.robot.motors_names] vel_fieldnames = [f"targetVelocity{name}" @@ -107,8 +96,8 @@ def get_fieldnames(self) -> FieldDictRecursive: (encoder.fieldnames[1], vel_fieldnames)]) def compute_command(self, - measure: SpaceDictRecursive, - action: SpaceDictRecursive + measure: SpaceDictNested, + action: SpaceDictNested ) -> np.ndarray: """Compute the motor torques using a PD controller. diff --git a/python/gym_jiminy/common/gym_jiminy/common/env_bases.py b/python/gym_jiminy/common/gym_jiminy/common/env_bases.py index 390df9e89..713ccdb49 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/env_bases.py +++ b/python/gym_jiminy/common/gym_jiminy/common/env_bases.py @@ -20,12 +20,13 @@ from jiminy_py.dynamics import compute_freeflyer_state_from_fixed_body from jiminy_py.simulator import Simulator from jiminy_py.viewer import sleep, play_logfiles -from jiminy_py.controller import BaseJiminyController, ControllerHandleType +from jiminy_py.controller import ( + ObserverHandleType, ControllerHandleType, BaseJiminyObserverController) from pinocchio import neutral -from .utils import _clamp, zeros, fill, SpaceDictRecursive -from .generic_bases import ObserveAndControlInterface +from .utils import _clamp, zeros, fill, set_value, SpaceDictNested +from .generic_bases import ObserverControllerInterface from .play import loop_interactive @@ -43,7 +44,7 @@ SENSOR_ACCEL_MAX = 10000.0 -class BaseJiminyEnv(gym.Env, ObserveAndControlInterface): +class BaseJiminyEnv(ObserverControllerInterface, gym.Env): """Base class to train a robot in Gym OpenAI using a user-specified Python Jiminy engine for physics computations. @@ -89,15 +90,15 @@ def __init__(self, """ # pylint: disable=unused-argument - # Initialize the interfaces through multiple inheritance - super().__init__(**kwargs) - # Backup some user arguments self.simulator = simulator self.step_dt = step_dt self.enforce_bounded = enforce_bounded self.debug = debug + # Initialize the interfaces through multiple inheritance + super().__init__(**kwargs) + # Internal buffers for physics computations self.rg = np.random.RandomState() self._seed: Optional[np.uint32] = None @@ -376,9 +377,10 @@ def _refresh_action_space(self) -> None: dtype=np.float64) def reset(self, - controller_hook: Optional[ - Callable[[], Optional[ControllerHandleType]]] = None - ) -> SpaceDictRecursive: + controller_hook: Optional[Callable[[], Optional[Tuple[ + Optional[ObserverHandleType], + Optional[ControllerHandleType]]]]] = None + ) -> SpaceDictNested: """Reset the environment. In practice, it resets the backend simulator and set the initial state @@ -395,13 +397,14 @@ def reset(self, right after initialization of the environment, and just before actually starting the simulation. It is a callable that optionally - returns a controller handle. If so, it will be - used to initialize the low-level jiminy - controller. It is useful to override partially - the configuration of the low-level engine, set - a custom low-level controller handle, or to - register custom variables to the telemetry. Set - to `None` if unused. + returns observer and/or controller handles. If + defined, it will be used to initialize the + low-level jiminy controller. It is useful to + override partially the configuration of the + low-level engine, set a custom low-level + observer/controller handle, or to register + custom variables to the telemetry. Set to + `None` if unused. Optional: Disable by default. :returns: Initial observation of the episode. @@ -435,37 +438,39 @@ def reset(self, fd, self.log_path = tempfile.mkstemp(prefix="log_", suffix=".data") os.close(fd) - # Extract the controller and observer update period. + # Extract the observer/controller update period. # There is no actual observer by default, apart from the robot's state - # and raw sensors data, so the observation period matches the one of - # the sensors. Similarly, there is no actual controller by default, - # apart from forwarding the command torque to the motors. + # and raw sensors data. Similarly, there is no actual controller by + # default, apart from forwarding the command torque to the motors. engine_options = self.simulator.engine.get_options() - self.control_dt = \ + self.control_dt = self.observe_dt = \ float(engine_options['stepper']['controllerUpdatePeriod']) - self.observe_dt = \ - float(engine_options['stepper']['sensorsUpdatePeriod']) # Enforce the low-level controller. # The backend robot may have changed, for example if it is randomly # generated based on different URDF files. As a result, it is necessary # to instantiate a new low-level controller. - # Note that `BaseJiminyController` is used by default in place of - # `jiminy.ControllerFunctor`. Although it is less efficient because it - # adds an extra layer of indirection, it makes it possible to update + # Note that `BaseJiminyObserverController` is used by default in place + # of `jiminy.ControllerFunctor`. Although it is less efficient because + # it adds an extra layer of indirection, it makes it possible to update # the controller handle without instantiating a new controller, which # is necessary to allow registering telemetry variables before knowing # the controller handle in advance. - controller = BaseJiminyController() + controller = BaseJiminyObserverController() controller.initialize(self.robot) self.simulator.set_controller(controller) - # Run controller hook and set the controller handle - controller_handle = None + # Run controller hook and set the observer and controller handles + observer_handle, controller_handle = None, None if controller_hook is not None: - controller_handle = controller_hook() + handles = controller_hook() + if handles is not None: + observer_handle, controller_handle = handles + if observer_handle is None: + observer_handle = self._observer_handle + self.simulator.controller.set_observer_handle(observer_handle) if controller_handle is None: - controller_handle = self._send_command + controller_handle = self._controller_handle self.simulator.controller.set_controller_handle(controller_handle) # Sample the initial state and reset the low-level engine @@ -477,13 +482,8 @@ def reset(self, "inconsistent with the dimension or types of joints of the " "model.") - # Start the engine, in order to initialize the sensors data - hresult = self.simulator.start( - qpos, qvel, self.simulator.use_theoretical_model) - if hresult != jiminy.hresult_t.SUCCESS: - raise RuntimeError( - "Failed to start the simulation. Probably because the " - "initial state is invalid.") + # Start the engine + self.simulator.start(qpos, qvel, self.simulator.use_theoretical_model) # Update the observation self._observation = self.compute_observation() @@ -542,7 +542,7 @@ def close(self) -> None: def step(self, action: Optional[np.ndarray] = None - ) -> Tuple[SpaceDictRecursive, float, bool, Dict[str, Any]]: + ) -> Tuple[SpaceDictNested, float, bool, Dict[str, Any]]: """Run a simulation step for a given action. :param action: Action to perform. `None` to not update the action. @@ -563,9 +563,7 @@ def step(self, is_step_failed = True try: # Perform a single integration step - return_code = self.simulator.step(self.step_dt) - if return_code != jiminy.hresult_t.SUCCESS: - raise RuntimeError("Failed to perform the simulation step.") + self.simulator.step(self.step_dt) # Update some internal buffers self.num_steps += 1 @@ -815,8 +813,7 @@ def _sample_state(self) -> Tuple[np.ndarray, np.ndarray]: return qpos, qvel - def compute_observation(self # type: ignore[override] - ) -> SpaceDictRecursive: + def compute_observation(self) -> SpaceDictNested: # type: ignore[override] """Compute the observation based on the current state of the robot. """ # pylint: disable=arguments-differ @@ -831,7 +828,7 @@ def compute_observation(self # type: ignore[override] sensors=self._sensors_data) def compute_command(self, - measure: SpaceDictRecursive, + measure: SpaceDictNested, action: np.ndarray ) -> np.ndarray: """Compute the motors efforts to apply on the robot. @@ -843,6 +840,7 @@ def compute_command(self, :param measure: Observation of the environment. :param action: Desired motors efforts. """ + set_value(self._action, action) return _clamp(self.action_space, action) def is_done(self, *args: Any, **kwargs: Any) -> bool: @@ -887,7 +885,7 @@ def _key_to_action(key: str) -> np.ndarray: BaseJiminyEnv.compute_reward.__doc__ = \ """Compute reward at current episode state. - See `ControlInterface.compute_reward` for details. + See `ControllerInterface.compute_reward` for details. .. note:: This method is called after updating the internal buffer @@ -914,12 +912,12 @@ def __init__(self, simulator: Optional[Simulator], step_dt: float, debug: bool = False) -> None: - # Initialize base class - super().__init__(simulator, step_dt, debug) - # Define some internal buffers self._desired_goal: Optional[np.ndarray] = None + # Initialize base class + super().__init__(simulator, step_dt, debug) + def _refresh_observation_space(self) -> None: # Assertion(s) for type checker assert isinstance(self._desired_goal, np.ndarray), ( @@ -938,8 +936,7 @@ def _refresh_observation_space(self) -> None: -np.inf, np.inf, shape=self._desired_goal.shape, dtype=np.float64))) - def compute_observation(self # type: ignore[override] - ) -> SpaceDictRecursive: + def compute_observation(self) -> SpaceDictNested: # type: ignore[override] # Assertion(s) for type checker assert self._desired_goal is not None @@ -950,15 +947,17 @@ def compute_observation(self # type: ignore[override] ) def reset(self, - controller_hook: Optional[Callable[[], None]] = None - ) -> SpaceDictRecursive: + controller_hook: Optional[Callable[[], Optional[Tuple[ + Optional[ObserverHandleType], + Optional[ControllerHandleType]]]]] = None + ) -> SpaceDictNested: self._desired_goal = self._sample_goal() return super().reset(controller_hook) # methods to override: # ---------------------------- - def _sample_goal(self) -> SpaceDictRecursive: + def _sample_goal(self) -> SpaceDictNested: """Sample a goal randomly. .. note:: @@ -970,7 +969,7 @@ def _sample_goal(self) -> SpaceDictRecursive: """ raise NotImplementedError - def _get_achieved_goal(self) -> SpaceDictRecursive: + def _get_achieved_goal(self) -> SpaceDictNested: """Compute the achieved goal based on current state of the robot. .. note:: @@ -983,8 +982,8 @@ def _get_achieved_goal(self) -> SpaceDictRecursive: raise NotImplementedError def is_done(self, # type: ignore[override] - achieved_goal: Optional[SpaceDictRecursive] = None, - desired_goal: Optional[SpaceDictRecursive] = None) -> bool: + achieved_goal: Optional[SpaceDictNested] = None, + desired_goal: Optional[SpaceDictNested] = None) -> bool: """Determine whether a desired goal has been achieved. By default, it uses the termination condition inherited from normal @@ -1007,8 +1006,8 @@ def is_done(self, # type: ignore[override] raise NotImplementedError def compute_reward(self, # type: ignore[override] - achieved_goal: Optional[SpaceDictRecursive] = None, - desired_goal: Optional[SpaceDictRecursive] = None, + achieved_goal: Optional[SpaceDictNested] = None, + desired_goal: Optional[SpaceDictNested] = None, *, info: Dict[str, Any]) -> float: """Compute the reward for any given episode state. diff --git a/python/gym_jiminy/common/gym_jiminy/common/env_locomotion.py b/python/gym_jiminy/common/gym_jiminy/common/env_locomotion.py index 73ff43813..3589a57b5 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/env_locomotion.py +++ b/python/gym_jiminy/common/gym_jiminy/common/env_locomotion.py @@ -152,13 +152,15 @@ def __init__(self, self._power_consumption_max = 0.0 self._height_neutral = 0.0 - # Configure and initialize the learning environment + # Configure the backend simulator simulator = Simulator.build( self.urdf_path, self.hardware_path, self.mesh_path, has_freeflyer=True, use_theoretical_model=False, config_path=self.config_path, avoid_instable_collisions=self.avoid_instable_collisions, debug=debug) + + # Initialize base class super().__init__(simulator, step_dt, enforce_bounded, debug, **kwargs) def _setup(self) -> None: @@ -171,7 +173,7 @@ def _setup(self) -> None: - enforce some options of the low-level robot and engine, - randomize the environment according to 'std_ratio'. - .. note:: TODO WRONG + .. note:: This method is called internally by `reset` method at the very beginning. One must overide it to implement new contributions to the environment stochasticity, or to create custom low-level robot diff --git a/python/gym_jiminy/common/gym_jiminy/common/generic_bases.py b/python/gym_jiminy/common/gym_jiminy/common/generic_bases.py index b581f59df..79bba9cfd 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/generic_bases.py +++ b/python/gym_jiminy/common/gym_jiminy/common/generic_bases.py @@ -7,15 +7,15 @@ import jiminy_py.core as jiminy -from .utils import _clamp, set_value, SpaceDictRecursive +from .utils import _clamp, set_value, copy, SpaceDictNested -class ControlInterface: +class ControllerInterface: """Controller interface for both controllers and environments. """ control_dt: float action_space: Optional[gym.Space] - _action: Optional[SpaceDictRecursive] + _action: Optional[SpaceDictNested] def __init__(self, *args: Any, **kwargs: Any) -> None: """Initialize the control interface. @@ -33,7 +33,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.enable_reward_terminal = ( self.compute_reward_terminal. # type: ignore[attr-defined] - __func__ is not ControlInterface.compute_reward_terminal) + __func__ is not ControllerInterface.compute_reward_terminal) # Call super to allow mixing interfaces through multiple inheritance super().__init__(*args, **kwargs) # type: ignore[call-arg] @@ -47,9 +47,9 @@ def _refresh_action_space(self) -> None: raise NotImplementedError def compute_command(self, - measure: SpaceDictRecursive, - action: SpaceDictRecursive - ) -> SpaceDictRecursive: + measure: SpaceDictNested, + action: SpaceDictNested + ) -> SpaceDictNested: """Compute the command to send to the subsequent block, based on the current target and observation of the environment. @@ -101,12 +101,12 @@ def compute_reward_terminal(self, info: Dict[str, Any]) -> float: raise NotImplementedError -class ObserveInterface: +class ObserverInterface: """Observer interface for both observers and environments. """ observe_dt: float observation_space: Optional[gym.Space] - _observation: Optional[SpaceDictRecursive] + _observation: Optional[SpaceDictNested] def __init__(self, *args: Any, **kwargs: Any) -> None: """Initialize the observation interface. @@ -136,7 +136,7 @@ def refresh_observation(self) -> None: """ set_value(self._observation, self.compute_observation()) - def get_observation(self, bypass: bool = False) -> SpaceDictRecursive: + def get_observation(self, bypass: bool = False) -> SpaceDictNested: """Get post-processed observation. By default, it clamps the observation to make sure it does not violate @@ -147,10 +147,11 @@ def get_observation(self, bypass: bool = False) -> SpaceDictRecursive: doing so may lead to unexpected behavior if not done carefully. :param bypass: Whether to nor to bypass post-processing and return - the original observation instead. + the original observation instead (yet recursively + shadow copied). """ if bypass: - return self._observation + return copy(self._observation) return _clamp(self.observation_space, self._observation) # methods to override: @@ -163,7 +164,7 @@ def _refresh_observation_space(self) -> None: def compute_observation(self, *args: Any, - **kwargs: Any) -> SpaceDictRecursive: + **kwargs: Any) -> SpaceDictNested: """Compute the observation based on the current simulation state and lower-level measure. @@ -174,7 +175,7 @@ def compute_observation(self, raise NotImplementedError -class ObserveAndControlInterface(ObserveInterface, ControlInterface): +class ObserverControllerInterface(ObserverInterface, ControllerInterface): """Observer plus controller interface for both generic pipeline blocks, including environments. """ @@ -182,12 +183,23 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # Call super to allow mixing interfaces through multiple inheritance super().__init__(*args, **kwargs) - def _send_command(self, - t: float, - q: np.ndarray, - v: np.ndarray, - sensors_data: jiminy.sensorsData, - u_command: np.ndarray) -> None: + def _observer_handle(self, + t: float, + q: np.ndarray, + v: np.ndarray, + sensors_data: jiminy.sensorsData) -> None: + """TODO Write documentation. + """ + # pylint: disable=unused-argument + + self.refresh_observation() + + def _controller_handle(self, + t: float, + q: np.ndarray, + v: np.ndarray, + sensors_data: jiminy.sensorsData, + u_command: np.ndarray) -> None: """This method is the main entry-point to interact with the simulator. It is design to apply motors efforts on the robot, but in practice it also updates the observation before computing the command. @@ -212,14 +224,5 @@ def _send_command(self, """ # pylint: disable=unused-argument - # Refresh the observation. - # Note that the controller update period must be multiple of the sensor - # update period, so that it is unnecessary to check if it is a - # breakpoint. A dedicated `BaseObserverBlock` must be used if one wants - # to observe a low-frequency features instead of overloading - # `compute_observation` and `_refresh_observation_space` directly. - self.refresh_observation() - - # Compute the command to send to the motors np.copyto(u_command, self.compute_command( self.get_observation(bypass=True), self._action)) diff --git a/python/gym_jiminy/common/gym_jiminy/common/pipeline_bases.py b/python/gym_jiminy/common/gym_jiminy/common/pipeline_bases.py index ae0ae18ab..c2f8f5df2 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/pipeline_bases.py +++ b/python/gym_jiminy/common/gym_jiminy/common/pipeline_bases.py @@ -13,22 +13,24 @@ """ from copy import deepcopy from collections import OrderedDict -from typing import Optional, Union, Tuple, Dict, Any, Type, Sequence, Callable +from typing import ( + Optional, Union, Tuple, Dict, Any, Type, Sequence, List, Callable) import numpy as np import gym +from typing_extensions import TypedDict -from jiminy_py.controller import ControllerHandleType +from jiminy_py.controller import ObserverHandleType, ControllerHandleType from .utils import ( _is_breakpoint, _clamp, zeros, fill, set_value, register_variables, - SpaceDictRecursive) -from .generic_bases import ObserveAndControlInterface + SpaceDictNested) +from .generic_bases import ObserverControllerInterface from .env_bases import BaseJiminyEnv from .block_bases import BlockInterface, BaseControllerBlock, BaseObserverBlock -class BasePipelineWrapper(ObserveAndControlInterface, gym.Wrapper): +class BasePipelineWrapper(ObserverControllerInterface, gym.Wrapper): """Wrap a `BaseJiminyEnv` Gym environment and a single block, so that it appears as a single, unified, environment. Eventually, the environment can already be wrapped inside one or several `gym.Wrapper` containers. @@ -47,26 +49,18 @@ class BasePipelineWrapper(ObserveAndControlInterface, gym.Wrapper): def __init__(self, env: Union[gym.Wrapper, BaseJiminyEnv], - augment_observation: bool = False) -> None: + **kwargs: Any) -> None: """ - :param augment_observation: Whether or not to augment the observation - of the environment with information - provided by the wrapped block. What it - means in practice depends on the type of - the wrapped block. - Optional: disable by default. + :param kwargs: Extra keyword arguments for multiple inheritance. """ - # Backup some user arguments - self.augment_observation = augment_observation - # Initialize base wrapper and interfaces through multiple inheritance - super().__init__(env) + super().__init__(env) # Do not forward extra arguments, if any # Define some internal buffers self._dt_eps: Optional[float] = None self._command = zeros(self.env.unwrapped.action_space) - def __dir__(self) -> Sequence[str]: + def __dir__(self) -> List[str]: """Attribute lookup. It is mainly used by autocomplete feature of Ipython. It is overloaded @@ -89,24 +83,26 @@ def _get_block_index(self) -> int: block = block.env return i - def get_observation(self, bypass: bool = False) -> SpaceDictRecursive: + def get_observation(self, bypass: bool = False) -> SpaceDictNested: """Get post-processed observation. - By default, it clamps the observation to make sure it does not violate - the lower and upper bounds. + By default, it returns either the original observation from the + environment, and the clamped computed features to make sure it does not + violate the lower and upper bounds for block observation space. - :param bypass: Whether to nor to bypass post-processing and return - the original environment's observation instead. + :param bypass: Whether to nor to return the original environment's + observation or the post-processed computed features. """ if bypass: - return self.env.get_observation(True) + return self.env.get_observation() else: return _clamp(self.observation_space, self._observation) def reset(self, - controller_hook: Optional[ - Callable[[], Optional[ControllerHandleType]]] = None, - **kwargs: Any) -> SpaceDictRecursive: + controller_hook: Optional[Callable[[], Optional[Tuple[ + Optional[ObserverHandleType], + Optional[ControllerHandleType]]]]] = None, + **kwargs: Any) -> SpaceDictNested: """Reset the unified environment. In practice, it resets the environment and initializes the generic @@ -121,7 +117,7 @@ def reset(self, # pylint: disable=unused-argument # Define chained controller hook - def register() -> ControllerHandleType: + def register() -> Tuple[ObserverHandleType, ControllerHandleType]: """Register the block to the higher-level block. This method is used internally to make sure that `_setup` method @@ -131,9 +127,6 @@ def register() -> ControllerHandleType: """ nonlocal self, controller_hook - # Assertion(s) for type checker - assert self.env is not None - # Get the temporal resolution of simulator steps engine_options = self.simulator.engine.get_options() self._dt_eps = 1.0 / engine_options["telemetry"]["timeUnit"] @@ -141,14 +134,19 @@ def register() -> ControllerHandleType: # Initialize the pipeline wrapper self._setup() - # Forward the controller handle provided by the controller hook, - # if any, or use the one of the controller otherwise. - controller_handle = None + # Forward the observer and controller handles provided by the + # controller hook of higher-level block, if any, or use the + # ones of this block otherwise. + observer_handle, controller_handle = None, None if controller_hook is not None: - controller_handle = controller_hook() + handles = controller_hook() + if handles is not None: + observer_handle, controller_handle = handles + if controller_handle is None: + observer_handle = self._observer_handle if controller_handle is None: - controller_handle = self._send_command - return controller_handle + controller_handle = self._controller_handle + return observer_handle, controller_handle # Reset base pipeline self.env.reset(register, **kwargs) # type: ignore[call-arg] @@ -156,8 +154,8 @@ def register() -> ControllerHandleType: return self.get_observation() def step(self, - action: Optional[SpaceDictRecursive] = None - ) -> Tuple[SpaceDictRecursive, float, bool, Dict[str, Any]]: + action: Optional[SpaceDictNested] = None + ) -> Tuple[SpaceDictNested, float, bool, Dict[str, Any]]: """Run a simulation step for a given action. :param action: Next action to perform. `None` to not update it. @@ -172,6 +170,12 @@ def step(self, # Compute the next learning step _, reward, done, info = self.env.step() + # Compute block's reward and sum it to total + reward += self.compute_reward(info=info) + if self.enable_reward_terminal: + if done and self.env.unwrapped._num_steps_beyond_done == 0: + reward += self.compute_reward_terminal(info=info) + return self.get_observation(), reward, done, info # methods to override: @@ -190,8 +194,7 @@ def _setup(self) -> None: fill(self._command, 0.0) fill(self._observation, 0.0) - def compute_observation(self # type: ignore[override] - ) -> SpaceDictRecursive: + def compute_observation(self) -> SpaceDictNested: # type: ignore[override] """Compute the unified observation. By default, it forwards the observation computed by the environment. @@ -201,12 +204,12 @@ def compute_observation(self # type: ignore[override] # pylint: disable=arguments-differ self.env.refresh_observation() - return self.get_observation(bypass=True).copy() # No deepcopy ! + return self.get_observation(bypass=True) def compute_command(self, - measure: SpaceDictRecursive, - action: SpaceDictRecursive - ) -> SpaceDictRecursive: + measure: SpaceDictNested, + action: SpaceDictNested + ) -> SpaceDictNested: """Compute the motors efforts to apply on the robot. By default, it forwards the command computed by the environment. @@ -214,6 +217,7 @@ def compute_command(self, :param measure: Observation of the environment. :param action: Target to achieve. """ + set_value(self._action, action) return self.env.compute_command(measure, action) @@ -300,24 +304,22 @@ def __init__(self, :param kwargs: Extra keyword arguments to allow automatic pipeline wrapper generation. """ - # Initialize base wrapper - super().__init__(env, augment_observation) - # Backup user arguments self.controller = controller + self.augment_observation = augment_observation + + # Make sure that the unwrapped environment matches the controlled one + assert env.unwrapped is controller.env + + # Initialize base wrapper + super().__init__(env, **kwargs) # Assertion(s) for type checker assert (self.env.action_space is not None and self.env.observation_space is not None) - # Reset the controller - self.controller.reset(self.env) - self.control_dt = self.controller.control_dt - - # Make sure the controller period is lower than environment timestep - assert self.control_dt <= self.env.unwrapped.step_dt, ( - "The control update period must be lower than or equal to the " - "environment simulation timestep.") + # Enable terminal reward only if the controller implements it + self.enable_reward_terminal = self.controller.enable_reward_terminal # Set the controller name, based on the controller index self.controller_name = f"controller_{self._get_block_index()}" @@ -349,15 +351,22 @@ def __init__(self, def _setup(self) -> None: """Configure the wrapper. - In addition to the base implementation, it resgisters the controller's - target to the telemetry. + In addition to the base implementation, it configures the controller + and registers its target to the telemetry. """ + # Configure the controller + self.controller._setup() + # Call base implementation super()._setup() # Reset some additional internal buffers fill(self._target, 0.0) + # Compute the observe and control update periods + self.observe_dt = self.env.observe_dt + self.control_dt = self.controller.control_dt + # Register the controller target to the telemetry. # It may be useful for computing the terminal reward or debugging. register_variables( @@ -365,9 +374,9 @@ def _setup(self) -> None: self._action, self.controller_name) def compute_command(self, - measure: SpaceDictRecursive, - action: SpaceDictRecursive - ) -> SpaceDictRecursive: + measure: SpaceDictNested, + action: SpaceDictNested + ) -> SpaceDictNested: """Compute the motors efforts to apply on the robot. In practice, it updates, whenever it is necessary: @@ -379,10 +388,13 @@ def compute_command(self, :param measure: Observation of the environment. :param action: High-level target to achieve. """ + # Backup the action + set_value(self._action, action) + # Update the target to send to the subsequent block if necessary. # Note that `_observation` buffer has already been updated right before - # calling this method by `_send_command`, so it can be used as measure - # argument without issue. + # calling this method by `_controller_handle`, so it can be used as + # measure argument without issue. t = self.simulator.stepper_state.t if _is_breakpoint(t, self.control_dt, self._dt_eps): target = self.controller.compute_command(self._observation, action) @@ -391,8 +403,8 @@ def compute_command(self, # Update the command to send to the actuators of the robot. # Note that the environment itself is responsible of making sure to # update the command of the right period. Ultimately, this is done - # automatically by the engine, which is calling `_send_command` at the - # right period. + # automatically by the engine, which is calling `_controller_handle` at + # the right period. if self.env.simulator.is_simulation_running: # Do not update command during the first iteration because the # action is undefined at this point @@ -401,8 +413,7 @@ def compute_command(self, return self._command - def compute_observation(self # type: ignore[override] - ) -> SpaceDictRecursive: + def compute_observation(self) -> SpaceDictNested: # type: ignore[override] """Compute the unified observation based on the current wrapped environment's observation and controller's target. @@ -428,19 +439,11 @@ def compute_observation(self # type: ignore[override] return obs - def step(self, - action: Optional[np.ndarray] = None - ) -> Tuple[SpaceDictRecursive, float, bool, Dict[str, Any]]: - # Compute the next learning step - observation, reward, done, info = super().step(action) - - # Compute controller's rewards and sum it to total reward - reward += self.controller.compute_reward(info=info) - if self.controller.enable_reward_terminal: - if done and self.env.unwrapped._num_steps_beyond_done == 0: - reward += self.controller.compute_reward_terminal(info=info) + def compute_reward(self, *args: Any, **kwargs: Any) -> float: + return self.controller.compute_reward(*args, **kwargs) - return observation, reward, done, info + def compute_reward_terminal(self, *args: Any, **kwargs: Any) -> float: + return self.controller.compute_reward_terminal(*args, **kwargs) class ObservedJiminyEnv(BasePipelineWrapper): @@ -492,11 +495,15 @@ def __init__(self, :param kwargs: Extra keyword arguments to allow automatic pipeline wrapper generation. """ - # Initialize base wrapper - super().__init__(env, augment_observation) - # Backup user arguments self.observer = observer + self.augment_observation = augment_observation + + # Make sure that the unwrapped environment matches the controlled one + assert env.unwrapped is observer.env + + # Initialize base wrapper + super().__init__(env, **kwargs) # Assertion(s) for type checker assert (self.env.action_space is not None and @@ -508,9 +515,6 @@ def __init__(self, # Update the action space self.action_space = self.env.action_space - # Reset the observer - self.observer.reset(self.env) - # Set the controller name, based on the controller index self.observer_name = f"observer_{self._get_block_index()}" @@ -537,8 +541,22 @@ def __init__(self, self._action = zeros(self.action_space) self._observation = zeros(self.observation_space) - def compute_observation(self # type: ignore[override] - ) -> SpaceDictRecursive: + def _setup(self) -> None: + """Configure the wrapper. + + In addition to the base implementation, it configures the observer. + """ + # Configure the observer + self.observer._setup() + + # Call base implementation + super()._setup() + + # Compute the observe and control update periods + self.observe_dt = self.observer.observe_dt + self.control_dt = self.env.control_dt + + def compute_observation(self) -> SpaceDictNested: # type: ignore[override] """Compute high-level features based on the current wrapped environment's observation. @@ -574,62 +592,80 @@ def compute_observation(self # type: ignore[override] return obs -def build_pipeline(env_config: Tuple[ - Type[BaseJiminyEnv], - Dict[str, Any]], - controllers_config: Sequence[Tuple[ - Type[BaseControllerBlock], - Dict[str, Any], - Dict[str, Any]]] = (), - observers_config: Sequence[Tuple[ - Type[BaseObserverBlock], - Dict[str, Any], - Dict[str, Any]]] = (), - ) -> Type[BasePipelineWrapper]: - """Wrap together an environment inheriting from `BaseJiminyEnv` with any - number of controllers and observers as a unified pipeline environment class - inheriting from `BasePipelineWrapper`. +class EnvConfig(TypedDict, total=False): + """Environment class type. + """ + env_class: Type[BaseJiminyEnv] - Each controller and observers are wrapped individually, successively. The - controllers are wrapped first, using `ControlledJiminyEnv`. Then comes the - observers, using `ObservedJiminyEnv`, so that intermediary controllers - targets are always available if requested. + """Environment constructor default arguments. - :param env_config: - Configuration of the environment, as a tuple: + This attribute can be omitted. + """ + env_kwargs: Dict[str, Any] + + +class BlockConfig(TypedDict, total=False): + """Block class type. If specified, it must derive from + `BaseControllerBlock` for controller blocks or `BaseObserverBlock` for + observer blocks. + + This attribute can be omitted. If so, then 'block_kwargs' must be omitted + and 'wrapper_class' must be specified. Indeed, not all block are associated + with a dedicated observer or controller object. It happens when the block + is not doing any computation on its own but just transforming the action or + observation, e.g. stacking observation frames. + """ + block_class: Union[ + Type[BaseControllerBlock], Type[BaseObserverBlock]] + + """Block constructor default arguments. + + This attribute can be omitted. + """ + block_kwargs: Dict[str, Any] + + """Wrapper class type. - - [0] Environment class type. - - [1] Environment constructor default arguments. + This attribute can be omitted. If so, then 'wrapper_kwargs' must be omitted + and 'block_class' must be specified. The latter will be used to infer the + default wrapper type. + """ + wrapper_class: Type[BasePipelineWrapper] + + """Wrapper constructor default arguments. - :param controllers_config: - Configuration of the controllers, as a list. The list is ordered from - the lowest level controller to the highest, each element corresponding - to the configuration of a individual controller, as a tuple: + This attribute can be omitted. + """ + wrapper_kwargs: Dict[str, Any] - - [0] Controller class type. - - [1] Controller constructor default arguments. - - [2] `ControlledJiminyEnv` constructor default arguments. - :param observers_config: - Configuration of the observers, as a list. The list is ordered from - the lowest level observer to the highest, each element corresponding - to the configuration of a individual observer, as a tuple: +def build_pipeline(env_config: EnvConfig, + blocks_config: Sequence[BlockConfig] = () + ) -> Type[BasePipelineWrapper]: + """Wrap together an environment inheriting from `BaseJiminyEnv` with any + number of blocks, as a unified pipeline environment class inheriting from + `BasePipelineWrapper`. Each block is wrapped individually and successively. - - [0] Observer class type. - - [1] Observer constructor default arguments. - - [2] `ObservedJiminyEnv` constructor default arguments. + :param env_config: + Configuration of the environment, as a dict of type `EnvConfig`. + + :param blocks_config: + Configuration of the blocks, as a list. The list is ordered from the + lowest level block to the highest, each element corresponding to the + configuration of a individual block, as a dict of type `BlockConfig`. """ # pylint: disable-all def _build_wrapper(env_class: Type[Union[gym.Wrapper, BaseJiminyEnv]], - env_kwargs_default: Optional[Dict[str, Any]], - block_class: Type[BlockInterface], - block_kwargs_default: Dict[str, Any], - wrapper_class: Type[BasePipelineWrapper], - wrapper_kwargs_default: Dict[str, Any] + env_kwargs: Optional[Dict[str, Any]] = None, + block_class: Optional[Type[BlockInterface]] = None, + block_kwargs: Optional[Dict[str, Any]] = None, + wrapper_class: Optional[ + Type[BasePipelineWrapper]] = None, + wrapper_kwargs: Optional[Dict[str, Any]] = None ) -> Type[ControlledJiminyEnv]: - """Generate a class inheriting from 'wrapper_class' and wrapping a - given type of environment and block together. + """Generate a class inheriting from 'wrapper_class' wrapping a given + type of environment, optionally gathered with a block. .. warning:: Beware of the collision between the keywords arguments of the @@ -637,29 +673,44 @@ def _build_wrapper(env_class: Type[Union[gym.Wrapper, BaseJiminyEnv]], overwrite their default values independently. :param env_class: Type of environment to wrap. - :param env_kwargs_default: Keyword arguments to forward to the - constructor of the wrapped environment. Note - that it will only overwrite the default - value, so it will still be possible to set - different values by explicitly defining them - when calling the constructor of the - generated wrapper. - :param block_class: Type of block to connect to the environment. - :param block_kwargs_default: Keyword arguments to forward to the - constructor of the wrapped block. - See 'env_kwargs_default'. + :param env_kwargs: Keyword arguments to forward to the constructor of + the wrapped environment. Note that it will only + overwrite the default value, so it will still be + possible to set different values by explicitly + defining them when calling the constructor of the + generated wrapper. + :param block_class: Type of block to connect to the environment, if + any. `None` to disable. + Optional: Disable by default + :param block_kwargs: Keyword arguments to forward to the constructor of + the wrapped block. See 'env_kwargs'. :param wrapper_class: Type of wrapper to use to gather the environment and the block. - :param wrapper_kwargs_default: Keyword arguments to forward to the - constructor of the wrapper. - See 'env_kwargs_default'. + :param wrapper_kwargs: Keyword arguments to forward to the constructor + of the wrapper. See 'env_kwargs'. """ # pylint: disable-all - wrapped_env_class = type( - f"{block_class.__name__}Env", # Class name - (wrapper_class,), # Bases - {}) # methods (__init__ cannot be implemented this way, cf below) + # Handling of default arguments + if block_class is not None and wrapper_class is None: + if issubclass(block_class, BaseControllerBlock): + wrapper_class = ControlledJiminyEnv + elif issubclass(block_class, BaseObserverBlock): + wrapper_class = ObservedJiminyEnv + else: + raise ValueError( + f"Block of type '{block_class}' does not support " + "automatic default wrapper type inference. Please specify " + "it manually.") + + # Assertion(s) for type checker + assert wrapper_class is not None + + # Dynamically generate wrapping class + wrapper_name = f"{wrapper_class.__name__}Wrapper" + if block_class is not None: + wrapper_name += f"{block_class.__name__}Block" + wrapped_env_class = type(wrapper_name, (wrapper_class,), {}) # Implementation of __init__ method must be done after declaration of # the class, because the required closure for calling `super()` is not @@ -671,30 +722,45 @@ def __init__(self: wrapped_env_class, # type: ignore[valid-type] environment and the controller. It will overwrite default values. """ - if env_kwargs_default is not None: - env_kwargs = {**env_kwargs_default, **kwargs} + nonlocal env_class, env_kwargs, block_class, block_kwargs, \ + wrapper_kwargs + + # Initialize constructor arguments + args = [] + + # Define the arguments related to the environment + if env_kwargs is not None: + env_kwargs = {**env_kwargs, **kwargs} else: env_kwargs = kwargs env = env_class(**env_kwargs) - block = block_class(**{**block_kwargs_default, **kwargs}) + args.append(env) + + # Define the arguments related to the block, if any + if block_class is not None: + if block_kwargs is not None: + block_kwargs = {**block_kwargs, **kwargs} + else: + block_kwargs = kwargs + args.append(block_class(env.unwrapped, **block_kwargs)) + + # Define the arguments related to the wrapper + if wrapper_kwargs is not None: + wrapper_kwargs = {**wrapper_kwargs, **kwargs} + else: + wrapper_kwargs = kwargs + super(wrapped_env_class, self).__init__( # type: ignore[arg-type] - env, block, **{**wrapper_kwargs_default, **kwargs}) + *args, **wrapper_kwargs) wrapped_env_class.__init__ = __init__ # type: ignore[misc] return wrapped_env_class - env_kwargs: Optional[Dict[str, Any]] - env_class, env_kwargs = env_config - pipeline_class = env_class - for (ctrl_class, ctrl_kwargs, wrapper_kwargs) in controllers_config: - pipeline_class = _build_wrapper( - pipeline_class, env_kwargs, ctrl_class, ctrl_kwargs, - ControlledJiminyEnv, wrapper_kwargs) - env_kwargs = None - for (obs_class, obs_kwargs, wrapper_kwargs) in observers_config: + pipeline_class = env_config['env_class'] + env_kwargs = env_config.get('env_kwargs', None) + for config in blocks_config: pipeline_class = _build_wrapper( - pipeline_class, env_kwargs, obs_class, obs_kwargs, - ObservedJiminyEnv, wrapper_kwargs) + pipeline_class, env_kwargs, **config) env_kwargs = None return pipeline_class diff --git a/python/gym_jiminy/common/gym_jiminy/common/utils.py b/python/gym_jiminy/common/gym_jiminy/common/utils.py index 61a4535af..697e91041 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/utils.py +++ b/python/gym_jiminy/common/gym_jiminy/common/utils.py @@ -10,14 +10,14 @@ import jiminy_py.core as jiminy -SpaceDictRecursive = Union[ # type: ignore - Dict[str, 'SpaceDictRecursive'], np.ndarray] # type: ignore +SpaceDictNested = Union[ # type: ignore + Dict[str, 'SpaceDictNested'], np.ndarray] # type: ignore ListStrRecursive = Sequence[Union[str, 'ListStrRecursive']] # type: ignore -FieldDictRecursive = Union[ # type: ignore - Dict[str, 'FieldDictRecursive'], ListStrRecursive] # type: ignore +FieldDictNested = Union[ # type: ignore + Dict[str, 'FieldDictNested'], ListStrRecursive] # type: ignore -def zeros(space: gym.Space) -> SpaceDictRecursive: +def zeros(space: gym.Space) -> Union[SpaceDictNested, int]: """Set to zero data from `Gym.Space`. """ if isinstance(space, gym.spaces.Dict): @@ -33,7 +33,7 @@ def zeros(space: gym.Space) -> SpaceDictRecursive: f"Space of type {type(space)} is not supported by this method.") -def fill(data: SpaceDictRecursive, +def fill(data: SpaceDictNested, fill_value: float) -> None: """Set every element of 'data' from `Gym.Space` to scalar 'fill_value'. """ @@ -47,8 +47,8 @@ def fill(data: SpaceDictRecursive, f"Data of type {type(data)} is not supported by this method.") -def set_value(data: SpaceDictRecursive, - value: SpaceDictRecursive) -> None: +def set_value(data: SpaceDictNested, + value: SpaceDictNested) -> None: """Partially set 'data' from `Gym.Space` to 'value'. It avoids memory allocation, so that memory pointers of 'data' remains @@ -69,19 +69,31 @@ def set_value(data: SpaceDictRecursive, f"Data of type {type(data)} is not supported by this method.") -def _clamp(space: gym.Space, x: SpaceDictRecursive) -> SpaceDictRecursive: +def copy(data: SpaceDictNested) -> SpaceDictNested: + """Shadow copy recursively 'data' from `Gym.Space`, so that only leaves + are still references. + """ + if isinstance(data, dict): + value = data.__class__() + for field, sub_data in data.items(): + value[field] = copy(sub_data) + return value + return data + + +def _clamp(space: gym.Space, value: SpaceDictNested) -> SpaceDictNested: """Clamp an element from Gym.Space to make sure it is within bounds. :meta private: """ if isinstance(space, gym.spaces.Dict): return OrderedDict( - (k, _clamp(subspace, x[k])) + (k, _clamp(subspace, value[k])) for k, subspace in space.spaces.items()) if isinstance(space, gym.spaces.Box): - return np.clip(x, space.low, space.high) + return np.clip(value, space.low, space.high) if isinstance(space, gym.spaces.Discrete): - return np.clip(x, 0, space.n) + return np.clip(value, 0, space.n) raise NotImplementedError( f"Gym.Space of type {type(space)} is not supported by this " "method.") @@ -106,8 +118,8 @@ def _is_breakpoint(t: float, dt: float, eps: float) -> bool: def register_variables(controller: jiminy.AbstractController, - field: FieldDictRecursive, - data: SpaceDictRecursive, + field: FieldDictNested, + data: SpaceDictNested, namespace: Optional[str] = None) -> bool: """Register data from `Gym.Space` to the telemetry of a controller. diff --git a/python/gym_jiminy/common/gym_jiminy/common/wrappers.py b/python/gym_jiminy/common/gym_jiminy/common/wrappers.py index fce8ca436..ccc936048 100644 --- a/python/gym_jiminy/common/gym_jiminy/common/wrappers.py +++ b/python/gym_jiminy/common/gym_jiminy/common/wrappers.py @@ -1,3 +1,5 @@ +""" TODO: Write documentation. +""" from copy import deepcopy from functools import reduce from collections import deque @@ -7,7 +9,8 @@ import gym -from .utils import zeros, SpaceDictRecursive +from .utils import _is_breakpoint, zeros, SpaceDictNested +from .pipeline_bases import BasePipelineWrapper class PartialFrameStack(gym.Wrapper): @@ -21,7 +24,8 @@ class PartialFrameStack(gym.Wrapper): def __init__(self, env: gym.Env, nested_fields_list: Sequence[Sequence[str]], - num_stack: int): + num_stack: int, + **kwargs: Any): """ :param env: Environment to wrap. :param nested_fields_list: List of nested observation fields to stack. @@ -29,12 +33,14 @@ def __init__(self, not, then every leaves fields from this root will be stacked. :param num_stack: Number of observation frames to partially stack. + :param kwargs: Extra keyword arguments to allow automatic pipeline + wrapper generation. """ # Define helper that will be used to determine the leaf fields to stack def _get_branches(root: Any) -> Iterator[List[str]]: - if isinstance(root, dict): - for field, node in root.items(): - if isinstance(node, dict): + if isinstance(root, gym.spaces.Dict): + for field, node in root.spaces.items(): + if isinstance(node, gym.spaces.Dict): for path in _get_branches(node): yield [field] + path else: @@ -46,18 +52,14 @@ def _get_branches(root: Any) -> Iterator[List[str]]: self.num_stack = num_stack # Initialize base wrapper - super().__init__(env) - - # Define some internal buffers - self._observation: SpaceDictRecursive = zeros( - self.env.observation_space) + super().__init__(env, **kwargs) # Get the leaf fields to stack self.leaf_fields_list: List[List[str]] = [] for fields in self.nested_fields_list: root_field = reduce( - lambda d, key: d[key], fields, self._observation) - if isinstance(root_field, dict): + lambda d, key: d[key], fields, self.env.observation_space) + if isinstance(root_field, gym.spaces.Dict): leaf_paths = _get_branches(root_field) self.leaf_fields_list += [fields + path for path in leaf_paths] else: @@ -66,8 +68,8 @@ def _get_branches(root: Any) -> Iterator[List[str]]: # Compute stacked observation space self.observation_space = deepcopy(self.env.observation_space) for fields in self.leaf_fields_list: - root_space = reduce(lambda d, key: d[key], fields[:-1], - self.observation_space) + root_space = reduce( + lambda d, key: d[key], fields[:-1], self.observation_space) space = root_space[fields[-1]] if not isinstance(space, gym.spaces.Box): raise TypeError( @@ -85,45 +87,123 @@ def _get_branches(root: Any) -> Iterator[List[str]]: deque(maxlen=self.num_stack) for _ in range(len(self.leaf_fields_list))] - def get_observation(self) -> SpaceDictRecursive: + def _setup(self) -> None: + """ TODO: Write documentation. + """ + # Initialize the frames by duplicating the original one + for fields, frames in zip(self.leaf_fields_list, self._frames): + leaf_space = reduce( + lambda d, key: d[key], fields, self.env.observation_space) + for _ in range(self.num_stack): + frames.append(zeros(leaf_space)) + + def observation(self, observation: SpaceDictNested) -> SpaceDictNested: + """ TODO: Write documentation. + """ # Replace nested fields of original observation by the stacked ones for fields, frames in zip(self.leaf_fields_list, self._frames): root_obs = reduce(lambda d, key: d[key], fields[:-1], - self._observation) + observation) root_obs[fields[-1]] = np.stack(frames) - return self._observation - def step(self, - action: SpaceDictRecursive - ) -> Tuple[SpaceDictRecursive, float, bool, Dict[str, Any]]: - # Perform a single step - self._observation, reward, done, info = self.env.step(action) + # Return the stacked observation + return observation + def compute_observation(self, measure: SpaceDictNested) -> SpaceDictNested: + """ TODO: Write documentation. + """ # Backup the nested observation fields to stack for fields, frames in zip(self.leaf_fields_list, self._frames): - leaf_obs = reduce(lambda d, key: d[key], fields, self._observation) - frames.append(leaf_obs) + leaf_obs = reduce(lambda d, key: d[key], fields, measure) + frames.append(leaf_obs.copy()) # Copy to make sure not altered - return self.get_observation(), reward, done, info + # Return the stacked observation + return self.observation(measure) - def reset(self, **kwargs: Any) -> SpaceDictRecursive: - self._observation = self.env.reset(**kwargs) - - # Initialize the frames by duplicating the original one - for fields, frames in zip(self.leaf_fields_list, self._frames): - leaf_obs = reduce(lambda d, key: d[key], fields, self._observation) - for _ in range(self.num_stack): - frames.append(leaf_obs) + def step(self, + action: SpaceDictNested + ) -> Tuple[SpaceDictNested, float, bool, Dict[str, Any]]: + observation, reward, done, info = self.env.step(action) + return self.compute_observation(observation), reward, done, info - return self.get_observation() + def reset(self, **kwargs: Any) -> SpaceDictNested: + observation = self.env.reset(**kwargs) + self._setup() + return self.compute_observation(observation) -def build_wrapper(env_config: Tuple[ - Type[gym.Env], - Dict[str, Any]], - wrapper_config: Tuple[ - Type[gym.Wrapper], - Dict[str, Any]]) -> Type[gym.Env]: +class StackedJiminyEnv(BasePipelineWrapper): + """ TODO: Write documentation. + """ + def __init__(self, + env: gym.Env, + skip_frames_ratio: int = 0, + **kwargs: Any) -> None: + """ TODO: Write documentation. + """ + # Backup some user argument(s) + self.skip_frames_ratio = skip_frames_ratio + + # Initialize base classes + super().__init__(env, **kwargs) + + # Instantiate wrapper + self.wrapper = PartialFrameStack(env, **kwargs) + + # Assertion(s) for type checker + assert self.env.action_space is not None + + # Define the observation and action spaces + self.action_space = self.env.action_space + self.observation_space = self.wrapper.observation_space + + # Initialize some internal buffers + self.__n_last_stack = 0 + self._action = zeros(self.action_space) + self._observation = zeros(self.observation_space) + + def _setup(self) -> None: + # Call base implementation + super()._setup() + + # Setup wrapper + self.wrapper._setup() + + # Re-initialize some internal buffer(s) + # Note that the initial observation is always stored. + self.__n_last_stack = self.skip_frames_ratio - 1 + + # Compute the observe and control update periods + self.control_dt = self.env.control_dt + self.observe_dt = self.env.observe_dt + + # Make sure observe update is discrete-time + if (self.observe_dt <= 0.0): + raise ValueError( + "`StackedJiminyEnv` does not support time-continuous update.") + + def compute_observation(self) -> SpaceDictNested: # type: ignore[override] + # Get environment observation + obs = super().compute_observation() + + # Update observed features if necessary + t = self.simulator.stepper_state.t + if self.simulator.is_simulation_running and \ + _is_breakpoint(t, self.observe_dt, self._dt_eps): + self.__n_last_stack += 1 + if self.__n_last_stack == self.skip_frames_ratio: + self.__n_last_stack = -1 + return self.wrapper.compute_observation(obs) + else: + return self.wrapper.observation(obs) + + +def build_outer_wrapper(env_config: Tuple[ + Type[gym.Env], + Dict[str, Any]], + wrapper_config: Tuple[ + Type[gym.Wrapper], + Dict[str, Any]]) -> Type[gym.Env]: """Generate a class inheriting from `gym.Wrapper` wrapping a given type of environment. @@ -162,7 +242,7 @@ def __init__(self: gym.Wrapper) -> None: super(wrapped_env_class, self).__init__( # type: ignore[arg-type] env, **wrapper_kwargs) - def __dir__(self: gym.Wrapper) -> Sequence[str]: + def __dir__(self: gym.Wrapper) -> List[str]: """Attribute lookup. It is mainly used by autocomplete feature of Ipython. It is overloaded diff --git a/python/gym_jiminy/common/setup.py b/python/gym_jiminy/common/setup.py index 49277996d..3a6439826 100644 --- a/python/gym_jiminy/common/setup.py +++ b/python/gym_jiminy/common/setup.py @@ -42,6 +42,7 @@ install_requires=[ "gym>=0.16.0", "numba", + "typing_extensions", f"jiminy-py~={version_required}" ], extras_require=extras, diff --git a/python/gym_jiminy/envs/gym_jiminy/envs/acrobot.py b/python/gym_jiminy/envs/gym_jiminy/envs/acrobot.py index 1a5fac50a..55bdbfd30 100644 --- a/python/gym_jiminy/envs/gym_jiminy/envs/acrobot.py +++ b/python/gym_jiminy/envs/gym_jiminy/envs/acrobot.py @@ -8,7 +8,7 @@ import jiminy_py.core as jiminy from jiminy_py.simulator import Simulator -from gym_jiminy.common.env_bases import SpaceDictRecursive, BaseJiminyGoalEnv +from gym_jiminy.common.env_bases import SpaceDictNested, BaseJiminyGoalEnv # Stepper update period @@ -214,7 +214,7 @@ def compute_reward(self, # type: ignore[override] def step(self, action: Optional[np.ndarray] = None - ) -> Tuple[SpaceDictRecursive, float, bool, Dict[str, Any]]: + ) -> Tuple[SpaceDictNested, float, bool, Dict[str, Any]]: """Run a simulation step for a given action. Convert a discrete action into its actual value if necessary, then add @@ -278,7 +278,7 @@ def _sample_goal(self) -> np.ndarray: else: return HEIGHT_REL_DEFAULT_THRESHOLD * self._tipPosZMax - def compute_observation(self) -> SpaceDictRecursive: + def compute_observation(self) -> SpaceDictNested: """Fetch the observation based on the current simulation state. In practice, it just returns the current state. diff --git a/python/gym_jiminy/envs/gym_jiminy/envs/anymal.py b/python/gym_jiminy/envs/gym_jiminy/envs/anymal.py index 5937b7c0b..0aa39d928 100644 --- a/python/gym_jiminy/envs/gym_jiminy/envs/anymal.py +++ b/python/gym_jiminy/envs/gym_jiminy/envs/anymal.py @@ -6,6 +6,7 @@ from gym_jiminy.common.env_locomotion import WalkerJiminyEnv from gym_jiminy.common.control_impl import PDController +from gym_jiminy.common.wrappers import StackedJiminyEnv from gym_jiminy.common.pipeline_bases import build_pipeline @@ -63,28 +64,29 @@ def __init__(self, debug: bool = False, **kwargs): debug=debug, **kwargs) - # def _refresh_observation_space(self) -> None: - # self.observation_space = self._get_state_space() - - # def compute_observation(self) -> None: - # return np.concatenate(self._state) - ANYmalPDControlJiminyEnv = build_pipeline(**{ - 'env_config': ( - ANYmalJiminyEnv, - {} - ), - 'controllers_config': [( - PDController, - { + 'env_config': { + 'env_class': ANYmalJiminyEnv + }, + 'blocks_config': [{ + 'block_class': PDController, + 'block_kwargs': { 'update_ratio': HLC_TO_LLC_RATIO, 'pid_kp': PID_KP, 'pid_kd': PID_KD }, + 'wrapper_kwargs': { + 'augment_observation': True + }}, { - 'augment_observation': False - } - )], - 'observers_config': () + 'wrapper_class': StackedJiminyEnv, + 'wrapper_kwargs': { + 'nested_fields_list': [ + ('targets',) + ], + 'num_stack': 3, + 'skip_frames_ratio': 1 + }} + ] }) diff --git a/python/gym_jiminy/envs/gym_jiminy/envs/atlas.py b/python/gym_jiminy/envs/gym_jiminy/envs/atlas.py index d2a4e62a8..5f3f40ca8 100644 --- a/python/gym_jiminy/envs/gym_jiminy/envs/atlas.py +++ b/python/gym_jiminy/envs/gym_jiminy/envs/atlas.py @@ -104,20 +104,18 @@ def joint_position_idx(joint_name): AtlasPDControlJiminyEnv = build_pipeline(**{ - 'env_config': ( - AtlasJiminyEnv, - {} - ), - 'controllers_config': [( - PDController, - { + 'env_config': { + 'env_class': AtlasJiminyEnv + }, + 'blocks_config': [{ + 'block_class': PDController, + 'block_kwargs': { 'update_ratio': HLC_TO_LLC_RATIO, 'pid_kp': PID_KP, 'pid_kd': PID_KD }, - { + 'wrapper_kwargs': { 'augment_observation': False - } - )], - 'observers_config': () + }} + ] }) diff --git a/python/gym_jiminy/envs/gym_jiminy/envs/cartpole.py b/python/gym_jiminy/envs/gym_jiminy/envs/cartpole.py index a9efe2824..941a2d92b 100644 --- a/python/gym_jiminy/envs/gym_jiminy/envs/cartpole.py +++ b/python/gym_jiminy/envs/gym_jiminy/envs/cartpole.py @@ -10,7 +10,7 @@ import jiminy_py.core as jiminy from jiminy_py.simulator import Simulator -from gym_jiminy.common.env_bases import SpaceDictRecursive, BaseJiminyEnv +from gym_jiminy.common.env_bases import SpaceDictNested, BaseJiminyEnv # Stepper update period @@ -198,7 +198,7 @@ def compute_reward(self, # type: ignore[override] return reward def compute_command(self, - measure: SpaceDictRecursive, + measure: SpaceDictNested, action: np.ndarray ) -> np.ndarray: """ TODO: Write documentation. diff --git a/python/jiminy_py/src/jiminy_py/controller.py b/python/jiminy_py/src/jiminy_py/controller.py index c26e9bfb9..3ee1530f4 100644 --- a/python/jiminy_py/src/jiminy_py/controller.py +++ b/python/jiminy_py/src/jiminy_py/controller.py @@ -12,39 +12,35 @@ from tqdm import tqdm +ObserverHandleType = Callable[[ + float, np.ndarray, np.ndarray, jiminy.sensorsData], None] ControllerHandleType = Callable[[ - float, np.ndarray, np.ndarray, jiminy.sensorsData, np.ndarray -], None] + float, np.ndarray, np.ndarray, jiminy.sensorsData, np.ndarray], None] -class BaseJiminyController(jiminy.ControllerFunctor): - """Base class to instantiate a Jiminy controller based on a callable - function that can be changed on-the-fly. - - This class is primarily intended to be used for prototyping, for those who - want to experiment various models of internal dynamics for a given robot. +class BaseJiminyObserverController(jiminy.ControllerFunctor): + """Base class to instantiate a Jiminy observer and/or controller based on + callables that can be changed on-the-fly. .. note:: Native Python implementation of internal dynamics would be extremely - slow, so it is recommended to compile this class using for instance a - JIT compiler such a Numba. Doing it is out of scope of this project - for now and left to the user. + slow, so it is recommended to compile this class using a JIT compiler + such a Numba. Doing it is left to the user. """ def __init__(self) -> None: - """ - .. note:: - Since no robot is available at this point, it is not possible to - specify a callable function directly, since it would be impossible - to check that it has the right signature and is properly defined. - """ # Define some buffer to help factorizing computations self.robot: Optional[jiminy.Robot] = None # Internal buffer for progress bar management self.__pbar: Optional[tqdmType] = None + # Define some internal buffers + self.__must_refresh_observer = True + self.__controller_handle = \ + lambda t, q, v, sensors_data, u: u.fill(0.0) + self.__observer_handle = lambda t, q, v, sensors_data: None + # Initialize base controller - self.__controller_handle = lambda t, q, v, sensors_data, u: u.fill(0.0) super().__init__( self.__compute_command, self.internal_dynamics) @@ -68,10 +64,41 @@ def reset(self) -> None: super().reset() self.close_progress_bar() + def set_observer_handle(self, + observer_handle: ObserverHandleType) -> None: + r"""Set the observer callback function. + + By default, the observer update period is the same of the controller. + One is responsible to implement custom breakpoint point management if + it must be different. + + :param compute_command: + .. raw:: html + + Controller entry-point, implemented as a callback function. + It must have the following signature: + + | controller_handle\(**t**: float, + | **q**: np.ndarray, + | **v**: np.ndarray, + | **sensors_data**: jiminy_py.core.sensorsData + | \) -> None + """ + try: + t = 0.0 + y, dy = np.zeros(self.robot.nq), np.zeros(self.robot.nv) + sensors_data = self.robot.sensors_data + observer_handle(t, y, dy, sensors_data) + self.__observer_handle = observer_handle + except Exception as e: + raise RuntimeError( + "The observer handle has wrong signature. It is expected:" + "\ncontroller_handle(t, y, dy, sensorsData) -> None" + ) from e + def set_controller_handle(self, - controller_handle: ControllerHandleType - ) -> None: - r"""Set the controller callback function to use. + controller_handle: ControllerHandleType) -> None: + r"""Set the controller callback function. :param compute_command: .. raw:: html @@ -79,12 +106,12 @@ def set_controller_handle(self, Controller entry-point, implemented as a callback function. It must have the following signature: - | controller_handle\( - | **t**: float, + | controller_handle\(**t**: float, | **q**: np.ndarray, | **v**: np.ndarray, | **sensors_data**: jiminy_py.core.sensorsData, - | **u_command**: np.ndarray\) -> None + | **u_command**: np.ndarray + | \) -> None """ try: t = 0.0 @@ -95,7 +122,7 @@ def set_controller_handle(self, self.__controller_handle = controller_handle except Exception as e: raise RuntimeError( - "The controller handle has a wrong signature. It is expected:" + "The controller handle has wrong signature. It is expected:" "\ncontroller_handle(t, y, dy, sensorsData, u_command) -> None" ) from e @@ -109,7 +136,21 @@ def __compute_command(self, """ if self.__pbar is not None: self.__pbar.update(t - self.__pbar.n) + if self.__must_refresh_observer: + self.__observer_handle(t, q, v, sensors_data) self.__controller_handle(t, q, v, sensors_data, u) + self.__must_refresh_observer = True + + def refresh_observation(self, + t: float, + q: np.ndarray, + v: np.ndarray, + sensors_data: jiminy.sensorsData) -> None: + """Refresh observer. + """ + if self.__must_refresh_observer: + self.__observer_handle(t, q, v, sensors_data) + self.__must_refresh_observer = False def set_progress_bar(self, tf: float) -> None: """Reset the progress bar. It must be called manually after calling @@ -130,7 +171,7 @@ def internal_dynamics(self, q: np.ndarray, v: np.ndarray, sensors_data: jiminy.sensorsData, - u_command: np.ndarray) -> None: + u: np.ndarray) -> None: """Internal dynamics of the robot. Overload this method to implement a custom internal dynamics for the diff --git a/python/jiminy_py/src/jiminy_py/simulator.py b/python/jiminy_py/src/jiminy_py/simulator.py index 433cba737..30f2c35ca 100644 --- a/python/jiminy_py/src/jiminy_py/simulator.py +++ b/python/jiminy_py/src/jiminy_py/simulator.py @@ -8,7 +8,7 @@ import matplotlib import matplotlib.pyplot as plt from matplotlib.widgets import Button -from typing import Optional, Union, Type, Dict, Tuple, Sequence, Any +from typing import Optional, Union, Type, Dict, Tuple, Sequence, List, Any import pinocchio as pin @@ -19,7 +19,7 @@ ForceSensor as force, ImuSensor as imu) from .robot import generate_hardware_description_file, BaseJiminyRobot -from .controller import BaseJiminyController +from .controller import BaseJiminyObserverController from .viewer import Viewer @@ -56,10 +56,10 @@ def __init__(self, viewer_backend: Optional[str] = None): """ :param robot: Jiminy robot already initialized. - :param controller: Jiminy controller already initialized. - Optional: jiminy_py.core.ControllerFunctor doing + :param controller: Jiminy (observer-)controller already initialized. + Optional: `BaseJiminyObserverController` doing nothing by default. - :param engine_class: The class of engine to use. + :param engine_class: Class of engine to use. Optional: jiminy_py.core.Engine by default. :param use_theoretical_model: Whether the state corresponds to the theoretical model when updating and @@ -75,7 +75,7 @@ def __init__(self, # Instantiate and initialize a controller doing nothing if necessary if controller is None: - controller = BaseJiminyController() + controller = BaseJiminyObserverController() controller.initialize(robot) # Instantiate the low-level Jiminy engine, then initialize it @@ -210,7 +210,7 @@ def __getattr__(self, name: str) -> Any: return AttributeError( f"'{self.__class__}' object has no attribute '{name}'.") - def __dir__(self) -> Sequence[str]: + def __dir__(self) -> List[str]: """Attribute lookup. It is mainly used by autocomplete feature of Ipython. It is overloaded @@ -283,6 +283,52 @@ def seed(self, seed: int) -> None: self.engine.set_options(engine_options) self.engine.reset() + def start(self, + q0: np.ndarray, + v0: np.ndarray, + is_state_theoretical: bool = False) -> None: + """Initialize a simulation, starting from x0=(q0,v0) at t=0. + + :param q0: Initial configuration. + :param v0: Initial velocity. + :param is_state_theoretical: Whether or not the initial state is + associated with the actual or theoretical + model of the robot. + """ + hresult = self.engine.start(q0, v0, is_state_theoretical) + if hresult != jiminy.hresult_t.SUCCESS: + raise RuntimeError( + "Failed to start the simulation. Probably because the " + "initial state is invalid.") + + # Update the observer at the end, if suitable + if isinstance(self.controller, BaseJiminyObserverController): + self.controller.refresh_observation( + self.engine.stepper_state.t, + self.engine.system_state.q, + self.engine.system_state.v, + self.robot.sensors_data) + + def step(self, step_dt: float = -1) -> None: + """Integrate system dynamics from current state for a given duration. + + :param step_dt: Duration for which to integrate. -1 to use default + duration, namely until the next breakpoint if any, + or 'engine_options["stepper"]["dtMax"]'. + """ + # Perform a single integration step + return_code = self.engine.step(step_dt) + if return_code != jiminy.hresult_t.SUCCESS: + raise RuntimeError("Failed to perform the simulation step.") + + # Update the observer at the end, if suitable + if isinstance(self.controller, BaseJiminyObserverController): + self.controller.refresh_observation( + self.engine.stepper_state.t, + self.engine.system_state.q, + self.engine.system_state.v, + self.robot.sensors_data) + def run(self, tf: float, q0: np.ndarray, @@ -295,9 +341,9 @@ def run(self, .. note:: Optionally, log the result of the simulation. + :param tf: Simulation end time. :param q0: Initial configuration. :param v0: Initial velocity. - :param tf: Simulation end time. :param is_state_theoretical: Whether or not the initial state is associated with the actual or theoretical model of the robot. @@ -317,7 +363,8 @@ def run(self, if show_progress_bar: raise RuntimeError( "'show_progress_bar' can only be used with controller " - "inherited from `BaseJiminyController`.") from e + "inherited from `BaseJiminyObserverController`." + ) from e show_progress_bar = False self.engine.simulate(tf, q0, v0, is_state_theoretical) if show_progress_bar is not False: diff --git a/unit_py/test_simple_pendulum.py b/unit_py/test_simple_pendulum.py index 91bb6cdce..fff2de466 100644 --- a/unit_py/test_simple_pendulum.py +++ b/unit_py/test_simple_pendulum.py @@ -208,12 +208,12 @@ def dynamics(t, x): # Compute sensor acceleration, i.e. acceleration in polar coordinates theta = x_rk_python[:, 0] dtheta = x_rk_python[:, 1] + dtheta = x_rk_python[:, 1] # Acceleration: to resolve algebraic loop (current acceleration is # function of input which itself is function of sensor signal, sensor - # data is computed using q_t, v_t, a_(t-1) - ddtheta = np.concatenate([ - np.zeros(1), dynamics(0.0, x_rk_python)[:-1, 1]]) + # data is computed using q_t, v_t, a_t + ddtheta = dynamics(0.0, x_rk_python)[:, 1] expected_accel = np.stack([ - self.l * ddtheta + self.g * np.sin(theta),