Skip to content

Commit

Permalink
Python formatting tools (#37)
Browse files Browse the repository at this point in the history
* Python formatting tools

* Format new files

* Gitignore

* Pylint submodule ignore

* Github workflow comment

* Remove duplicate by adding command generation

* Tests

* PR fixes
  • Loading branch information
Xierumeng authored and janez45 committed Mar 26, 2024
1 parent ccc3074 commit 01690a3
Show file tree
Hide file tree
Showing 41 changed files with 1,243 additions and 974 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
git submodule update --init --recursive --remote
pip install -r ./modules/common/requirements.txt
# Install pathing dependencies
# Install project dependencies
- name: Install project dependencies
run: |
python -m pip install --upgrade pip
Expand All @@ -40,6 +40,13 @@ jobs:
- name: Install zbar library
run: sudo apt-get install libzbar0

# Run linters and formatters
- name: Linters and formatters
run: |
black --check .
flake8 .
pylint .
# Run tests with PyTest
- name: Run tests
run: pytest -vv
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ __pycache__/
venv/

# Logging
*log*
logs/
22 changes: 17 additions & 5 deletions connection_check.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
For testing MAVLink connection with DroneKit-Python.
"""

import time

import dronekit
Expand Down Expand Up @@ -99,19 +100,30 @@ def read_data(drone: dronekit.Vehicle) -> bool:
print("Next waypoint index: " + str(command_sequence.next))


if __name__ == "__main__":
def main() -> int:
"""
Main function.
"""
# TODO: Fails when wait_ready=True, debugging why this is
# wait_ready=False is dangerous
drone = dronekit.connect(CONNECTION_ADDRESS, wait_ready=False)
dronekit_vehicle = dronekit.connect(CONNECTION_ADDRESS, wait_ready=False)

if WRITE_TEST:
write_test_mission(drone)
write_test_mission(dronekit_vehicle)

if READ_TEST:
for _ in range(0, 40):
read_data(drone)
read_data(dronekit_vehicle)
time.sleep(0.5)

drone.close()
dronekit_vehicle.close()

return 0


if __name__ == "__main__":
result_main = main()
if result_main < 0:
print(f"ERROR: Status code: {result_main}")

print("Done!")
47 changes: 7 additions & 40 deletions modules/add_takeoff_and_landing_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@

import dronekit

from . import generate_command

MAVLINK_TAKEOFF_FRAME = dronekit.mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT
MAVLINK_LANDING_FRAME = dronekit.mavutil.mavlink.MAV_FRAME_GLOBAL
MAVLINK_TAKEOFF_COMMAND = dronekit.mavutil.mavlink.MAV_CMD_NAV_TAKEOFF
MAVLINK_LANDING_COMMAND = dronekit.mavutil.mavlink.MAV_CMD_NAV_LAND


def add_takeoff_and_landing_command(commands: "list[dronekit.Command]",
altitude: float) -> "tuple[bool, list[dronekit.Command] | None]":
def add_takeoff_and_landing_command(
commands: "list[dronekit.Command]", altitude: float
) -> "tuple[bool, list[dronekit.Command] | None]":
"""
Prepends a takeoff command and appends a landing command to a list of dronekit commands.
Expand All @@ -25,47 +22,17 @@ def add_takeoff_and_landing_command(commands: "list[dronekit.Command]",
Returns
-------
tuple[bool, list[dronekit.Command] | None]:
tuple[bool, list[dronekit.Command] | None]:
(False, None) if empty commands list,
(True, dronekit commands with takeoff and land commands that can be sent to the drone) otherwise.
"""
if len(commands) == 0:
return False, None

takeoff_command = dronekit.Command(
0,
0,
0,
MAVLINK_TAKEOFF_FRAME,
MAVLINK_TAKEOFF_COMMAND,
0,
0,
0, # param1
0,
0,
0,
0,
0,
altitude,
)
takeoff_command = generate_command.takeoff(altitude)
commands.insert(0, takeoff_command)

landing_command = dronekit.Command(
0,
0,
0,
MAVLINK_LANDING_FRAME,
MAVLINK_LANDING_COMMAND,
0,
0,
0, # param1
0,
0,
0,
0,
0,
0,
)
landing_command = generate_command.landing()
commands.append(landing_command)

return True, commands
47 changes: 7 additions & 40 deletions modules/add_takeoff_and_rtl_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@

import dronekit

from . import generate_command

MAVLINK_TAKEOFF_FRAME = dronekit.mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT
MAVLINK_RTL_FRAME = dronekit.mavutil.mavlink.MAV_FRAME_GLOBAL
MAVLINK_TAKEOFF_COMMAND = dronekit.mavutil.mavlink.MAV_CMD_NAV_TAKEOFF
MAVLINK_RTL_COMMAND = dronekit.mavutil.mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH


def add_takeoff_and_rtl_command(commands: "list[dronekit.Command]",
altitude: float) -> "tuple[bool, list[dronekit.Command] | None]":
def add_takeoff_and_rtl_command(
commands: "list[dronekit.Command]", altitude: float
) -> "tuple[bool, list[dronekit.Command] | None]":
"""
Prepends a takeoff command and appends a RTL command to a list of dronekit commands.
Expand All @@ -25,47 +22,17 @@ def add_takeoff_and_rtl_command(commands: "list[dronekit.Command]",
Returns
-------
tuple[bool, list[dronekit.Command] | None]:
tuple[bool, list[dronekit.Command] | None]:
(False, None) if empty commands list,
(True, dronekit commands with takeoff and land commands that can be sent to the drone) otherwise.
"""
if len(commands) == 0:
return False, None

takeoff_command = dronekit.Command(
0,
0,
0,
MAVLINK_TAKEOFF_FRAME,
MAVLINK_TAKEOFF_COMMAND,
0,
0,
0, # param1
0,
0,
0,
0,
0,
altitude,
)
takeoff_command = generate_command.takeoff(altitude)
commands.insert(0, takeoff_command)

rtl_command = dronekit.Command(
0,
0,
0,
MAVLINK_RTL_FRAME,
MAVLINK_RTL_COMMAND,
0,
0,
0, # param1
0,
0,
0,
0,
0,
0,
)
rtl_command = generate_command.return_to_launch()
commands.append(rtl_command)

return True, commands
36 changes: 10 additions & 26 deletions modules/check_stop_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@

import dronekit

from . import generate_command
from . import upload_commands


MAVLINK_RTL_FRAME = dronekit.mavutil.mavlink.MAV_FRAME_GLOBAL
MAVLINK_RTL_COMMAND = dronekit.mavutil.mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH
DRONE_TIMEOUT = 30.0 # seconds


def check_stop_condition(start_time: float,
current_time: float,
drone: dronekit.Vehicle,
maximum_flight_time: float) -> bool:
def check_stop_condition(
start_time: float, current_time: float, drone: dronekit.Vehicle, maximum_flight_time: float
) -> bool:
"""
Check if drone exceeds the maximum flight time limit and replace with new mission of returning to launch.
Expand All @@ -27,35 +26,20 @@ def check_stop_condition(start_time: float,
drone: dronekit.Vehicle
The connected drone.
maximum_flight_time: float
Max flight time for drone in seconds.
Max flight time for drone in seconds.
Returns
-------
bool: True if max flight time was exceeded, False otherwise.
"""
if current_time - start_time < maximum_flight_time:
return False

rtl_command = dronekit.Command(
0,
0,
0,
MAVLINK_RTL_FRAME,
MAVLINK_RTL_COMMAND,
0,
0,
0, # param1
0,
0,
0,
0,
0,
0,
)
rtl_command = generate_command.return_to_launch()

# Invoke upload_commands to clear previous commands and direct drone back to launch location
result = upload_commands.upload_commands(drone, [rtl_command])
result = upload_commands.upload_commands(drone, [rtl_command], DRONE_TIMEOUT)
if not result:
print("Unable to upload rtl command to drone command sequence.")
print("Unable to upload RTL command to drone command sequence.")

return True
10 changes: 7 additions & 3 deletions modules/diversion_qr_to_waypoint_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Function to parse diversion QR string into a list of vertices and a rejoin waypoint.
"""

import re


Expand All @@ -16,7 +17,7 @@ def diversion_qr_to_waypoint_list(qr_text: str) -> "tuple[bool, tuple[list[str],
Returns
-------
tuple[bool, tuple[list[Waypoint], Waypoint] | None]
tuple[bool, tuple[list[Waypoint], Waypoint] | None]
Returns False, None if the string is invalid or if the list is empty.
"""
# Check if string is of valid form
Expand All @@ -25,11 +26,14 @@ def diversion_qr_to_waypoint_list(qr_text: str) -> "tuple[bool, tuple[list[str],
return False, None

# Separate qr_text into diversion_waypoints and rejoin_waypoint strings
diversion_waypoints_string, sep, rejoin_waypoint_string = qr_text.partition('.')
diversion_waypoints_string, _, rejoin_waypoint_string = qr_text.partition(".")

# Extract semicolon-separated list of diversion waypoint names without leading/trailing whitespace
diversion_waypoints = [
text.strip(" ") for text in diversion_waypoints_string.replace("Avoid the area bounded by:", "", 1).split(";")
text.strip(" ")
for text in diversion_waypoints_string.replace("Avoid the area bounded by:", "", 1).split(
";"
)
]

# Remove remaining cases of empty names
Expand Down
Loading

0 comments on commit 01690a3

Please sign in to comment.