Kalman-and-Bayesian-Filters.../code/pf_internal.py
2015-12-09 06:31:14 -08:00

463 lines
14 KiB
Python

# -*- coding: utf-8 -*-
"""Copyright 2015 Roger R Labbe Jr.
Code supporting the book
Kalman and Bayesian Filters in Python
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
This is licensed under an MIT license. See the LICENSE.txt file
for more information.
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from filterpy.monte_carlo import stratified_resample
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
from numpy.random import randn, random, uniform, multivariate_normal, seed
#from nonlinear_plots import plot_monte_carlo_mean
import scipy.stats
from RobotLocalizationParticleFilter import *
class ParticleFilter(object):
def __init__(self, N, x_dim, y_dim):
self.particles = np.empty((N, 3)) # x, y, heading
self.N = N
self.x_dim = x_dim
self.y_dim = y_dim
# distribute particles randomly with uniform weight
self.weights = np.empty(N)
self.weights.fill(1./N)
self.particles[:, 0] = uniform(0, x_dim, size=N)
self.particles[:, 1] = uniform(0, y_dim, size=N)
self.particles[:, 2] = uniform(0, 2*np.pi, size=N)
def predict(self, u, std):
""" move according to control input u with noise std"""
self.particles[:, 2] += u[0] + randn(self.N) * std[0]
self.particles[:, 2] %= 2 * np.pi
d = u[1] + randn(self.N)
self.particles[:, 0] += np.cos(self.particles[:, 2]) * d
self.particles[:, 1] += np.sin(self.particles[:, 2]) * d
self.particles[:, 0:2] += u + randn(self.N, 2) * std
def weight(self, z, var):
dist = np.sqrt((self.particles[:, 0] - z[0])**2 +
(self.particles[:, 1] - z[1])**2)
# simplification assumes variance is invariant to world projection
n = scipy.stats.norm(0, np.sqrt(var))
prob = n.pdf(dist)
# particles far from a measurement will give us 0.0 for a probability
# due to floating point limits. Once we hit zero we can never recover,
# so add some small nonzero value to all points.
prob += 1.e-12
self.weights += prob
self.weights /= sum(self.weights) # normalize
def neff(self):
return 1. / np.sum(np.square(self.weights))
def resample(self):
p = np.zeros((self.N, 3))
w = np.zeros(self.N)
cumsum = np.cumsum(self.weights)
for i in range(self.N):
index = np.searchsorted(cumsum, random())
p[i] = self.particles[index]
w[i] = self.weights[index]
self.particles = p
self.weights = w / np.sum(w)
def estimate(self):
""" returns mean and variance """
pos = self.particles[:, 0:2]
mu = np.average(pos, weights=self.weights, axis=0)
var = np.average((pos - mu)**2, weights=self.weights, axis=0)
return mu, var
def plot_random_pd():
def norm(x, x0, sigma):
return np.exp(-0.5 * (x - x0) ** 2 / sigma ** 2)
def sigmoid(x, x0, alpha):
return 1. / (1. + np.exp(- (x - x0) / alpha))
x = np.linspace(0, 1, 100)
y2 = (0.1 * np.sin(norm(x, 0.2, 0.05)) + 0.25 * norm(x, 0.6, 0.05) +
.5*norm(x, .5, .08) +
np.sqrt(norm(x, 0.8, 0.06)) +0.1 * (1 - sigmoid(x, 0.45, 0.15)))
with plt.xkcd():
#plt.setp(plt.gca().get_xticklabels(), visible=False)
#plt.setp(plt.gca().get_yticklabels(), visible=False)
plt.axes(xticks=[], yticks=[], frameon=False)
plt.plot(x, y2)
plt.ylim([0, max(y2)+.1])
def plot_monte_carlo_ukf():
def f(x,y):
return x+y, .1*x**2 + y*y
mean = (0, 0)
p = np.array([[32, 15], [15., 40.]])
# Compute linearized mean
mean_fx = f(*mean)
#generate random points
xs, ys = multivariate_normal(mean=mean, cov=p, size=3000).T
fxs, fys = f(xs, ys)
plt.subplot(121)
plt.gca().grid(b=False)
plt.scatter(xs, ys, marker='.', alpha=.2, color='k')
plt.xlim(-25, 25)
plt.ylim(-25, 25)
plt.subplot(122)
plt.gca().grid(b=False)
plt.scatter(fxs, fys, marker='.', alpha=0.2, color='k')
plt.ylim([-10, 200])
plt.xlim([-100, 100])
plt.show()
def plot_pf(pf, xlim=100, ylim=100, weights=True):
if weights:
a = plt.subplot(221)
a.cla()
plt.xlim(0, ylim)
#plt.ylim(0, 1)
a.set_yticklabels('')
plt.scatter(pf.particles[:, 0], pf.weights, marker='.', s=1, color='k')
a.set_ylim(bottom=0)
a = plt.subplot(224)
a.cla()
a.set_xticklabels('')
plt.scatter(pf.weights, pf.particles[:, 1], marker='.', s=1, color='k')
plt.ylim(0, xlim)
a.set_xlim(left=0)
#plt.xlim(0, 1)
a = plt.subplot(223)
a.cla()
else:
plt.cla()
plt.scatter(pf.particles[:, 0], pf.particles[:, 1], marker='.', s=1, color='k')
plt.xlim(0, xlim)
plt.ylim(0, ylim)
def Gaussian(mu, sigma, x):
# calculates the probability of x for 1-dim Gaussian with mean mu and var. sigma
g = (np.exp(-((mu - x) ** 2) / (sigma ** 2) / 2.0) /
np.sqrt(2.0 * np.pi * (sigma ** 2)))
for i in range(len(g)):
g[i] = max(g[i], 1.e-29)
return g
def test_gaussian(N):
for i in range(N):
mean, std, x = randn(3)
std = abs(std)
d = Gaussian(mean, std, x) - scipy.stats.norm(mean, std).pdf(x)
assert abs(d) < 1.e-8, "{}, {}, {}, {}, {}, {}".format(d, mean, std, x, Gaussian(mean, std, x), scipy.stats.norm(mean, std).pdf(x))
def show_two_pf_plots():
""" Displays results of PF after 1 and 10 iterations for the book.
Note the book says this solves the full robot localization problem.
It doesn't bother simulating landmarks as this is just an illustration.
"""
seed(1234)
N = 3000
pf = ParticleFilter(N, 20, 20)
z = np.array([20, 20])
#plot(pf, weights=False)
for x in range(10):
z[0] = x+1 + randn()*0.3
z[1] = x+1 + randn()*0.3
pf.predict((1,1), (0.2, 0.2))
pf.weight(z=z, var=.8)
pf.resample()
if x == 0:
plt.subplot(121)
elif x == 9:
plt.subplot(122)
if x == 0 or x == 9:
mu, var = pf.estimate()
plot_pf(pf, 20, 20, weights=False)
if x == 0:
plt.scatter(mu[0], mu[1], color='g', s=100)
plt.scatter(x+1, x+1, marker='x', color='r', s=180, lw=3)
else:
plt.scatter(mu[0], mu[1], color='g', s=100, label="PF")
plt.scatter([x+1], [x+1], marker='x', color='r', s=180, label="True", lw=3)
plt.legend(scatterpoints=1)
plt.tight_layout()
def test_pf():
#seed(1234)
N = 10000
R = .2
landmarks = [[-1, 2], [20,4], [10,30], [18,25]]
#landmarks = [[-1, 2], [2,4]]
pf = RobotLocalizationParticleFilter(N, 20, 20, landmarks, R)
plot_pf(pf, 20, 20, weights=False)
dt = .01
plt.pause(dt)
for x in range(18):
zs = []
pos=(x+3, x+3)
for landmark in landmarks:
d = np.sqrt((landmark[0]-pos[0])**2 + (landmark[1]-pos[1])**2)
zs.append(d + randn()*R)
pf.predict((0.01, 1.414), (.2, .05))
pf.update(z=zs)
pf.resample()
#print(x, np.array(list(zip(pf.particles, pf.weights))))
mu, var = pf.estimate()
plot_pf(pf, 20, 20, weights=False)
plt.plot(pos[0], pos[1], marker='*', color='r', ms=10)
plt.scatter(mu[0], mu[1], color='g', s=100)
plt.tight_layout()
plt.pause(dt)
def test_pf2():
N = 1000
sensor_std_err = .2
landmarks = [[-1, 2], [20,4], [-20,6], [18,25]]
pf = RobotLocalizationParticleFilter(N, 20, 20, landmarks, sensor_std_err)
xs = []
for x in range(18):
zs = []
pos=(x+1, x+1)
for landmark in landmarks:
d = np.sqrt((landmark[0]-pos[0])**2 + (landmark[1]-pos[1])**2)
zs.append(d + randn()*sensor_std_err)
# move diagonally forward to (x+1, x+1)
pf.predict((0.00, 1.414), (.2, .05))
pf.update(z=zs)
pf.resample()
mu, var = pf.estimate()
xs.append(mu)
xs = np.array(xs)
plt.plot(xs[:, 0], xs[:, 1])
plt.show()
def plot_cumsum(a):
N = len(a)
cmap = mpl.colors.ListedColormap([[0., .4, 1.],
[0., .8, 1.],
[1., .8, 0.],
[1., .4, 0.]]*(int(N/4) + 1))
cumsum = np.cumsum(np.asarray(a) / np.sum(a))
cumsum = np.insert(cumsum, 0, 0)
fig = plt.figure(figsize=(6,3))
ax = fig.add_axes([0.05, 0.475, 0.9, 0.15])
norm = mpl.colors.BoundaryNorm(cumsum, cmap.N)
bar = mpl.colorbar.ColorbarBase(ax, cmap=cmap,
norm=norm,
drawedges=False,
spacing='proportional',
orientation='horizontal')
if N > 10:
bar.set_ticks([])
plt.show()
def plot_stratified_resample(a):
N = len(a)
cmap = mpl.colors.ListedColormap([[0., .4, 1.],
[0., .8, 1.],
[1., .8, 0.],
[1., .4, 0.]]*(int(N/4) + 1))
cumsum = np.cumsum(np.asarray(a) / np.sum(a))
cumsum = np.insert(cumsum, 0, 0)
fig = plt.figure(figsize=(6,3))
ax = fig.add_axes([0.05, 0.475, 0.9, 0.15])
norm = mpl.colors.BoundaryNorm(cumsum, cmap.N)
bar = mpl.colorbar.ColorbarBase(ax, cmap=cmap,
norm=norm,
drawedges=False,
spacing='proportional',
orientation='horizontal')
xs = np.linspace(0., 1.-1./N, N)
ax.vlines(xs, 0, 1, lw=2)
# make N subdivisions, and chose a random position within each one
b = (random(N) + range(N)) / N
plt.scatter(b, [.5]*len(b), s=60, facecolor='k', edgecolor='k')
bar.set_ticks([])
plt.title('stratified resampling')
plt.show()
def plot_systematic_resample(a):
N = len(a)
cmap = mpl.colors.ListedColormap([[0., .4, 1.],
[0., .8, 1.],
[1., .8, 0.],
[1., .4, 0.]]*(int(N/4) + 1))
cumsum = np.cumsum(np.asarray(a) / np.sum(a))
cumsum = np.insert(cumsum, 0, 0)
fig = plt.figure(figsize=(6,3))
ax = fig.add_axes([0.05, 0.475, 0.9, 0.15])
norm = mpl.colors.BoundaryNorm(cumsum, cmap.N)
bar = mpl.colorbar.ColorbarBase(ax, cmap=cmap,
norm=norm,
drawedges=False,
spacing='proportional',
orientation='horizontal')
xs = np.linspace(0., 1.-1./N, N)
ax.vlines(xs, 0, 1, lw=2)
# make N subdivisions, and chose a random position within each one
b = (random() + np.array(range(N))) / N
plt.scatter(b, [.5]*len(b), s=60, facecolor='k', edgecolor='k')
bar.set_ticks([])
plt.title('systematic resampling')
plt.show()
def plot_multinomial_resample(a):
N = len(a)
cmap = mpl.colors.ListedColormap([[0., .4, 1.],
[0., .8, 1.],
[1., .8, 0.],
[1., .4, 0.]]*(int(N/4) + 1))
cumsum = np.cumsum(np.asarray(a) / np.sum(a))
cumsum = np.insert(cumsum, 0, 0)
fig = plt.figure(figsize=(6,3))
ax = fig.add_axes([0.05, 0.475, 0.9, 0.15])
norm = mpl.colors.BoundaryNorm(cumsum, cmap.N)
bar = mpl.colorbar.ColorbarBase(ax, cmap=cmap,
norm=norm,
drawedges=False,
spacing='proportional',
orientation='horizontal')
# make N subdivisions, and chose a random position within each one
b = random(N)
plt.scatter(b, [.5]*len(b), s=60, facecolor='k', edgecolor='k')
bar.set_ticks([])
plt.title('multinomial resampling')
plt.show()
def plot_residual_resample(a):
N = len(a)
a_norm = np.asarray(a) / np.sum(a)
cumsum = np.cumsum(a_norm)
cumsum = np.insert(cumsum, 0, 0)
cmap = mpl.colors.ListedColormap([[0., .4, 1.],
[0., .8, 1.],
[1., .8, 0.],
[1., .4, 0.]]*(int(N/4) + 1))
fig = plt.figure(figsize=(6,3))
ax = fig.add_axes([0.05, 0.475, 0.9, 0.15])
norm = mpl.colors.BoundaryNorm(cumsum, cmap.N)
bar = mpl.colorbar.ColorbarBase(ax, cmap=cmap,
norm=norm,
drawedges=False,
spacing='proportional',
orientation='horizontal')
indexes = residual_resample(a_norm)
bins = np.bincount(indexes)
for i in range(1, N):
n = bins[i-1] # number particles in this sample
if n > 0:
b = np.linspace(cumsum[i-1], cumsum[i], n+2)[1:-1]
plt.scatter(b, [.5]*len(b), s=60, facecolor='k', edgecolor='k')
bar.set_ticks([])
plt.title('residual resampling')
plt.show()
if __name__ == '__main__':
plot_residual_resample([.1, .2, .3, .4, .2, .3, .1])
#example()
#show_two_pf_plots()
a = [.1, .2, .1, .6]
#plot_cumsum(a)
#test_pf()