diff --git a/assets/user_readme.md b/assets/user_readme.md index 3dc1efa..1d59809 100644 --- a/assets/user_readme.md +++ b/assets/user_readme.md @@ -141,7 +141,7 @@ The API for the simulator is the same as the API for the physical robot, so you As well as the logs being displayed in the console, they are also saved to a file. -This file is saved in the `zone_0` folder and has a name in the format `log-.log`. +This file is saved in the `zone_0` folder and has a name in the format `log-zone--.log`. The date is when that simulation was run. ### Simulation of Time diff --git a/scripts/generate_release.py b/scripts/generate_release.py index 2abe1ca..c65a4b2 100755 --- a/scripts/generate_release.py +++ b/scripts/generate_release.py @@ -78,8 +78,14 @@ logger.info("Copying helper scripts to temp directory") shutil.copy(project_root / "scripts/setup.py", temp_dir / "setup.py") for script in project_root.glob("scripts/run_*.py"): + if "run_comp_" in str(script): + continue shutil.copy(script, temp_dir) + script_dir = temp_dir / "scripts" + script_dir.mkdir() + shutil.copy(project_root / "scripts/run_comp_match.py", script_dir) + logger.info("Copying example code to temp directory") shutil.copytree(project_root / "example_robots", temp_dir / "example_robots") diff --git a/scripts/run_comp_match.py b/scripts/run_comp_match.py new file mode 100755 index 0000000..655cc73 --- /dev/null +++ b/scripts/run_comp_match.py @@ -0,0 +1,298 @@ +#!/usr/bin/env python3 +"""A script to run a competition match.""" +from __future__ import annotations + +import argparse +import json +import os +import shutil +import subprocess +import sys +from pathlib import Path +from tempfile import TemporaryDirectory +from zipfile import ZipFile + +if (Path(__file__).parents[1] / 'simulator/VERSION').exists(): + # Running in release mode, run_simulator will be in folder above + sys.path.append(str(Path(__file__).parents[1])) + +from run_simulator import get_webots_parameters + +NUM_ZONES = 4 +GAME_DURATION_SECONDS = 150 + + +class MatchParams(argparse.Namespace): + """Parameters for running a competition match.""" + + archives_dir: Path + match_num: int + teams: list[str] + duration: int + video_enabled: bool + video_resolution: tuple[int, int] + + +def load_team_code( + usercode_dir: Path, + arena_root: Path, + match_parameters: MatchParams, +) -> None: + """Load the team code into the arena root.""" + for zone_id, tla in enumerate(match_parameters.teams): + zone_path = arena_root / f"zone_{zone_id}" + + if zone_path.exists(): + shutil.rmtree(zone_path) + + if tla == '-': + # no team in this zone + continue + + zone_path.mkdir() + with ZipFile(usercode_dir / f'{tla}.zip') as zipfile: + zipfile.extractall(zone_path) + + +def generate_match_file(save_path: Path, match_parameters: MatchParams) -> None: + """Write the match file to the arena root.""" + match_file = save_path / 'match.json' + + # Use a format that is compatible with SRComp + match_file.write_text(json.dumps( + { + 'match_number': match_parameters.match_num, + 'arena_id': 'Simulator', + 'teams': { + tla: {'zone': idx} + for idx, tla in enumerate(match_parameters.teams) + if tla != '-' + }, + 'duration': match_parameters.duration, + 'recording_config': { + 'enabled': match_parameters.video_enabled, + 'resolution': list(match_parameters.video_resolution) + } + }, + indent=4, + )) + + +def set_comp_mode(arena_root: Path) -> None: + """Write the mode file to indicate that the competition is running.""" + (arena_root / 'mode.txt').write_text('comp') + + +def archive_zone_files( + team_archives_dir: Path, + arena_root: Path, + zone: int, + match_id: str, +) -> None: + """Zip the files in the zone directory and save them to the team archives directory.""" + zone_dir = arena_root / f'zone_{zone}' + + shutil.make_archive(str(team_archives_dir / f'{match_id}-zone-{zone}'), 'zip', zone_dir) + + +def archive_zone_folders( + archives_dir: Path, + arena_root: Path, + teams: list[str], + match_id: str, +) -> None: + """Zip the zone folders and save them to the archives directory.""" + for zone_id, tla in enumerate(teams): + if tla == '-': + # no team in this zone + continue + + tla_dir = archives_dir / tla + tla_dir.mkdir(exist_ok=True) + + archive_zone_files(tla_dir, arena_root, zone_id, match_id) + + +def archive_match_recordings(archives_dir: Path, arena_root: Path, match_id: str) -> None: + """Copy the video, animation, and image files to the archives directory.""" + recordings_dir = archives_dir / 'recordings' + recordings_dir.mkdir(exist_ok=True) + + match_recordings = arena_root / 'recordings' + + # Copy the video file + video_file = match_recordings / f'{match_id}.mp4' + if video_file.exists(): + shutil.copy(video_file, recordings_dir) + + # Copy the animation files + animation_files = [ + match_recordings / f'{match_id}.html', + match_recordings / f'{match_id}.json', + match_recordings / f'{match_id}.x3d', + match_recordings / f'{match_id}.css', + ] + for animation_file in animation_files: + shutil.copy(animation_file, recordings_dir) + + # Copy the animation textures + # Every match will have the same textures, so we only need one copy of them + textures_dir = match_recordings / 'textures' + shutil.copytree(textures_dir, recordings_dir / 'textures', dirs_exist_ok=True) + + # Copy the image file + image_file = match_recordings / f'{match_id}.jpg' + shutil.copy(image_file, recordings_dir) + + +def archive_match_file(archives_dir: Path, match_file: Path, match_number: int) -> None: + """ + Copy the match file (which may contain scoring data) to the archives directory. + + This also renames the file to be compatible with SRComp. + """ + matches_dir = archives_dir / 'matches' + matches_dir.mkdir(exist_ok=True) + + # SRComp expects YAML files. JSON is a subset of YAML, so we can just rename the file. + completed_match_file = matches_dir / f'{match_number:0>3}.yaml' + + shutil.copy(match_file, completed_match_file) + + +def archive_supervisor_log(archives_dir: Path, arena_root: Path, match_id: str) -> None: + """Archive the supervisor log file.""" + log_archive_dir = archives_dir / 'supervisor_logs' + log_archive_dir.mkdir(exist_ok=True) + + log_file = arena_root / f'supervisor-log-{match_id}.txt' + + shutil.copy(log_file, log_archive_dir) + + +def execute_match(arena_root: Path) -> None: + """Run Webots with the right world.""" + # Webots is only on the PATH on Linux so we have a helper function to find it + try: + webots, world_file = get_webots_parameters() + except RuntimeError: + raise FileNotFoundError("Webots executable not found.") + + sim_env = os.environ.copy() + sim_env['ARENA_ROOT'] = str(arena_root) + try: + subprocess.check_call( + [ + str(webots), + '--batch', + '--stdout', + '--stderr', + '--mode=realtime', + str(world_file), + ], + env=sim_env, + ) + except subprocess.CalledProcessError as e: + # TODO review log output here + raise RuntimeError(f"Webots failed with return code {e.returncode}") from e + + +def run_match(match_parameters: MatchParams) -> None: + """Run the match in a temporary directory and archive the results.""" + with TemporaryDirectory(suffix=f'match-{match_parameters.match_num}') as temp_folder: + arena_root = Path(temp_folder) + match_num = match_parameters.match_num + match_id = f'match-{match_num}' + archives_dir = match_parameters.archives_dir + + # unzip teams code into zone_N folders under this folder + load_team_code(archives_dir, arena_root, match_parameters) + # Create info file to tell the comp supervisor what match this is + # and how to handle recordings + generate_match_file(arena_root, match_parameters) + # Set mode file to comp + set_comp_mode(arena_root) + + try: + # Run webots with the right world + execute_match(arena_root) + except (FileNotFoundError, RuntimeError) as e: + print(f"Failed to run match: {e}") + # Save the supervisor log as it may contain useful information + archive_supervisor_log(archives_dir, arena_root, match_id) + raise + + # Archive the supervisor log first in case any collation fails + archive_supervisor_log(archives_dir, arena_root, match_id) + # Zip up and collect all files for each zone + archive_zone_folders(archives_dir, arena_root, match_parameters.teams, match_id) + # Collect video, animation & image + archive_match_recordings(archives_dir, arena_root, match_id) + # Collect ancillary files + archive_match_file(archives_dir, arena_root / 'match.json', match_num) + + +def parse_args() -> MatchParams: + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="Run a competition match.") + + parser.add_argument( + 'archives_dir', + help=( + "The directory containing the teams' robot code, as Zip archives " + "named for the teams' TLAs. This directory will also be used as the " + "root for storing the resulting logs and recordings." + ), + type=Path, + ) + parser.add_argument( + 'match_num', + type=int, + help="The number of the match to run.", + ) + parser.add_argument( + 'teams', + nargs=NUM_ZONES, + help=( + "TLA of the team in each zone, in order from zone 0 to " + f"{NUM_ZONES - 1}. Use dash (-) for an empty zone. " + "Must specify all zones." + ), + metavar='tla', + ) + parser.add_argument( + '--duration', + help="The duration of the match (in seconds).", + type=int, + default=GAME_DURATION_SECONDS, + ) + parser.add_argument( + '--no-record', + help=( + "Inhibit creation of the MPEG video, the animation is unaffected. " + "This can greatly increase the execution speed on GPU limited systems " + "when the video is not required." + ), + action='store_false', + dest='video_enabled', + ) + parser.add_argument( + '--resolution', + help="Set the resolution of the produced video.", + type=int, + nargs=2, + default=[1920, 1080], + metavar=('width', 'height'), + dest='video_resolution', + ) + return parser.parse_args(namespace=MatchParams()) + + +def main() -> None: + """Run a competition match entrypoint.""" + match_parameters = parse_args() + run_match(match_parameters) + + +if __name__ == '__main__': + main() diff --git a/scripts/run_simulator.py b/scripts/run_simulator.py index 6a76e3e..eb5b326 100755 --- a/scripts/run_simulator.py +++ b/scripts/run_simulator.py @@ -3,8 +3,9 @@ A script to run the project in Webots. Largely just a shortcut to running the arena world in Webots. -Only functional in releases. """ +from __future__ import annotations + import sys import traceback from pathlib import Path @@ -14,13 +15,27 @@ if sys.platform == "win32": from subprocess import CREATE_NEW_PROCESS_GROUP, DETACHED_PROCESS -try: - if not (Path(__file__).parent / 'simulator/VERSION').exists(): - print("This script is only functional in releases.") - raise RuntimeError +if (Path(__file__).parent / 'simulator/VERSION').exists(): + print("Running in release mode") + SIM_BASE = Path(__file__).parent +else: + print("Running in development mode") + # Assume the script is in the scripts directory + SIM_BASE = Path(__file__).parents[1] + - world_file = Path(__file__).parent / "simulator/worlds/arena.wbt" +def get_webots_parameters() -> tuple[Path, Path]: + """ + Get the paths to the Webots executable and the arena world file. + :return: The paths to the Webots executable and the arena world file + """ + world_file = SIM_BASE / "simulator/worlds/arena.wbt" + + if not world_file.exists(): + raise RuntimeError("World file not found.") + + # Check if Webots is in the PATH webots = which("webots") # Find the webots executable, if it is not in the PATH @@ -46,21 +61,36 @@ print("Webots executable not found.") raise RuntimeError - if not (Path(__file__).parent / "venv").exists(): + if not (SIM_BASE / "venv").exists(): print("Please run the setup.py script before running the simulator.") raise RuntimeError - # Run the world file in Webots, - # detaching the process so it does not close when this script does - if sys.platform == "win32": - Popen([webots, world_file], creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP) - else: - Popen([webots, world_file], start_new_session=True) -except RuntimeError: - input("Press enter to continue...") - exit(1) -except Exception as e: - print(f"An error occurred: {e}") - print(traceback.format_exc()) - input("Press enter to continue...") - exit(1) + return Path(webots), world_file + + +def main() -> None: + """Run the project in Webots.""" + try: + webots, world_file = get_webots_parameters() + + # Run the world file in Webots, + # detaching the process so it does not close when this script does + if sys.platform == "win32": + Popen( + [str(webots), str(world_file)], + creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP, + ) + else: + Popen([str(webots), str(world_file)], start_new_session=True) + except RuntimeError: + input("Press enter to continue...") + exit(1) + except Exception as e: + print(f"An error occurred: {e}") + print(traceback.format_exc()) + input("Press enter to continue...") + exit(1) + + +if __name__ == "__main__": + main() diff --git a/scripts/setup.py b/scripts/setup.py index 928d9b7..4b1557e 100755 --- a/scripts/setup.py +++ b/scripts/setup.py @@ -20,37 +20,16 @@ logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)s: %(message)s") logger = logging.getLogger(__name__) -try: - if (Path(__file__).parent / 'simulator/VERSION').exists(): - # This is running from a release - project_root = Path(__file__).parent - requirements = project_root / "simulator/requirements.txt" - else: - # This is running from the repository - project_root = Path(__file__).parents[1] - requirements = project_root / "requirements.txt" - - venv_dir = project_root / "venv" - logger.info(f"Creating virtual environment in {venv_dir.absolute()}") - create(venv_dir, with_pip=True) - - logger.info(f"Installing dependencies from {requirements.absolute()}") - if platform.system() == "Windows": - pip = venv_dir / "Scripts/pip.exe" - venv_python = venv_dir / "Scripts/python" - else: - pip = venv_dir / "bin/pip" - venv_python = venv_dir / "bin/python" - run( - [str(venv_python), "-m", "pip", "install", "--upgrade", "pip", "setuptools", "wheel"], - cwd=venv_dir, - ) - run([str(pip), "install", "-r", str(requirements)], cwd=venv_dir) +def populate_python_config(runtime_ini: Path, venv_python: Path) -> None: + """ + Populate the python configuration in the runtime.ini file. - logger.info("Setting up Webots Python location") + This will set the python command to the virtual environment python. - runtime_ini = project_root / "simulator/controllers/usercode_runner/runtime.ini" + :param runtime_ini: The path to the runtime.ini file + :param venv_python: The path to the virtual environment python executable + """ runtime_content: list[str] = [] if runtime_ini.exists(): prev_runtime_content = runtime_ini.read_text().splitlines() @@ -79,6 +58,43 @@ runtime_ini.write_text('\n'.join(runtime_content)) + +try: + if (Path(__file__).parent / 'simulator/VERSION').exists(): + # This is running from a release + project_root = Path(__file__).parent + requirements = project_root / "simulator/requirements.txt" + else: + # This is running from the repository + project_root = Path(__file__).parents[1] + requirements = project_root / "requirements.txt" + + venv_dir = project_root / "venv" + + logger.info(f"Creating virtual environment in {venv_dir.absolute()}") + create(venv_dir, with_pip=True) + + logger.info(f"Installing dependencies from {requirements.absolute()}") + if platform.system() == "Windows": + pip = venv_dir / "Scripts/pip.exe" + venv_python = venv_dir / "Scripts/python" + else: + pip = venv_dir / "bin/pip" + venv_python = venv_dir / "bin/python" + run( + [str(venv_python), "-m", "pip", "install", "--upgrade", "pip", "setuptools", "wheel"], + cwd=venv_dir, + ) + run([str(pip), "install", "-r", str(requirements)], cwd=venv_dir) + + logger.info("Setting up Webots Python location") + + controllers_dir = project_root / "simulator/controllers" + usercode_ini = controllers_dir / "usercode_runner/runtime.ini" + supervisor_ini = controllers_dir / "competition_supervisor/runtime.ini" + populate_python_config(usercode_ini, venv_python) + populate_python_config(supervisor_ini, venv_python) + # repopulate zone 0 with example code if robot.py is missing zone_0 = project_root / "zone_0" if not (zone_0 / "robot.py").exists(): diff --git a/simulator/controllers/competition_supervisor/competition_supervisor.py b/simulator/controllers/competition_supervisor/competition_supervisor.py new file mode 100644 index 0000000..6eafd23 --- /dev/null +++ b/simulator/controllers/competition_supervisor/competition_supervisor.py @@ -0,0 +1,260 @@ +""".""" +from __future__ import annotations + +import sys +import time +from contextlib import contextmanager +from pathlib import Path +from typing import Iterator + +from controller import Supervisor + +# Robot constructor lacks a return type annotation in R2023b +sys.path.insert(0, Supervisor().getProjectPath()) # type: ignore[no-untyped-call] +import environment # configure path to include modules +from lighting_control import LightingControl +from robot_logging import get_match_identifier, prefix_and_tee_streams +from robot_utils import get_game_mode, get_match_data, get_robot_file + +# Get the robot object that was created when setting up the environment +_robot = Supervisor.created +assert _robot is not None, "Robot object not created" +supervisor: Supervisor = _robot # type: ignore[assignment] + + +class RobotData: + """Data about a robot in the arena.""" + + def __init__(self, zone: int): + self.registered_ready = False + self.zone = zone + self.robot = supervisor.getFromDef(f'ROBOT{zone}') + if self.robot is None: + raise ValueError(f"Failed to get Webots node for zone {zone}") + + def zone_occupied(self) -> bool: + """Check if this zone has a robot.py file associated with it.""" + try: + _ = get_robot_file(self.zone) + except FileNotFoundError: + return False + return True + + def remove_robot(self) -> None: + """Delete the robot proto from the world.""" + self.robot.remove() # type: ignore[attr-defined] + + def preset_robot(self) -> None: + """Arm the robot so that it waits for the start signal.""" + self.robot.getField('customData').setSFString('prestart') # type: ignore[attr-defined] + + def robot_ready(self) -> bool: + """Check if robot has set its pre-start flag.""" + return bool(self.robot.getField('customData').getSFString() == 'ready') # type: ignore[attr-defined] + + def start_robot(self) -> None: + """Signal to the robot that the start button has been pressed.""" + self.robot.getField('customData').setSFString('start') # type: ignore[attr-defined] + + +class Robots: + """A collection of robots in the arena.""" + + def __init__(self) -> None: + self.robots: dict[int, RobotData] = {} + + for zone in range(0, environment.NUM_ZONES): + try: + robot_data = RobotData(zone) + except ValueError as e: + print(e) + else: + self.robots[zone] = robot_data + + def remove_unoccupied_robots(self) -> None: + """Remove all robots that don't have usercode.""" + for robot in list(self.robots.values()): + if not robot.zone_occupied(): + robot.remove_robot() + _ = self.robots.pop(robot.zone) + + def preset_robots(self) -> None: + """Arm all robots so that they wait for the start signal.""" + for robot in self.robots.values(): + robot.preset_robot() + + def wait_for_ready(self, timeout: float) -> None: + """Wait for all robots to set their pre-start flags.""" + end_time = supervisor.getTime() + timeout + while supervisor.getTime() < end_time: + all_ready = True + # Sleep in individual timesteps to allow the robots to update + supervisor.step() + + for zone, robot in self.robots.items(): + if not robot.registered_ready: + if robot.robot_ready(): + print(f"Robot in zone {zone} is ready.") + # Log only once per robot when ready + robot.registered_ready = True + else: + all_ready = False + if all_ready: + break + else: + pending_robots = ', '.join([ + str(zone) + for zone, robot in self.robots.items() + if not robot.robot_ready() + ]) + raise TimeoutError( + f"Robots in zones {pending_robots} failed to initialise. " + f"Failed to reach wait_start() within {timeout} seconds." + ) + + def start_robots(self) -> None: + """Signal to all robots that their start buttons have been pressed.""" + for robot in self.robots.values(): + robot.start_robot() + + +def is_dev_mode() -> bool: + """Load the mode file and check if we are in dev mode.""" + return (get_game_mode() == 'dev') + + +@contextmanager +def record_animation(filename: Path) -> Iterator[None]: + """Record an animation for the duration of the manager.""" + filename.parent.mkdir(parents=True, exist_ok=True) + print(f"Saving animation to {filename}") + supervisor.animationStartRecording(str(filename)) + yield + supervisor.animationStopRecording() # type: ignore[no-untyped-call] + + +@contextmanager +def record_video( + filename: Path, + resolution: tuple[int, int], + skip: bool = False +) -> Iterator[None]: + """Record a video for the duration of the manager.""" + filename.parent.mkdir(parents=True, exist_ok=True) + + if skip: + print('Not recording movie') + yield + return + else: + print(f"Saving video to {filename}") + + supervisor.movieStartRecording( + str(filename), + width=resolution[0], + height=resolution[1], + quality=100, + codec=0, + acceleration=1, + caption=False, + ) + yield + supervisor.movieStopRecording() # type: ignore[no-untyped-call] + + while not supervisor.movieIsReady(): # type: ignore[no-untyped-call] + time.sleep(0.1) + + if supervisor.movieFailed(): # type: ignore[no-untyped-call] + print("Movie failed to record") + + +def save_image(filename: Path) -> None: + """Capture an image of the arena.""" + filename.parent.mkdir(parents=True, exist_ok=True) + print(f"Saving image to {filename}") + supervisor.exportImage(str(filename), 100) + + +def run_match( + match_duration: int, + media_path_stem: Path, + video_resolution: tuple[int, int], + skip_video: bool, +) -> None: + """Run a match in the arena.""" + robots = Robots() + robots.remove_unoccupied_robots() + + time_step = int(supervisor.getBasicTimeStep()) + match_timesteps = (match_duration * 1000) // time_step + lighting_control = LightingControl(supervisor, match_timesteps) + + robots.preset_robots() + + robots.wait_for_ready(5) + + with record_animation(media_path_stem.with_suffix('.html')): + # Animations don't support lighting changes so start the animation before + # setting the lighting. Step the simulation to allow the animation to start. + supervisor.step() + # Set initial lighting + lighting_control.service_lighting(0) + with record_video(media_path_stem.with_suffix('.mp4'), video_resolution, skip_video): + print("===========") + print("Match start") + print("===========") + + # We are ready to start the match now. "Press" the start button on the robots + robots.start_robots() + supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_FAST) # type: ignore[attr-defined] + + for current_step in range(match_timesteps + 1): + lighting_control.service_lighting(current_step) + supervisor.step(time_step) + + print("==================") + print("Game over, pausing") + print("==================") + supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_PAUSE) # type: ignore[attr-defined] + + # To allow for a clear image of the final state, we have reset the + # lighting after the final frame of the video. + save_image(media_path_stem.with_suffix('.jpg')) + # TODO score match + + +def main() -> None: + """Run the competition supervisor.""" + if is_dev_mode(): + exit() + + match_data = get_match_data() + match_id = get_match_identifier() + + prefix_and_tee_streams( + environment.ARENA_ROOT / f'supervisor-log-{match_id}.txt', + prefix=lambda: f'[{supervisor.getTime():0.3f}] ', + ) + + try: + # TODO check for required libraries? + + run_match( + match_data.match_duration, + environment.ARENA_ROOT / 'recordings' / match_id, + video_resolution=match_data.video_resolution, + skip_video=(not match_data.video_enabled), + ) + # Set the overall Webots exit code to follow the supervisor's exit code + except Exception as e: + # Print and step so error is printed to console + print(f"Error: {e}") + supervisor.step() + supervisor.simulationQuit(1) + raise + else: + supervisor.simulationQuit(0) + + +if __name__ == '__main__': + main() diff --git a/simulator/controllers/competition_supervisor/lighting_control.py b/simulator/controllers/competition_supervisor/lighting_control.py new file mode 100644 index 0000000..2485073 --- /dev/null +++ b/simulator/controllers/competition_supervisor/lighting_control.py @@ -0,0 +1,287 @@ +""" +The controller for altering arena lighting provided by a DirectionalLight and a Background. + +Currently doesn't support: +- Timed pre-match lighting changes +""" +from __future__ import annotations + +from typing import NamedTuple + +from controller import Node, Supervisor + +MATCH_LIGHTING_INTENSITY = 1.5 +DEFAULT_LUMINOSITY = 1 + + +class FromEnd(NamedTuple): + """ + Represents a time relative to the end of the match. + + Negative values are times before the end of the match. 0 is the last frame + of the video. All positive values will only appear in the post-match image. + """ + + time: float + + +class ArenaLighting(NamedTuple): + """Represents a lighting configuration for the arena.""" + + light_def: str + intensity: float + colour: tuple[float, float, float] = (1, 1, 1) + + +class LightingEffect(NamedTuple): + """Represents a lighting effect to be applied to the arena.""" + + start_time: float | FromEnd + fade_time: float | None = None + lighting: ArenaLighting = ArenaLighting('SUN', intensity=MATCH_LIGHTING_INTENSITY) + luminosity: float = DEFAULT_LUMINOSITY + name: str = "" + + def __repr__(self) -> str: + light = self.lighting + lights_info = [ + f"({light.light_def}, int={light.intensity}, col={light.colour})" + ] + return ( + f"" + ) + + +class LightingStep(NamedTuple): + """Represents a step in a lighting fade.""" + + timestep: int + light_node: Node + intensity: float | None + colour: tuple[float, float, float] | None + luminosity: float | None + name: str | None = None + + +CUE_STACK = [ + LightingEffect( + 0, + lighting=ArenaLighting('SUN', intensity=0.2), + luminosity=0.05, + name="Pre-set", + ), + LightingEffect( + 0, + fade_time=1.5, + lighting=ArenaLighting('SUN', intensity=MATCH_LIGHTING_INTENSITY), + luminosity=1, + name="Fade-up", + ), + LightingEffect( + FromEnd(0), # This time runs this cue as the last frame of the video + lighting=ArenaLighting('SUN', intensity=1, colour=(0.8, 0.1, 0.1)), + luminosity=0.1, + name="End of match", + ), + LightingEffect( + FromEnd(1), + lighting=ArenaLighting('SUN', intensity=MATCH_LIGHTING_INTENSITY), + luminosity=DEFAULT_LUMINOSITY, + name="Post-match image", + ), +] + + +class LightingControl: + """Controller for managing lighting effects in the arena.""" + + def __init__(self, supervisor: Supervisor, duration: int) -> None: + self._robot = supervisor + self._final_timestep = duration + self.timestep = self._robot.getBasicTimeStep() + self.ambient_node = supervisor.getFromDef('AMBIENT') + + # fetch all nodes used in effects, any missing nodes will be flagged here + light_names = set(effect.lighting.light_def for effect in CUE_STACK) + self.lights = { + name: supervisor.getFromDef(name) + for name in light_names + } + missing_lights = [name for name, light in self.lights.items() if light is None] + if missing_lights: + raise ValueError(f"Missing light nodes: {missing_lights}") + + # Convert FromEnd times to absolute times + cue_stack = self.convert_from_end_times(CUE_STACK) + + self.lighting_steps = self.generate_lighting_steps(cue_stack) + + def convert_from_end_times(self, cue_stack: list[LightingEffect]) -> list[LightingEffect]: + """Convert FromEnd times to absolute times.""" + new_cue_stack = [] + end_time = (self._final_timestep * self.timestep) / 1000 + # @ 25 fps the last 5 timesteps are not included in the video + start_of_frame_offset = self.timestep * 6 / 1000 + for cue in cue_stack: + if isinstance(cue.start_time, FromEnd): + abs_time = end_time + cue.start_time.time - start_of_frame_offset + new_cue_stack.append(cue._replace(start_time=abs_time)) + else: + new_cue_stack.append(cue) + + return new_cue_stack + + def generate_lighting_steps(self, cue_stack: list[LightingEffect]) -> list[LightingStep]: + """Expand the cue stack into a list of lighting steps.""" + steps: list[LightingStep] = [] + + # Generate current values for all lights + current_values = { + name: LightingStep( + 0, + light, + light.getField('intensity').getSFFloat(), # type: ignore[attr-defined] + light.getField('color').getSFColor(), # type: ignore[attr-defined] + 0, + ) + for name, light in self.lights.items() + } + current_luminosity = self.ambient_node.getField('luminosity').getSFFloat() # type: ignore[attr-defined] + + for cue in cue_stack: + # Get the current state of the light with the current luminosity + current_state = current_values[cue.lighting.light_def] + current_state = current_state._replace(luminosity=current_luminosity) + + expanded_cue = self.expand_lighting_fade(cue, current_state) + + # Update current values from the last step of the cue + current_values[cue.lighting.light_def] = expanded_cue[-1] + current_luminosity = expanded_cue[-1].luminosity + + steps.extend(expanded_cue) + + steps.sort(key=lambda x: x.timestep) + # TODO optimise steps to remove duplicate steps + + return steps + + def expand_lighting_fade( + self, + cue: LightingEffect, + current_state: LightingStep, + ) -> list[LightingStep]: + """Expand a fade effect into a list of steps.""" + fades = [] + + assert isinstance(cue.start_time, float), \ + "FromEnd times should be converted to absolute times" + cue_start = int((cue.start_time * 1000) / self.timestep) + + if cue.fade_time is None: + # no fade, just set values + return [LightingStep( + cue_start, + self.lights[cue.lighting.light_def], + cue.lighting.intensity, + cue.lighting.colour, + cue.luminosity, + cue.name + )] + + assert current_state.intensity is not None, "Current intensity should be set" + assert current_state.colour is not None, "Current colour should be set" + assert current_state.luminosity is not None, "Current luminosity should be set" + + fade_steps = int((cue.fade_time * 1000) / self.timestep) + if fade_steps == 0: + fade_steps = 1 + intensity_step = (cue.lighting.intensity - current_state.intensity) / fade_steps + colour_step = [ + (cue.lighting.colour[0] - current_state.colour[0]) / fade_steps, + (cue.lighting.colour[1] - current_state.colour[1]) / fade_steps, + (cue.lighting.colour[2] - current_state.colour[2]) / fade_steps, + ] + luminosity_step = (cue.luminosity - current_state.luminosity) / fade_steps + + for step in range(fade_steps): + fades.append( + LightingStep( + cue_start + step, + self.lights[cue.lighting.light_def], + current_state.intensity + intensity_step * step, + ( + current_state.colour[0] + (colour_step[0] * step), + current_state.colour[1] + (colour_step[1] * step), + current_state.colour[2] + (colour_step[2] * step), + ), + current_state.luminosity + luminosity_step * step, + cue.name if step == 0 else None, + ) + ) + + # Replace the last step with the final values + fades.pop() + fades.append(LightingStep( + cue_start + fade_steps, + self.lights[cue.lighting.light_def], + cue.lighting.intensity, + cue.lighting.colour, + cue.luminosity, + )) + + return fades + + def set_luminosity(self, luminosity: float) -> None: + """Set the luminosity of the ambient node.""" + self.ambient_node.getField('luminosity').setSFFloat(float(luminosity)) # type: ignore[attr-defined] + + def set_node_intensity(self, node: Node, intensity: float) -> None: + """Set the intensity of a node.""" + node.getField('intensity').setSFFloat(float(intensity)) # type: ignore[attr-defined] + + def set_node_colour(self, node: Node, colour: tuple[float, float, float]) -> None: + """Set the colour of a node.""" + node.getField('color').setSFColor(list(colour)) # type: ignore[attr-defined] + + def service_lighting(self, current_timestep: int) -> int: + """Service the lighting effects for the current timestep.""" + index = 0 + + if current_timestep >= self._final_timestep and self.lighting_steps: + # Run all remaining steps + current_timestep = self.lighting_steps[-1].timestep + + while ( + len(self.lighting_steps) > index + and self.lighting_steps[index].timestep == current_timestep + ): + lighting_step = self.lighting_steps[index] + if lighting_step.name is not None: + print( + f"Running lighting effect: {lighting_step.name} @ " + f"{current_timestep * self.timestep / 1000}" + ) + + if lighting_step.intensity is not None: + self.set_node_intensity(lighting_step.light_node, lighting_step.intensity) + + if lighting_step.colour is not None: + self.set_node_colour(lighting_step.light_node, lighting_step.colour) + + if lighting_step.luminosity is not None: + self.set_luminosity(lighting_step.luminosity) + + index += 1 + + # Remove all steps that have been processed + self.lighting_steps = self.lighting_steps[index:] + + if self.lighting_steps: + return self.lighting_steps[0].timestep - current_timestep + else: + return -1 diff --git a/simulator/controllers/usercode_runner/usercode_runner.py b/simulator/controllers/usercode_runner/usercode_runner.py index c090217..ce87da3 100644 --- a/simulator/controllers/usercode_runner/usercode_runner.py +++ b/simulator/controllers/usercode_runner/usercode_runner.py @@ -12,10 +12,8 @@ import logging import os import runpy -import subprocess import sys import threading -from datetime import datetime from pathlib import Path from tempfile import TemporaryDirectory @@ -24,7 +22,8 @@ # Robot constructor lacks a return type annotation in R2023b sys.path.insert(0, Robot().getProjectPath()) # type: ignore[no-untyped-call] import environment # configure path to include modules -from robot_logging import prefix_and_tee_streams +from robot_logging import get_match_identifier, prefix_and_tee_streams +from robot_utils import get_game_mode, get_robot_file, print_simulation_version from sbot_interface.setup import setup_devices from sbot_interface.socket_server import SocketServer @@ -36,65 +35,6 @@ LOGGER = logging.getLogger('usercode_runner') -def get_robot_file(robot_zone: int) -> Path: - """ - Get the path to the robot file for the given zone. - - :param robot_zone: The zone number - :return: The path to the robot file - :raises FileNotFoundError: If no robot controller is found for the given zone - """ - robot_file = environment.ZONE_ROOT / f'zone_{robot_zone}' / 'robot.py' - - # Check if the robot file exists - if not robot_file.exists(): - raise FileNotFoundError(f"No robot code to run for zone {robot_zone}") - - return robot_file - - -def get_game_mode() -> str: - """ - Get the game mode from the game mode file. - - Default to 'dev' if the file does not exist. - - :return: The game mode - """ - if environment.GAME_MODE_FILE.exists(): - game_mode = environment.GAME_MODE_FILE.read_text().strip() - else: - game_mode = 'dev' - - assert game_mode in ['dev', 'comp'], f'Invalid game mode: {game_mode}' - - return game_mode - - -def print_simulation_version() -> None: - """ - Print the version of the simulator that is running. - - Uses a VERSION file in the root of the simulator to determine the version. - For development, the version is uses the git describe command. - - The version is printed to the console. - """ - version_file = environment.SIM_ROOT / 'VERSION' - if version_file.exists(): - version = version_file.read_text().strip() - else: - try: - version = subprocess.check_output( - ['git', 'describe', '--tags', '--always'], - cwd=str(environment.SIM_ROOT.resolve()), - ).decode().strip() - except subprocess.CalledProcessError: - version = 'unknown' - - print(f"Running simulator version: {version}") - - def start_devices() -> SocketServer: """ Create the board simulators and return the SocketServer object. @@ -175,7 +115,7 @@ def main() -> bool: # Setup log file prefix_and_tee_streams( - robot_file.parent / f'log-{datetime.now():%Y_%m_%dT%H_%M_%S}.txt', + robot_file.parent / f'log-zone-{zone}-{get_match_identifier()}.txt', prefix=lambda: f'[{zone}| {robot.getTime():0.3f}] ', ) diff --git a/simulator/environment.py b/simulator/environment.py index b512acc..2a1d673 100644 --- a/simulator/environment.py +++ b/simulator/environment.py @@ -3,15 +3,27 @@ Also contains constants for where several important files are located. """ +import os import sys from pathlib import Path SIM_ROOT = Path(__file__).absolute().parent -ZONE_ROOT = SIM_ROOT.parent MODULES_ROOT = SIM_ROOT / 'modules' -GAME_MODE_FILE = SIM_ROOT / 'mode.txt' + +ARENA_ROOT = Path(os.environ.get('ARENA_ROOT', SIM_ROOT.parent)) +ZONE_ROOT = ARENA_ROOT +GAME_MODE_FILE = ARENA_ROOT / 'mode.txt' NUM_ZONES = 4 +DEFAULT_MATCH_DURATION = 150 # seconds + + +if not ARENA_ROOT.is_absolute(): + # Webots sets the current directory of each controller to the directory of + # the controller file. As such, relative paths would be ambiguous. + # Hint: `$PWD` or `%CD%` may be useful to construct an absolute path from + # your relative path. + raise ValueError(f"'ARENA_ROOT' must be an absolute path, got {ARENA_ROOT!r}") def setup_environment() -> None: diff --git a/simulator/modules/robot_logging.py b/simulator/modules/robot_logging.py index 56ba910..a6d39ff 100644 --- a/simulator/modules/robot_logging.py +++ b/simulator/modules/robot_logging.py @@ -7,10 +7,15 @@ from __future__ import annotations import sys +from datetime import datetime from io import TextIOWrapper from pathlib import Path from typing import Callable, TextIO +from robot_utils import get_match_data + +DATE_IDENTIFIER = datetime.now().strftime("%Y_%m_%dT%H_%M_%S") + class Tee(TextIOWrapper): """Forwards calls from its `write` and `flush` methods to each of the given targets.""" @@ -109,3 +114,19 @@ def prefix_and_tee_streams(name: Path, prefix: Callable[[], str] | str | None = ), prefix=prefix, ) + + +def get_match_identifier() -> str: + """ + Get the identifier for this run of the simulator. + + This identifier is used to name the log files. + + :return: The match identifier + """ + match_data = get_match_data() + + if match_data.match_number is not None: + return f"match-{match_data.match_number}" + else: + return DATE_IDENTIFIER diff --git a/simulator/modules/robot_utils.py b/simulator/modules/robot_utils.py new file mode 100644 index 0000000..04b2833 --- /dev/null +++ b/simulator/modules/robot_utils.py @@ -0,0 +1,116 @@ +"""General utilities that are useful across runners.""" +from __future__ import annotations + +import json +import subprocess +import sys +from pathlib import Path +from typing import NamedTuple + +# Configure path to import the environment configuration +sys.path.insert(0, str(Path(__file__).parents[1])) +import environment + +# Reset the path +del sys.path[0] + + +class MatchData(NamedTuple): + """ + Data about the current match. + + :param match_number: The current match number + :param match_duration: The duration of the match in seconds + :param video_enabled: Whether video recording is enabled + :param video_resolution: The resolution of the video recording + """ + + match_number: int | None = None + match_duration: int = environment.DEFAULT_MATCH_DURATION + video_enabled: bool = True + video_resolution: tuple[int, int] = (1920, 1080) + + +def get_robot_file(robot_zone: int) -> Path: + """ + Get the path to the robot file for the given zone. + + :param robot_zone: The zone number + :return: The path to the robot file + :raises FileNotFoundError: If no robot controller is found for the given zone + """ + robot_file = environment.ZONE_ROOT / f'zone_{robot_zone}' / 'robot.py' + + # Check if the robot file exists + if not robot_file.exists(): + raise FileNotFoundError(f"No robot code to run for zone {robot_zone}") + + return robot_file + + +def get_game_mode() -> str: + """ + Get the game mode from the game mode file. + + Default to 'dev' if the file does not exist. + + :return: The game mode + """ + if environment.GAME_MODE_FILE.exists(): + game_mode = environment.GAME_MODE_FILE.read_text().strip() + else: + game_mode = 'dev' + + assert game_mode in ['dev', 'comp'], f'Invalid game mode: {game_mode}' + + return game_mode + + +def print_simulation_version() -> None: + """ + Print the version of the simulator that is running. + + Uses a VERSION file in the root of the simulator to determine the version. + For development, the version is uses the git describe command. + + The version is printed to the console. + """ + version_file = environment.SIM_ROOT / 'VERSION' + if version_file.exists(): + version = version_file.read_text().strip() + else: + try: + version = subprocess.check_output( + ['git', 'describe', '--tags', '--always'], + cwd=str(environment.SIM_ROOT.resolve()), + ).decode().strip() + except subprocess.CalledProcessError: + version = 'unknown' + + print(f"Running simulator version: {version}") + + +def get_match_data() -> MatchData: + """Load the match data from the match data file.""" + match_data_file = environment.ARENA_ROOT / 'match.json' + default_match_data = MatchData() + + if match_data_file.exists(): + # TODO error handling for invalid json + raw_data = json.loads(match_data_file.read_text()) + match_data = MatchData( + match_number=raw_data.get('match_number', default_match_data.match_number), + match_duration=raw_data.get('duration', default_match_data.match_duration), + video_enabled=( + raw_data.get('recording_config', {}) + .get('enabled', default_match_data.video_enabled) + ), + video_resolution=( + raw_data.get('recording_config', {}) + .get('video_resolution', default_match_data.video_resolution) + ), + ) + else: + match_data = default_match_data + + return match_data diff --git a/simulator/worlds/arena.wbt b/simulator/worlds/arena.wbt index 321f53f..e56ff2f 100755 --- a/simulator/worlds/arena.wbt +++ b/simulator/worlds/arena.wbt @@ -18,7 +18,7 @@ DEF AMBIENT Background { ] luminosity 1 } -DirectionalLight { +DEF SUN DirectionalLight { ambientIntensity 1 direction -0.1 -0.2 -1 color 0.95 0.95 1 @@ -64,6 +64,13 @@ DEF ROBOT3 SR2025bot { customData "start" } +Robot { + name "competition_supervisor" + controller "competition_supervisor" + supervisor TRUE +} + + Arena { size 5.75 5.75 locked TRUE