From 38373ac7de43573dad62cb4fbb3d206e62dc3b15 Mon Sep 17 00:00:00 2001 From: Atsushi Sakai Date: Sat, 3 Aug 2019 13:45:20 +0900 Subject: [PATCH] fix batch_informed_rrtstar.py --- .../batch_informed_rrtstar.py | 250 +++++++++--------- 1 file changed, 127 insertions(+), 123 deletions(-) diff --git a/PathPlanning/BatchInformedRRTStar/batch_informed_rrtstar.py b/PathPlanning/BatchInformedRRTStar/batch_informed_rrtstar.py index 366ec5ac..37ac75ad 100644 --- a/PathPlanning/BatchInformedRRTStar/batch_informed_rrtstar.py +++ b/PathPlanning/BatchInformedRRTStar/batch_informed_rrtstar.py @@ -14,10 +14,11 @@ author: Karan Chawla(@karanchawla) Reference: https://arxiv.org/abs/1405.5848 """ -import random -import numpy as np import math +import random + import matplotlib.pyplot as plt +import numpy as np show_animation = True @@ -26,8 +27,14 @@ class RTree(object): # Class to represent the explicit tree created # while sampling through the state space - def __init__(self, start=[0, 0], lowerLimit=[0, 0], upperLimit=[10, 10], resolution=1): + def __init__(self, start=None, lowerLimit=None, upperLimit=None, resolution=1.0): + if upperLimit is None: + upperLimit = [10, 10] + if lowerLimit is None: + lowerLimit = [0, 0] + if start is None: + start = [0, 0] self.vertices = dict() self.edges = [] self.start = start @@ -42,20 +49,21 @@ class RTree(object): self.num_cells[idx] = np.ceil( (upperLimit[idx] - lowerLimit[idx]) / resolution) - vertex_id = self.realWorldToNodeId(start) + vertex_id = self.real_world_to_node_id(start) self.vertices[vertex_id] = [] - def getRootId(self): + @staticmethod + def get_root_id(): # return the id of the root of the tree return 0 - def addVertex(self, vertex): + def add_vertex(self, vertex): # add a vertex to the tree - vertex_id = self.realWorldToNodeId(vertex) + vertex_id = self.real_world_to_node_id(vertex) self.vertices[vertex_id] = [] return vertex_id - def addEdge(self, v, x): + def add_edge(self, v, x): # create an edge between v and x vertices if (v, x) not in self.edges: self.edges.append((v, x)) @@ -63,7 +71,7 @@ class RTree(object): self.vertices[v].append(x) self.vertices[x].append(v) - def realCoordsToGridCoord(self, real_coord): + def real_coords_to_grid_coord(self, real_coord): # convert real world coordinates to grid space # depends on the resolution of the grid # the output is the same as real world coords if the resolution @@ -71,10 +79,10 @@ class RTree(object): coord = [0] * self.dimension for i in range(len(coord)): start = self.lowerLimit[i] # start of the grid space - coord[i] = np.around((real_coord[i] - start) / self.resolution) + coord[i] = int(np.around((real_coord[i] - start) / self.resolution)) return coord - def gridCoordinateToNodeId(self, coord): + def grid_coordinate_to_node_id(self, coord): # This function maps a grid coordinate to a unique # node id nodeId = 0 @@ -85,12 +93,12 @@ class RTree(object): nodeId = nodeId + coord[i] * product return nodeId - def realWorldToNodeId(self, real_coord): + def real_world_to_node_id(self, real_coord): # first convert the given coordinates to grid space and then # convert the grid space coordinates to a unique node id - return self.gridCoordinateToNodeId(self.realCoordsToGridCoord(real_coord)) + return self.grid_coordinate_to_node_id(self.real_coords_to_grid_coord(real_coord)) - def gridCoordToRealWorldCoord(self, coord): + def grid_coord_to_real_world_coord(self, coord): # This function smaps a grid coordinate in discrete space # to a configuration in the full configuration space config = [0] * self.dimension @@ -102,7 +110,7 @@ class RTree(object): config[i] = start + grid_step return config - def nodeIdToGridCoord(self, node_id): + def node_id_to_grid_coord(self, node_id): # This function maps a node id to the associated # grid coordinate coord = [0] * len(self.lowerLimit) @@ -115,12 +123,10 @@ class RTree(object): node_id = node_id - (coord[i] * prod) return coord - def nodeIdToRealWorldCoord(self, nid): - # This function maps a node in discrete space to a configuraiton + def node_id_to_real_world_coord(self, nid): + # This function maps a node in discrete space to a configuration # in the full configuration space - return self.gridCoordToRealWorldCoord(self.nodeIdToGridCoord(nid)) - -# Uses Batch Informed Trees to find a path from start to goal + return self.grid_coord_to_real_world_coord(self.node_id_to_grid_coord(nid)) class BITStar(object): @@ -135,6 +141,8 @@ class BITStar(object): self.maxrand = randArea[1] self.maxIter = maxIter self.obstacleList = obstacleList + self.startId = None + self.goalId = None self.vertex_queue = [] self.edge_queue = [] @@ -154,8 +162,8 @@ class BITStar(object): upperLimit=upperLimit, resolution=0.01) def setup_planning(self): - self.startId = self.tree.realWorldToNodeId(self.start) - self.goalId = self.tree.realWorldToNodeId(self.goal) + self.startId = self.tree.real_world_to_node_id(self.start) + self.goalId = self.tree.real_world_to_node_id(self.goal) # add goal to the samples self.samples[self.goalId] = self.goal @@ -163,9 +171,9 @@ class BITStar(object): self.f_scores[self.goalId] = 0 # add the start id to the tree - self.tree.addVertex(self.start) + self.tree.add_vertex(self.start) self.g_scores[self.startId] = 0 - self.f_scores[self.startId] = self.computeHeuristicCost( + self.f_scores[self.startId] = self.compute_heuristic_cost( self.startId, self.goalId) # max length we expect to find in our 'informed' sample space, starts as infinite @@ -182,11 +190,11 @@ class BITStar(object): # first column of idenity matrix transposed id1_t = np.array([1.0, 0.0, 0.0]).reshape(1, 3) M = np.dot(a1, id1_t) - U, S, Vh = np.linalg.svd(M, 1, 1) + U, S, Vh = np.linalg.svd(M, True, True) C = np.dot(np.dot(U, np.diag( [1.0, 1.0, np.linalg.det(U) * np.linalg.det(np.transpose(Vh))])), Vh) - self.samples.update(self.informedSample( + self.samples.update(self.informed_sample( 200, cBest, cMin, xCenter, C)) return etheta, cMin, xCenter, C, cBest @@ -207,7 +215,7 @@ class BITStar(object): else: m = 100 cBest = self.g_scores[self.goalId] - self.samples.update(self.informedSample( + self.samples.update(self.informed_sample( m, cBest, cMin, xCenter, C)) # make the old vertices the new vertices @@ -225,26 +233,25 @@ class BITStar(object): foundGoal = False # run until done - while (iterations < self.maxIter): + while iterations < self.maxIter: cBest = self.setup_sample(iterations, foundGoal, cMin, xCenter, C, cBest) # expand the best vertices until an edge is better than the vertex # this is done because the vertex cost represents the lower bound # on the edge cost - while(self.bestVertexQueueValue() <= self.bestEdgeQueueValue()): - self.expandVertex(self.bestInVertexQueue()) + while self.best_vertex_queue_value() <= self.best_edge_queue_value(): + self.expand_vertex(self.best_in_vertex_queue()) # add the best edge to the tree - bestEdge = self.bestInEdgeQueue() + bestEdge = self.best_in_edge_queue() self.edge_queue.remove(bestEdge) # Check if this can improve the current solution - estimatedCostOfVertex = self.g_scores[bestEdge[0]] + self.computeDistanceCost( - bestEdge[0], bestEdge[1]) + self.computeHeuristicCost(bestEdge[1], self.goalId) - estimatedCostOfEdge = self.computeDistanceCost(self.startId, bestEdge[0]) + self.computeHeuristicCost( - bestEdge[0], bestEdge[1]) + self.computeHeuristicCost(bestEdge[1], self.goalId) - actualCostOfEdge = self.g_scores[bestEdge[0]] + \ - self.computeDistanceCost(bestEdge[0], bestEdge[1]) + estimatedCostOfVertex = self.g_scores[bestEdge[0]] + self.compute_distance_cost( + bestEdge[0], bestEdge[1]) + self.compute_heuristic_cost(bestEdge[1], self.goalId) + estimatedCostOfEdge = self.compute_distance_cost(self.startId, bestEdge[0]) + self.compute_heuristic_cost( + bestEdge[0], bestEdge[1]) + self.compute_heuristic_cost(bestEdge[1], self.goalId) + actualCostOfEdge = self.g_scores[bestEdge[0]] + self.compute_distance_cost(bestEdge[0], bestEdge[1]) f1 = estimatedCostOfVertex < self.g_scores[self.goalId] f2 = estimatedCostOfEdge < self.g_scores[self.goalId] @@ -252,46 +259,44 @@ class BITStar(object): if f1 and f2 and f3: # connect this edge - firstCoord = self.tree.nodeIdToRealWorldCoord( + firstCoord = self.tree.node_id_to_real_world_coord( bestEdge[0]) - secondCoord = self.tree.nodeIdToRealWorldCoord( + secondCoord = self.tree.node_id_to_real_world_coord( bestEdge[1]) path = self.connect(firstCoord, secondCoord) - lastEdge = self.tree.realWorldToNodeId(secondCoord) + lastEdge = self.tree.real_world_to_node_id(secondCoord) if path is None or len(path) == 0: continue nextCoord = path[len(path) - 1, :] - nextCoordPathId = self.tree.realWorldToNodeId( + nextCoordPathId = self.tree.real_world_to_node_id( nextCoord) bestEdge = (bestEdge[0], nextCoordPathId) - if(bestEdge[1] in self.tree.vertices.keys()): + if bestEdge[1] in self.tree.vertices.keys(): continue else: try: del self.samples[bestEdge[1]] - except(KeyError): + except KeyError: pass - eid = self.tree.addVertex(nextCoord) + eid = self.tree.add_vertex(nextCoord) self.vertex_queue.append(eid) if eid == self.goalId or bestEdge[0] == self.goalId or bestEdge[1] == self.goalId: print("Goal found") foundGoal = True - self.tree.addEdge(bestEdge[0], bestEdge[1]) + self.tree.add_edge(bestEdge[0], bestEdge[1]) - g_score = self.computeDistanceCost( + g_score = self.compute_distance_cost( bestEdge[0], bestEdge[1]) - self.g_scores[bestEdge[1]] = g_score + \ - self.g_scores[bestEdge[0]] - self.f_scores[bestEdge[1]] = g_score + \ - self.computeHeuristicCost(bestEdge[1], self.goalId) - self.updateGraph() + self.g_scores[bestEdge[1]] = g_score + self.g_scores[bestEdge[0]] + self.f_scores[bestEdge[1]] = g_score + self.compute_heuristic_cost(bestEdge[1], self.goalId) + self.update_graph() # visualize new edge if animation: - self.drawGraph(xCenter=xCenter, cBest=cBest, - cMin=cMin, etheta=etheta, samples=self.samples.values(), - start=firstCoord, end=secondCoord, tree=self.tree.edges) + self.draw_graph(xCenter=xCenter, cBest=cBest, + cMin=cMin, etheta=etheta, samples=self.samples.values(), + start=firstCoord, end=secondCoord) self.remove_queue(lastEdge, bestEdge) @@ -306,14 +311,13 @@ class BITStar(object): return self.find_final_path() def find_final_path(self): - plan = [] - plan.append(self.goal) + plan = [self.goal] currId = self.goalId - while (currId != self.startId): - plan.append(self.tree.nodeIdToRealWorldCoord(currId)) + while currId != self.startId: + plan.append(self.tree.node_id_to_real_world_coord(currId)) try: currId = self.nodes[currId] - except(KeyError): + except KeyError: print("cannot find Path") return [] @@ -324,29 +328,30 @@ class BITStar(object): def remove_queue(self, lastEdge, bestEdge): for edge in self.edge_queue: - if(edge[1] == bestEdge[1]): - if self.g_scores[edge[1]] + self.computeDistanceCost(edge[1], bestEdge[1]) >= self.g_scores[self.goalId]: - if(lastEdge, bestEdge[1]) in self.edge_queue: + if edge[1] == bestEdge[1]: + dist_cost = self.compute_distance_cost(edge[1], bestEdge[1]) + if self.g_scores[edge[1]] + dist_cost >= self.g_scores[self.goalId]: + if (lastEdge, bestEdge[1]) in self.edge_queue: self.edge_queue.remove( (lastEdge, bestEdge[1])) def connect(self, start, end): # A function which attempts to extend from a start coordinates # to goal coordinates - steps = int(self.computeDistanceCost(self.tree.realWorldToNodeId( - start), self.tree.realWorldToNodeId(end)) * 10) + steps = int(self.compute_distance_cost(self.tree.real_world_to_node_id( + start), self.tree.real_world_to_node_id(end)) * 10) x = np.linspace(start[0], end[0], num=steps) y = np.linspace(start[1], end[1], num=steps) for i in range(len(x)): - if(self._collisionCheck(x[i], y[i])): - if(i == 0): + if self._collision_check(x[i], y[i]): + if i == 0: return None # if collision, send path until collision return np.vstack((x[0:i], y[0:i])).transpose() return np.vstack((x, y)).transpose() - def _collisionCheck(self, x, y): + def _collision_check(self, x, y): for (ox, oy, size) in self.obstacleList: dx = ox - x dy = oy - y @@ -355,45 +360,44 @@ class BITStar(object): return True # collision return False - # def prune(self, c): - - def computeHeuristicCost(self, start_id, goal_id): + def compute_heuristic_cost(self, start_id, goal_id): # Using Manhattan distance as heuristic - start = np.array(self.tree.nodeIdToRealWorldCoord(start_id)) - goal = np.array(self.tree.nodeIdToRealWorldCoord(goal_id)) + start = np.array(self.tree.node_id_to_real_world_coord(start_id)) + goal = np.array(self.tree.node_id_to_real_world_coord(goal_id)) return np.linalg.norm(start - goal, 2) - def computeDistanceCost(self, vid, xid): + def compute_distance_cost(self, vid, xid): # L2 norm distance - start = np.array(self.tree.nodeIdToRealWorldCoord(vid)) - stop = np.array(self.tree.nodeIdToRealWorldCoord(xid)) + start = np.array(self.tree.node_id_to_real_world_coord(vid)) + stop = np.array(self.tree.node_id_to_real_world_coord(xid)) return np.linalg.norm(stop - start, 2) # Sample free space confined in the radius of ball R - def informedSample(self, m, cMax, cMin, xCenter, C): + def informed_sample(self, m, cMax, cMin, xCenter, C): samples = dict() print("g_Score goal id: ", self.g_scores[self.goalId]) for i in range(m + 1): if cMax < float('inf'): r = [cMax / 2.0, - math.sqrt(cMax**2 - cMin**2) / 2.0, - math.sqrt(cMax**2 - cMin**2) / 2.0] + math.sqrt(cMax ** 2 - cMin ** 2) / 2.0, + math.sqrt(cMax ** 2 - cMin ** 2) / 2.0] L = np.diag(r) - xBall = self.sampleUnitBall() + xBall = self.sample_unit_ball() rnd = np.dot(np.dot(C, L), xBall) + xCenter rnd = [rnd[(0, 0)], rnd[(1, 0)]] - random_id = self.tree.realWorldToNodeId(rnd) + random_id = self.tree.real_world_to_node_id(rnd) samples[random_id] = rnd else: - rnd = self.sampleFreeSpace() - random_id = self.tree.realWorldToNodeId(rnd) + rnd = self.sample_free_space() + random_id = self.tree.real_world_to_node_id(rnd) samples[random_id] = rnd return samples # Sample point in a unit ball - def sampleUnitBall(self): + @staticmethod + def sample_unit_ball(): a = random.random() b = random.random() @@ -404,63 +408,64 @@ class BITStar(object): b * math.sin(2 * math.pi * a / b)) return np.array([[sample[0]], [sample[1]], [0]]) - def sampleFreeSpace(self): + def sample_free_space(self): rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)] return rnd - def bestVertexQueueValue(self): - if(len(self.vertex_queue) == 0): + def best_vertex_queue_value(self): + if len(self.vertex_queue) == 0: return float('inf') values = [self.g_scores[v] - + self.computeHeuristicCost(v, self.goalId) for v in self.vertex_queue] + + self.compute_heuristic_cost(v, self.goalId) for v in self.vertex_queue] values.sort() return values[0] - def bestEdgeQueueValue(self): - if(len(self.edge_queue) == 0): + def best_edge_queue_value(self): + if len(self.edge_queue) == 0: return float('inf') # return the best value in the queue by score g_tau[v] + c(v,x) + h(x) - values = [self.g_scores[e[0]] + self.computeDistanceCost(e[0], e[1]) - + self.computeHeuristicCost(e[1], self.goalId) for e in self.edge_queue] + values = [self.g_scores[e[0]] + self.compute_distance_cost(e[0], e[1]) + + self.compute_heuristic_cost(e[1], self.goalId) for e in self.edge_queue] values.sort(reverse=True) return values[0] - def bestInVertexQueue(self): + def best_in_vertex_queue(self): # return the best value in the vertex queue - v_plus_vals = [(v, self.g_scores[v] + self.computeHeuristicCost(v, self.goalId)) + v_plus_vals = [(v, self.g_scores[v] + self.compute_heuristic_cost(v, self.goalId)) for v in self.vertex_queue] v_plus_vals = sorted(v_plus_vals, key=lambda x: x[1]) # print(v_plus_vals) return v_plus_vals[0][0] - def bestInEdgeQueue(self): - e_and_values = [(e[0], e[1], self.g_scores[e[0]] + self.computeDistanceCost( - e[0], e[1]) + self.computeHeuristicCost(e[1], self.goalId)) for e in self.edge_queue] + def best_in_edge_queue(self): + e_and_values = [(e[0], e[1], self.g_scores[e[0]] + self.compute_distance_cost( + e[0], e[1]) + self.compute_heuristic_cost(e[1], self.goalId)) for e in self.edge_queue] e_and_values = sorted(e_and_values, key=lambda x: x[2]) - return (e_and_values[0][0], e_and_values[0][1]) + return e_and_values[0][0], e_and_values[0][1] - def expandVertex(self, vid): + def expand_vertex(self, vid): self.vertex_queue.remove(vid) # get the coordinates for given vid - currCoord = np.array(self.tree.nodeIdToRealWorldCoord(vid)) + currCoord = np.array(self.tree.node_id_to_real_world_coord(vid)) # get the nearest value in vertex for every one in samples where difference is # less than the radius neigbors = [] for sid, scoord in self.samples.items(): scoord = np.array(scoord) - if(np.linalg.norm(scoord - currCoord, 2) <= self.r and sid != vid): + if np.linalg.norm(scoord - currCoord, 2) <= self.r and sid != vid: neigbors.append((sid, scoord)) # add an edge to the edge queue is the path might improve the solution for neighbor in neigbors: sid = neighbor[0] - estimated_f_score = self.computeDistanceCost( - self.startId, vid) + self.computeHeuristicCost(sid, self.goalId) + self.computeDistanceCost(vid, sid) + h_cost = self.compute_heuristic_cost(sid, self.goalId) + estimated_f_score = self.compute_distance_cost( + self.startId, vid) + h_cost + self.compute_distance_cost(vid, sid) if estimated_f_score < self.g_scores[self.goalId]: self.edge_queue.append((vid, sid)) @@ -469,22 +474,22 @@ class BITStar(object): def add_vertex_to_edge_queue(self, vid, currCoord): if vid not in self.old_vertices: - neigbors = [] + neighbors = [] for v, edges in self.tree.vertices.items(): if v != vid and (v, vid) not in self.edge_queue and (vid, v) not in self.edge_queue: - vcoord = self.tree.nodeIdToRealWorldCoord(v) - if(np.linalg.norm(vcoord - currCoord, 2) <= self.r): - neigbors.append((vid, vcoord)) + v_coord = self.tree.node_id_to_real_world_coord(v) + if np.linalg.norm(currCoord - v_coord, 2) <= self.r: + neighbors.append((vid, v_coord)) - for neighbor in neigbors: + for neighbor in neighbors: sid = neighbor[0] - estimated_f_score = self.computeDistanceCost(self.startId, vid) + \ - self.computeDistanceCost( - vid, sid) + self.computeHeuristicCost(sid, self.goalId) - if estimated_f_score < self.g_scores[self.goalId] and (self.g_scores[vid] + self.computeDistanceCost(vid, sid)) < self.g_scores[sid]: + estimated_f_score = self.compute_distance_cost(self.startId, vid) + self.compute_distance_cost( + vid, sid) + self.compute_heuristic_cost(sid, self.goalId) + if estimated_f_score < self.g_scores[self.goalId] and ( + self.g_scores[vid] + self.compute_distance_cost(vid, sid)) < self.g_scores[sid]: self.edge_queue.append((vid, sid)) - def updateGraph(self): + def update_graph(self): closedSet = [] openSet = [] currId = self.startId @@ -498,22 +503,21 @@ class BITStar(object): openSet.remove(currId) # Check if we're at the goal - if(currId == self.goalId): - self.nodes[self.goalId] + if currId == self.goalId: break - if(currId not in closedSet): + if currId not in closedSet: closedSet.append(currId) # find a non visited successor to the current node successors = self.tree.vertices[currId] for succesor in successors: - if(succesor in closedSet): + if succesor in closedSet: continue else: # claculate tentative g score g_score = self.g_scores[currId] + \ - self.computeDistanceCost(currId, succesor) + self.compute_distance_cost(currId, succesor) if succesor not in openSet: # add the successor to open set openSet.append(succesor) @@ -522,14 +526,13 @@ class BITStar(object): # update g and f scores self.g_scores[succesor] = g_score - self.f_scores[succesor] = g_score + \ - self.computeHeuristicCost(succesor, self.goalId) + self.f_scores[succesor] = g_score + self.compute_heuristic_cost(succesor, self.goalId) # store the parent and child self.nodes[succesor] = currId - def drawGraph(self, xCenter=None, cBest=None, cMin=None, etheta=None, - samples=None, start=None, end=None, tree=None): + def draw_graph(self, xCenter=None, cBest=None, cMin=None, etheta=None, + samples=None, start=None, end=None): plt.clf() for rnd in samples: if rnd is not None: @@ -549,9 +552,10 @@ class BITStar(object): plt.grid(True) plt.pause(0.01) - def plot_ellipse(self, xCenter, cBest, cMin, etheta): # pragma: no cover + @staticmethod + def plot_ellipse(xCenter, cBest, cMin, etheta): # pragma: no cover - a = math.sqrt(cBest**2 - cMin**2) / 2.0 + a = math.sqrt(cBest ** 2 - cMin ** 2) / 2.0 b = cBest / 2.0 angle = math.pi / 2.0 - etheta cx = xCenter[0]