mirror of
https://github.com/AtsushiSakai/PythonRobotics.git
synced 2026-04-22 03:00:22 -04:00
Implemented Flowfield Pathfinding (#408)
* Add files via upload * Add files via upload * Update test_a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update test_a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update a_star_variants.py * Update test_a_star_variants.py * Add files via upload * Delete test_a_star_variants.py * Update test_a_star_variants_iterative_deepening.py * Update test_a_star_variants_beam_search.py * Update test_a_star_variants_dynamic_weighting.py * Update test_a_star_variants_jump_point.py * Update test_a_star_variants_theta_star.py * Update test_a_star_variants_beam_search.py * Update test_a_star_variants_beam_search.py * Update test_a_star_variants_dynamic_weighting.py * Update test_a_star_variants_iterative_deepening.py * Update test_a_star_variants_jump_point.py * Update test_a_star_variants_theta_star.py * Update test_a_star_variants_beam_search.py * Update test_a_star_variants_dynamic_weighting.py * Update test_a_star_variants_iterative_deepening.py * Update test_a_star_variants_jump_point.py * Update test_a_star_variants_theta_star.py * Update a_star_variants.py * Add files via upload * Add files via upload * Delete test_a_star_variants_beam_search.py * Delete test_a_star_variants_dynamic_weighting.py * Delete test_a_star_variants_iterative_deepening.py * Delete test_a_star_variants_jump_point.py * Delete test_a_star_variants_theta_star.py * Added requested changes * Added requested changes * Added flowfield * Added requested changes * Update flowfield.py
This commit is contained in:
223
PathPlanning/FlowField/flowfield.py
Normal file
223
PathPlanning/FlowField/flowfield.py
Normal file
@@ -0,0 +1,223 @@
|
||||
"""
|
||||
flowfield pathfinding
|
||||
author: Sarim Mehdi (muhammadsarim.mehdi@studio.unibo.it)
|
||||
Source: https://leifnode.com/2013/12/flow-field-pathfinding/
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
show_animation = True
|
||||
|
||||
|
||||
def draw_horizontal_line(start_x, start_y, length, o_x, o_y, o_dict, path):
|
||||
for i in range(start_x, start_x + length):
|
||||
for j in range(start_y, start_y + 2):
|
||||
o_x.append(i)
|
||||
o_y.append(j)
|
||||
o_dict[(i, j)] = path
|
||||
|
||||
|
||||
def draw_vertical_line(start_x, start_y, length, o_x, o_y, o_dict, path):
|
||||
for i in range(start_x, start_x + 2):
|
||||
for j in range(start_y, start_y + length):
|
||||
o_x.append(i)
|
||||
o_y.append(j)
|
||||
o_dict[(i, j)] = path
|
||||
|
||||
|
||||
class FlowField:
|
||||
def __init__(self, obs_grid, goal_x, goal_y, start_x, start_y,
|
||||
limit_x, limit_y):
|
||||
self.start_pt = [start_x, start_y]
|
||||
self.goal_pt = [goal_x, goal_y]
|
||||
self.obs_grid = obs_grid
|
||||
self.limit_x, self.limit_y = limit_x, limit_y
|
||||
self.cost_field = {}
|
||||
self.integration_field = {}
|
||||
self.vector_field = {}
|
||||
|
||||
def find_path(self):
|
||||
self.create_cost_field()
|
||||
self.create_integration_field()
|
||||
self.assign_vectors()
|
||||
self.follow_vectors()
|
||||
|
||||
def create_cost_field(self):
|
||||
"""Assign cost to each grid which defines the energy
|
||||
it would take to get there."""
|
||||
for i in range(self.limit_x):
|
||||
for j in range(self.limit_y):
|
||||
if self.obs_grid[(i, j)] == 'free':
|
||||
self.cost_field[(i, j)] = 1
|
||||
elif self.obs_grid[(i, j)] == 'medium':
|
||||
self.cost_field[(i, j)] = 7
|
||||
elif self.obs_grid[(i, j)] == 'hard':
|
||||
self.cost_field[(i, j)] = 20
|
||||
elif self.obs_grid[(i, j)] == 'obs':
|
||||
continue
|
||||
|
||||
if [i, j] == self.goal_pt:
|
||||
self.cost_field[(i, j)] = 0
|
||||
|
||||
def create_integration_field(self):
|
||||
"""Start from the goal node and calculate the value
|
||||
of the integration field at each node. Start by
|
||||
assigning a value of infinity to every node except
|
||||
the goal node which is assigned a value of 0. Put the
|
||||
goal node in the open list and then get its neighbors
|
||||
(must not be obstacles). For each neighbor, the new
|
||||
cost is equal to the cost of the current node in the
|
||||
integration field (in the beginning, this will simply
|
||||
be the goal node) + the cost of the neighbor in the
|
||||
cost field + the extra cost (optional). The new cost
|
||||
is only assigned if it is less than the previously
|
||||
assigned cost of the node in the integration field and,
|
||||
when that happens, the neighbor is put on the open list.
|
||||
This process continues until the open list is empty."""
|
||||
for i in range(self.limit_x):
|
||||
for j in range(self.limit_y):
|
||||
if self.obs_grid[(i, j)] == 'obs':
|
||||
continue
|
||||
self.integration_field[(i, j)] = np.inf
|
||||
if [i, j] == self.goal_pt:
|
||||
self.integration_field[(i, j)] = 0
|
||||
|
||||
open_list = [(self.goal_pt, 0)]
|
||||
while open_list:
|
||||
curr_pos, curr_cost = open_list[0]
|
||||
curr_x, curr_y = curr_pos
|
||||
for i in range(-1, 2):
|
||||
for j in range(-1, 2):
|
||||
x, y = curr_x + i, curr_y + j
|
||||
if self.obs_grid[(x, y)] == 'obs':
|
||||
continue
|
||||
if (i, j) in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
|
||||
e_cost = 10
|
||||
else:
|
||||
e_cost = 14
|
||||
neighbor_energy = self.cost_field[(x, y)]
|
||||
neighbor_old_cost = self.integration_field[(x, y)]
|
||||
neighbor_new_cost = curr_cost + neighbor_energy + e_cost
|
||||
if neighbor_new_cost < neighbor_old_cost:
|
||||
self.integration_field[(x, y)] = neighbor_new_cost
|
||||
open_list.append(([x, y], neighbor_new_cost))
|
||||
del open_list[0]
|
||||
|
||||
def assign_vectors(self):
|
||||
"""For each node, assign a vector from itself to the node with
|
||||
the lowest cost in the integration field. An agent will simply
|
||||
follow this vector field to the goal"""
|
||||
for i in range(self.limit_x):
|
||||
for j in range(self.limit_y):
|
||||
if self.obs_grid[(i, j)] == 'obs':
|
||||
continue
|
||||
if [i, j] == self.goal_pt:
|
||||
self.vector_field[(i, j)] = (None, None)
|
||||
continue
|
||||
offset_list = [(i + a, j + b)
|
||||
for a in range(-1, 2)
|
||||
for b in range(-1, 2)]
|
||||
neighbor_list = [{'loc': pt,
|
||||
'cost': self.integration_field[pt]}
|
||||
for pt in offset_list
|
||||
if self.obs_grid[pt] != 'obs']
|
||||
neighbor_list = sorted(neighbor_list, key=lambda x: x['cost'])
|
||||
best_neighbor = neighbor_list[0]['loc']
|
||||
self.vector_field[(i, j)] = best_neighbor
|
||||
|
||||
def follow_vectors(self):
|
||||
curr_x, curr_y = self.start_pt
|
||||
while curr_x is not None and curr_y is not None:
|
||||
plt.plot(curr_x, curr_y, "b*")
|
||||
curr_x, curr_y = self.vector_field[(curr_x, curr_y)]
|
||||
plt.pause(0.001)
|
||||
if show_animation:
|
||||
plt.show()
|
||||
|
||||
|
||||
def main():
|
||||
# set obstacle positions
|
||||
obs_dict = {}
|
||||
for i in range(51):
|
||||
for j in range(51):
|
||||
obs_dict[(i, j)] = 'free'
|
||||
o_x, o_y, m_x, m_y, h_x, h_y = [], [], [], [], [], []
|
||||
|
||||
s_x = 5.0
|
||||
s_y = 5.0
|
||||
g_x = 35.0
|
||||
g_y = 45.0
|
||||
|
||||
# draw outer border of maze
|
||||
draw_vertical_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
|
||||
draw_vertical_line(48, 0, 50, o_x, o_y, obs_dict, 'obs')
|
||||
draw_horizontal_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
|
||||
draw_horizontal_line(0, 48, 50, o_x, o_y, obs_dict, 'obs')
|
||||
|
||||
# draw inner walls
|
||||
all_x = [10, 10, 10, 15, 20, 20, 30, 30, 35, 30, 40, 45]
|
||||
all_y = [10, 30, 45, 20, 5, 40, 10, 40, 5, 40, 10, 25]
|
||||
all_len = [10, 10, 5, 10, 10, 5, 20, 10, 25, 10, 35, 15]
|
||||
for x, y, l in zip(all_x, all_y, all_len):
|
||||
draw_vertical_line(x, y, l, o_x, o_y, obs_dict, 'obs')
|
||||
|
||||
all_x[:], all_y[:], all_len[:] = [], [], []
|
||||
all_x = [35, 40, 15, 10, 45, 20, 10, 15, 25, 45, 10, 30, 10, 40]
|
||||
all_y = [5, 10, 15, 20, 20, 25, 30, 35, 35, 35, 40, 40, 45, 45]
|
||||
all_len = [10, 5, 10, 10, 5, 5, 10, 5, 10, 5, 10, 5, 5, 5]
|
||||
for x, y, l in zip(all_x, all_y, all_len):
|
||||
draw_horizontal_line(x, y, l, o_x, o_y, obs_dict, 'obs')
|
||||
|
||||
# Some points are assigned a slightly higher energy value in the cost
|
||||
# field. For example, if an agent wishes to go to a point, it might
|
||||
# encounter different kind of terrain like grass and dirt. Grass is
|
||||
# assigned medium difficulty of passage (color coded as green on the
|
||||
# map here). Dirt is assigned hard difficulty of passage (color coded
|
||||
# as brown here). Hence, this algorithm will take into account how
|
||||
# difficult it is to go through certain areas of a map when deciding
|
||||
# the shortest path.
|
||||
|
||||
# draw paths that have medium difficulty (in terms of going through them)
|
||||
all_x[:], all_y[:], all_len[:] = [], [], []
|
||||
all_x = [10, 45]
|
||||
all_y = [22, 20]
|
||||
all_len = [8, 5]
|
||||
for x, y, l in zip(all_x, all_y, all_len):
|
||||
draw_vertical_line(x, y, l, m_x, m_y, obs_dict, 'medium')
|
||||
|
||||
all_x[:], all_y[:], all_len[:] = [], [], []
|
||||
all_x = [20, 30, 42] + [47] * 5
|
||||
all_y = [35, 30, 38] + [37 + i for i in range(2)]
|
||||
all_len = [5, 7, 3] + [1] * 3
|
||||
for x, y, l in zip(all_x, all_y, all_len):
|
||||
draw_horizontal_line(x, y, l, m_x, m_y, obs_dict, 'medium')
|
||||
|
||||
# draw paths that have hard difficulty (in terms of going through them)
|
||||
all_x[:], all_y[:], all_len[:] = [], [], []
|
||||
all_x = [15, 20, 35]
|
||||
all_y = [45, 20, 35]
|
||||
all_len = [3, 5, 7]
|
||||
for x, y, l in zip(all_x, all_y, all_len):
|
||||
draw_vertical_line(x, y, l, h_x, h_y, obs_dict, 'hard')
|
||||
|
||||
all_x[:], all_y[:], all_len[:] = [], [], []
|
||||
all_x = [30] + [47] * 5
|
||||
all_y = [10] + [37 + i for i in range(2)]
|
||||
all_len = [5] + [1] * 3
|
||||
for x, y, l in zip(all_x, all_y, all_len):
|
||||
draw_horizontal_line(x, y, l, h_x, h_y, obs_dict, 'hard')
|
||||
|
||||
plt.plot(o_x, o_y, "sr")
|
||||
plt.plot(m_x, m_y, "sg")
|
||||
plt.plot(h_x, h_y, "sy")
|
||||
plt.plot(s_x, s_y, "og")
|
||||
plt.plot(g_x, g_y, "o")
|
||||
plt.grid(True)
|
||||
|
||||
flow_obj = FlowField(obs_dict, g_x, g_y, s_x, s_y, 50, 50)
|
||||
flow_obj.find_path()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
17
tests/test_flow_field.py
Normal file
17
tests/test_flow_field.py
Normal file
@@ -0,0 +1,17 @@
|
||||
import PathPlanning.FlowField.flowfield as flowfield
|
||||
from unittest import TestCase
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(os.path.dirname(__file__) + "/../")
|
||||
|
||||
|
||||
class Test(TestCase):
|
||||
|
||||
def test(self):
|
||||
flowfield.show_animation = False
|
||||
flowfield.main()
|
||||
|
||||
|
||||
if __name__ == '__main__': # pragma: no cover
|
||||
test = Test()
|
||||
test.test()
|
||||
Reference in New Issue
Block a user