Checkpoint. Added a lot to the particle filter chapter.
This commit is contained in:
parent 7cf3980e8c
commit 1b92397175
File diff suppressed because one or more lines are too long
263 code/RobotLocalizationParticleFilter.py Normal file
@@ -0,0 +1,263 @@
# -*- coding: utf-8 -*-
"""
Created on Sat Jun 27 19:45:45 2015

@author: Roger
"""

import numpy as np
import numpy.random
from numpy.random import randn, random, uniform
import scipy.stats

class RobotLocalizationParticleFilter(object):

    def __init__(self, N, x_dim, y_dim, landmarks, measure_std_error):
        self.particles = np.empty((N, 3))  # x, y, heading
        self.N = N
        self.x_dim = x_dim
        self.y_dim = y_dim
        self.landmarks = landmarks
        self.R = measure_std_error

        # distribute particles randomly with uniform weight
        self.weights = np.empty(N)
        #self.weights.fill(1./N)
        self.particles[:, 0] = uniform(0, x_dim, size=N)
        self.particles[:, 1] = uniform(0, y_dim, size=N)
        self.particles[:, 2] = uniform(0, 2*np.pi, size=N)

    def create_uniform_particles(self, x_range, y_range, hdg_range):
        self.particles[:, 0] = uniform(x_range[0], x_range[1], size=self.N)
        self.particles[:, 1] = uniform(y_range[0], y_range[1], size=self.N)
        self.particles[:, 2] = uniform(hdg_range[0], hdg_range[1], size=self.N)
        self.particles[:, 2] %= 2 * np.pi

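    # Note: the `var` argument below scales randn() directly, so each entry
    # acts as a per-axis standard deviation rather than a variance.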
    def create_gaussian_particles(self, mean, var):
        self.particles[:, 0] = mean[0] + randn(self.N)*var[0]
        self.particles[:, 1] = mean[1] + randn(self.N)*var[1]
        self.particles[:, 2] = mean[2] + randn(self.N)*var[2]
        self.particles[:, 2] %= 2 * np.pi

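    # Motion model: each particle first turns by the commanded heading change
    # u[0] (plus noise), then moves u[1]*dt along its new heading (plus noise).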
    def predict(self, u, std, dt=1.):
        """ move according to control input u (heading change, velocity)
        with noise std"""

        self.particles[:, 2] += u[0] + randn(self.N) * std[0]
        self.particles[:, 2] %= 2 * np.pi

        d = u[1]*dt + randn(self.N) * std[1]
        self.particles[:, 0] += np.cos(self.particles[:, 2]) * d
        self.particles[:, 1] += np.sin(self.particles[:, 2]) * d

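    # Measurement update: z holds one range reading per landmark. Each
    # particle's weight is multiplied by the likelihood of that reading under
    # a Gaussian sensor model with std self.R, then renormalized. The tiny
    # additive constant keeps the weights from rounding to exactly zero.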
    def update(self, z):
        self.weights.fill(1.)
        for i, landmark in enumerate(self.landmarks):
            distance = np.linalg.norm(self.particles[:, 0:2] - landmark, axis=1)
            self.weights *= scipy.stats.norm(distance, self.R).pdf(z[i])
            #self.weights *= Gaussian(distance, self.R, z[i])

        self.weights += 1.e-300
        self.weights /= sum(self.weights)  # normalize

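    # Effective sample size: 1 / sum(w_i^2). It ranges from 1 (all weight on
    # one particle) up to N (uniform weights); a common rule of thumb is to
    # resample when it drops below N/2.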
    def neff(self):
        return 1. / np.sum(np.square(self.weights))

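    # Multinomial resampling: draw N uniform samples and map each through the
    # inverse CDF of the weights (searchsorted on the cumulative sum), so
    # high-weight particles are duplicated and low-weight ones die out.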
    def resample(self):
        p = np.zeros((self.N, 3))
        w = np.zeros(self.N)

        cumsum = np.cumsum(self.weights)
        for i in range(self.N):
            index = np.searchsorted(cumsum, random())
            p[i] = self.particles[index]
            w[i] = self.weights[index]

        self.particles = p
        assert np.sum(w) != 0
        assert not np.isnan(w[0])
        self.weights = w / np.sum(w)

    def resample_from_index(self, indexes):
        assert len(indexes) == self.N

        p = np.zeros((self.N, 3))
        w = np.zeros(self.N)
        for i, index in enumerate(indexes):
            p[i] = self.particles[index]
            w[i] = self.weights[index]

        self.particles = p
        self.weights = w / np.sum(w)

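    # State estimate: weighted mean and weighted variance of the particle
    # positions (heading is not included).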
    def estimate(self):
        """ returns mean and variance """
        pos = self.particles[:, 0:2]
        mu = np.average(pos, weights=self.weights, axis=0)
        var = np.average((pos - mu)**2, weights=self.weights, axis=0)

        return mu, var

    def mean(self):
        """ returns weighted mean position"""
        return np.average(self.particles[:, 0:2], weights=self.weights, axis=0)

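# Residual resampling: each particle is first copied floor(N*w_i) times
# deterministically; the remaining slots are filled by multinomial sampling
# from the normalized leftover (residual) weights.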
def residual_resample(w):

    N = len(w)

    w_ints = np.floor(N*w).astype(int)
    residual = N*w - w_ints
    residual /= sum(residual)

    indexes = np.zeros(N, 'i')
    k = 0
    for i in range(N):
        for j in range(w_ints[i]):
            indexes[k] = i
            k += 1
    cumsum = np.cumsum(residual)
    cumsum[N-1] = 1.
    for j in range(k, N):
        indexes[j] = np.searchsorted(cumsum, random())

    return indexes

def residual_resample2(w):

    N = len(w)

    w_ints = np.floor(N*w).astype(int)

    R = np.sum(w_ints)
    m_rdn = N - R

    Ws = (N*w - w_ints) / m_rdn
    indexes = np.zeros(N, 'i')
    i = 0
    for j in range(N):
        for k in range(w_ints[j]):
            indexes[i] = j
            i += 1
    cumsum = np.cumsum(Ws)
    cumsum[N-1] = 1  # just in case
    for j in range(i, N):
        indexes[j] = np.searchsorted(cumsum, random())

    return indexes

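# Systematic resampling: a single random offset generates N evenly spaced
# pointers into the cumulative weight distribution, giving lower selection
# variance than drawing N independent uniforms.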
def systemic_resample(w):
    N = len(w)
    Q = np.cumsum(w)
    indexes = np.zeros(N, 'i')
    t = np.linspace(0, 1 - 1./N, N) + random()/N

    i, j = 0, 0
    while i < N and j < N:
        while Q[j] < t[i]:
            j += 1
        indexes[i] = j
        i += 1

    return indexes

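# Hand-rolled 1-D Gaussian pdf, floored at a tiny value so weights cannot
# reach exactly zero; the commented-out call in update() shows it as an
# alternative to scipy.stats.norm(...).pdf.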
def Gaussian(mu, sigma, x):

    # calculates the probability of x for 1-dim Gaussian with mean mu and std sigma
    g = (np.exp(-((mu - x) ** 2) / (sigma ** 2) / 2.0) /
         np.sqrt(2.0 * np.pi * (sigma ** 2)))
    for i in range(len(g)):
        g[i] = max(g[i], 1.e-229)
    return g

if __name__ == '__main__':

    DO_PLOT_PARTICLES = False
    from numpy.random import seed
    import matplotlib.pyplot as plt

    #plt.figure()

    seed(5)
    for count in range(1):
        print()
        print(count)
        #numpy.random.set_state(fail_state)
        #if count == 12:
        #    #fail_state = numpy.random.get_state()
        #    DO_PLOT_PARTICLES = True

        N = 4000
        sensor_std_err = .1
        landmarks = np.array([[-1, 2], [2, 4], [10, 6], [18, 25]])
        NL = len(landmarks)

        #landmarks = [[-1, 2], [2,4]]

        pf = RobotLocalizationParticleFilter(N, 20, 20, landmarks, sensor_std_err)
        #pf.create_gaussian_particles([3, 2, 0], [5, 5, 2])
        pf.create_uniform_particles((0, 20), (0, 20), (0, 6.28))

        if DO_PLOT_PARTICLES:
            plt.scatter(pf.particles[:, 0], pf.particles[:, 1], alpha=.2, color='g')

        xs = []
        for x in range(18):
            zs = []
            pos = (x+1, x+1)

            for landmark in landmarks:
                d = np.sqrt((landmark[0]-pos[0])**2 + (landmark[1]-pos[1])**2)
                zs.append(d + randn()*sensor_std_err)

            zs = np.linalg.norm(landmarks - pos, axis=1) + randn(NL)*sensor_std_err

            # move diagonally forward to (x+1, x+1)
            pf.predict((0.00, 1.414), (.2, .05))
            pf.update(z=zs)
            if x == 0:
                print(max(pf.weights))
            #while abs(pf.neff() - N) < .1:
            #    print('neffing')
            #    pf.create_uniform_particles((0,20), (0,20), (0, 6.28))
            #    pf.update(z=zs)
            #print(pf.neff())
            #indexes = residual_resample2(pf.weights)
            indexes = systemic_resample(pf.weights)

            pf.resample_from_index(indexes)
            #pf.resample()

            mu, var = pf.estimate()
            xs.append(mu)
            if DO_PLOT_PARTICLES:
                plt.scatter(pf.particles[:, 0], pf.particles[:, 1], alpha=.2)
                plt.scatter(pos[0], pos[1], marker='*', color='r')
                plt.scatter(mu[0], mu[1], marker='s', color='r')
                plt.pause(.01)

        xs = np.array(xs)
        plt.plot(xs[:, 0], xs[:, 1])
        plt.show()
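A minimal usage sketch (not part of this commit) of the class above, resampling only when the effective sample size falls below N/2; the N/2 threshold and the standalone driver loop are illustrative assumptions, while every call uses functions defined in RobotLocalizationParticleFilter.py:

    import numpy as np
    from numpy.random import randn
    from RobotLocalizationParticleFilter import (
        RobotLocalizationParticleFilter, systemic_resample)

    landmarks = np.array([[-1, 2], [2, 4], [10, 6], [18, 25]])
    pf = RobotLocalizationParticleFilter(N=1000, x_dim=20, y_dim=20,
                                         landmarks=landmarks,
                                         measure_std_error=0.1)
    pf.create_uniform_particles((0, 20), (0, 20), (0, 2*np.pi))

    pos = np.array([0., 0.])
    for _ in range(10):
        pos += 1.0                                   # robot moves diagonally
        zs = (np.linalg.norm(landmarks - pos, axis=1)
              + randn(len(landmarks)) * 0.1)         # noisy range to each landmark
        pf.predict(u=(0.0, 1.414), std=(0.2, 0.05))  # (heading change, velocity)
        pf.update(z=zs)
        if pf.neff() < pf.N / 2:                     # resample only when needed
            pf.resample_from_index(systemic_resample(pf.weights))
        mu, var = pf.estimate()
        print(mu, var)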
@@ -153,10 +153,6 @@ def hinton(W, maxweight=None):
    if reenable:
        plt.ion()

if __name__ == "__main__":
    hinton(np.random.randn(20, 20))
    plt.title('Example Hinton diagram - 20x20 random normal')
    plt.show()

if __name__ == "__main__":
    p = [0.2245871, 0.06288015, 0.06109133, 0.0581008, 0.09334062, 0.2245871,
@@ -1,62 +1,10 @@
import numpy as np
import pylab as plt
from matplotlib.patches import Circle, Rectangle, Polygon, Arrow, FancyArrow
import book_plots
import numpy as np
from numpy.random import randn, random, uniform, multivariate_normal, seed
from nonlinear_plots import plot_monte_carlo_mean
import scipy
#from nonlinear_plots import plot_monte_carlo_mean
import pylab as plt
import scipy.stats
from RobotLocalizationParticleFilter import *


def plot_random_pd():
    def norm(x, x0, sigma):
        return np.exp(-0.5 * (x - x0) ** 2 / sigma ** 2)

    def sigmoid(x, x0, alpha):
        return 1. / (1. + np.exp(- (x - x0) / alpha))

    x = np.linspace(0, 1, 100)
    y2 = (0.1 * np.sin(norm(x, 0.2, 0.05)) + 0.25 * norm(x, 0.6, 0.05) +
          .5*norm(x, .5, .08) +
          np.sqrt(norm(x, 0.8, 0.06)) + 0.1 * (1 - sigmoid(x, 0.45, 0.15)))
    with plt.xkcd():
        #plt.setp(plt.gca().get_xticklabels(), visible=False)
        #plt.setp(plt.gca().get_yticklabels(), visible=False)
        plt.axes(xticks=[], yticks=[], frameon=False)
        plt.plot(x, y2)


def plot_monte_carlo_ukf():

    def f(x, y):
        return x+y, .1*x**2 + y*y

    mean = (0, 0)
    p = np.array([[32, 15], [15., 40.]])

    # Compute linearized mean
    mean_fx = f(*mean)

    #generate random points
    xs, ys = multivariate_normal(mean=mean, cov=p, size=3000).T
    fxs, fys = f(xs, ys)

    plt.subplot(121)
    plt.gca().grid(b=False)

    plt.scatter(xs, ys, marker='.', alpha=.2, color='k')
    plt.xlim(-25, 25)
    plt.ylim(-25, 25)

    plt.subplot(122)
    plt.gca().grid(b=False)

    plt.scatter(fxs, fys, marker='.', alpha=0.2, color='k')

    plt.ylim([-10, 200])
    plt.xlim([-100, 100])
    plt.show()

@@ -76,30 +24,6 @@ class ParticleFilter(object):
        self.particles[:, 2] = uniform(0, 2*np.pi, size=N)

    def create_particles(self, mean, variance):
        """ create particles with the specified mean and variance"""
        self.particles[:, 0] = mean[0] + randn(self.N) * np.sqrt(variance)
        self.particles[:, 1] = mean[1] + randn(self.N) * np.sqrt(variance)

    def create_particle(self):
        """ create particles uniformly distributed over entire space"""
        return [uniform(0, self.x_dim), uniform(0, self.y_dim), 0, 0]

    '''def assign_speed_by_gaussian(self, speed, var):
        """ move every particle by the specified speed (assuming time=1.)
        with the specified variance, assuming Gaussian distribution. """

        self.particles[:, 2] = np.random.normal(speed, var, self.N)'''

    def control(self, dx):
        self.particles[:, 0] += dx[0]
        self.particles[:, 1] += dx[1]

        self.particles[:, 1] = (self.particles[:, 1] + vy*dt)

    def predict(self, u, std):
        """ move according to control input u with noise std"""
@@ -158,6 +82,57 @@ class ParticleFilter(object):


def plot_random_pd():
    def norm(x, x0, sigma):
        return np.exp(-0.5 * (x - x0) ** 2 / sigma ** 2)

    def sigmoid(x, x0, alpha):
        return 1. / (1. + np.exp(- (x - x0) / alpha))

    x = np.linspace(0, 1, 100)
    y2 = (0.1 * np.sin(norm(x, 0.2, 0.05)) + 0.25 * norm(x, 0.6, 0.05) +
          .5*norm(x, .5, .08) +
          np.sqrt(norm(x, 0.8, 0.06)) + 0.1 * (1 - sigmoid(x, 0.45, 0.15)))
    with plt.xkcd():
        #plt.setp(plt.gca().get_xticklabels(), visible=False)
        #plt.setp(plt.gca().get_yticklabels(), visible=False)
        plt.axes(xticks=[], yticks=[], frameon=False)
        plt.plot(x, y2)


def plot_monte_carlo_ukf():

    def f(x, y):
        return x+y, .1*x**2 + y*y

    mean = (0, 0)
    p = np.array([[32, 15], [15., 40.]])

    # Compute linearized mean
    mean_fx = f(*mean)

    #generate random points
    xs, ys = multivariate_normal(mean=mean, cov=p, size=3000).T
    fxs, fys = f(xs, ys)

    plt.subplot(121)
    plt.gca().grid(b=False)

    plt.scatter(xs, ys, marker='.', alpha=.2, color='k')
    plt.xlim(-25, 25)
    plt.ylim(-25, 25)

    plt.subplot(122)
    plt.gca().grid(b=False)

    plt.scatter(fxs, fys, marker='.', alpha=0.2, color='k')

    plt.ylim([-10, 200])
    plt.xlim([-100, 100])
    plt.show()


def plot_pf(pf, xlim=100, ylim=100, weights=True):

@@ -189,6 +164,26 @@ def plot_pf(pf, xlim=100, ylim=100, weights=True):
    plt.ylim(0, ylim)


def Gaussian(mu, sigma, x):

    # calculates the probability of x for 1-dim Gaussian with mean mu and std sigma
    g = (np.exp(-((mu - x) ** 2) / (sigma ** 2) / 2.0) /
         np.sqrt(2.0 * np.pi * (sigma ** 2)))
    g = np.maximum(g, 1.e-29)

    return g

def test_gaussian(N):
    for i in range(N):
        mean, std, x = randn(3)
        std = abs(std)

        d = Gaussian(mean, std, x) - scipy.stats.norm(mean, std).pdf(x)
        assert abs(d) < 1.e-8, "{}, {}, {}, {}, {}, {}".format(
            d, mean, std, x, Gaussian(mean, std, x), scipy.stats.norm(mean, std).pdf(x))


def show_two_pf_plots():
    """ Displays results of PF after 1 and 10 iterations for the book.

    Note the book says this solves the full robot localization problem.
@@ -229,5 +224,74 @@ def show_two_pf_plots():
    plt.tight_layout()


def test_pf():

    #seed(1234)
    N = 10000
    R = .2
    landmarks = [[-1, 2], [20, 4], [10, 30], [18, 25]]
    #landmarks = [[-1, 2], [2,4]]

    pf = RobotLocalizationParticleFilter(N, 20, 20, landmarks, R)

    plot_pf(pf, 20, 20, weights=False)

    dt = .01
    plt.pause(dt)

    for x in range(18):

        zs = []
        pos = (x+3, x+3)

        for landmark in landmarks:
            d = np.sqrt((landmark[0]-pos[0])**2 + (landmark[1]-pos[1])**2)
            zs.append(d + randn()*R)

        pf.predict((0.01, 1.414), (.2, .05))
        pf.update(z=zs)
        pf.resample()
        #print(x, np.array(list(zip(pf.particles, pf.weights))))

        mu, var = pf.estimate()
        plot_pf(pf, 20, 20, weights=False)
        plt.plot(pos[0], pos[1], marker='*', color='r', ms=10)
        plt.scatter(mu[0], mu[1], color='g', s=100)
        plt.tight_layout()
        plt.pause(dt)
        #print(mu - pos)


def test_pf2():
    N = 1000
    sensor_std_err = .2
    landmarks = [[-1, 2], [20, 4], [-20, 6], [18, 25]]

    pf = RobotLocalizationParticleFilter(N, 20, 20, landmarks, sensor_std_err)

    xs = []
    for x in range(18):
        zs = []
        pos = (x+1, x+1)

        for landmark in landmarks:
            d = np.sqrt((landmark[0]-pos[0])**2 + (landmark[1]-pos[1])**2)
            zs.append(d + randn()*sensor_std_err)

        # move diagonally forward to (x+1, x+1)
        pf.predict((0.00, 1.414), (.2, .05))
        pf.update(z=zs)
        pf.resample()

        mu, var = pf.estimate()
        xs.append(mu)

    xs = np.array(xs)
    plt.plot(xs[:, 0], xs[:, 1])
    plt.show()


if __name__ == '__main__':
    show_two_pf_plots()
    #show_two_pf_plots()
    test_pf()