diff --git a/PathPlanning/BatchInformedRRTStar/batch_informed_rrtstar.py b/PathPlanning/BatchInformedRRTStar/batch_informed_rrtstar.py index d9213db5..7b871b80 100644 --- a/PathPlanning/BatchInformedRRTStar/batch_informed_rrtstar.py +++ b/PathPlanning/BatchInformedRRTStar/batch_informed_rrtstar.py @@ -25,24 +25,45 @@ class Tree(object): self.start = start self.lowerLimit = lowerLimit self.upperLimit = upperLimit + self.dimension = len(lowerLimit) + + # compute the number of grid cells based on the limits and + # resolution given for idx in range(len(lowerLimit)): self.num_cells[idx] = np.ceil((upperLimit[idx] - lowerLimit[idx])/resolution) def getRootId(self): + # return the id of the root of the tree return 0 def addVertex(self, vertex): + # add a vertex to the tree vertex_id = self.gridCoordinateToNodeId(vertex) self.vertices[vertex_id] = [] return vertex_id def addEdge(self, v, x): + # create an edge between v and x vertices if (v, x) not in self.edges: self.edges.append((v,x)) + # since the tree is undirected self.vertices[v].append(x) self.vertices[x].append(v) + def realCoordsToGridCoord(self, real_coord): + # convert real world coordinates to grid space + # depends on the resolution of the grid + # the output is the same as real world coords if the resolution + # is set to 1 + coord = [0] * self.dimension + for i in xrange(0, len(coord)): + start = self.lower_limits[i] # start of the grid space + coord[i] = np.around((real_coord[i] - start)/ self.resolution) + return coord + def gridCoordinateToNodeId(self, coord): + # This function maps a grid coordinate to a unique + # node id nodeId = 0 for i in range(len(coord) - 1, -1, -1): product = 1 @@ -51,6 +72,39 @@ class Tree(object): node_id = node_id + coord[i] * product return node_id + def realWorldToNodeId(self, real_coord): + # first convert the given coordinates to grid space and then + # convert the grid space coordinates to a unique node id + return self.gridCoordinateToNodeId(self.realCoordsToGridCoord(real_coord)) + + def gridCoordToRealWorldCoord(self, coord): + # This function smaps a grid coordinate in discrete space + # to a configuration in the full configuration space + config = [0] * self.dimension + for i in range(0, len(coord)): + start = self.lower_limits[i] # start of the real world / configuration space + grid_step = self.resolution * coord[i] # step from the coordinate in the grid + half_step = self.resolution / 2 # To get to middle of the grid + config[i] = start + grid_step # + half_step + return config + + def nodeIdToGridCoord(self, node_id): + # This function maps a node id to the associated + # grid coordinate + coord = [0] * len(self.lowerLimit) + for i in range(len(coord) - 1, -1, -1): + # Get the product of the grid space maximums + prod = 1 + for j in range(0, i): + prod = prod * self.num_cells[j] + coord[i] = np.floor(node_id / prod) + node_id = node_id - (coord[i] * prod) + return coord + + def nodeIdToRealWorldCoord(self, nid): + # This function maps a node in discrete space to a configuraiton + # in the full configuration space + return self.gridCoordToRealWorldCoord(self.nodeIdToGridCoord(nid)) class Node(): @@ -65,14 +119,16 @@ class BITStar(): def __init__(self, start, goal, obstacleList, randArea, eta=2.0, expandDis=0.5, goalSampleRate=10, maxIter=200): - self.start = Node(start[0], start[1]) - self.goal = Node(goal[0], goal[1]) + self.start = start + self.goal = goal + self.minrand = randArea[0] self.maxrand = randArea[1] self.expandDis = expandDis self.goalSampleRate = goalSampleRate self.maxIter = maxIter self.obstacleList = obstacleList + self.vertex_queue = [] self.edge_queue = [] self.samples = dict() @@ -84,12 +140,26 @@ class BITStar(): self.old_vertices = [] def plan(self, animation=True): + # initialize tree + self.tree = Tree(self.start,[self.minrand, self.minrand], + [self.maxrand, self.maxrand], 1.0) + + self.startId = self.tree.realWorldToNodeId(self.start) + self.goalId = self.tree.realWorldToNodeId(self.goal) + + # add goal to the samples + self.samples[self.goalId] = self.goal + self.g_scores[self.goalId] = float('inf') + self.f_scores[self.goalId] = 0 + + # add the start id to the tree + self.tree.addVertex(self.start) + self.g_scores[self.startId] = 0 + self.f_scores[self.startId] = self.computeHeuristicCost(self.startId, self.goalId) - self.nodeList = [self.start] - plan = None iterations = 0 # max length we expect to find in our 'informed' sample space, starts as infinite - cBest = float('inf') + cBest = self.g_scores[self.goalId] pathLen = float('inf') solutionSet = set() path = None @@ -113,9 +183,68 @@ class BITStar(): # run until done while (iterations < self.maxIter): if len(self.vertex_queue) == 0 and len(self.edge_queue) == 0: - samples = self.informedSample(100, cBest, cMin, xCenter, C) + # Using informed rrt star way of computing the samples + self.samples.update(self.informedSample(200, cBest, cMin, xCenter, C)) # prune the tree + if iterations != 0: + self.samples.update(self.informedSample(200, cBest, cMin, xCenter, C)) + + # make the old vertices the new vertices + self.old_vertices += self.tree.vertices.keys() + # add the vertices to the vertex queue + for nid in self.tree.vertices.keys(): + if nid not in self.vertex_queue: + self.vertex_queue.append(nid) + # expand the best vertices until an edge is better than the vertex + # this is done because the vertex cost represents the lower bound + # on the edge cost + while(self.bestVertexQueueValue() <= self.bestEdgeQueueValue()): + self.expandVertex(self.bestInVertexQueue()) + + # add the best edge to the tree + bestEdge = self.bestInEdgeQueue() + self.edge_queue.remove(bestEdge) + + # Check if this can improve the current solution + estimatedCostOfVertex = self.g_scores[bestEdge[0]] + + self.computeDistanceCost(bestEdge[0], bestEdge[1]) + + self.computeHeuristicCost(bestEdge[1], self.goalId) + estimatedCostOfEdge = self.computeDistanceCost(self.startId, bestEdge[0]) + + self.computeHeuristicCost(bestEdge[0], bestEdge[1]) + + self.computeHeuristicCost(bestEdge[1], self.goalId) + actualCostOfEdge = self.g_scores[bestEdge[0]] + + self.computeDistanceCost(best_edge[0], best_edge[1]) + + if(estimatedCostOfVertex < self.g_scores[self.goalId]): + if(estimatedCostOfEdge < self.g_scores[self.goalId]): + if(actualCostOfEdge < self.g_scores[self.goalId]): + # connect this edge + firstCoord = self.tree.nodeIdToRealWorldCoord(bestEdge[0]) + secondCoord = self.tree.nodeIdToRealWorldCoord(bestEdge[1]) + path = self.connect(firstCoord, secondCoord) + if path == None or len(path) = 0: + continue + nextCoord = path[len(path) - 1, :] + nextCoordPathId = self.tree.realWorldToNodeId(nextCoord) + bestEdge = (bestEdge[0], nextCoordPathId) + try: + del self.samples[bestEdge[1]] + except(KeyError): + pass + eid = self.tree.addVertex(nextCoordPathId) + self.vertex_queue.append(eid) + if eid == self.goalId or bestEdge[0] == self.goalId or + bestEdge[1] == self.goalId: + print("Goal found") + foundGoal = True + + self.tree.addEdge(bestEdge[0], bestEdge[1]) + + g_score = self.computeDistanceCost(bestEdge[0], bestEdge[1]) + self.g_scores[bestEdge[1]] = g_score + self.g_scores[best_edge[0]] + self.f_scores[bestEdge[1]] = g_score + self.computeHeuristicCost(bestEdge[1], self.goalId) + self.updateGraph() + @@ -131,6 +260,20 @@ class BITStar(): # def prune(self, c): + def computeHeuristicCost(self, start_id, goal_id): + # Using Manhattan distance as heuristic + start = np.array(self.tree.nodeIdToRealWorldCoord(start_id)) + goal = np.array(self.tree.nodeIdToRealWorldCoord(goal_id)) + + return np.linalg.norm(start - goal, 2) + + def computeDistanceCost(self, vid, xid): + # L2 norm distance + start = np.array(self.tree.nodeIdToRealWorldCoord(vid)) + stop = np.array(self.tree.nodeIdToRealWorldCoord(xid)) + + return np.linalg.norm(stop - start, 2) + def radius(self, q): dim = len(start) #dimensions space_measure = self.minrand * self.maxrand # volume of the space @@ -144,21 +287,22 @@ class BITStar(): # Sample free space confined in the radius of ball R def informedSample(self, m, cMax, cMin, xCenter, C): - samples = [] - if cMax < float('inf'): - for i in range(m): - r = [cMax / 2.0, - math.sqrt(cMax**2 - cMin**2) / 2.0, - math.sqrt(cMax**2 - cMin**2) / 2.0] - L = np.diag(r) - xBall = self.sampleUnitBall() - rnd = np.dot(np.dot(C, L), xBall) + xCenter - rnd = [rnd[(0, 0)], rnd[(1, 0)]] - samples.append(rnd) - else: - for i in range(m): - rnd = self.sampleFreeSpace() - samples.append(rnd) + samples = dict() + for i in range(m+1): + if cMax < float('inf'): + r = [cMax / 2.0, + math.sqrt(cMax**2 - cMin**2) / 2.0, + math.sqrt(cMax**2 - cMin**2) / 2.0] + L = np.diag(r) + xBall = self.sampleUnitBall() + rnd = np.dot(np.dot(C, L), xBall) + xCenter + rnd = [rnd[(0, 0)], rnd[(1, 0)]] + random_id = self.tree.realWorldToNodeId(rnd) + samples[random_id] = rnd + else: + rnd = self.sampleFreeSpace() + random_id = self.tree.realWorldToNodeId(rnd) + samples[random_id] = rnd return samples # Sample point in a unit ball @@ -174,21 +318,72 @@ class BITStar(): return np.array([[sample[0]], [sample[1]], [0]]) def sampleFreeSpace(self): - if random.randint(0, 100) > self.goalSampleRate: - rnd = [random.uniform(self.minrand, self.maxrand), + rnd = [random.uniform(self.minrand, self.maxrand), random.uniform(self.minrand, self.maxrand)] - else: - rnd = [self.goal.x, self.goal.y] return rnd - # def bestVertexQueueValue(self): + def bestVertexQueueValue(self): + if(len(self.vertex_queue) == 0): + return float('inf') + values = [self.g_scores[v] + self.computeHeuristicCost(v, self.goalId) for v in self.vertex_queue] + values.sort() + return values[0] - # def bestEdgeQueueValue(self): + def bestEdgeQueueValue(self): + if(len(self.edge_queue)==0): + return float('inf') + # return the best value in the queue by score g_tau[v] + c(v,x) + h(x) + values = [self.g_scores[e[0]] + self.computeDistanceCost(e[0], e[1]) + + self.computeHeuristicCost(e[1], self.goalId) for e in self.edge_queue] + values.sort(reverse=True) + return values[0] - # def bestInEdgeQueue(self): + def bestInVertexQueue(self): + # return the best value in the vertex queue + v_plus_vals = [(v, self.g_scores[v] + self.computeHeuristicCost(v, self.goalId)) for v in self.vertex_queue] + v_plus_vals = sorted(v_plus_vals, key=lambda x: x[1]) + + return v_plus_vals[0][0] + + def bestInEdgeQueue(self): + e_and_values = [(e[0], e[1], self.g_scores[e[0]] + self.computeDistanceCost(e[0], e[1]) + self.computeHeuristicCost(e[1], self.goalId)) for e in self.edge_queue] + e_and_values = sorted(e_and_values, key=lambda x : x[2]) + + return (e_and_values[0][0], e_and_values[0][1]) + + def expandVertex(self, vid): + self.vertex_queue.remove(vid) + + # get the coordinates for given vid + currCoord = np.array(self.nodeIdToRealWorldCoord(vid)) + + # get the nearest value in vertex for every one in samples where difference is + # less than the radius + neigbors = [] + for sid, scoord in self.samples.items(): + scoord = np.array(scoord) + if(np.linalg.norm(scoord - currCoord, 2) <= self.r and sid != vid): + neigbors.append((sid, scoord)) + + # add the vertex to the edge queue + if vid not in self.old_vertices: + neigbors = [] + for v, edges in self.tree.vertices.items(): + if v!= vid and (v, vid) not in self.edge_queue: + vcoord = self.tree.nodeIdToRealWorldCoord(v) + if(np.linalg.norm(vcoord - currCoord, 2) <= self.r and v!=vid): + neigbors.append((vid, vcoord)) + + # add an edge to the edge queue is the path might improve the solution + for neighbor in neighbors: + sid = neighbor[0] + estimated_f_score = self.computeDistanceCost(self.startId, vid) + + self.computeHeuristicCost(sif, self.goalId) + + self.computeDistanceCost(vid, sid) + if estimated_f_score < self.g_scores[self.goalId]: + self.edge_queue.append((vid, sid)) - # def bestInVertexQueue(self): # def updateGraph(self):