diff --git a/PythonAPI/examples/manual_control_with_traffic.py b/PythonAPI/examples/manual_control_with_traffic.py deleted file mode 100644 index 3421401d07..0000000000 --- a/PythonAPI/examples/manual_control_with_traffic.py +++ /dev/null @@ -1,1628 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2019 Computer Vision Center (CVC) at the Universitat Autonoma de -# Barcelona (UAB). -# -# This work is licensed under the terms of the MIT license. -# For a copy, see . - -"""Allows controlling a vehicle with a keyboard, generating traffic in the simulation""" - -""" -Welcome to CARLA manual control. - -Use ARROWS or WASD keys for control. - - W : throttle - S : brake - A/D : steer left/right - Q : toggle reverse - Space : hand-brake - P : toggle autopilot - M : toggle manual transmission - ,/. : gear up/down - CTRL + W : toggle constant velocity mode at 60 km/h - - L : toggle next light type - SHIFT + L : toggle high beam - Z/X : toggle right/left blinker - I : toggle interior light - - TAB : change sensor position - ` or N : next sensor - [1-9] : change to sensor [1-9] - G : toggle radar visualization - C : change weather (Shift+C reverse) - Backspace : change vehicle - - O : open/close all doors of vehicle - T : toggle vehicle's telemetry - - V : Select next map layer (Shift+V reverse) - B : Load current selected map layer (Shift+B to unload) - - R : toggle recording images to disk - - CTRL + R : toggle recording of simulation (replacing any previous) - CTRL + P : start replaying last recorded simulation - CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds) - CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds) - - F1 : toggle HUD - H/? : toggle help - ESC : quit -""" - -# ============================================================================== -# -- find carla module --------------------------------------------------------- -# ============================================================================== - -# ============================================================================== -# -- imports ------------------------------------------------------------------- -# ============================================================================== - -import carla - -from carla import ColorConverter as cc - -import argparse -import collections -import datetime -import logging -import math -import random -import re -import os -import weakref - -try: - import pygame - from pygame.locals import KMOD_CTRL - from pygame.locals import KMOD_SHIFT - from pygame.locals import K_0 - from pygame.locals import K_9 - from pygame.locals import K_BACKQUOTE - from pygame.locals import K_BACKSPACE - from pygame.locals import K_COMMA - from pygame.locals import K_DOWN - from pygame.locals import K_ESCAPE - from pygame.locals import K_F1 - from pygame.locals import K_LEFT - from pygame.locals import K_PERIOD - from pygame.locals import K_RIGHT - from pygame.locals import K_SLASH - from pygame.locals import K_SPACE - from pygame.locals import K_TAB - from pygame.locals import K_UP - from pygame.locals import K_a - from pygame.locals import K_b - from pygame.locals import K_c - from pygame.locals import K_d - from pygame.locals import K_f - from pygame.locals import K_g - from pygame.locals import K_h - from pygame.locals import K_i - from pygame.locals import K_l - from pygame.locals import K_m - from pygame.locals import K_n - from pygame.locals import K_o - from pygame.locals import K_p - from pygame.locals import K_q - from pygame.locals import K_r - from pygame.locals import K_s - from pygame.locals import K_t - from pygame.locals import K_v - from pygame.locals import K_w - from pygame.locals import K_x - from pygame.locals import K_z - from pygame.locals import K_MINUS - from pygame.locals import K_EQUALS -except ImportError: - raise RuntimeError('cannot import pygame, make sure pygame package is installed') - -try: - import numpy as np -except ImportError: - raise RuntimeError('cannot import numpy, make sure numpy package is installed') - -OBJECT_TO_COLOR = [ - (255, 255, 255), - (128, 64, 128), - (244, 35, 232), - (70, 70, 70), - (102, 102, 156), - (190, 153, 153), - (153, 153, 153), - (250, 170, 30), - (220, 220, 0), - (107, 142, 35), - (152, 251, 152), - (70, 130, 180), - (220, 20, 60), - (255, 0, 0), - (0, 0, 142), - (0, 0, 70), - (0, 60, 100), - (0, 80, 100), - (0, 0, 230), - (119, 11, 32), - (110, 190, 160), - (170, 120, 50), - (55, 90, 80), - (45, 60, 150), - (157, 234, 50), - (81, 0, 81), - (150, 100, 100), - (230, 150, 140), - (180, 165, 180), -] - -# ============================================================================== -# -- Global functions ---------------------------------------------------------- -# ============================================================================== - - -def find_weather_presets(): - rgx = re.compile('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)') - name = lambda x: ' '.join(m.group(0) for m in rgx.finditer(x)) - presets = [x for x in dir(carla.WeatherParameters) if re.match('[A-Z].+', x)] - return [(getattr(carla.WeatherParameters, x), name(x)) for x in presets] - - -def get_actor_display_name(actor, truncate=250): - name = ' '.join(actor.type_id.replace('_', '.').title().split('.')[1:]) - return (name[:truncate - 1] + u'\u2026') if len(name) > truncate else name - -def get_actor_blueprints(world, filter, generation): - bps = world.get_blueprint_library().filter(filter) - - if generation.lower() == "all": - return bps - - # If the filter returns only one bp, we assume that this one needed - # and therefore, we ignore the generation - if len(bps) == 1: - return bps - - try: - int_generation = int(generation) - # Check if generation is in available generations - if int_generation in [1, 2, 3, 4]: - bps = [x for x in bps if int(x.get_attribute('generation')) == int_generation] - return bps - else: - print(" Warning! Actor Generation is not valid. No actor will be spawned.") - return [] - except: - print(" Warning! Actor Generation is not valid. No actor will be spawned.") - return [] - - -# ============================================================================== -# -- World --------------------------------------------------------------------- -# ============================================================================== - - -class World(object): - def __init__(self, carla_world, hud, traffic_manager, args): - self.world = carla_world - self.args = args - self.traffic_manager = traffic_manager - self.sync = args.sync - self.actor_role_name = args.rolename - try: - self.map = self.world.get_map() - except RuntimeError as error: - print('RuntimeError: {}'.format(error)) - print(' The server could not send the OpenDRIVE (.xodr) file:') - print(' Make sure it exists, has the same name of your town, and is correct.') - sys.exit(1) - self.hud = hud - self.player = None - self.collision_sensor = None - self.lane_invasion_sensor = None - self.gnss_sensor = None - self.imu_sensor = None - self.radar_sensor = None - self.camera_manager = None - self._weather_presets = find_weather_presets() - self._weather_index = 0 - self._actor_filter = args.filter - self._actor_generation = args.generation - self._gamma = args.gamma - self.restart() - self.world.on_tick(hud.on_world_tick) - self.recording_enabled = False - self.recording_start = 0 - self.constant_velocity_enabled = False - self.show_vehicle_telemetry = False - self.doors_are_open = False - self.current_map_layer = 0 - self.map_layer_names = [ - carla.MapLayer.NONE, - carla.MapLayer.Buildings, - carla.MapLayer.Decals, - carla.MapLayer.Foliage, - carla.MapLayer.Ground, - carla.MapLayer.ParkedVehicles, - carla.MapLayer.Particles, - carla.MapLayer.Props, - carla.MapLayer.StreetLights, - carla.MapLayer.Walls, - carla.MapLayer.All - ] - - def restart(self): - self.player_max_speed = 1.589 - self.player_max_speed_fast = 3.713 - # Keep same camera config if the camera manager exists. - cam_index = self.camera_manager.index if self.camera_manager is not None else 0 - cam_pos_index = self.camera_manager.transform_index if self.camera_manager is not None else 0 - # Get a random blueprint. - blueprint_list = get_actor_blueprints(self.world, self._actor_filter, self._actor_generation) - if self.args.safe: - blueprint_list = [x for x in blueprint_list if x.get_attribute('base_type') == 'car'] - if not blueprint_list: - raise ValueError("Couldn't find any blueprints with the specified filters") - blueprint = random.choice(blueprint_list) - blueprint.set_attribute('role_name', self.actor_role_name) - if blueprint.has_attribute('terramechanics'): - blueprint.set_attribute('terramechanics', 'true') - if blueprint.has_attribute('color'): - color = random.choice(blueprint.get_attribute('color').recommended_values) - blueprint.set_attribute('color', color) - if blueprint.has_attribute('driver_id'): - driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values) - blueprint.set_attribute('driver_id', driver_id) - if blueprint.has_attribute('is_invincible'): - blueprint.set_attribute('is_invincible', 'true') - # set the max speed - if blueprint.has_attribute('speed'): - self.player_max_speed = float(blueprint.get_attribute('speed').recommended_values[1]) - self.player_max_speed_fast = float(blueprint.get_attribute('speed').recommended_values[2]) - - # Spawn the player. - if self.player is not None: - spawn_point = self.player.get_transform() - spawn_point.location.z += 2.0 - spawn_point.rotation.roll = 0.0 - spawn_point.rotation.pitch = 0.0 - self.destroy() - self.player = self.world.try_spawn_actor(blueprint, spawn_point) - self.show_vehicle_telemetry = False - self.modify_vehicle_physics(self.player) - while self.player is None: - if not self.map.get_spawn_points(): - print('There are no spawn points available in your map/town.') - print('Please add some Vehicle Spawn Point to your UE5 scene.') - sys.exit(1) - spawn_points = self.map.get_spawn_points() - spawn_point = random.choice(spawn_points) if spawn_points else carla.Transform() - self.player = self.world.try_spawn_actor(blueprint, spawn_point) - self.show_vehicle_telemetry = False - self.modify_vehicle_physics(self.player) - # Set up the sensors. - self.collision_sensor = CollisionSensor(self.player, self.hud) - self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud) - self.gnss_sensor = GnssSensor(self.player) - self.imu_sensor = IMUSensor(self.player) - self.camera_manager = CameraManager(self.player, self.hud, self._gamma) - self.camera_manager.transform_index = cam_pos_index - self.camera_manager.set_sensor(cam_index, notify=False) - actor_type = get_actor_display_name(self.player) - self.hud.notification(actor_type) - self.traffic_manager.update_vehicle_lights(self.player, True) - - if self.sync: - self.world.tick() - else: - self.world.wait_for_tick() - - def next_weather(self, reverse=False): - self._weather_index += -1 if reverse else 1 - self._weather_index %= len(self._weather_presets) - preset = self._weather_presets[self._weather_index] - self.hud.notification('Weather: %s' % preset[1]) - self.player.get_world().set_weather(preset[0]) - - def next_map_layer(self, reverse=False): - self.current_map_layer += -1 if reverse else 1 - self.current_map_layer %= len(self.map_layer_names) - selected = self.map_layer_names[self.current_map_layer] - self.hud.notification('LayerMap selected: %s' % selected) - - def load_map_layer(self, unload=False): - selected = self.map_layer_names[self.current_map_layer] - if unload: - self.hud.notification('Unloading map layer: %s' % selected) - self.world.unload_map_layer(selected) - else: - self.hud.notification('Loading map layer: %s' % selected) - self.world.load_map_layer(selected) - - def toggle_radar(self): - if self.radar_sensor is None: - self.radar_sensor = RadarSensor(self.player) - elif self.radar_sensor.sensor is not None: - self.radar_sensor.sensor.destroy() - self.radar_sensor = None - - def modify_vehicle_physics(self, actor): - #If actor is not a vehicle, we cannot use the physics control - try: - physics_control = actor.get_physics_control() - physics_control.use_sweep_wheel_collision = True - actor.apply_physics_control(physics_control) - except Exception: - pass - - def tick(self, clock): - self.hud.tick(self, clock) - - def render(self, display): - self.camera_manager.render(display) - self.hud.render(display) - - def destroy_sensors(self): - self.camera_manager.sensor.destroy() - self.camera_manager.sensor = None - self.camera_manager.index = None - - def destroy(self): - if self.radar_sensor is not None: - self.toggle_radar() - sensors = [ - self.camera_manager.sensor, - self.collision_sensor.sensor, - self.lane_invasion_sensor.sensor, - self.gnss_sensor.sensor, - self.imu_sensor.sensor] - for sensor in sensors: - if sensor is not None: - sensor.stop() - sensor.destroy() - if self.player is not None: - self.player.destroy() - - -# ============================================================================== -# -- KeyboardControl ----------------------------------------------------------- -# ============================================================================== - - -class KeyboardControl(object): - """Class that handles keyboard input.""" - def __init__(self, world, start_in_autopilot): - self._autopilot_enabled = start_in_autopilot - self._ackermann_enabled = False - self._ackermann_reverse = 1 - if isinstance(world.player, carla.Vehicle): - self._control = carla.VehicleControl() - self._ackermann_control = carla.VehicleAckermannControl() - self._lights = carla.VehicleLightState.NONE - world.player.set_autopilot(self._autopilot_enabled) - world.player.set_light_state(self._lights) - elif isinstance(world.player, carla.Walker): - self._control = carla.WalkerControl() - self._autopilot_enabled = False - self._rotation = world.player.get_transform().rotation - else: - raise NotImplementedError("Actor type not supported") - self._steer_cache = 0.0 - world.hud.notification("Press 'H' or '?' for help.", seconds=4.0) - - def parse_events(self, client, world, clock, sync_mode): - if isinstance(self._control, carla.VehicleControl): - current_lights = self._lights - for event in pygame.event.get(): - if event.type == pygame.QUIT: - return True - elif event.type == pygame.KEYUP: - if self._is_quit_shortcut(event.key): - return True - elif event.key == K_BACKSPACE: - if self._autopilot_enabled: - world.player.set_autopilot(False) - world.restart() - world.player.set_autopilot(True) - else: - world.restart() - elif event.key == K_F1: - world.hud.toggle_info() - elif event.key == K_v and pygame.key.get_mods() & KMOD_SHIFT: - world.next_map_layer(reverse=True) - elif event.key == K_v: - world.next_map_layer() - elif event.key == K_b and pygame.key.get_mods() & KMOD_SHIFT: - world.load_map_layer(unload=True) - elif event.key == K_b: - world.load_map_layer() - elif event.key == K_h or (event.key == K_SLASH and pygame.key.get_mods() & KMOD_SHIFT): - world.hud.help.toggle() - elif event.key == K_TAB: - world.camera_manager.toggle_camera() - elif event.key == K_c and pygame.key.get_mods() & KMOD_SHIFT: - world.next_weather(reverse=True) - elif event.key == K_c: - world.next_weather() - elif event.key == K_g: - world.toggle_radar() - elif event.key == K_BACKQUOTE: - world.camera_manager.next_sensor() - elif event.key == K_n: - world.camera_manager.next_sensor() - elif event.key == K_w and (pygame.key.get_mods() & KMOD_CTRL): - if world.constant_velocity_enabled: - world.player.disable_constant_velocity() - world.constant_velocity_enabled = False - world.hud.notification("Disabled Constant Velocity Mode") - else: - world.player.enable_constant_velocity(carla.Vector3D(17, 0, 0)) - world.constant_velocity_enabled = True - world.hud.notification("Enabled Constant Velocity Mode at 60 km/h") - elif event.key == K_o: - try: - if world.doors_are_open: - world.hud.notification("Closing Doors") - world.doors_are_open = False - world.player.close_door(carla.VehicleDoor.All) - else: - world.hud.notification("Opening doors") - world.doors_are_open = True - world.player.open_door(carla.VehicleDoor.All) - except Exception: - pass - elif event.key == K_t: - if world.show_vehicle_telemetry: - world.player.show_debug_telemetry(False) - world.show_vehicle_telemetry = False - world.hud.notification("Disabled Vehicle Telemetry") - else: - try: - world.player.show_debug_telemetry(True) - world.show_vehicle_telemetry = True - world.hud.notification("Enabled Vehicle Telemetry") - except Exception: - pass - elif event.key > K_0 and event.key <= K_9: - index_ctrl = 0 - if pygame.key.get_mods() & KMOD_CTRL: - index_ctrl = 9 - world.camera_manager.set_sensor(event.key - 1 - K_0 + index_ctrl) - elif event.key == K_r and not (pygame.key.get_mods() & KMOD_CTRL): - world.camera_manager.toggle_recording() - elif event.key == K_r and (pygame.key.get_mods() & KMOD_CTRL): - if (world.recording_enabled): - client.stop_recorder() - world.recording_enabled = False - world.hud.notification("Recorder is OFF") - else: - client.start_recorder("manual_recording.rec") - world.recording_enabled = True - world.hud.notification("Recorder is ON") - elif event.key == K_p and (pygame.key.get_mods() & KMOD_CTRL): - # stop recorder - client.stop_recorder() - world.recording_enabled = False - # work around to fix camera at start of replaying - current_index = world.camera_manager.index - world.destroy_sensors() - # disable autopilot - self._autopilot_enabled = False - world.player.set_autopilot(self._autopilot_enabled) - world.hud.notification("Replaying file 'manual_recording.rec'") - # replayer - client.replay_file("manual_recording.rec", world.recording_start, 0, 0) - world.camera_manager.set_sensor(current_index) - elif event.key == K_MINUS and (pygame.key.get_mods() & KMOD_CTRL): - if pygame.key.get_mods() & KMOD_SHIFT: - world.recording_start -= 10 - else: - world.recording_start -= 1 - world.hud.notification("Recording start time is %d" % (world.recording_start)) - elif event.key == K_EQUALS and (pygame.key.get_mods() & KMOD_CTRL): - if pygame.key.get_mods() & KMOD_SHIFT: - world.recording_start += 10 - else: - world.recording_start += 1 - world.hud.notification("Recording start time is %d" % (world.recording_start)) - if isinstance(self._control, carla.VehicleControl): - if event.key == K_f: - # Toggle ackermann controller - self._ackermann_enabled = not self._ackermann_enabled - world.hud.show_ackermann_info(self._ackermann_enabled) - world.hud.notification("Ackermann Controller %s" % - ("Enabled" if self._ackermann_enabled else "Disabled")) - if event.key == K_q: - if not self._ackermann_enabled: - self._control.gear = 1 if self._control.reverse else -1 - else: - self._ackermann_reverse *= -1 - # Reset ackermann control - self._ackermann_control = carla.VehicleAckermannControl() - elif event.key == K_m: - self._control.manual_gear_shift = not self._control.manual_gear_shift - self._control.gear = world.player.get_control().gear - world.hud.notification('%s Transmission' % - ('Manual' if self._control.manual_gear_shift else 'Automatic')) - elif self._control.manual_gear_shift and event.key == K_COMMA: - self._control.gear = max(-1, self._control.gear - 1) - elif self._control.manual_gear_shift and event.key == K_PERIOD: - self._control.gear = self._control.gear + 1 - elif event.key == K_p and not pygame.key.get_mods() & KMOD_CTRL: - if not self._autopilot_enabled and not sync_mode: - print("WARNING: You are currently in asynchronous mode and could " - "experience some issues with the traffic simulation") - self._autopilot_enabled = not self._autopilot_enabled - world.player.set_autopilot(self._autopilot_enabled) - world.hud.notification( - 'Autopilot %s' % ('On' if self._autopilot_enabled else 'Off')) - elif event.key == K_l and pygame.key.get_mods() & KMOD_CTRL: - current_lights ^= carla.VehicleLightState.Special1 - elif event.key == K_l and pygame.key.get_mods() & KMOD_SHIFT: - current_lights ^= carla.VehicleLightState.HighBeam - elif event.key == K_l: - # Use 'L' key to switch between lights: - # closed -> position -> low beam -> fog - if not self._lights & carla.VehicleLightState.Position: - world.hud.notification("Position lights") - current_lights |= carla.VehicleLightState.Position - else: - world.hud.notification("Low beam lights") - current_lights |= carla.VehicleLightState.LowBeam - if self._lights & carla.VehicleLightState.LowBeam: - world.hud.notification("Fog lights") - current_lights |= carla.VehicleLightState.Fog - if self._lights & carla.VehicleLightState.Fog: - world.hud.notification("Lights off") - current_lights ^= carla.VehicleLightState.Position - current_lights ^= carla.VehicleLightState.LowBeam - current_lights ^= carla.VehicleLightState.Fog - elif event.key == K_i: - current_lights ^= carla.VehicleLightState.Interior - elif event.key == K_z: - current_lights ^= carla.VehicleLightState.LeftBlinker - elif event.key == K_x: - current_lights ^= carla.VehicleLightState.RightBlinker - - if not self._autopilot_enabled: - if isinstance(self._control, carla.VehicleControl): - self._parse_vehicle_keys(pygame.key.get_pressed(), clock.get_time()) - self._control.reverse = self._control.gear < 0 - # Set automatic control-related vehicle lights - if self._control.brake: - current_lights |= carla.VehicleLightState.Brake - else: # Remove the Brake flag - current_lights &= ~carla.VehicleLightState.Brake - if self._control.reverse: - current_lights |= carla.VehicleLightState.Reverse - else: # Remove the Reverse flag - current_lights &= ~carla.VehicleLightState.Reverse - if current_lights != self._lights: # Change the light state only if necessary - world.player.set_light_state(carla.VehicleLightState(current_lights)) - # Apply control - if not self._ackermann_enabled: - world.player.apply_control(self._control) - else: - world.player.apply_ackermann_control(self._ackermann_control) - # Update control to the last one applied by the ackermann controller. - self._control = world.player.get_control() - # Update hud with the newest ackermann control - world.hud.update_ackermann_control(self._ackermann_control) - - elif isinstance(self._control, carla.WalkerControl): - self._parse_walker_keys(pygame.key.get_pressed(), clock.get_time(), world) - world.player.apply_control(self._control) - - self._lights = current_lights - - def _parse_vehicle_keys(self, keys, milliseconds): - if keys[K_UP] or keys[K_w]: - if not self._ackermann_enabled: - self._control.throttle = min(self._control.throttle + 0.1, 1.00) - else: - self._ackermann_control.speed += round(milliseconds * 0.005, 2) * self._ackermann_reverse - else: - if not self._ackermann_enabled: - self._control.throttle = 0.0 - - if keys[K_DOWN] or keys[K_s]: - if not self._ackermann_enabled: - self._control.brake = min(self._control.brake + 0.2, 1) - else: - self._ackermann_control.speed -= min(abs(self._ackermann_control.speed), round(milliseconds * 0.005, 2)) * self._ackermann_reverse - self._ackermann_control.speed = max(0, abs(self._ackermann_control.speed)) * self._ackermann_reverse - else: - if not self._ackermann_enabled: - self._control.brake = 0 - - steer_increment = 5e-4 * milliseconds - if keys[K_LEFT] or keys[K_a]: - if self._steer_cache > 0: - self._steer_cache = 0 - else: - self._steer_cache -= steer_increment - elif keys[K_RIGHT] or keys[K_d]: - if self._steer_cache < 0: - self._steer_cache = 0 - else: - self._steer_cache += steer_increment - else: - self._steer_cache = 0.0 - self._steer_cache = min(0.7, max(-0.7, self._steer_cache)) - if not self._ackermann_enabled: - self._control.steer = round(self._steer_cache, 1) - self._control.hand_brake = keys[K_SPACE] - else: - self._ackermann_control.steer = round(self._steer_cache, 1) - - def _parse_walker_keys(self, keys, milliseconds, world): - self._control.speed = 0.0 - if keys[K_DOWN] or keys[K_s]: - self._control.speed = 0.0 - if keys[K_LEFT] or keys[K_a]: - self._control.speed = .01 - self._rotation.yaw -= 0.08 * milliseconds - if keys[K_RIGHT] or keys[K_d]: - self._control.speed = .01 - self._rotation.yaw += 0.08 * milliseconds - if keys[K_UP] or keys[K_w]: - self._control.speed = world.player_max_speed_fast if pygame.key.get_mods() & KMOD_SHIFT else world.player_max_speed - self._control.jump = keys[K_SPACE] - self._rotation.yaw = round(self._rotation.yaw, 1) - self._control.direction = self._rotation.get_forward_vector() - - @staticmethod - def _is_quit_shortcut(key): - return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL) - - -# ============================================================================== -# -- HUD ----------------------------------------------------------------------- -# ============================================================================== - - -class HUD(object): - def __init__(self, width, height): - self.dim = (width, height) - font = pygame.font.Font(pygame.font.get_default_font(), 20) - font_name = 'courier' if os.name == 'nt' else 'mono' - fonts = [x for x in pygame.font.get_fonts() if font_name in x] - default_font = 'ubuntumono' - mono = default_font if default_font in fonts else fonts[0] - mono = pygame.font.match_font(mono) - self._font_mono = pygame.font.Font(mono, 12 if os.name == 'nt' else 14) - self._notifications = FadingText(font, (width, 40), (0, height - 40)) - self.help = HelpText(pygame.font.Font(mono, 16), width, height) - self.server_fps = 0 - self.frame = 0 - self.simulation_time = 0 - self._show_info = True - self._info_text = [] - self._server_clock = pygame.time.Clock() - - self._show_ackermann_info = False - self._ackermann_control = carla.VehicleAckermannControl() - - def on_world_tick(self, timestamp): - self._server_clock.tick() - self.server_fps = self._server_clock.get_fps() - self.frame = timestamp.frame - self.simulation_time = timestamp.elapsed_seconds - - def tick(self, world, clock): - self._notifications.tick(world, clock) - if not self._show_info: - return - t = world.player.get_transform() - v = world.player.get_velocity() - c = world.player.get_control() - compass = world.imu_sensor.compass - heading = 'N' if compass > 270.5 or compass < 89.5 else '' - heading += 'S' if 90.5 < compass < 269.5 else '' - heading += 'E' if 0.5 < compass < 179.5 else '' - heading += 'W' if 180.5 < compass < 359.5 else '' - colhist = world.collision_sensor.get_collision_history() - collision = [colhist[x + self.frame - 200] for x in range(0, 200)] - max_col = max(1.0, max(collision)) - collision = [x / max_col for x in collision] - vehicles = world.world.get_actors().filter('vehicle.*') - self._info_text = [ - 'Server: % 16.0f FPS' % self.server_fps, - 'Client: % 16.0f FPS' % clock.get_fps(), - '', - 'Vehicle: % 20s' % get_actor_display_name(world.player, truncate=20), - 'Map: % 20s' % world.map.name.split('/')[-1], - 'Simulation time: % 12s' % datetime.timedelta(seconds=int(self.simulation_time)), - '', - 'Speed: % 15.0f km/h' % (3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2)), - u'Compass:% 17.0f\N{DEGREE SIGN} % 2s' % (compass, heading), - 'Accelero: (%5.1f,%5.1f,%5.1f)' % (world.imu_sensor.accelerometer), - 'Gyroscop: (%5.1f,%5.1f,%5.1f)' % (world.imu_sensor.gyroscope), - 'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (t.location.x, t.location.y)), - 'GNSS:% 24s' % ('(% 2.6f, % 3.6f)' % (world.gnss_sensor.lat, world.gnss_sensor.lon)), - 'Height: % 18.0f m' % t.location.z, - ''] - if isinstance(c, carla.VehicleControl): - self._info_text += [ - ('Throttle:', c.throttle, 0.0, 1.0), - ('Steer:', c.steer, -1.0, 1.0), - ('Brake:', c.brake, 0.0, 1.0), - ('Reverse:', c.reverse), - ('Hand brake:', c.hand_brake), - ('Manual:', c.manual_gear_shift), - 'Gear: %s' % {-1: 'R', 0: 'N'}.get(c.gear, c.gear)] - if self._show_ackermann_info: - self._info_text += [ - '', - 'Ackermann Controller:', - ' Target speed: % 8.0f km/h' % (3.6*self._ackermann_control.speed), - ] - elif isinstance(c, carla.WalkerControl): - self._info_text += [ - ('Speed:', c.speed, 0.0, 5.556), - ('Jump:', c.jump)] - self._info_text += [ - '', - 'Collision:', - collision, - '', - 'Number of vehicles: % 8d' % len(vehicles)] - if len(vehicles) > 1: - self._info_text += ['Nearby vehicles:'] - distance = lambda l: math.sqrt((l.x - t.location.x)**2 + (l.y - t.location.y)**2 + (l.z - t.location.z)**2) - vehicles = [(distance(x.get_location()), x) for x in vehicles if x.id != world.player.id] - for d, vehicle in sorted(vehicles, key=lambda vehicles: vehicles[0]): - if d > 200.0: - break - vehicle_type = get_actor_display_name(vehicle, truncate=22) - self._info_text.append('% 4dm %s' % (d, vehicle_type)) - - def show_ackermann_info(self, enabled): - self._show_ackermann_info = enabled - - def update_ackermann_control(self, ackermann_control): - self._ackermann_control = ackermann_control - - def toggle_info(self): - self._show_info = not self._show_info - - def notification(self, text, seconds=2.0): - self._notifications.set_text(text, seconds=seconds) - - def error(self, text): - self._notifications.set_text('Error: %s' % text, (255, 0, 0)) - - def render(self, display): - if self._show_info: - info_surface = pygame.Surface((220, self.dim[1])) - info_surface.set_alpha(100) - display.blit(info_surface, (0, 0)) - v_offset = 4 - bar_h_offset = 100 - bar_width = 106 - for item in self._info_text: - if v_offset + 18 > self.dim[1]: - break - if isinstance(item, list): - if len(item) > 1: - points = [(x + 8, v_offset + 8 + (1.0 - y) * 30) for x, y in enumerate(item)] - pygame.draw.lines(display, (255, 136, 0), False, points, 2) - item = None - v_offset += 18 - elif isinstance(item, tuple): - if isinstance(item[1], bool): - rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6)) - pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1) - else: - rect_border = pygame.Rect((bar_h_offset, v_offset + 8), (bar_width, 6)) - pygame.draw.rect(display, (255, 255, 255), rect_border, 1) - f = (item[1] - item[2]) / (item[3] - item[2]) - if item[2] < 0.0: - rect = pygame.Rect((bar_h_offset + f * (bar_width - 6), v_offset + 8), (6, 6)) - else: - rect = pygame.Rect((bar_h_offset, v_offset + 8), (f * bar_width, 6)) - pygame.draw.rect(display, (255, 255, 255), rect) - item = item[0] - if item: # At this point has to be a str. - surface = self._font_mono.render(item, True, (255, 255, 255)) - display.blit(surface, (8, v_offset)) - v_offset += 18 - self._notifications.render(display) - self.help.render(display) - - -# ============================================================================== -# -- FadingText ---------------------------------------------------------------- -# ============================================================================== - - -class FadingText(object): - def __init__(self, font, dim, pos): - self.font = font - self.dim = dim - self.pos = pos - self.seconds_left = 0 - self.surface = pygame.Surface(self.dim) - - def set_text(self, text, color=(255, 255, 255), seconds=2.0): - text_texture = self.font.render(text, True, color) - self.surface = pygame.Surface(self.dim) - self.seconds_left = seconds - self.surface.fill((0, 0, 0, 0)) - self.surface.blit(text_texture, (10, 11)) - - def tick(self, _, clock): - delta_seconds = 1e-3 * clock.get_time() - self.seconds_left = max(0.0, self.seconds_left - delta_seconds) - self.surface.set_alpha(500.0 * self.seconds_left) - - def render(self, display): - display.blit(self.surface, self.pos) - - -# ============================================================================== -# -- HelpText ------------------------------------------------------------------ -# ============================================================================== - - -class HelpText(object): - """Helper class to handle text output using pygame""" - def __init__(self, font, width, height): - lines = __doc__.split('\n') - self.font = font - self.line_space = 18 - self.dim = (780, len(lines) * self.line_space + 12) - self.pos = (0.5 * width - 0.5 * self.dim[0], 0.5 * height - 0.5 * self.dim[1]) - self.seconds_left = 0 - self.surface = pygame.Surface(self.dim) - self.surface.fill((0, 0, 0, 0)) - for n, line in enumerate(lines): - text_texture = self.font.render(line, True, (255, 255, 255)) - self.surface.blit(text_texture, (22, n * self.line_space)) - self._render = False - self.surface.set_alpha(220) - - def toggle(self): - self._render = not self._render - - def render(self, display): - if self._render: - display.blit(self.surface, self.pos) - - -# ============================================================================== -# -- CollisionSensor ----------------------------------------------------------- -# ============================================================================== - - -class CollisionSensor(object): - def __init__(self, parent_actor, hud): - self.sensor = None - self.history = [] - self._parent = parent_actor - self.hud = hud - world = self._parent.get_world() - bp = world.get_blueprint_library().find('sensor.other.collision') - self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) - # We need to pass the lambda a weak reference to self to avoid circular - # reference. - weak_self = weakref.ref(self) - self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event)) - - def get_collision_history(self): - history = collections.defaultdict(int) - for frame, intensity in self.history: - history[frame] += intensity - return history - - @staticmethod - def _on_collision(weak_self, event): - self = weak_self() - if not self: - return - actor_type = get_actor_display_name(event.other_actor) - self.hud.notification('Collision with %r' % actor_type) - impulse = event.normal_impulse - intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2) - self.history.append((event.frame, intensity)) - if len(self.history) > 4000: - self.history.pop(0) - - -# ============================================================================== -# -- LaneInvasionSensor -------------------------------------------------------- -# ============================================================================== - - -class LaneInvasionSensor(object): - def __init__(self, parent_actor, hud): - self.sensor = None - - # If the spawn object is not a vehicle, we cannot use the Lane Invasion Sensor - if parent_actor.type_id.startswith("vehicle."): - self._parent = parent_actor - self.hud = hud - world = self._parent.get_world() - bp = world.get_blueprint_library().find('sensor.other.lane_invasion') - self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) - # We need to pass the lambda a weak reference to self to avoid circular - # reference. - weak_self = weakref.ref(self) - self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event)) - - @staticmethod - def _on_invasion(weak_self, event): - self = weak_self() - if not self: - return - lane_types = set(x.type for x in event.crossed_lane_markings) - text = ['%r' % str(x).split()[-1] for x in lane_types] - self.hud.notification('Crossed line %s' % ' and '.join(text)) - - -# ============================================================================== -# -- GnssSensor ---------------------------------------------------------------- -# ============================================================================== - - -class GnssSensor(object): - def __init__(self, parent_actor): - self.sensor = None - self._parent = parent_actor - self.lat = 0.0 - self.lon = 0.0 - world = self._parent.get_world() - bp = world.get_blueprint_library().find('sensor.other.gnss') - self.sensor = world.spawn_actor(bp, carla.Transform(carla.Location(x=1.0, z=2.8)), attach_to=self._parent) - # We need to pass the lambda a weak reference to self to avoid circular - # reference. - weak_self = weakref.ref(self) - self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event)) - - @staticmethod - def _on_gnss_event(weak_self, event): - self = weak_self() - if not self: - return - self.lat = event.latitude - self.lon = event.longitude - - -# ============================================================================== -# -- IMUSensor ----------------------------------------------------------------- -# ============================================================================== - - -class IMUSensor(object): - def __init__(self, parent_actor): - self.sensor = None - self._parent = parent_actor - self.accelerometer = (0.0, 0.0, 0.0) - self.gyroscope = (0.0, 0.0, 0.0) - self.compass = 0.0 - world = self._parent.get_world() - bp = world.get_blueprint_library().find('sensor.other.imu') - self.sensor = world.spawn_actor( - bp, carla.Transform(), attach_to=self._parent) - # We need to pass the lambda a weak reference to self to avoid circular - # reference. - weak_self = weakref.ref(self) - self.sensor.listen( - lambda sensor_data: IMUSensor._IMU_callback(weak_self, sensor_data)) - - @staticmethod - def _IMU_callback(weak_self, sensor_data): - self = weak_self() - if not self: - return - limits = (-99.9, 99.9) - self.accelerometer = ( - max(limits[0], min(limits[1], sensor_data.accelerometer.x)), - max(limits[0], min(limits[1], sensor_data.accelerometer.y)), - max(limits[0], min(limits[1], sensor_data.accelerometer.z))) - self.gyroscope = ( - max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.x))), - max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.y))), - max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.z)))) - self.compass = math.degrees(sensor_data.compass) - - -# ============================================================================== -# -- RadarSensor --------------------------------------------------------------- -# ============================================================================== - - -class RadarSensor(object): - def __init__(self, parent_actor): - self.sensor = None - self._parent = parent_actor - bound_x = 0.5 + self._parent.bounding_box.extent.x - bound_y = 0.5 + self._parent.bounding_box.extent.y - bound_z = 0.5 + self._parent.bounding_box.extent.z - - self.velocity_range = 7.5 # m/s - world = self._parent.get_world() - self.debug = world.debug - bp = world.get_blueprint_library().find('sensor.other.radar') - bp.set_attribute('horizontal_fov', str(35)) - bp.set_attribute('vertical_fov', str(20)) - self.sensor = world.spawn_actor( - bp, - carla.Transform( - carla.Location(x=bound_x + 0.05, z=bound_z+0.05), - carla.Rotation(pitch=5)), - attach_to=self._parent) - # We need a weak reference to self to avoid circular reference. - weak_self = weakref.ref(self) - self.sensor.listen( - lambda radar_data: RadarSensor._Radar_callback(weak_self, radar_data)) - - @staticmethod - def _Radar_callback(weak_self, radar_data): - self = weak_self() - if not self: - return - # To get a numpy [[vel, altitude, azimuth, depth],...[,,,]]: - # points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4')) - # points = np.reshape(points, (len(radar_data), 4)) - - current_rot = radar_data.transform.rotation - for detect in radar_data: - azi = math.degrees(detect.azimuth) - alt = math.degrees(detect.altitude) - # The 0.25 adjusts a bit the distance so the dots can - # be properly seen - fw_vec = carla.Vector3D(x=detect.depth - 0.25) - carla.Transform( - carla.Location(), - carla.Rotation( - pitch=current_rot.pitch + alt, - yaw=current_rot.yaw + azi, - roll=current_rot.roll)).transform(fw_vec) - - def clamp(min_v, max_v, value): - return max(min_v, min(value, max_v)) - - norm_velocity = detect.velocity / self.velocity_range # range [-1, 1] - r = int(clamp(0.0, 1.0, 1.0 - norm_velocity) * 255.0) - g = int(clamp(0.0, 1.0, 1.0 - abs(norm_velocity)) * 255.0) - b = int(abs(clamp(- 1.0, 0.0, - 1.0 - norm_velocity)) * 255.0) - self.debug.draw_point( - radar_data.transform.location + fw_vec, - size=0.075, - life_time=0.06, - persistent_lines=False, - color=carla.Color(r, g, b)) - -# ============================================================================== -# -- CameraManager ------------------------------------------------------------- -# ============================================================================== - - -class CameraManager(object): - def __init__(self, parent_actor, hud, gamma_correction): - self.sensor = None - self.surface = None - self._parent = parent_actor - self.hud = hud - self.recording = False - bound_x = 0.5 + self._parent.bounding_box.extent.x - bound_y = 0.5 + self._parent.bounding_box.extent.y - bound_z = 0.5 + self._parent.bounding_box.extent.z - Attachment = carla.AttachmentType - - if not self._parent.type_id.startswith("walker.pedestrian"): - self._camera_transforms = [ - (carla.Transform(carla.Location(x=-2.0*bound_x, y=+0.0*bound_y, z=2.0*bound_z), carla.Rotation(pitch=8.0)), Attachment.SpringArmGhost), - (carla.Transform(carla.Location(x=+0.8*bound_x, y=+0.0*bound_y, z=1.3*bound_z)), Attachment.Rigid), - (carla.Transform(carla.Location(x=+1.9*bound_x, y=+1.0*bound_y, z=1.2*bound_z)), Attachment.SpringArmGhost), - (carla.Transform(carla.Location(x=-2.8*bound_x, y=+0.0*bound_y, z=4.6*bound_z), carla.Rotation(pitch=6.0)), Attachment.SpringArmGhost), - (carla.Transform(carla.Location(x=-1.0, y=-1.0*bound_y, z=0.4*bound_z)), Attachment.Rigid)] - else: - self._camera_transforms = [ - (carla.Transform(carla.Location(x=-2.5, z=0.0), carla.Rotation(pitch=-8.0)), Attachment.SpringArmGhost), - (carla.Transform(carla.Location(x=1.6, z=1.7)), Attachment.Rigid), - (carla.Transform(carla.Location(x=2.5, y=0.5, z=0.0), carla.Rotation(pitch=-8.0)), Attachment.SpringArmGhost), - (carla.Transform(carla.Location(x=-4.0, z=2.0), carla.Rotation(pitch=6.0)), Attachment.SpringArmGhost), - (carla.Transform(carla.Location(x=0, y=-2.5, z=-0.0), carla.Rotation(yaw=90.0)), Attachment.Rigid)] - - self.transform_index = 1 - self.sensors = [ - ['sensor.camera.rgb', cc.Raw, 'Camera RGB', {}], - ['sensor.camera.depth', cc.Raw, 'Camera Depth (Raw)', {}], - ['sensor.camera.depth', cc.Depth, 'Camera Depth (Gray Scale)', {}], - ['sensor.camera.depth', cc.LogarithmicDepth, 'Camera Depth (Logarithmic Gray Scale)', {}], - ['sensor.camera.semantic_segmentation', cc.Raw, 'Camera Semantic Segmentation (Raw)', {}], - ['sensor.camera.semantic_segmentation', cc.CityScapesPalette, 'Camera Semantic Segmentation (CityScapes Palette)', {}], - ['sensor.camera.instance_segmentation', cc.Raw, 'Camera Instance Segmentation (Raw)', {}], - ['sensor.lidar.ray_cast', None, 'Lidar (Ray-Cast)', {'range': '50'}], - ['sensor.lidar.ray_cast_semantic', None, 'Semantic Lidar (Ray-Cast)', {'range': '50'}], - ['sensor.camera.rgb', cc.Raw, 'Camera RGB Distorted', - {'lens_circle_multiplier': '3.0', - 'lens_circle_falloff': '3.0', - 'chromatic_aberration_intensity': '0.5', - 'chromatic_aberration_offset': '0'}], - ['sensor.camera.optical_flow', cc.Raw, 'Optical Flow', {}], - ['sensor.camera.normals', cc.Raw, 'Camera Normals', {}], - ] - world = self._parent.get_world() - bp_library = world.get_blueprint_library() - for item in self.sensors: - bp = bp_library.find(item[0]) - if item[0].startswith('sensor.camera'): - bp.set_attribute('image_size_x', str(hud.dim[0])) - bp.set_attribute('image_size_y', str(hud.dim[1])) - if bp.has_attribute('gamma'): - bp.set_attribute('gamma', str(gamma_correction)) - for attr_name, attr_value in item[3].items(): - bp.set_attribute(attr_name, attr_value) - elif item[0].startswith('sensor.lidar'): - self.lidar_range = 50 - - for attr_name, attr_value in item[3].items(): - bp.set_attribute(attr_name, attr_value) - if attr_name == 'range': - self.lidar_range = float(attr_value) - - item.append(bp) - self.index = None - - def toggle_camera(self): - self.transform_index = (self.transform_index + 1) % len(self._camera_transforms) - self.set_sensor(self.index, notify=False, force_respawn=True) - - def set_sensor(self, index, notify=True, force_respawn=False): - index = index % len(self.sensors) - needs_respawn = True if self.index is None else \ - (force_respawn or (self.sensors[index][2] != self.sensors[self.index][2])) - if needs_respawn: - if self.sensor is not None: - self.sensor.destroy() - self.surface = None - self.sensor = self._parent.get_world().spawn_actor( - self.sensors[index][-1], - self._camera_transforms[self.transform_index][0], - attach_to=self._parent, - attachment_type=self._camera_transforms[self.transform_index][1]) - # We need to pass the lambda a weak reference to self to avoid - # circular reference. - weak_self = weakref.ref(self) - self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image)) - if notify: - self.hud.notification(self.sensors[index][2]) - self.index = index - - def next_sensor(self): - self.set_sensor(self.index + 1) - - def toggle_recording(self): - self.recording = not self.recording - self.hud.notification('Recording %s' % ('On' if self.recording else 'Off')) - - def render(self, display): - if self.surface is not None: - display.blit(self.surface, (0, 0)) - - @staticmethod - def _parse_image(weak_self, image): - self = weak_self() - if not self: - return - if self.sensors[self.index][0] == 'sensor.lidar.ray_cast': - points = np.frombuffer(image.raw_data, dtype=np.dtype('f4')) - points = np.reshape(points, (int(points.shape[0] / 4), 4)) - lidar_data = np.array(points[:, :2]) - lidar_data *= min(self.hud.dim) / (2.0 * self.lidar_range) - lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1]) - lidar_data = np.fabs(lidar_data) # pylint: disable=E1111 - lidar_data = lidar_data.astype(np.int32) - lidar_data = np.reshape(lidar_data, (-1, 2)) - lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3) - lidar_img = np.zeros((lidar_img_size), dtype=np.uint8) - lidar_img[tuple(lidar_data.T)] = (255, 255, 255) - self.surface = pygame.surfarray.make_surface(lidar_img) - elif self.sensors[self.index][0] == 'sensor.lidar.ray_cast_semantic': - points = np.frombuffer(image.raw_data, dtype=np.dtype('f4')) - points = np.reshape(points, (int(points.shape[0] / 6), 6)) - lidar_data = np.array(points[:, :2]) - lidar_data *= min(self.hud.dim) / (2.0 * self.lidar_range) - lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1]) - lidar_data = lidar_data.astype(np.int32) - lidar_data = np.reshape(lidar_data, (-1, 2)) - lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3) - lidar_img = np.zeros((lidar_img_size), dtype=np.uint8) - for i in range(len(image)): - point = lidar_data[i] - lidar_tag = image[i].object_tag - lidar_img[tuple(point.T)] = OBJECT_TO_COLOR[int(lidar_tag)] - self.surface = pygame.surfarray.make_surface(lidar_img) - elif self.sensors[self.index][0].startswith('sensor.camera.optical_flow'): - image = image.get_color_coded_flow() - array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) - array = np.reshape(array, (image.height, image.width, 4)) - array = array[:, :, :3] - array = array[:, :, ::-1] - self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) - else: - image.convert(self.sensors[self.index][1]) - array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) - array = np.reshape(array, (image.height, image.width, 4)) - array = array[:, :, :3] - array = array[:, :, ::-1] - self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) - if self.recording: - image.save_to_disk('_out/%08d' % image.frame) - - -class Traffic(object): - - def __init__(self, client, world, traffic_manager, args): - self.client = client - self.world = world - self.traffic_manager = traffic_manager - self.args = args - self.vehicle_ids = [] - self.vehicles = [] # USed to print "spawned X vehicles" - self.walkers = [] - self.walker_and_control_ids = [] # These include the controllers too - self.walkers_and_controls = [] - - if self.args.sync: - self.traffic_manager.set_synchronous_mode(True) - if self.args.respawn: - self.traffic_manager.set_respawn_dormant_vehicles(True) - if self.args.hybrid: - self.traffic_manager.set_hybrid_physics_mode(True) - self.traffic_manager.set_hybrid_physics_radius(70.0) - if self.args.seed is not None: - self.traffic_manager.set_random_device_seed(self.args.seed) - - self.traffic_manager.set_global_distance_to_leading_vehicle(2.5) - self.traffic_manager.global_percentage_speed_difference(0) - - if not args.sync: - print("You are currently in asynchronous mode, and traffic might experience some issues") - - self.spawn_vehicles() - self.spawn_pedestrians() - - print('Spawned %d vehicles and %d walkers, press Ctrl+C to exit.' % (len(self.vehicles), len(self.walkers))) - - def spawn_vehicles(self): - - if self.args.number_of_vehicles == 0: - return - - # Get the vehicle blueprints - blueprints_vehicles = get_actor_blueprints(self.world, self.args.filterv, self.args.generationv) - if not blueprints_vehicles: - raise ValueError("Couldn't find any vehicles with the specified filters") - if self.args.safe: - blueprints_vehicles = [x for x in blueprints_vehicles if x.get_attribute('base_type') == 'car'] - blueprints_vehicles = sorted(blueprints_vehicles, key=lambda bp: bp.id) - - # Get their spawn points - spawn_points = self.world.get_map().get_spawn_points() - number_of_spawn_points = len(spawn_points) - number_vehicles = self.args.number_of_vehicles - if number_vehicles < number_of_spawn_points: - random.shuffle(spawn_points) - elif self.args.number_of_vehicles > number_of_spawn_points: - print(f"Requested {number_vehicles} vehicles, but could only find {number_of_spawn_points} spawn points") - number_vehicles = number_of_spawn_points - - hero = self.args.hero - - SpawnActor = carla.command.SpawnActor - SetAutopilot = carla.command.SetAutopilot - FutureActor = carla.command.FutureActor - - batch = [] - for n, transform in enumerate(spawn_points): - if n >= self.args.number_of_vehicles: - break - blueprint = random.choice(blueprints_vehicles) - if blueprint.has_attribute('color'): - color = random.choice(blueprint.get_attribute('color').recommended_values) - blueprint.set_attribute('color', color) - if blueprint.has_attribute('driver_id'): - driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values) - blueprint.set_attribute('driver_id', driver_id) - if hero: - blueprint.set_attribute('role_name', 'hero') - hero = False - else: - blueprint.set_attribute('role_name', 'autopilot') - - # Spawn the cars and set their autopilot - batch.append(SpawnActor(blueprint, transform) - .then(SetAutopilot(FutureActor, True, self.traffic_manager.get_port()))) - - for response in self.client.apply_batch_sync(batch, self.args.sync): - if response.error: - logging.error(response.error) - else: - self.vehicle_ids.append(response.actor_id) - - self.vehicles = self.world.get_actors(self.vehicle_ids) - - # Set automatic vehicle lights update - if self.args.car_lights_on: - for actor in self.vehicles: - self.traffic_manager.update_vehicle_lights(actor, True) - - - def spawn_pedestrians(self): - - if self.args.number_of_walkers == 0: - return - - blueprints_walkers = get_actor_blueprints(self.world, self.args.filterw, self.args.generationw) - if not blueprints_walkers: - raise ValueError("Couldn't find any walkers with the specified filters") - blueprints_walkers = sorted(blueprints_walkers, key=lambda bp: bp.id) - - controller_bp = self.world.get_blueprint_library().find('controller.ai.walker') - - SpawnActor = carla.command.SpawnActor - - percentagePedestriansRunning = 0.0 # how many pedestrians will run - percentagePedestriansCrossing = 0.0 # how many pedestrians will walk through the road - - if self.args.seed: - self.world.set_pedestrians_seed(self.args.seed) - - # Get the spawn points - walker_spawn_points = [] - for i in range(self.args.number_of_walkers): - spawn_point = carla.Transform() - loc = self.world.get_random_location_from_navigation() - if (loc != None): - spawn_point.location = loc - spawn_point.location.z += 2 - walker_spawn_points.append(spawn_point) - - # Get the blueprints - batch = [] - walker_speed = [] - for spawn_point in walker_spawn_points: - walker_bp = random.choice(blueprints_walkers) - - if walker_bp.has_attribute('is_invincible'): - walker_bp.set_attribute('is_invincible', 'false') - - if walker_bp.has_attribute('speed'): - if (random.random() > percentagePedestriansRunning): - walker_speed.append(walker_bp.get_attribute('speed').recommended_values[1]) - else: - walker_speed.append(walker_bp.get_attribute('speed').recommended_values[2]) - else: - print("Found a walker has no speed") - walker_speed.append(0.0) - - batch.append(SpawnActor(walker_bp, spawn_point)) - - # Spawn the walkers - results = self.client.apply_batch_sync(batch, self.args.sync) - new_walker_speed = [] - for i in range(len(results)): - if results[i].error: - print(results[i].error) - else: - self.walkers.append({"id": results[i].actor_id}) - new_walker_speed.append(walker_speed[i]) - walker_speed = new_walker_speed - - # Spawn the controllers - batch = [] - for i in range(len(self.walkers)): - batch.append(SpawnActor(controller_bp, carla.Transform(), self.walkers[i]["id"])) - - results = self.client.apply_batch_sync(batch, self.args.sync) - for i in range(len(results)): - if results[i].error: - print(results[i].error) - else: - self.walkers[i]["controller"] = results[i].actor_id - - for i in range(len(self.walkers)): - self.walker_and_control_ids.append(self.walkers[i]["controller"]) - self.walker_and_control_ids.append(self.walkers[i]["id"]) - self.walkers_and_controls = self.world.get_actors(self.walker_and_control_ids) - - if not self.args.sync: - self.world.wait_for_tick() - else: - self.world.tick() - - self.world.set_pedestrians_cross_factor(percentagePedestriansCrossing) - for i in range(0, len(self.walker_and_control_ids), 2): - self.walkers_and_controls[i].start() - self.walkers_and_controls[i].go_to_location(self.world.get_random_location_from_navigation()) - self.walkers_and_controls[i].set_max_speed(float(walker_speed[int(i/2)])) - - def destroy(self): - self.client.apply_batch([carla.command.DestroyActor(x) for x in self.vehicle_ids]) - - # Stop walker controllers (list is [controller, actor, controller, actor ...]) - for i in range(0, len(self.walker_and_control_ids), 2): - self.walkers_and_controls[i].stop() - - self.client.apply_batch([carla.command.DestroyActor(x) for x in self.walker_and_control_ids]) - - -# ============================================================================== -# -- game_loop() --------------------------------------------------------------- -# ============================================================================== - - -def game_loop(args): - pygame.init() - pygame.font.init() - world = None - traffic = None - original_settings = None - - try: - client = carla.Client(args.host, args.port) - client.set_timeout(2000.0) - traffic_manager = client.get_trafficmanager() - - sim_world = client.get_world() - if args.sync: - original_settings = sim_world.get_settings() - settings = sim_world.get_settings() - if not settings.synchronous_mode: - settings.synchronous_mode = True - settings.fixed_delta_seconds = 0.05 - sim_world.apply_settings(settings) - - if args.autopilot and not sim_world.get_settings().synchronous_mode: - print("WARNING: You are currently in asynchronous mode and could " - "experience some issues with the traffic simulation") - - display = pygame.display.set_mode( - (args.width, args.height), - pygame.HWSURFACE | pygame.DOUBLEBUF) - display.fill((0,0,0)) - pygame.display.flip() - - hud = HUD(args.width, args.height) - world = World(sim_world, hud, traffic_manager, args) - controller = KeyboardControl(world, args.autopilot) - traffic = Traffic(client, sim_world, traffic_manager, args) - - if args.sync: - sim_world.tick() - else: - sim_world.wait_for_tick() - - clock = pygame.time.Clock() - while True: - if args.sync: - sim_world.tick() - clock.tick_busy_loop(60) - if controller.parse_events(client, world, clock, args.sync): - return - world.tick(clock) - world.render(display) - pygame.display.flip() - - finally: - - if original_settings: - if not original_settings.synchronous_mode: - traffic_manager.set_synchronous_mode(False) - sim_world.apply_settings(original_settings) - - if (world and world.recording_enabled): - client.stop_recorder() - - if world is not None: - world.destroy() - - if traffic is not None: - traffic.destroy() - - pygame.quit() - - -# ============================================================================== -# -- main() -------------------------------------------------------------------- -# ============================================================================== - - -def main(): - argparser = argparse.ArgumentParser(description='CARLA Manual Control Client') - argparser.add_argument( - '-v', '--verbose', action='store_true', dest='debug', - help='print debug information') - argparser.add_argument( - '--host', metavar='H', default='127.0.0.1', - help='IP of the host server (default: 127.0.0.1)') - argparser.add_argument( - '-p', '--port', metavar='P', default=2000, type=int, - help='TCP port to listen to (default: 2000)') - argparser.add_argument( - '-a', '--autopilot', action='store_true', - help='enable the autopilot for the manual controleld vehicle') - argparser.add_argument( - '--res', metavar='WIDTHxHEIGHT', default='1280x720', - help='window resolution (default: 1280x720)') - argparser.add_argument( - '--sync', action='store_true', - help='Activate synchronous mode execution') - argparser.add_argument( - '--rolename', metavar='NAME', default='hero', - help='actor role name (default: "hero")') - argparser.add_argument( - '-s', '--seed', metavar='S', type=int, - help='Sets the seed of all random behaviors') - argparser.add_argument( - '--car-lights-on', action='store_true', default=False, - help='Enable automatic car light management') - argparser.add_argument( - '--filter', metavar='PATTERN', default='vehicle.*', - help='actor filter (default: "vehicle.*")') - argparser.add_argument( - '--generation', metavar='G', default='All', - help='restrict to certain actor generation (values: "2","3","All" - default: "All")') - argparser.add_argument( - '--filterv', metavar='PATTERN', default='vehicle.*', - help='Traffic filter (default: "vehicle.*")') - argparser.add_argument( - '--generationv', metavar='G', default='All', - help='restrict the traffic to certain actor generation (values: "2","3","All" - default: "All")') - argparser.add_argument( - '--filterw', metavar='PATTERN', default='walker.pedestrian.*', - help='pedestrian filter (default: "walker.pedestrian.*")') - argparser.add_argument( - '--generationw', metavar='G', default='All', - help='restrict the pedestrians to certain actor generation (values: "2","3","All" - default: "All")') - argparser.add_argument( - '-n', '--number-of-vehicles', metavar='N', default=30, type=int, - help='Number of vehicles (default: 30)') - argparser.add_argument( - '-w', '--number-of-walkers', metavar='W', default=10, type=int, - help='Number of walkers (default: 10)') - argparser.add_argument( - '--safe', action='store_true', - help='Avoid spawning vehicles prone to accidents') - argparser.add_argument( - '--tm-port', metavar='P', default=8000, type=int, - help='Port to communicate with TM (default: 8000)') - argparser.add_argument( - '--hybrid', action='store_true', - help='Activate hybrid mode for Traffic Manager') - argparser.add_argument( - '--respawn', action='store_true', default=False, - help='Automatically respawn dormant vehicles (only in large maps)') - argparser.add_argument( - '--hero', action='store_true', default=False, - help='Set one of the vehicles as hero') - argparser.add_argument( - '--gamma', default=1.0, type=float, - help='Gamma correction of the camera (default: 1.0)') - args = argparser.parse_args() - - args.width, args.height = [int(x) for x in args.res.split('x')] - - log_level = logging.DEBUG if args.debug else logging.INFO - logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) - - logging.info('listening to server %s:%s', args.host, args.port) - - print(__doc__) - - try: - - game_loop(args) - - except KeyboardInterrupt: - print('\nCancelled by user. Bye!') - - -if __name__ == '__main__': - - main() diff --git a/PythonAPI/util/osm_to_xodr.py b/PythonAPI/util/osm_to_xodr.py deleted file mode 100644 index 5de2af782f..0000000000 --- a/PythonAPI/util/osm_to_xodr.py +++ /dev/null @@ -1,106 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2024 Computer Vision Center (CVC) at the Universitat Autonoma de -# Barcelona (UAB). -# -# This work is licensed under the terms of the MIT license. -# For a copy, see . - -""" Convert OpenStreetMap file to OpenDRIVE file. """ - -import argparse -import os -import sys - -import carla - - -def convert(args): - # Read the .osm data - with open(args.input_path, mode="r", encoding="utf-8") as osmFile: - osm_data = osmFile.read() - - # Define the desired settings - settings = carla.Osm2OdrSettings() - - # Set OSM road types to export to OpenDRIVE - settings.set_osm_way_types([ - "motorway", - "motorway_link", - "trunk", - "trunk_link", - "primary", - "primary_link", - "secondary", - "secondary_link", - "tertiary", - "tertiary_link", - "unclassified", - "residential" - ]) - settings.default_lane_width = args.lane_width - settings.generate_traffic_lights = args.traffic_lights - settings.all_junctions_with_traffic_lights = args.all_junctions_lights - settings.center_map = args.center_map - - # Convert to .xodr - xodr_data = carla.Osm2Odr.convert(osm_data, settings) - - # save opendrive file - with open(args.output_path, "w", encoding="utf-8") as xodrFile: - xodrFile.write(xodr_data) - -# ============================================================================== -# -- main() -------------------------------------------------------------------- -# ============================================================================== - - -def main(): - argparser = argparse.ArgumentParser( - description=__doc__) - argparser.add_argument( - '-i', '--input-path', required=True, metavar='OSM_FILE_PATH', - help='set the input OSM file path') - argparser.add_argument( - '-o', '--output-path', required=True, metavar='XODR_FILE_PATH', - help='set the output XODR file path') - argparser.add_argument( - '--lane-width', default=6.0, - help='width of each road lane in meters') - argparser.add_argument( - '--traffic-lights', action='store_true', - help='enable traffic light generation from OSM data') - argparser.add_argument( - '--all-junctions-lights', action='store_true', - help='set traffic lights for all junctions') - argparser.add_argument( - '--center-map', action='store_true', - help='set center of map to the origin coordinates') - - if len(sys.argv) < 2: - argparser.print_help() - return - - args = argparser.parse_args() - - if args.input_path is None or not os.path.exists(args.input_path): - print('input file not found.') - if args.output_path is None: - print('output file path not found.') - - print(__doc__) - - try: - convert(args) - except: - print('\nAn error has occurred in conversion.') - - -if __name__ == '__main__': - - try: - main() - except KeyboardInterrupt: - print('\nCancelled by user. Bye!') - except RuntimeError as e: - print(e)