fix batch_informed_rrtstar.py

This commit is contained in:
Atsushi Sakai
2019-08-03 13:45:20 +09:00
parent 3e017c9170
commit 38373ac7de

View File

@@ -14,10 +14,11 @@ author: Karan Chawla(@karanchawla)
Reference: https://arxiv.org/abs/1405.5848
"""
import random
import numpy as np
import math
import random
import matplotlib.pyplot as plt
import numpy as np
show_animation = True
@@ -26,8 +27,14 @@ class RTree(object):
# Class to represent the explicit tree created
# while sampling through the state space
def __init__(self, start=[0, 0], lowerLimit=[0, 0], upperLimit=[10, 10], resolution=1):
def __init__(self, start=None, lowerLimit=None, upperLimit=None, resolution=1.0):
if upperLimit is None:
upperLimit = [10, 10]
if lowerLimit is None:
lowerLimit = [0, 0]
if start is None:
start = [0, 0]
self.vertices = dict()
self.edges = []
self.start = start
@@ -42,20 +49,21 @@ class RTree(object):
self.num_cells[idx] = np.ceil(
(upperLimit[idx] - lowerLimit[idx]) / resolution)
vertex_id = self.realWorldToNodeId(start)
vertex_id = self.real_world_to_node_id(start)
self.vertices[vertex_id] = []
def getRootId(self):
@staticmethod
def get_root_id():
# return the id of the root of the tree
return 0
def addVertex(self, vertex):
def add_vertex(self, vertex):
# add a vertex to the tree
vertex_id = self.realWorldToNodeId(vertex)
vertex_id = self.real_world_to_node_id(vertex)
self.vertices[vertex_id] = []
return vertex_id
def addEdge(self, v, x):
def add_edge(self, v, x):
# create an edge between v and x vertices
if (v, x) not in self.edges:
self.edges.append((v, x))
@@ -63,7 +71,7 @@ class RTree(object):
self.vertices[v].append(x)
self.vertices[x].append(v)
def realCoordsToGridCoord(self, real_coord):
def real_coords_to_grid_coord(self, real_coord):
# convert real world coordinates to grid space
# depends on the resolution of the grid
# the output is the same as real world coords if the resolution
@@ -71,10 +79,10 @@ class RTree(object):
coord = [0] * self.dimension
for i in range(len(coord)):
start = self.lowerLimit[i] # start of the grid space
coord[i] = np.around((real_coord[i] - start) / self.resolution)
coord[i] = int(np.around((real_coord[i] - start) / self.resolution))
return coord
def gridCoordinateToNodeId(self, coord):
def grid_coordinate_to_node_id(self, coord):
# This function maps a grid coordinate to a unique
# node id
nodeId = 0
@@ -85,12 +93,12 @@ class RTree(object):
nodeId = nodeId + coord[i] * product
return nodeId
def realWorldToNodeId(self, real_coord):
def real_world_to_node_id(self, real_coord):
# first convert the given coordinates to grid space and then
# convert the grid space coordinates to a unique node id
return self.gridCoordinateToNodeId(self.realCoordsToGridCoord(real_coord))
return self.grid_coordinate_to_node_id(self.real_coords_to_grid_coord(real_coord))
def gridCoordToRealWorldCoord(self, coord):
def grid_coord_to_real_world_coord(self, coord):
# This function smaps a grid coordinate in discrete space
# to a configuration in the full configuration space
config = [0] * self.dimension
@@ -102,7 +110,7 @@ class RTree(object):
config[i] = start + grid_step
return config
def nodeIdToGridCoord(self, node_id):
def node_id_to_grid_coord(self, node_id):
# This function maps a node id to the associated
# grid coordinate
coord = [0] * len(self.lowerLimit)
@@ -115,12 +123,10 @@ class RTree(object):
node_id = node_id - (coord[i] * prod)
return coord
def nodeIdToRealWorldCoord(self, nid):
# This function maps a node in discrete space to a configuraiton
def node_id_to_real_world_coord(self, nid):
# This function maps a node in discrete space to a configuration
# in the full configuration space
return self.gridCoordToRealWorldCoord(self.nodeIdToGridCoord(nid))
# Uses Batch Informed Trees to find a path from start to goal
return self.grid_coord_to_real_world_coord(self.node_id_to_grid_coord(nid))
class BITStar(object):
@@ -135,6 +141,8 @@ class BITStar(object):
self.maxrand = randArea[1]
self.maxIter = maxIter
self.obstacleList = obstacleList
self.startId = None
self.goalId = None
self.vertex_queue = []
self.edge_queue = []
@@ -154,8 +162,8 @@ class BITStar(object):
upperLimit=upperLimit, resolution=0.01)
def setup_planning(self):
self.startId = self.tree.realWorldToNodeId(self.start)
self.goalId = self.tree.realWorldToNodeId(self.goal)
self.startId = self.tree.real_world_to_node_id(self.start)
self.goalId = self.tree.real_world_to_node_id(self.goal)
# add goal to the samples
self.samples[self.goalId] = self.goal
@@ -163,9 +171,9 @@ class BITStar(object):
self.f_scores[self.goalId] = 0
# add the start id to the tree
self.tree.addVertex(self.start)
self.tree.add_vertex(self.start)
self.g_scores[self.startId] = 0
self.f_scores[self.startId] = self.computeHeuristicCost(
self.f_scores[self.startId] = self.compute_heuristic_cost(
self.startId, self.goalId)
# max length we expect to find in our 'informed' sample space, starts as infinite
@@ -182,11 +190,11 @@ class BITStar(object):
# first column of idenity matrix transposed
id1_t = np.array([1.0, 0.0, 0.0]).reshape(1, 3)
M = np.dot(a1, id1_t)
U, S, Vh = np.linalg.svd(M, 1, 1)
U, S, Vh = np.linalg.svd(M, True, True)
C = np.dot(np.dot(U, np.diag(
[1.0, 1.0, np.linalg.det(U) * np.linalg.det(np.transpose(Vh))])), Vh)
self.samples.update(self.informedSample(
self.samples.update(self.informed_sample(
200, cBest, cMin, xCenter, C))
return etheta, cMin, xCenter, C, cBest
@@ -207,7 +215,7 @@ class BITStar(object):
else:
m = 100
cBest = self.g_scores[self.goalId]
self.samples.update(self.informedSample(
self.samples.update(self.informed_sample(
m, cBest, cMin, xCenter, C))
# make the old vertices the new vertices
@@ -225,26 +233,25 @@ class BITStar(object):
foundGoal = False
# run until done
while (iterations < self.maxIter):
while iterations < self.maxIter:
cBest = self.setup_sample(iterations,
foundGoal, cMin, xCenter, C, cBest)
# expand the best vertices until an edge is better than the vertex
# this is done because the vertex cost represents the lower bound
# on the edge cost
while(self.bestVertexQueueValue() <= self.bestEdgeQueueValue()):
self.expandVertex(self.bestInVertexQueue())
while self.best_vertex_queue_value() <= self.best_edge_queue_value():
self.expand_vertex(self.best_in_vertex_queue())
# add the best edge to the tree
bestEdge = self.bestInEdgeQueue()
bestEdge = self.best_in_edge_queue()
self.edge_queue.remove(bestEdge)
# Check if this can improve the current solution
estimatedCostOfVertex = self.g_scores[bestEdge[0]] + self.computeDistanceCost(
bestEdge[0], bestEdge[1]) + self.computeHeuristicCost(bestEdge[1], self.goalId)
estimatedCostOfEdge = self.computeDistanceCost(self.startId, bestEdge[0]) + self.computeHeuristicCost(
bestEdge[0], bestEdge[1]) + self.computeHeuristicCost(bestEdge[1], self.goalId)
actualCostOfEdge = self.g_scores[bestEdge[0]] + \
self.computeDistanceCost(bestEdge[0], bestEdge[1])
estimatedCostOfVertex = self.g_scores[bestEdge[0]] + self.compute_distance_cost(
bestEdge[0], bestEdge[1]) + self.compute_heuristic_cost(bestEdge[1], self.goalId)
estimatedCostOfEdge = self.compute_distance_cost(self.startId, bestEdge[0]) + self.compute_heuristic_cost(
bestEdge[0], bestEdge[1]) + self.compute_heuristic_cost(bestEdge[1], self.goalId)
actualCostOfEdge = self.g_scores[bestEdge[0]] + self.compute_distance_cost(bestEdge[0], bestEdge[1])
f1 = estimatedCostOfVertex < self.g_scores[self.goalId]
f2 = estimatedCostOfEdge < self.g_scores[self.goalId]
@@ -252,46 +259,44 @@ class BITStar(object):
if f1 and f2 and f3:
# connect this edge
firstCoord = self.tree.nodeIdToRealWorldCoord(
firstCoord = self.tree.node_id_to_real_world_coord(
bestEdge[0])
secondCoord = self.tree.nodeIdToRealWorldCoord(
secondCoord = self.tree.node_id_to_real_world_coord(
bestEdge[1])
path = self.connect(firstCoord, secondCoord)
lastEdge = self.tree.realWorldToNodeId(secondCoord)
lastEdge = self.tree.real_world_to_node_id(secondCoord)
if path is None or len(path) == 0:
continue
nextCoord = path[len(path) - 1, :]
nextCoordPathId = self.tree.realWorldToNodeId(
nextCoordPathId = self.tree.real_world_to_node_id(
nextCoord)
bestEdge = (bestEdge[0], nextCoordPathId)
if(bestEdge[1] in self.tree.vertices.keys()):
if bestEdge[1] in self.tree.vertices.keys():
continue
else:
try:
del self.samples[bestEdge[1]]
except(KeyError):
except KeyError:
pass
eid = self.tree.addVertex(nextCoord)
eid = self.tree.add_vertex(nextCoord)
self.vertex_queue.append(eid)
if eid == self.goalId or bestEdge[0] == self.goalId or bestEdge[1] == self.goalId:
print("Goal found")
foundGoal = True
self.tree.addEdge(bestEdge[0], bestEdge[1])
self.tree.add_edge(bestEdge[0], bestEdge[1])
g_score = self.computeDistanceCost(
g_score = self.compute_distance_cost(
bestEdge[0], bestEdge[1])
self.g_scores[bestEdge[1]] = g_score + \
self.g_scores[bestEdge[0]]
self.f_scores[bestEdge[1]] = g_score + \
self.computeHeuristicCost(bestEdge[1], self.goalId)
self.updateGraph()
self.g_scores[bestEdge[1]] = g_score + self.g_scores[bestEdge[0]]
self.f_scores[bestEdge[1]] = g_score + self.compute_heuristic_cost(bestEdge[1], self.goalId)
self.update_graph()
# visualize new edge
if animation:
self.drawGraph(xCenter=xCenter, cBest=cBest,
cMin=cMin, etheta=etheta, samples=self.samples.values(),
start=firstCoord, end=secondCoord, tree=self.tree.edges)
self.draw_graph(xCenter=xCenter, cBest=cBest,
cMin=cMin, etheta=etheta, samples=self.samples.values(),
start=firstCoord, end=secondCoord)
self.remove_queue(lastEdge, bestEdge)
@@ -306,14 +311,13 @@ class BITStar(object):
return self.find_final_path()
def find_final_path(self):
plan = []
plan.append(self.goal)
plan = [self.goal]
currId = self.goalId
while (currId != self.startId):
plan.append(self.tree.nodeIdToRealWorldCoord(currId))
while currId != self.startId:
plan.append(self.tree.node_id_to_real_world_coord(currId))
try:
currId = self.nodes[currId]
except(KeyError):
except KeyError:
print("cannot find Path")
return []
@@ -324,29 +328,30 @@ class BITStar(object):
def remove_queue(self, lastEdge, bestEdge):
for edge in self.edge_queue:
if(edge[1] == bestEdge[1]):
if self.g_scores[edge[1]] + self.computeDistanceCost(edge[1], bestEdge[1]) >= self.g_scores[self.goalId]:
if(lastEdge, bestEdge[1]) in self.edge_queue:
if edge[1] == bestEdge[1]:
dist_cost = self.compute_distance_cost(edge[1], bestEdge[1])
if self.g_scores[edge[1]] + dist_cost >= self.g_scores[self.goalId]:
if (lastEdge, bestEdge[1]) in self.edge_queue:
self.edge_queue.remove(
(lastEdge, bestEdge[1]))
def connect(self, start, end):
# A function which attempts to extend from a start coordinates
# to goal coordinates
steps = int(self.computeDistanceCost(self.tree.realWorldToNodeId(
start), self.tree.realWorldToNodeId(end)) * 10)
steps = int(self.compute_distance_cost(self.tree.real_world_to_node_id(
start), self.tree.real_world_to_node_id(end)) * 10)
x = np.linspace(start[0], end[0], num=steps)
y = np.linspace(start[1], end[1], num=steps)
for i in range(len(x)):
if(self._collisionCheck(x[i], y[i])):
if(i == 0):
if self._collision_check(x[i], y[i]):
if i == 0:
return None
# if collision, send path until collision
return np.vstack((x[0:i], y[0:i])).transpose()
return np.vstack((x, y)).transpose()
def _collisionCheck(self, x, y):
def _collision_check(self, x, y):
for (ox, oy, size) in self.obstacleList:
dx = ox - x
dy = oy - y
@@ -355,45 +360,44 @@ class BITStar(object):
return True # collision
return False
# def prune(self, c):
def computeHeuristicCost(self, start_id, goal_id):
def compute_heuristic_cost(self, start_id, goal_id):
# Using Manhattan distance as heuristic
start = np.array(self.tree.nodeIdToRealWorldCoord(start_id))
goal = np.array(self.tree.nodeIdToRealWorldCoord(goal_id))
start = np.array(self.tree.node_id_to_real_world_coord(start_id))
goal = np.array(self.tree.node_id_to_real_world_coord(goal_id))
return np.linalg.norm(start - goal, 2)
def computeDistanceCost(self, vid, xid):
def compute_distance_cost(self, vid, xid):
# L2 norm distance
start = np.array(self.tree.nodeIdToRealWorldCoord(vid))
stop = np.array(self.tree.nodeIdToRealWorldCoord(xid))
start = np.array(self.tree.node_id_to_real_world_coord(vid))
stop = np.array(self.tree.node_id_to_real_world_coord(xid))
return np.linalg.norm(stop - start, 2)
# Sample free space confined in the radius of ball R
def informedSample(self, m, cMax, cMin, xCenter, C):
def informed_sample(self, m, cMax, cMin, xCenter, C):
samples = dict()
print("g_Score goal id: ", self.g_scores[self.goalId])
for i in range(m + 1):
if cMax < float('inf'):
r = [cMax / 2.0,
math.sqrt(cMax**2 - cMin**2) / 2.0,
math.sqrt(cMax**2 - cMin**2) / 2.0]
math.sqrt(cMax ** 2 - cMin ** 2) / 2.0,
math.sqrt(cMax ** 2 - cMin ** 2) / 2.0]
L = np.diag(r)
xBall = self.sampleUnitBall()
xBall = self.sample_unit_ball()
rnd = np.dot(np.dot(C, L), xBall) + xCenter
rnd = [rnd[(0, 0)], rnd[(1, 0)]]
random_id = self.tree.realWorldToNodeId(rnd)
random_id = self.tree.real_world_to_node_id(rnd)
samples[random_id] = rnd
else:
rnd = self.sampleFreeSpace()
random_id = self.tree.realWorldToNodeId(rnd)
rnd = self.sample_free_space()
random_id = self.tree.real_world_to_node_id(rnd)
samples[random_id] = rnd
return samples
# Sample point in a unit ball
def sampleUnitBall(self):
@staticmethod
def sample_unit_ball():
a = random.random()
b = random.random()
@@ -404,63 +408,64 @@ class BITStar(object):
b * math.sin(2 * math.pi * a / b))
return np.array([[sample[0]], [sample[1]], [0]])
def sampleFreeSpace(self):
def sample_free_space(self):
rnd = [random.uniform(self.minrand, self.maxrand),
random.uniform(self.minrand, self.maxrand)]
return rnd
def bestVertexQueueValue(self):
if(len(self.vertex_queue) == 0):
def best_vertex_queue_value(self):
if len(self.vertex_queue) == 0:
return float('inf')
values = [self.g_scores[v]
+ self.computeHeuristicCost(v, self.goalId) for v in self.vertex_queue]
+ self.compute_heuristic_cost(v, self.goalId) for v in self.vertex_queue]
values.sort()
return values[0]
def bestEdgeQueueValue(self):
if(len(self.edge_queue) == 0):
def best_edge_queue_value(self):
if len(self.edge_queue) == 0:
return float('inf')
# return the best value in the queue by score g_tau[v] + c(v,x) + h(x)
values = [self.g_scores[e[0]] + self.computeDistanceCost(e[0], e[1])
+ self.computeHeuristicCost(e[1], self.goalId) for e in self.edge_queue]
values = [self.g_scores[e[0]] + self.compute_distance_cost(e[0], e[1])
+ self.compute_heuristic_cost(e[1], self.goalId) for e in self.edge_queue]
values.sort(reverse=True)
return values[0]
def bestInVertexQueue(self):
def best_in_vertex_queue(self):
# return the best value in the vertex queue
v_plus_vals = [(v, self.g_scores[v] + self.computeHeuristicCost(v, self.goalId))
v_plus_vals = [(v, self.g_scores[v] + self.compute_heuristic_cost(v, self.goalId))
for v in self.vertex_queue]
v_plus_vals = sorted(v_plus_vals, key=lambda x: x[1])
# print(v_plus_vals)
return v_plus_vals[0][0]
def bestInEdgeQueue(self):
e_and_values = [(e[0], e[1], self.g_scores[e[0]] + self.computeDistanceCost(
e[0], e[1]) + self.computeHeuristicCost(e[1], self.goalId)) for e in self.edge_queue]
def best_in_edge_queue(self):
e_and_values = [(e[0], e[1], self.g_scores[e[0]] + self.compute_distance_cost(
e[0], e[1]) + self.compute_heuristic_cost(e[1], self.goalId)) for e in self.edge_queue]
e_and_values = sorted(e_and_values, key=lambda x: x[2])
return (e_and_values[0][0], e_and_values[0][1])
return e_and_values[0][0], e_and_values[0][1]
def expandVertex(self, vid):
def expand_vertex(self, vid):
self.vertex_queue.remove(vid)
# get the coordinates for given vid
currCoord = np.array(self.tree.nodeIdToRealWorldCoord(vid))
currCoord = np.array(self.tree.node_id_to_real_world_coord(vid))
# get the nearest value in vertex for every one in samples where difference is
# less than the radius
neigbors = []
for sid, scoord in self.samples.items():
scoord = np.array(scoord)
if(np.linalg.norm(scoord - currCoord, 2) <= self.r and sid != vid):
if np.linalg.norm(scoord - currCoord, 2) <= self.r and sid != vid:
neigbors.append((sid, scoord))
# add an edge to the edge queue is the path might improve the solution
for neighbor in neigbors:
sid = neighbor[0]
estimated_f_score = self.computeDistanceCost(
self.startId, vid) + self.computeHeuristicCost(sid, self.goalId) + self.computeDistanceCost(vid, sid)
h_cost = self.compute_heuristic_cost(sid, self.goalId)
estimated_f_score = self.compute_distance_cost(
self.startId, vid) + h_cost + self.compute_distance_cost(vid, sid)
if estimated_f_score < self.g_scores[self.goalId]:
self.edge_queue.append((vid, sid))
@@ -469,22 +474,22 @@ class BITStar(object):
def add_vertex_to_edge_queue(self, vid, currCoord):
if vid not in self.old_vertices:
neigbors = []
neighbors = []
for v, edges in self.tree.vertices.items():
if v != vid and (v, vid) not in self.edge_queue and (vid, v) not in self.edge_queue:
vcoord = self.tree.nodeIdToRealWorldCoord(v)
if(np.linalg.norm(vcoord - currCoord, 2) <= self.r):
neigbors.append((vid, vcoord))
v_coord = self.tree.node_id_to_real_world_coord(v)
if np.linalg.norm(currCoord - v_coord, 2) <= self.r:
neighbors.append((vid, v_coord))
for neighbor in neigbors:
for neighbor in neighbors:
sid = neighbor[0]
estimated_f_score = self.computeDistanceCost(self.startId, vid) + \
self.computeDistanceCost(
vid, sid) + self.computeHeuristicCost(sid, self.goalId)
if estimated_f_score < self.g_scores[self.goalId] and (self.g_scores[vid] + self.computeDistanceCost(vid, sid)) < self.g_scores[sid]:
estimated_f_score = self.compute_distance_cost(self.startId, vid) + self.compute_distance_cost(
vid, sid) + self.compute_heuristic_cost(sid, self.goalId)
if estimated_f_score < self.g_scores[self.goalId] and (
self.g_scores[vid] + self.compute_distance_cost(vid, sid)) < self.g_scores[sid]:
self.edge_queue.append((vid, sid))
def updateGraph(self):
def update_graph(self):
closedSet = []
openSet = []
currId = self.startId
@@ -498,22 +503,21 @@ class BITStar(object):
openSet.remove(currId)
# Check if we're at the goal
if(currId == self.goalId):
self.nodes[self.goalId]
if currId == self.goalId:
break
if(currId not in closedSet):
if currId not in closedSet:
closedSet.append(currId)
# find a non visited successor to the current node
successors = self.tree.vertices[currId]
for succesor in successors:
if(succesor in closedSet):
if succesor in closedSet:
continue
else:
# claculate tentative g score
g_score = self.g_scores[currId] + \
self.computeDistanceCost(currId, succesor)
self.compute_distance_cost(currId, succesor)
if succesor not in openSet:
# add the successor to open set
openSet.append(succesor)
@@ -522,14 +526,13 @@ class BITStar(object):
# update g and f scores
self.g_scores[succesor] = g_score
self.f_scores[succesor] = g_score + \
self.computeHeuristicCost(succesor, self.goalId)
self.f_scores[succesor] = g_score + self.compute_heuristic_cost(succesor, self.goalId)
# store the parent and child
self.nodes[succesor] = currId
def drawGraph(self, xCenter=None, cBest=None, cMin=None, etheta=None,
samples=None, start=None, end=None, tree=None):
def draw_graph(self, xCenter=None, cBest=None, cMin=None, etheta=None,
samples=None, start=None, end=None):
plt.clf()
for rnd in samples:
if rnd is not None:
@@ -549,9 +552,10 @@ class BITStar(object):
plt.grid(True)
plt.pause(0.01)
def plot_ellipse(self, xCenter, cBest, cMin, etheta): # pragma: no cover
@staticmethod
def plot_ellipse(xCenter, cBest, cMin, etheta): # pragma: no cover
a = math.sqrt(cBest**2 - cMin**2) / 2.0
a = math.sqrt(cBest ** 2 - cMin ** 2) / 2.0
b = cBest / 2.0
angle = math.pi / 2.0 - etheta
cx = xCenter[0]