# IMPORTS
import numpy as np
import sympy as sp
from scipy import sparse
import pickle
import copy
import time
from trajectories import Trajectory
from collocation import CollocationSystem
from simulation import Simulator
import auxiliary
import visualisation
from log import logging
# DEBUGGING
from IPython import embed as IPS
[docs]class ControlSystem(object):
'''
Base class of the PyTrajectory project.
Parameters
----------
ff : callable
Vector field (rhs) of the control system.
a : float
Left border of the considered time interval.
b : float
Right border of the considered time interval.
xa : list
Boundary values at the left border.
xb : list
Boundary values at the right border.
ua : list
Boundary values of the input variables at left border.
ub : list
Boundary values of the input variables at right border.
constraints : dict
Box-constraints of the state variables.
kwargs
============= ============= ============================================================
key default value meaning
============= ============= ============================================================
sx 10 Initial number of spline parts for the system variables
su 10 Initial number of spline parts for the input variables
kx 2 Factor for raising the number of spline parts
maxIt 10 Maximum number of iteration steps
eps 1e-2 Tolerance for the solution of the initial value problem
ierr 1e-1 Tolerance for the error on the whole interval
tol 1e-5 Tolerance for the solver of the equation system
dt_sim 1e-2 Sample time for integration (initial value problem)
use_chains True Whether or not to use integrator chains
sol_steps 100 Maximum number of iteration steps for the eqs solver
first_guess None to initiate free parameters (might be useful: {'seed': value})
============= ============= ============================================================
'''
def __init__(self, ff, a=0., b=1., xa=[], xb=[], ua=[], ub=[], constraints=None, **kwargs):
# set method parameters
self._parameters = dict()
self._parameters['maxIt'] = kwargs.get('maxIt', 10)
self._parameters['eps'] = kwargs.get('eps', 1e-2)
self._parameters['ierr'] = kwargs.get('ierr', 1e-1)
self._parameters['dt_sim'] = kwargs.get('dt_sim', 0.01)
self._parameters['accIt'] = kwargs.get('accIt', 5)
# create an object for the dynamical system
self.dyn_sys = DynamicalSystem(f_sym=ff, a=a, b=b, xa=xa, xb=xb, ua=ua, ub=ub)
# handle eventual system constraints
self.constraints = constraints
if self.constraints is not None:
# transform the constrained vectorfield into an unconstrained one
self.unconstrain(constraints)
# we cannot make use of an integrator chain
# if it contains a constrained variable
kwargs['use_chains'] = False
# TODO: implement it so that just those chains are not used
# which actually contain a constrained variable
# create an object for the collocation equation system
self.eqs = CollocationSystem(sys=self.dyn_sys, **kwargs)
# We didn't really do anything yet, so this should be false
self.reached_accuracy = False
[docs] def set_param(self, param='', value=None):
'''
Alters the value of the method parameters.
Parameters
----------
param : str
The method parameter
value
The new value
'''
if param in {'maxIt', 'eps', 'ierr', 'dt_sim'}:
self._parameters[param] = value
elif param in {'n_parts_x', 'sx', 'n_parts_u', 'su', 'kx', 'use_chains', 'nodes_type', 'use_std_approach'}:
if param == 'nodes_type' and value != 'equidistant':
raise NotImplementedError()
if param == 'sx':
param = 'n_parts_x'
if param == 'su':
param = 'n_parts_u'
self.eqs.trajectories._parameters[param] = value
elif param in {'tol', 'method', 'coll_type', 'sol_steps'}:
self.eqs._parameters[param] = value
else:
raise AttributeError("Invalid method parameter ({})".format(param))
[docs] def unconstrain(self, constraints):
'''
This method is used to enable compliance with desired box constraints given by the user.
It transforms the vectorfield by projecting the constrained state variables on
new unconstrained ones.
Parameters
----------
constraints : dict
The box constraints for the state variables
'''
# save constraints
self.constraints = constraints
# backup the original constrained system
self._dyn_sys_orig = copy.deepcopy(self.dyn_sys)
# get symbolic vectorfield
# (as sympy matrix toenable replacement method)
x = sp.symbols(self.dyn_sys.states)
u = sp.symbols(self.dyn_sys.inputs)
ff_mat = sp.Matrix(self.dyn_sys.f_sym(x, u))
# get neccessary information form the dynamical system
a = self.dyn_sys.a
b = self.dyn_sys.b
boundary_values = self.dyn_sys.boundary_values
# handle the constraints by projecting the constrained state variables
# on new unconstrained variables using saturation functions
for k, v in self.constraints.items():
# check if boundary values are within saturation limits
xk = self.dyn_sys.states[k]
xa, xb = self.dyn_sys.boundary_values[xk]
if not ( v[0] < xa < v[1] ) or not ( v[0] < xb < v[1] ):
logging.error('Boundary values have to be strictly within the saturation limits!')
logging.info('Please have a look at the documentation, \
especially the example of the constrained double intgrator.')
raise ValueError('Boundary values have to be strictly within the saturation limits!')
# calculate saturation function expression and its derivative
yk = sp.Symbol(xk)
m = 4.0/(v[1] - v[0])
psi = v[1] - (v[1]-v[0])/(1. + sp.exp(m * yk))
#dpsi = ((v[1]-v[0])*m*sp.exp(m*yk))/(1.0+sp.exp(m*yk))**2
dpsi = (4. * sp.exp(m * yk))/(1. + sp.exp(m * yk))**2
# replace constrained variables in vectorfield with saturation expression
# x(t) = psi(y(t))
ff_mat = ff_mat.replace(sp.Symbol(xk), psi)
# update vectorfield to represent differential equation for new
# unconstrained state variable
#
# d/dt x(t) = (d/dy psi(y(t))) * d/dt y(t)
# <==> d/dt y(t) = d/dt x(t) / (d/dy psi(y(t)))
ff_mat[k] /= dpsi
# update boundary values for new unconstrained variable
boundary_values[xk] = ( (1./m) * np.log((xa - v[0]) / (v[1] - xa)),
(1./m) * np.log((xb - v[0]) / (v[1] - xb)) )
# create a callable function for the new symbolic vectorfield
ff = np.asarray(ff_mat).flatten().tolist()
xu = self.dyn_sys.states + self.dyn_sys.inputs
_f_sym = sp.lambdify(xu, ff, modules='sympy')
def f_sym(x, u):
xu = np.hstack((x,u))
return _f_sym(*xu)
# create a new unconstrained system
xa = [boundary_values[x][0] for x in self.dyn_sys.states]
xb = [boundary_values[x][1] for x in self.dyn_sys.states]
ua = [boundary_values[u][0] for u in self.dyn_sys.inputs]
ub = [boundary_values[u][1] for u in self.dyn_sys.inputs]
self.dyn_sys = DynamicalSystem(f_sym , a, b, xa, xb, ua, ub)
[docs] def constrain(self):
'''
This method is used to determine the solution of the original constrained
state variables by creating a composition of the saturation functions and
the calculated solution for the introduced unconstrained variables.
'''
# get a copy of the current function dictionaries
# (containing functions for unconstrained variables y_i)
x_fnc = copy.deepcopy(self.eqs.trajectories.x_fnc)
dx_fnc = copy.deepcopy(self.eqs.trajectories.dx_fnc)
# iterate over all constraints
for k, v in self.constraints.items():
# get symbols of original constrained variable x_k, the introduced unconstrained variable y_k
# and the saturation limits y0, y1
xk = self._dyn_sys_orig.states[k]
yk = self.dyn_sys.states[k]
y0, y1 = v
# get the calculated solution function for the unconstrained variable and its derivative
y_fnc = x_fnc[yk]
dy_fnc = dx_fnc[yk]
# create the compositions
psi_y, dpsi_dy = auxiliary.saturation_functions(y_fnc, dy_fnc, y0, y1)
# put created compositions into dictionaries of solution functions
self.eqs.trajectories.x_fnc[xk] = psi_y
self.eqs.trajectories.dx_fnc[xk] = dpsi_dy
[docs] def solve(self):
'''
This is the main loop.
While the desired accuracy has not been reached, the collocation system will
be set up and solved with a iteratively raised number of spline parts.
Returns
-------
callable
Callable function for the system state.
callable
Callable function for the input variables.
'''
T_start = time.time()
# do the first iteration step
logging.info("1st Iteration: {} spline parts".format(self.eqs.trajectories.n_parts_x))
try:
self._iterate()
except auxiliary.NanError:
logging.warn("NanError")
return None, None
# this was the first iteration
# now we are getting into the loop
self.nIt = 1
while not self.reached_accuracy and self.nIt < self._parameters['maxIt']:
# raise the number of spline parts
self.eqs.trajectories._raise_spline_parts()
if self.nIt == 1:
logging.info("2nd Iteration: {} spline parts".format(self.eqs.trajectories.n_parts_x))
elif self.nIt == 2:
logging.info("3rd Iteration: {} spline parts".format(self.eqs.trajectories.n_parts_x))
elif self.nIt >= 3:
logging.info("{}th Iteration: {} spline parts".format(self.nIt+1, self.eqs.trajectories.n_parts_x))
# start next iteration step
try:
self._iterate()
except auxiliary.NanError:
logging.warn("NanError")
return None, None
# increment iteration number
self.nIt += 1
# as a last, if there were any constraints to be taken care of,
# we project the unconstrained variables back on the original constrained ones
if self.constraints:
self.constrain()
self.T_sol = time.time() - T_start
# return the found solution functions
return self.eqs.trajectories.x, self.eqs.trajectories.u
def _iterate(self):
'''
This method is used to run one iteration step.
First, new splines are initialised.
Then, a start value for the solver is determined and the equation
system is set up.
Next, the equation system is solved and the resulting numerical values
for the free parameters are applied to the corresponding splines.
As a last, the resulting initial value problem is simulated.
'''
# Note: in pytrajectory there are Three main levels of 'iteration'
# Level 3: perform one LM-Step (i.e. calculate a new set of parameters)
# This is implemented in solver.py. Ends when tolerances are met or
# the maximum number of steps is reached
# Level 2: restarts the LM-Algorithm with the last values
# and stops if the desired accuracy for the initial value problem
# is met or if the maximum number of steps solution attempts is reached
# Level 1: increasing the spline number.
# In Each step solve a nonlinear optimization problem (with LM)
# Initialise the spline function objects
self.eqs.trajectories.init_splines()
# Get an initial value (guess)
self.eqs.get_guess()
# Build the collocation equations system
C = self.eqs.build()
G, DG = C.G, C.DG
# Solve the collocation equation system
new_solver = True
while True:
sol = self.eqs.solve(G, DG, new_solver=new_solver)
# in the following iterations we want to use the same solver
# object (we just had an intermediate look, whether the solution
# of the initial value problem is already sufficient accurate.)
new_solver = False
# Set the found solution
self.eqs.trajectories.set_coeffs(sol)
# Solve the resulting initial value problem
self.simulate()
# check if desired accuracy is reached
self.check_accuracy()
# any of the follwing conditions ends the loop
slvr = self.eqs.solver
cond1 = self.reached_accuracy
# following means: solver stopped not
# only because of maximum stepp # number
cond2 = (not slvr.cond_num_steps) or slvr.cond_abs_tol \
or slvr.cond_rel_tol
cond3 = self.eqs.solver.solve_count >= self._parameters['accIt']
if cond1 or cond2 or cond3:
break
else:
logging.debug('New attempt\n\n')
[docs] def simulate(self):
'''
This method is used to solve the resulting initial value problem
after the computation of a solution for the input trajectories.
'''
logging.debug("Solving Initial Value Problem")
# calulate simulation time
T = self.dyn_sys.b - self.dyn_sys.a
# get list of start values
start = []
if self.constraints is not None:
sys = self._dyn_sys_orig
else:
sys = self.dyn_sys
x_vars = sys.states
start_dict = dict([(k, v[0]) for k, v in sys.boundary_values.items() if k in x_vars])
ff = sys.f_num
for x in x_vars:
start.append(start_dict[x])
# create simulation object
S = Simulator(ff, T, start, self.eqs.trajectories.u, dt=self._parameters['dt_sim'])
logging.debug("start: %s"%str(start))
# start forward simulation
self.sim_data = S.simulate()
[docs] def check_accuracy(self):
'''
Checks whether the desired accuracy for the boundary values was reached.
It calculates the difference between the solution of the simulation
and the given boundary values at the right border and compares its
maximum against the tolerance.
If set by the user it also calculates some kind of consistency error
that shows how "well" the spline functions comply with the system
dynamic given by the vector field.
'''
# this is the solution of the simulation
a = self.sim_data[0][0]
b = self.sim_data[0][-1]
xt = self.sim_data[1]
# get boundary values at right border of the interval
if self.constraints:
bv = self._dyn_sys_orig.boundary_values
x_sym = self._dyn_sys_orig.states
else:
bv = self.dyn_sys.boundary_values
x_sym = self.dyn_sys.states
xb = dict([(k, v[1]) for k, v in bv.items() if k in x_sym])
# what is the error
logging.debug(40*"-")
logging.debug("Ending up with: Should Be: Difference:")
err = np.empty(xt.shape[1])
for i, xx in enumerate(x_sym):
err[i] = abs(xb[xx] - xt[-1][i])
logging.debug(str(xx)+" : %f %f %f"%(xt[-1][i], xb[xx], err[i]))
logging.debug(40*"-")
#if self._ierr:
ierr = self._parameters['ierr']
eps = self._parameters['eps']
if ierr:
# calculate maximum consistency error on the whole interval
maxH = auxiliary.consistency_error((a,b), self.eqs.trajectories.x, self.eqs.trajectories.u, self.eqs.trajectories.dx, self.dyn_sys.f_num)
reached_accuracy = (maxH < ierr) and (max(err) < eps)
logging.debug('maxH = %f'%maxH)
else:
# just check if tolerance for the boundary values is satisfied
reached_accuracy = max(err) < eps
if reached_accuracy:
logging.info(" --> reached desired accuracy: "+str(reached_accuracy))
else:
logging.debug(" --> reached desired accuracy: "+str(reached_accuracy))
self.reached_accuracy = reached_accuracy
[docs] def plot(self):
'''
Plot the calculated trajectories and show interval error functions.
This method calculates the error functions and then calls
the :py:func:`visualisation.plotsim` function.
'''
try:
import matplotlib
except ImportError:
logging.error('Matplotlib is not available for plotting.')
return
if self.constraints:
sys = self._dyn_sys_orig
else:
sys = self.dyn_sys
# calculate the error functions H_i(t)
max_con_err, error = auxiliary.consistency_error((sys.a, sys.b),
self.eqs.trajectories.x,
self.eqs.trajectories.u,
self.eqs.trajectories.dx,
sys.f_num, len(self.sim_data[0]), True)
H = dict()
for i in self.eqs.trajectories._eqind:
H[i] = error[:,i]
visualisation.plot_simulation(self.sim_data, H)
[docs] def save(self, fname=None):
'''
Save data using the python module :py:mod:`pickle`.
'''
save = dict.fromkeys(['sys', 'eqs', 'traj'])
# system state
save['sys'] = dict()
save['sys']['state'] = dict.fromkeys(['nIt', 'reached_accuracy'])
save['sys']['state']['nIt'] = self.nIt
save['sys']['state']['reached_accuracy'] = self.reached_accuracy
# simulation results
save['sys']['sim_data'] = self.sim_data
# parameters
save['sys']['parameters'] = self._parameters
save['eqs'] = self.eqs.save()
save['traj'] = self.eqs.trajectories.save()
if fname is not None:
if not (fname.endswith('.pcl') or fname.endswith('.pcl')):
fname += '.pcl'
with open(fname, 'w') as dumpfile:
pickle.dump(save, dumpfile)
return save
@property
def a(self):
return self.dyn_sys.a
@property
def b(self):
return self.dyn_sys.b
[docs]class DynamicalSystem(object):
'''
Provides access to information about the dynamical system that is the
object of the control process.
Parameters
----------
f_sym : callable
The (symbolic) vector field of the dynamical system
a, b : floats
The initial end final time of the control process
xa, xb : iterables
The initial and final conditions for the state variables
ua, ub : iterables
The initial and final conditions for the input variables
'''
def __init__(self, f_sym, a=0., b=1., xa=[], xb=[], ua=[], ub=[]):
self.f_sym = f_sym
self.a = a
self.b = b
# analyse the given system
self.n_states, self.n_inputs = self._determine_system_dimensions(n=len(xa))
# set names of the state and input variables
# (will be used as keys in various dictionaries)
self.states = tuple(['x{}'.format(i+1) for i in xrange(self.n_states)])
self.inputs = tuple(['u{}'.format(j+1) for j in xrange(self.n_inputs)])
# init dictionary for boundary values
self.boundary_values = self._get_boundary_dict_from_lists(xa, xb, ua, ub)
# create a numeric counterpart for the vector field
# for faster evaluation
self.f_num = auxiliary.sym2num_vectorfield(f_sym=self.f_sym, x_sym=self.states, u_sym=self.inputs,
vectorized=False, cse=False)
def _determine_system_dimensions(self, n):
'''
Determines the number of state and input variables.
Parameters
----------
n : int
Length of the list of initial state values
'''
# first, determine system dimensions
logging.debug("Determine system/input dimensions")
# the number of system variables can be determined via the length
# of the boundary value lists
n_states = n
# now we want to determine the input dimension
# therefore we iteratively increase the inputs dimension and try to call
# the vectorfield
found_n_inputs = False
x = np.ones(n_states)
j = 0
while not found_n_inputs:
u = np.ones(j)
try:
self.f_sym(x, u)
# if no ValueError is raised j is the dimension of the inputs
n_inputs = j
found_n_inputs = True
except (TypeError, ValueError):
# unpacking error inside f_sym
# (that means the dimensions don't match)
j += 1
logging.debug("--> state: {}".format(n_states))
logging.debug("--> input : {}".format(n_inputs))
return n_states, n_inputs
def _get_boundary_dict_from_lists(self, xa, xb, ua, ub):
'''
Creates a dictionary of boundary values for the state and input variables
for easier access.
'''
# consistency check
assert len(xa) == len(xb) == self.n_states
#assert len(ua) == len(ub) == self.n_inputs
if not ua and not ub:
ua = [None] * self.n_inputs
ub = [None] * self.n_inputs
# init dictionary
boundary_values = dict()
# add state boundary values
for i, x in enumerate(self.states):
boundary_values[x] = (xa[i], xb[i])
# add input boundary values
for j, u in enumerate(self.inputs):
boundary_values[u] = (ua[j], ub[j])
return boundary_values