-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
capable of driving routes consisting of a few locations
- Loading branch information
Showing
2 changed files
with
185 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
#!/usr/bin/env python | ||
|
||
# Modified to produce routes with multiple high level waypoints | ||
|
||
# Copyright (c) 2018 Intel Labs. | ||
# authors: German Ros ([email protected]) | ||
# | ||
# This work is licensed under the terms of the MIT license. | ||
# For a copy, see <https://opensource.org/licenses/MIT>. | ||
|
||
""" This module implements an agent that roams around a track following random | ||
waypoints and avoiding other vehicles. | ||
The agent also responds to traffic lights. """ | ||
|
||
import sys | ||
|
||
sys.path.append('../carla') | ||
import carla | ||
from agents.navigation.agent import Agent, AgentState | ||
from agents.navigation.local_planner import LocalPlanner | ||
from agents.navigation.global_route_planner import GlobalRoutePlanner | ||
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO | ||
|
||
class BasicAgent(Agent): | ||
""" | ||
BasicAgent implements a basic agent that navigates scenes to reach a given | ||
target destination. This agent respects traffic lights and other vehicles. | ||
""" | ||
|
||
def __init__(self, vehicle, target_speed=20): | ||
""" | ||
:param vehicle: actor to apply to local planner logic onto | ||
""" | ||
super(BasicAgent, self).__init__(vehicle) | ||
|
||
self._proximity_threshold = 10.0 # meters | ||
self._state = AgentState.NAVIGATING | ||
args_lateral_dict = { | ||
'K_P': 1, | ||
'K_D': 0.02, | ||
'K_I': 0, | ||
'dt': 1.0/20.0} | ||
self._local_planner = LocalPlanner( | ||
self._vehicle, opt_dict={'target_speed' : target_speed, | ||
'lateral_control_dict':args_lateral_dict}) | ||
self._hop_resolution = 2.0 | ||
self._path_seperation_hop = 2 | ||
self._path_seperation_threshold = 0.5 | ||
self._target_speed = target_speed | ||
self._grp = None | ||
|
||
def set_destination_list(self, locations): | ||
""" | ||
*** Modified *** | ||
This method creates a list of waypoints from agent's position through multiple | ||
destination locations based on the route returned by the global router | ||
""" | ||
|
||
curr_waypoint = self._map.get_waypoint(self._vehicle.get_location()) | ||
|
||
# cycle through desired locations and append waypoints for each | ||
full_route_trace = [] | ||
for location in locations: | ||
next_waypoint = self._map.get_waypoint( | ||
carla.Location(location[0], location[1], location[2])) | ||
seg_trace = self._trace_route(curr_waypoint, next_waypoint) | ||
assert seg_trace | ||
full_route_trace += seg_trace | ||
curr_waypoint = next_waypoint | ||
|
||
self._local_planner.set_global_plan(full_route_trace) | ||
|
||
def set_destination(self, location): | ||
""" | ||
This method creates a list of waypoints from agent's position to destination location | ||
based on the route returned by the global router | ||
""" | ||
|
||
start_waypoint = self._map.get_waypoint(self._vehicle.get_location()) | ||
end_waypoint = self._map.get_waypoint( | ||
carla.Location(location[0], location[1], location[2])) | ||
|
||
route_trace = self._trace_route(start_waypoint, end_waypoint) | ||
assert route_trace | ||
|
||
self._local_planner.set_global_plan(route_trace) | ||
|
||
def _trace_route(self, start_waypoint, end_waypoint): | ||
""" | ||
This method sets up a global router and returns the optimal route | ||
from start_waypoint to end_waypoint | ||
""" | ||
|
||
# Setting up global router | ||
if self._grp is None: | ||
dao = GlobalRoutePlannerDAO(self._vehicle.get_world().get_map(), self._hop_resolution) | ||
grp = GlobalRoutePlanner(dao) | ||
grp.setup() | ||
self._grp = grp | ||
|
||
# Obtain route plan | ||
route = self._grp.trace_route( | ||
start_waypoint.transform.location, | ||
end_waypoint.transform.location) | ||
|
||
return route | ||
|
||
def run_step(self, debug=False): | ||
""" | ||
Execute one step of navigation. | ||
:return: carla.VehicleControl | ||
""" | ||
|
||
# is there an obstacle in front of us? | ||
hazard_detected = False | ||
|
||
# retrieve relevant elements for safe navigation, i.e.: traffic lights | ||
# and other vehicles | ||
actor_list = self._world.get_actors() | ||
vehicle_list = actor_list.filter("*vehicle*") | ||
lights_list = actor_list.filter("*traffic_light*") | ||
|
||
# check possible obstacles | ||
vehicle_state, vehicle = self._is_vehicle_hazard(vehicle_list) | ||
if vehicle_state: | ||
if debug: | ||
print('!!! VEHICLE BLOCKING AHEAD [{}])'.format(vehicle.id)) | ||
|
||
self._state = AgentState.BLOCKED_BY_VEHICLE | ||
hazard_detected = True | ||
|
||
# check for the state of the traffic lights | ||
light_state, traffic_light = self._is_light_red(lights_list) | ||
if light_state: | ||
if debug: | ||
print('=== RED LIGHT AHEAD [{}])'.format(traffic_light.id)) | ||
|
||
self._state = AgentState.BLOCKED_RED_LIGHT | ||
hazard_detected = True | ||
|
||
if hazard_detected: | ||
control = self.emergency_stop() | ||
else: | ||
self._state = AgentState.NAVIGATING | ||
# standard local planner behavior | ||
control = self._local_planner.run_step(debug=debug) | ||
|
||
return control |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters