Skip to content

Commit

Permalink
Continue on a row (#221)
Browse files Browse the repository at this point in the history
Currently, the robot wants to start at the nearest end of a row, but
that leads to him turning on the row, if the start point is behind him.
This draft switches start and end point if the robot is between them.

I think this should do more than that.
If the robot is directly on a row, it should skip the approach state and
just continue weeding
If it's somewhere between two rows, it should not start and display a
message for the user to return to a suitable starting position

---------

Co-authored-by: Lukas Baecker <[email protected]>
Co-authored-by: LukasBaecker <[email protected]>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent 1651b81 commit 3409ff2
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 61 deletions.
35 changes: 19 additions & 16 deletions field_friend/automations/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,22 +140,7 @@ def _generate_rows(self) -> list[Row]:

def _generate_outline(self) -> list[GeoPoint]:
assert len(self.rows) > 0
outline_unbuffered: list[Point] = [
self.first_row_end.cartesian(),
self.first_row_start.cartesian()
]
if len(self.rows) > 1:
for p in self.rows[-1].points:
outline_unbuffered.append(p.cartesian())
outline_shape = Polygon([p.tuple for p in outline_unbuffered])
else:
outline_shape = LineString([p.tuple for p in outline_unbuffered])
bufferd_polygon = outline_shape.buffer(
self.outline_buffer_width, cap_style='square', join_style='mitre', mitre_limit=math.inf)
bufferd_polygon_coords = bufferd_polygon.exterior.coords
outline: list[GeoPoint] = [localization.reference.shifted(
Point(x=p[0], y=p[1])) for p in bufferd_polygon_coords]
return outline
return self.get_buffered_area(self.rows, self.outline_buffer_width)

def to_dict(self) -> dict:
return {
Expand Down Expand Up @@ -186,3 +171,21 @@ def from_dict(cls, data: dict[str, Any]) -> Self:
RowSupportPoint, sp) for sp in data['row_support_points']] if 'row_support_points' in data else []
field_data = cls(**cls.args_from_dict(data))
return field_data

def get_buffered_area(self, rows: list[Row], buffer_width: float) -> list[GeoPoint]:
outline_unbuffered: list[Point] = [
self.first_row_end.cartesian(),
self.first_row_start.cartesian()
]
if len(self.rows) > 1:
for p in self.rows[-1].points:
outline_unbuffered.append(p.cartesian())
outline_shape = Polygon([p.tuple for p in outline_unbuffered])
else:
outline_shape = LineString([p.tuple for p in outline_unbuffered])
bufferd_polygon = outline_shape.buffer(
self.outline_buffer_width, cap_style='square', join_style='mitre', mitre_limit=math.inf)
bufferd_polygon_coords = bufferd_polygon.exterior.coords
outline: list[GeoPoint] = [localization.reference.shifted(
Point(x=p[0], y=p[1])) for p in bufferd_polygon_coords]
return outline
136 changes: 104 additions & 32 deletions field_friend/automations/navigation/field_navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@


class State(Enum):
APPROACHING_ROW_START = auto()
FOLLOWING_ROW = auto()
APPROACH_START_ROW = auto()
CHANGE_ROW = auto()
FOLLOW_ROW = auto()
ROW_COMPLETED = auto()
FIELD_COMPLETED = auto()
ERROR = auto()


class FieldNavigation(StraightLineNavigation):
DRIVE_STEP = 0.2
TURN_STEP = np.deg2rad(25.0)
MAX_GNSS_WAITING_TIME = 15.0
MAX_DISTANCE_DEVIATION = 0.05
MAX_ANGLE_DEVIATION = np.deg2rad(10.0)

def __init__(self, system: 'System', implement: Implement) -> None:
super().__init__(system, implement)
Expand All @@ -36,7 +40,7 @@ def __init__(self, system: 'System', implement: Implement) -> None:
self.automation_watcher = system.automation_watcher
self.field_provider = system.field_provider

self._state = State.APPROACHING_ROW_START
self._state = State.APPROACH_START_ROW
self.row_index = 0
self.start_point: Point | None = None
self.end_point: Point | None = None
Expand All @@ -49,6 +53,7 @@ def __init__(self, system: 'System', implement: Implement) -> None:
self._turn_step = self.TURN_STEP
self._max_gnss_waiting_time = self.MAX_GNSS_WAITING_TIME
self.rows_to_work_on: list[Row] = []
self.robot_in_working_area = False

@property
def current_row(self) -> Row:
Expand All @@ -73,7 +78,7 @@ async def prepare(self) -> bool:
rosys.notify(f'Row {idx} on field {self.field.name} has not enough points', 'negative')
return False
self.row_index = self.field.rows.index(self.get_nearest_row())
self._state = State.APPROACHING_ROW_START
self._state = State.APPROACH_START_ROW
self.plant_provider.clear()

