Skip to content

Commit

Permalink
update head tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
misoshiruseijin committed Sep 28, 2023
1 parent 0215590 commit 0cfcbe1
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 32 deletions.
123 changes: 94 additions & 29 deletions omnigibson/action_primitives/starter_semantic_action_primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class StarterSemanticActionPrimitiveSet(IntEnum):
TOGGLE_OFF = 8

class StarterSemanticActionPrimitives(BaseActionPrimitiveSet):
def __init__(self, task, scene, robot, add_context=False):
def __init__(self, task, scene, robot, add_context=False, enable_head_tracking=True):
logger.warning(
"The StarterSemanticActionPrimitive is a work-in-progress and is only provided as an example. "
"It currently only works with BehaviorRobot with its JointControllers set to absolute mode. "
Expand All @@ -238,7 +238,10 @@ def __init__(self, task, scene, robot, add_context=False):
self.robot_model = self.robot.model_name
self.robot_base_mass = self.robot._links["base_link"].mass
self.add_context = add_context
self._tracking_object = None
# self._tracking_object = None
self.enable_head_tracking = enable_head_tracking
self._tracking_object_pose = None
self._track_eef = False

self.skill_functions = ["_grasp", "_place_on_top", "_place_or_top", "_open_or_close"]

Expand All @@ -263,7 +266,7 @@ def with_context(self, action):
action_type = "nav"

context = action_type + context_function
print(context)
# print(context)
return action, context

# Disable grasping frame for Tiago robot (Should be cleaned up in the future)
Expand Down Expand Up @@ -452,6 +455,10 @@ def _close(self, obj):
def _open_or_close(self, obj, should_open):
relevant_joint = None

# Reset object to track
self._tracking_object_pose = None
self._track_eef = False

if self._get_obj_in_hand():
raise ActionPrimitiveError(
ActionPrimitiveError.Reason.PRE_CONDITION_ERROR,
Expand All @@ -462,6 +469,9 @@ def _open_or_close(self, obj, should_open):
# Open the hand first
yield from self._execute_release()

# Let the head track the eef
self._track_eef = True

for _ in range(MAX_ATTEMPTS_FOR_OPEN_CLOSE):
try:
if should_open:
Expand Down Expand Up @@ -545,13 +555,17 @@ def _open_or_close(self, obj, should_open):
else:
yield from self._move_hand_backward()


if obj.states[object_states.Open].get_value() != should_open:
raise ActionPrimitiveError(
ActionPrimitiveError.Reason.POST_CONDITION_ERROR,
"Despite executing the planned trajectory, the object did not open or close as expected. Maybe try again",
{"target object": obj.name, "is it currently open": obj.states[object_states.Open].get_value()},
)

# Reset object to track
self._tracking_object_pose = None
self._track_eef = False


def _move_base_backward(self, steps=10, speed=0.1):
"""
Expand All @@ -563,8 +577,9 @@ def _move_base_backward(self, steps=10, speed=0.1):
Returns:
np.array or None: Action array for one step for the robot to move hand or None if its at the target pose
"""
initial_eef_pos = self.robot.get_eef_position()
for _ in range(steps):
action = self._empty_action()
action = self._empty_action(arm_pose_to_keep=initial_eef_pos)
action[self.robot.controller_action_idx["gripper_{}".format(self.arm)]] = 1.0
action[self.robot.base_control_idx[0]] = -speed
yield self.with_context(action)
Expand All @@ -579,9 +594,10 @@ def _move_hand_backward(self, steps=10, speed=0.1):
Returns:
np.array or None: Action array for one step for the robot to move hand or None if its at the target pose
"""
initial_eef_pos = self.robot.get_eef_position()
print("moving hand backward")
for _ in range(steps):
action = self._empty_action()
action = self._empty_action(arm_pose_to_keep=initial_eef_pos)
action[self.robot.controller_action_idx["gripper_{}".format(self.arm)]] = 1.0
action[self.robot.controller_action_idx["arm_{}".format(self.arm)][0]] = -speed
yield self.with_context(action)
Expand All @@ -597,6 +613,9 @@ def _grasp(self, obj):
Returns:
np.array or None: Action array for one step for the robot to grasp or None if grasp completed
"""
# Reset object to track
self._tracking_object_pose = None
self._track_eef = False

# Don't do anything if the object is already grasped.
obj_in_hand = self._get_obj_in_hand()
Expand All @@ -621,6 +640,11 @@ def _grasp(self, obj):
# Prepare data for the approach later.
approach_pos = grasp_pose[0] + object_direction * GRASP_APPROACH_DISTANCE
approach_pose = (approach_pos, grasp_pose[1])

# Set grasp object as object to track with camera
if self.enable_head_tracking:
self._tracking_object_pose = obj.get_position_orientation()

# If the grasp pose is too far, navigate.
yield from self._navigate_if_needed(obj, pose_on_obj=grasp_pose)
yield from self._move_hand(grasp_pose)
Expand All @@ -644,6 +668,10 @@ def _grasp(self, obj):
{"target object": obj.name},
)

# Reset object to track
self._tracking_object_pose = None
self._track_eef = False

yield from self._reset_hand()

if self._get_obj_in_hand() != obj:
Expand Down Expand Up @@ -715,14 +743,24 @@ def _place_with_predicate(self, obj, predicate):
Returns:
np.array or None: Action array for one step for the robot to place or None if place completed
"""
# Reset object to track
self._tracking_object_pose = None
self._track_eef = False

obj_in_hand = self._get_obj_in_hand()
if obj_in_hand is None:
raise ActionPrimitiveError(
ActionPrimitiveError.Reason.PRE_CONDITION_ERROR, "You need to be grasping an object first to place it somewhere."
)

# Sample location to place object
obj_pose = self._sample_pose_with_object_and_predicate(predicate, obj_in_hand, obj)
hand_pose = self._get_hand_pose_for_object_pose(obj_pose)

# Set object to track to the target object to place on
if self.enable_head_tracking:
self._tracking_object_pose = obj.get_position_orientation()

yield from self._navigate_if_needed(obj, pose_on_obj=hand_pose)
yield from self._move_hand(hand_pose)
yield from self._execute_release()
Expand All @@ -742,6 +780,10 @@ def _place_with_predicate(self, obj, predicate):
{"dropped object": obj_in_hand.name, "target object": obj.name}
)

# Reset object to track
self._tracking_object_pose = None
self._track_eef = False

# yield from self._reset_hand()

def _convert_cartesian_to_joint_space(self, target_pose):
Expand Down Expand Up @@ -955,7 +997,6 @@ def _compute_delta_command(self, controller_name, diff_joint_pos, gain=1.0, min_

return delta_action


def _move_hand_direct_joint(self, joint_pos, control_idx, stop_on_contact=False, max_steps_for_hand_move=MAX_STEPS_FOR_HAND_MOVE, ignore_failure=False):
"""
Yields action for the robot to move its arm to reach the specified joint positions by directly actuating with no planner
Expand Down Expand Up @@ -994,7 +1035,7 @@ def _move_hand_direct_joint(self, joint_pos, control_idx, stop_on_contact=False,

if use_delta:
action[self.robot.controller_action_idx[controller_name]] = self._compute_delta_command(controller_name, diff_joint_pos)
action = self._overwrite_head_action(action, self._tracking_object) if self._tracking_object is not None else action
action = self._overwrite_head_action(action, self._tracking_object_pose)

prev_eef_pos = self.robot.get_eef_position(self.arm)
yield self.with_context(action)
Expand Down Expand Up @@ -1051,7 +1092,7 @@ def _move_hand_direct_ik(self, target_pose, stop_on_contact=False, ignore_failur
prev_orn = current_orn

action[control_idx] = np.concatenate([delta_pos, target_orn_axisangle])
action = self._overwrite_head_action(action, self._tracking_object) if self._tracking_object is not None else action
action = self._overwrite_head_action(action, self._tracking_object_pose)
yield self.with_context(action)

if not ignore_failure:
Expand Down Expand Up @@ -1173,9 +1214,10 @@ def _execute_grasp(self):
Returns:
np.array or None: Action array for one step for the robot to grasp or None if its done grasping
"""
initial_eef_pos = self.robot.get_eef_position()
for _ in range(MAX_STEPS_FOR_GRASP_OR_RELEASE):
action = self._empty_action()
action = self._overwrite_head_action(action, self._tracking_object) if self._tracking_object is not None else action
action = self._empty_action(arm_pose_to_keep=initial_eef_pos)
action = self._overwrite_head_action(action, self._tracking_object_pose)
controller_name = "gripper_{}".format(self.arm)
action[self.robot.controller_action_idx[controller_name]] = -1.0
yield self.with_context(action)
Expand All @@ -1187,10 +1229,10 @@ def _execute_release(self):
Returns:
np.array or None: Action array for one step for the robot to release or None if its done releasing
"""

initial_eef_pos = self.robot.get_eef_position()
for _ in range(MAX_STEPS_FOR_GRASP_OR_RELEASE):
action = self._empty_action()
action = self._overwrite_head_action(action, self._tracking_object) if self._tracking_object is not None else action
action = self._empty_action(arm_pose_to_keep=initial_eef_pos)
action = self._overwrite_head_action(action, self._tracking_object_pose)
controller_name = "gripper_{}".format(self.arm)
action[self.robot.controller_action_idx[controller_name]] = 1.0
yield self.with_context(action)
Expand All @@ -1202,9 +1244,24 @@ def _execute_release(self):
{"object in hand": self._get_obj_in_hand().name},
)

def _overwrite_head_action(self, action, obj):
def _overwrite_head_action(self, action, target_obj_pose):
"""
Overwrites camera control actions to track an object of interest.
If self._track_eef is True, target_obj_pose is ignored and the camera tracks the robot's end-effector.
Args:
action (array) : action array to overwrite
target_obj_pose (tuple of arrays) : (position, quat) in world frame of object to track
"""
assert self.robot_model == "Tiago", "Tracking object with camera is currently only supported for Tiago"
head_q = self._get_head_goal_q(obj)

if not self._track_eef and target_obj_pose is None:
return action

if self._track_eef:
target_obj_pose = (self.robot.get_eef_position(), self.robot.get_eef_orientation())

head_q = self._get_head_goal_q(target_obj_pose)
head_idx = self.robot.controller_action_idx["camera"]

config = self.robot._controller_config["camera"]
Expand All @@ -1220,7 +1277,7 @@ def _overwrite_head_action(self, action, obj):
action[head_idx] = head_action
return action

def _get_head_goal_q(self, obj):
def _get_head_goal_q(self, target_obj_pose):
"""
Get goal joint positions for head to look at an object of interest,
If the object cannot be seen, return the current head joint positions.
Expand All @@ -1236,8 +1293,8 @@ def _get_head_goal_q(self, obj):

# grab robot and object poses
robot_pose = self.robot.get_position_orientation()
obj_pose = obj.get_position_orientation()
obj_in_base = T.relative_pose_transform(*obj_pose, *robot_pose)
# obj_pose = obj.get_position_orientation()
obj_in_base = T.relative_pose_transform(*target_obj_pose, *robot_pose)

# compute angle between base and object in xy plane (parallel to floor)
theta = np.arctan2(obj_in_base[0][1], obj_in_base[0][0])
Expand All @@ -1262,10 +1319,13 @@ def _get_head_goal_q(self, obj):

return [head1_joint_goal, head2_joint_goal]

def _empty_action(self):
def _empty_action(self, arm_pose_to_keep=None):
"""
No op action
Args:
arm_pose_to_keep (array) : if provided, returns action that keeps the arm in this pose.
Currently only used in IK control pose_absolute_ori mode: assumes [eef position]
Returns:
np.array or None: Action array for one step for the robot to do nothing
"""
Expand All @@ -1288,6 +1348,8 @@ def _empty_action(self):
current_ori = T.quat2axisangle(current_quat)
control_idx = self.robot.controller_action_idx["arm_" + self.arm]
action[control_idx[3:]] = current_ori
if arm_pose_to_keep is not None:
action[control_idx[:3]] = arm_pose_to_keep - self.robot.get_eef_position()

return action

Expand Down Expand Up @@ -1513,7 +1575,8 @@ def _navigate_to_pose_direct(self, pose_2d, low_precision=False):

end_pose = self._get_robot_pose_from_2d_pose(pose_2d)
body_target_pose = self._get_pose_in_robot_frame(end_pose)

initial_eef_pos = self.robot.get_eef_position()

while np.linalg.norm(body_target_pose[0][:2]) > dist_threshold:
diff_pos = end_pose[0] - self.robot.get_position()
intermediate_pose = (end_pose[0], T.euler2quat([0, 0, np.arctan2(diff_pos[1], diff_pos[0])]))
Expand All @@ -1522,12 +1585,12 @@ def _navigate_to_pose_direct(self, pose_2d, low_precision=False):
if abs(diff_yaw) > DEFAULT_ANGLE_THRESHOLD:
yield from self._rotate_in_place(intermediate_pose, angle_threshold=DEFAULT_ANGLE_THRESHOLD)
else:
action = self._empty_action()
action = self._empty_action(arm_pose_to_keep=initial_eef_pos)
if self.robot_model == "Tiago":
direction_vec = body_target_pose[0][:2] / (np.linalg.norm(body_target_pose[0][:2]) * 5)
base_action = [direction_vec[0], direction_vec[1], 0.0]
action[self.robot.controller_action_idx["base"]] = base_action
action = self._overwrite_head_action(action, self._tracking_object) if self._tracking_object is not None else action
action = self._overwrite_head_action(action, self._tracking_object_pose)
else:
base_action = [KP_LIN_VEL, 0.0]
action[self.robot.controller_action_idx["base"]] = base_action
Expand All @@ -1551,22 +1614,24 @@ def _rotate_in_place(self, end_pose, angle_threshold = DEFAULT_ANGLE_THRESHOLD):
"""
body_target_pose = self._get_pose_in_robot_frame(end_pose)
diff_yaw = T.wrap_angle(T.quat2euler(body_target_pose[1])[2])
initial_eef_pos = self.robot.get_eef_position()

while abs(diff_yaw) > angle_threshold:
action = self._empty_action()
action = self._empty_action(arm_pose_to_keep=initial_eef_pos)

direction = -1.0 if diff_yaw < 0.0 else 1.0
ang_vel = KP_ANGLE_VEL * direction

base_action = [0.0, 0.0, ang_vel] if self.robot_model == "Tiago" else [0.0, ang_vel]
action[self.robot.controller_action_idx["base"]] = base_action

action = self._overwrite_head_action(action, self._tracking_object) if self._tracking_object is not None else action
action = self._overwrite_head_action(action, self._tracking_object_pose)
yield self.with_context(action)

body_target_pose = self._get_pose_in_robot_frame(end_pose)
diff_yaw = T.wrap_angle(T.quat2euler(body_target_pose[1])[2])

empty_action = self._empty_action()
empty_action = self._empty_action(arm_pose_to_keep=initial_eef_pos)
yield self.with_context(empty_action)

def _sample_pose_near_object(self, obj, pose_on_obj=None, **kwargs):
Expand Down Expand Up @@ -1785,11 +1850,11 @@ def _settle_robot(self):
Returns:
np.array or None: Action array for one step for the robot to do nothing
"""
# return
initial_eef_pos = self.robot.get_eef_position()
for _ in range(10):
empty_action = self._empty_action()
empty_action = self._empty_action(arm_pose_to_keep=initial_eef_pos)
yield self.with_context(empty_action)

while np.linalg.norm(self.robot.get_linear_velocity()) > 0.01:
empty_action = self._empty_action()
empty_action = self._empty_action(arm_pose_to_keep=initial_eef_pos)
yield self.with_context(empty_action)
2 changes: 1 addition & 1 deletion omnigibson/controllers/ik_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def _command_to_control(self, command, control_dict):
target_quat=target_quat,
tolerance_pos=0.002,
weight_pos=20.0,
max_iterations=2000,
max_iterations=100,
# initial_joint_pos=current_joint_pos,
)

Expand Down
4 changes: 2 additions & 2 deletions omnigibson/robots/tiago.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,8 @@ def finger_joint_names(self):

@property
def usd_path(self):
# return os.path.join(gm.ASSET_PATH, "models/tiago/tiago_dual_omnidirectional_stanford/tiago_dual_omnidirectional_stanford_33_with_wrist_cam.usd")
return os.path.join(gm.ASSET_PATH, "models/tiago/tiago_dual_omnidirectional_stanford/tiago_dual_omnidirectional_stanford_33.usd")
return os.path.join(gm.ASSET_PATH, "models/tiago/tiago_dual_omnidirectional_stanford/tiago_dual_omnidirectional_stanford_33_with_wrist_cam.usd")
# return os.path.join(gm.ASSET_PATH, "models/tiago/tiago_dual_omnidirectional_stanford/tiago_dual_omnidirectional_stanford_33.usd")

@property
def simplified_mesh_usd_path(self):
Expand Down

0 comments on commit 0cfcbe1

Please sign in to comment.