# Source code for pytrajectory.trajectories

# IMPORTS
import numpy as np
import copy

from splines import Spline, differentiate
from log import logging
import auxiliary

class Trajectory(object):
    '''
    This class handles the creation and managing of the spline functions
    that are intended to approximate the desired trajectories.

    Parameters
    ----------

    sys : system.DynamicalSystem
        Instance of a dynamical system providing information like
        vector field function and boundary values

    sx : int
        Initial number of spline parts for the system variables (default 10).

    su : int
        Initial number of spline parts for the input variables (default 10).

    kx : int
        Factor by which the number of spline parts is raised (default 2).

    nodes_type : str
        Passed through to every created ``Spline`` (default 'equidistant').

    use_std_approach : bool
        Passed through to every created ``Spline`` (default True).

    use_chains : bool
        Whether integrator chains found in the system are exploited when
        the splines are created and their coefficients set (default True).
    '''

    def __init__(self, sys, **kwargs):
        # save the dynamical system
        self.sys = sys

        # set parameters
        self._parameters = dict()
        self._parameters['n_parts_x'] = kwargs.get('sx', 10)
        self._parameters['n_parts_u'] = kwargs.get('su', 10)
        self._parameters['kx'] = kwargs.get('kx', 2)
        self._parameters['nodes_type'] = kwargs.get('nodes_type', 'equidistant')
        self._parameters['use_std_approach'] = kwargs.get('use_std_approach', True)

        self._chains, self._eqind = auxiliary.find_integrator_chains(sys)
        self._parameters['use_chains'] = kwargs.get('use_chains', True)

        # Initialise dictionaries as containers for all
        # spline functions that will be created
        self.splines = dict()
        self.x_fnc = dict()
        self.u_fnc = dict()
        self.dx_fnc = dict()

        # This will be the free parameters of the control problem
        # (list of all independent spline coefficients)
        self.indep_coeffs = []

        # Numerical solution per spline, filled in by `set_coeffs`.
        # Initialised here so that `save` works before a solution exists.
        self.coeffs_sol = dict()

        self._old_splines = None

    @property
    def n_parts_x(self):
        '''
        Number of polynomial spline parts for system variables.
        '''
        return self._parameters['n_parts_x']

    @property
    def n_parts_u(self):
        '''
        Number of polynomial spline parts for input variables.
        '''
        return self._parameters['n_parts_u']

    def _raise_spline_parts(self, k=None):
        '''
        Multiply the numbers of spline parts and return the new `n_parts_x`.

        Parameters
        ----------

        k : int, optional
            Factor for the system variable splines. If omitted the
            parameter `kx` is used. The input variable splines are
            always raised by `kx`.
        '''
        if k is not None:
            # This normally does not happen, and is only for
            # experiments and debugging
            self._parameters['n_parts_x'] *= int(k)
        else:
            self._parameters['n_parts_x'] *= self._parameters['kx']

        # TODO: introduce parameter `ku` and handle it here
        #       (and in CollocationSystem.get_guess())
        self._parameters['n_parts_u'] *= self._parameters['kx']

        return self.n_parts_x

    def x(self, t):
        '''
        Returns the current system state.

        Parameters
        ----------

        t : float
            The time point in [a,b] to evaluate the system at.

        Returns
        -------

        numpy.ndarray or None
            The state values in the order of `sys.states`, or None
            (after logging a warning) if `t` lies outside of [a,b].
        '''
        if not self.sys.a <= t <= self.sys.b:
            logging.warning("Time point 't' has to be in (a,b)")
            return None

        return np.array([self.x_fnc[xx](t) for xx in self.sys.states])

    def u(self, t):
        '''
        Returns the state of the input variables.

        Parameters
        ----------

        t : float
            The time point in [a,b] to evaluate the input variables at.

        Returns
        -------

        numpy.ndarray
            The input values in the order of `sys.inputs`. For `t`
            outside of [a,b] the inputs are evaluated at `b` instead
            (no warning is logged).
        '''
        if not self.sys.a <= t <= self.sys.b:
            # outside of the interval the value at the right border is used
            t = self.sys.b

        return np.array([self.u_fnc[uu](t) for uu in self.sys.inputs])

    def dx(self, t):
        '''
        Returns the state of the 1st derivatives of the system variables.

        Parameters
        ----------

        t : float
            The time point in [a,b] to evaluate the 1st derivatives at.

        Returns
        -------

        numpy.ndarray or None
            The derivative values in the order of `sys.states`, or None
            (after logging a warning) if `t` lies outside of [a,b].
        '''
        if not self.sys.a <= t <= self.sys.b:
            logging.warning("Time point 't' has to be in (a,b)")
            return None

        return np.array([self.dx_fnc[xx](t) for xx in self.sys.states])

    def _new_spline(self, n_parts, boundary_value, tag):
        '''
        Create a spline on [a,b] with the given number of parts, the
        0th order boundary value and tag, using the stored parameters.
        '''
        return Spline(self.sys.a, self.sys.b, n=n_parts,
                      bv={0: boundary_value}, tag=tag,
                      nodes_type=self._parameters['nodes_type'],
                      use_std_approach=self._parameters['use_std_approach'])

    def init_splines(self):
        '''
        This method is used to create the necessary spline function objects.

        The boundary values are taken from `sys.boundary_values`. The
        previously used splines are kept (as a deep copy) in
        `_old_splines`. Afterwards `splines`, `x_fnc`, `u_fnc`, `dx_fnc`
        and `indep_coeffs` are replaced by the newly created objects.
        '''
        logging.debug("Initialise Splines")

        # store the old splines to calculate the guess later
        self._old_splines = copy.deepcopy(self.splines)

        bv = self.sys.boundary_values

        # dictionaries for splines and callable solution function for x,u and dx
        splines = dict()
        x_fnc = dict()
        u_fnc = dict()
        dx_fnc = dict()

        # spline attribute providing the derivative of the respective order
        derivative_attrs = ('f', 'df', 'ddf')

        if self._parameters['use_chains']:
            # first handle variables that are part of an integrator chain
            for chain in self._chains:
                upper = chain.upper
                lower = chain.lower

                # here we just create a spline object for the upper ends of every chain
                # w.r.t. its lower end (whether it is an input variable or not)
                if lower.startswith('x'):
                    splines[upper] = self._new_spline(self.n_parts_x, bv[upper], upper)
                    splines[upper].type = 'x'
                elif lower.startswith('u'):
                    splines[upper] = self._new_spline(self.n_parts_u, bv[lower], upper)
                    splines[upper].type = 'u'

                # NOTE: a chain whose lower end starts with neither 'x'
                #       nor 'u' is not handled and ends in a KeyError here
                spline = splines[upper]

                # search for boundary values to satisfy
                for order, elem in enumerate(chain.elements):
                    if elem in self.sys.states:
                        spline._boundary_values[order] = bv[elem]
                        if spline.type == 'u':
                            spline._boundary_values[order + 1] = bv[lower]

                # solve smoothness and boundary conditions
                spline.make_steady()

                # calculate derivatives
                for order, elem in enumerate(chain.elements):
                    if order >= len(derivative_attrs):
                        # only the spline itself and its first two
                        # derivatives are assigned to chain elements
                        break

                    if elem in self.sys.inputs:
                        u_fnc[elem] = getattr(spline, derivative_attrs[order])
                    elif elem in self.sys.states:
                        spline._boundary_values[order] = bv[elem]
                        if order < 2 and spline.type == 'u':
                            spline._boundary_values[order + 1] = bv[lower]
                        x_fnc[elem] = getattr(spline, derivative_attrs[order])

        # now handle the variables which are not part of any chain
        for xx in self.sys.states:
            if xx not in x_fnc:
                splines[xx] = self._new_spline(self.n_parts_x, bv[xx], xx)
                splines[xx].make_steady()
                splines[xx].type = 'x'
                x_fnc[xx] = splines[xx].f

        for uu in self.sys.inputs:
            if uu not in u_fnc:
                splines[uu] = self._new_spline(self.n_parts_u, bv[uu], uu)
                splines[uu].make_steady()
                splines[uu].type = 'u'
                u_fnc[uu] = splines[uu].f

        # calculate derivatives of every state variable spline
        for xx in self.sys.states:
            dx_fnc[xx] = differentiate(x_fnc[xx])

        self.indep_coeffs = dict((tag, spline._indep_coeffs)
                                 for tag, spline in splines.items())
        self.splines = splines
        self.x_fnc = x_fnc
        self.u_fnc = u_fnc
        self.dx_fnc = dx_fnc

    def set_coeffs(self, sol):
        '''
        Set found numerical values for the independent parameters of each spline.

        This method is used to get the actual splines by using the numerical
        solutions to set up the coefficients of the polynomial spline parts of
        every created spline.

        The solution vector is split in the order of the sorted keys of
        `indep_coeffs`. The parts (taken from a copy of `sol`) are also
        stored in `coeffs_sol`.

        Parameters
        ----------

        sol : numpy.ndarray
            The solution vector for the free parameters, i.e. the independent coefficients.
        '''
        # TODO: look for bugs here!
        logging.debug("Set spline coefficients")

        sol_bak = sol.copy()

        # coefficients to substitute into each spline (slices of `sol`)
        subs = dict()
        # yet another dictionary for solution and coeffs (slices of the copy)
        coeffs_sol = dict()

        start = 0
        for key in sorted(self.indep_coeffs):
            stop = start + len(self.indep_coeffs[key])
            subs[key] = sol[start:stop]
            coeffs_sol[key] = sol_bak[start:stop]
            start = stop

        if self._parameters['use_chains']:
            # every variable of a chain uses the coefficients
            # of the spline at the upper end of that chain
            for var in self.sys.states + self.sys.inputs:
                for ic in self._chains:
                    if var in ic:
                        subs[var] = subs[ic.upper]

        # set numerical coefficients for each spline and derivative
        for key, spline in self.splines.items():
            spline.set_coefficients(free_coeffs=subs[key])

        self.coeffs_sol = coeffs_sol

    def save(self):
        '''
        Returns a dictionary with the parameters, the saved state of
        every spline and the solution for the spline coefficients.
        '''
        save = dict()

        # parameters
        save['parameters'] = self._parameters

        # splines
        save['splines'] = dict((var, spline.save())
                               for var, spline in self.splines.items())

        # sol
        # NOTE: the key is spelled 'coeffs_col' (not 'coeffs_sol');
        #       it is kept as is because it is part of the saved data
        save['coeffs_col'] = self.coeffs_sol

        return save