self.automation_watcher.start_field_watch(self.field.outline)
Expand All @@ -83,7 +88,7 @@ async def prepare(self) -> bool:
return True

def _should_finish(self) -> bool:
return self._state == State.FIELD_COMPLETED
return self._state in (State.FIELD_COMPLETED, State.ERROR)

async def finish(self) -> None:
await super().finish()
Expand All @@ -103,17 +108,27 @@ def set_start_and_end_points(self):
assert self.field is not None
self.start_point = None
self.end_point = None
relative_point_0 = self.odometer.prediction.distance(self.current_row.points[0].cartesian())
relative_point_1 = self.odometer.prediction.distance(self.current_row.points[-1].cartesian())
# self.log.info(f'Relative point 0: {relative_point_0} Relative point 1: {relative_point_1}')
if relative_point_0 < relative_point_1:
self.start_point = self.current_row.points[0].cartesian()
self.end_point = self.current_row.points[-1].cartesian()
elif relative_point_1 < relative_point_0:
self.start_point = self.current_row.points[-1].cartesian()
self.end_point = self.current_row.points[0].cartesian()
start_point = self.current_row.points[0].cartesian()
end_point = self.current_row.points[-1].cartesian()

swap_points: bool
self.robot_in_working_area = self._is_in_working_area(start_point, end_point)
if self.robot_in_working_area:
abs_angle_to_start = abs(self.odometer.prediction.relative_direction(start_point))
abs_angle_to_end = abs(self.odometer.prediction.relative_direction(end_point))
swap_points = abs_angle_to_start < abs_angle_to_end
else:
distance_to_start = self.odometer.prediction.distance(start_point)
distance_to_end = self.odometer.prediction.distance(end_point)
swap_points = distance_to_start > distance_to_end

if swap_points:
self.log.debug('Swapping start and end points')
start_point, end_point = end_point, start_point
self.start_point = start_point
self.end_point = end_point
self.log.debug('Start point: %s End point: %s', self.start_point, self.end_point)
self.update_target()
# self.log.info(f'Start point: {self.start_point} End point: {self.end_point}')

def update_target(self) -> None:
self.origin = self.odometer.prediction.point
Expand All @@ -127,15 +142,54 @@ async def _drive(self, distance: float) -> None:
if self.odometer.prediction.distance(self.gnss._last_gnss_pose) > 1.0: # pylint: disable=protected-access
await self.gnss.ROBOT_POSE_LOCATED.emitted(self._max_gnss_waiting_time)

if self._state == State.APPROACHING_ROW_START:
self._state = await self._run_approaching_row_start()
elif self._state == State.FOLLOWING_ROW:
self._state = await self._run_following_row(distance)
if self._state == State.APPROACH_START_ROW:
self._state = await self._run_approach_start_row()
elif self._state == State.CHANGE_ROW:
self._state = await self._run_change_row()
elif self._state == State.FOLLOW_ROW:
self._state = await self._run_follow_row(distance)
elif self._state == State.ROW_COMPLETED:
self._state = await self._run_row_completed()

async def _run_approaching_row_start(self) -> State:
async def _run_approach_start_row(self) -> State:
self.robot_in_working_area = False
self.set_start_and_end_points()
if self.start_point is None or self.end_point is None:
return State.ERROR
if not self._is_start_allowed(self.start_point, self.end_point, self.robot_in_working_area):
return State.ERROR

if isinstance(self.detector, rosys.vision.DetectorSimulation) and not rosys.is_test:
self.create_simulation()
else:
self.plant_provider.clear()

await self.gnss.ROBOT_POSE_LOCATED.emitted(self._max_gnss_waiting_time)
if not self.robot_in_working_area:
# turn towards row start
assert self.start_point is not None
target_yaw = self.odometer.prediction.direction(self.start_point)
await self.turn_in_steps(target_yaw)
# drive to row start
await self.drive_in_steps(Pose(x=self.start_point.x, y=self.start_point.y, yaw=target_yaw))
await self.gnss.ROBOT_POSE_LOCATED.emitted(self._max_gnss_waiting_time)
# turn to row
assert self.end_point is not None
driving_yaw = self.odometer.prediction.direction(self.end_point)
await self.turn_in_steps(driving_yaw)
return State.FOLLOW_ROW

async def _run_change_row(self) -> State:
self.robot_in_working_area = False
self.set_start_and_end_points()
if self.start_point is None or self.end_point is None:
return State.ERROR

