Skip to content

Commit

Permalink
Linear interpolation for explicit shooting (#1109)
Browse files Browse the repository at this point in the history
* Added a new control interpolation method for explicit shooting

* changed the new method to instead use cubic spline interpolation

* Added derivatives for polynomial controls to get control rates

* added ability to select the control interpolation used when defining a simulation phase
  • Loading branch information
kaushikponnapalli authored Sep 18, 2024
1 parent 5408b12 commit dd09fc2
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ def _test_integrate_control_rate2(self, transcription):

# Set the driver.
p.driver = om.pyOptSparseDriver(optimizer='SLSQP')
p.driver.opt_settings['ACC'] = 1e-9

# Allow OpenMDAO to automatically determine our sparsity pattern.
# Doing so can significant speed up the execution of Dymos.
Expand All @@ -317,7 +318,8 @@ def _test_integrate_control_rate2(self, transcription):
phase.set_control_val('theta', [0, 100], units='deg')

# Run the driver to solve the problem
dm.run_problem(p, simulate=True, make_plots=False, simulate_kwargs={'atol': 1.0E-9, 'rtol': 1.0E-9})
dm.run_problem(p, simulate=True, make_plots=False, simulate_kwargs={'atol': 1.0E-9, 'rtol': 1.0E-9,
'times_per_seg': 10})

sol_db = 'dymos_solution.db'
sim_db = 'dymos_simulation.db'
Expand Down Expand Up @@ -346,13 +348,14 @@ def _test_integrate_control_rate2(self, transcription):
theta_sol = sol.get_val('traj.phase0.timeseries.theta')
theta_sim = sim.get_val('traj.phase0.timeseries.theta')

assert_timeseries_near_equal(t_sol, x_sol, t_sim, x_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, y_sol, t_sim, y_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, v_sol, t_sim, v_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, int_theta_sol, t_sim, int_theta_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, x_sol, t_sim, x_sim, rel_tolerance=1.0E-2, abs_tolerance=1.0E-1)
assert_timeseries_near_equal(t_sol, y_sol, t_sim, y_sim, rel_tolerance=1.0E-2, abs_tolerance=1.0E-1)
assert_timeseries_near_equal(t_sol, v_sol, t_sim, v_sim, rel_tolerance=1.0E-2, abs_tolerance=1.0E-1)
assert_timeseries_near_equal(t_sol, int_theta_sol, t_sim, int_theta_sim, rel_tolerance=1.0E-2,
abs_tolerance=1.0E-1)

assert_timeseries_near_equal(t_sol, int_theta_sol, t_sol, theta_sol, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sim, int_theta_sim, t_sim, theta_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, int_theta_sol, t_sol, theta_sol, rel_tolerance=1.0E-2, abs_tolerance=1.0E-1)
assert_timeseries_near_equal(t_sim, int_theta_sim, t_sim, theta_sim, rel_tolerance=1.0E-2, abs_tolerance=1.0E-1)

def test_integrate_control_gl(self):
self._test_integrate_control(dm.GaussLobatto)
Expand Down Expand Up @@ -478,8 +481,10 @@ def _test_integrate_polynomial_control(self, transcription):
assert_timeseries_near_equal(t_sol, x_sol, t_sim, x_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, y_sol, t_sim, y_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, v_sol, t_sim, v_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, int_theta_sol, t_sim, int_theta_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sim, theta_rate_sim, t_sol, theta_rate_sol, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, int_theta_sol, t_sim, int_theta_sim, rel_tolerance=4.0E-3,
abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sim, theta_rate_sim, t_sol, theta_rate_sol, rel_tolerance=4.0E-3,
abs_tolerance=1.0E-2)

def _test_integrate_polynomial_control_rate(self, transcription):
#
Expand Down Expand Up @@ -556,7 +561,8 @@ def _test_integrate_polynomial_control_rate(self, transcription):
phase.set_control_val('theta', 45.0, units='deg')

# Run the driver to solve the problem
dm.run_problem(p, simulate=True, make_plots=False)
dm.run_problem(p, simulate=True, make_plots=False,
simulate_kwargs={'times_per_seg': 10})

sol_db = 'dymos_solution.db'
sim_db = 'dymos_simulation.db'
Expand Down Expand Up @@ -589,7 +595,7 @@ def _test_integrate_polynomial_control_rate(self, transcription):
assert_timeseries_near_equal(t_sol, y_sol, t_sim, y_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, v_sol, t_sim, v_sim, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
assert_timeseries_near_equal(t_sol, int_theta_sol, t_sim, int_theta_sim,
rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)
rel_tolerance=1.0E-2, abs_tolerance=1.5E-2)

assert_timeseries_near_equal(t_sol, int_theta_sol, t_sol, theta_sol, rel_tolerance=4.0E-3, abs_tolerance=1.0E-2)

Expand Down
13 changes: 9 additions & 4 deletions dymos/phase/phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -2509,7 +2509,7 @@ def interp(self, name=None, ys=None, xs=None, nodes=None, kind='linear', axis=0)

def get_simulation_phase(self, times_per_seg=_unspecified, method=_unspecified, atol=_unspecified,
rtol=_unspecified, first_step=_unspecified, max_step=_unspecified,
reports=False):
reports=False, interpolant='cubic'):
"""
Return a SimulationPhase instance that is essentially a copy of this Phase.
Expand All @@ -2536,6 +2536,8 @@ def get_simulation_phase(self, times_per_seg=_unspecified, method=_unspecified,
Maximum step size for the integration.
reports : bool or None or str or Sequence
The reports setting for the subproblem run under each simulation segment.
interpolant : str
The interpolation method to be used for the controls in the simulation phase.
Returns
-------
Expand Down Expand Up @@ -2592,7 +2594,7 @@ def get_simulation_phase(self, times_per_seg=_unspecified, method=_unspecified,
rtol=_rtol,
first_step=_first_step,
max_step=_max_step,
control_interp='barycentric')
control_interp=interpolant)

sim_phase = SimulationPhase(transcription=tx,
ode_class=ode_class,
Expand Down Expand Up @@ -2693,7 +2695,8 @@ def initialize_values_from_phase(self, prob, from_phase, phase_path='', skip_par
prob.set_val(prob_path, val)

def simulate(self, times_per_seg=None, method=_unspecified, atol=_unspecified, rtol=_unspecified,
first_step=_unspecified, max_step=_unspecified, record_file=None, reports=False):
first_step=_unspecified, max_step=_unspecified, record_file=None, reports=False,
interpolant='cubic'):
"""
Simulate the Phase using scipy.integrate.solve_ivp.
Expand All @@ -2717,6 +2720,8 @@ def simulate(self, times_per_seg=None, method=_unspecified, atol=_unspecified, r
If None, no record of the simulation will be saved.
reports : bool or None or str or Sequence
Reports setting for the subproblems run under simualate.
interpolant : str
The interpolation method to be used for the controls in the simulation phase.
Returns
-------
Expand All @@ -2730,7 +2735,7 @@ def simulate(self, times_per_seg=None, method=_unspecified, atol=_unspecified, r
reports=reports)

sim_phase = self.get_simulation_phase(times_per_seg=times_per_seg, method=method, atol=atol, rtol=rtol,
first_step=first_step, max_step=max_step)
first_step=first_step, max_step=max_step, interpolant=interpolant)

