diff --git a/CMakeLists.txt b/CMakeLists.txt index cd0a862..e8ef709 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,9 @@ add_message_files(FILES ) add_service_files(FILES + EstablishBond.srv GetCapabilitySpec.srv + FreeCapability.srv GetCapabilitySpecs.srv GetInterfaces.srv GetNodeletManagerName.srv @@ -22,6 +24,7 @@ add_service_files(FILES GetSemanticInterfaces.srv StartCapability.srv StopCapability.srv + UseCapability.srv ) generate_messages(DEPENDENCIES std_msgs std_srvs) diff --git a/package.xml b/package.xml index 7868ebb..e68b948 100644 --- a/package.xml +++ b/package.xml @@ -24,6 +24,7 @@ std_msgs std_srvs + bondpy message_runtime nodelet python-yaml diff --git a/src/capabilities/client.py b/src/capabilities/client.py new file mode 100644 index 0000000..3d3c73d --- /dev/null +++ b/src/capabilities/client.py @@ -0,0 +1,220 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2013, Open Source Robotics Foundation, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Open Source Robotics Foundation, Inc. nor +# the names of its contributors may be used to endorse or promote +# products derived from this software without specific prior +# written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +# Author: William Woodall + +"""Provides a simple Python interface for interacting with the capability server + +Typical usage:: + + >>> from capabilities.client import Client + >>> client = Client() + # Use the line below if the capability_server has a different name + # client = Client(capability_server_node_name='/capability_server_node_name') + >>> client.use_capability('foo_pkg/Foo') + >>> client.use_capability('foo_pkg/Foo') + >>> client.free_capability('foo_pkg/Foo') + >>> client.shutdown() + +""" + +import atexit + +import rospy + +from bondpy.bondpy import Bond + +from capabilities.srv import EstablishBond +from capabilities.srv import FreeCapability +from capabilities.srv import UseCapability + + +class CapabilitiesClient(object): + """Single point of entry for interacting with a remote capability server. + + Instantiation of this class does not check to see if the underlying + services are available, call :py:method:`wait_for_services` if you want + to ensure that the services are available before continuing. + + :param capability_server_node_name: name of the remote capability server + :type capability_server_node_name: str + """ + def __init__(self, capability_server_node_name='/capability_server'): + self._name = capability_server_node_name + self._bond = None + self._bond_id = None + self._used_capabilities = set([]) + self._services = {} + # Create service proxy for establish_bond + service_name = '{0}/establish_bond'.format(capability_server_node_name) + self.__establish_bond = rospy.ServiceProxy(service_name, EstablishBond) + self._services['establish_bond'] = self.__establish_bond + # Create service proxy for free_capability + service_name = '{0}/free_capability'.format(capability_server_node_name) + self.__free_capability = rospy.ServiceProxy(service_name, FreeCapability) + self._services['free_capability'] = self.__free_capability + # Create service proxy for use_capability + service_name = '{0}/use_capability'.format(capability_server_node_name) + self.__use_capability = rospy.ServiceProxy(service_name, UseCapability) + self._services['use_capability'] = self.__use_capability + # Register atexit shutdown + atexit.register(self.shutdown) + + def wait_for_services(self, timeout=None, services=None): + """Blocks until the requested services are available. + + Services are waited on one at a time. Therefore, if a non-None value + for timeout is given, and one or more services are being waited on, + the full timeout will be given to each wait, meaning that the total + run time of this function can exceed the timeout provided. + + :param timeout: Seconds to wait for the services before returning False. + If None is passed, then this will block indefinitely until the + services are available. + :type timeout: float + :param services: List of services to wait on. + If None is passed, then this will check all the services. + :type services: list + :returns: True is the services are available, False otherwise (timeout) + :rtype: bool + """ + services = self._services.keys() if services is None else services + assert isinstance(services, list), services + for service in services: + if service not in self._services: + raise ValueError( + "Service '{0}' is not a valid service and cannot be waited on. These are the valid services: {1}" + .format(service, list(self._services.keys()))) + if self.__wait_for_service(self._services[service], timeout) is False: + return False + return True + + def __wait_for_service(self, service, timeout): + try: + service.wait_for_service(timeout) + except rospy.ROSException: + rospy.logwarn("Timed out after waiting '{0}' seconds for service '{1}' to be available." + .format(timeout, service.resolved_name)) + return False + return True + + def establish_bond(self, timeout=None): + """Establishes a bond using the ~establish_bond service call + + The bond id which is returned by the service call is stored internally + and used implicitly by the use/free capabilities functions. + + If establish_bond had previously been called, then the old bond will be + broken, which will result in any capability uses under that bond to be + implicitly freed. + + This function is called implicitly by :py:method:`use_capability` if + no bond exists. + + This function will block waiting for the service call to become + available and to wait for the bond to be established. In both cases + the timeout parameter is used. + + None is returned if the timeout occurs while waiting on the service + to become available or while waiting for the bond to form. + + :param timeout: time in seconds to wait for the service to be available + :returns: the bond_id received from the server or None on failure + :rtype: str + """ + if self.__wait_for_service(self.__establish_bond, timeout) is False: + return None + resp = self.__establish_bond() + if not resp.bond_id: + return None + self._bond_id = resp.bond_id + self._bond = Bond('{0}/bonds'.format(self._name), self._bond_id) + self._bond.start() + if self._bond.wait_until_formed(rospy.Duration(timeout)) is False: + return None + return self._bond_id + + def free_capability(self, capability_interface, timeout=None): + """Free's a previously used capability. + + Calls the ~free_capability service, and closes the bond with that + topic if the reference count is zero. + + :param capability_interface: Name of the capability interface to free up + :type capability_interface: str + :param timeout: time to wait on service to be available (optional) + :type timeout: rospy.Duration + :returns: True if successful, otherwise False + :rtype: bool + """ + if capability_interface not in self._used_capabilities: + rospy.logerr("Cannot free capability interface '{0}', because it was not first used." + .format(capability_interface)) + return False + if self.__wait_for_service(self.__free_capability, timeout) is False: + return False + self.__free_capability.call(capability_interface, self._bond_id) + return True + + def shutdown(self): + """Cleanly frees any used capabilities.""" + if self._bond: + self._bond.break_bond() + + def use_capability(self, capability_interface, preferred_provider=None, timeout=None): + """Declares that this capability is being used. + + Calls the `~use_capability` service, and opens a bond with that topic. + This way the capability is started if it has not been already, and the + internal reference count for the capability server is incremented. + If the bond fails (this program crashes) then the reference is decremented. + The reference is also decremented if free_capability is called. + + :param capability_interface: Name of the capability interface to use + :type capability_interface: str + :param preferred_provider: preferred provider or None for default provider (optional) + :type preferred_provider: str + :param timeout: time to wait on service to be available (optional) + :type timeout: rospy.Duration + :returns: True if successful, otherwise False + :rtype: bool + """ + # If no bond has been established, establish one first + if self._bond is None: + if self.establish_bond(timeout) is None: + return False + if self.__wait_for_service(self.__use_capability, timeout) is False: + return False + self.__use_capability.call(capability_interface, preferred_provider or '', self._bond_id) + self._used_capabilities.add(capability_interface) + return True diff --git a/src/capabilities/server.py b/src/capabilities/server.py index e781bfa..e37652f 100644 --- a/src/capabilities/server.py +++ b/src/capabilities/server.py @@ -49,14 +49,21 @@ import sys import threading import traceback +import uuid import rospy +from bondpy.bondpy import Bond + from std_srvs.srv import Empty from std_srvs.srv import EmptyResponse +from capabilities.srv import EstablishBond +from capabilities.srv import EstablishBondResponse from capabilities.srv import GetCapabilitySpec from capabilities.srv import GetCapabilitySpecResponse +from capabilities.srv import FreeCapability +from capabilities.srv import FreeCapabilityResponse from capabilities.srv import GetCapabilitySpecs from capabilities.srv import GetCapabilitySpecsResponse from capabilities.srv import GetInterfaces @@ -73,6 +80,8 @@ from capabilities.srv import StartCapabilityResponse from capabilities.srv import StopCapability from capabilities.srv import StopCapabilityResponse +from capabilities.srv import UseCapability +from capabilities.srv import UseCapabilityResponse from capabilities.discovery import package_index_from_package_path from capabilities.discovery import spec_file_index_from_package_index @@ -135,6 +144,11 @@ def __init__(self, provider, provider_path, started_by='unknown'): self.depends_on = [x for x in provider.dependencies] self.canceled = False self.started_by = started_by + self.bonds = {} # {bond_id: reference_count} + + @property + def reference_count(self): + return sum(list(self.bonds.values())) @property def state(self): @@ -270,6 +284,7 @@ def __init__(self, package_paths, screen=None): self.__blacklist = None self.__default_providers = {} self.__missing_default_provider_is_an_error = rospy.get_param('~missing_default_provider_is_an_error', False) + self.__bonds = {} def spin(self): """Starts the capability server by setting up ROS comms, then spins""" @@ -301,6 +316,8 @@ def spin(self): self.__load_capabilities() + self.__bond_topic = rospy.get_name() + "/bonds" + # Collect default arguments self.__populate_default_providers() @@ -310,6 +327,15 @@ def spin(self): self.__stop_capability_service = rospy.Service( '~stop_capability', StopCapability, self.handle_stop_capability) + self.__establish_bond_service = rospy.Service( + '~establish_bond', EstablishBond, self.handle_establish_bond) + + self.__free_capability_service = rospy.Service( + '~free_capability', FreeCapability, self.handle_free_capability) + + self.__use_capability_service = rospy.Service( + '~use_capability', UseCapability, self.handle_use_capability) + self.__reload_service = rospy.Service( '~reload_capabilities', Empty, self.handle_reload_request) @@ -715,6 +741,94 @@ def _handle_stop_capability(self, req): self.__stop_capability(capability) return StopCapabilityResponse(True) + def handle_establish_bond(self, req): + return self.__catch_and_log(self._handle_establish_bond, req) + + def _handle_establish_bond(self, req): + rospy.loginfo("Request to establish a bond") + bond_id = str(uuid.uuid1()) + + def on_formed(): + rospy.loginfo("Bond formed with bond_id of '{0}'" + .format(bond_id)) + + def on_broken(): + # if bond_id in self.__bonds: + rospy.loginfo("Bond with bond id '{0}' was broken, freeing associated capabilities" + .format(bond_id)) + self.__free_capabilities_by_bond_id(bond_id) + del self.__bonds[bond_id] + + self.__bonds[bond_id] = Bond(self.__bond_topic, bond_id, on_broken=on_broken, on_formed=on_formed) + self.__bonds[bond_id].start() + return EstablishBondResponse(bond_id) + + def __free_capabilities_by_bond_id(self, bond_id): + if bond_id in self.__bonds: + for capability in self.__capability_instances.values(): + if bond_id in capability.bonds: + del capability.bonds[bond_id] + if capability.reference_count == 0: + rospy.loginfo("Capability '{0}' being stopped because it has zero references" + .format(capability.interface)) + self.__stop_capability(capability.interface) + + def __free_capability(self, capability_name, bond_id): + if capability_name not in self.__capability_instances: + raise RuntimeError("No Capability '{0}' in use".format(capability_name)) + capability = self.__capability_instances[capability_name] + if bond_id not in capability.bonds: + raise RuntimeError("Given bond_id '{0}' not associated with given capability '{1}'" + .format(bond_id, capability_name)) + if capability.bonds[bond_id] == 0: + raise RuntimeError("Cannot free capability '{0}' for bond_id '{1}', it already has a reference count of 0" + .format(capability_name, bond_id)) + capability.bonds[bond_id] -= 1 + if capability.reference_count == 0: + rospy.loginfo("Capability '{0}' being stopped because it has zero references" + .format(capability.interface)) + self.__stop_capability(capability.interface) + + def handle_free_capability(self, req): + return self.__catch_and_log(self._handle_free_capability, req) + + def _handle_free_capability(self, req): + rospy.loginfo("Request to free usage of capability '{0}' (bond id '{1}')" + .format(req.capability, req.bond_id)) + self.__free_capability(req.capability, req.bond_id) + return FreeCapabilityResponse() + + def handle_use_capability(self, req): + return self.__catch_and_log(self._handle_use_capability, req) + + def _handle_use_capability(self, req): + msg = "Request to use capability '{0}'".format(req.capability) + if req.preferred_provider: + msg += " with provider '{0}'".format(req.preferred_provider) + rospy.loginfo(msg) + # Make sure the bond_id is valid + if req.bond_id not in self.__bonds: + raise RuntimeError("Invalid bond_id given to ~use_capability: '{0}'".format(req.bond_id)) + # Start the capability if it is not already running + if req.capability not in self.__capability_instances: + ret = self.__start_capability(req.capability, req.preferred_provider) + if not ret: + raise RuntimeError("Failed to start capability '{0}' with preferred provider '{1}'" + .format(req.capability, req.preferred_provider)) + assert req.capability in self.__capability_instances # Should be true + # Get a handle ont he capability + capability = self.__capability_instances[req.capability] + if req.preferred_provider and capability.name != req.preferred_provider: + raise RuntimeError("Requested to use capability '{0}' with preferred provider '{1}', " + .format(capability.interface, req.preferred_provider) + + "but the capability is already running with provider '{0}'" + .format(capability.name)) + if req.bond_id not in capability.bonds: + capability.bonds[req.bond_id] = 0 + capability.bonds[req.bond_id] += 1 + + return UseCapabilityResponse() + def handle_reload_request(self, req): return self.__catch_and_log(self._handle_reload_request, req) diff --git a/srv/EstablishBond.srv b/srv/EstablishBond.srv new file mode 100644 index 0000000..57235e2 --- /dev/null +++ b/srv/EstablishBond.srv @@ -0,0 +1,3 @@ + +--- +string bond_id diff --git a/srv/FreeCapability.srv b/srv/FreeCapability.srv new file mode 100644 index 0000000..ff718c5 --- /dev/null +++ b/srv/FreeCapability.srv @@ -0,0 +1,3 @@ +string capability +string bond_id +--- diff --git a/srv/UseCapability.srv b/srv/UseCapability.srv new file mode 100644 index 0000000..fd6b98e --- /dev/null +++ b/srv/UseCapability.srv @@ -0,0 +1,4 @@ +string capability +string preferred_provider +string bond_id +---