Skip to content

Commit

Permalink
Update tipsy to use sensors service (#2125)
Browse files Browse the repository at this point in the history
Co-authored-by: Hazal Mestci <[email protected]>
  • Loading branch information
npentrel and mestcihazal authored Oct 31, 2023
1 parent 19c9b31 commit 1570a42
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions docs/tutorials/projects/tipsy.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mp4Src: "/tutorials/tipsy/tipsy-preview.mp4"
authors: ["Hazal Mestci"]
languages: ["python"]
viamresources:
["board", "motor", "base", "camera", "sensor", "mlmodel", "vision"]
["board", "motor", "base", "camera", "sensor", "mlmodel", "vision", "sensors"]
level: "Intermediate"
date: "2023-05-29"
# updated: ""
Expand Down Expand Up @@ -622,6 +622,7 @@ from viam.rpc.dial import Credentials, DialOptions
from viam.components.sensor import Sensor
from viam.components.base import Base
from viam.services.vision import VisionClient
from viam.services.sensors import SensorsClient
```

Then it connects to our robot using a robot location secret and address.
Expand All @@ -645,29 +646,26 @@ pause_interval = os.getenv('PAUSE_INTERVAL') or 3
{{% snippet "show-secret.md" %}}

Next, the code defines functions for obstacle detection.
The first method, `obstacle_detect()`, gets readings from a sensor, which is used by the second method, `gather_obstacle_readings()`, to gather all the distance readings from a list of sensors. Lastly, the third method, `obstacle_detect_loop()`, uses an infinite loop to periodically check the readings to stop the base if it’s closer than a certain distance from an obstacle:
The first method, `get_obstacle_readings()`, gets all the distance readings from a list of sensors. The second method, `obstacle_detect_loop()`, uses an infinite loop to periodically check the readings to stop the base if it’s closer than a certain distance from an obstacle:

```python {class="line-numbers linkable-line-numbers"}
async def obstacle_detect(sensor):
reading = (await sensor.get_readings())["distance"]
return reading
async def get_obstacle_readings(sensors: list[Sensor],
sensors_svc: SensorsClient):
sensor_values = (await sensors_svc.get_readings(sensors)).values()
return [r["distance"] for r in sensor_values]


async def gather_obstacle_readings(sensors):
return await asyncio.gather(
*[obstacle_detect(sensor) for sensor in sensors]
)


async def obstacle_detect_loop(sensors, base):
while (True):
distances = await gather_obstacle_readings(sensors)
async def obstacle_detect_loop(sensors: list[Sensor],
sensors_svc: SensorsClient,
base: Base):
while True:
distances = await get_obstacle_readings(sensors, sensors_svc)
if any(distance < 0.4 for distance in distances):
# stop the base if moving straight
if base_state == "straight":
await base.stop()
print("obstacle in front")
await asyncio.sleep(.01)
await asyncio.sleep(0.01)
```

Then, we define a person detection loop, where the robot is constantly looking for a person, and if it finds the person, it goes toward them as long as there are no obstacles in front.
Expand All @@ -676,24 +674,26 @@ If it doesn’t find a person, it will continue looking by rotating the robot ba
Lines 12 and 13 are where it checks specifically for detections with the label `Person` and not every object in the `labels.txt` file:

```python {class="line-numbers linkable-line-numbers" data-line="12-13"}
async def person_detect(detector, sensors, base):
while (True):
# look for a person
async def person_detect(detector: VisionClient,
sensors: list[Sensor],
sensors_svc: SensorsClient,
base: Base):
while True:
# look for person
found = False
global base_state
print("will detect")
detections = await detector.get_detections_from_camera(camera_name)
for d in detections:
if d.confidence > .7:
if d.confidence > 0.7:
print(d.class_name)
# specify it is just the person we want to detect
if (d.class_name == "Person"):
if d.class_name == "Person":
found = True
if (found):
if found:
print("I see a person")
# first manually call gather_obstacle_readings -
# don't even start moving if someone in the way
distances = await gather_obstacle_readings(sensors)
# first manually call get_obstacle_readings - don't even start
# moving if someone is in the way
distances = await get_obstacle_readings(sensors, sensors_svc)
if all(distance > 0.4 for distance in distances):
print("will move straight")
base_state = "straight"
Expand All @@ -715,22 +715,24 @@ It also creates two background tasks running asynchronously, one looking for obs
async def main():
robot = await connect()
base = Base.from_robot(robot, base_name)
sensors = [Sensor.from_robot(robot, sensor_name)
for sensor_name in sensor_names]
detector = VisionServiceClient.from_robot(robot, "myPeopleDetector")
sensors_svc = SensorsClient.from_robot(robot, name=sensor_service_name)
sensors = await sensors_svc.get_sensors()
detector = VisionClient.from_robot(robot, name=detector_name)

# create a background task that looks for obstacles and stops the base if
# it's moving
# it is moving
obstacle_task = asyncio.create_task(
obstacle_detect_loop(sensors, base))
obstacle_detect_loop(sensors, sensors_svc, base))
# create a background task that looks for a person and moves towards them,
# or turns and keeps looking
person_task = asyncio.create_task(
person_detect(detector, sensors, base))
person_detect(detector, sensors, sensors_svc, base))
results = await asyncio.gather(obstacle_task,
person_task,
return_exceptions=True)
print(results)

await robot.close()
```

When you run the code, you should see results like this:
Expand Down

0 comments on commit 1570a42

Please sign in to comment.