From 2ac1d9c8156eda2e6b87f501197a8ea42b649653 Mon Sep 17 00:00:00 2001 From: vss2sn <28676655+vss2sn@users.noreply.github.com> Date: Mon, 7 Jun 2021 07:28:23 -0400 Subject: [PATCH] Add D* Lite. (#511) * Add D* Lite. * Add test. Minor changes * Modified based on LGTM report * Fix linter errors * Fixes and lint. * Update README.md Made requested changes Add transform between world and grid coordinates to allow negative wold coordinates Modify to allow diagonal motion * Added display of previous and new computed path --- PathPlanning/DStarLite/d_star_lite.py | 398 ++++++++++++++++++++++++++ README.md | 13 + tests/test_d_star_lite.py | 11 + 3 files changed, 422 insertions(+) create mode 100644 PathPlanning/DStarLite/d_star_lite.py create mode 100644 tests/test_d_star_lite.py diff --git a/PathPlanning/DStarLite/d_star_lite.py b/PathPlanning/DStarLite/d_star_lite.py new file mode 100644 index 00000000..603c053d --- /dev/null +++ b/PathPlanning/DStarLite/d_star_lite.py @@ -0,0 +1,398 @@ +""" +D* Lite grid planning +author: vss2sn (28676655+vss2sn@users.noreply.github.com) +Link to papers: +D* Lite (Link: http://idm-lab.org/bib/abstracts/papers/aaai02b.pd) +Improved Fast Replanning for Robot Navigation in Unknown Terrain +(Link: http://www.cs.cmu.edu/~maxim/files/dlite_icra02.pdf) +Implemented maintaining similarity with the pseudocode for understanding. +Code can be significantly optimized by using a priority queue for U, etc. +Avoiding additional imports based on repository philosophy. +""" +import math +import matplotlib.pyplot as plt +import random + +show_animation = True +pause_time = 0.001 +p_create_random_obstacle = 0 + + +class Node: + def __init__(self, x: int = 0, y: int = 0, cost: float = 0.0): + self.x = x + self.y = y + self.cost = cost + + +def add_coordinates(node1: Node, node2: Node): + new_node = Node() + new_node.x = node1.x + node2.x + new_node.y = node1.y + node2.y + new_node.cost = node1.cost + node2.cost + return new_node + + +def compare_coordinates(node1: Node, node2: Node): + return node1.x == node2.x and node1.y == node2.y + + +class DStarLite: + + # Please adjust the heuristic function (h) if you change the list of + # possible motions + motions = [ + Node(1, 0, 1), + Node(0, 1, 1), + Node(-1, 0, 1), + Node(0, -1, 1), + Node(1, 1, math.sqrt(2)), + Node(1, -1, math.sqrt(2)), + Node(-1, 1, math.sqrt(2)), + Node(-1, -1, math.sqrt(2)) + ] + + def __init__(self, ox: list, oy: list): + # Ensure that within the algorithm implementation all node coordinates + # are indices in the grid and extend + # from 0 to abs(_max - _min) + self.x_min_world = int(min(ox)) + self.y_min_world = int(min(oy)) + self.x_max = int(abs(max(ox) - self.x_min_world)) + self.y_max = int(abs(max(oy) - self.y_min_world)) + self.obstacles = [Node(x - self.x_min_world, y - self.y_min_world) + for x, y in zip(ox, oy)] + self.start = Node(0, 0) + self.goal = Node(0, 0) + self.U = list() + self.km = 0.0 + self.kold = 0.0 + self.rhs = list() + self.g = list() + self.detected_obstacles = list() + if show_animation: + self.detected_obstacles_for_plotting_x = list() + self.detected_obstacles_for_plotting_y = list() + + def create_grid(self, val: float): + grid = list() + for _ in range(0, self.x_max): + grid_row = list() + for _ in range(0, self.y_max): + grid_row.append(val) + grid.append(grid_row) + return grid + + def is_obstacle(self, node: Node): + return any([compare_coordinates(node, obstacle) + for obstacle in self.obstacles]) or \ + any([compare_coordinates(node, obstacle) + for obstacle in self.detected_obstacles]) + + def c(self, node1: Node, node2: Node): + if self.is_obstacle(node2): + # Attempting to move from or to an obstacle + return math.inf + new_node = Node(node1.x-node2.x, node1.y-node2.y) + detected_motion = list(filter(lambda motion: + compare_coordinates(motion, new_node), + self.motions)) + return detected_motion[0].cost + + def h(self, s: Node): + # Cannot use the 2nd euclidean norm as this might sometimes generate + # heuristics that overestimate the cost, making them inadmissible, + # due to rounding errors etc (when combined with calculate_key) + # To be admissible heuristic should + # never overestimate the cost of a move + # hence not using the line below + # return math.hypot(self.start.x - s.x, self.start.y - s.y) + + # Below is the same as 1; modify if you modify the cost of each move in + # motion + # return max(abs(self.start.x - s.x), abs(self.start.y - s.y)) + return 1 + + def calculate_key(self, s: Node): + return (min(self.g[s.x][s.y], self.rhs[s.x][s.y]) + self.h(s) + + self.km, min(self.g[s.x][s.y], self.rhs[s.x][s.y])) + + def is_valid(self, node: Node): + if 0 <= node.x < self.x_max and 0 <= node.y < self.y_max: + return True + return False + + def get_neighbours(self, u: Node): + return [add_coordinates(u, motion) for motion in self.motions + if self.is_valid(add_coordinates(u, motion))] + + def pred(self, u: Node): + # Grid, so each vertex is connected to the ones around it + return self.get_neighbours(u) + + def succ(self, u: Node): + # Grid, so each vertex is connected to the ones around it + return self.get_neighbours(u) + + def initialize(self, start: Node, goal: Node): + self.start.x = start.x - self.x_min_world + self.start.y = start.y - self.y_min_world + self.goal.x = goal.x - self.x_min_world + self.goal.y = goal.y - self.y_min_world + self.U = list() # Would normally be a priority queue + self.km = 0.0 + self.rhs = self.create_grid(math.inf) + self.g = self.create_grid(math.inf) + self.rhs[self.goal.x][self.goal.y] = 0 + self.U.append((self.goal, self.calculate_key(self.goal))) + self.detected_obstacles = list() + + def update_vertex(self, u: Node): + if not compare_coordinates(u, self.goal): + self.rhs[u.x][u.y] = min([self.c(u, sprime) + + self.g[sprime.x][sprime.y] + for sprime in self.succ(u)]) + if any([compare_coordinates(u, node) for node, key in self.U]): + self.U = [(node, key) for node, key in self.U + if not compare_coordinates(node, u)] + self.U.sort(key=lambda x: x[1]) + if self.g[u.x][u.y] != self.rhs[u.x][u.y]: + self.U.append((u, self.calculate_key(u))) + self.U.sort(key=lambda x: x[1]) + + def compare_keys(self, key_pair1: tuple[float, float], + key_pair2: tuple[float, float]): + return key_pair1[0] < key_pair2[0] or \ + (key_pair1[0] == key_pair2[0] and key_pair1[1] < key_pair2[1]) + + def compute_shortest_path(self): + self.U.sort(key=lambda x: x[1]) + while (len(self.U) > 0 and + self.compare_keys(self.U[0][1], + self.calculate_key(self.start))) or \ + self.rhs[self.start.x][self.start.y] != \ + self.g[self.start.x][self.start.y]: + self.kold = self.U[0][1] + u = self.U[0][0] + self.U.pop(0) + if self.compare_keys(self.kold, self.calculate_key(u)): + self.U.append((u, self.calculate_key(u))) + self.U.sort(key=lambda x: x[1]) + elif self.g[u.x][u.y] > self.rhs[u.x][u.y]: + self.g[u.x][u.y] = self.rhs[u.x][u.y] + for s in self.pred(u): + self.update_vertex(s) + else: + self.g[u.x][u.y] = math.inf + for s in self.pred(u) + [u]: + self.update_vertex(s) + self.U.sort(key=lambda x: x[1]) + + def detect_changes(self): + changed_vertices = list() + if len(self.spoofed_obstacles) > 0: + for spoofed_obstacle in self.spoofed_obstacles[0]: + if compare_coordinates(spoofed_obstacle, self.start) or \ + compare_coordinates(spoofed_obstacle, self.goal): + continue + changed_vertices.append(spoofed_obstacle) + self.detected_obstacles.append(spoofed_obstacle) + if show_animation: + self.detected_obstacles_for_plotting_x.append( + spoofed_obstacle.x + self.x_min_world) + self.detected_obstacles_for_plotting_y.append( + spoofed_obstacle.y + self.y_min_world) + plt.plot(self.detected_obstacles_for_plotting_x, + self.detected_obstacles_for_plotting_y, ".k") + plt.pause(pause_time) + self.spoofed_obstacles.pop(0) + + # Allows random generation of obstacles + random.seed() + if random.random() > 1 - p_create_random_obstacle: + x = random.randint(0, self.x_max) + y = random.randint(0, self.y_max) + new_obs = Node(x, y) + if compare_coordinates(new_obs, self.start) or \ + compare_coordinates(new_obs, self.goal): + return changed_vertices + changed_vertices.append(Node(x, y)) + self.detected_obstacles.append(Node(x, y)) + if show_animation: + self.detected_obstacles_for_plotting_x.append(x + + self.x_min_world) + self.detected_obstacles_for_plotting_y.append(y + + self.y_min_world) + plt.plot(self.detected_obstacles_for_plotting_x, + self.detected_obstacles_for_plotting_y, ".k") + plt.pause(pause_time) + return changed_vertices + + def compute_current_path(self): + path = list() + current_point = Node(self.start.x, self.start.y) + while not compare_coordinates(current_point, self.goal): + path.append(current_point) + current_point = min(self.succ(current_point), + key=lambda sprime: + self.c(current_point, sprime) + + self.g[sprime.x][sprime.y]) + path.append(self.goal) + return path + + def compare_paths(self, path1: list, path2: list): + if len(path1) != len(path2): + return False + for node1, node2 in zip(path1, path2): + if not compare_coordinates(node1, node2): + return False + return True + + def display_path(self, path: list, colour: str, alpha: int = 1): + px = [(node.x + self.x_min_world) for node in path] + py = [(node.y + self.y_min_world) for node in path] + drawing = plt.plot(px, py, colour, alpha=alpha) + plt.pause(pause_time) + return drawing + + def main(self, start: Node, goal: Node, + spoofed_ox: list, spoofed_oy: list): + self.spoofed_obstacles = [[Node(x - self.x_min_world, + y - self.y_min_world) + for x, y in zip(rowx, rowy)] + for rowx, rowy in zip(spoofed_ox, spoofed_oy) + ] + pathx = [] + pathy = [] + self.initialize(start, goal) + last = self.start + self.compute_shortest_path() + pathx.append(self.start.x + self.x_min_world) + pathy.append(self.start.y + self.y_min_world) + + if show_animation: + current_path = self.compute_current_path() + previous_path = current_path.copy() + previous_path_image = self.display_path(previous_path, ".c", + alpha=0.3) + current_path_image = self.display_path(current_path, ".c") + + while not compare_coordinates(self.goal, self.start): + if self.g[self.start.x][self.start.y] == math.inf: + print("No path possible") + return False, pathx, pathy + self.start = min(self.succ(self.start), + key=lambda sprime: + self.c(self.start, sprime) + + self.g[sprime.x][sprime.y]) + pathx.append(self.start.x + self.x_min_world) + pathy.append(self.start.y + self.y_min_world) + if show_animation: + current_path.pop(0) + plt.plot(pathx, pathy, "-r") + plt.pause(pause_time) + changed_vertices = self.detect_changes() + if len(changed_vertices) != 0: + print("New obstacle detected") + self.km += self.h(last) + last = self.start + for u in changed_vertices: + if compare_coordinates(u, self.start): + continue + self.rhs[u.x][u.y] = math.inf + self.g[u.x][u.y] = math.inf + self.update_vertex(u) + self.compute_shortest_path() + + if show_animation: + new_path = self.compute_current_path() + if not self.compare_paths(current_path, new_path): + current_path_image[0].remove() + previous_path_image[0].remove() + previous_path = current_path.copy() + current_path = new_path.copy() + previous_path_image = self.display_path(previous_path, + ".c", + alpha=0.3) + current_path_image = self.display_path(current_path, + ".c") + plt.pause(pause_time) + print("Path found") + return True, pathx, pathy + + +def main(): + + # start and goal position + sx = 10 # [m] + sy = 10 # [m] + gx = 50 # [m] + gy = 50 # [m] + + # set obstacle positions + ox, oy = [], [] + for i in range(-10, 60): + ox.append(i) + oy.append(-10.0) + for i in range(-10, 60): + ox.append(60.0) + oy.append(i) + for i in range(-10, 61): + ox.append(i) + oy.append(60.0) + for i in range(-10, 61): + ox.append(-10.0) + oy.append(i) + for i in range(-10, 40): + ox.append(20.0) + oy.append(i) + for i in range(0, 40): + ox.append(40.0) + oy.append(60.0 - i) + + if show_animation: + plt.plot(ox, oy, ".k") + plt.plot(sx, sy, "og") + plt.plot(gx, gy, "xb") + plt.grid(True) + plt.axis("equal") + label_column = ['Start', 'Goal', 'Path taken', + 'Current computed path', 'Previous computed path', + 'Obstacles'] + columns = [plt.plot([], [], symbol, color=colour, alpha=alpha)[0] + for symbol, colour, alpha in [['o', 'g', 1], + ['x', 'b', 1], + ['-', 'r', 1], + ['.', 'c', 1], + ['.', 'c', 0.3], + ['.', 'k', 1]]] + plt.legend(columns, label_column, bbox_to_anchor=(1, 1), title="Key:", + fontsize="xx-small") + plt.plot() + plt.pause(pause_time) + + # Obstacles discovered at time = row + # time = 1, obstacles discovered at (0, 2), (9, 2), (4, 0) + # time = 2, obstacles discovered at (0, 1), (7, 7) + # ... + # when the spoofed obstacles are: + # spoofed_ox = [[0, 9, 4], [0, 7], [], [], [], [], [], [5]] + # spoofed_oy = [[2, 2, 0], [1, 7], [], [], [], [], [], [4]] + + # Reroute + # spoofed_ox = [[], [], [], [], [], [], [], [40 for _ in range(10, 21)]] + # spoofed_oy = [[], [], [], [], [], [], [], [i for i in range(10, 21)]] + + # Obstacles that demostrate large rerouting + spoofed_ox = [[], [], [], + [i for i in range(0, 21)] + [0 for _ in range(0, 20)]] + spoofed_oy = [[], [], [], + [20 for _ in range(0, 21)] + [i for i in range(0, 20)]] + + dstarlite = DStarLite(ox, oy) + dstarlite.main(Node(x=sx, y=sy), Node(x=gx, y=gy), + spoofed_ox=spoofed_ox, spoofed_oy=spoofed_oy) + + +if __name__ == "__main__": + main() diff --git a/README.md b/README.md index fac8b53d..e2eafe74 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ Python codes for robotics algorithm. * [Dijkstra algorithm](#dijkstra-algorithm) * [A* algorithm](#a-algorithm) * [D* algorithm](#d-algorithm) + * [D* Lite algorithm](#dstarlite-algorithm) * [Potential Field algorithm](#potential-field-algorithm) * [Grid based coverage path planning](#grid-based-coverage-path-planning) * [State Lattice Planning](#state-lattice-planning) @@ -314,6 +315,18 @@ Ref: - [D* Algorithm Wikipedia](https://en.wikipedia.org/wiki/D*) +### D\* Lite algorithm + +This algorithm finds the shortest path between two points while rerouting when obstacles are discovered. It has been implemented here for a 2D grid. + +![D* Lite](https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/PathPlanning/DStarLite/animation.gif) + +The animation shows a robot finding its path and rerouting to avoid obstacles as they are discovered using the D* Lite search algorithm. + +Refs: + +- [D* Lite](http://idm-lab.org/bib/abstracts/papers/aaai02b.pd) +- [Improved Fast Replanning for Robot Navigation in Unknown Terrain](http://www.cs.cmu.edu/~maxim/files/dlite_icra02.pdf) ### Potential Field algorithm diff --git a/tests/test_d_star_lite.py b/tests/test_d_star_lite.py new file mode 100644 index 00000000..b60a140a --- /dev/null +++ b/tests/test_d_star_lite.py @@ -0,0 +1,11 @@ +import conftest +from PathPlanning.DStarLite import d_star_lite as m + + +def test_1(): + m.show_animation = False + m.main() + + +if __name__ == '__main__': + conftest.run_this_test(__file__)