Skip to content

Commit

Permalink
feat: initial solution to treat non-straight smartRTH as a separate f…
Browse files Browse the repository at this point in the history
…ormation
  • Loading branch information
vasarhelyi committed Aug 29, 2023
1 parent 1fe47d9 commit 460ce52
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 38 deletions.
70 changes: 68 additions & 2 deletions src/modules/sbstudio/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from .constants import COMMUNITY_SERVER_URL
from .errors import SkybrushStudioAPIError
from .types import Mapping, TransitionPlan
from .types import Mapping, SmartRTHPlan, TransitionPlan

__all__ = ("SkybrushStudioAPI",)

Expand Down Expand Up @@ -515,6 +515,72 @@ def plan_landing(

return result["start_times"], result["durations"]

def plan_smart_rth(
self,
source: Sequence[Coordinate3D],
target: Sequence[Coordinate3D],
*,
max_velocity_xy: float,
max_velocity_z: float,
max_acceleration: float,
min_distance: float,
rth_model: str = "straight_line_with_neck",
) -> SmartRTHPlan:
"""Proposes starting times for predefined smart RTH procedures
between source and target with the given parameters.
Parameters:
source: the list of source points
target: the list of target points
max_velocity_xy: maximum allowed velocity in the XY plane
max_velocity_z: maximum allowed velocity along the Z axis
max_acceleration: maximum allowed acceleration
min_distance: minimum distance to maintain during the transition
rth_model: the smart RTH model to use when matching source points
to target points; see the server documentation for more details.
"""
if not source or not target:
return SmartRTHPlan.empty()

data = {
"version": 1,
"source": source,
"target": target,
"max_velocity_xy": max_velocity_xy,
"max_velocity_z": max_velocity_z,
"max_acceleration": max_acceleration,
"min_distance": min_distance,
"rth_model": rth_model,
}

with self._send_request("operations/plan-smart-rth", data) as response:
result = response.as_json()

if result.get("version") != 1:
raise SkybrushStudioAPIError("invalid response version")

start_times = result.get("start_times")
if start_times is None:
raise SkybrushStudioAPIError(
"invalid response format, start times are missing"
)
durations = result.get("durations")
if durations is None:
raise SkybrushStudioAPIError(
"invalid response format, durations are missing"
)
inner_points = result.get("inner_points")
if inner_points is None:
raise SkybrushStudioAPIError(
"invalid response format, inner points are missing"
)

return SmartRTHPlan(
start_times=list(start_times),
durations=list(durations),
inner_points=list(inner_points),
)

def plan_transition(
self,
source: Sequence[Coordinate3D],
Expand Down Expand Up @@ -543,7 +609,7 @@ def plan_transition(
downwards
max_acceleration: maximum allowed acceleration
matching_method: the algorithm to use when matching source points
to target points; see the server documentation fof more details.
to target points; see the server documentation for more details.
"""
if not source or not target:
return TransitionPlan.empty()
Expand Down
36 changes: 31 additions & 5 deletions src/modules/sbstudio/api/types.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,54 @@
from dataclasses import dataclass, field
from typing import List, Optional
from typing import Optional

Mapping = List[Optional[int]]
Mapping = list[Optional[int]]
"""Type alias for mappings from drone indices to the corresponding target
marker indices.
"""


@dataclass
class SmartRTHPlan:
"""Response returned from a "plan smart RTH" API request."""

start_times: list[float] = field(default_factory=list)
"""The computed start times where the i-th item of the list contains the
start time of the drone that will travel to the i-th target point [s].
"""

durations: list[float] = field(default_factory=list)
"""The computed smart RTH durations where the i-th item of the list
contains the travel time of the drone that is assigned to the i-th
target point [s].
"""

inner_points: list[list[list[float]]] = field(default_factory=list)
"""The inner points of the smart RTH transition if it is not a
straight line transition, where the i-th item of the main list
contains the inner points of the drone that is assigned to the i-th
target point. Points are represented in [t, x, y, z] order."""

@classmethod
def empty(cls):
return cls(start_times=[], durations=[], inner_points=[])


@dataclass
class TransitionPlan:
"""Response returned from a "plan transition" API request."""

start_times: List[float] = field(default_factory=list)
start_times: list[float] = field(default_factory=list)
"""The computed start times where the i-th item of the list contains the
start time of the drone that will travel to the i-th target point.
"""

durations: List[float] = field(default_factory=list)
durations: list[float] = field(default_factory=list)
"""The computed transition durations where the i-th item of the list
contains the travel time of the drone that is assigned to the i-th
target point.
"""

mapping: Optional[List[Optional[int]]] = None
mapping: Optional[list[Optional[int]]] = None
"""The computed matching where the i-th item of the list contains the
index of the source point that the i-th target point is mapped to, or
``None`` if the given target point is left unmatched. Omitted if the
Expand Down
163 changes: 132 additions & 31 deletions src/modules/sbstudio/plugin/operators/return_to_home.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import bpy

from bpy.props import FloatProperty, IntProperty
from bpy.props import FloatProperty, IntProperty, BoolProperty
from bpy.types import Context
from functools import partial
from math import ceil, sqrt

from sbstudio.plugin.api import get_api
from sbstudio.plugin.constants import Collections
from sbstudio.plugin.model.formation import create_formation
from sbstudio.plugin.actions import (
ensure_action_exists_for_object,
find_f_curve_for_data_path_and_index,
)
from sbstudio.plugin.model.formation import create_formation, get_markers_from_formation

from sbstudio.plugin.model.safety_check import get_proximity_warning_threshold
from sbstudio.plugin.utils.evaluator import create_position_evaluator

Expand Down Expand Up @@ -53,14 +60,24 @@ class ReturnToHomeOperator(StoryboardOperator):
description=(
"Specifies the difference between altitudes of landing layers "
"for multi-phase landings when multiple drones occupy the same "
"slot within safety distance."
"slot within safety distance"
),
default=5,
soft_min=0,
soft_max=50,
unit="LENGTH",
)

use_smart_rth = BoolProperty(
name="Use smart RTH",
description=(
"Enable the smart return to home function that ensures that "
"all drones return to their own home position with an optimal "
"collision free smart transition"
),
default=False,
)

@classmethod
def poll(cls, context):
if not super().poll(context):
Expand Down Expand Up @@ -95,37 +112,121 @@ def _run(self, storyboard, *, context) -> bool:
drones,
frame=first_frame,
base_altitude=self.altitude,
layer_height=self.altitude_shift,
layer_height=self.altitude_shift if self.use_smart_rth is False else 0,
min_distance=get_proximity_warning_threshold(context),
)

diffs = [
sqrt((s[0] - t[0]) ** 2 + (s[1] - t[1]) ** 2 + (s[2] - t[2]) ** 2)
for s, t in zip(source, target)
]

# Calculate RTH duration from max distance to travel and the
# average velocity
max_distance = max(diffs)
fps = context.scene.render.fps
rth_duration = ceil((max_distance / self.velocity) * fps)

# Extend the duration of the last formation to the frame where we want
# to start the RTH maneuver
if len(storyboard.entries) > 0:
storyboard.last_entry.extend_until(self.start_frame)

# Calculate when the RTH should end
end_of_rth = self.start_frame + rth_duration

# Add a new storyboard entry with the given formation
storyboard.add_new_entry(
formation=create_formation("Return to home", target),
frame_start=end_of_rth,
duration=0,
select=True,
context=context,
)

if self.use_smart_rth:
# Set up safety parameters
safety_check = getattr(context.scene.skybrush, "safety_check", None)
settings = getattr(context.scene.skybrush, "settings", None)
max_velocity_xy = self.velocity
max_velocity_z = (
safety_check.velocity_z_warning_threshold
if safety_check
else min(self.velocity, 2)
)
max_acceleration = settings.max_acceleration if settings else 4

# call api to create smart RTH plan
plan = get_api().plan_smart_rth(
source,
target,
max_velocity_xy=max_velocity_xy,
max_velocity_z=max_velocity_z,
max_acceleration=max_acceleration,
min_distance=get_proximity_warning_threshold(context),
rth_model="straight_line_with_neck",
)
if not plan.start_times or not plan.durations:
return False

# Add a new storyboard entry for the smart RTH formation
entry = storyboard.add_new_entry(
formation=create_formation("Smart return to home", source),
frame_start=self.start_frame,
duration=int(max(plan.durations) * fps),
select=True,
context=context,
)
assert entry is not None
markers = get_markers_from_formation(entry.formation)

# generate smart RTH trajectories in the new formation
for start_time, duration, inner_points, p, q, marker in zip(
plan.start_times,
plan.durations,
plan.inner_points,
source,
target,
markers,
strict=True,
):
action = ensure_action_exists_for_object(
marker, name=f"Animation data for {marker.name}"
)
f_curves = []
for i in range(3):
f_curve = find_f_curve_for_data_path_and_index(
action, "location", i
)
if f_curve is None:
f_curve = action.fcurves.new("location", index=i)
else:
# We should clear the keyframes that fall within the
# range of our keyframes. Currently it's not needed because
# it's a freshly created marker so it can't have any
# keyframes that we don't know about.
pass
f_curves.append(f_curve)
insert = [
partial(f_curve.keyframe_points.insert, options={"FAST"})
for f_curve in f_curves
]
for point in (
[[0, *p], [start_time, *p]]
+ inner_points
+ [[start_time + duration, *q]]
):
frame = int(self.start_frame + point[0] * fps)
keyframes = (
insert[0](frame, point[1]),
insert[1](frame, point[2]),
insert[2](frame, point[3]),
)
for keyframe in keyframes:
keyframe.interpolation = "LINEAR"
# Commit the insertions that we've made in "fast" mode
for f_curve in f_curves:
f_curve.update()
else:
diffs = [
sqrt((s[0] - t[0]) ** 2 + (s[1] - t[1]) ** 2 + (s[2] - t[2]) ** 2)
for s, t in zip(source, target)
]

# Calculate RTH duration from max distance to travel and the
# average velocity
max_distance = max(diffs)
rth_duration = ceil((max_distance / self.velocity) * fps)

# Extend the duration of the last formation to the frame where we want
# to start the RTH maneuver
if len(storyboard.entries) > 0:
storyboard.last_entry.extend_until(self.start_frame)

# Calculate when the RTH should end
end_of_rth = self.start_frame + rth_duration

# Add a new storyboard entry with the given formation
storyboard.add_new_entry(
formation=create_formation("Return to home", target),
frame_start=end_of_rth,
duration=0,
select=True,
context=context,
)

# Recalculate the transition leading to the target formation
bpy.ops.skybrush.recalculate_transitions(scope="TO_SELECTED")
Expand Down

0 comments on commit 460ce52

Please sign in to comment.