Skip to content
This repository has been archived by the owner on May 4, 2024. It is now read-only.

Commit

Permalink
Merge branch 'dev' into eliot/group-modules
Browse files Browse the repository at this point in the history
  • Loading branch information
chrehall68 committed Apr 13, 2024
2 parents 89191c5 + 04216ea commit 9d2459c
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 287 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ pip install -r env_files/rover_requirements.txt
```

![classes](https://github.com/SJSURoboticsTeam/urc-intelligent-systems-2023/assets/50222631/3d47c3d4-b21a-463e-9739-bc99b61f450f)

## Running

To get the mission started, first run `python3 rover.py` from within the `unified_frameworks` directory on the Raspberry Pi. This
starts up the bridge on the rover side and gets all necessary sensors running.

From there, you can run `python3 captain.py` from within the `unified_frameworks` directory on your local machine
to begin the pathfinding and visualization.
8 changes: 1 addition & 7 deletions unified_frameworks/captain.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
except:
sys.path.append(os.path.realpath(__file__ + os.sep + ".." + os.sep + ".."))
from proj_modules.WiFi import WiFi, make_drive_command, Modes
from unified_frameworks.sensor_array.gps_compass import gps_compass
import pathfinder_visualizer
import time
import unified_frameworks.pathfinders.pathfinder as _pathfinder
Expand Down Expand Up @@ -113,11 +112,7 @@ def run_captain(self, is_captain_running: Callable[[], bool]):
while is_captain_running() and self._cur_gps_coordinate is not None:
print("Running")

self.pathfinder.set_goal(
gps_compass.geographic_coordinates_to_relative_coordinates(
*self._cur_gps_coordinate
)
)
self.pathfinder.set_gps_goal(*self._cur_gps_coordinate)
self.captain_act()
time.sleep(1 / config["command_frequency"])

Expand All @@ -129,7 +124,6 @@ def run_captain(self, is_captain_running: Callable[[], bool]):

self.captain_stop()
self.pathfinder.stop_pathfinder_service()
gps_compass.disconnect()
self.finished = True


Expand Down
14 changes: 13 additions & 1 deletion unified_frameworks/pathfinders/NavigatorClass.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import numpy as np
from typing import Tuple
import math
from worldview import Worldview


class Navigator(ABC):
def __init__(self, worldview) -> None:
def __init__(self, worldview: Worldview) -> None:
self.worldview = worldview
self.goal: Tuple[float, float] = (math.pi / 2, 1) # theta, r

Expand Down Expand Up @@ -33,3 +34,14 @@ def get_goal(self):

def distance_to_target(self):
return self.goal[1]

def set_gps_goal(self, target_latitude: float, target_longitude: float) -> None:
"""
Set's the navigator's goal to a given GPS coordinate. Must be called frequently in
order to continually head in the right direction.
"""
self.set_goal(
self.worldview.geographic_coordinates_to_relative_coordinates(
target_latitude, target_longitude
)
)
19 changes: 19 additions & 0 deletions unified_frameworks/rover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import time
from bridge import rover_side
from sensor_array.bridge_lidar import BridgeLidar
from sensor_array.gps_compass.bridge_gps import BridgeGPS

if __name__ == "__main__":
rover_side.service.start_service()
time.sleep(1)
lidar = BridgeLidar()
gps = BridgeGPS()
lidar.connect()
gps.connect()
time.sleep(3) # give time for threads to start
try:
while 1:
print(rover_side.data)
time.sleep(1)
except KeyboardInterrupt:
print("closing rover sensors")
15 changes: 0 additions & 15 deletions unified_frameworks/sensor_array/actual_gps_test.py

This file was deleted.

70 changes: 42 additions & 28 deletions unified_frameworks/sensor_array/gps_compass/actual_gps_compass.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,58 @@
from typing import Tuple
from gps_compass.gps_compass_class import GPSCompass
from sensor_array.gps_compass.gps_compass_class import _GPSCompass
import sys
import serial.tools.list_ports as port_list
import re
import os

root = (next(re.finditer(".*unified_frameworks", __file__)).group())
root = next(re.finditer(".*unified_frameworks", __file__)).group()
sys.path.append(root) if root not in sys.path else None
root = os.path.realpath(os.path.join(root, '..'))
root = os.path.realpath(os.path.join(root, ".."))
sys.path.append(root) if root not in sys.path else None

# import from modules
from proj_modules import GPS
from proj_modules import LSM303
from threading import Thread

class ActualGPSCompass(GPSCompass):
def __init__(self) -> None:
self.gpsState = True # Keeps track of the reading state of the GPS

class ActualGPSCompass(_GPSCompass):
def __init__(self, port: str = None) -> None:
self.gpsState = True # Keeps track of the reading state of the GPS
self.cur_gps = None
port_number = 0
ports = list(
filter(lambda port: "USB" not in port.device, port_list.comports())
)
print("====> Designated Port not found. Using Port:", ports[port_number].device)
port = ports[port_number].device
self.gps = GPS.gpsRead(port, 57600)
while (
self.gps.get_position() == ["error"] * 2 or self.gps.get_position() is None
):
print("Port not found, going to next port...")
port_number += 1
if port in ports:
self.gps = GPS.gpsRead(port, 57600)
else:
print("====> No port specified. Using Port:", ports[port_number].device)
port = ports[port_number].device
try:
self.gps = GPS.gpsRead(port, 57600)
break
except:
continue
self.gps = GPS.gpsRead(port, 57600)
while self.gps.get_position() == [
"error"
] * 2 or self.gps.get_position() == ["None", "None"]:
print(f"Port {port} not working, going to next port...")
port_number += 1
port = ports[port_number].device
try:
self.gps = GPS.gpsRead(port, 57600)
break
except:
continue
self.cur_gps = self.gps.get_position()
if self.cur_gps is None:
raise Exception("Unable to get proper GPS lock.")

self.compass = LSM303.Compass()

self.gpsThreadCall = Thread(target=self.read)
self.gpsThreadCall.start()


def get_cur_angle(self) -> float:
"""
Returns the latest known heading, relative to North
"""
return self.compass.get_heading()

def read(self) -> None:
Expand All @@ -60,25 +67,32 @@ def read(self) -> None:

def get_cur_gps(self) -> Tuple[int, int]:
"""
Returns latest GPS coordinates
Returns latest GPS coordinates, in the format (longitude, latitude)
"""
return self.cur_gps

def join(self):

def start_service(self) -> None:
"""
Should be called to start the service.
"""
self.gpsThreadCall.start()

def stop_service(self) -> None:
"""
Should be called when we disconnect
"""
self.gpsState = False
self.gpsThreadCall.join()


if __name__ == "__main__":
import time

gps = ActualGPSCompass()
gps.start_service()
try:
while True:
print(gps.gps, gps.angle)
print(gps.get_cur_gps(), gps.get_cur_angle())
time.sleep(1)
except KeyboardInterrupt:
gps.disconnect()
gps.join()

gps.stop_service()
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
import traceback
try:
from gps_compass.actual_gps_compass import ActualGPSCompass
print("Bridge GPS did import ActualGPSCompass")
except:
print("Bridge GPS did not import ActualGPSCompass")
pass


from typing import Tuple
import time
import sys
import re
root = (next(re.finditer(".*unified_frameworks", __file__)).group())

root = next(re.finditer(".*unified_frameworks", __file__)).group()
sys.path.append(root) if root not in sys.path else None
from bridge import rover_side, client_side
from bridge.exceptions import NoOpenBridgeException
from unified_utils import Service

try:
from sensor_array.gps_compass.actual_gps_compass import ActualGPSCompass
except ImportError as e:
print(e.msg)
print("Assuming this means on client side.")
from sensor_array.gps_compass.gps_compass_class import _GPSCompass

import json

config = {
"update_frequency": 2, #Hz
"update_frequency": 2, # Hz
}

class BridgeGPS:
PATH = '/gps'

class BridgeGPS(_GPSCompass):
PATH = "/gps"

def __init__(self) -> None:
super().__init__()
if rover_side.bridge_is_up():
assert not client_side.bridge_is_up(), "Both Rover and Client side bridge is up! Whats going on here??"
assert (
not client_side.bridge_is_up()
), "Both Rover and Client side bridge is up! Whats going on here??"
self.gps = ActualGPSCompass()
self.gps.start_service()
self.data = rover_side.data
self.ON_ROVER_SIDE = True
self.ON_CLIENT_SIDE = not self.ON_ROVER_SIDE
Expand All @@ -39,54 +44,70 @@ def __init__(self) -> None:
self.ON_CLIENT_SIDE = not self.ON_ROVER_SIDE
return
raise NoOpenBridgeException("Neither Rover nor Client side of bridge is up!")

def connect(self, max_attempts=3, wait_seconds=1, verbose_attempts=False) -> bool:
if self.ON_CLIENT_SIDE:
for attempt in range(max_attempts):
if BridgeGPS.PATH in self.data: return True
if BridgeGPS.PATH in self.data:
return True
time.sleep(wait_seconds)
return False
# return BridgeLidar.PATH in self.data

def update_gps_data(is_alive):
while is_alive():
self.data[BridgeGPS.PATH] = json.dumps(
{"gps": self.gps.get_cur_gps(), "angle": self.gps.get_cur_angle()}
)
time.sleep(1/config['update_frequency'])
time.sleep(1 / config["update_frequency"])

self._service = Service(update_gps_data, "Actual GPS to Bridge")
self._service.start_service()
return True

def disconnect(self):
if self.ON_ROVER_SIDE:
self._service.stop_service()
def get_measures(self):
return json.loads(self.data[BridgeGPS.PATH]) if BridgeGPS.PATH in self.data else None
self.gps.stop_service()

def start_service(self):
pass

def stop_service(self):
pass

if __name__=='__main__':
if sys.argv[1]=='r':
def get_cur_angle(self) -> int:
return json.loads(self.data[BridgeGPS.PATH])["angle"]

def get_cur_gps(self) -> Tuple[int, int]:
return json.loads(self.data[BridgeGPS.PATH])["gps"]


if __name__ == "__main__":
if sys.argv[1] == "r":
rover_side.service.start_service()
gps = BridgeGPS()
gps.connect()
time.sleep(3) # give time for threads to start
while True:
try:
# time.sleep(2)
print(gps.get_measures())
print(f"GPS: {gps.get_cur_gps()}, Heading: {gps.get_cur_angle()}")
time.sleep(1)
except KeyboardInterrupt:
print("Keyboard Interrupted")
break
gps.disconnect()
rover_side.service.stop_service()
elif sys.argv[1]=='c':
elif sys.argv[1] == "c":
# client_side.service.start_service()
if not client_side.open_bridge(): exit(0)
if not client_side.open_bridge():
exit(0)
gps = BridgeGPS()
try:
while 1:
print(gps.get_measures())
print(f"GPS: {gps.get_cur_gps()}, Heading: {gps.get_cur_angle()}")
time.sleep(1)
except KeyboardInterrupt:
gps.disconnect()
# client_side.service.stop_service()
client_side.close_bridge()

11 changes: 9 additions & 2 deletions unified_frameworks/sensor_array/gps_compass/fake_gps_compass.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from typing import Tuple
import sys

root = __file__[: __file__.index("/unified_frameworks")]
sys.path.append(root + "/unified_frameworks")
from sensor_array.gps_compass.gps_compass_class import GPSCompass
from sensor_array.gps_compass.gps_compass_class import _GPSCompass
import math


class FakeGPSCompass(GPSCompass):
class FakeGPSCompass(_GPSCompass):
def __init__(self) -> None:
super().__init__()

Expand All @@ -20,3 +21,9 @@ def geographic_coordinates_to_relative_coordinates(
self, target_latitude: float, target_longitude: float
):
return (math.pi / 2, 4) # arbitrary values

def start_service(self):
pass

def stop_service(self):
pass
Loading

0 comments on commit 9d2459c

Please sign in to comment.