Skip to content

Commit

Permalink
Move launch and interfacing boilerplate to common file
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertWilbrandt committed Oct 17, 2023
1 parent 40a9bfa commit fba56d2
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 360 deletions.
85 changes: 7 additions & 78 deletions ur_robot_driver/test/dashboard_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.


import os
import sys
import time
Expand All @@ -36,23 +34,10 @@
import pytest
import rclpy
from rclpy.node import Node
from std_srvs.srv import Trigger
from ur_dashboard_msgs.msg import RobotMode
from ur_dashboard_msgs.srv import (
GetLoadedProgram,
GetProgramState,
GetRobotMode,
IsProgramRunning,
Load,
)

sys.path.append(os.path.dirname(__file__))
from test_common import generate_dashboard_test_description # noqa: E402

TIMEOUT_WAIT_SERVICE = 10
# If we download the docker image simultaneously to the tests, it can take quite some time until the
# dashboard server is reachable and usable.
TIMEOUT_WAIT_SERVICE_INITIAL = 120
from test_common import DashboardInterface, generate_dashboard_test_description # noqa: E402


@pytest.mark.launch_test
Expand All @@ -75,38 +60,7 @@ def tearDownClass(cls):
rclpy.shutdown()

def init_robot(self):
# We wait longer for the first client, as the robot is still starting up
power_on_client = waitForService(
self.node, "/dashboard_client/power_on", Trigger, timeout=TIMEOUT_WAIT_SERVICE_INITIAL
)

# Connect to all other expected services
dashboard_interfaces = {
"power_off": Trigger,
"brake_release": Trigger,
"unlock_protective_stop": Trigger,
"restart_safety": Trigger,
"get_robot_mode": GetRobotMode,
"load_installation": Load,
"load_program": Load,
"close_popup": Trigger,
"get_loaded_program": GetLoadedProgram,
"program_state": GetProgramState,
"program_running": IsProgramRunning,
"play": Trigger,
"stop": Trigger,
}
self.dashboard_clients = {
srv_name: waitForService(self.node, f"/dashboard_client/{srv_name}", srv_type)
for (srv_name, srv_type) in dashboard_interfaces.items()
}

# Add first client to dict
self.dashboard_clients["power_on"] = power_on_client

#
# Test functions
#
self._dashboard_interface = DashboardInterface(self.node)

def test_switch_on(self):
"""Test power on a robot."""
Expand All @@ -115,59 +69,34 @@ def test_switch_on(self):
mode = RobotMode.DISCONNECTED
while mode != RobotMode.POWER_OFF and time.time() < end_time:
time.sleep(0.1)
result = self.call_dashboard_service("get_robot_mode", GetRobotMode.Request())
result = self._dashboard_interface.get_robot_mode()
self.assertTrue(result.success)
mode = result.robot_mode.mode

# Power on robot
self.assertTrue(self.call_dashboard_service("power_on", Trigger.Request()).success)
self.assertTrue(self._dashboard_interface.power_on().success)

# Wait until robot mode changes
end_time = time.time() + 10
mode = RobotMode.DISCONNECTED
while mode not in (RobotMode.IDLE, RobotMode.RUNNING) and time.time() < end_time:
time.sleep(0.1)
result = self.call_dashboard_service("get_robot_mode", GetRobotMode.Request())
result = self._dashboard_interface.get_robot_mode()
self.assertTrue(result.success)
mode = result.robot_mode.mode

self.assertIn(mode, (RobotMode.IDLE, RobotMode.RUNNING))

# Release robot brakes
self.assertTrue(self.call_dashboard_service("brake_release", Trigger.Request()).success)
self.assertTrue(self._dashboard_interface.brake_release().success)

# Wait until robot mode is RUNNING
end_time = time.time() + 10
mode = RobotMode.DISCONNECTED
while mode != RobotMode.RUNNING and time.time() < end_time:
time.sleep(0.1)
result = self.call_dashboard_service("get_robot_mode", GetRobotMode.Request())
result = self._dashboard_interface.get_robot_mode()
self.assertTrue(result.success)
mode = result.robot_mode.mode

self.assertEqual(mode, RobotMode.RUNNING)

#
# Utility functions
#

def call_dashboard_service(self, srv_name, request):
self.node.get_logger().info(
f"Calling dashboard service '{srv_name}' with request {request}"
)
future = self.dashboard_clients[srv_name].call_async(request)
rclpy.spin_until_future_complete(self.node, future)
if future.result() is not None:
self.node.get_logger().info(f"Received result {future.result()}")
return future.result()
else:
raise Exception(f"Exception while calling service: {future.exception()}")


def waitForService(node, srv_name, srv_type, timeout=TIMEOUT_WAIT_SERVICE):
client = node.create_client(srv_type, srv_name)
if client.wait_for_service(timeout) is False:
raise Exception(f"Could not reach service '{srv_name}' within timeout of {timeout}")

node.get_logger().info(f"Successfully connected to service '{srv_name}'")
return client
Loading

0 comments on commit fba56d2

Please sign in to comment.