From 04216eaea3803cc70bc11e04fad5a9c927539417 Mon Sep 17 00:00:00 2001 From: chrehall68 <60240707+chrehall68@users.noreply.github.com> Date: Sat, 13 Apr 2024 13:01:46 -0700 Subject: [PATCH] Fully migrate to Bridge GPS (#128) * migrate from wireless to bridge, move gps dependency to worldview * fix local import errors * Consolidate bridge services into `rover.py` --- README.md | 8 ++ unified_frameworks/NavigatorClass.py | 23 ++++- unified_frameworks/captain.py | 11 +-- unified_frameworks/rover.py | 19 ++++ .../sensor_array/actual_gps_test.py | 15 --- .../gps_compass/actual_gps_compass.py | 70 ++++++++------ .../{ => gps_compass}/bridge_gps.py | 73 +++++++++----- .../gps_compass/fake_gps_compass.py | 11 ++- .../sensor_array/gps_compass/gps_compass.py | 27 ------ .../gps_compass/gps_compass_class.py | 7 +- .../gps_compass/wireless_gps_compass.py | 94 ------------------- unified_frameworks/sensor_array/server_gps.py | 56 ----------- unified_frameworks/worldview.py | 69 ++++++++------ 13 files changed, 194 insertions(+), 289 deletions(-) create mode 100644 unified_frameworks/rover.py delete mode 100644 unified_frameworks/sensor_array/actual_gps_test.py rename unified_frameworks/sensor_array/{ => gps_compass}/bridge_gps.py (58%) delete mode 100644 unified_frameworks/sensor_array/gps_compass/gps_compass.py delete mode 100644 unified_frameworks/sensor_array/gps_compass/wireless_gps_compass.py delete mode 100644 unified_frameworks/sensor_array/server_gps.py diff --git a/README.md b/README.md index a49c3fd..995cba6 100644 --- a/README.md +++ b/README.md @@ -20,3 +20,11 @@ pip install -r env_files/rover_requirements.txt ``` ![classes](https://github.com/SJSURoboticsTeam/urc-intelligent-systems-2023/assets/50222631/3d47c3d4-b21a-463e-9739-bc99b61f450f) + +## Running + +To get the mission started, first run `python3 rover.py` from within the `unified_frameworks` directory on the Raspberry Pi. This +starts up the bridge on the rover side and gets all necessary sensors running. + +From there, you can run `python3 captain.py` from within the `unified_frameworks` directory on your local machine +to begin the pathfinding and visualization. diff --git a/unified_frameworks/NavigatorClass.py b/unified_frameworks/NavigatorClass.py index 950cd9b..4a7f60e 100644 --- a/unified_frameworks/NavigatorClass.py +++ b/unified_frameworks/NavigatorClass.py @@ -1,24 +1,43 @@ from abc import ABC, abstractmethod import numpy as np +from worldview import Worldview + class Navigator(ABC): - def __init__(self, worldview) -> None: + def __init__(self, worldview: Worldview) -> None: self.worldview = worldview + @abstractmethod def get_path(self) -> np.ndarray: return [] + @abstractmethod def get_tree_links(self) -> np.ndarray: return [] + @abstractmethod def start_pathfinder_service(self): pass + @abstractmethod def stop_pathfinder_service(self): pass + @abstractmethod def set_goal(self, polar_point): pass + @abstractmethod def get_goal(self): - pass \ No newline at end of file + pass + + def set_gps_goal(self, target_latitude: float, target_longitude: float) -> None: + """ + Set's the navigator's goal to a given GPS coordinate. Must be called frequently in + order to continually head in the right direction. + """ + self.set_goal( + self.worldview.geographic_coordinates_to_relative_coordinates( + target_latitude, target_longitude + ) + ) diff --git a/unified_frameworks/captain.py b/unified_frameworks/captain.py index 4d9fbf0..290ad90 100644 --- a/unified_frameworks/captain.py +++ b/unified_frameworks/captain.py @@ -15,7 +15,6 @@ except: sys.path.append(os.path.realpath(__file__ + os.sep + ".." + os.sep + "..")) from proj_modules.WiFi import WiFi, make_drive_command, Modes -from unified_frameworks.sensor_array.gps_compass import gps_compass import pathfinder_visualizer import time from straight_shot import StraightShot @@ -97,8 +96,7 @@ def captain_act(get_target_speed): def captain_stop(): command = make_drive_command(speed_percent=0) rover.send_command(command) if config["send_commands_to_rover"] else None - with _command_printer: - print(command) + print(command) def set_goal_coordinates(coordinate: Tuple[int, int]): @@ -124,11 +122,7 @@ def run_captain(is_captain_running): while is_captain_running() and _cur_gps_coordinate is not None: print("Running") - pathfinder.set_goal( - gps_compass.geographic_coordinates_to_relative_coordinates( - *_cur_gps_coordinate - ) - ) + pathfinder.set_gps_goal(*_cur_gps_coordinate) captain_act(_get_target_speed) time.sleep(1 / config["command_frequency"]) @@ -140,7 +134,6 @@ def run_captain(is_captain_running): captain_stop() pathfinder.stop_pathfinder_service() - gps_compass.disconnect() _service = Service(run_captain, "Captain Service") diff --git a/unified_frameworks/rover.py b/unified_frameworks/rover.py new file mode 100644 index 0000000..c31736f --- /dev/null +++ b/unified_frameworks/rover.py @@ -0,0 +1,19 @@ +import time +from bridge import rover_side +from sensor_array.bridge_lidar import BridgeLidar +from sensor_array.gps_compass.bridge_gps import BridgeGPS + +if __name__ == "__main__": + rover_side.service.start_service() + time.sleep(1) + lidar = BridgeLidar() + gps = BridgeGPS() + lidar.connect() + gps.connect() + time.sleep(3) # give time for threads to start + try: + while 1: + print(rover_side.data) + time.sleep(1) + except KeyboardInterrupt: + print("closing rover sensors") diff --git a/unified_frameworks/sensor_array/actual_gps_test.py b/unified_frameworks/sensor_array/actual_gps_test.py deleted file mode 100644 index e2a97b3..0000000 --- a/unified_frameworks/sensor_array/actual_gps_test.py +++ /dev/null @@ -1,15 +0,0 @@ -import sys -import re -root = (next(re.finditer(".*sensor_array", __file__)).group()) -sys.path.append(root) if root not in sys.path else None -from gps_compass.actual_gps_compass import ActualGPSCompass -import time - -if __name__=='__main__': - gps = ActualGPSCompass() - try: - while 1: - print(gps.get_cur_gps(), gps.get_cur_angle()) - time.sleep(1) - except KeyboardInterrupt: - gps.disconnect() diff --git a/unified_frameworks/sensor_array/gps_compass/actual_gps_compass.py b/unified_frameworks/sensor_array/gps_compass/actual_gps_compass.py index 763691f..cc9d01c 100644 --- a/unified_frameworks/sensor_array/gps_compass/actual_gps_compass.py +++ b/unified_frameworks/sensor_array/gps_compass/actual_gps_compass.py @@ -1,13 +1,13 @@ from typing import Tuple -from gps_compass.gps_compass_class import GPSCompass +from sensor_array.gps_compass.gps_compass_class import _GPSCompass import sys import serial.tools.list_ports as port_list import re import os -root = (next(re.finditer(".*unified_frameworks", __file__)).group()) +root = next(re.finditer(".*unified_frameworks", __file__)).group() sys.path.append(root) if root not in sys.path else None -root = os.path.realpath(os.path.join(root, '..')) +root = os.path.realpath(os.path.join(root, "..")) sys.path.append(root) if root not in sys.path else None # import from modules @@ -15,37 +15,44 @@ from proj_modules import LSM303 from threading import Thread -class ActualGPSCompass(GPSCompass): - def __init__(self) -> None: - self.gpsState = True # Keeps track of the reading state of the GPS + +class ActualGPSCompass(_GPSCompass): + def __init__(self, port: str = None) -> None: + self.gpsState = True # Keeps track of the reading state of the GPS self.cur_gps = None port_number = 0 ports = list( filter(lambda port: "USB" not in port.device, port_list.comports()) ) - print("====> Designated Port not found. Using Port:", ports[port_number].device) - port = ports[port_number].device - self.gps = GPS.gpsRead(port, 57600) - while ( - self.gps.get_position() == ["error"] * 2 or self.gps.get_position() is None - ): - print("Port not found, going to next port...") - port_number += 1 + if port in ports: + self.gps = GPS.gpsRead(port, 57600) + else: + print("====> No port specified. Using Port:", ports[port_number].device) port = ports[port_number].device - try: - self.gps = GPS.gpsRead(port, 57600) - break - except: - continue + self.gps = GPS.gpsRead(port, 57600) + while self.gps.get_position() == [ + "error" + ] * 2 or self.gps.get_position() == ["None", "None"]: + print(f"Port {port} not working, going to next port...") + port_number += 1 + port = ports[port_number].device + try: + self.gps = GPS.gpsRead(port, 57600) + break + except: + continue self.cur_gps = self.gps.get_position() + if self.cur_gps is None: + raise Exception("Unable to get proper GPS lock.") self.compass = LSM303.Compass() self.gpsThreadCall = Thread(target=self.read) - self.gpsThreadCall.start() - def get_cur_angle(self) -> float: + """ + Returns the latest known heading, relative to North + """ return self.compass.get_heading() def read(self) -> None: @@ -60,25 +67,32 @@ def read(self) -> None: def get_cur_gps(self) -> Tuple[int, int]: """ - Returns latest GPS coordinates + Returns latest GPS coordinates, in the format (longitude, latitude) """ return self.cur_gps - - def join(self): + + def start_service(self) -> None: + """ + Should be called to start the service. + """ + self.gpsThreadCall.start() + + def stop_service(self) -> None: """ Should be called when we disconnect """ self.gpsState = False self.gpsThreadCall.join() + if __name__ == "__main__": import time + gps = ActualGPSCompass() + gps.start_service() try: while True: - print(gps.gps, gps.angle) + print(gps.get_cur_gps(), gps.get_cur_angle()) time.sleep(1) except KeyboardInterrupt: - gps.disconnect() - gps.join() - + gps.stop_service() diff --git a/unified_frameworks/sensor_array/bridge_gps.py b/unified_frameworks/sensor_array/gps_compass/bridge_gps.py similarity index 58% rename from unified_frameworks/sensor_array/bridge_gps.py rename to unified_frameworks/sensor_array/gps_compass/bridge_gps.py index 68c0fe9..3d5bc9a 100644 --- a/unified_frameworks/sensor_array/bridge_gps.py +++ b/unified_frameworks/sensor_array/gps_compass/bridge_gps.py @@ -1,34 +1,39 @@ -import traceback -try: - from gps_compass.actual_gps_compass import ActualGPSCompass - print("Bridge GPS did import ActualGPSCompass") -except: - print("Bridge GPS did not import ActualGPSCompass") - pass - - +from typing import Tuple import time import sys import re -root = (next(re.finditer(".*unified_frameworks", __file__)).group()) + +root = next(re.finditer(".*unified_frameworks", __file__)).group() sys.path.append(root) if root not in sys.path else None from bridge import rover_side, client_side from bridge.exceptions import NoOpenBridgeException from unified_utils import Service +try: + from sensor_array.gps_compass.actual_gps_compass import ActualGPSCompass +except ImportError as e: + print(e.msg) + print("Assuming this means on client side.") +from sensor_array.gps_compass.gps_compass_class import _GPSCompass + import json config = { - "update_frequency": 2, #Hz + "update_frequency": 2, # Hz } -class BridgeGPS: - PATH = '/gps' + +class BridgeGPS(_GPSCompass): + PATH = "/gps" + def __init__(self) -> None: super().__init__() if rover_side.bridge_is_up(): - assert not client_side.bridge_is_up(), "Both Rover and Client side bridge is up! Whats going on here??" + assert ( + not client_side.bridge_is_up() + ), "Both Rover and Client side bridge is up! Whats going on here??" self.gps = ActualGPSCompass() + self.gps.start_service() self.data = rover_side.data self.ON_ROVER_SIDE = True self.ON_CLIENT_SIDE = not self.ON_ROVER_SIDE @@ -39,54 +44,70 @@ def __init__(self) -> None: self.ON_CLIENT_SIDE = not self.ON_ROVER_SIDE return raise NoOpenBridgeException("Neither Rover nor Client side of bridge is up!") + def connect(self, max_attempts=3, wait_seconds=1, verbose_attempts=False) -> bool: if self.ON_CLIENT_SIDE: for attempt in range(max_attempts): - if BridgeGPS.PATH in self.data: return True + if BridgeGPS.PATH in self.data: + return True time.sleep(wait_seconds) return False # return BridgeLidar.PATH in self.data + def update_gps_data(is_alive): while is_alive(): self.data[BridgeGPS.PATH] = json.dumps( {"gps": self.gps.get_cur_gps(), "angle": self.gps.get_cur_angle()} ) - time.sleep(1/config['update_frequency']) + time.sleep(1 / config["update_frequency"]) + self._service = Service(update_gps_data, "Actual GPS to Bridge") self._service.start_service() return True + def disconnect(self): if self.ON_ROVER_SIDE: self._service.stop_service() - def get_measures(self): - return json.loads(self.data[BridgeGPS.PATH]) if BridgeGPS.PATH in self.data else None + self.gps.stop_service() + + def start_service(self): + pass + def stop_service(self): + pass -if __name__=='__main__': - if sys.argv[1]=='r': + def get_cur_angle(self) -> int: + return json.loads(self.data[BridgeGPS.PATH])["angle"] + + def get_cur_gps(self) -> Tuple[int, int]: + return json.loads(self.data[BridgeGPS.PATH])["gps"] + + +if __name__ == "__main__": + if sys.argv[1] == "r": rover_side.service.start_service() gps = BridgeGPS() gps.connect() + time.sleep(3) # give time for threads to start while True: try: - # time.sleep(2) - print(gps.get_measures()) + print(f"GPS: {gps.get_cur_gps()}, Heading: {gps.get_cur_angle()}") time.sleep(1) except KeyboardInterrupt: print("Keyboard Interrupted") break gps.disconnect() rover_side.service.stop_service() - elif sys.argv[1]=='c': + elif sys.argv[1] == "c": # client_side.service.start_service() - if not client_side.open_bridge(): exit(0) + if not client_side.open_bridge(): + exit(0) gps = BridgeGPS() try: while 1: - print(gps.get_measures()) + print(f"GPS: {gps.get_cur_gps()}, Heading: {gps.get_cur_angle()}") time.sleep(1) except KeyboardInterrupt: gps.disconnect() # client_side.service.stop_service() client_side.close_bridge() - diff --git a/unified_frameworks/sensor_array/gps_compass/fake_gps_compass.py b/unified_frameworks/sensor_array/gps_compass/fake_gps_compass.py index 190812b..a364c36 100644 --- a/unified_frameworks/sensor_array/gps_compass/fake_gps_compass.py +++ b/unified_frameworks/sensor_array/gps_compass/fake_gps_compass.py @@ -1,12 +1,13 @@ from typing import Tuple import sys + root = __file__[: __file__.index("/unified_frameworks")] sys.path.append(root + "/unified_frameworks") -from sensor_array.gps_compass.gps_compass_class import GPSCompass +from sensor_array.gps_compass.gps_compass_class import _GPSCompass import math -class FakeGPSCompass(GPSCompass): +class FakeGPSCompass(_GPSCompass): def __init__(self) -> None: super().__init__() @@ -20,3 +21,9 @@ def geographic_coordinates_to_relative_coordinates( self, target_latitude: float, target_longitude: float ): return (math.pi / 2, 4) # arbitrary values + + def start_service(self): + pass + + def stop_service(self): + pass diff --git a/unified_frameworks/sensor_array/gps_compass/gps_compass.py b/unified_frameworks/sensor_array/gps_compass/gps_compass.py deleted file mode 100644 index 5e450d4..0000000 --- a/unified_frameworks/sensor_array/gps_compass/gps_compass.py +++ /dev/null @@ -1,27 +0,0 @@ -from typing import Tuple -import sys -root = __file__[: __file__.index("/unified_frameworks")] -sys.path.append(root + "/unified_frameworks") -from sensor_array.gps_compass.fake_gps_compass import FakeGPSCompass -from sensor_array.gps_compass.wireless_gps_compass import WirelessGPSCompass - -try: - _gps = WirelessGPSCompass() - print("using wireless") -except Exception as e: - print("exception e", e) - _gps = FakeGPSCompass() - print("using fake") - - -def geographic_coordinates_to_relative_coordinates( - lat: float, long: float -) -> Tuple[float, float]: - while _gps.get_cur_gps() is None: - pass - return _gps.geographic_coordinates_to_relative_coordinates(lat, long) - - -def disconnect(): - if isinstance(_gps, WirelessGPSCompass): - _gps.disconnect() diff --git a/unified_frameworks/sensor_array/gps_compass/gps_compass_class.py b/unified_frameworks/sensor_array/gps_compass/gps_compass_class.py index 534d2a1..07ce504 100644 --- a/unified_frameworks/sensor_array/gps_compass/gps_compass_class.py +++ b/unified_frameworks/sensor_array/gps_compass/gps_compass_class.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from typing import Tuple, Union import math +from unified_utils import _Abstract_Service class Util: @@ -43,13 +44,13 @@ def get_distance( def get_bearing(current_GPS, target_GPS): """Returns the angle between two GPS coordinates - + PARAMS: current_GPS (tuple): (latitude, longitude) target_GPS (tuple): (latitude, longitude) RETURNS: float. angle between the two coordinates - + """ try: current_latitude = math.radians(current_GPS[1]) @@ -72,7 +73,7 @@ def get_bearing(current_GPS, target_GPS): print("No GPS Data") -class GPSCompass(ABC): +class _GPSCompass(_Abstract_Service): @abstractmethod def get_cur_angle(self) -> int: """ diff --git a/unified_frameworks/sensor_array/gps_compass/wireless_gps_compass.py b/unified_frameworks/sensor_array/gps_compass/wireless_gps_compass.py deleted file mode 100644 index 8c7c226..0000000 --- a/unified_frameworks/sensor_array/gps_compass/wireless_gps_compass.py +++ /dev/null @@ -1,94 +0,0 @@ -import sys -import re -from typing import Tuple - -root = next(re.finditer(".*unified_frameworks", __file__)).group() -sys.path.append(root) if root not in sys.path else None -from sensor_array.gps_compass.gps_compass_class import GPSCompass -import asyncio -import websockets -from threading import Thread -import json -import time - -wifi_address = { - "robotics_wifi": "ws://192.168.1.130:8777", - "tp_link": "ws://192.168.0.100:8777", -} - - -class WirelessGPSCompass(GPSCompass): - def __init__(self, uri=wifi_address["robotics_wifi"]): - self.uri = uri - self.gps = None - self.angle = None - - self.connect() - - def connect(self, max_attempts=3, verbose_attempts=False) -> bool: - self.stay_connected = True - self.connected = False - self.verbose_attempts = verbose_attempts - self.thread = Thread(target=self.start_connection) - self.thread.start() - for i in range(max_attempts): - if self.connected: - return True - time.sleep(1) - self.disconnect() - return False - - async def receive_data(self): - while self.stay_connected: - try: - print(f"Trying to connect to websocket at uri `{self.uri}`") - async with websockets.connect(self.uri) as websocket: - self.connected = True - while self.stay_connected: - data = await asyncio.wait_for(websocket.recv(), timeout=5) - data = json.loads(data) - - # store results - self.gps = data["gps"] - self.angle = data["angle"] - - # in case of not connected - self.connected = False - except asyncio.TimeoutError: - self.connected = False - print("GPS update timed out") - time.sleep(1) - print("Trying again") - except websockets.exceptions.ConnectionClosedOK: - self.connected = False - print("Connection closed") - time.sleep(1) - print("Trying again") - - def start_connection(self): - # asyncio.set_event_loop(asyncio.new_event_loop()) - asyncio.get_event_loop().run_until_complete(self.receive_data()) - # asyncio.ru) - - def disconnect(self): - self.stay_connected = False - for _ in range(10): - if self.connected: - print("still connected waiting to disconnect") - time.sleep(1) - - def get_cur_angle(self) -> int: - return self.angle - - def get_cur_gps(self) -> Tuple[int, int]: - return self.gps - - -if __name__ == "__main__": - gps = WirelessGPSCompass() - try: - while 1: - print(gps.gps, gps.angle) - time.sleep(1) - except KeyboardInterrupt: - gps.disconnect() diff --git a/unified_frameworks/sensor_array/server_gps.py b/unified_frameworks/sensor_array/server_gps.py deleted file mode 100644 index 74ef444..0000000 --- a/unified_frameworks/sensor_array/server_gps.py +++ /dev/null @@ -1,56 +0,0 @@ -import websockets -import asyncio -import json -from gps_compass.actual_gps_compass import ActualGPSCompass - -# gps = ActualGPSCompass() # Commented this cuz it should not run if the file is not __main__ -print("successfully made actual gps") - -clients = [] # @note List of clients connected in the server -buffer = None - - -# Creating WebSocket server -# @note Server listens for ConnectionClosedOK -# @note Sends data from the server to the client via websockets -# @note checks if the lidar we are sending data from is either FakeLidar or ActualLidar -# @note If current lidar is FakeLidar then we utilize NumpyEncoder. -async def sendServerDataToClient(websocket): - clients.append(websocket) - print("[SERVER] Client has connected to the server") - # await websocket.send(json.dumps("[SERVER] You have connected to the server!")) - # await asyncio.sleep(0.01) - isConnectedStill = True - - try: - while isConnectedStill: - buffer = json.dumps( - {"gps": gps.get_cur_gps(), "angle": gps.get_cur_angle()} - ) - - if len(clients) > 0: - await websocket.send(buffer) - await asyncio.sleep(0.2) - - except websockets.exceptions.ConnectionClosedOK: - print("[SERVER] Client disconnected from server!") - # await websocket.send(json.dumps("[SERVER] You have disconnected from the server")) - # await asyncio.sleep(0.01) - clients.remove(websocket) - except websockets.ConnectionClosedError: - print("[SERVER] Internal Server Error.") - # await websocket.send(json.dumps("[SERVER] Internal Server error has occurred!")) - # await asyncio.sleep(0.01) - - -async def startServer(): - async with websockets.serve(sendServerDataToClient, "0.0.0.0", 8777): - await asyncio.Future() # run forever - - -if __name__ == "__main__": - try: - print("[SERVER] Server ON") - asyncio.run(startServer()) - except KeyboardInterrupt: - print("[SERVER] Keyboard Interrupt occurred!") diff --git a/unified_frameworks/worldview.py b/unified_frameworks/worldview.py index abaff0c..92e8af5 100644 --- a/unified_frameworks/worldview.py +++ b/unified_frameworks/worldview.py @@ -3,6 +3,7 @@ and unifying them into a single worldview for everything else to access """ + import time from threading import Thread from math import pi @@ -10,61 +11,75 @@ import sensor_array.lidar import bridge.client_side import importlib +from sensor_array.gps_compass.bridge_gps import BridgeGPS +from typing import Tuple +from bridge import client_side + importlib.reload(bridge.client_side) importlib.reload(sensor_array.lidar) lidar = sensor_array.lidar config = { - "update_frequency": 20, #Hz + "update_frequency": 20, # Hz "service_event_verbose": True, - "rover_body": [(-pi/2, 1), (pi, 0.5), (0, 0.5)], - "verbose": True + "rover_body": [(-pi / 2, 1), (pi, 0.5), (0, 0.5)], + "verbose": True, } + class Worldview(_Abstract_Service): def __init__(self) -> None: - """Set up all the sensors to feed their data into this worldview. - - """ + """Set up all the sensors to feed their data into this worldview.""" super().__init__() - if config['verbose']: print("Initializing Worldview") + client_side.service.start_service() + time.sleep(3) + if config["verbose"]: + print("Initializing Worldview") self._lidar = lidar.Lidar() - + self._gps = BridgeGPS() + def start_service(self): - """Start sensors and start unifying data from all sensors - - """ - if config['verbose']: print("Starting worldview service") + """Start sensors and start unifying data from all sensors""" + if config["verbose"]: + print("Starting worldview service") self._lidar.start_service() + def stop_service(self): - """Stop unifying data and stop all sensors - - """ - if config['verbose']: print("Stopping worldview service") + """Stop unifying data and stop all sensors""" + if config["verbose"]: + print("Stopping worldview service") self._lidar.stop_service() - + self._gps.disconnect() + client_side.service.stop_service() + def get_obstacles(self): - """Get a list of obstacles where each obstacle is a point cloud of the + """Get a list of obstacles where each obstacle is a point cloud of the obstacle - + """ return self._lidar.get_obstacles() - + def get_points(self): - """Get a point cloud of the environment. The point cloud is a list of + """Get a point cloud of the environment. The point cloud is a list of ``(radians, meters)`` - + """ return sum(self._lidar.get_point_clouds(), []) - + def get_rover_body(self): - """Get the coordinates of the rovers body - - """ + """Get the coordinates of the rovers body""" return config["rover_body"] -if __name__=="__main__": + def geographic_coordinates_to_relative_coordinates( + self, target_latitude: float, target_longitude: float + ) -> Tuple[float, float]: + return self._gps.geographic_coordinates_to_relative_coordinates( + target_latitude, target_longitude + ) + + +if __name__ == "__main__": w = Worldview() w.start_service() time.sleep(10)