sim_prob.model.add_subsystem(self.name, sim_phase)

Expand Down
7 changes: 5 additions & 2 deletions dymos/trajectory/trajectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ def _print_constraints(phase, outstream):

def simulate(self, times_per_seg=_unspecified, method=_unspecified, atol=_unspecified, rtol=_unspecified,
first_step=_unspecified, max_step=_unspecified, record_file=None, case_prefix=None,
reset_iter_counts=True, reports=False):
reset_iter_counts=True, reports=False, interpolant='cubic'):
"""
Simulate the Trajectory using scipy.integrate.solve_ivp.
Expand Down Expand Up @@ -1446,6 +1446,8 @@ def simulate(self, times_per_seg=_unspecified, method=_unspecified, atol=_unspec
If True and model has been run previously, reset all iteration counters.
reports : bool or None or str or Sequence
Reports setting for the subproblems run under simualate.
interpolant : str
The interpolation method to be used for the controls in the simulation phase.
Returns
-------
Expand All @@ -1461,7 +1463,8 @@ def simulate(self, times_per_seg=_unspecified, method=_unspecified, atol=_unspec
continue
sim_phs = phs.get_simulation_phase(times_per_seg=times_per_seg, method=method,
atol=atol, rtol=rtol, first_step=first_step,
max_step=max_step, reports=reports)
max_step=max_step, reports=reports,
interpolant=interpolant)
sim_traj.add_phase(name, sim_phs)

if not sim_traj._phases:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import numpy as np
import openmdao.api as om

from ...utils.misc import get_rate_units
from ...utils.lgl import lgl

from scipy.interpolate import CubicSpline


class CubicSplineControlInterpComp(om.ExplicitComponent):
"""
A component which interpolates control values in 1D using cubic spline interpolation.
Takes training values for control variables at given _input_ nodes,
broadcasts them to _discretization_ nodes, and then interpolates the discretization values
to provide a control variable at a given segment tau or phase tau.
This interpolation method is intended for use in simulation phases and should not be used to
solve optimization problems.
For dynamic controls, the current segment is given as a discrete input and the interpolation is
a smooth polynomial along the given segment.
OpenMDAO assumes sizes of variables at setup time, and we don't want to need to change the
size of the control input nodes when we evaluate different segments. Instead, this component
will take in the control values of all segments and internally use the appropriate one.
Parameters
----------
grid_data : GridData
A GridData instance that details information on how the control input and discretization
nodes are layed out.
control_options : dict of {str: ControlOptionsDictionary}
A mapping that maps the name of each control to a ControlOptionsDictionary of its options.
time_units : str
The time units pertaining to the control rates.
standalone_mode : bool
If True, this component runs its configuration steps during setup. This is useful for
unittests in which the component does not exist in a larger group.
**kwargs
Keyword arguments passed to ExplicitComponent.
"""
def __init__(self, grid_data, control_options=None,
time_units=None, standalone_mode=False, **kwargs):
self._grid_data = grid_data
self._control_options = {} if control_options is None else control_options
self._time_units = time_units
self._standalone_mode = standalone_mode

# Cache formatted strings: { control_name : (input_name, output_name) }
self._control_io_names = {}
self._polynomial_control_nodes = {}
self.num_uhat_nodes = None
self._input_grid = []

super().__init__(**kwargs)

def initialize(self):
"""
Declare component options.
"""
self.options.declare('segment_index', types=int, desc='index of the current segment')
self.options.declare('vec_size', types=int, default=1,
desc='number of points at which the control will be evaluated. This is not'
'necessarily the same as the number of nodes in the GridData.')
self.options.declare('compute_derivs', types=bool, default=True,
desc='Set to True if the interpolant needs to also compute the derivatives '
'of the outputs, otherwise False. This should be set to False for simulation mode '
'and true when using ExplicitShooting for optimization')

def _configure_controls(self):
vec_size = self.options['vec_size']
gd = self._grid_data

if not self._control_options:
return

self.num_uhat_nodes = gd.subset_num_nodes['control_input']
for control_name, options in self._control_options.items():
if options['control_type'] == 'full':
shape = options['shape']
units = options['units']
input_name = f'controls:{control_name}'
output_name = f'control_values:{control_name}'
rate_name = f'control_rates:{control_name}_rate'
rate2_name = f'control_rates:{control_name}_rate2'
rate_units = get_rate_units(units, self._time_units)
rate2_units = get_rate_units(units, self._time_units, deriv=2)
uhat_shape = (self.num_uhat_nodes,) + shape
output_shape = (vec_size,) + shape
self.add_input(input_name, shape=uhat_shape, units=units)
self.add_output(output_name, shape=output_shape, units=units)
self.add_output(rate_name, shape=output_shape, units=rate_units)
self.add_output(rate2_name, shape=output_shape, units=rate2_units)
self._control_io_names[control_name] = (input_name, output_name, rate_name, rate2_name)

input_ptau_grid = self._grid_data.node_ptau[self._grid_data.subset_node_indices['control_input']]
_, self._input_grid = np.unique(input_ptau_grid, return_index=True)

else:
order = options['order']
shape = options['shape']
units = options['units']
input_name = f'controls:{control_name}'
output_name = f'control_values:{control_name}'
rate_name = f'control_rates:{control_name}_rate'
rate2_name = f'control_rates:{control_name}_rate2'
rate_units = get_rate_units(units, self._time_units)
rate2_units = get_rate_units(units, self._time_units, deriv=2)
input_shape = (order + 1,) + shape
output_shape = (vec_size,) + shape
self.add_input(input_name, shape=input_shape, units=units)
self.add_output(output_name, shape=output_shape, units=units)
self.add_output(rate_name, shape=output_shape, units=rate_units)
self.add_output(rate2_name, shape=output_shape, units=rate2_units)
self._control_io_names[control_name] = (input_name, output_name, rate_name, rate2_name)
self._polynomial_control_nodes[control_name], _ = lgl(order+1)

def setup(self):
"""
Perform the I/O creation if operating in _standalone_mode.
"""
if self._standalone_mode:
self.configure_io()

def set_segment_index(self, idx, **kwargs):
"""
Set the active segment index for control interpolation.
Parameters
----------
idx : int
The index of the segment in which the controls are to be interpolated.
**kwargs : dict, optional
Keyword arguments that make this interpolant call-compatible with the BarycentricLagrangeInterpolant.
"""
self.options['segment_index'] = idx

def configure_io(self):
"""
I/O creation is delayed until configure so we can determine shape and units for the controls.
"""
vec_size = self.options['vec_size']

self.add_input('stau', shape=(vec_size,), units=None)
self.add_input('dstau_dt', val=1.0, units=f'1/{self._time_units}')
self.add_input('t_duration', val=1.0, units=self._time_units)
self.add_input('ptau', shape=(vec_size,), units=None)

self._configure_controls()

def compute(self, inputs, outputs, discrete_inputs=None, discrete_outputs=None):
"""
Compute interpolated control values and rates.
Parameters
----------
inputs : `Vector`
`Vector` containing inputs.
outputs : `Vector`
`Vector` containing outputs.
discrete_inputs : `Vector`
`Vector` containing discrete_inputs.
discrete_outputs : `Vector`
`Vector` containing discrete_outputs.
"""
vec_size = self.options['vec_size']
ptau = inputs['ptau']
ptau_grid = self._grid_data.node_ptau

if self._control_options:
input_node_idxs = self._grid_data.subset_node_indices['control_input']

for control_name, options in self._control_options.items():
if options['control_type'] == 'full':
input_name, output_name, rate_name, rate2_name = self._control_io_names[control_name]
shape = options['shape']

size = np.prod(shape)
out = np.zeros((vec_size, size))
rate = np.zeros((vec_size, size))
rate2 = np.zeros((vec_size, size))

for i in range(size):
spl = CubicSpline(ptau_grid[input_node_idxs][self._input_grid],
inputs[input_name][self._input_grid].flatten('F')[self.num_uhat_nodes*i:
self.num_uhat_nodes*(i+1)])
out[:, i] = spl(ptau)
rate[:, i] = spl(ptau, nu=1) / (0.5 * inputs['t_duration'])
rate2[:, i] = spl(ptau, nu=2) / (0.5 * inputs['t_duration'])**2

outputs[output_name] = out.reshape((vec_size,) + shape, order='F')
outputs[rate_name] = rate.reshape((vec_size,) + shape, order='F')
outputs[rate2_name] = rate2.reshape((vec_size,) + shape, order='F')
else:
input_name, output_name, rate_name, rate2_name = self._control_io_names[control_name]
order = options['order']
poly = np.polyfit(self._polynomial_control_nodes[control_name], inputs[input_name].ravel(), order)
der1 = np.polyder(poly)
der2 = np.polyder(der1)
outputs[output_name] = np.polyval(poly, ptau)
outputs[rate_name] = np.polyval(der1, ptau) / (0.5 * inputs['t_duration'])
outputs[rate2_name] = np.polyval(der2, ptau) / (0.5 * inputs['t_duration'])**2
Loading

0 comments on commit dd09fc2

Please sign in to comment.