diff --git a/airo_planner/__init__.py b/airo_planner/__init__.py index 77f1a57..29c9078 100644 --- a/airo_planner/__init__.py +++ b/airo_planner/__init__.py @@ -3,6 +3,10 @@ # but for building and many toolings, you still need to have __init__ files (at least in the root of the package). # e.g. if you remove this init file and try to build with pip install . # you won't be able to import the dummy module. + +# We disable isort for the entire __init__.py file, because the order of imports is important here. +# If a function depends on another function, the dependent function should be imported after the function it depends on. +# isort: skip_file from airo_planner.exceptions import ( AllGoalConfigurationsRemovedError, GoalConfigurationOutOfBoundsError, @@ -30,7 +34,7 @@ ) from airo_planner.selection.path_selection import choose_shortest_path -from airo_planner.ompl.utilities import ( # isort:skip +from airo_planner.ompl.utilities import ( function_to_ompl, state_to_ompl, path_from_ompl, @@ -41,14 +45,22 @@ ) -from airo_planner.interfaces import DualArmPlanner, SingleArmPlanner # isort:skip -from airo_planner.ompl.single_arm_ompl_planner import SingleArmOmplPlanner # isort:skip -from airo_planner.ompl.dual_arm_ompl_planner import DualArmOmplPlanner # isort:skip +from airo_planner.interfaces import DualArmPlanner, SingleArmPlanner +from airo_planner.multiple_goal_planner import ( + MultipleGoalPlanner, + JointConfigurationsModifierType, + JointPathChooserType, +) +from airo_planner.ompl.single_arm_ompl_planner import SingleArmOmplPlanner +from airo_planner.ompl.dual_arm_ompl_planner import DualArmOmplPlanner __all__ = [ "SingleArmPlanner", "DualArmPlanner", + "MultipleGoalPlanner", + "JointConfigurationsModifierType", + "JointPathChooserType", "SingleArmOmplPlanner", "DualArmOmplPlanner", "JointBoundsType", diff --git a/airo_planner/multiple_goal_planner.py b/airo_planner/multiple_goal_planner.py new file mode 100644 index 0000000..6c71a2a --- /dev/null +++ b/airo_planner/multiple_goal_planner.py @@ -0,0 +1,161 @@ +import abc +import time +from typing import Callable + +import numpy as np +from airo_typing import JointConfigurationCheckerType, JointConfigurationType, JointPathType +from loguru import logger + +from airo_planner import ( + AllGoalConfigurationsRemovedError, + JointBoundsType, + NoInverseKinematicsSolutionsWithinBoundsError, + NoPathFoundError, + NoValidInverseKinematicsSolutionsError, + choose_shortest_path, +) + +# TODO move this to airo_typing? +JointConfigurationsModifierType = Callable[[list[JointConfigurationType]], list[JointConfigurationType]] +JointPathChooserType = Callable[[list[JointPathType]], JointPathType] + + +class MultipleGoalPlanner(abc.ABC): + """Base class for planning to a finite set of candidate goal configurations.""" + + def __init__( + self, + is_state_valid_fn: JointConfigurationCheckerType, + joint_bounds: JointBoundsType, + filter_goal_configurations_fn: JointConfigurationsModifierType | None = None, + rank_goal_configurations_fn: JointConfigurationsModifierType | None = None, + choose_path_fn: JointPathChooserType = choose_shortest_path, + ): + self.is_state_valid_fn = is_state_valid_fn + self.joint_bounds = joint_bounds + + # Functions for planning to multiple goal configurations + self.filter_goal_configurations_fn = filter_goal_configurations_fn + self.rank_goal_configurations_fn = rank_goal_configurations_fn + self.choose_path_fn = choose_path_fn + + # Used for debugging + self._all_paths: list[JointPathType] | None = None + self._goal_configurations: list[JointConfigurationType] | None = None + + @abc.abstractmethod + def _plan_to_joint_configuration_stacked( + self, start_configuration: JointConfigurationType, goal_configuration: JointConfigurationType + ) -> JointPathType: + raise NotImplementedError + + def _check_ik_solutions_validity(self, ik_solutions: list[JointConfigurationType]) -> list[JointConfigurationType]: + """Used by plan_to_tcp_pose() to check which IK solutions are valid.""" + ik_solutions_valid = [s for s in ik_solutions if self.is_state_valid_fn(s)] + if len(ik_solutions_valid) == 0: + raise NoValidInverseKinematicsSolutionsError(ik_solutions) + + logger.info(f"Found {len(ik_solutions_valid)}/{len(ik_solutions)} valid solutions.") + return ik_solutions_valid + + def _filter_ik_solutions(self, ik_solutions: list[JointConfigurationType]) -> list[JointConfigurationType]: + """Used by plan_to_tcp_pose() to filter IK solutions.""" + if self.filter_goal_configurations_fn is None: + return ik_solutions + + ik_solutions_filtered = self.filter_goal_configurations_fn(ik_solutions) + if len(ik_solutions_filtered) == 0: + raise AllGoalConfigurationsRemovedError(ik_solutions) + + logger.info(f"Filtered IK solutions to {len(ik_solutions_filtered)}/{len(ik_solutions)} solutions.") + return ik_solutions_filtered + + def _rank_ik_solutions(self, ik_solutions: list[JointConfigurationType]) -> list[JointConfigurationType]: + """Used by plan_to_tcp_pose() to rank IK solutions.""" + if self.rank_goal_configurations_fn is None: + return ik_solutions + + logger.info("Ranking IK solutions.") + ik_solutions_ranked = self.rank_goal_configurations_fn(ik_solutions) + + if len(ik_solutions_ranked) != len(ik_solutions): + logger.warning( + f"Ranking function changed the number of IK solutions from {len(ik_solutions)} to {len(ik_solutions_ranked)}." + ) + + if len(ik_solutions_ranked) == 0: + raise RuntimeError("Ranking function removed all IK solutions. This is probably an implementation error.") + + return ik_solutions_ranked + + def _check_ik_solutions_bounds(self, ik_solutions: list[JointConfigurationType]) -> list[JointConfigurationType]: + """Used by plan_to_tcp_pose() to check which IK solutions are within the joint bounds.""" + ik_solutions_within_bounds = [] + joints_lower, joints_upper = self.joint_bounds + for ik_solution in ik_solutions: + if np.all(ik_solution >= joints_lower) and np.all(ik_solution <= joints_upper): + ik_solutions_within_bounds.append(ik_solution) + + if len(ik_solutions_within_bounds) == 0: + raise NoInverseKinematicsSolutionsWithinBoundsError(ik_solutions, self.joint_bounds) + + logger.info(f"Found {len(ik_solutions_within_bounds)}/{len(ik_solutions)} solutions within joint bounds.") + return ik_solutions_within_bounds + + def _plan_to_goal_configurations( + self, + start_configuration: JointConfigurationType, + goal_configurations: list[JointConfigurationType], + return_first_success: bool = False, + ) -> JointPathType: + self._goal_configurations = goal_configurations # Saved for debugging + + # Try solving to each goal configuration + start_time = time.time() + paths = [] + for i, goal_configuration in enumerate(goal_configurations): + try: + path = self._plan_to_joint_configuration_stacked(start_configuration, goal_configuration) + except NoPathFoundError: + logger.info(f"No path found to goal configuration: {i}.") + continue + + if return_first_success: + logger.info(f"Returning first successful path (planning time: {time.time() - start_time:.2f} s).") + return path + paths.append(path) + + end_time = time.time() + + if len(paths) == 0: + raise NoPathFoundError(start_configuration, goal_configurations) + + self._all_paths = paths # Saved for debugging + logger.info(f"Found {len(paths)} paths towards IK solutions, (planning time: {end_time - start_time:.2f} s).") + + solution_path = self.choose_path_fn(paths) + + if solution_path is None: + raise RuntimeError(f"Path choosing function did not return a path out of {len(paths)} options.") + + logger.info(f"Chose path with {len(solution_path)} waypoints.") + + return solution_path + + def plan_to_ik_solutions( + self, start_configuration: JointConfigurationType, ik_solutions: list[JointConfigurationType] + ) -> JointPathType: + """Plan to a list of IK solutions.""" + ik_solutions_within_bounds = self._check_ik_solutions_bounds(ik_solutions) + ik_solutions_valid = self._check_ik_solutions_validity(ik_solutions_within_bounds) + ik_solutions_filtered = self._filter_ik_solutions(ik_solutions_valid) + ik_solutions_ranked = self._rank_ik_solutions(ik_solutions_filtered) + + logger.info(f"Running OMPL towards {len(ik_solutions_ranked)} IK solutions.") + + return_first_success = True if self.rank_goal_configurations_fn is not None else False + + solution_path = self._plan_to_goal_configurations( + start_configuration, ik_solutions_ranked, return_first_success + ) + return solution_path diff --git a/airo_planner/ompl/dual_arm_ompl_planner.py b/airo_planner/ompl/dual_arm_ompl_planner.py index 523965c..f1582ce 100644 --- a/airo_planner/ompl/dual_arm_ompl_planner.py +++ b/airo_planner/ompl/dual_arm_ompl_planner.py @@ -6,12 +6,18 @@ JointConfigurationType, JointPathType, ) +from loguru import logger from airo_planner import ( DualArmPlanner, JointBoundsType, + JointConfigurationsModifierType, + JointPathChooserType, + MultipleGoalPlanner, + NoInverseKinematicsSolutionsError, NoPathFoundError, SingleArmOmplPlanner, + choose_shortest_path, concatenate_joint_bounds, create_simple_setup, solve_and_smooth_path, @@ -21,7 +27,7 @@ ) -class DualArmOmplPlanner(DualArmPlanner): +class DualArmOmplPlanner(DualArmPlanner, MultipleGoalPlanner): def __init__( self, is_state_valid_fn: JointConfigurationCheckerType, @@ -29,6 +35,9 @@ def __init__( joint_bounds_right: JointBoundsType | None = None, inverse_kinematics_left_fn: InverseKinematicsFunctionType | None = None, inverse_kinematics_right_fn: InverseKinematicsFunctionType | None = None, + filter_goal_configurations_fn: JointConfigurationsModifierType | None = None, + rank_goal_configurations_fn: JointConfigurationsModifierType | None = None, + choose_path_fn: JointPathChooserType = choose_shortest_path, degrees_of_freedom_left: int = 6, degrees_of_freedom_right: int = 6, ): @@ -36,15 +45,26 @@ def __init__( self.inverse_kinematics_left_fn = inverse_kinematics_left_fn self.inverse_kinematics_right_fn = inverse_kinematics_right_fn - # Combine the joint bounds for the left and right arm for simplicity self.degrees_of_freedom: int = degrees_of_freedom_left + degrees_of_freedom_right + self.degrees_of_freedom_left: int = degrees_of_freedom_left + self.degrees_of_freedom_right: int = degrees_of_freedom_right + # Combine the joint bounds for the left and right arm for simplicity self._initialize_joint_bounds( joint_bounds_left, joint_bounds_right, degrees_of_freedom_left, degrees_of_freedom_right ) + # Initialize MultipleGoalPlanner + super().__init__( + is_state_valid_fn, + self.joint_bounds, + filter_goal_configurations_fn, + rank_goal_configurations_fn, + choose_path_fn, + ) + # The OMPL SimpleSetup for dual arm plannig is exactly the same as for single arm planning - self._simple_setup = create_simple_setup(self.is_state_valid_fn, self._joint_bounds) + self._simple_setup = create_simple_setup(self.is_state_valid_fn, self.joint_bounds) # We use SingleArmOmplPlanner to handle planning for a single arm requests # Note that we (re)create these when start and goal config are set, because @@ -70,9 +90,9 @@ def _initialize_joint_bounds( joint_bounds_right_ = joint_bounds_right joint_bounds = concatenate_joint_bounds([joint_bounds_left_, joint_bounds_right_]) - self._joint_bounds = joint_bounds - self._joint_bounds_left = joint_bounds_left_ - self._joint_bounds_right = joint_bounds_right_ + self.joint_bounds = joint_bounds + self.joint_bounds_left = joint_bounds_left_ + self.joint_bounds_right = joint_bounds_right_ def _set_start_and_goal_configurations( self, @@ -89,6 +109,11 @@ def _set_start_and_goal_configurations( goal_state = state_to_ompl(goal_configuration, space) self._simple_setup.setStartAndGoalStates(start_state, goal_state) + def _create_single_arm_planners( + self, + start_configuration_left: JointConfigurationType, + start_configuration_right: JointConfigurationType, + ) -> None: # Replace single arm planners for the left and right arm def is_left_state_valid_fn(left_state: JointConfigurationType) -> bool: return self.is_state_valid_fn(np.concatenate((left_state, start_configuration_right))) @@ -98,24 +123,16 @@ def is_right_state_valid_fn(right_state: JointConfigurationType) -> bool: self._single_arm_planner_left = SingleArmOmplPlanner( is_left_state_valid_fn, - self._joint_bounds_left, + self.joint_bounds_left, self.inverse_kinematics_left_fn, ) self._single_arm_planner_right = SingleArmOmplPlanner( is_right_state_valid_fn, - self._joint_bounds_right, + self.joint_bounds_right, self.inverse_kinematics_right_fn, ) - self._single_arm_planner_left._set_start_and_goal_configurations( - start_configuration_left, goal_configuration_left - ) - - self._single_arm_planner_right._set_start_and_goal_configurations( - start_configuration_right, goal_configuration_right - ) - def _plan_to_joint_configuration_dual_arm( self, start_configuration_left: JointConfigurationType, @@ -135,7 +152,9 @@ def _plan_to_joint_configuration_dual_arm( path = solve_and_smooth_path(simple_setup) if path is None: - raise NoPathFoundError(start_configuration_left, goal_configuration_left) + start_configuration = stack_joints(start_configuration_left, start_configuration_right) + goal_configuration = stack_joints(goal_configuration_left, goal_configuration_right) + raise NoPathFoundError(start_configuration, goal_configuration) return path @@ -145,11 +164,10 @@ def _plan_to_joint_configuration_left_arm_only( start_configuration_right: JointConfigurationType, goal_configuration_left: JointConfigurationType, ) -> JointPathType: - # Set right goal to right start configuration - self._set_start_and_goal_configurations( - start_configuration_left, start_configuration_right, goal_configuration_left, start_configuration_right + self._create_single_arm_planners( + start_configuration_left, + start_configuration_right, ) - assert self._single_arm_planner_left is not None # Mypy needs this to be explicit left_path = self._single_arm_planner_left.plan_to_joint_configuration( @@ -157,7 +175,7 @@ def _plan_to_joint_configuration_left_arm_only( ) if left_path is None: - return None + raise NoPathFoundError(start_configuration_left, goal_configuration_left) path = stack_joints(left_path, start_configuration_right) return path @@ -168,11 +186,10 @@ def _plan_to_joint_configuration_right_arm_only( start_configuration_right: JointConfigurationType, goal_configuration_right: JointConfigurationType, ) -> JointPathType: - # Set left goal to left start configuration - self._set_start_and_goal_configurations( - start_configuration_left, start_configuration_right, start_configuration_left, goal_configuration_right + self._create_single_arm_planners( + start_configuration_left, + start_configuration_right, ) - assert self._single_arm_planner_right is not None # Mypy needs this to be explicit right_path = self._single_arm_planner_right.plan_to_joint_configuration( @@ -180,7 +197,7 @@ def _plan_to_joint_configuration_right_arm_only( ) if right_path is None: - return None + NoPathFoundError(start_configuration_right, goal_configuration_right) path = stack_joints(start_configuration_left, right_path) return path @@ -215,6 +232,92 @@ def plan_to_joint_configuration( ) return path + def _plan_to_joint_configuration_stacked( + self, start_configuration: JointConfigurationType, goal_configuration: JointConfigurationType + ) -> JointPathType: + """This function is used by the MultipleGoalPlanner class.""" + start_configuration_left = start_configuration[: self.degrees_of_freedom_left] + start_configuration_right = start_configuration[self.degrees_of_freedom_left :] + goal_configuration_left = goal_configuration[: self.degrees_of_freedom_left] + goal_configuration_right = goal_configuration[self.degrees_of_freedom_left :] + return self._plan_to_joint_configuration_dual_arm( + start_configuration_left, start_configuration_right, goal_configuration_left, goal_configuration_right + ) + + def _plan_to_tcp_pose_left_arm_only( + self, + start_configuration_left: JointConfigurationType, + start_configuration_right: JointConfigurationType, + tcp_pose_left: HomogeneousMatrixType, + ) -> JointPathType: + self._create_single_arm_planners( + start_configuration_left, + start_configuration_right, + ) + + assert self._single_arm_planner_left is not None # Mypy needs this to be explicit + + left_path = self._single_arm_planner_left.plan_to_tcp_pose(start_configuration_left, tcp_pose_left) + path = stack_joints(left_path, start_configuration_right) + return path + + def _plan_to_tcp_pose_right_arm_only( + self, + start_configuration_left: JointConfigurationType, + start_configuration_right: JointConfigurationType, + tcp_pose_right: HomogeneousMatrixType, + ) -> JointPathType: + self._create_single_arm_planners( + start_configuration_left, + start_configuration_right, + ) + + assert self._single_arm_planner_right is not None + + right_path = self._single_arm_planner_right.plan_to_tcp_pose(start_configuration_right, tcp_pose_right) + path = stack_joints(start_configuration_left, right_path) + return path + + def _calculate_ik_solutions( + self, tcp_pose_left: HomogeneousMatrixType, tcp_pose_right: HomogeneousMatrixType + ) -> list[JointConfigurationType]: + """Used by plan_to_tcp_pose() to calculate IK solutions when planning to two TCP poses. + + TODO: This function is very similar to the one in SingleArmPlanner. Consider sharing some logic. + """ + + if self.inverse_kinematics_left_fn is None: + raise AttributeError( + "No inverse kinematics function for left arm was provided (required for planning to TCP poses)." + ) + + if self.inverse_kinematics_right_fn is None: + raise AttributeError( + "No inverse kinematics function for right arm was provided (required for planning to TCP poses)." + ) + + ik_solutions_left = self.inverse_kinematics_left_fn(tcp_pose_left) + ik_solutions_right = self.inverse_kinematics_right_fn(tcp_pose_right) + + ik_solutions_left = [solution.squeeze() for solution in ik_solutions_left] + ik_solutions_right = [solution.squeeze() for solution in ik_solutions_right] + + if ik_solutions_left is None or len(ik_solutions_left) == 0: + raise NoInverseKinematicsSolutionsError(tcp_pose_left) + + if ik_solutions_right is None or len(ik_solutions_right) == 0: + raise NoInverseKinematicsSolutionsError(tcp_pose_right) + + ik_solution_pairs_stacked = [] + for ik_solution_left in ik_solutions_left: + for ik_solution_right in ik_solutions_right: + ik_solution_pairs_stacked.append(stack_joints(ik_solution_left, ik_solution_right)) + + logger.info( + f"IK returned {len(ik_solution_pairs_stacked)} pairs of IK solutions ({len(ik_solutions_left)} x {len(ik_solutions_right)})." + ) + return ik_solution_pairs_stacked + def plan_to_tcp_pose( self, start_configuration_left: HomogeneousMatrixType, @@ -224,193 +327,37 @@ def plan_to_tcp_pose( ) -> JointPathType: if tcp_pose_left is None and tcp_pose_right is None: raise ValueError("A goal TCP pose must be specified for at least one of the arms.") - raise NoPathFoundError(start_configuration_left, start_configuration_right) # this just temporary - - # def _plan_to_tcp_pose_left_arm_only( - # self, - # start_configuration_left: JointConfigurationType, - # start_configuration_right: JointConfigurationType, - # tcp_pose_left_in_base: HomogeneousMatrixType, - # desirable_goal_configurations_left: list[JointConfigurationType] | None = None, - # ) -> list[tuple[JointConfigurationType, JointConfigurationType]] | None: - # # Set right goal to right start configuration - # self._set_start_and_goal_configurations( - # start_configuration_left, start_configuration_right, start_configuration_left, start_configuration_right - # ) - - # left_path = self._single_arm_planner_left.plan_to_tcp_pose( - # start_configuration_left, tcp_pose_left_in_base, desirable_goal_configurations_left - # ) - - # if left_path is None: - # return None - - # path = [(left_state, start_configuration_right) for left_state in left_path] - # return path - - # def _plan_to_tcp_pose_right_arm_only( - # self, - # start_configuration_left: JointConfigurationType, - # start_configuration_right: JointConfigurationType, - # tcp_pose_right_in_base: HomogeneousMatrixType, - # desirable_goal_configurations_right: list[JointConfigurationType] | None = None, - # ) -> list[tuple[JointConfigurationType, JointConfigurationType]] | None: - # # Set left goal to left start configuration - # self._set_start_and_goal_configurations( - # start_configuration_left, start_configuration_right, start_configuration_left, start_configuration_right - # ) - - # right_path = self._single_arm_planner_right.plan_to_tcp_pose( - # start_configuration_right, tcp_pose_right_in_base, desirable_goal_configurations_right - # ) - - # if right_path is None: - # return None - - # path = [(start_configuration_left, right_state) for right_state in right_path] - # return path - - # def _plan_to_tcp_pose_dual_arm( # noqa: C901 - # self, - # start_configuration_left: JointConfigurationType, - # start_configuration_right: JointConfigurationType, - # tcp_pose_left_in_base: HomogeneousMatrixType, - # tcp_pose_right_in_base: HomogeneousMatrixType, - # ) -> list[tuple[JointConfigurationType, JointConfigurationType]] | None: - # if self.inverse_kinematics_left_fn is None or self.inverse_kinematics_right_fn is None: - # logger.info( - # "Planning to left and right TCP poses attempted but inverse_kinematics_fn was not provided for both arms, returing None." - # ) - - # # MyPy needs this to be explicit - # assert self.inverse_kinematics_left_fn is not None - # assert self.inverse_kinematics_right_fn is not None - - # # 1. do IK for both arms - # ik_solutions_left = self.inverse_kinematics_left_fn(tcp_pose_left_in_base) - # ik_solutions_right = self.inverse_kinematics_right_fn(tcp_pose_right_in_base) - - # if ik_solutions_left is None or len(ik_solutions_left) == 0: - # logger.info("IK for left arm returned no solutions, returning None.") - # return None - # else: - # logger.info(f"Found {len(ik_solutions_left)} IK solutions for left arm.") - - # if ik_solutions_right is None or len(ik_solutions_right) == 0: - # logger.info("IK for right arm returned no solutions, returning None.") - # return None - # else: - # logger.info(f"Found {len(ik_solutions_right)} IK solutions for right arm.") - - # # 2. filter out IK solutions that are outside the joint bounds - # ik_solutions_in_bounds_left = [] - # for ik_solution in ik_solutions_left: - # if np.all(ik_solution >= self._joint_bounds_left[0]) and np.all(ik_solution <= self._joint_bounds_left[1]): - # ik_solutions_in_bounds_left.append(ik_solution) - - # ik_solutions_in_bounds_right = [] - # for ik_solution in ik_solutions_right: - # if np.all(ik_solution >= self._joint_bounds_right[0]) and np.all(ik_solution <= self._joint_bounds_right[1]): - # ik_solutions_in_bounds_right.append(ik_solution) - - # if len(ik_solutions_in_bounds_left) == 0: - # logger.info("No IK solutions for left arm are within the joint bounds, returning None.") - # return None - # else: - # logger.info( - # f"Found {len(ik_solutions_in_bounds_left)}/{len(ik_solutions_left)} IK solutions within the joint bounds for left arm." - # ) - - # if len(ik_solutions_in_bounds_right) == 0: - # logger.info("No IK solutions for right arm are within the joint bounds, returning None.") - # return None - # else: - # logger.info( - # f"Found {len(ik_solutions_in_bounds_right)}/{len(ik_solutions_right)} IK solutions within the joint bounds for right arm." - # ) - - # # 2. create all goal pairs - # goal_configurations = [] - # for ik_solution_left in ik_solutions_in_bounds_left: - # for ik_solution_right in ik_solutions_in_bounds_right: - # goal_configurations.append(np.concatenate((ik_solution_left, ik_solution_right))) - - # n_goal_configurations = len(goal_configurations) - - # # 3. filter out invalid goal pairs - # goal_configurations_valid = [s for s in goal_configurations if self.is_state_valid_fn(s)] - # n_valid_goal_configurations = len(goal_configurations_valid) - - # if n_valid_goal_configurations == 0: - # logger.info(f"All {n_goal_configurations} goal pairs are invalid, returning None.") - # return None - # else: - # logger.info(f"Found {n_valid_goal_configurations}/{n_goal_configurations} valid goal pairs.") - - # # 4. for each pair, plan to the goal pair - # paths = [] - # path_lengths: list[float] = [] - # for goal_configuration in goal_configurations_valid: - # path = self.plan_to_joint_configuration( - # start_configuration_left, start_configuration_right, goal_configuration[:6], goal_configuration[6:] - # ) - # if path is not None: - # paths.append(path) - # if self._path_length_dual is None: - # raise ValueError("For devs: path length should not be None at this point") - # path_lengths.append(self._path_length_dual) - - # if len(paths) == 0: - # logger.info("No paths founds towards any goal pairs, returning None.") - # return None - - # logger.info(f"Found {len(paths)} paths towards goal pairs.") - - # # 5. return the shortest path among all the planned paths - # shortest_path_idx = np.argmin(path_lengths) - # shortest_path = paths[shortest_path_idx] - # return shortest_path - - # def plan_to_tcp_pose( - # self, - # start_configuration_left: JointConfigurationType, - # start_configuration_right: JointConfigurationType, - # tcp_pose_left_in_base: HomogeneousMatrixType | None, - # tcp_pose_right_in_base: HomogeneousMatrixType | None, - # desirable_goal_configurations_left: list[JointConfigurationType] | None = None, - # desirable_goal_configurations_right: list[JointConfigurationType] | None = None, - # ) -> list[tuple[JointConfigurationType, JointConfigurationType]] | None: - # if tcp_pose_left_in_base is None and tcp_pose_right_in_base is None: - # raise ValueError("A goal TCP pose must be specified for at least one of the arms.") - - # if tcp_pose_right_in_base is None: - # assert tcp_pose_left_in_base is not None # Mypy needs this to be explicit - # path = self._plan_to_tcp_pose_left_arm_only( - # start_configuration_left, - # start_configuration_right, - # tcp_pose_left_in_base, - # desirable_goal_configurations_left, - # ) - # return path - - # if tcp_pose_left_in_base is None: - # assert tcp_pose_right_in_base is not None # Mypy needs this to be explicit - # path = self._plan_to_tcp_pose_right_arm_only( - # start_configuration_left, - # start_configuration_right, - # tcp_pose_right_in_base, - # desirable_goal_configurations_right, - # ) - # return path - - # # TODO use desirable_goal_configurations for dual arm planning - # if desirable_goal_configurations_left is not None or desirable_goal_configurations_right is not None: - # logger.warning( - # "Desirable goal configurations are not implemented yet for dual arm planning, ignoring them." - # ) - - # path = self._plan_to_tcp_pose_dual_arm( - # start_configuration_left, start_configuration_right, tcp_pose_left_in_base, tcp_pose_right_in_base - # ) - - # return path + + # 0. TODO if one of the two if None, use the single arm planners + if tcp_pose_right is None: + assert tcp_pose_left is not None # Mypy needs this to be explicit + return self._plan_to_tcp_pose_left_arm_only( + start_configuration_left, + start_configuration_right, + tcp_pose_left, + ) + + if tcp_pose_left is None: + assert tcp_pose_right is not None + return self._plan_to_tcp_pose_right_arm_only( + start_configuration_left, + start_configuration_right, + tcp_pose_right, + ) + + # 1.1. stack start_configuration + start_configuration = stack_joints(start_configuration_left, start_configuration_right) + + # 1.2. stack goal_configurations (combine ik_solutions_left and ik_solutions_right) + goal_configurations = self._calculate_ik_solutions(tcp_pose_left, tcp_pose_right) + + # all steps below are in multiple_goal_planner.py + # 2. Remove IK solutions that are outside the joint bounds + # 3. Create all goal pairs (use stack_joints() to concatenate left and right arm configurations) + # from this point on, all function from the single arm case can be reused + # 4. Remove invalid goal pairs + # 5. Apply the user's filter on goal pairs (stacked) + # 6. Rank the goal pairs with the user's ranking function + # 7. Plan with early stop or exhaustively + solution_path = self.plan_to_ik_solutions(start_configuration, goal_configurations) + return solution_path diff --git a/airo_planner/ompl/single_arm_ompl_planner.py b/airo_planner/ompl/single_arm_ompl_planner.py index 894f629..623164f 100644 --- a/airo_planner/ompl/single_arm_ompl_planner.py +++ b/airo_planner/ompl/single_arm_ompl_planner.py @@ -1,7 +1,3 @@ -import time -from typing import Callable - -import numpy as np from airo_typing import ( HomogeneousMatrixType, InverseKinematicsFunctionType, @@ -12,15 +8,15 @@ from loguru import logger from airo_planner import ( - AllGoalConfigurationsRemovedError, GoalConfigurationOutOfBoundsError, InvalidGoalConfigurationError, InvalidStartConfigurationError, JointBoundsType, + JointConfigurationsModifierType, + JointPathChooserType, + MultipleGoalPlanner, NoInverseKinematicsSolutionsError, - NoInverseKinematicsSolutionsWithinBoundsError, NoPathFoundError, - NoValidInverseKinematicsSolutionsError, SingleArmPlanner, StartConfigurationOutOfBoundsError, choose_shortest_path, @@ -31,12 +27,8 @@ uniform_symmetric_joint_bounds, ) -# TODO move this to airo_typing? -JointConfigurationsModifierType = Callable[[list[JointConfigurationType]], list[JointConfigurationType]] -JointPathChooserType = Callable[[list[JointPathType]], JointPathType] - -class SingleArmOmplPlanner(SingleArmPlanner): +class SingleArmOmplPlanner(SingleArmPlanner, MultipleGoalPlanner): """Utility class for single-arm motion planning using OMPL. This class only plans in joint space. @@ -74,9 +66,6 @@ def __init__( # Functions for planning to TCP poses self.inverse_kinematics_fn = inverse_kinematics_fn - self.filter_goal_configurations_fn = filter_goal_configurations_fn - self.rank_goal_configurations_fn = rank_goal_configurations_fn - self.choose_path_fn = choose_path_fn # Planning parameters self._degrees_of_freedom = degrees_of_freedom @@ -85,11 +74,21 @@ def __init__( else: self.joint_bounds = joint_bounds + # Initialize MultipleGoalPlanner + super().__init__( + is_state_valid_fn, + self.joint_bounds, + filter_goal_configurations_fn, + rank_goal_configurations_fn, + choose_path_fn, + ) + + # TODO consider creating a JointSpaceOmplPlanner class that can be used as base for both single and dual arm planning + # The SingleArmOmplPlanner then becomes even simpler, it bascially only handles IK and planning to TCP poses. self._simple_setup = create_simple_setup(self.is_state_valid_fn, self.joint_bounds) # Debug attributes self._ik_solutions: list[JointConfigurationType] | None = None - self._all_paths: list[JointPathType] | None = None def _set_start_and_goal_configurations( self, start_configuration: JointConfigurationType, goal_configuration: JointConfigurationType @@ -127,6 +126,16 @@ def plan_to_joint_configuration( return path + def _plan_to_joint_configuration_stacked( + self, start_configuration: JointConfigurationType, goal_configuration: JointConfigurationType + ) -> JointPathType: + """This function's only purpose is to called used by the MultipleGoalPlanner class. + + MultipleGoalPlanner can't call plan_to_joint_configuration for both signal arm planning and dual arm planning + because the parameters are different. This function is a workaround for that. + """ + return self.plan_to_joint_configuration(start_configuration, goal_configuration) + def _calculate_ik_solutions(self, tcp_pose: HomogeneousMatrixType) -> list[JointConfigurationType]: """Used by plan_to_tcp_pose() to calculate IK solutions.""" @@ -143,99 +152,9 @@ def _calculate_ik_solutions(self, tcp_pose: HomogeneousMatrixType) -> list[Joint logger.info(f"IK returned {len(ik_solutions)} solutions.") return ik_solutions - def _check_ik_solutions_bounds(self, ik_solutions: list[JointConfigurationType]) -> list[JointConfigurationType]: - """Used by plan_to_tcp_pose() to check which IK solutions are within the joint bounds.""" - ik_solutions_within_bounds = [] - joints_lower, joints_upper = self.joint_bounds - for ik_solution in ik_solutions: - if np.all(ik_solution >= joints_lower) and np.all(ik_solution <= joints_upper): - ik_solutions_within_bounds.append(ik_solution) - - if len(ik_solutions_within_bounds) == 0: - raise NoInverseKinematicsSolutionsWithinBoundsError(ik_solutions, self.joint_bounds) - - logger.info(f"Found {len(ik_solutions_within_bounds)}/{len(ik_solutions)} solutions within joint bounds.") - return ik_solutions_within_bounds - - def _check_ik_solutions_validity(self, ik_solutions: list[JointConfigurationType]) -> list[JointConfigurationType]: - """Used by plan_to_tcp_pose() to check which IK solutions are valid.""" - ik_solutions_valid = [s for s in ik_solutions if self.is_state_valid_fn(s)] - if len(ik_solutions_valid) == 0: - raise NoValidInverseKinematicsSolutionsError(ik_solutions) - - logger.info(f"Found {len(ik_solutions_valid)}/{len(ik_solutions)} valid solutions.") - return ik_solutions_valid - - def _filter_ik_solutions(self, ik_solutions: list[JointConfigurationType]) -> list[JointConfigurationType]: - """Used by plan_to_tcp_pose() to filter IK solutions.""" - if self.filter_goal_configurations_fn is None: - return ik_solutions - - ik_solutions_filtered = self.filter_goal_configurations_fn(ik_solutions) - if len(ik_solutions_filtered) == 0: - raise AllGoalConfigurationsRemovedError(ik_solutions) - - logger.info(f"Filtered IK solutions to {len(ik_solutions_filtered)}/{len(ik_solutions)} solutions.") - return ik_solutions_filtered - - def _rank_ik_solution(self, ik_solutions: list[JointConfigurationType]) -> list[JointConfigurationType]: - """Used by plan_to_tcp_pose() to rank IK solutions.""" - if self.rank_goal_configurations_fn is None: - return ik_solutions - - logger.info("Ranking IK solutions.") - ik_solutions_ranked = self.rank_goal_configurations_fn(ik_solutions) - - if len(ik_solutions_ranked) != len(ik_solutions): - logger.warning( - f"Ranking function changed the number of IK solutions from {len(ik_solutions)} to {len(ik_solutions_ranked)}." - ) - - if len(ik_solutions_ranked) == 0: - raise RuntimeError("Ranking function removed all IK solutions. This is probably an implementation error.") - - return ik_solutions_ranked - def plan_to_tcp_pose( self, start_configuration: JointConfigurationType, tcp_pose: HomogeneousMatrixType ) -> JointPathType: - ik_solutions = self._calculate_ik_solutions(tcp_pose) - ik_solutions_within_bounds = self._check_ik_solutions_bounds(ik_solutions) - ik_solutions_valid = self._check_ik_solutions_validity(ik_solutions_within_bounds) - ik_solutions_filtered = self._filter_ik_solutions(ik_solutions_valid) - ik_solutions_ranked = self._rank_ik_solution(ik_solutions_filtered) - - logger.info(f"Running OMPL towards {len(ik_solutions_ranked)} IK solutions.") - goal_configurations = ik_solutions_ranked - return_first_success = True if self.rank_goal_configurations_fn is not None else False - - # Try solving to each IK solution in joint space. - start_time = time.time() - paths = [] - for i, goal_configuration in enumerate(goal_configurations): - try: - path = self.plan_to_joint_configuration(start_configuration, goal_configuration) - except NoPathFoundError: - logger.info(f"No path found to goal configuration: {i}.") - continue - - if return_first_success: - logger.info(f"Returning first successful path (planning time: {time.time() - start_time:.2f} s).") - return path - paths.append(path) - - end_time = time.time() - - if len(paths) == 0: - raise NoPathFoundError(start_configuration, goal_configurations) - - self._all_paths = paths # Saved for debugging - logger.info(f"Found {len(paths)} paths towards IK solutions, (planning time: {end_time - start_time:.2f} s).") - - solution_path = self.choose_path_fn(paths) - - if solution_path is None: - raise RuntimeError(f"Path choosing function did not return a path out of {len(paths)} options.") - + solution_path = self.plan_to_ik_solutions(start_configuration, ik_solutions) return solution_path diff --git a/notebooks/02_planning_to_tcp_poses.ipynb b/notebooks/02_planning_to_tcp_poses.ipynb index 9a70c9d..3ae01bd 100644 --- a/notebooks/02_planning_to_tcp_poses.ipynb +++ b/notebooks/02_planning_to_tcp_poses.ipynb @@ -347,6 +347,7 @@ "from functools import partial\n", "from airo_planner import filter_with_distance_to_configurations\n", "\n", + "\n", "def plan_pregrasp(\n", " grasp_pose: HomogeneousMatrixType,\n", " start_configuration: JointConfigurationType,\n", @@ -384,19 +385,17 @@ " logger.info(f\"Failed to plan to pregrasp pose at distance {distance}, continuing to next distance.\")\n", " continue\n", "\n", - " # TODO find the grasp path of which the start is closest to the pregrasp path end (=pregrasp end joints)\n", + " # Find the grasp path of which the start is closest to the pregrasp path end (=pregrasp end joints)\n", " pregrasp_end_joints = pregrasp_path[-1]\n", " distances = [np.linalg.norm(start - pregrasp_end_joints) for start in grasp_path_starts]\n", " index_of_closest_start = np.argmin(distances)\n", "\n", - " print(len(pregrasp_path))\n", - "\n", - " assert np.isclose(distances[index_of_closest_start], 0, atol=0.01) # sanity check\n", + " assert np.isclose(distances[index_of_closest_start], 0, atol=0.01) # sanity check\n", "\n", " grasp_path = joint_paths[index_of_closest_start]\n", "\n", " return pregrasp_path, grasp_path\n", - " \n", + "\n", " logger.warning(\"Exhausted all pregrasp poses to try\")\n", "\n", "\n", diff --git a/notebooks/03_dual_arm_planning.ipynb b/notebooks/03_dual_arm_planning.ipynb index 1f2a885..daeb43a 100644 --- a/notebooks/03_dual_arm_planning.ipynb +++ b/notebooks/03_dual_arm_planning.ipynb @@ -335,11 +335,13 @@ "metadata": {}, "outputs": [], "source": [ - "from airo_drake import animate_joint_configurations\n", + "from airo_drake import animate_joint_configurations, visualize_frame\n", "\n", "grasp_pose = np.identity(4)\n", "grasp_pose[2, 3] = 0.4\n", "\n", + "visualize_frame(scene.meshcat, \"grasp_pose\", grasp_pose)\n", + "\n", "grasp_joints = inverse_kinematics_right_fn(grasp_pose)\n", "\n", "animate_joint_configurations(scene.meshcat, scene.robot_diagram, scene.arm_right_index, grasp_joints, context=context)" @@ -351,7 +353,79 @@ "metadata": {}, "outputs": [], "source": [ - "path = planner.plan_to_tcp_pose(tangled_joints_left, tangled_joints_right, None, grasp_pose)" + "path = planner.plan_to_tcp_pose(tangled_joints_left, home_joints_right, None, grasp_pose)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "trajectory = time_parametrize_toppra(scene.robot_diagram.plant(), path)\n", + "animate_dual_joint_trajectory(scene.meshcat, scene.robot_diagram, scene.arm_left_index, scene.arm_right_index, trajectory)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2.3 Joints to dual TCP poses (🦾,🦾) -> (📐,📐)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from airo_typing import RotationMatrixType\n", + "\n", + "\n", + "def hang_in_the_air_tcp_orientation(left: bool) -> RotationMatrixType:\n", + " gripper_forward_direction = np.array([0, -1, 0]) if left else np.array([0, 1, 0])\n", + " Z = gripper_forward_direction / np.linalg.norm(gripper_forward_direction)\n", + " X = np.array([0, 0, 1]) if left else np.array([0, 0, -1])\n", + " Y = np.cross(Z, X)\n", + " return np.column_stack([X, Y, Z])\n", + "\n", + "\n", + "def hang_in_the_air_tcp_pose(left: bool) -> HomogeneousMatrixType:\n", + " position = np.array([0, 0, 0.9]) # 1 m is too close to a singularity\n", + " gripper_orientation = hang_in_the_air_tcp_orientation(left)\n", + "\n", + " gripper_pose = np.identity(4)\n", + " gripper_pose[0:3, 0:3] = gripper_orientation\n", + " gripper_pose[0:3, 3] = position\n", + " return gripper_pose\n", + "\n", + "stretch_pose_left = hang_in_the_air_tcp_pose(left=True)\n", + "stretch_pose_right = hang_in_the_air_tcp_pose(left=False)\n", + "\n", + "stretch_pose_left[:3, 3] += np.array([-0.4, 0.02, 0])\n", + "stretch_pose_right[:3, 3] += np.array([-0.4, -0.02, 0])\n", + "\n", + "visualize_frame(scene.meshcat, \"stretch_pose_left\", stretch_pose_left, opacity=0.5)\n", + "visualize_frame(scene.meshcat, \"stretch_pose_right\", stretch_pose_right, opacity=0.5)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "path = planner.plan_to_tcp_pose(tangled_joints_left, tangled_joints_right, stretch_pose_left, stretch_pose_right)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "trajectory = time_parametrize_toppra(scene.robot_diagram.plant(), path)\n", + "animate_dual_joint_trajectory(scene.meshcat, scene.robot_diagram, scene.arm_left_index, scene.arm_right_index, trajectory)" ] }, { @@ -378,7 +452,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.10.14" } }, "nbformat": 4, diff --git a/test/ompl/single_arm/test_basic_planning.py b/test/ompl/single_arm/test_basic_planning.py index 9ef8b06..64bf305 100644 --- a/test/ompl/single_arm/test_basic_planning.py +++ b/test/ompl/single_arm/test_basic_planning.py @@ -39,7 +39,7 @@ def joint_space_tunnel_fn(joint_configuration: np.ndarray): return joint_space_tunnel_fn joint_space_wall_fn = create_joint_space_tunnel_fn(0.0) - joint_space_tunnel_fn = create_joint_space_tunnel_fn(1.0) + joint_space_tunnel_fn = create_joint_space_tunnel_fn(10.0) # Create an unreasonable narrow tunnel as well joint_space_narrow_tunnel_fn = create_joint_space_tunnel_fn(0.00000001)