Mirror of https://github.com/AtsushiSakai/PythonRobotics.git
fix inaccurate covariance calculation to add a new landmark in FastSLAM1,2 (#334)
fast_slam1.py

@@ -16,16 +16,16 @@ Q = np.diag([3.0, np.deg2rad(10.0)]) ** 2
 R = np.diag([1.0, np.deg2rad(20.0)]) ** 2

 # Simulation parameter
-Qsim = np.diag([0.3, np.deg2rad(2.0)]) ** 2
-Rsim = np.diag([0.5, np.deg2rad(10.0)]) ** 2
-OFFSET_YAWRATE_NOISE = 0.01
+Q_sim = np.diag([0.3, np.deg2rad(2.0)]) ** 2
+R_sim = np.diag([0.5, np.deg2rad(10.0)]) ** 2
+OFFSET_YAW_RATE_NOISE = 0.01

 DT = 0.1  # time tick [s]
 SIM_TIME = 50.0  # simulation time [s]
 MAX_RANGE = 20.0  # maximum observation range
 M_DIST_TH = 2.0  # Threshold of Mahalanobis distance for data association.
 STATE_SIZE = 3  # State size [x,y,yaw]
-LM_SIZE = 2  # LM srate size [x,y]
+LM_SIZE = 2  # LM state size [x,y]
 N_PARTICLE = 100  # number of particle
 NTH = N_PARTICLE / 1.5  # Number of particle for re-sampling

@@ -34,15 +34,15 @@ show_animation = True


 class Particle:

-    def __init__(self, N_LM):
+    def __init__(self, n_landmark):
         self.w = 1.0 / N_PARTICLE
         self.x = 0.0
         self.y = 0.0
         self.yaw = 0.0
         # landmark x-y positions
-        self.lm = np.zeros((N_LM, LM_SIZE))
+        self.lm = np.zeros((n_landmark, LM_SIZE))
         # landmark position covariance
-        self.lmP = np.zeros((N_LM * LM_SIZE, LM_SIZE))
+        self.lmP = np.zeros((n_landmark * LM_SIZE, LM_SIZE))


 def fast_slam1(particles, u, z):
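
A note on the layout used here (not part of the diff): each particle stores one 2x2 covariance block per landmark, stacked vertically in `lmP`, so landmark `i`'s block occupies rows `2 * i` to `2 * i + 2`. A minimal sketch of that indexing, reusing the names from the class above:

```python
import numpy as np

LM_SIZE = 2
n_landmark = 4

# stacked 2x2 covariance blocks, shape (n_landmark * 2, 2)
lmP = np.zeros((n_landmark * LM_SIZE, LM_SIZE))

lm_id = 2
lmP[2 * lm_id:2 * lm_id + 2] = np.eye(2) * 0.5  # write landmark 2's block
print(lmP[2 * lm_id:2 * lm_id + 2])             # read it back as a (2, 2) array
```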
@@ -56,11 +56,11 @@ def fast_slam1(particles, u, z):


 def normalize_weight(particles):
-    sumw = sum([p.w for p in particles])
+    sum_w = sum([p.w for p in particles])

     try:
         for i in range(N_PARTICLE):
-            particles[i].w /= sumw
+            particles[i].w /= sum_w
     except ZeroDivisionError:
         for i in range(N_PARTICLE):
             particles[i].w = 1.0 / N_PARTICLE
@@ -101,7 +101,7 @@ def predict_particles(particles, u):
     return particles


-def add_new_lm(particle, z, Q_cov):
+def add_new_landmark(particle, z, Q_cov):
     r = z[0]
     b = z[1]
     lm_id = int(z[2])
@@ -113,10 +113,14 @@ def add_new_lm(particle, z, Q_cov):
     particle.lm[lm_id, 1] = particle.y + r * s

     # covariance
-    Gz = np.array([[c, -r * s],
-                   [s, r * c]])
-
-    particle.lmP[2 * lm_id:2 * lm_id + 2] = Gz @ Q_cov @ Gz.T
+    dx = r * c
+    dy = r * s
+    d2 = dx**2 + dy**2
+    d = math.sqrt(d2)
+    Gz = np.array([[dx / d, dy / d],
+                   [-dy / d2, dx / d2]])
+    particle.lmP[2 * lm_id:2 * lm_id + 2] = np.linalg.inv(
+        Gz) @ Q_cov @ np.linalg.inv(Gz.T)

     return particle

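The hunk above (and its twin in fast_slam2.py below) is the substantive change of the commit: `Gz` is now the Jacobian of the (range, bearing) measurement with respect to the landmark position, the same form used for the EKF update of known landmarks, and the measurement noise `Q_cov` is mapped into landmark coordinates as `inv(Gz) @ Q_cov @ inv(Gz.T)`. This matches the usual FastSLAM initialization of a newly observed feature, where the feature covariance is H^-1 Q H^-T with H the measurement Jacobian. A minimal standalone sketch of the new initialization; the pose and measurement values below are made-up examples, not taken from the repository:

```python
import math
import numpy as np

# Hypothetical particle pose and range-bearing observation of a new landmark.
px, py, pyaw = 0.0, 0.0, 0.0
r, b = 5.0, np.deg2rad(30.0)
Q_cov = np.diag([3.0, np.deg2rad(10.0)]) ** 2  # (range, bearing) measurement noise

c = math.cos(pyaw + b)
s = math.sin(pyaw + b)

# place the landmark along the measurement ray
lm = np.array([px + r * c, py + r * s])

# Jacobian of the (range, bearing) measurement w.r.t. the landmark position
dx, dy = r * c, r * s
d2 = dx ** 2 + dy ** 2
d = math.sqrt(d2)
Gz = np.array([[dx / d, dy / d],
               [-dy / d2, dx / d2]])

# initial 2x2 landmark covariance: measurement noise mapped into landmark space
lmP_block = np.linalg.inv(Gz) @ Q_cov @ np.linalg.inv(Gz.T)
print(lm, lmP_block)
```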
@@ -146,10 +150,10 @@ def update_kf_with_cholesky(xf, Pf, v, Q_cov, Hf):
     S = Hf @ PHt + Q_cov

     S = (S + S.T) * 0.5
-    SChol = np.linalg.cholesky(S).T
-    SCholInv = np.linalg.inv(SChol)
-    W1 = PHt @ SCholInv
-    W = W1 @ SCholInv.T
+    s_chol = np.linalg.cholesky(S).T
+    s_chol_inv = np.linalg.inv(s_chol)
+    W1 = PHt @ s_chol_inv
+    W = W1 @ s_chol_inv.T

     x = xf + W @ v
     P = Pf - W1 @ W1.T
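This hunk only renames variables; the function itself (unchanged otherwise) computes the Kalman gain through an upper-triangular Cholesky factor of the innovation covariance `S`. Algebraically, `W = PHt @ s_chol_inv @ s_chol_inv.T` equals `PHt @ inv(S)`, and `P = Pf - W1 @ W1.T` equals the textbook `Pf - W @ S @ W.T`. A small numerical check of that equivalence, using made-up 2x2 matrices rather than values from the code:

```python
import numpy as np

# made-up example matrices for the check
Pf = np.array([[0.5, 0.1],
               [0.1, 0.4]])       # prior covariance
Hf = np.array([[1.0, 0.2],
               [-0.3, 1.0]])      # measurement Jacobian
Q_cov = np.diag([0.3, 0.05])      # measurement noise

PHt = Pf @ Hf.T
S = Hf @ PHt + Q_cov
S = (S + S.T) * 0.5               # symmetrize, as in the function above

s_chol = np.linalg.cholesky(S).T
s_chol_inv = np.linalg.inv(s_chol)
W1 = PHt @ s_chol_inv
W = W1 @ s_chol_inv.T

print(np.allclose(W, PHt @ np.linalg.inv(S)))  # gain equals PHt @ inv(S)
print(np.allclose(W1 @ W1.T, W @ S @ W.T))     # same covariance correction
```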
@@ -187,7 +191,7 @@ def compute_weight(particle, z, Q_cov):
     try:
         invS = np.linalg.inv(Sf)
     except np.linalg.linalg.LinAlgError:
-        print("singuler")
+        print("singular")
         return 1.0

     num = math.exp(-0.5 * dx.T @ invS @ dx)
@@ -201,12 +205,12 @@ def compute_weight(particle, z, Q_cov):
 def update_with_observation(particles, z):
     for iz in range(len(z[0, :])):

-        lmid = int(z[2, iz])
+        landmark_id = int(z[2, iz])

         for ip in range(N_PARTICLE):
             # new landmark
-            if abs(particles[ip].lm[lmid, 0]) <= 0.01:
-                particles[ip] = add_new_lm(particles[ip], z[:, iz], Q)
+            if abs(particles[ip].lm[landmark_id, 0]) <= 0.01:
+                particles[ip] = add_new_landmark(particles[ip], z[:, iz], Q)
             # known landmark
             else:
                 w = compute_weight(particles[ip], z[:, iz], Q)
@@ -229,28 +233,29 @@ def resampling(particles):

     pw = np.array(pw)

-    Neff = 1.0 / (pw @ pw.T)  # Effective particle number
-    # print(Neff)
+    n_eff = 1.0 / (pw @ pw.T)  # Effective particle number
+    # print(n_eff)

-    if Neff < NTH:  # resampling
-        wcum = np.cumsum(pw)
+    if n_eff < NTH:  # resampling
+        w_cum = np.cumsum(pw)
         base = np.cumsum(pw * 0.0 + 1 / N_PARTICLE) - 1 / N_PARTICLE
-        resampleid = base + np.random.rand(base.shape[0]) / N_PARTICLE
+        resample_id = base + np.random.rand(base.shape[0]) / N_PARTICLE

         inds = []
         ind = 0
         for ip in range(N_PARTICLE):
-            while (ind < wcum.shape[0] - 1) and (resampleid[ip] > wcum[ind]):
+            while (ind < w_cum.shape[0] - 1) \
+                    and (resample_id[ip] > w_cum[ind]):
                 ind += 1
             inds.append(ind)

-        tparticles = particles[:]
+        tmp_particles = particles[:]
         for i in range(len(inds)):
-            particles[i].x = tparticles[inds[i]].x
-            particles[i].y = tparticles[inds[i]].y
-            particles[i].yaw = tparticles[inds[i]].yaw
-            particles[i].lm = tparticles[inds[i]].lm[:, :]
-            particles[i].lmP = tparticles[inds[i]].lmP[:, :]
+            particles[i].x = tmp_particles[inds[i]].x
+            particles[i].y = tmp_particles[inds[i]].y
+            particles[i].yaw = tmp_particles[inds[i]].yaw
+            particles[i].lm = tmp_particles[inds[i]].lm[:, :]
+            particles[i].lmP = tmp_particles[inds[i]].lmP[:, :]
             particles[i].w = 1.0 / N_PARTICLE

     return particles
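For reference (not part of the diff): the block above resamples only when the effective particle number `n_eff = 1 / sum(w_i**2)` drops below `NTH`, and then draws one pointer per evenly spaced stratum of the cumulative weight, i.e. stratified low-variance resampling. A toy run of just the selection step, with made-up weights:

```python
import numpy as np

N_PARTICLE = 8
NTH = N_PARTICLE / 1.5

# made-up normalized particle weights
pw = np.array([0.4, 0.3, 0.1, 0.05, 0.05, 0.04, 0.03, 0.03])
n_eff = 1.0 / (pw @ pw.T)  # effective particle number, roughly 3.9 here

if n_eff < NTH:
    w_cum = np.cumsum(pw)
    base = np.cumsum(pw * 0.0 + 1 / N_PARTICLE) - 1 / N_PARTICLE
    resample_id = base + np.random.rand(base.shape[0]) / N_PARTICLE

    inds = []
    ind = 0
    for ip in range(N_PARTICLE):
        while (ind < w_cum.shape[0] - 1) and (resample_id[ip] > w_cum[ind]):
            ind += 1
        inds.append(ind)

    print(inds)  # indices to copy; heavily weighted particles appear several times
```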
@@ -259,37 +264,39 @@ def resampling(particles):
 def calc_input(time):
     if time <= 3.0:  # wait at first
         v = 0.0
-        yawrate = 0.0
+        yaw_rate = 0.0
     else:
         v = 1.0  # [m/s]
-        yawrate = 0.1  # [rad/s]
+        yaw_rate = 0.1  # [rad/s]

-    u = np.array([v, yawrate]).reshape(2, 1)
+    u = np.array([v, yaw_rate]).reshape(2, 1)

     return u


-def observation(xTrue, xd, u, RFID):
+def observation(xTrue, xd, u, rfid):
     # calc true state
     xTrue = motion_model(xTrue, u)

     # add noise to range observation
     z = np.zeros((3, 0))
-    for i in range(len(RFID[:, 0])):
+    for i in range(len(rfid[:, 0])):

-        dx = RFID[i, 0] - xTrue[0, 0]
-        dy = RFID[i, 1] - xTrue[1, 0]
+        dx = rfid[i, 0] - xTrue[0, 0]
+        dy = rfid[i, 1] - xTrue[1, 0]
         d = math.hypot(dx, dy)
         angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0])
         if d <= MAX_RANGE:
-            dn = d + np.random.randn() * Qsim[0, 0] ** 0.5  # add noise
-            anglen = angle + np.random.randn() * Qsim[1, 1] ** 0.5  # add noise
-            zi = np.array([dn, pi_2_pi(anglen), i]).reshape(3, 1)
+            dn = d + np.random.randn() * Q_sim[0, 0] ** 0.5  # add noise
+            angle_with_noize = angle + np.random.randn() * Q_sim[
+                1, 1] ** 0.5  # add noise
+            zi = np.array([dn, pi_2_pi(angle_with_noize), i]).reshape(3, 1)
             z = np.hstack((z, zi))

     # add noise to input
-    ud1 = u[0, 0] + np.random.randn() * Rsim[0, 0] ** 0.5
-    ud2 = u[1, 0] + np.random.randn() * Rsim[1, 1] ** 0.5 + OFFSET_YAWRATE_NOISE
+    ud1 = u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5
+    ud2 = u[1, 0] + np.random.randn() * R_sim[
+        1, 1] ** 0.5 + OFFSET_YAW_RATE_NOISE
     ud = np.array([ud1, ud2]).reshape(2, 1)

     xd = motion_model(xd, ud)
@@ -332,7 +339,7 @@ def main():
                     [-5.0, 5.0],
                     [-10.0, 15.0]
                     ])
-    N_LM = RFID.shape[0]
+    n_landmark = RFID.shape[0]

     # State Vector [x y yaw v]'
     xEst = np.zeros((STATE_SIZE, 1))  # SLAM estimation
@@ -344,7 +351,7 @@ def main():
     hxTrue = xTrue
     hxDR = xTrue

-    particles = [Particle(N_LM) for _ in range(N_PARTICLE)]
+    particles = [Particle(n_landmark) for _ in range(N_PARTICLE)]

     while SIM_TIME >= time:
         time += DT
@@ -366,8 +373,9 @@ def main():
         if show_animation:  # pragma: no cover
             plt.cla()
             # for stopping simulation with the esc key.
-            plt.gcf().canvas.mpl_connect('key_release_event',
-                    lambda event: [exit(0) if event.key == 'escape' else None])
+            plt.gcf().canvas.mpl_connect(
+                'key_release_event', lambda event:
+                [exit(0) if event.key == 'escape' else None])
             plt.plot(RFID[:, 0], RFID[:, 1], "*k")

             for i in range(N_PARTICLE):
fast_slam2.py

@@ -113,10 +113,14 @@ def add_new_lm(particle, z, Q_cov):
     particle.lm[lm_id, 1] = particle.y + r * s

     # covariance
-    Gz = np.array([[c, -r * s],
-                   [s, r * c]])
-
-    particle.lmP[2 * lm_id:2 * lm_id + 2] = Gz @ Q_cov @ Gz.T
+    dx = r * c
+    dy = r * s
+    d2 = dx ** 2 + dy ** 2
+    d = math.sqrt(d2)
+    Gz = np.array([[dx / d, dy / d],
+                   [-dy / d2, dx / d2]])
+    particle.lmP[2 * lm_id:2 * lm_id + 2] = np.linalg.inv(
+        Gz) @ Q_cov @ np.linalg.inv(Gz.T)

     return particle

@@ -224,11 +228,11 @@ def proposal_sampling(particle, z, Q_cov):

 def update_with_observation(particles, z):
     for iz in range(len(z[0, :])):
-        lmid = int(z[2, iz])
+        landmark_id = int(z[2, iz])

         for ip in range(N_PARTICLE):
             # new landmark
-            if abs(particles[ip].lm[lmid, 0]) <= 0.01:
+            if abs(particles[ip].lm[landmark_id, 0]) <= 0.01:
                 particles[ip] = add_new_lm(particles[ip], z[:, iz], Q)
             # known landmark
             else:
@@ -254,27 +258,28 @@ def resampling(particles):

     pw = np.array(pw)

-    Neff = 1.0 / (pw @ pw.T)  # Effective particle number
+    n_eff = 1.0 / (pw @ pw.T)  # Effective particle number

-    if Neff < NTH:  # resampling
-        wcum = np.cumsum(pw)
+    if n_eff < NTH:  # resampling
+        w_cum = np.cumsum(pw)
         base = np.cumsum(pw * 0.0 + 1 / N_PARTICLE) - 1 / N_PARTICLE
-        resamplei_id = base + np.random.rand(base.shape[0]) / N_PARTICLE
+        resample_id = base + np.random.rand(base.shape[0]) / N_PARTICLE

         inds = []
         ind = 0
         for ip in range(N_PARTICLE):
-            while (ind < wcum.shape[0] - 1) and (resamplei_id[ip] > wcum[ind]):
+            while (ind < w_cum.shape[0] - 1) \
+                    and (resample_id[ip] > w_cum[ind]):
                 ind += 1
             inds.append(ind)

-        tparticles = particles[:]
+        tmp_particles = particles[:]
         for i in range(len(inds)):
-            particles[i].x = tparticles[inds[i]].x
-            particles[i].y = tparticles[inds[i]].y
-            particles[i].yaw = tparticles[inds[i]].yaw
-            particles[i].lm = tparticles[inds[i]].lm[:, :]
-            particles[i].lmP = tparticles[inds[i]].lmP[:, :]
+            particles[i].x = tmp_particles[inds[i]].x
+            particles[i].y = tmp_particles[inds[i]].y
+            particles[i].yaw = tmp_particles[inds[i]].yaw
+            particles[i].lm = tmp_particles[inds[i]].lm[:, :]
+            particles[i].lmP = tmp_particles[inds[i]].lmP[:, :]
             particles[i].w = 1.0 / N_PARTICLE

     return particles
@@ -283,12 +288,12 @@ def resampling(particles):
 def calc_input(time):
     if time <= 3.0:  # wait at first
         v = 0.0
-        yawrate = 0.0
+        yaw_rate = 0.0
     else:
         v = 1.0  # [m/s]
-        yawrate = 0.1  # [rad/s]
+        yaw_rate = 0.1  # [rad/s]

-    u = np.array([v, yawrate]).reshape(2, 1)
+    u = np.array([v, yaw_rate]).reshape(2, 1)

     return u
@@ -308,13 +313,15 @@ def observation(xTrue, xd, u, RFID):
         angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0])
         if d <= MAX_RANGE:
             dn = d + np.random.randn() * Q_sim[0, 0] ** 0.5  # add noise
-            anglen = angle + np.random.randn() * Q_sim[1, 1] ** 0.5  # add noise
-            zi = np.array([dn, pi_2_pi(anglen), i]).reshape(3, 1)
+            angle_noise = np.random.randn() * Q_sim[1, 1] ** 0.5
+            angle_with_noise = angle + angle_noise  # add noise
+            zi = np.array([dn, pi_2_pi(angle_with_noise), i]).reshape(3, 1)
             z = np.hstack((z, zi))

     # add noise to input
     ud1 = u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5
-    ud2 = u[1, 0] + np.random.randn() * R_sim[1, 1] ** 0.5 + OFFSET_YAW_RATE_NOISE
+    ud2 = u[1, 0] + np.random.randn() * R_sim[
+        1, 1] ** 0.5 + OFFSET_YAW_RATE_NOISE
     ud = np.array([ud1, ud2]).reshape(2, 1)

     xd = motion_model(xd, ud)
@@ -357,7 +364,7 @@ def main():
                     [-5.0, 5.0],
                     [-10.0, 15.0]
                     ])
-    N_LM = RFID.shape[0]
+    n_landmark = RFID.shape[0]

     # State Vector [x y yaw v]'
     xEst = np.zeros((STATE_SIZE, 1))  # SLAM estimation
@@ -369,7 +376,7 @@ def main():
     hxTrue = xTrue
     hxDR = xTrue

-    particles = [Particle(N_LM) for _ in range(N_PARTICLE)]
+    particles = [Particle(n_landmark) for _ in range(N_PARTICLE)]

     while SIM_TIME >= time:
         time += DT
@@ -391,14 +398,15 @@ def main():
         if show_animation:  # pragma: no cover
             plt.cla()
             # for stopping simulation with the esc key.
-            plt.gcf().canvas.mpl_connect('key_release_event',
-                    lambda event: [exit(0) if event.key == 'escape' else None])
+            plt.gcf().canvas.mpl_connect(
+                'key_release_event',
+                lambda event: [exit(0) if event.key == 'escape' else None])
             plt.plot(RFID[:, 0], RFID[:, 1], "*k")

             for iz in range(len(z[:, 0])):
-                lmid = int(z[2, iz])
-                plt.plot([xEst[0], RFID[lmid, 0]], [
-                    xEst[1], RFID[lmid, 1]], "-k")
+                landmark_id = int(z[2, iz])
+                plt.plot([xEst[0], RFID[landmark_id, 0]], [
+                    xEst[1], RFID[landmark_id, 1]], "-k")

             for i in range(N_PARTICLE):
                 plt.plot(particles[i].x, particles[i].y, ".r")