From 6933c33dbcf4094edb878d566e828b4e121e2c0c Mon Sep 17 00:00:00 2001 From: Roger Labbe Date: Wed, 13 Jan 2016 17:33:14 -0800 Subject: [PATCH] Copy editing. --- Appendix-A-Installation.ipynb | 183 +- Appendix-B-Symbols-and-Notations.ipynb | 7 +- Appendix-C-Walking-Through-KF-Code.ipynb | 481 ---- Appendix-F-FilterPy-Code.ipynb | 3048 ---------------------- old-content.ipynb | 275 -- 5 files changed, 56 insertions(+), 3938 deletions(-) delete mode 100644 Appendix-C-Walking-Through-KF-Code.ipynb delete mode 100644 Appendix-F-FilterPy-Code.ipynb delete mode 100644 old-content.ipynb diff --git a/Appendix-A-Installation.ipynb b/Appendix-A-Installation.ipynb index 3ebe3e9..17ef2fa 100644 --- a/Appendix-A-Installation.ipynb +++ b/Appendix-A-Installation.ipynb @@ -39,6 +39,9 @@ "@import url('http://fonts.googleapis.com/css?family=Arimo');\n", "@import url('http://fonts.googleapis.com/css?family=Fira_sans');\n", "\n", + ".CodeMirror pre {\n", + " font-family: 'Source Code Pro', Consolas, monocco, monospace;\n", + "}\n", " div.cell{\n", " width: 900px;\n", " margin-left: 0% !important;\n", @@ -58,7 +61,7 @@ "\t\n", " div.input_area {\n", " background: #F6F6F9;\n", - " border: 1px solid #586e75;\n", + " border: 1px solid #586e75; \n", " }\n", "\n", " .text_cell_render h1 {\n", @@ -152,7 +155,8 @@ "}\n", "\n", " code{\n", - " font-size: 70%;\n", + " font-size: 6pt;\n", + "\n", " }\n", " .rendered_html code{\n", " background-color: transparent;\n", @@ -245,7 +249,8 @@ "\n" - ], - "text/plain": [ - "" - ] - }, - "execution_count": 1, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "#format the book\n", - "%matplotlib inline\n", - "from __future__ import division, print_function\n", - "from book_format import load_style\n", - "load_style()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "collapsed": true - }, - "source": [ - "** author's note: this code is somewhat old. This section needs to be edited; I would not pay a lot of attention to it right now. **\n", - "\n", - "The kalman filter code that we are using is implemented in my Python library `FilterPy`. If you are interested in the full implementation of the filter you should look in `filterpy\\kalman\\kalman_filter.py`. In the following I will present a simplified implementation of the same code. The code in the library handles issues that are beyond the scope of this chapter, such as numerical stability and support for the extended Kalman filter, subject of a later chapter. \n", - "\n", - "The code is implemented as the class `KalmanFilter`. Some Python programmers are not a fan of object oriented (OO) Python, and eschew classes. I do not intend to enter into that battle other than to say that I have often seen OO abused. Here I use the class to encapsulate the data that is pertinent to the filter so that you do not have to store and pass around a half dozen variables everywhere.\n", - "\n", - "The method `__init__()` is used by Python to create the object. Here is the method \n", - "\n", - " def __init__(self, dim_x, dim_z):\n", - " \"\"\" Create a Kalman filter. You are responsible for setting the \n", - " various state variables to reasonable values; the defaults below will\n", - " not give you a functional filter.\n", - " \n", - " Parameters\n", - " ----------\n", - " dim_x : int\n", - " Number of state variables for the Kalman filter. For example, if\n", - " you are tracking the position and velocity of an object in two\n", - " dimensions, dim_x would be 4.\n", - " \n", - " This is used to set the default size of P, Q, and u\n", - " \n", - " dim_z : int\n", - " Number of of measurement inputs. For example, if the sensor\n", - " provides you with position in (x,y), dim_z would be 2. \n", - " \"\"\"\n", - " \n", - " self.dim_x = dim_x\n", - " self.dim_z = dim_z\n", - "\n", - " self.x = np.zeros((dim_x, 1)) # state\n", - " self.P = np.eye(dim_x) # uncertainty covariance\n", - " self.Q = np.eye(dim_x) # process uncertainty\n", - " self.u = 0 # control input vector\n", - " self.B = np.zeros((dim_x, 1))\n", - " self.F = 0 # state transition matrix\n", - " self.H = 0 # Measurement function\n", - " self.R = np.eye(dim_z) # state uncertainty\n", - "\n", - " # identity matrix. Do not alter this.\n", - " self._I = np.eye(dim_x)\n", - "\n", - "More than anything this method exists to document for you what the variable names are in the filter. To do anything useful with this filter you will have to modify most of these values. Some are set to useful values. For example, `R` is set to an identity matrix; if you want the diagonals of `R` to be 10. you may write (as we did earlier in this chapter) `my_filter.R += 10.`.\n", - "\n", - "The names used for each variable matches the math symbology used in this chapter. Thus, `self.P` is the covariance matrix, `self.x` is the state, and so on.\n", - "\n", - "The predict function implements the predict step of the Kalman equations, which are \n", - "\n", - "$$\n", - "\\begin{aligned}\n", - "\\mathbf{x}^- &= \\mathbf{F x} + \\mathbf{B u} \\\\\n", - "\\mathbf{P}^- &= \\mathbf{FP{F}}^\\mathsf{T} + \\mathbf{Q} \n", - "\\end{aligned}\n", - "$$\n", - "\n", - "The corresponding code is\n", - "\n", - " def predict(self): \n", - " self.x = self.F.dot(self.x) + self.B.dot(self.u)\n", - " self.P = self.F.dot(self.P).dot(self.F.T) + self.Q\n", - "\n", - "I haven't discussed the use of NumPy much until now, but this method illustrates the power of that package. We use NumPy's `array` class to store our data and perform the linear algebra for our filters. `array` implements matrix multiplication using the `.dot()` method; if you use `*` you will get element-wise multiplication. As a heavy user of linear algebra this design is somewhat distressing as I use matrix multiplication far more often than element-wise multiplication. However, this design is due to historical developments in the library and we must live with it. The Python community has recognized this problem, and in Python 3.5 we will have the `@` operator to implement matrix multiplication. \n", - "\n", - "With that in mind, the Python code `self.F.dot(self.x)` implements the math expression $\\mathbf{F x}$.\n", - "\n", - "NumPy's `array` implements matrix transposition by using the `.T` property. Therefore, `F.T` is the python implementation of $\\mathbf{F}^\\mathsf{T}$.\n", - "\n", - "The `update()` method implements the update equations of the Kalman filter, which are\n", - "\n", - "$$\n", - "\\begin{aligned}\n", - "\\mathbf{y} &= \\mathbf{z} - \\mathbf{H}\\mathbf{x^-} \\\\\n", - "\\mathbf{K} &= \\mathbf{P} \\mathbf{H}^\\mathsf{T} (\\mathbf{H} \\mathbf{P^-} \\mathbf{H}^\\mathsf{T} +\\mathbf{R})^{-1} \\\\\n", - "\\mathbf{x} &= \\mathbf{x}^- + \\mathbf{K} \\mathbf{y} \\\\\n", - "\\mathbf{P} &= (\\mathbf{I} - \\mathbf{K} \\mathbf{H})\\mathbf{P^-}\n", - "\\end{aligned}\n", - "$$\n", - "\n", - "The corresponding code is:\n", - "\n", - " def update(self, Z, R=None):\n", - " \"\"\"\n", - " Add a new measurement (Z) to the kalman filter. If Z is None, nothing\n", - " is changed.\n", - "\n", - " Optionally provide R to override the measurement noise for this\n", - " one call, otherwise self.R will be used.\n", - "\n", - " self.residual, self.S, and self.K are stored in case you want to\n", - " inspect these variables. Strictly speaking they are not part of the\n", - " output of the Kalman filter, however, it is often useful to know\n", - " what these values are in various scenarios.\n", - " \"\"\"\n", - "\n", - " if Z is None:\n", - " return\n", - "\n", - " if R is None:\n", - " R = self.R\n", - " elif np.isscalar(R):\n", - " R = np.eye(self.dim_z) * R\n", - "\n", - " # error (residual) between measurement and prediction\n", - " self.residual = Z - self.H.dot(self.x)\n", - "\n", - " # project system uncertainty into measurement space\n", - " self.S = self.H.dot(self.P).dot(self.H.T) + R\n", - "\n", - " # map system uncertainty into kalman gain\n", - " self.K = self.P.dot(self.H.T).dot(linalg.inv(self.S))\n", - "\n", - " # predict new x with residual scaled by the kalman gain\n", - " self.x += self.K.dot(self.residual)\n", - "\n", - " KH = self.K.dot(self.H)\n", - " I_KH = self._I - KH\n", - " self.P = (I_KH.dot(self.P.dot(I_KH.T)) +\n", - " self.K.dot(self.R.dot(self.K.T)))\n", - "\n", - "There are a few more complications in this piece of code compared to `predict()` but it should still be quite clear. \n", - "\n", - "The first complication are the lines:\n", - "\n", - " if Z is None:\n", - " return\n", - " \n", - "This just lets you deal with missing data in a natural way. It is typical to use `None` to indicate the absence of data. If there is no data for an update we skip the update equations. This bit of code means you can write something like:\n", - "\n", - " z = read_sensor() # may return None if no data\n", - " my_kf.update(z)\n", - " \n", - "instead of:\n", - " z = read_sensor()\n", - " if z is not None:\n", - " my_kf.update(z)\n", - " \n", - "Reasonable people will argue whether my choice is cleaner, or obscures the fact that we do not update if the measurement is `None`. Having written a lot of avionics code my proclivity is always to do the safe thing. If we pass 'None' into the function I do not want an exception to occur; instead, I want the reasonable thing to happen, which is to just return without doing anything. If you feel that my choice obscures that fact, go ahead and write the explicit `if` statement prior to calling `update()` and get the best of both worlds.\n", - "\n", - "The next bit of code lets you optionally pass in a value to override `R`. It is common for the sensor noise to vary over time; if it does you can pass in the value as the optional parameter `R`.\n", - "\n", - " if R is None:\n", - " R = self.R\n", - " elif np.isscalar(R):\n", - " R = np.eye(self.dim_z) * R\n", - " \n", - "This code will use self.R if you do not provide a value for `R`. If you did provide a value, it will check if you provided a scalar (number); if so it constructs a matrix of the correct dimension for you. Otherwise it assumes that you passed in a matrix of the correct dimension.\n", - "\n", - "The rest of the code implements the Kalman filter equations, with one exception. Instead of implementing \n", - "\n", - "$$\\mathbf{P} = (\\mathbf{I} - \\mathbf{KH})\\mathbf{P}^-$$\n", - "\n", - "it implements the somewhat more complicated form \n", - "\n", - "$$\\mathbf{P} = (\\mathbf{I} - \\mathbf{KH})\\mathbf{P}^-(\\mathbf{I} - \\mathbf{KH})^\\mathsf{T} + \\mathbf{KRK}^\\mathsf{T}$$.\n", - "\n", - "The reason for this altered equation is that it is more numerically stable than the former equation, at the cost of being a bit more expensive to compute. It is not always possible to find the optimal value for $\\text{K}$, in which case the former equation will not produce good results because it assumes optimality. The longer reformulation used in the code is derived from more general math that does not assume optimality, and hence provides good results for non-optimal filters (such as when we can not correctly model our measurement error).\n", - "\n", - "Various texts differ as to whether this form of the equation should always be used, or only used when you know you need it. I choose to expend a bit more processing power to ensure stability; if your hardware is very constrained and you are able to prove that the simpler equation is correct for your problem then you might choose to use it instead. Personally, I find that a risky approach and do not recommend it to non-experts. Brown's *Introduction to Random Signals and Applied Kalman Filtering* [3] discusses this issue in some detail, if you are interested." - ] - }, - { - "cell_type": "markdown", - "metadata": { - "collapsed": true - }, - "source": [] - }, - { - "cell_type": "markdown", - "metadata": { - "collapsed": true - }, - "source": [] - }, - { - "cell_type": "markdown", - "metadata": { - "collapsed": true - }, - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.5.0" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} diff --git a/Appendix-F-FilterPy-Code.ipynb b/Appendix-F-FilterPy-Code.ipynb deleted file mode 100644 index 49f331a..0000000 --- a/Appendix-F-FilterPy-Code.ipynb +++ /dev/null @@ -1,3048 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "[Table of Contents](http://nbviewer.ipython.org/github/rlabbe/Kalman-and-Bayesian-Filters-in-Python/blob/master/table_of_contents.ipynb)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# FilterPy Source" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "For your convienence I have loaded several of FilterPy's core algorithms into this appendix. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## g-h Filter" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/gh/gh_filter.py\n", - "\"\"\"Copyright 2015 Roger R Labbe Jr.\n", - "\n", - "FilterPy library.\n", - "http://github.com/rlabbe/filterpy\n", - "\n", - "Documentation at:\n", - "https://filterpy.readthedocs.org\n", - "\n", - "Supporting book at:\n", - "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", - "\n", - "This is licensed under an MIT license. See the readme.MD file\n", - "for more information.\n", - "\"\"\"\n", - "\n", - "\n", - "from __future__ import (absolute_import, division, print_function,\n", - " unicode_literals)\n", - "\n", - "import numpy as np\n", - "from numpy import dot\n", - "\n", - "\n", - "class GHFilterOrder(object):\n", - " \"\"\" A g-h filter of aspecified order 0, 1, or 2.\n", - "\n", - " Strictly speaking, the g-h filter is order 1, and the 2nd order\n", - " filter is called the g-h-k filter. I'm not aware of any filter name\n", - " that encompasses orders 0, 1, and 2 under one name, or I would use it.\n", - "\n", - " |\n", - " |\n", - "\n", - " **Methods**\n", - " \"\"\"\n", - "\n", - "\n", - " def __init__(self, x0, dt, order, g, h=None, k=None):\n", - " \"\"\" Creates a g-h filter of order 0, 1, or 2.\n", - "\n", - " **Parameters**\n", - "\n", - " x0 : 1D np.array or scalar\n", - " Initial value for the filter state. Each value can be a scalar\n", - " or a np.array.\n", - "\n", - " You can use a scalar for x0. If order > 0, then 0.0 is assumed\n", - " for the higher order terms.\n", - "\n", - " x[0] is the value being tracked\n", - " x[1] is the first derivative (for order 1 and 2 filters)\n", - " x[2] is the second derivative (for order 2 filters)\n", - "\n", - " dt : scalar\n", - " timestep\n", - "\n", - " order : int\n", - " order of the filter. Defines the order of the system\n", - " 0 - assumes system of form x = a_0 + a_1*t\n", - " 1 - assumes system of form x = a_0 +a_1*t + a_2*t^2\n", - " 2 - assumes system of form x = a_0 +a_1*t + a_2*t^2 + a_3*t^3\n", - "\n", - " g : float\n", - " filter g gain parameter.\n", - "\n", - " h : float, optional\n", - " filter h gain parameter, order 1 and 2 only\n", - "\n", - " k : float, optional\n", - " filter k gain parameter, order 2 only\n", - "\n", - " **Members**\n", - "\n", - " self.x : np.array\n", - " State of the filter.\n", - " x[0] is the value being tracked\n", - " x[1] is the derivative of x[0] (order 1 and 2 only)\n", - " x[2] is the 2nd derivative of x[0] (order 2 only)\n", - "\n", - " This is always an np.array, even for order 0 where you can\n", - " initialize x0 with a scalar.\n", - "\n", - " self.residual : np.array\n", - " difference between the measurement and the prediction\n", - " \"\"\"\n", - "\n", - "\n", - " assert order >= 0\n", - " assert order <= 2\n", - "\n", - " if np.isscalar(x0):\n", - " self.x = np.zeros(order+1)\n", - " self.x[0] = x0\n", - " else:\n", - " self.x = np.copy(x0.astype(float))\n", - "\n", - " self.dt = dt\n", - " self.order = order\n", - "\n", - " self.g = g\n", - " self.h = h\n", - " self.k = k\n", - "\n", - "\n", - " def update(self, z, g=None, h=None, k=None):\n", - " \"\"\" update the filter with measurement z. z must be the same type\n", - " or treatable as the same type as self.x[0].\n", - " \"\"\"\n", - "\n", - " if self.order == 0:\n", - " if g is None:\n", - " g = self.g\n", - " self.residual = z - self.x[0]\n", - " self.x += dot(g, self.residual)\n", - "\n", - " elif self.order == 1:\n", - " if g is None:\n", - " g = self.g\n", - " if h is None:\n", - " h = self.h\n", - " x = self.x[0]\n", - " dx = self.x[1]\n", - " dxdt = dot(dx, self.dt)\n", - "\n", - " self.residual = z - (x + dxdt)\n", - " self.x[0] = x + dxdt + g*self.residual\n", - " self.x[1] = dx + h*self.residual / self.dt\n", - "\n", - " else: # order == 2\n", - " if g is None:\n", - " g = self.g\n", - " if h is None:\n", - " h = self.h\n", - " if k is None:\n", - " k = self.k\n", - "\n", - " x = self.x[0]\n", - " dx = self.x[1]\n", - " ddx = self.x[2]\n", - " dxdt = dot(dx, self.dt)\n", - " T2 = self.dt**2.\n", - "\n", - " self.residual = z -(x + dxdt +0.5*ddx*T2)\n", - "\n", - " self.x[0] = x + dxdt + 0.5*ddx*T2 + g*self.residual\n", - " self.x[1] = dx + ddx*self.dt + h*self.residual / self.dt\n", - " self.x[2] = ddx + 2*self.k*self.residual / (self.dt**2)\n", - "\n", - "\n", - "class GHFilter(object):\n", - " \"\"\" Implements the g-h filter. The topic is too large to cover in\n", - " this comment. See my book \"Kalman and Bayesian Filters in Python\" [1]\n", - " or Eli Brookner's \"Tracking and Kalman Filters Made Easy\" [2].\n", - "\n", - " A few basic examples are below, and the tests in ./gh_tests.py may\n", - " give you more ideas on use.\n", - "\n", - " **Examples**\n", - "\n", - " Create a basic filter for a scalar value with g=.8, h=.2.\n", - " Initialize to 0, with a derivative(velocity) of 0.\n", - "\n", - " >>> from filterpy.gh import GHFilter\n", - " >>> f = GHFilter (x=0., dx=0., dt=1., g=.8, h=.2)\n", - "\n", - " Incorporate the measurement of 1\n", - "\n", - " >>> f.update(z=1)\n", - " (0.8, 0.2)\n", - "\n", - " Incorporate a measurement of 2 with g=1 and h=0.01\n", - "\n", - " >>> f.update(z=2, g=1, h=0.01)\n", - " (2.0, 0.21000000000000002)\n", - "\n", - " Create a filter with two independent variables.\n", - "\n", - " >>> from numpy import array\n", - " >>> f = GHFilter (x=array([0,1]), dx=array([0,0]), dt=1, g=.8, h=.02)\n", - "\n", - " and update with the measurements (2,4)\n", - "\n", - " >>> f.update(array([2,4])\n", - " (array([ 1.6, 3.4]), array([ 0.04, 0.06]))\n", - "\n", - "\n", - " **References**\n", - "\n", - " [1] Labbe, \"Kalman and Bayesian Filters in Python\"\n", - " http://rlabbe.github.io/Kalman-and-Bayesian-Filters-in-Python\n", - "\n", - " [2] Brookner, \"Tracking and Kalman Filters Made Easy\". John Wiley and\n", - " Sons, 1998.\n", - "\n", - " |\n", - " |\n", - "\n", - " **Methods**\n", - " \"\"\"\n", - "\n", - " def __init__(self, x, dx, dt, g, h):\n", - " \"\"\" Creates a g-h filter.\n", - "\n", - " **Parameters**\n", - "\n", - " x : 1D np.array or scalar\n", - " Initial value for the filter state. Each value can be a scalar\n", - " or a np.array.\n", - "\n", - " You can use a scalar for x0. If order > 0, then 0.0 is assumed\n", - " for the higher order terms.\n", - "\n", - " x[0] is the value being tracked\n", - " x[1] is the first derivative (for order 1 and 2 filters)\n", - " x[2] is the second derivative (for order 2 filters)\n", - "\n", - " dx : 1D np.array or scalar\n", - " Initial value for the derivative of the filter state.\n", - "\n", - " dt : scalar\n", - " time step\n", - "\n", - " g : float\n", - " filter g gain parameter.\n", - "\n", - " h : float\n", - " filter h gain parameter.\n", - " \"\"\"\n", - "\n", - " assert np.isscalar(dt)\n", - " assert np.isscalar(g)\n", - " assert np.isscalar(h)\n", - "\n", - " self.x = x\n", - " self.dx = dx\n", - " self.dt = dt\n", - " self.g = g\n", - " self.h = h\n", - "\n", - "\n", - " def update (self, z, g=None, h=None):\n", - " \"\"\"performs the g-h filter predict and update step on the\n", - " measurement z. Modifies the member variables listed below,\n", - " and returns the state of x and dx as a tuple as a convienence.\n", - "\n", - " **Modified Members**\n", - "\n", - " x\n", - " filtered state variable\n", - "\n", - " dx\n", - " derivative (velocity) of x\n", - "\n", - " residual\n", - " difference between the measurement and the prediction for x\n", - "\n", - " x_prediction\n", - " predicted value of x before incorporating the measurement z.\n", - "\n", - " dx_prediction\n", - " predicted value of the derivative of x before incorporating the\n", - " measurement z.\n", - "\n", - " **Parameters**\n", - "\n", - " z : any\n", - " the measurement\n", - " g : scalar (optional)\n", - " Override the fixed self.g value for this update\n", - " h : scalar (optional)\n", - " Override the fixed self.h value for this update\n", - "\n", - " **Returns**\n", - "\n", - " x filter output for x\n", - " dx filter output for dx (derivative of x\n", - " \"\"\"\n", - "\n", - " if g is None:\n", - " g = self.g\n", - " if h is None:\n", - " h = self.h\n", - "\n", - " #prediction step\n", - " self.dx_prediction = self.dx\n", - " self.x_prediction = self.x + (self.dx*self.dt)\n", - "\n", - " # update step\n", - " self.residual = z - self.x_prediction\n", - " self.dx = self.dx_prediction + h * self.residual / self.dt\n", - " self.x = self.x_prediction + g * self.residual\n", - "\n", - " return (self.x, self.dx)\n", - "\n", - "\n", - " def batch_filter (self, data, save_predictions=False):\n", - " \"\"\"\n", - " Given a sequenced list of data, performs g-h filter\n", - " with a fixed g and h. See update() if you need to vary g and/or h.\n", - "\n", - " Uses self.x and self.dx to initialize the filter, but DOES NOT\n", - " alter self.x and self.dx during execution, allowing you to use this\n", - " class multiple times without reseting self.x and self.dx. I'm not sure\n", - " how often you would need to do that, but the capability is there.\n", - " More exactly, none of the class member variables are modified\n", - " by this function, in distinct contrast to update(), which changes\n", - " most of them.\n", - "\n", - " **Parameters**\n", - "\n", - " data : list like\n", - " contains the data to be filtered.\n", - "\n", - " save_predictions : boolean\n", - " the predictions will be saved and returned if this is true\n", - "\n", - " **Returns**\n", - "\n", - " results : np.array shape (n+1, 2), where n=len(data)\n", - " contains the results of the filter, where\n", - " results[i,0] is x , and\n", - " results[i,1] is dx (derivative of x)\n", - " First entry is the initial values of x and dx as set by __init__.\n", - "\n", - " predictions : np.array shape(n), optional\n", - " the predictions for each step in the filter. Only retured if\n", - " save_predictions == True\n", - " \"\"\"\n", - "\n", - " x = self.x\n", - " dx = self.dx\n", - " n = len(data)\n", - "\n", - " results = np.zeros((n+1, 2))\n", - " results[0,0] = x\n", - " results[0,1] = dx\n", - "\n", - " if save_predictions:\n", - " predictions = np.zeros(n)\n", - "\n", - " # optimization to avoid n computations of h / dt\n", - " h_dt = self.h / self.dt\n", - "\n", - " for i,z in enumerate(data):\n", - " #prediction step\n", - " x_est = x + (dx*self.dt)\n", - "\n", - " # update step\n", - " residual = z - x_est\n", - " dx = dx + h_dt * residual # i.e. dx = dx + h * residual / dt\n", - " x = x_est + self.g * residual\n", - "\n", - " results[i+1,0] = x\n", - " results[i+1,1] = dx\n", - " if save_predictions:\n", - " predictions[i] = x_est\n", - "\n", - " if save_predictions:\n", - " return results, predictions\n", - " else:\n", - " return results\n", - "\n", - "\n", - " def VRF_prediction(self):\n", - " \"\"\" Returns the Variance Reduction Factor of the prediction\n", - " step of the filter. The VRF is the\n", - " normalized variance for the filter, as given in the equation below.\n", - "\n", - " .. math::\n", - " VRF(\\hat{x}_{n+1,n}) = \\\\frac{VAR(\\hat{x}_{n+1,n})}{\\sigma^2_x}\n", - "\n", - " **References**\n", - "\n", - " Asquith, \"Weight Selection in First Order Linear Filters\"\n", - " Report No RG-TR-69-12, U.S. Army Missle Command. Redstone Arsenal, Al.\n", - " November 24, 1970.\n", - " \"\"\"\n", - "\n", - " g = self.g\n", - " h = self.h\n", - "\n", - " return (2*g**2 + 2*h + g*h) / (g*(4 - 2*g - h))\n", - "\n", - "\n", - " def VRF(self):\n", - " \"\"\" Returns the Variance Reduction Factor (VRF) of the state variable\n", - " of the filter (x) and its derivatives (dx, ddx). The VRF is the\n", - " normalized variance for the filter, as given in the equations below.\n", - "\n", - " .. math::\n", - " VRF(\\hat{x}_{n,n}) = \\\\frac{VAR(\\hat{x}_{n,n})}{\\sigma^2_x}\n", - "\n", - " VRF(\\hat{\\dot{x}}_{n,n}) = \\\\frac{VAR(\\hat{\\dot{x}}_{n,n})}{\\sigma^2_x}\n", - "\n", - " VRF(\\hat{\\ddot{x}}_{n,n}) = \\\\frac{VAR(\\hat{\\ddot{x}}_{n,n})}{\\sigma^2_x}\n", - "\n", - " **Returns**\n", - "\n", - " vrf_x VRF of x state variable\n", - " vrf_dx VRF of the dx state variable (derivative of x)\n", - " \"\"\"\n", - "\n", - " g = self.g\n", - " h = self.h\n", - "\n", - " den = g*(4 - 2*g - h)\n", - "\n", - " vx = (2*g**2 + 2*h - 3*g*h) / den\n", - " vdx = 2*h**2 / (self.dt**2 * den)\n", - "\n", - " return (vx, vdx)\n", - "\n", - "\n", - "class GHKFilter(object):\n", - " \"\"\" Implements the g-h-k filter.\n", - "\n", - " **References**\n", - "\n", - " Brookner, \"Tracking and Kalman Filters Made Easy\". John Wiley and\n", - " Sons, 1998.\n", - "\n", - " |\n", - " |\n", - "\n", - " **Methods**\n", - " \"\"\"\n", - "\n", - " def __init__(self, x, dx, ddx, dt, g, h, k):\n", - " \"\"\" Creates a g-h filter.\n", - "\n", - " **Parameters**\n", - "\n", - " x : 1D np.array or scalar\n", - " Initial value for the filter state. Each value can be a scalar\n", - " or a np.array.\n", - "\n", - " You can use a scalar for x0. If order > 0, then 0.0 is assumed\n", - " for the higher order terms.\n", - "\n", - " x[0] is the value being tracked\n", - " x[1] is the first derivative (for order 1 and 2 filters)\n", - " x[2] is the second derivative (for order 2 filters)\n", - "\n", - " dx : 1D np.array or scalar\n", - " Initial value for the derivative of the filter state.\n", - "\n", - " ddx : 1D np.array or scalar\n", - " Initial value for the second derivative of the filter state.\n", - "\n", - " dt : scalar\n", - " time step\n", - "\n", - " g : float\n", - " filter g gain parameter.\n", - "\n", - " h : float\n", - " filter h gain parameter.\n", - "\n", - " k : float\n", - " filter k gain parameter.\n", - " \"\"\"\n", - "\n", - " assert np.isscalar(dt)\n", - " assert np.isscalar(g)\n", - " assert np.isscalar(h)\n", - "\n", - " self.x = x\n", - " self.dx = dx\n", - " self.ddx = ddx\n", - " self.dt = dt\n", - " self.g = g\n", - " self.h = h\n", - " self.k = k\n", - "\n", - "\n", - " def update (self, z, g=None, h=None, k=None):\n", - " \"\"\"performs the g-h filter predict and update step on the\n", - " measurement z.\n", - "\n", - " On return, self.x, self.dx, self.residual, and self.x_prediction\n", - " will have been updated with the results of the computation. For\n", - " convienence, self.x and self.dx are returned in a tuple.\n", - "\n", - " **Parameters**\n", - "\n", - " z : scalar\n", - " the measurement\n", - " g : scalar (optional)\n", - " Override the fixed self.g value for this update\n", - " h : scalar (optional)\n", - " Override the fixed self.h value for this update\n", - " k : scalar (optional)\n", - " Override the fixed self.k value for this update\n", - "\n", - " **Returns**\n", - "\n", - " x filter output for x\n", - " dx filter output for dx (derivative of x\n", - "\n", - " \"\"\"\n", - "\n", - " if g is None:\n", - " g = self.g\n", - " if h is None:\n", - " h = self.h\n", - " if k is None:\n", - " k = self.k\n", - "\n", - " dt = self.dt\n", - " dt_sqr = dt**2\n", - " #prediction step\n", - " self.ddx_prediction = self.ddx\n", - " self.dx_prediction = self.dx + self.ddx*dt\n", - " self.x_prediction = self.x + self.dx*dt + .5*self.ddx*(dt_sqr)\n", - "\n", - " # update step\n", - " self.residual = z - self.x_prediction\n", - "\n", - " self.ddx = self.ddx_prediction + 2*k*self.residual / dt_sqr\n", - " self.dx = self.dx_prediction + h * self.residual / dt\n", - " self.x = self.x_prediction + g * self.residual\n", - "\n", - " return (self.x, self.dx)\n", - "\n", - "\n", - " def batch_filter (self, data, save_predictions=False):\n", - " \"\"\"\n", - " Performs g-h filter with a fixed g and h.\n", - "\n", - " Uses self.x and self.dx to initialize the filter, but DOES NOT\n", - " alter self.x and self.dx during execution, allowing you to use this\n", - " class multiple times without reseting self.x and self.dx. I'm not sure\n", - " how often you would need to do that, but the capability is there.\n", - " More exactly, none of the class member variables are modified\n", - " by this function.\n", - "\n", - " **Parameters**\n", - "\n", - " data : list_like\n", - " contains the data to be filtered.\n", - "\n", - " save_predictions : boolean\n", - " The predictions will be saved and returned if this is true\n", - "\n", - " **Returns**\n", - "\n", - " results : np.array shape (n+1, 2), where n=len(data)\n", - " contains the results of the filter, where\n", - " results[i,0] is x , and\n", - " results[i,1] is dx (derivative of x)\n", - " First entry is the initial values of x and dx as set by __init__.\n", - "\n", - " predictions : np.array shape(n), or None\n", - " the predictions for each step in the filter. Only returned if\n", - " save_predictions == True\n", - " \"\"\"\n", - "\n", - " x = self.x\n", - " dx = self.dx\n", - " n = len(data)\n", - "\n", - " results = np.zeros((n+1, 2))\n", - " results[0,0] = x\n", - " results[0,1] = dx\n", - "\n", - " if save_predictions:\n", - " predictions = np.zeros(n)\n", - "\n", - " # optimization to avoid n computations of h / dt\n", - " h_dt = self.h / self.dt\n", - "\n", - " for i,z in enumerate(data):\n", - " #prediction step\n", - " x_est = x + (dx*self.dt)\n", - "\n", - " # update step\n", - " residual = z - x_est\n", - " dx = dx + h_dt * residual # i.e. dx = dx + h * residual / dt\n", - " x = x_est + self.g * residual\n", - "\n", - " results[i+1,0] = x\n", - " results[i+1,1] = dx\n", - " if save_predictions:\n", - " predictions[i] = x_est\n", - "\n", - " if save_predictions:\n", - " return results, predictions\n", - " else:\n", - " return results\n", - "\n", - "\n", - " def VRF_prediction(self):\n", - " \"\"\" Returns the Variance Reduction Factor for x of the prediction\n", - " step of the filter.\n", - "\n", - " This implements the equation\n", - "\n", - " .. math::\n", - " VRF(\\hat{x}_{n+1,n}) = \\\\frac{VAR(\\hat{x}_{n+1,n})}{\\sigma^2_x}\n", - "\n", - " **References**\n", - "\n", - " Asquith and Woods, \"Total Error Minimization in First\n", - " and Second Order Prediction Filters\" Report No RE-TR-70-17, U.S.\n", - " Army Missle Command. Redstone Arsenal, Al. November 24, 1970.\n", - " \"\"\"\n", - "\n", - " g = self.g\n", - " h = self.h\n", - " k = self.k\n", - " gh2 = 2*g + h\n", - " return ((g*k*(gh2-4)+ h*(g*gh2+2*h)) /\n", - " (2*k - (g*(h+k)*(gh2-4))))\n", - "\n", - "\n", - " def bias_error(self, dddx):\n", - " \"\"\" Returns the bias error given the specified constant jerk(dddx)\n", - "\n", - " **Parameters**\n", - "\n", - " dddx : type(self.x)\n", - " 3rd derivative (jerk) of the state variable x.\n", - "\n", - " **References**\n", - "\n", - " Asquith and Woods, \"Total Error Minimization in First\n", - " and Second Order Prediction Filters\" Report No RE-TR-70-17, U.S.\n", - " Army Missle Command. Redstone Arsenal, Al. November 24, 1970.\n", - " \"\"\"\n", - " return -self.dt**3 * dddx / (2*self.k)\n", - "\n", - "\n", - " def VRF(self):\n", - " \"\"\" Returns the Variance Reduction Factor (VRF) of the state variable\n", - " of the filter (x) and its derivatives (dx, ddx). The VRF is the\n", - " normalized variance for the filter, as given in the equations below.\n", - "\n", - " .. math::\n", - " VRF(\\hat{x}_{n,n}) = \\\\frac{VAR(\\hat{x}_{n,n})}{\\sigma^2_x}\n", - "\n", - " VRF(\\hat{\\dot{x}}_{n,n}) = \\\\frac{VAR(\\hat{\\dot{x}}_{n,n})}{\\sigma^2_x}\n", - "\n", - " VRF(\\hat{\\ddot{x}}_{n,n}) = \\\\frac{VAR(\\hat{\\ddot{x}}_{n,n})}{\\sigma^2_x}\n", - "\n", - " **Returns**\n", - "\n", - " vrf_x : type(x)\n", - " VRF of x state variable\n", - "\n", - " vrf_dx : type(x)\n", - " VRF of the dx state variable (derivative of x)\n", - "\n", - " vrf_ddx : type(x)\n", - " VRF of the ddx state variable (second derivative of x)\n", - " \"\"\"\n", - "\n", - " g = self.g\n", - " h = self.h\n", - " k = self.k\n", - "\n", - " # common subexpressions in the equations pulled out for efficiency,\n", - " # they don't 'mean' anything.\n", - " hg4 = 4- 2*g - h\n", - " ghk = g*h + g*k - 2*k\n", - "\n", - " vx = (2*h*(2*(g**2) + 2*h - 3*g*h) - 2*g*k*hg4) / (2*k - g*(h+k) * hg4)\n", - " vdx = (2*(h**3) - 4*(h**2)*k + 4*(k**2)*(2-g)) / (2*hg4*ghk)\n", - " vddx = 8*h*(k**2) / ((self.dt**4)*hg4*ghk)\n", - "\n", - " return (vx, vdx, vddx)\n", - "\n", - "\n", - "def optimal_noise_smoothing(g):\n", - " \"\"\" provides g,h,k parameters for optimal smoothing of noise for a given\n", - " value of g. This is due to Polge and Bhagavan[1].\n", - "\n", - " **Parameters**\n", - "\n", - " g : float\n", - " value for g for which we will optimize for\n", - "\n", - " **Returns**\n", - "\n", - " (g,h,k) : (float, float, float)\n", - " values for g,h,k that provide optimal smoothing of noise\n", - "\n", - "\n", - " **Example**::\n", - "\n", - " from filterpy.gh import GHKFilter, optimal_noise_smoothing\n", - "\n", - " g,h,k = optimal_noise_smoothing(g)\n", - " f = GHKFilter(0,0,0,1,g,h,k)\n", - " f.update(1.)\n", - "\n", - "\n", - " **References**\n", - "\n", - " [1] Polge and Bhagavan. \"A Study of the g-h-k Tracking Filter\".\n", - " Report No. RE-CR-76-1. University of Alabama in Huntsville.\n", - " July, 1975\n", - "\n", - " \"\"\"\n", - "\n", - " h = ((2*g**3 - 4*g**2) + (4*g**6 -64*g**5 + 64*g**4)**.5) / (8*(1-g))\n", - " k = (h*(2-g) - g**2) / g\n", - "\n", - " return (g,h,k)\n", - "\n", - "\n", - "def least_squares_parameters(n):\n", - " \"\"\" An order 1 least squared filter can be computed by a g-h filter\n", - " by varying g and h over time according to the formulas below, where\n", - " the first measurement is at n=0, the second is at n=1, and so on:\n", - "\n", - " .. math::\n", - "\n", - " h_n = \\\\frac{6}{(n+2)(n+1)}\n", - "\n", - " g_n = \\\\frac{2(2n+1)}{(n+2)(n+1)}\n", - "\n", - "\n", - "\n", - " **Parameters**\n", - "\n", - " n : int\n", - " the nth measurement, starting at 0 (i.e. first measurement has n==0)\n", - "\n", - " **Returns**\n", - "\n", - " (g,h) : (float, float)\n", - " g and h parameters for this time step for the least-squares filter\n", - "\n", - " **Example**::\n", - "\n", - " from filterpy.gh import GHFilter, least_squares_parameters\n", - "\n", - " lsf = GHFilter (0, 0, 1, 0, 0)\n", - " z = 10\n", - " for i in range(10):\n", - " g,h = least_squares_parameters(i)\n", - " lsf.update(z, g, h)\n", - "\n", - " \"\"\"\n", - " den = (n+2)*(n+1)\n", - "\n", - " g = (2*(2*n + 1)) / den\n", - " h = 6 / den\n", - " return (g,h)\n", - "\n", - "\n", - "def critical_damping_parameters(theta, order=2):\n", - " \"\"\" Computes values for g and h (and k for g-h-k filter) for a\n", - " critically damped filter.\n", - "\n", - " The idea here is to create a filter that reduces the influence of\n", - " old data as new data comes in. This allows the filter to track a\n", - " moving target better. This goes by different names. It may be called the\n", - " discounted least-squares g-h filter, a fading-memory polynomal filter\n", - " of order 1, or a critically damped g-h filter.\n", - "\n", - " In a normal least-squares filter we compute the error for each point as\n", - "\n", - " .. math::\n", - "\n", - " \\epsilon_t = (z-\\\\hat{x})^2\n", - "\n", - " For a crically damped filter we reduce the influence of each error by\n", - "\n", - " .. math::\n", - "\n", - " \\\\theta^{t-i}\n", - "\n", - " where\n", - "\n", - " .. math::\n", - "\n", - " 0 <= \\\\theta <= 1\n", - "\n", - " In other words the last error is scaled by theta, the next to last by\n", - " theta squared, the next by theta cubed, and so on.\n", - "\n", - " **Parameters**\n", - "\n", - " theta : float, 0 <= theta <= 1\n", - " scaling factor for previous terms\n", - "\n", - " order : int, 2 (default) or 3\n", - " order of filter to create the parameters for. g and h will be\n", - " calculated for the order 2, and g, h, and k for order 3.\n", - "\n", - " **Returns**\n", - "\n", - " g : scalar\n", - " optimal value for g in the g-h or g-h-k filter\n", - "\n", - " h : scalar\n", - " optimal value for h in the g-h or g-h-k filter\n", - "\n", - " k : scalar\n", - " optimal value for g in the g-h-k filter\n", - "\n", - " **Example**::\n", - "\n", - " from filterpy.gh import GHFilter, critical_damping_parameters\n", - "\n", - " g,h = critical_damping_parameters(0.3)\n", - " critical_filter = GHFilter(0, 0, 1, g, h)\n", - "\n", - " **References**\n", - "\n", - " Brookner, \"Tracking and Kalman Filters Made Easy\". John Wiley and\n", - " Sons, 1998.\n", - "\n", - " Polge and Bhagavan. \"A Study of the g-h-k Tracking Filter\".\n", - " Report No. RE-CR-76-1. University of Alabama in Huntsville.\n", - " July, 1975\n", - "\n", - " \"\"\"\n", - " assert theta >= 0\n", - " assert theta <= 1\n", - "\n", - " if order == 2:\n", - " return (1. - theta**2, (1. - theta)**2)\n", - "\n", - " if order == 3:\n", - " return (1. - theta**3, 1.5*(1.-theta**2)*(1.-theta), .5*(1 - theta)**3)\n", - "\n", - " raise Exception('bad order specified: {}'.format(order))\n", - "\n", - "\n", - "def benedict_bornder_constants(g, critical=False):\n", - " \"\"\" Computes the g,h constants for a Benedict-Bordner filter, which\n", - " minimizes transient errors for a g-h filter.\n", - "\n", - " Returns the values g,h for a specified g. Strictly speaking, only h\n", - " is computed, g is returned unchanged.\n", - "\n", - " The default formula for the Benedict-Bordner allows ringing. We can\n", - " \"nearly\" critically damp it; ringing will be reduced, but not entirely\n", - " eliminated at the cost of reduced performance.\n", - "\n", - " **Parameters**\n", - "\n", - " g : float\n", - " scaling factor g for the filter\n", - "\n", - " critical : boolean, default False\n", - " Attempts to critically damp the filter.\n", - "\n", - " **Returns**\n", - "\n", - " g : float\n", - " scaling factor g (same as the g that was passed in)\n", - "\n", - " h : float\n", - " scaling factor h that minimizes the transient errors\n", - "\n", - " **Example**::\n", - "\n", - " from filterpy.gh import GHFilter, benedict_bornder_constants\n", - " g, h = benedict_bornder_constants(.855)\n", - " f = GHFilter(0, 0, 1, g, h)\n", - "\n", - " **References**\n", - "\n", - " Brookner, \"Tracking and Kalman Filters Made Easy\". John Wiley and\n", - " Sons, 1998.\n", - "\n", - " \"\"\"\n", - "\n", - " g_sqr = g**2\n", - " if critical:\n", - " return (g, 0.8 * (2. - g_sqr - 2*(1-g_sqr)**.5) / g_sqr)\n", - " else:\n", - " return (g, g_sqr / (2.-g))\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## KalmanFilter" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": false - }, - "outputs": [], - "source": [ - "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/kalman/kalman_filter.py\n", - "\n", - "\"\"\"Copyright 2015 Roger R Labbe Jr.\n", - "\n", - "FilterPy library.\n", - "http://github.com/rlabbe/filterpy\n", - "\n", - "Documentation at:\n", - "https://filterpy.readthedocs.org\n", - "\n", - "Supporting book at:\n", - "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", - "\n", - "This is licensed under an MIT license. See the readme.MD file\n", - "for more information.\n", - "\"\"\"\n", - "\n", - "from __future__ import (absolute_import, division, print_function,\n", - " unicode_literals)\n", - "from filterpy.common import setter, setter_1d, setter_scalar, dot3\n", - "import numpy as np\n", - "from numpy import dot, zeros, eye, isscalar\n", - "import scipy.linalg as linalg\n", - "from scipy.stats import multivariate_normal\n", - "\n", - "\n", - "class KalmanFilter(object):\n", - " \"\"\" Implements a Kalman filter. You are responsible for setting the\n", - " various state variables to reasonable values; the defaults will\n", - " not give you a functional filter.\n", - "\n", - " You will have to set the following attributes after constructing this\n", - " object for the filter to perform properly. Please note that there are\n", - " various checks in place to ensure that you have made everything the\n", - " 'correct' size. However, it is possible to provide incorrectly sized\n", - " arrays such that the linear algebra can not perform an operation.\n", - " It can also fail silently - you can end up with matrices of a size that\n", - " allows the linear algebra to work, but are the wrong shape for the problem\n", - " you are trying to solve.\n", - "\n", - " **Attributes**\n", - "\n", - " x : numpy.array(dim_x, 1)\n", - " state estimate vector\n", - "\n", - " P : numpy.array(dim_x, dim_x)\n", - " covariance estimate matrix\n", - "\n", - " R : numpy.array(dim_z, dim_z)\n", - " measurement noise matrix\n", - "\n", - " Q : numpy.array(dim_x, dim_x)\n", - " process noise matrix\n", - "\n", - " F : numpy.array()\n", - " State Transition matrix\n", - "\n", - " H : numpy.array(dim_x, dim_x)\n", - "\n", - "\n", - " You may read the following attributes.\n", - "\n", - " **Readable Attributes**\n", - "\n", - " y : numpy.array\n", - " Residual of the update step.\n", - "\n", - " K : numpy.array(dim_x, dim_z)\n", - " Kalman gain of the update step\n", - "\n", - " S : numpy.array\n", - " Systen uncertaintly projected to measurement space\n", - "\n", - " likelihood : scalar\n", - " Likelihood of last measurment update.\n", - "\n", - " log_likelihood : scalar\n", - " Log likelihood of last measurment update.\n", - "\n", - "\n", - " \"\"\"\n", - "\n", - "\n", - "\n", - " def __init__(self, dim_x, dim_z, dim_u=0):\n", - " \"\"\" Create a Kalman filter. You are responsible for setting the\n", - " various state variables to reasonable values; the defaults below will\n", - " not give you a functional filter.\n", - "\n", - " **Parameters**\n", - "\n", - " dim_x : int\n", - " Number of state variables for the Kalman filter. For example, if\n", - " you are tracking the position and velocity of an object in two\n", - " dimensions, dim_x would be 4.\n", - "\n", - " This is used to set the default size of P, Q, and u\n", - "\n", - " dim_z : int\n", - " Number of of measurement inputs. For example, if the sensor\n", - " provides you with position in (x,y), dim_z would be 2.\n", - "\n", - " dim_u : int (optional)\n", - " size of the control input, if it is being used.\n", - " Default value of 0 indicates it is not used.\n", - " \"\"\"\n", - "\n", - " assert dim_x > 0\n", - " assert dim_z > 0\n", - " assert dim_u >= 0\n", - "\n", - " self.dim_x = dim_x\n", - " self.dim_z = dim_z\n", - " self.dim_u = dim_u\n", - "\n", - " self._x = zeros((dim_x,1)) # state\n", - " self._P = eye(dim_x) # uncertainty covariance\n", - " self._Q = eye(dim_x) # process uncertainty\n", - " self._B = 0 # control transition matrix\n", - " self._F = 0 # state transition matrix\n", - " self.H = 0 # Measurement function\n", - " self.R = eye(dim_z) # state uncertainty\n", - " self._alpha_sq = 1. # fading memory control\n", - " self.M = 0 # process-measurement cross correlation\n", - "\n", - " # gain and residual are computed during the innovation step. We\n", - " # save them so that in case you want to inspect them for various\n", - " # purposes\n", - " self._K = 0 # kalman gain\n", - " self._y = zeros((dim_z, 1))\n", - " self._S = np.zeros((dim_z, dim_z)) # system uncertainty\n", - "\n", - " # identity matrix. Do not alter this.\n", - " self._I = np.eye(dim_x)\n", - "\n", - "\n", - " def update(self, z, R=None, H=None):\n", - " \"\"\"\n", - " Add a new measurement (z) to the Kalman filter. If z is None, nothing\n", - " is changed.\n", - "\n", - " **Parameters**\n", - "\n", - " z : np.array\n", - " measurement for this update.\n", - "\n", - " R : np.array, scalar, or None\n", - " Optionally provide R to override the measurement noise for this\n", - " one call, otherwise self.R will be used.\n", - "\n", - " H : np.array, or None\n", - " Optionally provide H to override the measurement function for this\n", - " one call, otherwise self.H will be used.\n", - "\n", - " \"\"\"\n", - "\n", - " if z is None:\n", - " return\n", - "\n", - " if R is None:\n", - " R = self.R\n", - " elif isscalar(R):\n", - " R = eye(self.dim_z) * R\n", - "\n", - " # rename for readability and a tiny extra bit of speed\n", - " if H is None:\n", - " H = self.H\n", - " P = self._P\n", - " x = self._x\n", - "\n", - " # y = z - Hx\n", - " # error (residual) between measurement and prediction\n", - " self._y = z - dot(H, x)\n", - "\n", - " # S = HPH' + R\n", - " # project system uncertainty into measurement space\n", - " S = dot3(H, P, H.T) + R\n", - "\n", - " mean = np.array(dot(H, x)).flatten()\n", - " flatz = np.array(z).flatten()\n", - "\n", - " self.likelihood = multivariate_normal.pdf(flatz, mean, cov=S, allow_singular=True)\n", - " self.log_likelihood = multivariate_normal.logpdf(flatz, mean, cov=S, allow_singular=True)\n", - "\n", - " # K = PH'inv(S)\n", - " # map system uncertainty into kalman gain\n", - " K = dot3(P, H.T, linalg.inv(S))\n", - "\n", - " # x = x + Ky\n", - " # predict new x with residual scaled by the kalman gain\n", - " self._x = x + dot(K, self._y)\n", - "\n", - " # P = (I-KH)P(I-KH)' + KRK'\n", - " I_KH = self._I - dot(K, H)\n", - " self._P = dot3(I_KH, P, I_KH.T) + dot3(K, R, K.T)\n", - "\n", - " self._S = S\n", - " self._K = K\n", - "\n", - "\n", - "\n", - " def update_correlated(self, z, R=None, H=None):\n", - " \"\"\" Add a new measurement (z) to the Kalman filter assuming that\n", - " process noise and measurement noise are correlated as defined in\n", - " the `self.M` matrix.\n", - "\n", - " If z is None, nothing is changed.\n", - "\n", - " **Parameters**\n", - "\n", - " z : np.array\n", - " measurement for this update.\n", - "\n", - " R : np.array, scalar, or None\n", - " Optionally provide R to override the measurement noise for this\n", - " one call, otherwise self.R will be used.\n", - "\n", - " H : np.array, or None\n", - " Optionally provide H to override the measurement function for this\n", - " one call, otherwise self.H will be used.\n", - "\n", - " \"\"\"\n", - "\n", - " if z is None:\n", - " return\n", - "\n", - " if R is None:\n", - " R = self.R\n", - " elif isscalar(R):\n", - " R = eye(self.dim_z) * R\n", - "\n", - " # rename for readability and a tiny extra bit of speed\n", - " if H is None:\n", - " H = self.H\n", - " x = self._x\n", - " P = self._P\n", - " M = self.M\n", - "\n", - " # y = z - Hx\n", - " # error (residual) between measurement and prediction\n", - " self._y = z - dot(H, x)\n", - "\n", - " # project system uncertainty into measurement space\n", - " S = dot3(H, P, H.T) + dot(H, M) + dot(M.T, H.T) + R\n", - "\n", - " mean = np.array(dot(H, x)).flatten()\n", - " flatz = np.array(z).flatten()\n", - "\n", - " self.likelihood = multivariate_normal.pdf(flatz, mean, cov=S, allow_singular=True)\n", - " self.log_likelihood = multivariate_normal.logpdf(flatz, mean, cov=S, allow_singular=True)\n", - "\n", - " # K = PH'inv(S)\n", - " # map system uncertainty into kalman gain\n", - " K = dot(dot(P, H.T) + M, linalg.inv(S))\n", - "\n", - " # x = x + Ky\n", - " # predict new x with residual scaled by the kalman gain\n", - " self._x = x + dot(K, self._y)\n", - " self._P = P - dot(K, dot(H, P) + M.T)\n", - "\n", - " self._S = S\n", - " self._K = K\n", - "\n", - "\n", - " def test_matrix_dimensions(self):\n", - " \"\"\" Performs a series of asserts to check that the size of everything\n", - " is what it should be. This can help you debug problems in your design.\n", - "\n", - " This is only a test; you do not need to use it while filtering.\n", - " However, to use you will want to perform at least one predict() and\n", - " one update() before calling; some bad designs will cause the shapes\n", - " of x and P to change in a silent and bad way. For example, if you\n", - " pass in a badly dimensioned z into update that can cause x to be\n", - " misshapen.\"\"\"\n", - "\n", - " assert self._x.shape == (self.dim_x, 1), \\\n", - " \"Shape of x must be ({},{}), but is {}\".format(\n", - " self.dim_x, 1, self._x.shape)\n", - "\n", - " assert self._P.shape == (self.dim_x, self.dim_x), \\\n", - " \"Shape of P must be ({},{}), but is {}\".format(\n", - " self.dim_x, self.dim_x, self._P.shape)\n", - "\n", - " assert self._Q.shape == (self.dim_x, self.dim_x), \\\n", - " \"Shape of P must be ({},{}), but is {}\".format(\n", - " self.dim_x, self.dim_x, self._P.shape)\n", - "\n", - "\n", - " def predict(self, u=0, B=None, F=None, Q=None):\n", - " \"\"\" Predict next position using the Kalman filter state propagation\n", - " equations.\n", - "\n", - " **Parameters**\n", - "\n", - " u : np.array\n", - " Optional control vector. If non-zero, it is multiplied by B\n", - " to create the control input into the system.\n", - "\n", - " B : np.array(dim_x, dim_z), or None\n", - " Optional control transition matrix; a value of None in\n", - " any position will cause the filter to use `self.B`.\n", - "\n", - " F : np.array(dim_x, dim_x), or None\n", - " Optional state transition matrix; a value of None in\n", - " any position will cause the filter to use `self.F`.\n", - "\n", - " Q : np.array(dim_x, dim_x), scalar, or None\n", - " Optional process noise matrix; a value of None in\n", - " any position will cause the filter to use `self.Q`.\n", - " \"\"\"\n", - "\n", - " if B is None:\n", - " B = self._B\n", - " if F is None:\n", - " F = self._F\n", - " if Q is None:\n", - " Q = self._Q\n", - " elif isscalar(Q):\n", - " Q = eye(self.dim_x) * Q\n", - "\n", - " # x = Fx + Bu\n", - " self._x = dot(F, self.x) + dot(B, u)\n", - "\n", - " # P = FPF' + Q\n", - " self._P = self._alpha_sq * dot3(F, self._P, F.T) + Q\n", - "\n", - "\n", - " def batch_filter(self, zs, Fs=None, Qs=None, Hs=None, Rs=None, Bs=None, us=None, update_first=False):\n", - " \"\"\" Batch processes a sequences of measurements.\n", - "\n", - " **Parameters**\n", - "\n", - " zs : list-like\n", - " list of measurements at each time step `self.dt` Missing\n", - " measurements must be represented by 'None'.\n", - "\n", - " Fs : list-like, optional\n", - " optional list of values to use for the state transition matrix matrix;\n", - " a value of None in any position will cause the filter\n", - " to use `self.F` for that time step.\n", - "\n", - " Qs : list-like, optional\n", - " optional list of values to use for the process error\n", - " covariance; a value of None in any position will cause the filter\n", - " to use `self.Q` for that time step.\n", - "\n", - " Hs : list-like, optional\n", - " optional list of values to use for the measurement matrix;\n", - " a value of None in any position will cause the filter\n", - " to use `self.H` for that time step.\n", - "\n", - " Rs : list-like, optional\n", - " optional list of values to use for the measurement error\n", - " covariance; a value of None in any position will cause the filter\n", - " to use `self.R` for that time step.\n", - "\n", - " Bs : list-like, optional\n", - " optional list of values to use for the control transition matrix;\n", - " a value of None in any position will cause the filter\n", - " to use `self.B` for that time step.\n", - "\n", - " us : list-like, optional\n", - " optional list of values to use for the control input vector;\n", - " a value of None in any position will cause the filter to use\n", - " 0 for that time step.\n", - "\n", - " update_first : bool, optional,\n", - " controls whether the order of operations is update followed by\n", - " predict, or predict followed by update. Default is predict->update.\n", - "\n", - " **Returns**\n", - "\n", - " means: np.array((n,dim_x,1))\n", - " array of the state for each time step after the update. Each entry\n", - " is an np.array. In other words `means[k,:]` is the state at step\n", - " `k`.\n", - "\n", - " covariance: np.array((n,dim_x,dim_x))\n", - " array of the covariances for each time step after the update.\n", - " In other words `covariance[k,:,:]` is the covariance at step `k`.\n", - "\n", - " means_predictions: np.array((n,dim_x,1))\n", - " array of the state for each time step after the predictions. Each\n", - " entry is an np.array. In other words `means[k,:]` is the state at\n", - " step `k`.\n", - "\n", - " covariance_predictions: np.array((n,dim_x,dim_x))\n", - " array of the covariances for each time step after the prediction.\n", - " In other words `covariance[k,:,:]` is the covariance at step `k`.\n", - "\n", - " **Example**\n", - "\n", - " zs = [t + random.randn()*4 for t in range (40)]\n", - " Fs = [kf.F for t in range (40)]\n", - " Hs = [kf.H for t in range (40)]\n", - "\n", - " (mu, cov, _, _) = kf.batch_filter(zs, Rs=R_list, Fs=Fs, Hs=Hs, Qs=None,\n", - " Bs=None, us=None, update_first=False)\n", - " (xs, Ps, Ks) = kf.rts_smoother(mu, cov, Fs=Fs, Qs=None)\n", - "\n", - " \"\"\"\n", - "\n", - " n = np.size(zs,0)\n", - " if Fs is None:\n", - " Fs = [self.F] * n\n", - " if Qs is None:\n", - " Qs = [self.Q] * n\n", - " if Hs is None:\n", - " Hs = [self.H] * n\n", - " if Rs is None:\n", - " Rs = [self.R] * n\n", - " if Bs is None:\n", - " Bs = [self.B] * n\n", - " if us is None:\n", - " us = [0] * n\n", - "\n", - " # mean estimates from Kalman Filter\n", - " if self.x.ndim == 1:\n", - " means = zeros((n, self.dim_x))\n", - " means_p = zeros((n, self.dim_x))\n", - " else:\n", - " means = zeros((n, self.dim_x, 1))\n", - " means_p = zeros((n, self.dim_x, 1))\n", - "\n", - " # state covariances from Kalman Filter\n", - " covariances = zeros((n, self.dim_x, self.dim_x))\n", - " covariances_p = zeros((n, self.dim_x, self.dim_x))\n", - "\n", - " if update_first:\n", - " for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):\n", - "\n", - " self.update(z, R=R, H=H)\n", - " means[i,:] = self._x\n", - " covariances[i,:,:] = self._P\n", - "\n", - " self.predict(u=u, B=B, F=F, Q=Q)\n", - " means_p[i,:] = self._x\n", - " covariances_p[i,:,:] = self._P\n", - " else:\n", - " for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):\n", - "\n", - " self.predict(u=u, B=B, F=F, Q=Q)\n", - " means_p[i,:] = self._x\n", - " covariances_p[i,:,:] = self._P\n", - "\n", - " self.update(z, R=R, H=H)\n", - " means[i,:] = self._x\n", - " covariances[i,:,:] = self._P\n", - "\n", - " return (means, covariances, means_p, covariances_p)\n", - "\n", - "\n", - "\n", - " def rts_smoother(self, Xs, Ps, Fs=None, Qs=None):\n", - " \"\"\" Runs the Rauch-Tung-Striebal Kalman smoother on a set of\n", - " means and covariances computed by a Kalman filter. The usual input\n", - " would come from the output of `KalmanFilter.batch_filter()`.\n", - "\n", - " **Parameters**\n", - "\n", - " Xs : numpy.array\n", - " array of the means (state variable x) of the output of a Kalman\n", - " filter.\n", - "\n", - " Ps : numpy.array\n", - " array of the covariances of the output of a kalman filter.\n", - "\n", - " Fs : list-like collection of numpy.array, optional\n", - " State transition matrix of the Kalman filter at each time step. \n", - " Optional, if not provided the filter's self.F will be used\n", - "\n", - " Qs : list-like collection of numpy.array, optional\n", - " Process noise of the Kalman filter at each time step. Optional,\n", - " if not provided the filter's self.Q will be used\n", - "\n", - " **Returns**\n", - "\n", - " 'x' : numpy.ndarray\n", - " smoothed means\n", - "\n", - " 'P' : numpy.ndarray\n", - " smoothed state covariances\n", - "\n", - " 'K' : numpy.ndarray\n", - " smoother gain at each step\n", - "\n", - "\n", - " **Example**::\n", - "\n", - " zs = [t + random.randn()*4 for t in range (40)]\n", - "\n", - " (mu, cov, _, _) = kalman.batch_filter(zs)\n", - " (x, P, K) = rts_smoother(mu, cov, kf.F, kf.Q)\n", - "\n", - " \"\"\"\n", - "\n", - " assert len(Xs) == len(Ps)\n", - " shape = Xs.shape\n", - " n = shape[0]\n", - " dim_x = shape[1]\n", - "\n", - " if Fs is None:\n", - " Fs = [self.F] * n\n", - " if Qs is None:\n", - " Qs = [self.Q] * n\n", - "\n", - " # smoother gain\n", - " K = zeros((n,dim_x,dim_x))\n", - "\n", - " x, P = Xs.copy(), Ps.copy()\n", - "\n", - " for k in range(n-2,-1,-1):\n", - " P_pred = dot3(Fs[k], P[k], Fs[k].T) + Qs[k]\n", - "\n", - " K[k] = dot3(P[k], Fs[k].T, linalg.inv(P_pred))\n", - " x[k] += dot (K[k], x[k+1] - dot(Fs[k], x[k]))\n", - " P[k] += dot3 (K[k], P[k+1] - P_pred, K[k].T)\n", - "\n", - " return (x, P, K)\n", - "\n", - "\n", - " def get_prediction(self, u=0):\n", - " \"\"\" Predicts the next state of the filter and returns it. Does not\n", - " alter the state of the filter.\n", - "\n", - " **Parameters**\n", - "\n", - " u : np.array\n", - " optional control input\n", - "\n", - " **Returns**\n", - "\n", - " (x, P)\n", - " State vector and covariance array of the prediction.\n", - " \"\"\"\n", - "\n", - " x = dot(self._F, self._x) + dot(self._B, u)\n", - " P = self._alpha_sq * dot3(self._F, self._P, self._F.T) + self._Q\n", - " return (x, P)\n", - "\n", - "\n", - " def residual_of(self, z):\n", - " \"\"\" returns the residual for the given measurement (z). Does not alter\n", - " the state of the filter.\n", - " \"\"\"\n", - " return z - dot(self.H, self._x)\n", - "\n", - "\n", - " def measurement_of_state(self, x):\n", - " \"\"\" Helper function that converts a state into a measurement.\n", - "\n", - " **Parameters**\n", - "\n", - " x : np.array\n", - " kalman state vector\n", - "\n", - " **Returns**\n", - "\n", - " z : np.array\n", - " measurement corresponding to the given state\n", - " \"\"\"\n", - "\n", - " return dot(self.H, x)\n", - "\n", - "\n", - " @property\n", - " def alpha(self):\n", - " \"\"\" Fading memory setting. 1.0 gives the normal Kalman filter, and\n", - " values slightly larger than 1.0 (such as 1.02) give a fading\n", - " memory effect - previous measurements have less influence on the\n", - " filter's estimates. This formulation of the Fading memory filter\n", - " (there are many) is due to Dan Simon [1].\n", - "\n", - " ** References **\n", - "\n", - " [1] Dan Simon. \"Optimal State Estimation.\" John Wiley & Sons.\n", - " p. 208-212. (2006)\n", - " \"\"\"\n", - "\n", - " return self._alpha_sq**.5\n", - "\n", - "\n", - " @alpha.setter\n", - " def alpha(self, value):\n", - " assert np.isscalar(value)\n", - " assert value > 0\n", - "\n", - " self._alpha_sq = value**2\n", - "\n", - " @property\n", - " def Q(self):\n", - " \"\"\" Process uncertainty\"\"\"\n", - " return self._Q\n", - "\n", - "\n", - " @Q.setter\n", - " def Q(self, value):\n", - " self._Q = setter_scalar(value, self.dim_x)\n", - "\n", - " @property\n", - " def P(self):\n", - " \"\"\" covariance matrix\"\"\"\n", - " return self._P\n", - "\n", - "\n", - " @P.setter\n", - " def P(self, value):\n", - " self._P = setter_scalar(value, self.dim_x)\n", - "\n", - "\n", - " @property\n", - " def F(self):\n", - " \"\"\" state transition matrix\"\"\"\n", - " return self._F\n", - "\n", - "\n", - " @F.setter\n", - " def F(self, value):\n", - " self._F = setter(value, self.dim_x, self.dim_x)\n", - "\n", - " @property\n", - " def B(self):\n", - " \"\"\" control transition matrix\"\"\"\n", - " return self._B\n", - "\n", - "\n", - " @B.setter\n", - " def B(self, value):\n", - " self._B = value\n", - " \"\"\" control transition matrix\"\"\"\n", - " if np.isscalar(value):\n", - " self._B = value\n", - " else:\n", - " self._B = setter (value, self.dim_x, self.dim_u)\n", - "\n", - "\n", - " @property\n", - " def x(self):\n", - " \"\"\" filter state vector.\"\"\"\n", - " return self._x\n", - "\n", - "\n", - " @x.setter\n", - " def x(self, value):\n", - " self._x = setter_1d(value, self.dim_x)\n", - "\n", - "\n", - " @property\n", - " def K(self):\n", - " \"\"\" Kalman gain \"\"\"\n", - " return self._K\n", - "\n", - "\n", - " @property\n", - " def y(self):\n", - " \"\"\" measurement residual (innovation) \"\"\"\n", - " return self._y\n", - "\n", - " @property\n", - " def S(self):\n", - " \"\"\" system uncertainty in measurement space \"\"\"\n", - " return self._S\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## ExtendedKalmanFilter" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/kalman/EKF.py\n", - "\n", - "\"\"\"Copyright 2015 Roger R Labbe Jr.\n", - "\n", - "FilterPy library.\n", - "http://github.com/rlabbe/filterpy\n", - "\n", - "Documentation at:\n", - "https://filterpy.readthedocs.org\n", - "\n", - "Supporting book at:\n", - "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", - "\n", - "This is licensed under an MIT license. See the readme.MD file\n", - "for more information.\n", - "\"\"\"\n", - "\n", - "from __future__ import (absolute_import, division, print_function,\n", - " unicode_literals)\n", - "import numpy as np\n", - "import scipy.linalg as linalg\n", - "from numpy import dot, zeros, eye\n", - "from filterpy.common import setter, setter_1d, setter_scalar, dot3\n", - "\n", - "\n", - "class ExtendedKalmanFilter(object):\n", - "\n", - " def __init__(self, dim_x, dim_z, dim_u=0):\n", - " \"\"\" Extended Kalman filter. You are responsible for setting the\n", - " various state variables to reasonable values; the defaults below will\n", - " not give you a functional filter.\n", - "\n", - " **Parameters**\n", - "\n", - " dim_x : int\n", - " Number of state variables for the Kalman filter. For example, if\n", - " you are tracking the position and velocity of an object in two\n", - " dimensions, dim_x would be 4.\n", - "\n", - " This is used to set the default size of P, Q, and u\n", - "\n", - " dim_z : int\n", - " Number of of measurement inputs. For example, if the sensor\n", - " provides you with position in (x,y), dim_z would be 2.\n", - " \"\"\"\n", - "\n", - " self.dim_x = dim_x\n", - " self.dim_z = dim_z\n", - "\n", - " self._x = zeros((dim_x,1)) # state\n", - " self._P = eye(dim_x) # uncertainty covariance\n", - " self._B = 0 # control transition matrix\n", - " self._F = 0 # state transition matrix\n", - " self._R = eye(dim_z) # state uncertainty\n", - " self._Q = eye(dim_x) # process uncertainty\n", - " self._y = zeros((dim_z, 1))\n", - "\n", - " # identity matrix. Do not alter this.\n", - " self._I = np.eye(dim_x)\n", - "\n", - "\n", - " def predict_update(self, z, HJacobian, Hx, args=(), hx_args=(), u=0):\n", - " \"\"\" Performs the predict/update innovation of the extended Kalman\n", - " filter.\n", - "\n", - " **Parameters**\n", - "\n", - " z : np.array\n", - " measurement for this step.\n", - " If `None`, only predict step is perfomed.\n", - "\n", - " HJacobian : function\n", - " function which computes the Jacobian of the H matrix (measurement\n", - " function). Takes state variable (self.x) as input, along with the\n", - " optional arguments in args, and returns H.\n", - "\n", - " Hx : function\n", - " function which takes as input the state variable (self.x) along\n", - " with the optional arguments in hx_args, and returns the measurement\n", - " that would correspond to that state.\n", - "\n", - " args : tuple, optional, default (,)\n", - " arguments to be passed into HJacobian after the required state\n", - " variable.\n", - "\n", - " hx_args : tuple, optional, default (,)\n", - " arguments to be passed into HJacobian after the required state\n", - " variable.\n", - "\n", - " u : np.array or scalar\n", - " optional control vector input to the filter.\n", - " \"\"\"\n", - "\n", - " if not isinstance(args, tuple):\n", - " args = (args,)\n", - "\n", - " if not isinstance(hx_args, tuple):\n", - " hx_args = (hx_args,)\n", - "\n", - " if np.isscalar(z) and self.dim_z == 1:\n", - " z = np.asarray([z], float)\n", - "\n", - " F = self._F\n", - " B = self._B\n", - " P = self._P\n", - " Q = self._Q\n", - " R = self._R\n", - " x = self._x\n", - "\n", - " H = HJacobian(x, *args)\n", - "\n", - " # predict step\n", - " x = dot(F, x) + dot(B, u)\n", - " P = dot3(F, P, F.T) + Q\n", - "\n", - " # update step\n", - " S = dot3(H, P, H.T) + R\n", - " K = dot3(P, H.T, linalg.inv (S))\n", - "\n", - " self._x = x + dot(K, (z - Hx(x, *hx_args)))\n", - "\n", - " I_KH = self._I - dot(K, H)\n", - " self._P = dot3(I_KH, P, I_KH.T) + dot3(K, R, K.T)\n", - "\n", - "\n", - " def update(self, z, HJacobian, Hx, R=None, args=(), hx_args=(),\n", - " residual=np.subtract):\n", - " \"\"\" Performs the update innovation of the extended Kalman filter.\n", - "\n", - " **Parameters**\n", - "\n", - " z : np.array\n", - " measurement for this step.\n", - " If `None`, only predict step is perfomed.\n", - "\n", - " HJacobian : function\n", - " function which computes the Jacobian of the H matrix (measurement\n", - " function). Takes state variable (self.x) as input, returns H.\n", - "\n", - " Hx : function\n", - " function which takes as input the state variable (self.x) along\n", - " with the optional arguments in hx_args, and returns the measurement\n", - " that would correspond to that state.\n", - "\n", - " R : np.array, scalar, or None\n", - " Optionally provide R to override the measurement noise for this\n", - " one call, otherwise self.R will be used.\n", - "\n", - " args : tuple, optional, default (,)\n", - " arguments to be passed into HJacobian after the required state\n", - " variable. for robot localization you might need to pass in\n", - " information about the map and time of day, so you might have\n", - " `args=(map_data, time)`, where the signature of HCacobian will\n", - " be `def HJacobian(x, map, t)`\n", - "\n", - " hx_args : tuple, optional, default (,)\n", - " arguments to be passed into Hx function after the required state\n", - " variable.\n", - "\n", - " residual : function (z, z2), optional\n", - " Optional function that computes the residual (difference) between\n", - " the two measurement vectors. If you do not provide this, then the\n", - " built in minus operator will be used. You will normally want to use\n", - " the built in unless your residual computation is nonlinear (for\n", - " example, if they are angles)\n", - " \"\"\"\n", - "\n", - " if not isinstance(args, tuple):\n", - " args = (args,)\n", - "\n", - " if not isinstance(hx_args, tuple):\n", - " hx_args = (hx_args,)\n", - "\n", - " P = self._P\n", - " if R is None:\n", - " R = self._R\n", - " elif np.isscalar(R):\n", - " R = eye(self.dim_z) * R\n", - "\n", - " if np.isscalar(z) and self.dim_z == 1:\n", - " z = np.asarray([z], float)\n", - "\n", - " x = self._x\n", - "\n", - " H = HJacobian(x, *args)\n", - "\n", - " S = dot3(H, P, H.T) + R\n", - " K = dot3(P, H.T, linalg.inv (S))\n", - "\n", - " hx = Hx(x, *hx_args)\n", - " y = residual(z, hx)\n", - " self._x = x + dot(K, y)\n", - "\n", - " I_KH = self._I - dot(K, H)\n", - " self._P = dot3(I_KH, P, I_KH.T) + dot3(K, R, K.T)\n", - "\n", - "\n", - " def predict_x(self, u=0):\n", - " \"\"\" predicts the next state of X. If you need to\n", - " compute the next state yourself, override this function. You would\n", - " need to do this, for example, if the usual Taylor expansion to\n", - " generate F is not providing accurate results for you. \"\"\"\n", - "\n", - " self._x = dot(self._F, self._x) + dot(self._B, u)\n", - "\n", - "\n", - " def predict(self, u=0):\n", - " \"\"\" Predict next position.\n", - "\n", - " **Parameters**\n", - "\n", - " u : np.array\n", - " Optional control vector. If non-zero, it is multiplied by B\n", - " to create the control input into the system.\n", - " \"\"\"\n", - "\n", - " self.predict_x(u)\n", - " self._P = dot3(self._F, self._P, self._F.T) + self._Q\n", - "\n", - "\n", - " @property\n", - " def Q(self):\n", - " \"\"\" Process uncertainty\"\"\"\n", - " return self._Q\n", - "\n", - "\n", - " @Q.setter\n", - " def Q(self, value):\n", - " self._Q = setter_scalar(value, self.dim_x)\n", - "\n", - "\n", - " @property\n", - " def P(self):\n", - " \"\"\" covariance matrix\"\"\"\n", - " return self._P\n", - "\n", - "\n", - " @P.setter\n", - " def P(self, value):\n", - " self._P = setter_scalar(value, self.dim_x)\n", - "\n", - "\n", - " @property\n", - " def R(self):\n", - " \"\"\" measurement uncertainty\"\"\"\n", - " return self._R\n", - "\n", - "\n", - " @R.setter\n", - " def R(self, value):\n", - " self._R = setter_scalar(value, self.dim_z)\n", - "\n", - "\n", - " @property\n", - " def F(self):\n", - " return self._F\n", - "\n", - "\n", - " @F.setter\n", - " def F(self, value):\n", - " self._F = setter(value, self.dim_x, self.dim_x)\n", - "\n", - "\n", - " @property\n", - " def B(self):\n", - " return self._B\n", - "\n", - "\n", - " @B.setter\n", - " def B(self, value):\n", - " \"\"\" control transition matrix\"\"\"\n", - " self._B = setter(value, self.dim_x, self.dim_u)\n", - "\n", - "\n", - " @property\n", - " def x(self):\n", - " return self._x\n", - "\n", - " @x.setter\n", - " def x(self, value):\n", - " self._x = setter_1d(value, self.dim_x)\n", - "\n", - " @property\n", - " def K(self):\n", - " \"\"\" Kalman gain \"\"\"\n", - " return self._K\n", - "\n", - " @property\n", - " def y(self):\n", - " \"\"\" measurement residual (innovation) \"\"\"\n", - " return self._y\n", - "\n", - " @property\n", - " def S(self):\n", - " \"\"\" system uncertainty in measurement space \"\"\"\n", - " return self._S\n", - "\n", - "\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## UnscentedKalmanFilter" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/kalman/UKF.py\n", - "\"\"\"Copyright 2015 Roger R Labbe Jr.\n", - "\n", - "FilterPy library.\n", - "http://github.com/rlabbe/filterpy\n", - "\n", - "Documentation at:\n", - "https://filterpy.readthedocs.org\n", - "\n", - "Supporting book at:\n", - "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", - "\n", - "This is licensed under an MIT license. See the readme.MD file\n", - "for more information.\n", - "\"\"\"\n", - "\n", - "from __future__ import (absolute_import, division, print_function,\n", - " unicode_literals)\n", - "\n", - "from filterpy.common import dot3\n", - "from filterpy.kalman import unscented_transform\n", - "import numpy as np\n", - "from numpy import eye, zeros, dot, isscalar, outer\n", - "from scipy.linalg import inv, cholesky\n", - "\n", - "\n", - "class UnscentedKalmanFilter(object):\n", - " # pylint: disable=too-many-instance-attributes\n", - " # pylint: disable=C0103\n", - " r\"\"\" Implements the Scaled Unscented Kalman filter (UKF) as defined by\n", - " Simon Julier in [1], using the formulation provided by Wan and Merle\n", - " in [2]. This filter scales the sigma points to avoid strong nonlinearities.\n", - "\n", - "\n", - " You will have to set the following attributes after constructing this\n", - " object for the filter to perform properly.\n", - "\n", - " **Attributes**\n", - "\n", - " x : numpy.array(dim_x)\n", - " state estimate vector\n", - "\n", - " P : numpy.array(dim_x, dim_x)\n", - " covariance estimate matrix\n", - "\n", - " R : numpy.array(dim_z, dim_z)\n", - " measurement noise matrix\n", - "\n", - " Q : numpy.array(dim_x, dim_x)\n", - " process noise matrix\n", - "\n", - "\n", - " You may read the following attributes.\n", - "\n", - " **Readable Attributes**\n", - "\n", - " xp : numpy.array(dim_x)\n", - " predicted state (result of predict())\n", - "\n", - " Pp : numpy.array(dim_x, dim_x)\n", - " predicted covariance matrix (result of predict())\n", - "\n", - "\n", - " **References**\n", - "\n", - " .. [1] Julier, Simon J. \"The scaled unscented transformation,\"\n", - " American Control Converence, 2002, pp 4555-4559, vol 6.\n", - "\n", - " Online copy:\n", - " https://www.cs.unc.edu/~welch/kalman/media/pdf/ACC02-IEEE1357.PDF\n", - "\n", - "\n", - " .. [2] E. A. Wan and R. Van der Merwe, “The unscented Kalman filter for\n", - " nonlinear estimation,” in Proc. Symp. Adaptive Syst. Signal\n", - " Process., Commun. Contr., Lake Louise, AB, Canada, Oct. 2000.\n", - "\n", - " Online Copy:\n", - " https://www.seas.harvard.edu/courses/cs281/papers/unscented.pdf\n", - " \"\"\"\n", - "\n", - " def __init__(self, dim_x, dim_z, dt, hx, fx, points,\n", - " sqrt_fn=None, x_mean_fn=None, z_mean_fn=None,\n", - " residual_x=None,\n", - " residual_z=None):\n", - " r\"\"\" Create a Kalman filter. You are responsible for setting the\n", - " various state variables to reasonable values; the defaults below will\n", - " not give you a functional filter.\n", - "\n", - " **Parameters**\n", - "\n", - " dim_x : int\n", - " Number of state variables for the filter. For example, if\n", - " you are tracking the position and velocity of an object in two\n", - " dimensions, dim_x would be 4.\n", - "\n", - "\n", - " dim_z : int\n", - " Number of of measurement inputs. For example, if the sensor\n", - " provides you with position in (x,y), dim_z would be 2.\n", - "\n", - " dt : float\n", - " Time between steps in seconds.\n", - "\n", - " hx : function(x)\n", - " Measurement function. Converts state vector x into a measurement\n", - " vector of shape (dim_z).\n", - "\n", - " fx : function(x,dt)\n", - " function that returns the state x transformed by the\n", - " state transistion function. dt is the time step in seconds.\n", - "\n", - " points : class\n", - " Class which computes the sigma points and weights for a UKF\n", - " algorithm. You can vary the UKF implementation by changing this\n", - " class. For example, MerweScaledSigmaPoints implements the alpha,\n", - " beta, kappa parameterization of Van der Merwe, and\n", - " JulierSigmaPoints implements Julier's original kappa\n", - " parameterization. See either of those for the required\n", - " signature of this class if you want to implement your own.\n", - "\n", - " sqrt_fn : callable(ndarray), default = scipy.linalg.cholesky\n", - " Defines how we compute the square root of a matrix, which has\n", - " no unique answer. Cholesky is the default choice due to its\n", - " speed. Typically your alternative choice will be\n", - " scipy.linalg.sqrtm. Different choices affect how the sigma points\n", - " are arranged relative to the eigenvectors of the covariance matrix.\n", - " Usually this will not matter to you; if so the default cholesky()\n", - " yields maximal performance. As of van der Merwe's dissertation of\n", - " 2004 [6] this was not a well reseached area so I have no advice\n", - " to give you.\n", - "\n", - " If your method returns a triangular matrix it must be upper\n", - " triangular. Do not use numpy.linalg.cholesky - for historical\n", - " reasons it returns a lower triangular matrix. The SciPy version\n", - " does the right thing.\n", - "\n", - " x_mean_fn : callable (sigma_points, weights), optional\n", - " Function that computes the mean of the provided sigma points\n", - " and weights. Use this if your state variable contains nonlinear\n", - " values such as angles which cannot be summed.\n", - "\n", - " .. code-block:: Python\n", - "\n", - " def state_mean(sigmas, Wm):\n", - " x = np.zeros(3)\n", - " sum_sin, sum_cos = 0., 0.\n", - "\n", - " for i in range(len(sigmas)):\n", - " s = sigmas[i]\n", - " x[0] += s[0] * Wm[i]\n", - " x[1] += s[1] * Wm[i]\n", - " sum_sin += sin(s[2])*Wm[i]\n", - " sum_cos += cos(s[2])*Wm[i]\n", - " x[2] = atan2(sum_sin, sum_cos)\n", - " return x\n", - "\n", - " z_mean_fn : callable (sigma_points, weights), optional\n", - " Same as x_mean_fn, except it is called for sigma points which\n", - " form the measurements after being passed through hx().\n", - "\n", - " residual_x : callable (x, y), optional\n", - " residual_z : callable (x, y), optional\n", - " Function that computes the residual (difference) between x and y.\n", - " You will have to supply this if your state variable cannot support\n", - " subtraction, such as angles (359-1 degreees is 2, not 358). x and y\n", - " are state vectors, not scalars. One is for the state variable,\n", - " the other is for the measurement state.\n", - "\n", - " .. code-block:: Python\n", - "\n", - " def residual(a, b):\n", - " y = a[0] - b[0]\n", - " if y > np.pi:\n", - " y -= 2*np.pi\n", - " if y < -np.pi:\n", - " y = 2*np.pi\n", - " return y\n", - "\n", - " **References**\n", - "\n", - " .. [3] S. Julier, J. Uhlmann, and H. Durrant-Whyte. \"A new method for\n", - " the nonlinear transformation of means and covariances in filters\n", - " and estimators,\" IEEE Transactions on Automatic Control, 45(3),\n", - " pp. 477-482 (March 2000).\n", - "\n", - " .. [4] E. A. Wan and R. Van der Merwe, “The Unscented Kalman filter for\n", - " Nonlinear Estimation,” in Proc. Symp. Adaptive Syst. Signal\n", - " Process., Commun. Contr., Lake Louise, AB, Canada, Oct. 2000.\n", - "\n", - " https://www.seas.harvard.edu/courses/cs281/papers/unscented.pdf\n", - "\n", - " .. [5] Wan, Merle \"The Unscented Kalman Filter,\" chapter in *Kalman\n", - " Filtering and Neural Networks*, John Wiley & Sons, Inc., 2001.\n", - "\n", - " .. [6] R. Van der Merwe \"Sigma-Point Kalman Filters for Probabilitic\n", - " Inference in Dynamic State-Space Models\" (Doctoral dissertation)\n", - " \"\"\"\n", - "\n", - " self.Q = eye(dim_x)\n", - " self.R = eye(dim_z)\n", - " self.x = zeros(dim_x)\n", - " self.P = eye(dim_x)\n", - " self._dim_x = dim_x\n", - " self._dim_z = dim_z\n", - " self._dt = dt\n", - " self._num_sigmas = 2*dim_x + 1\n", - " self.hx = hx\n", - " self.fx = fx\n", - " self.points_fn = points\n", - " self.x_mean = x_mean_fn\n", - " self.z_mean = z_mean_fn\n", - "\n", - " if sqrt_fn is None:\n", - " self.msqrt = cholesky\n", - " else:\n", - " self.msqrt = sqrt_fn\n", - "\n", - " # weights for the means and covariances.\n", - " self.Wm, self.Wc = self.points_fn.weights()\n", - "\n", - " if residual_x is None:\n", - " self.residual_x = np.subtract\n", - " else:\n", - " self.residual_x = residual_x\n", - "\n", - " if residual_z is None:\n", - " self.residual_z = np.subtract\n", - " else:\n", - " self.residual_z = residual_z\n", - "\n", - " # sigma points transformed through f(x) and h(x)\n", - " # variables for efficiency so we don't recreate every update\n", - " self.sigmas_f = zeros((2*self._dim_x+1, self._dim_x))\n", - " self.sigmas_h = zeros((self._num_sigmas, self._dim_z))\n", - "\n", - "\n", - "\n", - " def predict(self, dt=None, UT=None, fx_args=()):\n", - " r\"\"\" Performs the predict step of the UKF. On return, self.x and\n", - " self.P contain the predicted state (x) and covariance (P). '\n", - "\n", - " Important: this MUST be called before update() is called for the first\n", - " time.\n", - "\n", - " **Parameters**\n", - "\n", - " dt : double, optional\n", - " If specified, the time step to be used for this prediction.\n", - " self._dt is used if this is not provided.\n", - "\n", - " UT : function(sigmas, Wm, Wc, noise_cov), optional\n", - " Optional function to compute the unscented transform for the sigma\n", - " points passed through hx. Typically the default function will\n", - " work - you can use x_mean_fn and z_mean_fn to alter the behavior\n", - " of the unscented transform.\n", - "\n", - " fx_args : tuple, optional, default (,)\n", - " optional arguments to be passed into fx() after the required state\n", - " variable.\n", - "\n", - " \"\"\"\n", - " if dt is None:\n", - " dt = self._dt\n", - "\n", - " if not isinstance(fx_args, tuple):\n", - " fx_args = (fx_args,)\n", - "\n", - " if UT is None:\n", - " UT = unscented_transform\n", - "\n", - " # calculate sigma points for given mean and covariance\n", - " sigmas = self.points_fn.sigma_points(self.x, self.P)\n", - "\n", - " for i in range(self._num_sigmas):\n", - " self.sigmas_f[i] = self.fx(sigmas[i], dt, *fx_args)\n", - "\n", - " self.x, self.P = UT(self.sigmas_f, self.Wm, self.Wc, self.Q,\n", - " self.x_mean, self.residual_x)\n", - "\n", - "\n", - " def update(self, z, R=None, UT=None, hx_args=()):\n", - " \"\"\" Update the UKF with the given measurements. On return,\n", - " self.x and self.P contain the new mean and covariance of the filter.\n", - "\n", - " **Parameters**\n", - "\n", - " z : numpy.array of shape (dim_z)\n", - " measurement vector\n", - "\n", - " R : numpy.array((dim_z, dim_z)), optional\n", - " Measurement noise. If provided, overrides self.R for\n", - " this function call.\n", - "\n", - " UT : function(sigmas, Wm, Wc, noise_cov), optional\n", - " Optional function to compute the unscented transform for the sigma\n", - " points passed through hx. Typically the default function will\n", - " work - you can use x_mean_fn and z_mean_fn to alter the behavior\n", - " of the unscented transform.\n", - "\n", - " hx_args : tuple, optional, default (,)\n", - " arguments to be passed into Hx function after the required state\n", - " variable.\n", - " \"\"\"\n", - "\n", - " if z is None:\n", - " return\n", - "\n", - " if not isinstance(hx_args, tuple):\n", - " hx_args = (hx_args,)\n", - "\n", - " if UT is None:\n", - " UT = unscented_transform\n", - "\n", - " if R is None:\n", - " R = self.R\n", - " elif isscalar(R):\n", - " R = eye(self._dim_z) * R\n", - "\n", - " for i in range(self._num_sigmas):\n", - " self.sigmas_h[i] = self.hx(self.sigmas_f[i], *hx_args)\n", - "\n", - " # mean and covariance of prediction passed through unscented transform\n", - " zp, Pz = UT(self.sigmas_h, self.Wm, self.Wc, R, self.z_mean, self.residual_z)\n", - "\n", - " # compute cross variance of the state and the measurements\n", - " Pxz = zeros((self._dim_x, self._dim_z))\n", - " for i in range(self._num_sigmas):\n", - " dx = self.residual_x(self.sigmas_f[i], self.x)\n", - " dz = self.residual_z(self.sigmas_h[i], zp)\n", - " Pxz += self.Wc[i] * outer(dx, dz)\n", - "\n", - " K = dot(Pxz, inv(Pz)) # Kalman gain\n", - " y = self.residual_z(z, zp) #residual\n", - " self.x = self.x + dot(K, y)\n", - " self.P = self.P - dot3(K, Pz, K.T)\n", - "\n", - "\n", - " def batch_filter(self, zs, Rs=None, residual=None, UT=None):\n", - " \"\"\" Performs the UKF filter over the list of measurement in `zs`.\n", - "\n", - "\n", - " **Parameters**\n", - "\n", - " zs : list-like\n", - " list of measurements at each time step `self._dt` Missing\n", - " measurements must be represented by 'None'.\n", - "\n", - " Rs : list-like, optional\n", - " optional list of values to use for the measurement error\n", - " covariance; a value of None in any position will cause the filter\n", - " to use `self.R` for that time step.\n", - "\n", - " residual : function (z, z2), optional\n", - " Optional function that computes the residual (difference) between\n", - " the two measurement vectors. If you do not provide this, then the\n", - " built in minus operator will be used. You will normally want to use\n", - " the built in unless your residual computation is nonlinear (for\n", - " example, if they are angles)\n", - "\n", - " UT : function(sigmas, Wm, Wc, noise_cov), optional\n", - " Optional function to compute the unscented transform for the sigma\n", - " points passed through hx. Typically the default function will\n", - " work - you can use x_mean_fn and z_mean_fn to alter the behavior\n", - " of the unscented transform.\n", - "\n", - " **Returns**\n", - "\n", - " means: ndarray((n,dim_x,1))\n", - " array of the state for each time step after the update. Each entry\n", - " is an np.array. In other words `means[k,:]` is the state at step\n", - " `k`.\n", - "\n", - " covariance: ndarray((n,dim_x,dim_x))\n", - " array of the covariances for each time step after the update.\n", - " In other words `covariance[k,:,:]` is the covariance at step `k`.\n", - "\n", - " \"\"\"\n", - "\n", - " try:\n", - " z = zs[0]\n", - " except:\n", - " assert not isscalar(zs), 'zs must be list-like'\n", - "\n", - " if self._dim_z == 1:\n", - " assert isscalar(z) or (z.ndim==1 and len(z) == 1), \\\n", - " 'zs must be a list of scalars or 1D, 1 element arrays'\n", - "\n", - " else:\n", - " assert len(z) == self._dim_z, 'each element in zs must be a' \\\n", - " '1D array of length {}'.format(self._dim_z)\n", - "\n", - "\n", - " if residual is None:\n", - " residual = np.subtract\n", - "\n", - " z_n = np.size(zs, 0)\n", - " if Rs is None:\n", - " Rs = [None] * z_n\n", - "\n", - " # mean estimates from Kalman Filter\n", - " if self.x.ndim == 1:\n", - " means = zeros((z_n, self._dim_x))\n", - " else:\n", - " means = zeros((z_n, self._dim_x, 1))\n", - "\n", - " # state covariances from Kalman Filter\n", - " covariances = zeros((z_n, self._dim_x, self._dim_x))\n", - "\n", - " for i, (z, r) in enumerate(zip(zs, Rs)):\n", - " self.predict()\n", - " self.update(z, r)\n", - " means[i,:] = self.x\n", - " covariances[i,:,:] = self.P\n", - "\n", - " return (means, covariances)\n", - "\n", - "\n", - " def rts_smoother(self, Xs, Ps, Qs=None, dt=None):\n", - " \"\"\" Runs the Rauch-Tung-Striebal Kalman smoother on a set of\n", - " means and covariances computed by the UKF. The usual input\n", - " would come from the output of `batch_filter()`.\n", - "\n", - " **Parameters**\n", - "\n", - " Xs : numpy.array\n", - " array of the means (state variable x) of the output of a Kalman\n", - " filter.\n", - "\n", - " Ps : numpy.array\n", - " array of the covariances of the output of a kalman filter.\n", - "\n", - " Qs: list-like collection of numpy.array, optional\n", - " Process noise of the Kalman filter at each time step. Optional,\n", - " if not provided the filter's self.Q will be used\n", - "\n", - " dt : optional, float or array-like of float\n", - " If provided, specifies the time step of each step of the filter.\n", - " If float, then the same time step is used for all steps. If\n", - " an array, then each element k contains the time at step k.\n", - " Units are seconds.\n", - "\n", - " **Returns**\n", - "\n", - " x : numpy.ndarray\n", - " smoothed means\n", - "\n", - " P : numpy.ndarray\n", - " smoothed state covariances\n", - "\n", - " K : numpy.ndarray\n", - " smoother gain at each step\n", - "\n", - "\n", - " **Example**\n", - "\n", - " .. code-block:: Python\n", - "\n", - " zs = [t + random.randn()*4 for t in range (40)]\n", - "\n", - " (mu, cov, _, _) = kalman.batch_filter(zs)\n", - " (x, P, K) = rts_smoother(mu, cov, fk.F, fk.Q)\n", - "\n", - " \"\"\"\n", - " assert len(Xs) == len(Ps)\n", - " n, dim_x = Xs.shape\n", - "\n", - " if dt is None:\n", - " dt = [self._dt] * n\n", - " elif isscalar(dt):\n", - " dt = [dt] * n\n", - "\n", - " if Qs is None:\n", - " Qs = [self.Q] * n\n", - "\n", - " # smoother gain\n", - " Ks = zeros((n,dim_x,dim_x))\n", - "\n", - " num_sigmas = 2*dim_x + 1\n", - "\n", - " xs, ps = Xs.copy(), Ps.copy()\n", - " sigmas_f = zeros((num_sigmas, dim_x))\n", - "\n", - "\n", - " for k in range(n-2,-1,-1):\n", - " # create sigma points from state estimate, pass through state func\n", - " sigmas = self.points_fn.sigma_points(xs[k], ps[k])\n", - " for i in range(num_sigmas):\n", - " sigmas_f[i] = self.fx(sigmas[i], dt[k])\n", - "\n", - " # compute backwards prior state and covariance\n", - " xb = dot(self.Wm, sigmas_f)\n", - " Pb = 0\n", - " x = Xs[k]\n", - " for i in range(num_sigmas):\n", - " y = sigmas_f[i] - x\n", - " Pb += self.Wm[i] * outer(y, y)\n", - " Pb += Qs[k]\n", - "\n", - " # compute cross variance\n", - " Pxb = 0\n", - " for i in range(num_sigmas):\n", - " z = sigmas[i] - Xs[k]\n", - " y = sigmas_f[i] - xb\n", - " Pxb += self.Wm[i] * outer(z, y)\n", - "\n", - " # compute gain\n", - " K = dot(Pxb, inv(Pb))\n", - "\n", - " # update the smoothed estimates\n", - " xs[k] += dot (K, xs[k+1] - xb)\n", - " ps[k] += dot3(K, ps[k+1] - Pb, K.T)\n", - " Ks[k] = K\n", - "\n", - " return (xs, ps, Ks)\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/kalman/sigma_points.py\n", - "\"\"\"Copyright 2015 Roger R Labbe Jr.\n", - "\n", - "FilterPy library.\n", - "http://github.com/rlabbe/filterpy\n", - "\n", - "Documentation at:\n", - "https://filterpy.readthedocs.org\n", - "\n", - "Supporting book at:\n", - "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", - "\n", - "This is licensed under an MIT license. See the readme.MD file\n", - "for more information.\n", - "\"\"\"\n", - "\n", - "import numpy as np\n", - "from scipy.linalg import cholesky\n", - "\n", - "\n", - "class MerweScaledSigmaPoints(object):\n", - "\n", - " def __init__(self, n, alpha, beta, kappa, sqrt_method=None, subtract=None):\n", - " \"\"\"\n", - " Generates sigma points and weights according to Van der Merwe's\n", - " 2004 dissertation [1]. It parametizes the sigma points using\n", - " alpha, beta, kappa terms, and is the version seen in most publications.\n", - "\n", - " Unless you know better, this should be your default choice.\n", - "\n", - " **Parameters**\n", - "\n", - " n : int\n", - " Dimensionality of the state. 2n+1 weights will be generated.\n", - "\n", - " alpha : float\n", - " Determins the spread of the sigma points around the mean.\n", - " Usually a small positive value (1e-3) according to [3].\n", - "\n", - " beta : float\n", - " Incorporates prior knowledge of the distribution of the mean. For\n", - " Gaussian x beta=2 is optimal, according to [3].\n", - "\n", - " kappa : float, default=0.0\n", - " Secondary scaling parameter usually set to 0 according to [4],\n", - " or to 3-n according to [5].\n", - "\n", - " sqrt_method : function(ndarray), default=scipy.linalg.cholesky\n", - " Defines how we compute the square root of a matrix, which has\n", - " no unique answer. Cholesky is the default choice due to its\n", - " speed. Typically your alternative choice will be\n", - " scipy.linalg.sqrtm. Different choices affect how the sigma points\n", - " are arranged relative to the eigenvectors of the covariance matrix.\n", - " Usually this will not matter to you; if so the default cholesky()\n", - " yields maximal performance. As of van der Merwe's dissertation of\n", - " 2004 [6] this was not a well reseached area so I have no advice\n", - " to give you.\n", - "\n", - " If your method returns a triangular matrix it must be upper\n", - " triangular. Do not use numpy.linalg.cholesky - for historical\n", - " reasons it returns a lower triangular matrix. The SciPy version\n", - " does the right thing.\n", - "\n", - " subtract : callable (x, y), optional\n", - " Function that computes the difference between x and y.\n", - " You will have to supply this if your state variable cannot support\n", - " subtraction, such as angles (359-1 degreees is 2, not 358). x and y\n", - " are state vectors, not scalars.\n", - "\n", - " **References**\n", - "\n", - " .. [1] R. Van der Merwe \"Sigma-Point Kalman Filters for Probabilitic\n", - " Inference in Dynamic State-Space Models\" (Doctoral dissertation)\n", - " \"\"\"\n", - "\n", - " self.n = n\n", - " self.alpha = alpha\n", - " self.beta = beta\n", - " self.kappa = kappa\n", - " if sqrt_method is None:\n", - " self.sqrt = cholesky\n", - " else:\n", - " self.sqrt = sqrt_method\n", - "\n", - " if subtract is None:\n", - " self.subtract= np.subtract\n", - " else:\n", - " self.subtract = subtract\n", - "\n", - "\n", - " def sigma_points(self, x, P):\n", - " \"\"\" Computes the sigma points for an unscented Kalman filter\n", - " given the mean (x) and covariance(P) of the filter.\n", - " Returns tuple of the sigma points and weights.\n", - "\n", - " Works with both scalar and array inputs:\n", - " sigma_points (5, 9, 2) # mean 5, covariance 9\n", - " sigma_points ([5, 2], 9*eye(2), 2) # means 5 and 2, covariance 9I\n", - "\n", - " **Parameters**\n", - "\n", - " X An array-like object of the means of length n\n", - " Can be a scalar if 1D.\n", - " examples: 1, [1,2], np.array([1,2])\n", - "\n", - " P : scalar, or np.array\n", - " Covariance of the filter. If scalar, is treated as eye(n)*P.\n", - "\n", - " **Returns**\n", - "\n", - " sigmas : np.array, of size (n, 2n+1)\n", - " Two dimensional array of sigma points. Each column contains all of\n", - " the sigmas for one dimension in the problem space.\n", - "\n", - " Ordered by Xi_0, Xi_{1..n}, Xi_{n+1..2n}\n", - " \"\"\"\n", - "\n", - " assert self.n == np.size(x), \"expected size {}, but size is {}\".format(\n", - " self.n, np.size(x))\n", - "\n", - " n = self.n\n", - "\n", - " if np.isscalar(x):\n", - " x = np.asarray([x])\n", - "\n", - " if np.isscalar(P):\n", - " P = np.eye(n)*P\n", - "\n", - " lambda_ = self.alpha**2 * (n + self.kappa) - n\n", - " U = self.sqrt((lambda_ + n)*P)\n", - "\n", - " sigmas = np.zeros((2*n+1, n))\n", - " sigmas[0] = x\n", - " for k in range(n):\n", - " sigmas[k+1] = self.subtract(x, -U[k])\n", - " sigmas[n+k+1] = self.subtract(x, U[k])\n", - "\n", - " return sigmas\n", - "\n", - "\n", - " def weights(self):\n", - " \"\"\" Computes the weights for the scaled unscented Kalman filter.\n", - "\n", - " **Returns**\n", - "\n", - " Wm : ndarray[2n+1]\n", - " weights for mean\n", - "\n", - " Wc : ndarray[2n+1]\n", - " weights for the covariances\n", - " \"\"\"\n", - "\n", - " n = self.n\n", - " lambda_ = self.alpha**2 * (n +self.kappa) - n\n", - "\n", - " c = .5 / (n + lambda_)\n", - " Wc = np.full(2*n + 1, c)\n", - " Wm = np.full(2*n + 1, c)\n", - " Wc[0] = lambda_ / (n + lambda_) + (1 - self.alpha**2 + self.beta)\n", - " Wm[0] = lambda_ / (n + lambda_)\n", - "\n", - " return Wm, Wc\n", - "\n", - "\n", - "class JulierSigmaPoints(object):\n", - "\n", - " def __init__(self,n, kappa, sqrt_method=None, subtract=None):\n", - " \"\"\"\n", - " Generates sigma points and weights according to Simon J. Julier\n", - " and Jeffery K. Uhlmann's original paper [1]. It parametizes the sigma\n", - " points using kappa.\n", - "\n", - " **Parameters**\n", - "\n", - "\n", - " n : int\n", - " Dimensionality of the state. 2n+1 weights will be generated.\n", - "\n", - " kappa : float, default=0.\n", - " Scaling factor that can reduce high order errors. kappa=0 gives\n", - " the standard unscented filter. According to [Julier], if you set\n", - " kappa to 3-dim_x for a Gaussian x you will minimize the fourth\n", - " order errors in x and P.\n", - "\n", - " sqrt_method : function(ndarray), default=scipy.linalg.cholesky\n", - " Defines how we compute the square root of a matrix, which has\n", - " no unique answer. Cholesky is the default choice due to its\n", - " speed. Typically your alternative choice will be\n", - " scipy.linalg.sqrtm. Different choices affect how the sigma points\n", - " are arranged relative to the eigenvectors of the covariance matrix.\n", - " Usually this will not matter to you; if so the default cholesky()\n", - " yields maximal performance. As of van der Merwe's dissertation of\n", - " 2004 [6] this was not a well reseached area so I have no advice\n", - " to give you.\n", - "\n", - " If your method returns a triangular matrix it must be upper\n", - " triangular. Do not use numpy.linalg.cholesky - for historical\n", - " reasons it returns a lower triangular matrix. The SciPy version\n", - " does the right thing.\n", - "\n", - "\n", - " subtract : callable (x, y), optional\n", - " Function that computes the difference between x and y.\n", - " You will have to supply this if your state variable cannot support\n", - " subtraction, such as angles (359-1 degreees is 2, not 358). x and y\n", - "\n", - " **References**\n", - "\n", - " .. [1] Julier, Simon J.; Uhlmann, Jeffrey \"A New Extension of the Kalman\n", - " Filter to Nonlinear Systems\". Proc. SPIE 3068, Signal Processing,\n", - " Sensor Fusion, and Target Recognition VI, 182 (July 28, 1997)\n", - " \"\"\"\n", - "\n", - " self.n = n\n", - " self.kappa = kappa\n", - " if sqrt_method is None:\n", - " self.sqrt = cholesky\n", - " else:\n", - " self.sqrt = sqrt_method\n", - "\n", - " if subtract is None:\n", - " self.subtract= np.subtract\n", - " else:\n", - " self.subtract = subtract\n", - "\n", - "\n", - " def sigma_points(self, x, P):\n", - " r\"\"\" Computes the sigma points for an unscented Kalman filter\n", - " given the mean (x) and covariance(P) of the filter.\n", - " kappa is an arbitrary constant. Returns sigma points.\n", - "\n", - " Works with both scalar and array inputs:\n", - " sigma_points (5, 9, 2) # mean 5, covariance 9\n", - " sigma_points ([5, 2], 9*eye(2), 2) # means 5 and 2, covariance 9I\n", - "\n", - " **Parameters**\n", - "\n", - " X : array-like object of the means of length n\n", - " Can be a scalar if 1D.\n", - " examples: 1, [1,2], np.array([1,2])\n", - "\n", - " P : scalar, or np.array\n", - " Covariance of the filter. If scalar, is treated as eye(n)*P.\n", - "\n", - " kappa : float\n", - " Scaling factor.\n", - "\n", - " **Returns**\n", - "\n", - " sigmas : np.array, of size (n, 2n+1)\n", - " 2D array of sigma points :math:`\\chi`. Each column contains all of\n", - " the sigmas for one dimension in the problem space. They\n", - " are ordered as:\n", - "\n", - " .. math::\n", - " :nowrap:\n", - "\n", - " \\begin{eqnarray}\n", - " \\chi[0] = &x \\\\\n", - " \\chi[1..n] = &x + [\\sqrt{(n+\\kappa)P}]_k \\\\\n", - " \\chi[n+1..2n] = &x - [\\sqrt{(n+\\kappa)P}]_k\n", - " \\end{eqnarray}\n", - " \"\"\"\n", - "\n", - " assert self.n == np.size(x)\n", - " n = self.n\n", - "\n", - " if np.isscalar(x):\n", - " x = np.asarray([x])\n", - "\n", - " n = np.size(x) # dimension of problem\n", - "\n", - " if np.isscalar(P):\n", - " P = np.eye(n)*P\n", - "\n", - " sigmas = np.zeros((2*n+1, n))\n", - "\n", - " # implements U'*U = (n+kappa)*P. Returns lower triangular matrix.\n", - " # Take transpose so we can access with U[i]\n", - " U = self.sqrt((n + self.kappa) * P)\n", - "\n", - " sigmas[0] = x\n", - " for k in range(n):\n", - " sigmas[k+1] = self.subtract(x, -U[k])\n", - " sigmas[n+k+1] = self.subtract(x, U[k])\n", - " return sigmas\n", - "\n", - "\n", - " def weights(self):\n", - " \"\"\" Computes the weights for the unscented Kalman filter. In this\n", - " formulatyion the weights for the mean and covariance are the same.\n", - "\n", - " **Returns**\n", - "\n", - " Wm : ndarray[2n+1]\n", - " weights for mean\n", - "\n", - " Wc : ndarray[2n+1]\n", - " weights for the covariances\n", - " \"\"\"\n", - " n = self.n\n", - " k = self.kappa\n", - "\n", - " W = np.full(2*n+1, .5 / (n + k))\n", - " W[0] = k / (n+k)\n", - " return W, W\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/kalman/unscented_transform.py\n", - "\"\"\"Copyright 2015 Roger R Labbe Jr.\n", - "\n", - "FilterPy library.\n", - "http://github.com/rlabbe/filterpy\n", - "\n", - "Documentation at:\n", - "https://filterpy.readthedocs.org\n", - "\n", - "Supporting book at:\n", - "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", - "\n", - "This is licensed under an MIT license. See the readme.MD file\n", - "for more information.\n", - "\"\"\"\n", - "\n", - "import numpy as np\n", - "\n", - "\n", - "def unscented_transform(sigmas, Wm, Wc, noise_cov=None,\n", - " mean_fn=None, residual_fn=None):\n", - " \"\"\" Computes unscented transform of a set of sigma points and weights.\n", - " returns the mean and covariance in a tuple.\n", - "\n", - " **Parameters**\n", - "\n", - "\n", - " sigamas: ndarray [#sigmas per dimension, dimension]\n", - " 2D array of sigma points.\n", - "\n", - " Wm : ndarray [# sigmas per dimension]\n", - " Weights for the mean. Must sum to 1.\n", - "\n", - "\n", - " Wc : ndarray [# sigmas per dimension]\n", - " Weights for the covariance. Must sum to 1.\n", - "\n", - " noise_cov : ndarray, optional\n", - " noise matrix added to the final computed covariance matrix.\n", - "\n", - " mean_fn : callable (sigma_points, weights), optional\n", - " Function that computes the mean of the provided sigma points\n", - " and weights. Use this if your state variable contains nonlinear\n", - " values such as angles which cannot be summed.\n", - "\n", - " .. code-block:: Python\n", - "\n", - " def state_mean(sigmas, Wm):\n", - " x = np.zeros(3)\n", - " sum_sin, sum_cos = 0., 0.\n", - "\n", - " for i in range(len(sigmas)):\n", - " s = sigmas[i]\n", - " x[0] += s[0] * Wm[i]\n", - " x[1] += s[1] * Wm[i]\n", - " sum_sin += sin(s[2])*Wm[i]\n", - " sum_cos += cos(s[2])*Wm[i]\n", - " x[2] = atan2(sum_sin, sum_cos)\n", - " return x\n", - "\n", - " residual_fn : callable (x, y), optional\n", - "\n", - " Function that computes the residual (difference) between x and y.\n", - " You will have to supply this if your state variable cannot support\n", - " subtraction, such as angles (359-1 degreees is 2, not 358). x and y\n", - " are state vectors, not scalars.\n", - "\n", - " .. code-block:: Python\n", - "\n", - " def residual(a, b):\n", - " y = a[0] - b[0]\n", - " if y > np.pi:\n", - " y -= 2*np.pi\n", - " if y < -np.pi:\n", - " y = 2*np.pi\n", - " return y\n", - "\n", - "\n", - " **Returns**\n", - "\n", - " x : ndarray [dimension]\n", - " Mean of the sigma points after passing through the transform.\n", - "\n", - " P : ndarray\n", - " covariance of the sigma points after passing throgh the transform.\n", - " \"\"\"\n", - "\n", - " kmax, n = sigmas.shape\n", - "\n", - "\n", - " if mean_fn is None:\n", - " # new mean is just the sum of the sigmas * weight\n", - " x = np.dot(Wm, sigmas) # dot = \\Sigma^n_1 (W[k]*Xi[k])\n", - " else:\n", - " x = mean_fn(sigmas, Wm)\n", - "\n", - "\n", - " # new covariance is the sum of the outer product of the residuals\n", - " # times the weights\n", - "\n", - " # this is the fast way to do this - see 'else' for the slow way\n", - " if residual_fn is None:\n", - " y = sigmas - x[np.newaxis,:]\n", - " P = y.T.dot(np.diag(Wc)).dot(y)\n", - " else:\n", - " P = np.zeros((n, n))\n", - " for k in range(kmax):\n", - " y = residual_fn(sigmas[k], x)\n", - " P += Wc[k] * np.outer(y, y)\n", - "\n", - " if noise_cov is not None:\n", - " P += noise_cov\n", - "\n", - " return (x, P)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Monte Carlo" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# %load https://raw.githubusercontent.com/rlabbe/filterpy/master/filterpy/monte_carlo/resampling.py\n", - "\n", - "\"\"\"Copyright 2015 Roger R Labbe Jr.\n", - "\n", - "FilterPy library.\n", - "http://github.com/rlabbe/filterpy\n", - "\n", - "Documentation at:\n", - "https://filterpy.readthedocs.org\n", - "\n", - "Supporting book at:\n", - "https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python\n", - "\n", - "This is licensed under an MIT license. See the readme.MD file\n", - "for more information.\n", - "\"\"\"\n", - "\n", - "\n", - "import numpy as np\n", - "from numpy.random import random\n", - "\n", - "\n", - "\n", - "def residual_resample(weights):\n", - " \"\"\" Performs the residual resampling algorithm used by particle filters.\n", - "\n", - " Based on observation that we don't need to use random numbers to select\n", - " most of the weights. Take int(N*w^i) samples of each particle i, and then\n", - " resample any remaining using a standard resampling algorithm [1]\n", - "\n", - "\n", - " **Parameters**\n", - " weights : list-like of float\n", - " list of weights as floats\n", - "\n", - " **Returns**\n", - "\n", - " indexes : ndarray of ints\n", - " array of indexes into the weights defining the resample. i.e. the\n", - " index of the zeroth resample is indexes[0], etc.\n", - "\n", - " **References**\n", - " .. [1] J. S. Liu and R. Chen. Sequential Monte Carlo methods for dynamic\n", - " systems. Journal of the American Statistical Association,\n", - " 93(443):1032–1044, 1998.\n", - " \"\"\"\n", - "\n", - " N = len(weights)\n", - " indexes = np.zeros(N, 'i')\n", - "\n", - " # take int(N*w) copies of each weight, which ensures particles with the\n", - " # same weight are drawn uniformly\n", - " num_copies = (np.floor(N*np.asarray(weights))).astype(int)\n", - " k = 0\n", - " for i in range(N):\n", - " for _ in range(num_copies[i]): # make n copies\n", - " indexes[k] = i\n", - " k += 1\n", - "\n", - " # use multinormal resample on the residual to fill up the rest. This\n", - " # maximizes the variance of the samples\n", - " residual = weights - num_copies # get fractional part\n", - " residual /= sum(residual) # normalize\n", - " cumulative_sum = np.cumsum(residual)\n", - " cumulative_sum[-1] = 1. # avoid round-off errors: ensures sum is exactly one\n", - " indexes[k:N] = np.searchsorted(cumulative_sum, random(N-k))\n", - "\n", - " return indexes\n", - "\n", - "\n", - "\n", - "def stratified_resample(weights):\n", - " N = len(weights)\n", - " # make N subdivisions, and chose a random position within each one\n", - " positions = (random(N) + range(N)) / N\n", - "\n", - " indexes = np.zeros(N, 'i')\n", - " cumulative_sum = np.cumsum(weights)\n", - " i, j = 0, 0\n", - " while i < N:\n", - " if positions[i] < cumulative_sum[j]:\n", - " indexes[i] = j\n", - " i += 1\n", - " else:\n", - " j += 1\n", - " return indexes\n", - "\n", - "\n", - "def systematic_resample(weights):\n", - " N = len(weights)\n", - "\n", - " # make N subdivisions, and choose positions with a consistent random offset\n", - " positions = (random() + np.arange(N)) / N\n", - "\n", - " indexes = np.zeros(N, 'i')\n", - " cumulative_sum = np.cumsum(weights)\n", - " i, j = 0, 0\n", - " while i < N:\n", - " if positions[i] < cumulative_sum[j]:\n", - " indexes[i] = j\n", - " i += 1\n", - " else:\n", - " j += 1\n", - " return indexes\n", - "\n", - "\n", - "\n", - "def multinomial_resample(weights):\n", - " \"\"\" This is the naive form of roulette sampling where we compute the\n", - " cumulative sum of the weights and then use binary search to select the\n", - " resampled point based on a uniformly distributed random number. Run time\n", - " is O(n log n).\n", - "\n", - " **Parameters**\n", - " weights : list-like of float\n", - " list of weights as floats\n", - "\n", - " **Returns**\n", - "\n", - " indexes : ndarray of ints\n", - " array of indexes into the weights defining the resample. i.e. the\n", - " index of the zeroth resample is indexes[0], etc.\n", - " \"\"\"\n", - " cumulative_sum = np.cumsum(weights)\n", - " cumulative_sum[-1] = 1. # avoid round-off errors: ensures sum is exactly one\n", - " return np.searchsorted(cumulative_sum, random(len(weights)))" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.5.0" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} diff --git a/old-content.ipynb b/old-content.ipynb deleted file mode 100644 index 6a5a28d..0000000 --- a/old-content.ipynb +++ /dev/null @@ -1,275 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "This contains text and code that I've removed from the book because it is redundant or replaced with what I think of as better material. Maybe you liked it, so I'll save it here for now." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Exercise: Track a target moving in a circle" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Change the simulated target movement to move in a circle. Avoid angular nonlinearities by putting the sensors well outside the movement range of the target, and avoid the angles $0^\\circ$ and $180^\\circ$." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# your solution here" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Solution\n", - "We have a few choices here. First, if we know the movement of the target we can design our filter's state transition function so that it correctly predicts the circular movement. For example, suppose we were tracking a boat race optically - we would want to take track shape into account with our filter. However, in this chapter we have not been talking about such constrained behavior, so I will not build knowledge of the movement into the filter. So my implementation looks like this." - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "def plot_circular_target(kf, std_noise, target_pos):\n", - " xs = []\n", - " txs = []\n", - " radius = 100\n", - " for i in range(300):\n", - " target_pos[0] = math.cos(i/10)*radius + randn()*0.0001\n", - " target_pos[1] = math.sin(i/10)*radius + randn()*0.0001\n", - " txs.append((target_pos[0], target_pos[1]))\n", - "\n", - " z = measurement(sa_pos, sb_pos, target_pos)\n", - " z[0] += randn() * std_noise\n", - " z[1] += randn() * std_noise\n", - "\n", - " kf.predict()\n", - " kf.update(z)\n", - " xs.append(kf.x)\n", - "\n", - " xs = np.asarray(xs)\n", - " txs = np.asarray(txs)\n", - "\n", - " plt.plot(xs[:, 0], xs[:, 2])\n", - " plt.plot(txs[: ,0], txs[:, 1], linestyle='-.')\n", - " plt.axis('equal')\n", - " plt.show()\n", - "\n", - "sa_pos = [-240, 200]\n", - "sb_pos = [240, 200]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "np.random.seed(12283)\n", - "std_noise = math.radians(0.5)\n", - "target_pos = [0, 0]\n", - "f = moving_target_filter(target_pos, std_noise, Q=1.1)\n", - "plot_circular_target(f, std_noise, target_pos)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Discussion\n", - "\n", - "The filter tracks the movement of the target, but never really converges on the track. This is because the filter is modeling a constant velocity target, but the target is anything but constant velocity. As mentioned above we could model the circular behavior by defining the `fx()` function, but then we would have problems when the target is not moving in a circle. Instead, lets tell the filter we are are less sure about our process model by making $\\mathbf{Q}$ larger. Here I have increased the variance from 0.1 to 1.0" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "np.random.seed(12283)\n", - "std_noise = math.radians(0.5)\n", - "cf = moving_target_filter(target_pos, std_noise, Q=10.)\n", - "target_pos = [0, 0]\n", - "plot_circular_target(cf, std_noise, target_pos)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The convergence is not perfect, but it is far better. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Exercise: Sensor Position Effects\n", - "\n", - "Is the behavior of the filter invariant for any sensor position? Find a sensor position that produces bad filter behavior." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# your answer here" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Solution\n", - "\n", - "We have already discussed the problem of angles being modal, so causing that problem by putting the sensors at `y=0` would be a trivial solution. However, let's be more subtle than that. We want to create a situation where there are an infinite number of solutions for the sensor readings. We can achieve that whenever the target lies on the straight line between the two sensors. In that case there is no triangulation possible and there is no unique solution. My solution is as follows." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "std_noise = math.radians(0.5)\n", - "sa_pos = [-200, 200]\n", - "sb_pos = [200, -200]\n", - "plt.scatter(*sa_pos, s=200)\n", - "plt.scatter(*sb_pos, s=200)\n", - "target_pos = [0, 0]\n", - "cf = moving_target_filter(target_pos, std_noise, Q=10.)\n", - "plot_circular_target(cf, std_noise, target_pos)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "I put the sensors at the upper left hand side and lower right hand side of the target's movement. We can see how the filter diverges when the target enters the region directly between the two sensors. The state transition always predicts that the target is moving in a straight line. When the target is between the two sensors this straight line movement is well described the bearing measurements from the two sensors so the filter estimate starts to approximate a straight line. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Exercise: Compute Position Errors\n", - "\n", - "The position errors of the filter vary depending on how far away the target is from a sensor. Write a function that computes the distance error due to a bearing error. " - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# your solution here" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Solution\n", - "\n", - "Basic trigonometry gives us this answer." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "def distance_error(target_distance, angle_error):\n", - " x = 1 - math.cos(angle_error)\n", - " y = math.sin(angle_error)\n", - " return target_distance*(x**2 + y**2)**.5\n", - "\n", - "d = distance_error(100, math.radians(1.))\n", - "print('\\n\\nError of 1 degree at 100km is {:.3}km'.format(d))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.4.1" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -}