This commit is contained in:
Göktuğ Karakaşlı
2019-10-15 21:27:56 +03:00
7 changed files with 206 additions and 245 deletions

View File

@@ -1,4 +1,3 @@
"""
Ensemble Kalman Filter(EnKF) localization sample
@@ -10,13 +9,14 @@ Ref:
"""
import numpy as np
import math
import matplotlib.pyplot as plt
import numpy as np
# Simulation parameter
Qsim = np.diag([0.2, np.deg2rad(1.0)])**2
Rsim = np.diag([1.0, np.deg2rad(30.0)])**2
Q_sim = np.diag([0.2, np.deg2rad(1.0)]) ** 2
R_sim = np.diag([1.0, np.deg2rad(30.0)]) ** 2
DT = 0.1 # time tick [s]
SIM_TIME = 50.0 # simulation time [s]
@@ -30,13 +30,12 @@ show_animation = True
def calc_input():
v = 1.0 # [m/s]
yawrate = 0.1 # [rad/s]
u = np.array([[v, yawrate]]).T
yaw_rate = 0.1 # [rad/s]
u = np.array([[v, yaw_rate]]).T
return u
def observation(xTrue, xd, u, RFID):
xTrue = motion_model(xTrue, u)
z = np.zeros((0, 4))
@@ -45,18 +44,18 @@ def observation(xTrue, xd, u, RFID):
dx = RFID[i, 0] - xTrue[0, 0]
dy = RFID[i, 1] - xTrue[1, 0]
d = math.sqrt(dx**2 + dy**2)
d = math.sqrt(dx ** 2 + dy ** 2)
angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0])
if d <= MAX_RANGE:
dn = d + np.random.randn() * Qsim[0, 0] # add noise
anglen = angle + np.random.randn() * Qsim[1, 1] # add noise
dn = d + np.random.randn() * Q_sim[0, 0] ** 0.5 # add noise
anglen = angle + np.random.randn() * Q_sim[1, 1] ** 0.5 # add noise
zi = np.array([dn, anglen, RFID[i, 0], RFID[i, 1]])
z = np.vstack((z, zi))
# add noise to input
ud = np.array([[
u[0, 0] + np.random.randn() * Rsim[0, 0],
u[1, 0] + np.random.randn() * Rsim[1, 1]]]).T
u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5,
u[1, 0] + np.random.randn() * R_sim[1, 1] ** 0.5]]).T
xd = motion_model(xd, ud)
return xTrue, z, xd, ud
@@ -77,15 +76,13 @@ def motion_model(x, u):
return x
def calc_LM_Pos(x, landmarks):
landmarks_pos = np.zeros((2*landmarks.shape[0], 1))
def observe_landmark_position(x, landmarks):
landmarks_pos = np.zeros((2 * landmarks.shape[0], 1))
for (i, lm) in enumerate(landmarks):
landmarks_pos[2*i] = x[0, 0] + lm[0] * \
math.cos(x[2, 0] + lm[1]) + np.random.randn() * \
Qsim[0, 0]/np.sqrt(2)
landmarks_pos[2*i+1] = x[1, 0] + lm[0] * \
math.sin(x[2, 0] + lm[1]) + np.random.randn() * \
Qsim[0, 0]/np.sqrt(2)
landmarks_pos[2 * i] = x[0, 0] + lm[0] * math.cos(x[2, 0] + lm[1]) + np.random.randn() * Q_sim[
0, 0] ** 0.5 / np.sqrt(2)
landmarks_pos[2 * i + 1] = x[1, 0] + lm[0] * math.sin(x[2, 0] + lm[1]) + np.random.randn() * Q_sim[
0, 0] ** 0.5 / np.sqrt(2)
return landmarks_pos
@@ -95,25 +92,26 @@ def calc_covariance(xEst, px):
for i in range(px.shape[1]):
dx = (px[:, i] - xEst)[0:3]
cov += dx.dot(dx.T)
cov /= NP
return cov
def enkf_localization(px, xEst, PEst, z, u):
def enkf_localization(px, z, u):
"""
Localization with Ensemble Kalman filter
"""
pz = np.zeros((z.shape[0]*2, NP)) # Particle store of z
pz = np.zeros((z.shape[0] * 2, NP)) # Particle store of z
for ip in range(NP):
x = np.array([px[:, ip]]).T
# Predict with random input sampling
ud1 = u[0, 0] + np.random.randn() * Rsim[0, 0]
ud2 = u[1, 0] + np.random.randn() * Rsim[1, 1]
ud1 = u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5
ud2 = u[1, 0] + np.random.randn() * R_sim[1, 1] ** 0.5
ud = np.array([[ud1, ud2]]).T
x = motion_model(x, ud)
px[:, ip] = x[:, 0]
z_pos = calc_LM_Pos(x, z)
z_pos = observe_landmark_position(x, z)
pz[:, ip] = z_pos[:, 0]
x_ave = np.mean(px, axis=1)
@@ -122,12 +120,12 @@ def enkf_localization(px, xEst, PEst, z, u):
z_ave = np.mean(pz, axis=1)
z_dif = pz - np.tile(z_ave, (NP, 1)).T
U = 1/(NP-1) * x_dif @ z_dif.T
V = 1/(NP-1) * z_dif @ z_dif.T
U = 1 / (NP - 1) * x_dif @ z_dif.T
V = 1 / (NP - 1) * z_dif @ z_dif.T
K = U @ np.linalg.inv(V) # Kalman Gain
z_lm_pos = z[:, [2, 3]].reshape(-1,)
z_lm_pos = z[:, [2, 3]].reshape(-1, )
px_hat = px + K @ (np.tile(z_lm_pos, (NP, 1)).T - pz)
@@ -139,32 +137,32 @@ def enkf_localization(px, xEst, PEst, z, u):
def plot_covariance_ellipse(xEst, PEst): # pragma: no cover
Pxy = PEst[0:2, 0:2]
eigval, eigvec = np.linalg.eig(Pxy)
eig_val, eig_vec = np.linalg.eig(Pxy)
if eigval[0] >= eigval[1]:
bigind = 0
smallind = 1
if eig_val[0] >= eig_val[1]:
big_ind = 0
small_ind = 1
else:
bigind = 1
smallind = 0
big_ind = 1
small_ind = 0
t = np.arange(0, 2 * math.pi + 0.1, 0.1)
# eigval[bigind] or eiqval[smallind] were occassionally negative numbers extremely
# eig_val[big_ind] or eiq_val[small_ind] were occasionally negative numbers extremely
# close to 0 (~10^-20), catch these cases and set the respective variable to 0
try:
a = math.sqrt(eigval[bigind])
a = math.sqrt(eig_val[big_ind])
except ValueError:
a = 0
try:
b = math.sqrt(eigval[smallind])
b = math.sqrt(eig_val[small_ind])
except ValueError:
b = 0
x = [a * math.cos(it) for it in t]
y = [b * math.sin(it) for it in t]
angle = math.atan2(eigvec[bigind, 1], eigvec[bigind, 0])
angle = math.atan2(eig_vec[big_ind, 1], eig_vec[big_ind, 0])
R = np.array([[math.cos(angle), math.sin(angle)],
[-math.sin(angle), math.cos(angle)]])
fx = R.dot(np.array([[x, y]]))
@@ -182,17 +180,15 @@ def main():
time = 0.0
# RFID positions [x, y]
RFID = np.array([[10.0, 0.0],
[10.0, 10.0],
[0.0, 15.0],
[-5.0, 20.0]])
# RF_ID positions [x, y]
RF_ID = np.array([[10.0, 0.0],
[10.0, 10.0],
[0.0, 15.0],
[-5.0, 20.0]])
# State Vector [x y yaw v]'
xEst = np.zeros((4, 1))
xTrue = np.zeros((4, 1))
PEst = np.eye(4)
px = np.zeros((4, NP)) # Particle store of x
xDR = np.zeros((4, 1)) # Dead reckoning
@@ -206,9 +202,9 @@ def main():
time += DT
u = calc_input()
xTrue, z, xDR, ud = observation(xTrue, xDR, u, RFID)
xTrue, z, xDR, ud = observation(xTrue, xDR, u, RF_ID)
xEst, PEst, px = enkf_localization(px, xEst, PEst, z, ud)
xEst, PEst, px = enkf_localization(px, z, ud)
# store data history
hxEst = np.hstack((hxEst, xEst))
@@ -220,7 +216,7 @@ def main():
for i in range(len(z[:, 0])):
plt.plot([xTrue[0, 0], z[i, 2]], [xTrue[1, 0], z[i, 3]], "-k")
plt.plot(RFID[:, 0], RFID[:, 1], "*k")
plt.plot(RF_ID[:, 0], RF_ID[:, 1], "*k")
plt.plot(px[0, :], px[1, :], ".r")
plt.plot(np.array(hxTrue[0, :]).flatten(),
np.array(hxTrue[1, :]).flatten(), "-b")
@@ -228,7 +224,7 @@ def main():
np.array(hxDR[1, :]).flatten(), "-k")
plt.plot(np.array(hxEst[0, :]).flatten(),
np.array(hxEst[1, :]).flatten(), "-r")
#plot_covariance_ellipse(xEst, PEst)
# plot_covariance_ellipse(xEst, PEst)
plt.axis("equal")
plt.grid(True)
plt.pause(0.001)
@@ -236,4 +232,3 @@ def main():
if __name__ == '__main__':
main()

