-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgamepad.py
106 lines (97 loc) · 4.87 KB
/
gamepad.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from time import time, sleep
import math
import os.path
from select import select
import evdev
import rospy
from ctrl_pkg.msg import ServoCtrlMsg
from ctrl_pkg.srv import (ActiveStateSrv,
EnableStateSrv,
NavThrottleSrv)
DEVICE = "/dev/input/event6"
ROS_RATE = 20 # 20hz
TIME_DIFF = 1.0/ROS_RATE
TIME_TO_STOP = 1.0 # 1 seconds to stop the motor
X_AXIS_THRESHOLD = 0.85
throttle_max = 0.6
motor_state = True
def scale(val, src, dst):
return (float(val - src[0]) / (src[1] - src[0])) * (dst[1] - dst[0]) + dst[0]
def scale_stick(value):
return scale(value, (0, 255), (-1.0, 1.0))
if __name__ == '__main__':
rospy.init_node('gamepad_node', disable_signals=True)
pub_manual_drive = rospy.Publisher('manual_drive', ServoCtrlMsg, queue_size=10)
enable_state_req = rospy.ServiceProxy('enable_state', EnableStateSrv)
msg = ServoCtrlMsg()
while True:
if not os.path.exists(DEVICE):
print("sleep!")
sleep(1.0)
continue
try:
gamepad = evdev.InputDevice(DEVICE)
rospy.loginfo(gamepad)
last_time = time() - TIME_DIFF
angle = 0.0
throttle = 0.0
x_axis = 0.0
y_axis = 0.0
start_stop_state = False
while True:
# print(evdev.categorize(event))
r, w, x = select([gamepad.fd], [], [], TIME_TO_STOP)
if r:
for event in gamepad.read():
now = time()
if event.type == 1: # Right Button
if event.code == 304 and event.value == 1: #BTN_SOUTH
motor_state = not motor_state
start_stop_state = not start_stop_state
enable_state_req(start_stop_state)
rospy.loginfo("###### START/STOP: " + str(start_stop_state) + ":" + str(motor_state))
elif event.code == 307 and event.value == 1: #BTN_NORTH
throttle_max = min(1.0, throttle_max + 0.1)
rospy.loginfo("###### throttle_max: " + str(throttle_max))
elif event.code == 308 and event.value == 1: #BTN_WEST
throttle_max = max(0.3, throttle_max - 0.1)
rospy.loginfo("###### throttle_max: " + str(throttle_max))
if event.type == 3 and (event.code == 0 or event.code == 1): # Analog stick
if now - last_time < TIME_DIFF:
continue
if event.code == 0: # X axis
x_axis = scale_stick(event.value)
angle = - x_axis
if event.code == 1: # Y axis
y_axis = scale_stick(event.value)
throttle = min(1.0, math.sqrt(y_axis*y_axis + x_axis*x_axis))
throttle = - math.copysign(throttle, y_axis)
# Reduce the throttle if abs(x_axis) near 1 so throttle doesn't make big change abruptly
# when +y_axis switch to -y_axis
if abs(x_axis) > X_AXIS_THRESHOLD:
throttle_scale = (1.0 - abs(x_axis)) / (1.0 - X_AXIS_THRESHOLD)
throttle = throttle_scale * throttle
if motor_state and not start_stop_state and abs(angle) >= 0.1 and abs(throttle) >= 0.1:
start_stop_state = True
enable_state_req(start_stop_state)
rospy.loginfo("###### START")
if event.code == 0 or event.code == 1:
if start_stop_state:
try:
if not rospy.is_shutdown():
msg.angle = angle
msg.throttle = throttle_max * throttle
pub_manual_drive.publish(msg)
# rospy.loginfo(msg)
last_time = now
except rospy.ROSInterruptException:
print("ROS exit")
exit(0)
else:
if start_stop_state:
start_stop_state = False
enable_state_req(start_stop_state)
rospy.loginfo("###### STOP")
except IOError:
print("IOError")
pass