Source code for pytrajectory.collocation

# IMPORTS
import numpy as np
import sympy as sp
from scipy import sparse

from log import logging, Timer
from trajectories import Trajectory
from solver import Solver

from auxiliary import sym2num_vectorfield

from IPython import embed as IPS

[docs]class Container(object): """ Simple data structure to store additional internal information for debugging and checking the algorithms. Some of the attributes might indeed be neccessary """ def __init__(self, **kwargs): for key, value in kwargs.iteritems(): self.__setattr__(str(key), value)
[docs]class CollocationSystem(object): ''' This class represents the collocation system that is used to determine a solution for the free parameters of the control system, i.e. the independent coefficients of the trajectory splines. Parameters ---------- sys : system.DynamicalSystem Instance of a dynamical system ''' def __init__(self, sys, **kwargs): self.sys = sys # set parameters self._parameters = dict() self._parameters['tol'] = kwargs.get('tol', 1e-5) self._parameters['reltol'] = kwargs.get('reltol', 2e-5) self._parameters['sol_steps'] = kwargs.get('sol_steps', 50) self._parameters['method'] = kwargs.get('method', 'leven') self._parameters['coll_type'] = kwargs.get('coll_type', 'equidistant') # we don't have a soution, yet self.sol = None # create vectorized versions of the control system's vector field # and its jacobian for the faster evaluation of the collocation equation system `G` # and its jacobian `DG` (--> see self.build()) f = sys.f_sym(sp.symbols(sys.states), sp.symbols(sys.inputs)) # TODO: check order of variables of differentiation ([x,u] vs. [u,x]) # because in dot products in later evaluation of `DG` with vector `c` # values for u come first in `c` # TODO: remove this comment after reviewing the problem # previously the jacobian was calculated wrt to strings which triggered strange # strange sympy behavior (bug) for systems with more than 9 variables # workarround: we use real symbols now all_symbols = sp.symbols(sys.states + sys.inputs) Df = sp.Matrix(f).jacobian(all_symbols) self._ff_vectorized = sym2num_vectorfield(f, sys.states, sys.inputs, vectorized=True, cse=True) self._Df_vectorized = sym2num_vectorfield(Df, sys.states, sys.inputs, vectorized=True, cse=True) self._f = f self._Df = Df self.trajectories = Trajectory(sys, **kwargs) self._first_guess = kwargs.get('first_guess', None)
[docs] def build(self): ''' This method is used to set up the equations for the collocation equation system and defines functions for the numerical evaluation of the system and its jacobian. ''' logging.debug("Building Equation System") # make symbols local states = self.sys.states inputs = self.sys.inputs # determine for each spline the index range of its free coeffs in the concatenated # vector of all free coeffs indic = self._get_index_dict() # compute dependence matrices Mx, Mx_abs, Mdx, Mdx_abs, Mu, Mu_abs = self._build_dependence_matrices(indic) # in the later evaluation of the equation system `G` and its jacobian `DG` # there will be created the matrices `F` and DF in which every nx rows represent the # evaluation of the control systems vectorfield and its jacobian in a specific collocation # point, where nx is the number of state variables # # if we make use of the system structure, i.e. the integrator chains, not every # equation of the vector field has to be solved and because of that, not every row # of the matrices `F` and `DF` is neccessary # # therefore we now create an array with the indices of all rows we need from these matrices if self.trajectories._parameters['use_chains']: eqind = self.trajectories._eqind else: eqind = range(len(states)) # `eqind` now contains the indices of the equations/rows of the vector field # that have to be solved delta = 2 n_cpts = self.trajectories.n_parts_x * delta + 1 # this (-> `take_indices`) will be the array with indices of the rows we need # # to get these indices we iterate over all rows and take those whose indices # are contained in `eqind` (modulo the number of state variables -> `x_len`) take_indices = np.tile(eqind, (n_cpts,)) + np.arange(n_cpts).repeat(len(eqind)) * len(states) # here we determine the jacobian matrix of the derivatives of the system state functions # (as they depend on the free parameters in a linear fashion its just the above matrix Mdx) DdX = Mdx[take_indices, :] # here we compute the jacobian matrix of the system/input splines as they also depend on # the free parameters DXU = [] n_states = self.sys.n_states n_inputs = self.sys.n_inputs n_vars = n_states + n_inputs for i in xrange(n_cpts): DXU.append(np.vstack(( Mx[n_states * i : n_states * (i+1)].toarray(), Mu[n_inputs * i : n_inputs * (i+1)].toarray() ))) DXU_old = DXU DXU = np.vstack(DXU) DXU = sparse.csr_matrix(DXU) # localize vectorized functions for the control system's vector field and its jacobian ff_vec = self._ff_vectorized Df_vec = self._Df_vectorized # transform matrix formats for faster dot products Mx = Mx.tocsr() Mx_abs = Mx_abs.tocsr() Mdx = Mdx.tocsr() Mdx_abs = Mdx_abs.tocsr() Mu = Mu.tocsr() Mu_abs = Mu_abs.tocsr() DdX = DdX.tocsr() # define the callable functions for the eqs def G(c, info=False): # TODO: check if both spline approaches result in same values here X = Mx.dot(c)[:,None] + Mx_abs U = Mu.dot(c)[:,None] + Mu_abs X = np.array(X).reshape((n_states, -1), order='F') U = np.array(U).reshape((n_inputs, -1), order='F') # evaluate system equations and select those related # to lower ends of integrator chains (via eqind) # other equations need not be solved #F = ff_vec(X, U).take(eqind, axis=0) F = ff_vec(X, U).ravel(order='F').take(take_indices, axis=0)[:,None] dX = Mdx.dot(c)[:,None] + Mdx_abs dX = dX.take(take_indices, axis=0) #dX = np.array(dX).reshape((x_len, -1), order='F').take(eqind, axis=0) G = F - dX res = np.asarray(G).ravel(order='F') # debug: if info: # see Container docstring for motivation iC = Container(X=X, U=U, F=F, dX=dX, res=res) res = iC return res # and its jacobian def DG(c): # first we calculate the x and u values in all collocation points # with the current numerical values of the free parameters X = Mx.dot(c)[:,None] + Mx_abs X = np.array(X).reshape((n_states, -1), order='F') U = Mu.dot(c)[:,None] + Mu_abs U = np.array(U).reshape((n_inputs, -1), order='F') # get the jacobian blocks and turn them into the right shape DF_blocks = Df_vec(X,U).transpose([2,0,1]) # build a block diagonal matrix from the blocks DF_csr = sparse.block_diag(DF_blocks, format='csr').dot(DXU) # if we make use of the system structure # we have to select those rows which correspond to the equations # that have to be solved if self.trajectories._parameters['use_chains']: DF_csr = sparse.csr_matrix(DF_csr.toarray().take(take_indices, axis=0)) # TODO: is the performance gain that results from not having to solve # some equations (use integrator chains) greater than # the performance loss that results from transfering the # sparse matrix to a full numpy array and back to a sparse matrix? DG = DF_csr - DdX return DG C = Container(G=G, DG=DG, Mx=Mx, Mx_abs=Mx_abs, Mu=Mu, Mu_abs=Mu_abs, Mdx=Mdx, Mdx_abs=Mdx_abs, guess=self.guess) # return the callable functions #return G, DG # store internal information for diagnose purposes C.take_indices = take_indices self.C = C return C
def _get_index_dict(self): # here we do something that will be explained after we've done it ;-) indic = dict() i = 0 j = 0 # iterate over spline quantities for k, v in sorted(self.trajectories.indep_coeffs.items(), key=lambda (k, v): k): # increase j by the number of indep coeffs on which it depends j += len(v) indic[k] = (i, j) i = j # iterate over all quantities including inputs # and take care of integrator chain elements if self.trajectories._parameters['use_chains']: for sq in self.sys.states + self.sys.inputs: for ic in self.trajectories._chains: if sq in ic: indic[sq] = indic[ic.upper] # as promised: here comes the explanation # # now, the dictionary 'indic' looks something like # # indic = {u1 : (0, 6), x3 : (18, 24), x4 : (24, 30), x1 : (6, 12), x2 : (12, 18)} # # which means, that in the vector of all independent parameters of all splines # the 0th up to the 5th item [remember: Python starts indexing at 0 and leaves out the last] # belong to the spline created for u1, the items with indices from 6 to 11 belong to the # spline created for x1 and so on... return indic def _build_dependence_matrices(self, indic): # first we compute the collocation points cpts = collocation_nodes(a=self.sys.a, b=self.sys.b, npts=self.trajectories.n_parts_x * 2 + 1, coll_type=self._parameters['coll_type']) x_fnc = self.trajectories.x_fnc dx_fnc = self.trajectories.dx_fnc u_fnc = self.trajectories.u_fnc states = self.sys.states inputs = self.sys.inputs # total number of independent coefficients free_param = np.hstack(sorted(self.trajectories.indep_coeffs.values(), key=lambda arr: arr[0].name)) n_dof = free_param.size # store internal information: self.dbgC = Container(cpts=cpts, indic=indic, dx_fnc=dx_fnc, x_fnc=x_fnc, u_fnc=u_fnc) self.dbgC.free_param=free_param lx = len(cpts) * self.sys.n_states lu = len(cpts) * self.sys.n_inputs # initialize sparse dependence matrices Mx = sparse.lil_matrix((lx, n_dof)) Mx_abs = sparse.lil_matrix((lx, 1)) Mdx = sparse.lil_matrix((lx, n_dof)) Mdx_abs = sparse.lil_matrix((lx, 1)) Mu = sparse.lil_matrix((lu, n_dof)) Mu_abs = sparse.lil_matrix((lu, 1)) for ip, p in enumerate(cpts): for ix, xx in enumerate(states): # get index range of `xx` in vector of all indep coeffs i,j = indic[xx] # determine derivation order according to integrator chains dorder_fx = _get_derivation_order(x_fnc[xx]) dorder_dfx = _get_derivation_order(dx_fnc[xx]) assert dorder_dfx == dorder_fx + 1 # get dependence vector for the collocation point and spline variable mx, mx_abs = x_fnc[xx].im_self.get_dependence_vectors(p, d=dorder_fx) mdx, mdx_abs = dx_fnc[xx].im_self.get_dependence_vectors(p, d=dorder_dfx) k = ip * self.sys.n_states + ix Mx[k, i:j] = mx Mx_abs[k] = mx_abs Mdx[k, i:j] = mdx Mdx_abs[k] = mdx_abs for iu, uu in enumerate(self.sys.inputs): # get index range of `xx` in vector of all indep coeffs i,j = indic[uu] dorder_fu = _get_derivation_order(u_fnc[uu]) # get dependence vector for the collocation point and spline variable mu, mu_abs = u_fnc[uu].im_self.get_dependence_vectors(p, d=dorder_fu) k = ip * self.sys.n_inputs + iu Mu[k, i:j] = mu Mu_abs[k] = mu_abs return Mx, Mx_abs, Mdx, Mdx_abs, Mu, Mu_abs
[docs] def get_guess(self): ''' This method is used to determine a starting value (guess) for the solver of the collocation equation system. If it is the first iteration step, then a vector with the same length as the vector of the free parameters with arbitrary values is returned. Else, for every variable a spline has been created for, the old spline of the iteration before and the new spline are evaluated at specific points and a equation system is solved which ensures that they are equal in these points. The solution of this system is the new start value for the solver. ''' if not self.trajectories._old_splines: if self._first_guess is None: free_coeffs_all = np.hstack(self.trajectories.indep_coeffs.values()) guess = 0.1 * np.ones(free_coeffs_all.size) else: guess = np.empty(0) for k, v in sorted(self.trajectories.indep_coeffs.items(), key = lambda (k, v): k): logging.debug("Get new guess for spline {}".format(k)) if self._first_guess.has_key(k): s = self.trajectories.splines[k] f = self._first_guess[k] free_coeffs_guess = s.interpolate(f) elif self._first_guess.has_key('seed'): np.random.seed(self._first_guess.get('seed')) free_coeffs_guess = np.random.random(len(v)) else: free_coeffs_guess = 0.1 * np.ones(len(v)) guess = np.hstack((guess, free_coeffs_guess)) else: guess = np.empty(0) # now we compute a new guess for every free coefficient of every new (finer) spline # by interpolating the corresponding old (coarser) spline for k, v in sorted(self.trajectories.indep_coeffs.items(), key = lambda (k, v): k): # TODO: introduce a parameter `ku` (factor for increasing spline resolution for u) # formerly its spline resolution was constant # (from that period stems the following if-statement) # currently the input is handled like the states # thus the else branch is switched off if True or (self.trajectories.splines[k].type == 'x'): logging.debug("Get new guess for spline {}".format(k)) s_new = self.trajectories.splines[k] s_old = self.trajectories._old_splines[k] df0 = s_old.df(self.sys.a) dfn = s_old.df(self.sys.b) free_coeffs_guess = s_new.interpolate(s_old.f, m0=df0, mn=dfn) guess = np.hstack((guess, free_coeffs_guess)) else: # if it is a input variable, just take the old solution guess = np.hstack((guess, self.trajectories._old_splines[k]._indep_coeffs)) # the new guess self.guess = guess
[docs] def solve(self, G, DG, new_solver=True): ''' This method is used to solve the collocation equation system. Parameters ---------- G : callable Function that "evaluates" the equation system. DG : callable Function for the jacobian. new_solver : bool flag to determine whether a new solver instance should be initialized (default True) ''' logging.debug("Solving Equation System") # create our solver if new_solver: self.solver = Solver(F=G, DF=DG, x0=self.guess, tol=self._parameters['tol'], reltol=self._parameters['reltol'], maxIt=self._parameters['sol_steps'], method=self._parameters['method']) else: # assume self.solver exists and at we already did a solution run assert self.solver.solve_count > 0 # solve the equation system self.sol = self.solver.solve() return self.sol
[docs] def save(self): save = dict() # parameters save['parameters'] = self._parameters # vector field and jacobian save['f'] = self._f save['Df'] = self._Df # guess save['guess'] = self.guess # solution save['sol'] = self.sol return save
[docs]def collocation_nodes(a, b, npts, coll_type): ''' Create collocation points/nodes for the equation system. Parameters ---------- a : float The left border of the considered interval. b : float The right border of the considered interval. npts : int The number of nodes. coll_type : str Specifies how to generate the nodes. Returns ------- numpy.ndarray The collocation nodes. ''' if coll_type == 'equidistant': # get equidistant collocation points cpts = np.linspace(a, b, npts, endpoint=True) elif coll_type == 'chebychev': # determine rank of chebychev polynomial # of which to calculate zero points nc = int(npts) - 2 # calculate zero points of chebychev polynomial --> in [-1,1] cheb_cpts = [np.cos( (2.0*i+1)/(2*(nc+1)) * np.pi) for i in xrange(nc)] cheb_cpts.sort() # transfer chebychev nodes from [-1,1] to our interval [a,b] chpts = [a + (b-a)/2.0 * (chp + 1) for chp in cheb_cpts] # add left and right borders cpts = np.hstack((a, chpts, b)) else: logging.warning('Unknown type of collocation points.') logging.warning('--> will use equidistant points!') cpts = np.linspace(a, b, npts, endpoint=True) return cpts
def _get_derivation_order(fnc): ''' Returns derivation order of function according to place in integrator chain. ''' from .splines import Spline if fnc.im_func == Spline.f.im_func: return 0 elif fnc.im_func == Spline.df.im_func: return 1 elif fnc.im_func == Spline.ddf.im_func: return 2 elif fnc.im_func == Spline.dddf.im_func: return 3 else: raise ValueError() def _build_sol_from_free_coeffs(splines): ''' Concatenates the values of the independent coeffs of all splines in given dict to build pseudo solution. ''' sol = np.empty(0) for k, v in sorted(splines.items(), key = lambda (k, v): k): assert not v._prov_flag sol = np.hstack([sol, v._indep_coeffs]) return sol