mirror of
https://github.com/AtsushiSakai/PythonRobotics.git
synced 2026-01-12 23:58:04 -05:00
228 lines
8.7 KiB
Python
228 lines
8.7 KiB
Python
"""
|
|
flowfield pathfinding
|
|
author: Sarim Mehdi (muhammadsarim.mehdi@studio.unibo.it)
|
|
Source: https://leifnode.com/2013/12/flow-field-pathfinding/
|
|
"""
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
|
|
show_animation = True
|
|
|
|
|
|
def draw_horizontal_line(start_x, start_y, length, o_x, o_y, o_dict, path):
|
|
for i in range(start_x, start_x + length):
|
|
for j in range(start_y, start_y + 2):
|
|
o_x.append(i)
|
|
o_y.append(j)
|
|
o_dict[(i, j)] = path
|
|
|
|
|
|
def draw_vertical_line(start_x, start_y, length, o_x, o_y, o_dict, path):
|
|
for i in range(start_x, start_x + 2):
|
|
for j in range(start_y, start_y + length):
|
|
o_x.append(i)
|
|
o_y.append(j)
|
|
o_dict[(i, j)] = path
|
|
|
|
|
|
class FlowField:
|
|
def __init__(self, obs_grid, goal_x, goal_y, start_x, start_y,
|
|
limit_x, limit_y):
|
|
self.start_pt = [start_x, start_y]
|
|
self.goal_pt = [goal_x, goal_y]
|
|
self.obs_grid = obs_grid
|
|
self.limit_x, self.limit_y = limit_x, limit_y
|
|
self.cost_field = {}
|
|
self.integration_field = {}
|
|
self.vector_field = {}
|
|
|
|
def find_path(self):
|
|
self.create_cost_field()
|
|
self.create_integration_field()
|
|
self.assign_vectors()
|
|
self.follow_vectors()
|
|
|
|
def create_cost_field(self):
|
|
"""Assign cost to each grid which defines the energy
|
|
it would take to get there."""
|
|
for i in range(self.limit_x):
|
|
for j in range(self.limit_y):
|
|
if self.obs_grid[(i, j)] == 'free':
|
|
self.cost_field[(i, j)] = 1
|
|
elif self.obs_grid[(i, j)] == 'medium':
|
|
self.cost_field[(i, j)] = 7
|
|
elif self.obs_grid[(i, j)] == 'hard':
|
|
self.cost_field[(i, j)] = 20
|
|
elif self.obs_grid[(i, j)] == 'obs':
|
|
continue
|
|
|
|
if [i, j] == self.goal_pt:
|
|
self.cost_field[(i, j)] = 0
|
|
|
|
def create_integration_field(self):
|
|
"""Start from the goal node and calculate the value
|
|
of the integration field at each node. Start by
|
|
assigning a value of infinity to every node except
|
|
the goal node which is assigned a value of 0. Put the
|
|
goal node in the open list and then get its neighbors
|
|
(must not be obstacles). For each neighbor, the new
|
|
cost is equal to the cost of the current node in the
|
|
integration field (in the beginning, this will simply
|
|
be the goal node) + the cost of the neighbor in the
|
|
cost field + the extra cost (optional). The new cost
|
|
is only assigned if it is less than the previously
|
|
assigned cost of the node in the integration field and,
|
|
when that happens, the neighbor is put on the open list.
|
|
This process continues until the open list is empty."""
|
|
for i in range(self.limit_x):
|
|
for j in range(self.limit_y):
|
|
if self.obs_grid[(i, j)] == 'obs':
|
|
continue
|
|
self.integration_field[(i, j)] = np.inf
|
|
if [i, j] == self.goal_pt:
|
|
self.integration_field[(i, j)] = 0
|
|
|
|
open_list = [(self.goal_pt, 0)]
|
|
while open_list:
|
|
curr_pos, curr_cost = open_list[0]
|
|
curr_x, curr_y = curr_pos
|
|
for i in range(-1, 2):
|
|
for j in range(-1, 2):
|
|
x, y = curr_x + i, curr_y + j
|
|
if self.obs_grid[(x, y)] == 'obs':
|
|
continue
|
|
if (i, j) in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
|
|
e_cost = 10
|
|
else:
|
|
e_cost = 14
|
|
neighbor_energy = self.cost_field[(x, y)]
|
|
neighbor_old_cost = self.integration_field[(x, y)]
|
|
neighbor_new_cost = curr_cost + neighbor_energy + e_cost
|
|
if neighbor_new_cost < neighbor_old_cost:
|
|
self.integration_field[(x, y)] = neighbor_new_cost
|
|
open_list.append(([x, y], neighbor_new_cost))
|
|
del open_list[0]
|
|
|
|
def assign_vectors(self):
|
|
"""For each node, assign a vector from itself to the node with
|
|
the lowest cost in the integration field. An agent will simply
|
|
follow this vector field to the goal"""
|
|
for i in range(self.limit_x):
|
|
for j in range(self.limit_y):
|
|
if self.obs_grid[(i, j)] == 'obs':
|
|
continue
|
|
if [i, j] == self.goal_pt:
|
|
self.vector_field[(i, j)] = (None, None)
|
|
continue
|
|
offset_list = [(i + a, j + b)
|
|
for a in range(-1, 2)
|
|
for b in range(-1, 2)]
|
|
neighbor_list = [{'loc': pt,
|
|
'cost': self.integration_field[pt]}
|
|
for pt in offset_list
|
|
if self.obs_grid[pt] != 'obs']
|
|
neighbor_list = sorted(neighbor_list, key=lambda x: x['cost'])
|
|
best_neighbor = neighbor_list[0]['loc']
|
|
self.vector_field[(i, j)] = best_neighbor
|
|
|
|
def follow_vectors(self):
|
|
curr_x, curr_y = self.start_pt
|
|
while curr_x is not None and curr_y is not None:
|
|
curr_x, curr_y = self.vector_field[(curr_x, curr_y)]
|
|
|
|
if show_animation:
|
|
plt.plot(curr_x, curr_y, "b*")
|
|
plt.pause(0.001)
|
|
|
|
if show_animation:
|
|
plt.show()
|
|
|
|
|
|
def main():
|
|
# set obstacle positions
|
|
obs_dict = {}
|
|
for i in range(51):
|
|
for j in range(51):
|
|
obs_dict[(i, j)] = 'free'
|
|
o_x, o_y, m_x, m_y, h_x, h_y = [], [], [], [], [], []
|
|
|
|
s_x = 5.0
|
|
s_y = 5.0
|
|
g_x = 35.0
|
|
g_y = 45.0
|
|
|
|
# draw outer border of maze
|
|
draw_vertical_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
|
|
draw_vertical_line(48, 0, 50, o_x, o_y, obs_dict, 'obs')
|
|
draw_horizontal_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
|
|
draw_horizontal_line(0, 48, 50, o_x, o_y, obs_dict, 'obs')
|
|
|
|
# draw inner walls
|
|
all_x = [10, 10, 10, 15, 20, 20, 30, 30, 35, 30, 40, 45]
|
|
all_y = [10, 30, 45, 20, 5, 40, 10, 40, 5, 40, 10, 25]
|
|
all_len = [10, 10, 5, 10, 10, 5, 20, 10, 25, 10, 35, 15]
|
|
for x, y, l in zip(all_x, all_y, all_len):
|
|
draw_vertical_line(x, y, l, o_x, o_y, obs_dict, 'obs')
|
|
|
|
all_x[:], all_y[:], all_len[:] = [], [], []
|
|
all_x = [35, 40, 15, 10, 45, 20, 10, 15, 25, 45, 10, 30, 10, 40]
|
|
all_y = [5, 10, 15, 20, 20, 25, 30, 35, 35, 35, 40, 40, 45, 45]
|
|
all_len = [10, 5, 10, 10, 5, 5, 10, 5, 10, 5, 10, 5, 5, 5]
|
|
for x, y, l in zip(all_x, all_y, all_len):
|
|
draw_horizontal_line(x, y, l, o_x, o_y, obs_dict, 'obs')
|
|
|
|
# Some points are assigned a slightly higher energy value in the cost
|
|
# field. For example, if an agent wishes to go to a point, it might
|
|
# encounter different kind of terrain like grass and dirt. Grass is
|
|
# assigned medium difficulty of passage (color coded as green on the
|
|
# map here). Dirt is assigned hard difficulty of passage (color coded
|
|
# as brown here). Hence, this algorithm will take into account how
|
|
# difficult it is to go through certain areas of a map when deciding
|
|
# the shortest path.
|
|
|
|
# draw paths that have medium difficulty (in terms of going through them)
|
|
all_x[:], all_y[:], all_len[:] = [], [], []
|
|
all_x = [10, 45]
|
|
all_y = [22, 20]
|
|
all_len = [8, 5]
|
|
for x, y, l in zip(all_x, all_y, all_len):
|
|
draw_vertical_line(x, y, l, m_x, m_y, obs_dict, 'medium')
|
|
|
|
all_x[:], all_y[:], all_len[:] = [], [], []
|
|
all_x = [20, 30, 42] + [47] * 5
|
|
all_y = [35, 30, 38] + [37 + i for i in range(2)]
|
|
all_len = [5, 7, 3] + [1] * 3
|
|
for x, y, l in zip(all_x, all_y, all_len):
|
|
draw_horizontal_line(x, y, l, m_x, m_y, obs_dict, 'medium')
|
|
|
|
# draw paths that have hard difficulty (in terms of going through them)
|
|
all_x[:], all_y[:], all_len[:] = [], [], []
|
|
all_x = [15, 20, 35]
|
|
all_y = [45, 20, 35]
|
|
all_len = [3, 5, 7]
|
|
for x, y, l in zip(all_x, all_y, all_len):
|
|
draw_vertical_line(x, y, l, h_x, h_y, obs_dict, 'hard')
|
|
|
|
all_x[:], all_y[:], all_len[:] = [], [], []
|
|
all_x = [30] + [47] * 5
|
|
all_y = [10] + [37 + i for i in range(2)]
|
|
all_len = [5] + [1] * 3
|
|
for x, y, l in zip(all_x, all_y, all_len):
|
|
draw_horizontal_line(x, y, l, h_x, h_y, obs_dict, 'hard')
|
|
|
|
if show_animation:
|
|
plt.plot(o_x, o_y, "sr")
|
|
plt.plot(m_x, m_y, "sg")
|
|
plt.plot(h_x, h_y, "sy")
|
|
plt.plot(s_x, s_y, "og")
|
|
plt.plot(g_x, g_y, "o")
|
|
plt.grid(True)
|
|
|
|
flow_obj = FlowField(obs_dict, g_x, g_y, s_x, s_y, 50, 50)
|
|
flow_obj.find_path()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|