"""
A convenience script to playback random demonstrations from
a set of demonstrations stored in a hdf5 file.
Arguments:
--folder (str): Path to demonstrations
--use_actions (optional): If this flag is provided, the actions are played back
through the MuJoCo simulator, instead of loading the simulator states
one by one.
Example:
$ python playback_demonstrations_from_hdf5.py --folder ../models/assets/demonstrations/SawyerPickPlace/
"""
import os
import h5py
import argparse
import random
import numpy as np
import robosuite
from robosuite.utils.mjcf_utils import postprocess_model_xml
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--folder",
        type=str,
        default=os.path.join(
            robosuite.models.assets_root, "demonstrations/SawyerNutAssembly"
        ),
    )
    parser.add_argument(
        "--use-actions",
        action='store_true',
    )
    args = parser.parse_args()
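    # Note: argparse converts the dash in "--use-actions" to an underscore,
    # so the flag is read below as args.use_actions.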
    demo_path = args.folder
    hdf5_path = os.path.join(demo_path, "demo.hdf5")
    f = h5py.File(hdf5_path, "r")
    env_name = f["data"].attrs["env"]
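    # Layout of demo.hdf5 assumed by this script (inferred from the reads below;
    # no attributes or datasets beyond these are relied upon):
    #   data/                      attrs: "env" (environment name)
    #     <episode>/               attrs: "model_file" (MJCF xml under <folder>/models/)
    #       states                 flattened MuJoCo sim states, one per timestep
    #       joint_velocities       recorded joint velocity actions
    #       gripper_actuations     recorded gripper actions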
    env = robosuite.make(
        env_name,
        has_renderer=True,
        ignore_done=True,
        use_camera_obs=False,
        gripper_visualization=True,
        reward_shaping=True,
        control_freq=100,
    )

    # list of all demonstration episodes
    demos = list(f["data"].keys())
    while True:
        print("Playing back random episode... (press ESC to quit)")

        # select an episode randomly
        ep = random.choice(demos)

        # read the model xml, using the metadata stored in the attribute for this episode
        model_file = f["data/{}".format(ep)].attrs["model_file"]
        model_path = os.path.join(demo_path, "models", model_file)
        with open(model_path, "r") as model_f:
            model_xml = model_f.read()

        env.reset()
        xml = postprocess_model_xml(model_xml)
        env.reset_from_xml_string(xml)
        env.sim.reset()
        env.viewer.set_camera(0)

        # load the flattened mujoco states
        states = f["data/{}/states".format(ep)][()]
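        # Two playback modes follow: replaying the recorded actions open-loop
        # through env.step(), or directly forcing each recorded simulator state.
        # The action path additionally checks that replay reproduces the
        # recorded states exactly.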
        if args.use_actions:
            # load the initial state
            env.sim.set_state_from_flattened(states[0])
            env.sim.forward()

            # load the actions and play them back open-loop
            jvels = f["data/{}/joint_velocities".format(ep)][()]
            grip_acts = f["data/{}/gripper_actuations".format(ep)][()]
            actions = np.concatenate([jvels, grip_acts], axis=1)
            num_actions = actions.shape[0]

            for j, action in enumerate(actions):
                env.step(action)
                env.render()

                if j < num_actions - 1:
                    # ensure that the actions deterministically lead to the same recorded states
                    state_playback = env.sim.get_state().flatten()
                    assert np.all(np.equal(states[j + 1], state_playback))
        else:
            # force the sequence of internal mujoco states one by one
            for state in states:
                env.sim.set_state_from_flattened(state)
                env.sim.forward()
                env.render()

    f.close()