View File

@@ -7,21 +7,22 @@ author: Atsushi Sakai (@Atsushi_twi)
"""
import math
import numpy as np
import matplotlib.pyplot as plt
import numpy as np
# Covariance for EKF simulation
Q = np.diag([
0.1, # variance of location on x-axis
0.1, # variance of location on y-axis
np.deg2rad(1.0), # variance of yaw angle
1.0 # variance of velocity
])**2 # predict state covariance
R = np.diag([1.0, 1.0])**2 # Observation x,y position covariance
0.1, # variance of location on x-axis
0.1, # variance of location on y-axis
np.deg2rad(1.0), # variance of yaw angle
1.0 # variance of velocity
]) ** 2 # predict state covariance
R = np.diag([1.0, 1.0]) ** 2 # Observation x,y position covariance
# Simulation parameter
INPUT_NOISE = np.diag([1.0, np.deg2rad(30.0)])**2
GPS_NOISE = np.diag([0.5, 0.5])**2
INPUT_NOISE = np.diag([1.0, np.deg2rad(30.0)]) ** 2
GPS_NOISE = np.diag([0.5, 0.5]) ** 2
DT = 0.1 # time tick [s]
SIM_TIME = 50.0 # simulation time [s]
@@ -37,7 +38,6 @@ def calc_input():
def observation(xTrue, xd, u):
xTrue = motion_model(xTrue, u)
# add noise to gps x-y
@@ -52,7 +52,6 @@ def observation(xTrue, xd, u):
def motion_model(x, u):
F = np.array([[1.0, 0, 0, 0],
[0, 1.0, 0, 0],
[0, 0, 1.0, 0],
@@ -79,7 +78,7 @@ def observation_model(x):
return z
def jacobF(x, u):
def jacob_f(x, u):
"""
Jacobian of Motion Model
@@ -105,7 +104,7 @@ def jacobF(x, u):
return jF
def jacobH(x):
def jacob_h():
# Jacobian of Observation Model
jH = np.array([
[1, 0, 0, 0],
@@ -116,20 +115,19 @@ def jacobH(x):
def ekf_estimation(xEst, PEst, z, u):
# Predict
xPred = motion_model(xEst, u)
jF = jacobF(xPred, u)
PPred = jF@PEst@jF.T + Q
jF = jacob_f(xPred, u)
PPred = jF @ PEst @ jF.T + Q
# Update
jH = jacobH(xPred)
jH = jacob_h()
zPred = observation_model(xPred)
y = z - zPred
S = jH@PPred@jH.T + R
K = PPred@jH.T@np.linalg.inv(S)
xEst = xPred + K@y
PEst = (np.eye(len(xEst)) - K@jH)@PPred
S = jH @ PPred @ jH.T + R
K = PPred @ jH.T @ np.linalg.inv(S)
xEst = xPred + K @ y
PEst = (np.eye(len(xEst)) - K @ jH) @ PPred
return xEst, PEst
@@ -151,9 +149,9 @@ def plot_covariance_ellipse(xEst, PEst): # pragma: no cover
x = [a * math.cos(it) for it in t]
y = [b * math.sin(it) for it in t]
angle = math.atan2(eigvec[bigind, 1], eigvec[bigind, 0])
R = np.array([[math.cos(angle), math.sin(angle)],
[-math.sin(angle), math.cos(angle)]])
fx = R@(np.array([x, y]))
rot = np.array([[math.cos(angle), math.sin(angle)],
[-math.sin(angle), math.cos(angle)]])
fx = rot @ (np.array([x, y]))
px = np.array(fx[0, :] + xEst[0, 0]).flatten()
py = np.array(fx[1, :] + xEst[1, 0]).flatten()
plt.plot(px, py, "--r")

View File

@@ -6,23 +6,24 @@ author: Atsushi Sakai (@Atsushi_twi)
"""
import math
import matplotlib.pyplot as plt
import numpy as np
import scipy.linalg
import math
import matplotlib.pyplot as plt
# Covariance for UKF simulation
Q = np.diag([
0.1, # variance of location on x-axis
0.1, # variance of location on y-axis
np.deg2rad(1.0), # variance of yaw angle
1.0 # variance of velocity
])**2 # predict state covariance
R = np.diag([1.0, 1.0])**2 # Observation x,y position covariance
0.1, # variance of location on x-axis
0.1, # variance of location on y-axis
np.deg2rad(1.0), # variance of yaw angle
1.0 # variance of velocity
]) ** 2 # predict state covariance
R = np.diag([1.0, 1.0]) ** 2 # Observation x,y position covariance
# Simulation parameter
INPUT_NOISE = np.diag([1.0, np.deg2rad(30.0)])**2
GPS_NOISE = np.diag([0.5, 0.5])**2
INPUT_NOISE = np.diag([1.0, np.deg2rad(30.0)]) ** 2
GPS_NOISE = np.diag([0.5, 0.5]) ** 2
DT = 0.1 # time tick [s]
SIM_TIME = 50.0 # simulation time [s]
@@ -43,7 +44,6 @@ def calc_input():
def observation(xTrue, xd, u):
xTrue = motion_model(xTrue, u)
# add noise to gps x-y
@@ -58,7 +58,6 @@ def observation(xTrue, xd, u):
def motion_model(x, u):
F = np.array([[1.0, 0, 0, 0],
[0, 1.0, 0, 0],
[0, 0, 1.0, 0],
@@ -144,7 +143,6 @@ def calc_pxz(sigma, x, z_sigma, zb, wc):
def ukf_estimation(xEst, PEst, z, u, wm, wc, gamma):
# Predict
sigma = generate_sigma_points(xEst, PEst, gamma)
sigma = predict_sigma_motion(sigma, u)
@@ -183,9 +181,9 @@ def plot_covariance_ellipse(xEst, PEst): # pragma: no cover
x = [a * math.cos(it) for it in t]
y = [b * math.sin(it) for it in t]
angle = math.atan2(eigvec[bigind, 1], eigvec[bigind, 0])
R = np.array([[math.cos(angle), math.sin(angle)],
[-math.sin(angle), math.cos(angle)]])
fx = R @ np.array([x, y])
rot = np.array([[math.cos(angle), math.sin(angle)],
[-math.sin(angle), math.cos(angle)]])
fx = rot @ np.array([x, y])
px = np.array(fx[0, :] + xEst[0, 0]).flatten()
py = np.array(fx[1, :] + xEst[1, 0]).flatten()
plt.plot(px, py, "--r")

View File

@@ -7,26 +7,26 @@ author: Atsushi Sakai (@Atsushi_twi)
"""
import math
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
import numpy as np
show_animation = True
def dwa_control(x, u, config, goal, ob):
def dwa_control(x, config, goal, ob):
"""
Dynamic Window Approach control
"""
dw = calc_dynamic_window(x, config)
u, traj = calc_final_input(x, u, dw, config, goal, ob)
u, trajectory = calc_final_input(x, dw, config, goal, ob)
return u, traj
return u, trajectory
class Config():
class Config:
"""
simulation parameter class
"""
@@ -48,7 +48,6 @@ class Config():
self.robot_radius = 1.0 # [m] for collision check
def motion(x, u, dt):
"""
motion model
@@ -101,27 +100,26 @@ def predict_trajectory(x_init, v, y, config):
return traj
def calc_final_input(x, u, dw, config, goal, ob):
def calc_final_input(x, dw, config, goal, ob):
"""
calculation final input with dinamic window
calculation final input with dynamic window
"""
x_init = x[:]
min_cost = float("inf")
best_u = [0.0, 0.0]
best_traj = np.array([x])
best_trajectory = np.array([x])
# evalucate all trajectory with sampled input in dynamic window
# evaluate all trajectory with sampled input in dynamic window
for v in np.arange(dw[0], dw[1], config.v_reso):
for y in np.arange(dw[2], dw[3], config.yawrate_reso):
traj = predict_trajectory(x_init, v, y, config)
trajectory = predict_trajectory(x_init, v, y, config)
# calc cost
to_goal_cost = config.to_goal_cost_gain * calc_to_goal_cost(traj, goal, config)
speed_cost = config.speed_cost_gain * \
(config.max_speed - traj[-1, 3])
ob_cost = config.obstacle_cost_gain*calc_obstacle_cost(traj, ob, config)
to_goal_cost = config.to_goal_cost_gain * calc_to_goal_cost(trajectory, goal)
speed_cost = config.speed_cost_gain * (config.max_speed - trajectory[-1, 3])
ob_cost = config.obstacle_cost_gain * calc_obstacle_cost(trajectory, ob, config)
final_cost = to_goal_cost + speed_cost + ob_cost
@@ -129,37 +127,34 @@ def calc_final_input(x, u, dw, config, goal, ob):
if min_cost >= final_cost:
min_cost = final_cost
best_u = [v, y]
best_traj = traj
best_trajectory = trajectory
return best_u, best_traj
return best_u, best_trajectory
def calc_obstacle_cost(traj, ob, config):
def calc_obstacle_cost(trajectory, ob, config):
"""
calc obstacle cost inf: collision
"""
ox = ob[:, 0]
oy = ob[:, 1]
dx = traj[:, 0] - ox[:, None]
dy = traj[:, 1] - oy[:, None]
dx = trajectory[:, 0] - ox[:, None]
dy = trajectory[:, 1] - oy[:, None]
r = np.sqrt(np.square(dx) + np.square(dy))
if (r <= config.robot_radius).any():
return float("Inf")
minr = np.min(r)
min_r = np.min(r)
return 1.0 / min_r # OK
return 1.0 / minr # OK
def calc_to_goal_cost(traj, goal, config):
def calc_to_goal_cost(trajectory, goal):
"""
calc to goal cost with angle difference
"""
dx = goal[0] - traj[-1, 0]
dy = goal[1] - traj[-1, 1]
dx = goal[0] - trajectory[-1, 0]
dy = goal[1] - trajectory[-1, 1]
error_angle = math.atan2(dy, dx)
cost_angle = error_angle - traj[-1, 2]
cost_angle = error_angle - trajectory[-1, 2]
cost = abs(math.atan2(math.sin(cost_angle), math.cos(cost_angle)))
return cost
@@ -171,7 +166,7 @@ def plot_arrow(x, y, yaw, length=0.5, width=0.1): # pragma: no cover
plt.plot(x, y)
def main(gx=10, gy=10):
def main(gx=10.0, gy=10.0):
print(__file__ + " start!!")
# initial state [x(m), y(m), yaw(rad), v(m/s), omega(rad/s)]
x = np.array([0.0, 0.0, math.pi / 8.0, 0.0, 0.0])
@@ -187,23 +182,27 @@ def main(gx=10, gy=10):
[5.0, 9.0],
[8.0, 9.0],
[7.0, 9.0],
[12.0, 12.0]
[8.0, 10.0],
[9.0, 11.0],
[12.0, 13.0],
[12.0, 12.0],
[15.0, 15.0],
[13.0, 13.0]
])
# input [forward speed, yawrate]
u = np.array([0.0, 0.0])
config = Config()
traj = np.array(x)
trajectory = np.array(x)
while True:
u, ptraj = dwa_control(x, u, config, goal, ob)
x = motion(x, u, config.dt) # simulate robot
traj = np.vstack((traj, x)) # store state history
u, predicted_trajectory = dwa_control(x, config, goal, ob)
x = motion(x, u, config.dt) # simulate robot
trajectory = np.vstack((trajectory, x)) # store state history
if show_animation:
plt.cla()
plt.plot(ptraj[:, 0], ptraj[:, 1], "-g")
plt.plot(predicted_trajectory[:, 0], predicted_trajectory[:, 1], "-g")
plt.plot(x[0], x[1], "xr")
plt.plot(goal[0], goal[1], "xb")
plt.plot(ob[:, 0], ob[:, 1], "ok")
@@ -213,14 +212,14 @@ def main(gx=10, gy=10):
plt.pause(0.0001)
# check reaching goal
dist_to_goal = math.sqrt((x[0] - goal[0])**2 + (x[1] - goal[1])**2)
dist_to_goal = math.sqrt((x[0] - goal[0]) ** 2 + (x[1] - goal[1]) ** 2)
if dist_to_goal <= config.robot_radius:
print("Goal!!")
break
print("Done")
if show_animation:
plt.plot(traj[:, 0], traj[:, 1], "-r")
plt.plot(trajectory[:, 0], trajectory[:, 1], "-r")
plt.pause(0.0001)
plt.show()

View File

@@ -4,16 +4,16 @@ author: Atsushi Sakai (@Atsushi_twi)
"""
import math
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
import numpy as np
# EKF state covariance
Cx = np.diag([0.5, 0.5, np.deg2rad(30.0)])**2
Cx = np.diag([0.5, 0.5, np.deg2rad(30.0)]) ** 2
# Simulation parameter
Qsim = np.diag([0.2, np.deg2rad(1.0)])**2
Rsim = np.diag([1.0, np.deg2rad(10.0)])**2
Q_sim = np.diag([0.2, np.deg2rad(1.0)]) ** 2
R_sim = np.diag([1.0, np.deg2rad(10.0)]) ** 2
DT = 0.1 # time tick [s]
SIM_TIME = 50.0 # simulation time [s]
@@ -26,7 +26,6 @@ show_animation = True
def ekf_slam(xEst, PEst, u, z):
# Predict
S = STATE_SIZE
xEst[0:S] = motion_model(xEst[0:S], u)
@@ -36,18 +35,18 @@ def ekf_slam(xEst, PEst, u, z):
# Update
for iz in range(len(z[:, 0])): # for each observation
minid = search_correspond_LM_ID(xEst, PEst, z[iz, 0:2])
minid = search_correspond_landmark_id(xEst, PEst, z[iz, 0:2])
nLM = calc_n_LM(xEst)
nLM = calc_n_lm(xEst)
if minid == nLM:
print("New LM")
# Extend state and covariance matrix
xAug = np.vstack((xEst, calc_LM_Pos(xEst, z[iz, :])))
xAug = np.vstack((xEst, calc_landmark_position(xEst, z[iz, :])))
PAug = np.vstack((np.hstack((PEst, np.zeros((len(xEst), LM_SIZE)))),
np.hstack((np.zeros((LM_SIZE, len(xEst))), initP))))
xEst = xAug
PEst = PAug
lm = get_LM_Pos_from_state(xEst, minid)
lm = get_landmark_position_from_state(xEst, minid)
y, S, H = calc_innovation(lm, xEst, PEst, z[iz, 0:2], minid)
K = (PEst @ H.T) @ np.linalg.inv(S)
@@ -67,7 +66,6 @@ def calc_input():
def observation(xTrue, xd, u, RFID):
xTrue = motion_model(xTrue, u)
# add noise to gps x-y
@@ -77,25 +75,24 @@ def observation(xTrue, xd, u, RFID):
dx = RFID[i, 0] - xTrue[0, 0]
dy = RFID[i, 1] - xTrue[1, 0]
d = math.sqrt(dx**2 + dy**2)
d = math.sqrt(dx ** 2 + dy ** 2)
angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0])
if d <= MAX_RANGE:
dn = d + np.random.randn() * Qsim[0, 0] # add noise
anglen = angle + np.random.randn() * Qsim[1, 1] # add noise
dn = d + np.random.randn() * Q_sim[0, 0] ** 0.5 # add noise
anglen = angle + np.random.randn() * Q_sim[1, 1] ** 0.5 # add noise
zi = np.array([dn, anglen, i])
z = np.vstack((z, zi))
# add noise to input
ud = np.array([[
u[0, 0] + np.random.randn() * Rsim[0, 0],
u[1, 0] + np.random.randn() * Rsim[1, 1]]]).T
u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5,
u[1, 0] + np.random.randn() * R_sim[1, 1] ** 0.5]]).T
xd = motion_model(xd, ud)
return xTrue, z, xd, ud
def motion_model(x, u):
F = np.array([[1.0, 0, 0],
[0, 1.0, 0],
[0, 0, 1.0]])
@@ -108,15 +105,14 @@ def motion_model(x, u):
return x
def calc_n_LM(x):
def calc_n_lm(x):
n = int((len(x) - STATE_SIZE) / LM_SIZE)
return n
def jacob_motion(x, u):
Fx = np.hstack((np.eye(STATE_SIZE), np.zeros(
(STATE_SIZE, LM_SIZE * calc_n_LM(x)))))
(STATE_SIZE, LM_SIZE * calc_n_lm(x)))))
jF = np.array([[0.0, 0.0, -DT * u[0] * math.sin(x[2, 0])],
[0.0, 0.0, DT * u[0] * math.cos(x[2, 0])],
@@ -127,35 +123,34 @@ def jacob_motion(x, u):
return G, Fx,
def calc_LM_Pos(x, z):
def calc_landmark_position(x, z):
zp = np.zeros((2, 1))
zp[0, 0] = x[0, 0] + z[0] * math.cos(x[2, 0] + z[1])
zp[1, 0] = x[1, 0] + z[0] * math.sin(x[2, 0] + z[1])
#zp[0, 0] = x[0, 0] + z[0, 0] * math.cos(x[2, 0] + z[0, 1])
#zp[1, 0] = x[1, 0] + z[0, 0] * math.sin(x[2, 0] + z[0, 1])
# zp[0, 0] = x[0, 0] + z[0, 0] * math.cos(x[2, 0] + z[0, 1])
# zp[1, 0] = x[1, 0] + z[0, 0] * math.sin(x[2, 0] + z[0, 1])
return zp
def get_LM_Pos_from_state(x, ind):
def get_landmark_position_from_state(x, ind):
lm = x[STATE_SIZE + LM_SIZE * ind: STATE_SIZE + LM_SIZE * (ind + 1), :]
return lm
def search_correspond_LM_ID(xAug, PAug, zi):
def search_correspond_landmark_id(xAug, PAug, zi):
"""
Landmark association with Mahalanobis distance
"""
nLM = calc_n_LM(xAug)
nLM = calc_n_lm(xAug)
mdist = []
for i in range(nLM):
lm = get_LM_Pos_from_state(xAug, i)
lm = get_landmark_position_from_state(xAug, i)
y, S, H = calc_innovation(lm, xAug, PAug, zi, i)
mdist.append(y.T @ np.linalg.inv(S) @ y)
@@ -173,19 +168,19 @@ def calc_innovation(lm, xEst, PEst, z, LMid):
zp = np.array([[math.sqrt(q), pi_2_pi(zangle)]])
y = (z - zp).T
y[1] = pi_2_pi(y[1])
H = jacobH(q, delta, xEst, LMid + 1)
H = jacob_h(q, delta, xEst, LMid + 1)
S = H @ PEst @ H.T + Cx[0:2, 0:2]
return y, S, H
def jacobH(q, delta, x, i):
def jacob_h(q, delta, x, i):
sq = math.sqrt(q)
G = np.array([[-sq * delta[0, 0], - sq * delta[1, 0], 0, sq * delta[0, 0], sq * delta[1, 0]],
[delta[1, 0], - delta[0, 0], - 1.0, - delta[1, 0], delta[0, 0]]])
G = G / q
nLM = calc_n_LM(x)
nLM = calc_n_lm(x)
F1 = np.hstack((np.eye(3), np.zeros((3, 2 * nLM))))
F2 = np.hstack((np.zeros((2, 3)), np.zeros((2, 2 * (i - 1))),
np.eye(2), np.zeros((2, 2 * nLM - 2 * i))))
@@ -246,7 +241,7 @@ def main():
plt.plot(xEst[0], xEst[1], ".r")
# plot landmark
for i in range(calc_n_LM(xEst)):
for i in range(calc_n_lm(xEst)):
plt.plot(xEst[STATE_SIZE + i * 2],
xEst[STATE_SIZE + i * 2 + 1], "xg")
@@ -262,4 +257,4 @@ def main():
if __name__ == '__main__':
main()
main()

View File

@@ -6,18 +6,18 @@ author: Atsushi Sakai (@Atsushi_twi)
"""
import numpy as np
import math
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
import numpy as np
# Fast SLAM covariance
Q = np.diag([3.0, np.deg2rad(10.0)])**2
R = np.diag([1.0, np.deg2rad(20.0)])**2
Q = np.diag([3.0, np.deg2rad(10.0)]) ** 2
R = np.diag([1.0, np.deg2rad(20.0)]) ** 2
# Simulation parameter
Qsim = np.diag([0.3, np.deg2rad(2.0)])**2
Rsim = np.diag([0.5, np.deg2rad(10.0)])**2
Qsim = np.diag([0.3, np.deg2rad(2.0)]) ** 2
Rsim = np.diag([0.5, np.deg2rad(10.0)]) ** 2
OFFSET_YAWRATE_NOISE = 0.01
DT = 0.1 # time tick [s]
@@ -46,7 +46,6 @@ class Particle:
def fast_slam1(particles, u, z):
particles = predict_particles(particles, u)
particles = update_with_observation(particles, z)
@@ -57,7 +56,6 @@ def fast_slam1(particles, u, z):
def normalize_weight(particles):
sumw = sum([p.w for p in particles])
try:
@@ -73,7 +71,6 @@ def normalize_weight(particles):
def calc_final_state(particles):
xEst = np.zeros((STATE_SIZE, 1))
particles = normalize_weight(particles)
@@ -90,13 +87,12 @@ def calc_final_state(particles):
def predict_particles(particles, u):
for i in range(N_PARTICLE):
px = np.zeros((STATE_SIZE, 1))
px[0, 0] = particles[i].x
px[1, 0] = particles[i].y
px[2, 0] = particles[i].yaw
ud = u + (np.random.randn(1, 2) @ R).T # add noise
ud = u + (np.random.randn(1, 2) @ R ** 0.5).T # add noise
px = motion_model(px, ud)
particles[i].x = px[0, 0]
particles[i].y = px[1, 0]
@@ -105,8 +101,7 @@ def predict_particles(particles, u):
return particles
def add_new_lm(particle, z, Q):
def add_new_lm(particle, z, Q_cov):
r = z[0]
b = z[1]
lm_id = int(z[2])
@@ -121,15 +116,15 @@ def add_new_lm(particle, z, Q):
Gz = np.array([[c, -r * s],
[s, r * c]])
particle.lmP[2 * lm_id:2 * lm_id + 2] = Gz @ Q @ Gz.T
particle.lmP[2 * lm_id:2 * lm_id + 2] = Gz @ Q_cov @ Gz.T
return particle
def compute_jacobians(particle, xf, Pf, Q):
def compute_jacobians(particle, xf, Pf, Q_cov):
dx = xf[0, 0] - particle.x
dy = xf[1, 0] - particle.y
d2 = dx**2 + dy**2
d2 = dx ** 2 + dy ** 2
d = math.sqrt(d2)
zp = np.array(
@@ -141,14 +136,14 @@ def compute_jacobians(particle, xf, Pf, Q):
Hf = np.array([[dx / d, dy / d],
[-dy / d2, dx / d2]])
Sf = Hf @ Pf @ Hf.T + Q
Sf = Hf @ Pf @ Hf.T + Q_cov
return zp, Hv, Hf, Sf
def update_KF_with_cholesky(xf, Pf, v, Q, Hf):
def update_kf_with_cholesky(xf, Pf, v, Q_cov, Hf):
PHt = Pf @ Hf.T
S = Hf @ PHt + Q
S = Hf @ PHt + Q_cov
S = (S + S.T) * 0.5
SChol = np.linalg.cholesky(S).T
@@ -162,8 +157,7 @@ def update_KF_with_cholesky(xf, Pf, v, Q, Hf):
return x, P
def update_landmark(particle, z, Q):
def update_landmark(particle, z, Q_cov):
lm_id = int(z[2])
xf = np.array(particle.lm[lm_id, :]).reshape(2, 1)
Pf = np.array(particle.lmP[2 * lm_id:2 * lm_id + 2, :])
@@ -173,7 +167,7 @@ def update_landmark(particle, z, Q):
dz = z[0:2].reshape(2, 1) - zp
dz[1, 0] = pi_2_pi(dz[1, 0])
xf, Pf = update_KF_with_cholesky(xf, Pf, dz, Q, Hf)
xf, Pf = update_kf_with_cholesky(xf, Pf, dz, Q_cov, Hf)
particle.lm[lm_id, :] = xf.T
particle.lmP[2 * lm_id:2 * lm_id + 2, :] = Pf
@@ -181,11 +175,11 @@ def update_landmark(particle, z, Q):
return particle
def compute_weight(particle, z, Q):
def compute_weight(particle, z, Q_cov):
lm_id = int(z[2])
xf = np.array(particle.lm[lm_id, :]).reshape(2, 1)
Pf = np.array(particle.lmP[2 * lm_id:2 * lm_id + 2])
zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q)
zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q_cov)
dx = z[0:2].reshape(2, 1) - zp
dx[1, 0] = pi_2_pi(dx[1, 0])
@@ -205,7 +199,6 @@ def compute_weight(particle, z, Q):
def update_with_observation(particles, z):
for iz in range(len(z[0, :])):
lmid = int(z[2, iz])
@@ -247,7 +240,7 @@ def resampling(particles):
inds = []
ind = 0
for ip in range(N_PARTICLE):
while ((ind < wcum.shape[0] - 1) and (resampleid[ip] > wcum[ind])):
while (ind < wcum.shape[0] - 1) and (resampleid[ip] > wcum[ind]):
ind += 1
inds.append(ind)
@@ -264,7 +257,6 @@ def resampling(particles):
def calc_input(time):
if time <= 3.0: # wait at first
v = 0.0
yawrate = 0.0
@@ -278,7 +270,6 @@ def calc_input(time):
def observation(xTrue, xd, u, RFID):
# calc true state
xTrue = motion_model(xTrue, u)
@@ -288,17 +279,17 @@ def observation(xTrue, xd, u, RFID):
dx = RFID[i, 0] - xTrue[0, 0]
dy = RFID[i, 1] - xTrue[1, 0]
d = math.sqrt(dx**2 + dy**2)
d = math.sqrt(dx ** 2 + dy ** 2)
angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0])
if d <= MAX_RANGE:
dn = d + np.random.randn() * Qsim[0, 0] # add noise
anglen = angle + np.random.randn() * Qsim[1, 1] # add noise
dn = d + np.random.randn() * Qsim[0, 0] ** 0.5 # add noise
anglen = angle + np.random.randn() * Qsim[1, 1] ** 0.5 # add noise
zi = np.array([dn, pi_2_pi(anglen), i]).reshape(3, 1)
z = np.hstack((z, zi))
# add noise to input
ud1 = u[0, 0] + np.random.randn() * Rsim[0, 0]
ud2 = u[1, 0] + np.random.randn() * Rsim[1, 1] + OFFSET_YAWRATE_NOISE
ud1 = u[0, 0] + np.random.randn() * Rsim[0, 0] ** 0.5
ud2 = u[1, 0] + np.random.randn() * Rsim[1, 1] ** 0.5 + OFFSET_YAWRATE_NOISE
ud = np.array([ud1, ud2]).reshape(2, 1)
xd = motion_model(xd, ud)
@@ -307,7 +298,6 @@ def observation(xTrue, xd, u, RFID):
def motion_model(x, u):
F = np.array([[1.0, 0, 0],
[0, 1.0, 0],
[0, 0, 1.0]])
@@ -354,7 +344,7 @@ def main():
hxTrue = xTrue
hxDR = xTrue
particles = [Particle(N_LM) for i in range(N_PARTICLE)]
particles = [Particle(N_LM) for _ in range(N_PARTICLE)]
while SIM_TIME >= time:
time += DT

View File

@@ -6,19 +6,19 @@ author: Atsushi Sakai (@Atsushi_twi)
"""
import numpy as np
import math
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
import numpy as np
# Fast SLAM covariance
Q = np.diag([3.0, np.deg2rad(10.0)])**2
R = np.diag([1.0, np.deg2rad(20.0)])**2
Q = np.diag([3.0, np.deg2rad(10.0)]) ** 2
R = np.diag([1.0, np.deg2rad(20.0)]) ** 2
# Simulation parameter
Qsim = np.diag([0.3, np.deg2rad(2.0)])**2
Rsim = np.diag([0.5, np.deg2rad(10.0)])**2
OFFSET_YAWRATE_NOISE = 0.01
Q_sim = np.diag([0.3, np.deg2rad(2.0)]) ** 2
R_sim = np.diag([0.5, np.deg2rad(10.0)]) ** 2
OFFSET_YAW_RATE_NOISE = 0.01
DT = 0.1 # time tick [s]
SIM_TIME = 50.0 # simulation time [s]
@@ -47,7 +47,6 @@ class Particle:
def fast_slam2(particles, u, z):
particles = predict_particles(particles, u)
particles = update_with_observation(particles, z)
@@ -58,12 +57,11 @@ def fast_slam2(particles, u, z):
def normalize_weight(particles):
sumw = sum([p.w for p in particles])
sum_w = sum([p.w for p in particles])
try:
for i in range(N_PARTICLE):
particles[i].w /= sumw
particles[i].w /= sum_w
except ZeroDivisionError:
for i in range(N_PARTICLE):
particles[i].w = 1.0 / N_PARTICLE
@@ -74,7 +72,6 @@ def normalize_weight(particles):
def calc_final_state(particles):
xEst = np.zeros((STATE_SIZE, 1))
particles = normalize_weight(particles)
@@ -85,19 +82,17 @@ def calc_final_state(particles):
xEst[2, 0] += particles[i].w * particles[i].yaw
xEst[2, 0] = pi_2_pi(xEst[2, 0])
# print(xEst)
return xEst
def predict_particles(particles, u):
for i in range(N_PARTICLE):
px = np.zeros((STATE_SIZE, 1))
px[0, 0] = particles[i].x
px[1, 0] = particles[i].y
px[2, 0] = particles[i].yaw
ud = u + (np.random.randn(1, 2) @ R).T # add noise
ud = u + (np.random.randn(1, 2) @ R ** 0.5).T # add noise
px = motion_model(px, ud)
particles[i].x = px[0, 0]
particles[i].y = px[1, 0]
@@ -106,8 +101,7 @@ def predict_particles(particles, u):
return particles
def add_new_lm(particle, z, Q):
def add_new_lm(particle, z, Q_cov):
r = z[0]
b = z[1]
lm_id = int(z[2])
@@ -122,15 +116,15 @@ def add_new_lm(particle, z, Q):
Gz = np.array([[c, -r * s],
[s, r * c]])
particle.lmP[2 * lm_id:2 * lm_id + 2] = Gz @ Q @ Gz.T
particle.lmP[2 * lm_id:2 * lm_id + 2] = Gz @ Q_cov @ Gz.T
return particle
def compute_jacobians(particle, xf, Pf, Q):
def compute_jacobians(particle, xf, Pf, Q_cov):
dx = xf[0, 0] - particle.x
dy = xf[1, 0] - particle.y
d2 = dx**2 + dy**2
d2 = dx ** 2 + dy ** 2
d = math.sqrt(d2)
zp = np.array(
@@ -142,14 +136,14 @@ def compute_jacobians(particle, xf, Pf, Q):
Hf = np.array([[dx / d, dy / d],
[-dy / d2, dx / d2]])
Sf = Hf @ Pf @ Hf.T + Q
Sf = Hf @ Pf @ Hf.T + Q_cov
return zp, Hv, Hf, Sf
def update_KF_with_cholesky(xf, Pf, v, Q, Hf):
def update_kf_with_cholesky(xf, Pf, v, Q_cov, Hf):
PHt = Pf @ Hf.T
S = Hf @ PHt + Q
S = Hf @ PHt + Q_cov
S = (S + S.T) * 0.5
SChol = np.linalg.cholesky(S).T
@@ -163,18 +157,17 @@ def update_KF_with_cholesky(xf, Pf, v, Q, Hf):
return x, P
def update_landmark(particle, z, Q):
def update_landmark(particle, z, Q_cov):
lm_id = int(z[2])
xf = np.array(particle.lm[lm_id, :]).reshape(2, 1)
Pf = np.array(particle.lmP[2 * lm_id:2 * lm_id + 2])
zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q)
zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q_cov)
dz = z[0:2].reshape(2, 1) - zp
dz[1, 0] = pi_2_pi(dz[1, 0])
xf, Pf = update_KF_with_cholesky(xf, Pf, dz, Q, Hf)
xf, Pf = update_kf_with_cholesky(xf, Pf, dz, Q, Hf)
particle.lm[lm_id, :] = xf.T
particle.lmP[2 * lm_id:2 * lm_id + 2, :] = Pf
@@ -182,12 +175,11 @@ def update_landmark(particle, z, Q):
return particle
def compute_weight(particle, z, Q):
def compute_weight(particle, z, Q_cov):
lm_id = int(z[2])
xf = np.array(particle.lm[lm_id, :]).reshape(2, 1)
Pf = np.array(particle.lmP[2 * lm_id:2 * lm_id + 2])
zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q)
zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q_cov)
dz = z[0:2].reshape(2, 1) - zp
dz[1, 0] = pi_2_pi(dz[1, 0])
@@ -205,15 +197,14 @@ def compute_weight(particle, z, Q):
return w
def proposal_sampling(particle, z, Q):
def proposal_sampling(particle, z, Q_cov):
lm_id = int(z[2])
xf = particle.lm[lm_id, :].reshape(2, 1)
Pf = particle.lmP[2 * lm_id:2 * lm_id + 2]
# State
x = np.array([particle.x, particle.y, particle.yaw]).reshape(3, 1)
P = particle.P
zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q)
zp, Hv, Hf, Sf = compute_jacobians(particle, xf, Pf, Q_cov)
Sfi = np.linalg.inv(Sf)
dz = z[0:2].reshape(2, 1) - zp
@@ -232,7 +223,6 @@ def proposal_sampling(particle, z, Q):
def update_with_observation(particles, z):
for iz in range(len(z[0, :])):
lmid = int(z[2, iz])
@@ -265,17 +255,16 @@ def resampling(particles):
pw = np.array(pw)
Neff = 1.0 / (pw @ pw.T) # Effective particle number
# print(Neff)
if Neff < NTH: # resampling
wcum = np.cumsum(pw)
base = np.cumsum(pw * 0.0 + 1 / N_PARTICLE) - 1 / N_PARTICLE
resampleid = base + np.random.rand(base.shape[0]) / N_PARTICLE
resamplei_id = base + np.random.rand(base.shape[0]) / N_PARTICLE
inds = []
ind = 0
for ip in range(N_PARTICLE):
while ((ind < wcum.shape[0] - 1) and (resampleid[ip] > wcum[ind])):
while (ind < wcum.shape[0] - 1) and (resamplei_id[ip] > wcum[ind]):
ind += 1
inds.append(ind)
@@ -292,7 +281,6 @@ def resampling(particles):
def calc_input(time):
if time <= 3.0: # wait at first
v = 0.0
yawrate = 0.0
@@ -306,7 +294,6 @@ def calc_input(time):
def observation(xTrue, xd, u, RFID):
# calc true state
xTrue = motion_model(xTrue, u)
@@ -317,17 +304,17 @@ def observation(xTrue, xd, u, RFID):
dx = RFID[i, 0] - xTrue[0, 0]
dy = RFID[i, 1] - xTrue[1, 0]
d = math.sqrt(dx**2 + dy**2)
d = math.sqrt(dx ** 2 + dy ** 2)
angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0])
if d <= MAX_RANGE:
dn = d + np.random.randn() * Qsim[0, 0] # add noise
anglen = angle + np.random.randn() * Qsim[1, 1] # add noise
dn = d + np.random.randn() * Q_sim[0, 0] ** 0.5 # add noise
anglen = angle + np.random.randn() * Q_sim[1, 1] ** 0.5 # add noise
zi = np.array([dn, pi_2_pi(anglen), i]).reshape(3, 1)
z = np.hstack((z, zi))
# add noise to input
ud1 = u[0, 0] + np.random.randn() * Rsim[0, 0]
ud2 = u[1, 0] + np.random.randn() * Rsim[1, 1] + OFFSET_YAWRATE_NOISE
ud1 = u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5
ud2 = u[1, 0] + np.random.randn() * R_sim[1, 1] ** 0.5 + OFFSET_YAW_RATE_NOISE
ud = np.array([ud1, ud2]).reshape(2, 1)
xd = motion_model(xd, ud)
@@ -336,7 +323,6 @@ def observation(xTrue, xd, u, RFID):
def motion_model(x, u):
F = np.array([[1.0, 0, 0],
[0, 1.0, 0],
[0, 0, 1.0]])
@@ -383,7 +369,7 @@ def main():
hxTrue = xTrue
hxDR = xTrue
particles = [Particle(N_LM) for i in range(N_PARTICLE)]
particles = [Particle(N_LM) for _ in range(N_PARTICLE)]
while SIM_TIME >= time:
time += DT
@@ -409,7 +395,7 @@ def main():
for iz in range(len(z[:, 0])):
lmid = int(z[2, iz])
plt.plot([xEst[0], RFID[lmid, 0]], [
xEst[1], RFID[lmid, 1]], "-k")
xEst[1], RFID[lmid, 1]], "-k")
for i in range(N_PARTICLE):
plt.plot(particles[i].x, particles[i].y, ".r")