diff --git a/src/viam/services/motion/client.py b/src/viam/services/motion/client.py index 8ed4ce6ad..27ce1dd69 100644 --- a/src/viam/services/motion/client.py +++ b/src/viam/services/motion/client.py @@ -10,7 +10,6 @@ GeoPoint, Pose, PoseInFrame, - ResourceName, Transform, WorldState, ) @@ -54,7 +53,7 @@ def __init__(self, name: str, channel: Channel): async def move( self, - component_name: ResourceName, + component_name: str, destination: PoseInFrame, world_state: Optional[WorldState] = None, constraints: Optional[Constraints] = None, @@ -77,9 +76,9 @@ async def move( async def move_on_globe( self, - component_name: ResourceName, + component_name: str, destination: GeoPoint, - movement_sensor_name: ResourceName, + movement_sensor_name: str, obstacles: Optional[Sequence[GeoGeometry]] = None, heading: Optional[float] = None, configuration: Optional[MotionConfiguration] = None, @@ -106,9 +105,9 @@ async def move_on_globe( async def move_on_map( self, - component_name: ResourceName, + component_name: str, destination: Pose, - slam_service_name: ResourceName, + slam_service_name: str, configuration: Optional[MotionConfiguration] = None, obstacles: Optional[Sequence[Geometry]] = None, *, @@ -131,7 +130,7 @@ async def move_on_map( async def stop_plan( self, - component_name: ResourceName, + component_name: str, *, extra: Optional[Mapping[str, ValueTypes]] = None, timeout: Optional[float] = None, @@ -149,7 +148,7 @@ async def stop_plan( async def get_plan( self, - component_name: ResourceName, + component_name: str, last_plan_only: bool = False, execution_id: Optional[str] = None, *, @@ -189,7 +188,7 @@ async def list_plan_statuses( async def get_pose( self, - component_name: ResourceName, + component_name: str, destination_frame: str, supplemental_transforms: Optional[Sequence[Transform]] = None, *, diff --git a/src/viam/services/motion/motion.py b/src/viam/services/motion/motion.py index 694e03ea8..7ca4d4fa8 100644 --- a/src/viam/services/motion/motion.py +++ b/src/viam/services/motion/motion.py @@ -7,7 +7,7 @@ else: from typing_extensions import TypeAlias -from viam.proto.common import GeoGeometry, Geometry, GeoPoint, Pose, PoseInFrame, ResourceName, Transform, WorldState +from viam.proto.common import GeoGeometry, Geometry, GeoPoint, Pose, PoseInFrame, Transform, WorldState from viam.proto.service.motion import Constraints, GetPlanResponse, MotionConfiguration, PlanStatusWithID from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE from viam.utils import ValueTypes @@ -33,7 +33,7 @@ class Motion(ServiceBase): @abc.abstractmethod async def move( self, - component_name: ResourceName, + component_name: str, destination: PoseInFrame, world_state: Optional[WorldState] = None, constraints: Optional[Constraints] = None, @@ -44,18 +44,17 @@ async def move( """Plan and execute a movement to move the component specified to its goal destination. Note: Frames designated with respect to components can also be used as the ``component_name`` when calling for a move. This - technique allows for planning and moving the frame itself to the ``destination``. To do so, simply create a resource name with - originating ReferenceFrame's name. Then pass in the resource name into ``component_name``. Ex:: + technique allows for planning and moving the frame itself to the ``destination``. + To do so, simply pass in a string into ``component_name``. Ex:: - resource_name = Gripper.get_resource_name("externalFrame") - success = await MotionServiceClient.move(resource_name, ...) + success = await MotionServiceClient.move("externalFrame", ...) :: motion = MotionClient.from_robot(robot=machine, name="builtin") - # Assumes a gripper configured with name "my_gripper" on the machine - gripper_name = Gripper.get_resource_name("my_gripper") + # Assumes "my_gripper" on the machine + gripper_name = "my_gripper" my_frame = "my_gripper_offset" goal_pose = Pose(x=0, y=0, z=300, o_x=0, o_y=0, o_z=1, theta=0) @@ -69,7 +68,7 @@ async def move( extra={}) Args: - component_name (viam.proto.common.ResourceName): Name of a component on a given robot. + component_name (str): Name of a component on a given robot. destination (viam.proto.common.PoseInFrame): The destination to move to, expressed as a ``Pose`` and the frame in which it was observed. world_state (viam.proto.common.WorldState): When supplied, the motion service will create a plan that obeys any constraints @@ -95,9 +94,9 @@ async def move( @abc.abstractmethod async def move_on_globe( self, - component_name: ResourceName, + component_name: str, destination: GeoPoint, - movement_sensor_name: ResourceName, + movement_sensor_name: str, obstacles: Optional[Sequence[GeoGeometry]] = None, heading: Optional[float] = None, configuration: Optional[MotionConfiguration] = None, @@ -120,24 +119,23 @@ async def move_on_globe( motion = MotionClient.from_robot(robot=machine, name="builtin") - # Get the ResourceNames of the base and movement sensor - my_base_resource_name = Base.get_resource_name("my_base") - mvmnt_sensor_resource_name = MovementSensor.get_resource_name( - "my_movement_sensor") + # Get the names of the base and movement sensor + my_base_name = "my_base" + mvmnt_sensor_name = "my_movement_sensor" # Define a destination GeoPoint at the GPS coordinates [0, 0] my_destination = movement_sensor.GeoPoint(latitude=0, longitude=0) # Move the base component to the designated geographic location, as reported by the movement sensor execution_id = await motion.move_on_globe( - component_name=my_base_resource_name, + component_name=my_base_name, destination=my_destination, - movement_sensor_name=mvmnt_sensor_resource_name) + movement_sensor_name=mvmnt_sensor_name) Args: - component_name (ResourceName): The ResourceName of the base to move. + component_name (str): The name of the base to move. destination (GeoPoint): The location of the component's destination, represented in geographic notation as a GeoPoint (lat, lng). - movement_sensor_name (ResourceName): The ResourceName of the movement sensor that you want to use to check + movement_sensor_name (str): The name of the movement sensor that you want to use to check the machine's location. obstacles (Optional[Sequence[GeoGeometry]]): Obstacles to consider when planning the motion of the component, with each represented as a GeoGeometry. Default: None @@ -170,9 +168,9 @@ async def move_on_globe( @abc.abstractmethod async def move_on_map( self, - component_name: ResourceName, + component_name: str, destination: Pose, - slam_service_name: ResourceName, + slam_service_name: str, configuration: Optional[MotionConfiguration] = None, obstacles: Optional[Sequence[Geometry]] = None, *, @@ -194,23 +192,23 @@ async def move_on_map( motion = MotionClient.from_robot(robot=machine, name="builtin") - # Get the ResourceNames of the base component and SLAM service - my_base_resource_name = Base.get_resource_name("my_base") - my_slam_service_name = SLAMClient.get_resource_name("my_slam_service") + # Get the names of the base component and SLAM service + my_base_name = "my_base" + my_slam_service_name = "my_slam_service" # Define a destination pose with respect to the origin of the map from the SLAM service "my_slam_service" my_pose = Pose(y=10) # Move the base component to the destination pose of Y=10, a location of # (0, 10, 0) in respect to the origin of the map - execution_id = await motion.move_on_map(component_name=my_base_resource_name, + execution_id = await motion.move_on_map(component_name=my_base_name, destination=my_pose, slam_service_name=my_slam_service_name) Args: - component_name (ResourceName): The ResourceName of the base to move. + component_name (str): The name of the base to move. destination (Pose): The destination, which can be any Pose with respect to the SLAM map's origin. - slam_service_name (ResourceName): The ResourceName of the SLAM service from which the SLAM map is requested. + slam_service_name (str): The name of the SLAM service from which the SLAM map is requested. configuration (Optional[MotionConfiguration]): The configuration you want to set across this machine for this motion service. This parameter and each of its fields are optional. @@ -237,7 +235,7 @@ async def move_on_map( @abc.abstractmethod async def stop_plan( self, - component_name: ResourceName, + component_name: str, *, extra: Optional[Mapping[str, ValueTypes]] = None, timeout: Optional[float] = None, @@ -251,11 +249,11 @@ async def stop_plan( # Assuming a `move_on_globe()` started the execution # Stop the base component which was instructed to move by `move_on_globe()` # or `move_on_map()` - my_base_resource_name = Base.get_resource_name("my_base") + my_base_name = "my_base" await motion.stop_plan(component_name=mvmnt_sensor) Args: - component_name (ResourceName): The component to stop + component_name (str): The component to stop For more information, see `Motion service `_. """ @@ -264,7 +262,7 @@ async def stop_plan( @abc.abstractmethod async def get_plan( self, - component_name: ResourceName, + component_name: str, last_plan_only: bool = False, execution_id: Optional[str] = None, *, @@ -291,12 +289,12 @@ async def get_plan( :: motion = MotionClient.from_robot(robot=machine, name="builtin") - my_base_resource_name = Base.get_resource_name("my_base") + my_base_name = "my_base" # Get the plan(s) of the base component which was instructed to move by `MoveOnGlobe()` or `MoveOnMap()` - resp = await motion.get_plan(component_name=my_base_resource_name) + resp = await motion.get_plan(component_name=my_base_name) Args: - component_name (ResourceName): The component to stop + component_name (str): The component to stop last_plan_only (Optional[bool]): If supplied, the response will only return the last plan for the component / execution. execution_id (Optional[str]): If supplied, the response will only return plans with the provided execution_id. @@ -343,7 +341,7 @@ async def list_plan_statuses( @abc.abstractmethod async def get_pose( self, - component_name: ResourceName, + component_name: str, destination_frame: str, supplemental_transforms: Optional[Sequence[Transform]] = None, *, @@ -359,7 +357,7 @@ async def get_pose( # (``Arm``, ``Base``, etc). # Create a `component_name`: - component_name = Gripper.get_resource_name("my_gripper") + component_name = "my_gripper" from viam.components.gripper import Gripper from viam.services.motion import MotionClient @@ -368,12 +366,12 @@ async def get_pose( robot = await connect() motion = MotionClient.from_robot(robot=machine, name="builtin") - gripperName = Gripper.get_resource_name("my_gripper") + gripperName = "my_gripper" gripperPoseInWorld = await motion.get_pose(component_name=gripperName, destination_frame="world") Args: - component_name (viam.proto.common.ResourceName): Name of a component on a robot. + component_name (str): Name of a component on a robot. destination_frame (str): Name of the desired reference frame. supplemental_transforms (Optional[List[viam.proto.common.Transform]]): Transforms used to augment the robot's frame while calculating pose. diff --git a/tests/test_motion_service.py b/tests/test_motion_service.py index 219474e04..f7ea2f11f 100644 --- a/tests/test_motion_service.py +++ b/tests/test_motion_service.py @@ -5,8 +5,6 @@ from google.protobuf.timestamp_pb2 import Timestamp from grpclib.testing import ChannelFor -from viam.components.arm import Arm -from viam.components.base import Base from viam.gen.service.motion.v1.motion_pb2 import ( PLAN_STATE_FAILED, PLAN_STATE_IN_PROGRESS, @@ -19,7 +17,7 @@ PlanStep, PlanWithStatus, ) -from viam.proto.common import GeoGeometry, Geometry, GeoPoint, Pose, PoseInFrame, ResourceName, Transform, WorldState +from viam.proto.common import GeoGeometry, Geometry, GeoPoint, Pose, PoseInFrame, Transform, WorldState from viam.proto.service.motion import Constraints, LinearConstraint, MotionConfiguration from viam.resource.manager import ResourceManager from viam.services.motion import MotionClient @@ -37,7 +35,7 @@ def motion(): class MockMotion(Motion): async def move( self, - component_name: ResourceName, + component_name: str, destination: PoseInFrame, world_state: Optional[WorldState] = None, constraints: Optional[Constraints] = None, @@ -49,9 +47,9 @@ async def move( async def move_on_globe( self, - component_name: ResourceName, + component_name: str, destination: GeoPoint, - movement_sensor_name: ResourceName, + movement_sensor_name: str, obstacles: Optional[Sequence[GeoGeometry]] = None, heading: Optional[float] = None, configuration: Optional[MotionConfiguration] = None, @@ -64,9 +62,9 @@ async def move_on_globe( async def move_on_map( self, - component_name: ResourceName, + component_name: str, destination: Pose, - slam_service_name: ResourceName, + slam_service_name: str, configuration: Optional[MotionConfiguration] = None, obstacles: Optional[Sequence[Geometry]] = None, *, @@ -76,13 +74,13 @@ async def move_on_map( raise NotImplementedError async def stop_plan( - self, component_name: ResourceName, *, extra: Optional[Mapping[str, ValueTypes]] = None, timeout: Optional[float] = None + self, component_name: str, *, extra: Optional[Mapping[str, ValueTypes]] = None, timeout: Optional[float] = None ): raise NotImplementedError async def get_plan( self, - component_name: ResourceName, + component_name: str, last_plan_only: bool = False, execution_id: Optional[str] = None, *, @@ -98,7 +96,7 @@ async def list_plan_statuses( async def get_pose( self, - component_name: ResourceName, + component_name: str, destination_frame: str, supplemental_transforms: Optional[Sequence[Transform]] = None, *, @@ -121,7 +119,7 @@ async def test_move(self, motion: Motion, service: MotionRPCService): patched_method.return_value = True async with ChannelFor([service]) as channel: client = MotionClient(MOTION_SERVICE_NAME, channel) - resource_name = Arm.get_resource_name("arm") + resource_name = "arm" destination = PoseInFrame(reference_frame="refframe") world_state = WorldState(transforms=[Transform(reference_frame="ws_tfrm_rf")]) constraints = Constraints(linear_constraint=[LinearConstraint(), LinearConstraint(line_tolerance_mm=2)]) @@ -150,7 +148,7 @@ async def test_get_pose(self, motion: Motion, service: MotionRPCService): patched_method.return_value = response async with ChannelFor([service]) as channel: client = MotionClient(MOTION_SERVICE_NAME, channel) - rn = Arm.get_resource_name("arm") + rn = "arm" destination_frame = "x" transforms = [Transform(reference_frame="y")] extra = {"foo": "bar"} @@ -170,9 +168,9 @@ async def test_move_on_map(self, motion: Motion, service: MotionRPCService): patched_method.return_value = resopnse async with ChannelFor([service]) as channel: client = MotionClient(MOTION_SERVICE_NAME, channel) - component_rn = Base.get_resource_name("move_on_globe_base") + component_rn = "move_on_globe_base" destination = Pose(x=1, y=2, z=3, theta=4) - slam_rn = ResourceName(namespace="rdk", type="service", subtype="slam", name="move_on_map_slam") + slam_rn = "move_on_map_slam" configuration = MotionConfiguration(position_polling_frequency_hz=4.44) obstacles = [Geometry(center=Pose(x=9, y=8, z=7, theta=6))] extra = {"foo": "bar"} @@ -202,9 +200,9 @@ async def test_move_on_globe(self, motion: Motion, service: MotionRPCService): patched_method.return_value = response async with ChannelFor([service]) as channel: client = MotionClient(MOTION_SERVICE_NAME, channel) - component_rn = Base.get_resource_name("move_on_globe_base") + component_rn = "move_on_globe_base" destination = GeoPoint(latitude=123, longitude=456) - movement_rn = ResourceName(namespace="rdk", type="component", subtype="movement_sensor", name="move_on_globe_ms") + movement_rn = "move_on_globe_ms" obstacles = [GeoGeometry(location=GeoPoint(latitude=44, longitude=786))] heading = 5.55 configuration = MotionConfiguration(position_polling_frequency_hz=4.44) @@ -238,7 +236,7 @@ async def test_stop_plan(self, motion: Motion, service: MotionRPCService): with patch.object(motion, "stop_plan") as patched_method: async with ChannelFor([service]) as channel: client = MotionClient(MOTION_SERVICE_NAME, channel) - component_rn = Base.get_resource_name("stop_plan_base") + component_rn = "stop_plan_base" extra = {"foo": "bar"} timeout = 4 await client.stop_plan(component_rn, extra=extra, timeout=timeout) @@ -253,7 +251,7 @@ async def test_get_plan(self, motion: Motion, service: MotionRPCService): current_plan_with_status=PlanWithStatus( plan=Plan( id="plan_id2", - component_name=Base.get_resource_name("get_plan_base"), + component_name="get_plan_base", execution_id="execution_id", steps=[ PlanStep(step={"get_plan_base": ComponentState(pose=Pose(x=2, y=3, z=4, o_x=3, o_y=4, o_z=5, theta=21))}), @@ -267,7 +265,7 @@ async def test_get_plan(self, motion: Motion, service: MotionRPCService): PlanWithStatus( plan=Plan( id="plan_id1", - component_name=Base.get_resource_name("get_plan_base"), + component_name="get_plan_base", execution_id="execution_id", steps=[ PlanStep(step={"get_plan_base": ComponentState(pose=Pose(x=1, y=1, z=1, o_x=1, o_y=1, o_z=1, theta=20))}), @@ -283,7 +281,7 @@ async def test_get_plan(self, motion: Motion, service: MotionRPCService): patched_method.return_value = response async with ChannelFor([service]) as channel: client = MotionClient(MOTION_SERVICE_NAME, channel) - component_rn = Base.get_resource_name("get_plan_base") + component_rn = "get_plan_base" last_plan_only = True execution_id = "ex_id" extra = {"foo": "bar"}