if isinstance(self.detector, rosys.vision.DetectorSimulation) and not rosys.is_test:
self.create_simulation()
else:
self.plant_provider.clear()

await self.gnss.ROBOT_POSE_LOCATED.emitted(self._max_gnss_waiting_time)
# turn towards row start
assert self.start_point is not None
Expand All @@ -146,14 +200,9 @@ async def _run_approaching_row_start(self) -> State:
await self.gnss.ROBOT_POSE_LOCATED.emitted(self._max_gnss_waiting_time)
# turn to row
assert self.end_point is not None
row_yaw = self.start_point.direction(self.end_point)
await self.turn_in_steps(row_yaw)

if isinstance(self.detector, rosys.vision.DetectorSimulation) and not rosys.is_test:
self.create_simulation()
else:
self.plant_provider.clear()
return State.FOLLOWING_ROW
driving_yaw = self.odometer.prediction.direction(self.end_point)
await self.turn_in_steps(driving_yaw)
return State.FOLLOW_ROW

async def drive_in_steps(self, target: Pose) -> None:
while True:
Expand Down Expand Up @@ -191,7 +240,7 @@ async def turn_in_steps(self, target_yaw: float) -> None:
await self.gnss.ROBOT_POSE_LOCATED.emitted(self._max_gnss_waiting_time)
angle_difference = rosys.helpers.angle(self.odometer.prediction.yaw, target_yaw)

async def _run_following_row(self, distance: float) -> State:
async def _run_follow_row(self, distance: float) -> State:
assert self.end_point is not None
assert self.start_point is not None
end_pose = rosys.geometry.Pose(x=self.end_point.x, y=self.end_point.y,
Expand All @@ -204,7 +253,7 @@ async def _run_following_row(self, distance: float) -> State:
await self.implement.activate()
self.update_target()
await super()._drive(distance)
return State.FOLLOWING_ROW
return State.FOLLOW_ROW

async def _run_row_completed(self) -> State:
await self.driver.wheels.stop()
Expand All @@ -213,7 +262,7 @@ async def _run_row_completed(self) -> State:
if not self._loop and self.current_row == self.rows_to_work_on[-1]:
return State.FIELD_COMPLETED
self.row_index += 1
next_state = State.APPROACHING_ROW_START
next_state = State.CHANGE_ROW

# TODO: rework later, when starting at any row is possible
if self.row_index >= len(self.rows_to_work_on):
Expand All @@ -223,6 +272,29 @@ async def _run_row_completed(self) -> State:
next_state = State.FIELD_COMPLETED
return next_state

def _is_in_working_area(self, start_point: Point, end_point: Point) -> bool:
# TODO: check if in working rectangle, current just checks if between start and stop
relative_start = self.odometer.prediction.relative_point(start_point)
relative_end = self.odometer.prediction.relative_point(end_point)
robot_in_working_area = relative_start.x * relative_end.x <= 0
self.log.debug('Robot in working area: %s', robot_in_working_area)
return robot_in_working_area

def _is_start_allowed(self, start_point: Point, end_point: Point, robot_in_working_area: bool) -> bool:
if not robot_in_working_area:
return True
foot_point = self.current_row.line_segment().line.foot_point(self.odometer.prediction.point)
distance_to_row = foot_point.distance(self.odometer.prediction.point)
if distance_to_row > self.MAX_DISTANCE_DEVIATION:
rosys.notify('Between two rows', 'negative')
return False
abs_angle_to_start = abs(self.odometer.prediction.relative_direction(start_point))
abs_angle_to_end = abs(self.odometer.prediction.relative_direction(end_point))
if abs_angle_to_start > self.MAX_ANGLE_DEVIATION and abs_angle_to_end > self.MAX_ANGLE_DEVIATION:
rosys.notify('Robot heading deviates too much from row direction', 'negative')
return False
return True

def backup(self) -> dict:
return super().backup() | {
'field_id': self.field.id if self.field else None,
Expand All @@ -239,7 +311,7 @@ def restore(self, data: dict[str, Any]) -> None:
field_id = data.get('field_id', self.field_provider.fields[0].id if self.field_provider.fields else None)
self.field = self.field_provider.get_field(field_id)
self.row_index = data.get('row_index', 0)
self._state = State[data.get('state', State.APPROACHING_ROW_START.name)]
self._state = State[data.get('state', State.APPROACH_START_ROW.name)]
self._loop = data.get('loop', False)
self._drive_step = data.get('drive_step', self.DRIVE_STEP)
self._turn_step = data.get('turn_step', self.TURN_STEP)
Expand Down
Loading

0 comments on commit 3409ff2

Please sign in to comment.