Files
PythonRobotics/PathPlanning/BatchInformedRRTStar/batch_informed_rrt_star.py
Atsushi Sakai 73e1c0bebc Add "Code Link" sections and rename classes for consistency (#1214)
This commit adds "Code Link" sections to documentation across various path planning modules, linking to relevant class and function APIs. Additionally, several class renaming changes were made, such as `Dijkstra` to `DijkstraPlanner` and `eta3_trajectory` to `Eta3SplineTrajectory`, to enhance naming consistency. Minor fixes include file restructuring and image renaming for the RRT module.
2025-05-05 17:29:45 +09:00

636 lines
23 KiB
Python

"""
Batch Informed Trees based path planning:
Uses a heuristic to efficiently search increasingly dense
RGGs while reusing previous information. Provides faster
convergence that RRT*, Informed RRT* and other sampling based
methods.
Uses lazy connecting by combining sampling based methods and A*
like incremental graph search algorithms.
author: Karan Chawla(@karanchawla)
Atsushi Sakai(@Atsushi_twi)
Reference: https://arxiv.org/abs/1405.5848
"""
import math
import random
import matplotlib.pyplot as plt
import numpy as np
show_animation = True
class RTree:
# Class to represent the explicit tree created
# while sampling through the state space
def __init__(self, start=None, lowerLimit=None, upperLimit=None,
resolution=1.0):
if upperLimit is None:
upperLimit = [10, 10]
if lowerLimit is None:
lowerLimit = [0, 0]
if start is None:
start = [0, 0]
self.vertices = dict()
self.edges = []
self.start = start
self.lowerLimit = lowerLimit
self.upperLimit = upperLimit
self.dimension = len(lowerLimit)
self.num_cells = [0] * self.dimension
self.resolution = resolution
# compute the number of grid cells based on the limits and
# resolution given
for idx in range(self.dimension):
self.num_cells[idx] = np.ceil(
(upperLimit[idx] - lowerLimit[idx]) / resolution)
vertex_id = self.real_world_to_node_id(start)
self.vertices[vertex_id] = []
@staticmethod
def get_root_id():
# return the id of the root of the tree
return 0
def add_vertex(self, vertex):
# add a vertex to the tree
vertex_id = self.real_world_to_node_id(vertex)
self.vertices[vertex_id] = []
return vertex_id
def add_edge(self, v, x):
# create an edge between v and x vertices
if (v, x) not in self.edges:
self.edges.append((v, x))
# since the tree is undirected
self.vertices[v].append(x)
self.vertices[x].append(v)
def real_coords_to_grid_coord(self, real_coord):
# convert real world coordinates to grid space
# depends on the resolution of the grid
# the output is the same as real world coords if the resolution
# is set to 1
coord = [0] * self.dimension
for i in range(len(coord)):
start = self.lowerLimit[i] # start of the grid space
coord[i] = int(np.around((real_coord[i] - start) / self.resolution))
return coord
def grid_coordinate_to_node_id(self, coord):
# This function maps a grid coordinate to a unique
# node id
nodeId = 0
for i in range(len(coord) - 1, -1, -1):
product = 1
for j in range(0, i):
product = product * self.num_cells[j]
nodeId = nodeId + coord[i] * product
return nodeId
def real_world_to_node_id(self, real_coord):
# first convert the given coordinates to grid space and then
# convert the grid space coordinates to a unique node id
return self.grid_coordinate_to_node_id(
self.real_coords_to_grid_coord(real_coord))
def grid_coord_to_real_world_coord(self, coord):
# This function maps a grid coordinate in discrete space
# to a configuration in the full configuration space
config = [0] * self.dimension
for i in range(0, len(coord)):
# start of the real world / configuration space
start = self.lowerLimit[i]
# step from the coordinate in the grid
grid_step = self.resolution * coord[i]
config[i] = start + grid_step
return config
def node_id_to_grid_coord(self, node_id):
# This function maps a node id to the associated
# grid coordinate
coord = [0] * len(self.lowerLimit)
for i in range(len(coord) - 1, -1, -1):
# Get the product of the grid space maximums
prod = 1
for j in range(0, i):
prod = prod * self.num_cells[j]
coord[i] = np.floor(node_id / prod)
node_id = node_id - (coord[i] * prod)
return coord
def node_id_to_real_world_coord(self, nid):
# This function maps a node in discrete space to a configuration
# in the full configuration space
return self.grid_coord_to_real_world_coord(
self.node_id_to_grid_coord(nid))
class BITStar:
def __init__(self, start, goal,
obstacleList, randArea, eta=2.0,
maxIter=80):
self.start = start
self.goal = goal
self.min_rand = randArea[0]
self.max_rand = randArea[1]
self.max_iIter = maxIter
self.obstacleList = obstacleList
self.startId = None
self.goalId = None
self.vertex_queue = []
self.edge_queue = []
self.samples = dict()
self.g_scores = dict()
self.f_scores = dict()
self.nodes = dict()
self.r = float('inf')
self.eta = eta # tunable parameter
self.unit_ball_measure = 1
self.old_vertices = []
# initialize tree
lowerLimit = [randArea[0], randArea[0]]
upperLimit = [randArea[1], randArea[1]]
self.tree = RTree(start=start, lowerLimit=lowerLimit,
upperLimit=upperLimit, resolution=0.01)
def setup_planning(self):
self.startId = self.tree.real_world_to_node_id(self.start)
self.goalId = self.tree.real_world_to_node_id(self.goal)
# add goal to the samples
self.samples[self.goalId] = self.goal
self.g_scores[self.goalId] = float('inf')
self.f_scores[self.goalId] = 0
# add the start id to the tree
self.tree.add_vertex(self.start)
self.g_scores[self.startId] = 0
self.f_scores[self.startId] = self.compute_heuristic_cost(
self.startId, self.goalId)
# max length we expect to find in our 'informed' sample space, starts as infinite
cBest = self.g_scores[self.goalId]
# Computing the sampling space
cMin = math.hypot(self.start[0] - self.goal[0],
self.start[1] - self.goal[1]) / 1.5
xCenter = np.array([[(self.start[0] + self.goal[0]) / 2.0],
[(self.start[1] + self.goal[1]) / 2.0], [0]])
a1 = np.array([[(self.goal[0] - self.start[0]) / cMin],
[(self.goal[1] - self.start[1]) / cMin], [0]])
eTheta = math.atan2(a1[1, 0], a1[0, 0])
# first column of identity matrix transposed
id1_t = np.array([1.0, 0.0, 0.0]).reshape(1, 3)
M = np.dot(a1, id1_t)
U, S, Vh = np.linalg.svd(M, True, True)
C = np.dot(np.dot(U, np.diag(
[1.0, 1.0, np.linalg.det(U) * np.linalg.det(np.transpose(Vh))])),
Vh)
self.samples.update(self.informed_sample(
200, cBest, cMin, xCenter, C))
return eTheta, cMin, xCenter, C, cBest
def setup_sample(self, iterations, foundGoal, cMin, xCenter, C, cBest):
if len(self.vertex_queue) == 0 and len(self.edge_queue) == 0:
print("Batch: ", iterations)
# Using informed rrt star way of computing the samples
self.r = 2.0
if iterations != 0:
if foundGoal:
# a better way to do this would be to make number of samples
# a function of cMin
m = 200
self.samples = dict()
self.samples[self.goalId] = self.goal
else:
m = 100
cBest = self.g_scores[self.goalId]
self.samples.update(self.informed_sample(
m, cBest, cMin, xCenter, C))
# make the old vertices the new vertices
self.old_vertices += self.tree.vertices.keys()
# add the vertices to the vertex queue
for nid in self.tree.vertices.keys():
if nid not in self.vertex_queue:
self.vertex_queue.append(nid)
return cBest
def plan(self, animation=True):
eTheta, cMin, xCenter, C, cBest = self.setup_planning()
iterations = 0
foundGoal = False
# run until done
while iterations < self.max_iIter:
cBest = self.setup_sample(iterations,
foundGoal, cMin, xCenter, C, cBest)
# expand the best vertices until an edge is better than the vertex
# this is done because the vertex cost represents the lower bound
# on the edge cost
while self.best_vertex_queue_value() <= \
self.best_edge_queue_value():
self.expand_vertex(self.best_in_vertex_queue())
# add the best edge to the tree
bestEdge = self.best_in_edge_queue()
self.edge_queue.remove(bestEdge)
# Check if this can improve the current solution
estimatedCostOfVertex = self.g_scores[bestEdge[
0]] + self.compute_distance_cost(
bestEdge[0], bestEdge[1]) + self.compute_heuristic_cost(
bestEdge[1], self.goalId)
estimatedCostOfEdge = self.compute_distance_cost(
self.startId, bestEdge[0]) + self.compute_heuristic_cost(
bestEdge[0], bestEdge[1]) + self.compute_heuristic_cost(
bestEdge[1], self.goalId)
actualCostOfEdge = self.g_scores[
bestEdge[0]] + self.compute_distance_cost(
bestEdge[0], bestEdge[1])
f1 = estimatedCostOfVertex < self.g_scores[self.goalId]
f2 = estimatedCostOfEdge < self.g_scores[self.goalId]
f3 = actualCostOfEdge < self.g_scores[self.goalId]
if f1 and f2 and f3:
# connect this edge
firstCoord = self.tree.node_id_to_real_world_coord(
bestEdge[0])
secondCoord = self.tree.node_id_to_real_world_coord(
bestEdge[1])
path = self.connect(firstCoord, secondCoord)
lastEdge = self.tree.real_world_to_node_id(secondCoord)
if path is None or len(path) == 0:
continue
nextCoord = path[len(path) - 1, :]
nextCoordPathId = self.tree.real_world_to_node_id(
nextCoord)
bestEdge = (bestEdge[0], nextCoordPathId)
if bestEdge[1] in self.tree.vertices.keys():
continue
else:
try:
del self.samples[bestEdge[1]]
except KeyError:
# invalid sample key
pass
eid = self.tree.add_vertex(nextCoord)
self.vertex_queue.append(eid)
if eid == self.goalId or bestEdge[0] == self.goalId or \
bestEdge[1] == self.goalId:
print("Goal found")
foundGoal = True
self.tree.add_edge(bestEdge[0], bestEdge[1])
g_score = self.compute_distance_cost(
bestEdge[0], bestEdge[1])
self.g_scores[bestEdge[1]] = g_score + self.g_scores[
bestEdge[0]]
self.f_scores[
bestEdge[1]] = g_score + self.compute_heuristic_cost(
bestEdge[1], self.goalId)
self.update_graph()
# visualize new edge
if animation:
self.draw_graph(xCenter=xCenter, cBest=cBest,
cMin=cMin, eTheta=eTheta,
samples=self.samples.values(),
start=firstCoord, end=secondCoord)
self.remove_queue(lastEdge, bestEdge)
else:
print("Nothing good")
self.edge_queue = []
self.vertex_queue = []
iterations += 1
print("Finding the path")
return self.find_final_path()
def find_final_path(self):
plan = [self.goal]
currId = self.goalId
while currId != self.startId:
plan.append(self.tree.node_id_to_real_world_coord(currId))
try:
currId = self.nodes[currId]
except KeyError:
print("cannot find Path")
return []
plan.append(self.start)
plan = plan[::-1] # reverse the plan
return plan
def remove_queue(self, lastEdge, bestEdge):
for edge in self.edge_queue:
if edge[1] == bestEdge[1]:
dist_cost = self.compute_distance_cost(edge[1], bestEdge[1])
if self.g_scores[edge[1]] + dist_cost >= \
self.g_scores[self.goalId]:
if (lastEdge, bestEdge[1]) in self.edge_queue:
self.edge_queue.remove(
(lastEdge, bestEdge[1]))
def connect(self, start, end):
# A function which attempts to extend from a start coordinates
# to goal coordinates
steps = int(self.compute_distance_cost(
self.tree.real_world_to_node_id(start),
self.tree.real_world_to_node_id(end)) * 10)
x = np.linspace(start[0], end[0], num=steps)
y = np.linspace(start[1], end[1], num=steps)
for i in range(len(x)):
if self._collision_check(x[i], y[i]):
if i == 0:
return None
# if collision, send path until collision
return np.vstack((x[0:i], y[0:i])).transpose()
return np.vstack((x, y)).transpose()
def _collision_check(self, x, y):
for (ox, oy, size) in self.obstacleList:
dx = ox - x
dy = oy - y
d = dx * dx + dy * dy
if d <= size ** 2:
return True # collision
return False
def compute_heuristic_cost(self, start_id, goal_id):
# Using Manhattan distance as heuristic
start = np.array(self.tree.node_id_to_real_world_coord(start_id))
goal = np.array(self.tree.node_id_to_real_world_coord(goal_id))
return np.linalg.norm(start - goal, 2)
def compute_distance_cost(self, vid, xid):
# L2 norm distance
start = np.array(self.tree.node_id_to_real_world_coord(vid))
stop = np.array(self.tree.node_id_to_real_world_coord(xid))
return np.linalg.norm(stop - start, 2)
# Sample free space confined in the radius of ball R
def informed_sample(self, m, cMax, cMin, xCenter, C):
samples = dict()
print("g_Score goal id: ", self.g_scores[self.goalId])
for i in range(m + 1):
if cMax < float('inf'):
r = [cMax / 2.0,
math.sqrt(cMax ** 2 - cMin ** 2) / 2.0,
math.sqrt(cMax ** 2 - cMin ** 2) / 2.0]
L = np.diag(r)
xBall = self.sample_unit_ball()
rnd = np.dot(np.dot(C, L), xBall) + xCenter
rnd = [rnd[(0, 0)], rnd[(1, 0)]]
random_id = self.tree.real_world_to_node_id(rnd)
samples[random_id] = rnd
else:
rnd = self.sample_free_space()
random_id = self.tree.real_world_to_node_id(rnd)
samples[random_id] = rnd
return samples
# Sample point in a unit ball
@staticmethod
def sample_unit_ball():
a = random.random()
b = random.random()
if b < a:
a, b = b, a
sample = (b * math.cos(2 * math.pi * a / b),
b * math.sin(2 * math.pi * a / b))
return np.array([[sample[0]], [sample[1]], [0]])
def sample_free_space(self):
rnd = [random.uniform(self.min_rand, self.max_rand),
random.uniform(self.min_rand, self.max_rand)]
return rnd
def best_vertex_queue_value(self):
if len(self.vertex_queue) == 0:
return float('inf')
values = [self.g_scores[v]
+ self.compute_heuristic_cost(v, self.goalId) for v in
self.vertex_queue]
values.sort()
return values[0]
def best_edge_queue_value(self):
if len(self.edge_queue) == 0:
return float('inf')
# return the best value in the queue by score g_tau[v] + c(v,x) + h(x)
values = [self.g_scores[e[0]] + self.compute_distance_cost(e[0], e[1])
+ self.compute_heuristic_cost(e[1], self.goalId) for e in
self.edge_queue]
values.sort(reverse=True)
return values[0]
def best_in_vertex_queue(self):
# return the best value in the vertex queue
v_plus_values = [(v, self.g_scores[v] +
self.compute_heuristic_cost(v, self.goalId))
for v in self.vertex_queue]
v_plus_values = sorted(v_plus_values, key=lambda x: x[1])
# print(v_plus_values)
return v_plus_values[0][0]
def best_in_edge_queue(self):
e_and_values = [
(e[0], e[1], self.g_scores[e[0]] + self.compute_distance_cost(
e[0], e[1]) + self.compute_heuristic_cost(e[1], self.goalId))
for e in self.edge_queue]
e_and_values = sorted(e_and_values, key=lambda x: x[2])
return e_and_values[0][0], e_and_values[0][1]
def expand_vertex(self, vid):
self.vertex_queue.remove(vid)
# get the coordinates for given vid
currCoord = np.array(self.tree.node_id_to_real_world_coord(vid))
# get the nearest value in vertex for every one in samples where difference is
# less than the radius
neighbors = []
for sid, s_coord in self.samples.items():
s_coord = np.array(s_coord)
if np.linalg.norm(s_coord - currCoord, 2) <= self.r and sid != vid:
neighbors.append((sid, s_coord))
# add an edge to the edge queue is the path might improve the solution
for neighbor in neighbors:
sid = neighbor[0]
h_cost = self.compute_heuristic_cost(sid, self.goalId)
estimated_f_score = self.compute_distance_cost(
self.startId, vid) + h_cost + self.compute_distance_cost(vid,
sid)
if estimated_f_score < self.g_scores[self.goalId]:
self.edge_queue.append((vid, sid))
# add the vertex to the edge queue
self.add_vertex_to_edge_queue(vid, currCoord)
def add_vertex_to_edge_queue(self, vid, currCoord):
if vid not in self.old_vertices:
neighbors = []
for v, edges in self.tree.vertices.items():
if v != vid and (v, vid) not in self.edge_queue and \
(vid, v) not in self.edge_queue:
v_coord = self.tree.node_id_to_real_world_coord(v)
if np.linalg.norm(currCoord - v_coord, 2) <= self.r:
neighbors.append((vid, v_coord))
for neighbor in neighbors:
sid = neighbor[0]
estimated_f_score = self.compute_distance_cost(
self.startId, vid) + self.compute_distance_cost(
vid, sid) + self.compute_heuristic_cost(sid, self.goalId)
if estimated_f_score < self.g_scores[self.goalId] and (
self.g_scores[vid] +
self.compute_distance_cost(vid, sid)) < \
self.g_scores[sid]:
self.edge_queue.append((vid, sid))
def update_graph(self):
closedSet = []
openSet = []
currId = self.startId
openSet.append(currId)
while len(openSet) != 0:
# get the element with lowest f_score
currId = min(openSet, key=lambda x: self.f_scores[x])
# remove element from open set
openSet.remove(currId)
# Check if we're at the goal
if currId == self.goalId:
break
if currId not in closedSet:
closedSet.append(currId)
# find a non visited successor to the current node
successors = self.tree.vertices[currId]
for successor in successors:
if successor in closedSet:
continue
else:
# calculate tentative g score
g_score = self.g_scores[currId] + \
self.compute_distance_cost(currId, successor)
if successor not in openSet:
# add the successor to open set
openSet.append(successor)
elif g_score >= self.g_scores[successor]:
continue
# update g and f scores
self.g_scores[successor] = g_score
self.f_scores[
successor] = g_score + self.compute_heuristic_cost(
successor, self.goalId)
# store the parent and child
self.nodes[successor] = currId
def draw_graph(self, xCenter=None, cBest=None, cMin=None, eTheta=None,
samples=None, start=None, end=None):
plt.clf()
# for stopping simulation with the esc key.
plt.gcf().canvas.mpl_connect(
'key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
for rnd in samples:
if rnd is not None:
plt.plot(rnd[0], rnd[1], "^k")
if cBest != float('inf'):
self.plot_ellipse(xCenter, cBest, cMin, eTheta)
if start is not None and end is not None:
plt.plot([start[0], start[1]], [end[0], end[1]], "-g")
for (ox, oy, size) in self.obstacleList:
plt.plot(ox, oy, "ok", ms=30 * size)
plt.plot(self.start[0], self.start[1], "xr")
plt.plot(self.goal[0], self.goal[1], "xr")
plt.axis([-2, 15, -2, 15])
plt.grid(True)
plt.pause(0.01)
@staticmethod
def plot_ellipse(xCenter, cBest, cMin, eTheta): # pragma: no cover
a = math.sqrt(cBest ** 2 - cMin ** 2) / 2.0
b = cBest / 2.0
angle = math.pi / 2.0 - eTheta
cx = xCenter[0]
cy = xCenter[1]
t = np.arange(0, 2 * math.pi + 0.1, 0.1)
x = [a * math.cos(it) for it in t]
y = [b * math.sin(it) for it in t]
R = np.array([[math.cos(angle), math.sin(angle)],
[-math.sin(angle), math.cos(angle)]])
fx = R @ np.array([x, y])
px = np.array(fx[0, :] + cx).flatten()
py = np.array(fx[1, :] + cy).flatten()
plt.plot(cx, cy, "xc")
plt.plot(px, py, "--c")
def main(maxIter=80):
print("Starting Batch Informed Trees Star planning")
obstacleList = [
(5, 5, 0.5),
(9, 6, 1),
(7, 5, 1),
(1, 5, 1),
(3, 6, 1),
(7, 9, 1)
]
bitStar = BITStar(start=[-1, 0], goal=[3, 8], obstacleList=obstacleList,
randArea=[-2, 15], maxIter=maxIter)
path = bitStar.plan(animation=show_animation)
print("Done")
if show_animation:
plt.plot([x for (x, y) in path], [y for (x, y) in path], '-r')
plt.grid(True)
plt.pause(0.05)
plt.show()
if __name__ == '__main__':
main()