""" Hybrid A* path planning author: Zheng Zh (@Zhengzh) """ import heapq import scipy.spatial import numpy as np import math import matplotlib.pyplot as plt import sys sys.path.append("../ReedsSheppPath/") try: from a_star_heuristic import dp_planning # , calc_obstacle_map import reeds_shepp_path_planning as rs from car import move, check_car_collision, MAX_STEER, WB, plot_car except: raise XY_GRID_RESOLUTION = 2.0 # [m] YAW_GRID_RESOLUTION = np.deg2rad(15.0) # [rad] MOTION_RESOLUTION = 0.1 # [m] path interporate resolution N_STEER = 20.0 # number of steer command H_COST = 1.0 VR = 1.0 # robot radius SB_COST = 100.0 # switch back penalty cost BACK_COST = 5.0 # backward penalty cost STEER_CHANGE_COST = 5.0 # steer angle change penalty cost STEER_COST = 1.0 # steer angle change penalty cost H_COST = 5.0 # Heuristic cost show_animation = True class Node: def __init__(self, xind, yind, yawind, direction, xlist, ylist, yawlist, directions, steer=0.0, pind=None, cost=None): self.xind = xind self.yind = yind self.yawind = yawind self.direction = direction self.xlist = xlist self.ylist = ylist self.yawlist = yawlist self.directions = directions self.steer = steer self.pind = pind self.cost = cost class Path: def __init__(self, xlist, ylist, yawlist, directionlist, cost): self.xlist = xlist self.ylist = ylist self.yawlist = yawlist self.directionlist = directionlist self.cost = cost class KDTree: """ Nearest neighbor search class with KDTree """ def __init__(self, data): # store kd-tree self.tree = scipy.spatial.cKDTree(data) def search(self, inp, k=1): """ Search NN inp: input data, single frame or multi frame """ if len(inp.shape) >= 2: # multi input index = [] dist = [] for i in inp.T: idist, iindex = self.tree.query(i, k=k) index.append(iindex) dist.append(idist) return index, dist dist, index = self.tree.query(inp, k=k) return index, dist def search_in_distance(self, inp, r): """ find points with in a distance r """ index = self.tree.query_ball_point(inp, r) return index class Config: def __init__(self, ox, oy, xyreso, yawreso): min_x_m = min(ox) min_y_m = min(oy) max_x_m = max(ox) max_y_m = max(oy) ox.append(min_x_m) oy.append(min_y_m) ox.append(max_x_m) oy.append(max_y_m) self.minx = round(min_x_m / xyreso) self.miny = round(min_y_m / xyreso) self.maxx = round(max_x_m / xyreso) self.maxy = round(max_y_m / xyreso) self.xw = round(self.maxx - self.minx) self.yw = round(self.maxy - self.miny) self.minyaw = round(- math.pi / yawreso) - 1 self.maxyaw = round(math.pi / yawreso) self.yaww = round(self.maxyaw - self.minyaw) def calc_motion_inputs(): for steer in np.concatenate((np.linspace(-MAX_STEER, MAX_STEER, N_STEER),[0.0])): for d in [1, -1]: yield [steer, d] def get_neighbors(current, config, ox, oy, kdtree): for steer, d in calc_motion_inputs(): node = calc_next_node(current, steer, d, config, ox, oy, kdtree) if node and verify_index(node, config): yield node def calc_next_node(current, steer, direction, config, ox, oy, kdtree): x, y, yaw = current.xlist[-1], current.ylist[-1], current.yawlist[-1] arc_l = XY_GRID_RESOLUTION * 1.5 xlist, ylist, yawlist = [], [], [] for _ in np.arange(0, arc_l, MOTION_RESOLUTION): x, y, yaw = move(x, y, yaw, MOTION_RESOLUTION * direction, steer) xlist.append(x) ylist.append(y) yawlist.append(yaw) if not check_car_collision(xlist, ylist, yawlist, ox, oy, kdtree): return None d = direction == 1 xind = round(x / XY_GRID_RESOLUTION) yind = round(y / XY_GRID_RESOLUTION) yawind = round(yaw / YAW_GRID_RESOLUTION) addedcost = 0.0 if d != current.direction: addedcost += SB_COST # steer penalty addedcost += STEER_COST * abs(steer) # steer change penalty addedcost += STEER_CHANGE_COST * abs(current.steer - steer) cost = current.cost + addedcost + arc_l node = Node(xind, yind, yawind, d, xlist, ylist, yawlist, [d], pind=calc_index(current, config), cost=cost, steer=steer) return node def is_same_grid(n1, n2): if n1.xind == n2.xind and n1.yind == n2.yind and n1.yawind == n2.yawind: return True return False def analytic_expantion(current, goal, c, ox, oy, kdtree): sx = current.xlist[-1] sy = current.ylist[-1] syaw = current.yawlist[-1] gx = goal.xlist[-1] gy = goal.ylist[-1] gyaw = goal.yawlist[-1] max_curvature = math.tan(MAX_STEER) / WB paths = rs.calc_paths(sx, sy, syaw, gx, gy, gyaw, max_curvature, step_size=MOTION_RESOLUTION) if not paths: return None best_path, best = None, None for path in paths: if check_car_collision(path.x, path.y, path.yaw, ox, oy, kdtree): cost = calc_rs_path_cost(path) if not best or best > cost: best = cost best_path = path return best_path def update_node_with_analystic_expantion(current, goal, c, ox, oy, kdtree): apath = analytic_expantion(current, goal, c, ox, oy, kdtree) if apath: plt.plot(apath.x, apath.y) fx = apath.x[1:] fy = apath.y[1:] fyaw = apath.yaw[1:] fcost = current.cost + calc_rs_path_cost(apath) fpind = calc_index(current, c) fd = [] for d in apath.directions[1:]: fd.append(d >= 0) fsteer = 0.0 fpath = Node(current.xind, current.yind, current.yawind, current.direction, fx, fy, fyaw, fd, cost=fcost, pind=fpind, steer=fsteer) return True, fpath return False, None def calc_rs_path_cost(rspath): cost = 0.0 for l in rspath.lengths: if l >= 0: # forward cost += l else: # back cost += abs(l) * BACK_COST # swich back penalty for i in range(len(rspath.lengths) - 1): if rspath.lengths[i] * rspath.lengths[i + 1] < 0.0: # switch back cost += SB_COST # steer penalyty for ctype in rspath.ctypes: if ctype != "S": # curve cost += STEER_COST * abs(MAX_STEER) # ==steer change penalty # calc steer profile nctypes = len(rspath.ctypes) ulist = [0.0] * nctypes for i in range(nctypes): if rspath.ctypes[i] == "R": ulist[i] = - MAX_STEER elif rspath.ctypes[i] == "L": ulist[i] = MAX_STEER for i in range(len(rspath.ctypes) - 1): cost += STEER_CHANGE_COST * abs(ulist[i + 1] - ulist[i]) return cost def hybrid_a_star_planning(start, goal, ox, oy, xyreso, yawreso): """ start goal ox: x position list of Obstacles [m] oy: y position list of Obstacles [m] xyreso: grid resolution [m] yawreso: yaw angle resolution [rad] """ start[2], goal[2] = rs.pi_2_pi(start[2]), rs.pi_2_pi(goal[2]) tox, toy = ox[:], oy[:] obkdtree = KDTree(np.vstack((tox, toy)).T) config = Config(tox, toy, xyreso, yawreso) nstart = Node(round(start[0] / xyreso), round(start[1] / xyreso), round(start[2] / yawreso), True, [start[0]], [start[1]], [start[2]], [True], cost=0) ngoal = Node(round(goal[0] / xyreso), round(goal[1] / xyreso), round(goal[2] / yawreso), True, [goal[0]], [goal[1]], [goal[2]], [True]) openList, closedList = {}, {} _, _, h_dp = dp_planning(nstart.xlist[-1], nstart.ylist[-1], ngoal.xlist[-1], ngoal.ylist[-1], ox, oy, xyreso, VR) pq = [] openList[calc_index(nstart, config)] = nstart heapq.heappush(pq, (calc_cost(nstart, h_dp, ngoal, config), calc_index(nstart, config))) while True: if not openList: print("Error: Cannot find path, No open set") return [], [], [] cost, c_id = heapq.heappop(pq) if c_id in openList: current = openList.pop(c_id) closedList[c_id] = current else: continue if show_animation: # pragma: no cover plt.plot(current.xlist[-1], current.ylist[-1], "xc") # for stopping simulation with the esc key. plt.gcf().canvas.mpl_connect('key_release_event', lambda event: [exit(0) if event.key == 'escape' else None]) if len(closedList.keys()) % 10 == 0: plt.pause(0.001) isupdated, fpath = update_node_with_analystic_expantion( current, ngoal, config, ox, oy, obkdtree) if isupdated: break for neighbor in get_neighbors(current, config, ox, oy, obkdtree): neighbor_index = calc_index(neighbor, config) if neighbor_index in closedList: continue if neighbor not in openList \ or openList[neighbor_index].cost > neighbor.cost: heapq.heappush( pq, (calc_cost(neighbor, h_dp, ngoal, config), neighbor_index)) openList[neighbor_index] = neighbor path = get_final_path(closedList, fpath, nstart, config) return path def calc_cost(n, h_dp, goal, c): ind = (n.yind - c.miny) * c.xw + (n.xind - c.minx) if ind not in h_dp: return n.cost + 999999999 # collision cost return n.cost + H_COST * h_dp[ind].cost def get_final_path(closed, ngoal, nstart, config): rx, ry, ryaw = list(reversed(ngoal.xlist)), list( reversed(ngoal.ylist)), list(reversed(ngoal.yawlist)) direction = list(reversed(ngoal.directions)) nid = ngoal.pind finalcost = ngoal.cost while nid: n = closed[nid] rx.extend(list(reversed(n.xlist))) ry.extend(list(reversed(n.ylist))) ryaw.extend(list(reversed(n.yawlist))) direction.extend(list(reversed(n.directions))) nid = n.pind rx = list(reversed(rx)) ry = list(reversed(ry)) ryaw = list(reversed(ryaw)) direction = list(reversed(direction)) # adjust first direction direction[0] = direction[1] path = Path(rx, ry, ryaw, direction, finalcost) return path def verify_index(node, c): xind, yind = node.xind, node.yind if xind >= c.minx and xind <= c.maxx and yind >= c.miny \ and yind <= c.maxy: return True return False def calc_index(node, c): ind = (node.yawind - c.minyaw) * c.xw * c.yw + \ (node.yind - c.miny) * c.xw + (node.xind - c.minx) if ind <= 0: print("Error(calc_index):", ind) return ind def main(): print("Start Hybrid A* planning") ox, oy = [], [] for i in range(60): ox.append(i) oy.append(0.0) for i in range(60): ox.append(60.0) oy.append(i) for i in range(61): ox.append(i) oy.append(60.0) for i in range(61): ox.append(0.0) oy.append(i) for i in range(40): ox.append(20.0) oy.append(i) for i in range(40): ox.append(40.0) oy.append(60.0 - i) # Set Initial parameters start = [10.0, 10.0, np.deg2rad(90.0)] goal = [50.0, 50.0, np.deg2rad(-90.0)] plt.plot(ox, oy, ".k") rs.plot_arrow(start[0], start[1], start[2], fc='g') rs.plot_arrow(goal[0], goal[1], goal[2]) plt.grid(True) plt.axis("equal") path = hybrid_a_star_planning( start, goal, ox, oy, XY_GRID_RESOLUTION, YAW_GRID_RESOLUTION) x = path.xlist y = path.ylist yaw = path.yawlist for ix, iy, iyaw in zip(x, y, yaw): plt.cla() plt.plot(ox, oy, ".k") plt.plot(x, y, "-r", label="Hybrid A* path") plt.grid(True) plt.axis("equal") plot_car(ix, iy, iyaw) plt.pause(0.0001) print(__file__ + " done!!") if __name__ == '__main__': main()