-
-
Notifications
You must be signed in to change notification settings - Fork 7k
Add theta star planner #1293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
AtsushiSakai
merged 7 commits into
AtsushiSakai:master
from
Musab1Blaser:enh/theta-star-planner
Oct 22, 2025
Merged
Add theta star planner #1293
Changes from 3 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
b7d91cb
Theta Star path compression
Musab1Blaser fa798f2
add tests and documentation for theta star algorithm
Musab1Blaser d2d8361
remove erraneous nested loop
Musab1Blaser bcfdf62
improve documentation for theta star planner
Musab1Blaser 67630e5
add reference for theta star algorithm
Musab1Blaser c3ffc08
use built-in function for norm
Musab1Blaser 2a53e67
Add Bresenham line algorithm description to docs
Musab1Blaser File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,345 @@ | ||
| """ | ||
| Theta* grid planning | ||
| author: Musab Kasbati (@Musab1Blaser) | ||
| See Wikipedia article (https://cdn.aaai.org/AAAI/2007/AAAI07-187.pdf) | ||
| """ | ||
|
|
||
| import math | ||
|
|
||
| import matplotlib.pyplot as plt | ||
|
|
||
| show_animation = True | ||
| use_theta_star = True | ||
|
|
||
|
|
||
| class ThetaStarPlanner: | ||
|
|
||
| def __init__(self, ox, oy, resolution, rr): | ||
| """ | ||
| Initialize grid map for theta star planning | ||
| ox: x position list of Obstacles [m] | ||
| oy: y position list of Obstacles [m] | ||
| resolution: grid resolution [m] | ||
| rr: robot radius[m] | ||
| """ | ||
|
|
||
| self.resolution = resolution | ||
| self.rr = rr | ||
| self.min_x, self.min_y = 0, 0 | ||
| self.max_x, self.max_y = 0, 0 | ||
| self.obstacle_map = None | ||
| self.x_width, self.y_width = 0, 0 | ||
| self.motion = self.get_motion_model() | ||
| self.calc_obstacle_map(ox, oy) | ||
|
|
||
| class Node: | ||
| def __init__(self, x, y, cost, parent_index): | ||
| self.x = x # index of grid | ||
| self.y = y # index of grid | ||
| self.cost = cost | ||
| self.parent_index = parent_index | ||
|
|
||
| def __str__(self): | ||
| return str(self.x) + "," + str(self.y) + "," + str( | ||
| self.cost) + "," + str(self.parent_index) | ||
|
|
||
| def planning(self, sx, sy, gx, gy): | ||
| """ | ||
| Theta star path search | ||
| input: | ||
| s_x: start x position [m] | ||
| s_y: start y position [m] | ||
| gx: goal x position [m] | ||
| gy: goal y position [m] | ||
| output: | ||
| rx: x position list of the final path | ||
| ry: y position list of the final path | ||
| """ | ||
|
|
||
| start_node = self.Node(self.calc_xy_index(sx, self.min_x), | ||
| self.calc_xy_index(sy, self.min_y), 0.0, -1) | ||
| goal_node = self.Node(self.calc_xy_index(gx, self.min_x), | ||
| self.calc_xy_index(gy, self.min_y), 0.0, -1) | ||
|
|
||
| open_set, closed_set = dict(), dict() | ||
| open_set[self.calc_grid_index(start_node)] = start_node | ||
|
|
||
| while True: | ||
| if len(open_set) == 0: | ||
| print("Open set is empty..") | ||
| break | ||
|
|
||
| c_id = min( | ||
| open_set, | ||
| key=lambda o: open_set[o].cost + self.calc_heuristic(goal_node, | ||
| open_set[ | ||
| o])) | ||
| current = open_set[c_id] | ||
|
|
||
| # show graph | ||
| if show_animation: # pragma: no cover | ||
| x = self.calc_grid_position(current.x, self.min_x) | ||
| y = self.calc_grid_position(current.y, self.min_y) | ||
|
|
||
| # Draw an arrow toward the parent | ||
| if current.parent_index != -1 and current.parent_index in closed_set: | ||
| parent = closed_set[current.parent_index] | ||
| px = self.calc_grid_position(parent.x, self.min_x) | ||
| py = self.calc_grid_position(parent.y, self.min_y) | ||
|
|
||
| # Vector from current to parent | ||
| dx = px - x | ||
| dy = py - y | ||
|
|
||
| # scale down vector for visibility | ||
| norm = math.sqrt(dx*dx + dy*dy) | ||
| dx /= norm | ||
| dy /= norm | ||
|
|
||
| # Draw a small arrow (scale it down for visibility) | ||
| plt.arrow(x, y, dx, dy, | ||
| head_width=0.5, head_length=0.5, | ||
| fc='c', ec='c', alpha=0.7) | ||
|
|
||
| # For stopping simulation with the esc key | ||
| plt.gcf().canvas.mpl_connect( | ||
| 'key_release_event', | ||
| lambda event: [exit(0) if event.key == 'escape' else None] | ||
| ) | ||
|
|
||
| if len(closed_set.keys()) % 10 == 0: | ||
| plt.pause(0.001) | ||
|
|
||
| if current.x == goal_node.x and current.y == goal_node.y: | ||
| print("Find goal") | ||
| goal_node.parent_index = current.parent_index | ||
| goal_node.cost = current.cost | ||
| break | ||
|
|
||
| # Remove the item from the open set | ||
| del open_set[c_id] | ||
|
|
||
| # Add it to the closed set | ||
| closed_set[c_id] = current | ||
|
|
||
| # expand_grid search grid based on motion model | ||
| for i, _ in enumerate(self.motion): | ||
| node = self.Node(current.x + self.motion[i][0], | ||
| current.y + self.motion[i][1], | ||
| current.cost + self.motion[i][2], c_id) # cost may later be updated by theta star path compression | ||
| n_id = self.calc_grid_index(node) | ||
|
|
||
| if not self.verify_node(node): | ||
| continue | ||
|
|
||
| if n_id in closed_set: | ||
| continue | ||
|
|
||
| # Theta* modification: | ||
| if use_theta_star and current.parent_index != -1 and current.parent_index in closed_set: | ||
| grandparent = closed_set[current.parent_index] | ||
| if self.line_of_sight(grandparent, node): | ||
| # If parent(current) has line of sight to neighbor | ||
| node.cost = grandparent.cost + math.hypot(node.x - grandparent.x, node.y - grandparent.y) | ||
| node.parent_index = current.parent_index # compress path directly to grandparent | ||
|
|
||
| if n_id not in open_set: | ||
| open_set[n_id] = node | ||
| else: | ||
| if open_set[n_id].cost > node.cost: | ||
| # This path is the best until now. record it | ||
| open_set[n_id] = node | ||
|
|
||
|
|
||
| rx, ry = self.calc_final_path(goal_node, closed_set) | ||
|
|
||
| return rx, ry | ||
|
|
||
| def calc_final_path(self, goal_node, closed_set): | ||
| # generate final course | ||
| rx, ry = [self.calc_grid_position(goal_node.x, self.min_x)], [ | ||
| self.calc_grid_position(goal_node.y, self.min_y)] | ||
| parent_index = goal_node.parent_index | ||
| while parent_index != -1: | ||
| n = closed_set[parent_index] | ||
| rx.append(self.calc_grid_position(n.x, self.min_x)) | ||
| ry.append(self.calc_grid_position(n.y, self.min_y)) | ||
| parent_index = n.parent_index | ||
|
|
||
| return rx, ry | ||
|
|
||
| @staticmethod | ||
| def calc_heuristic(n1, n2): | ||
| w = 1.0 # weight of heuristic | ||
| d = w * math.hypot(n1.x - n2.x, n1.y - n2.y) | ||
| return d | ||
|
|
||
| def calc_grid_position(self, index, min_position): | ||
| """ | ||
| calc grid position | ||
| :param index: | ||
| :param min_position: | ||
| :return: | ||
| """ | ||
| pos = index * self.resolution + min_position | ||
| return pos | ||
|
|
||
| def calc_xy_index(self, position, min_pos): | ||
| return round((position - min_pos) / self.resolution) | ||
|
|
||
| def calc_grid_index(self, node): | ||
| return (node.y - self.min_y) * self.x_width + (node.x - self.min_x) | ||
|
|
||
| def line_of_sight(self, node1, node2): | ||
| """ | ||
| Check if there is a direct line of sight between two nodes. | ||
| Uses Bresenham’s line algorithm for grid traversal. | ||
Musab1Blaser marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """ | ||
| x0 = node1.x | ||
| y0 = node1.y | ||
| x1 = node2.x | ||
| y1 = node2.y | ||
|
|
||
| dx = abs(x1 - x0) | ||
| dy = abs(y1 - y0) | ||
| sx = 1 if x0 < x1 else -1 | ||
| sy = 1 if y0 < y1 else -1 | ||
|
|
||
| err = dx - dy | ||
|
|
||
| while True: | ||
| if not self.verify_node(self.Node(x0, y0, 0, -1)): | ||
| return False | ||
| if x0 == x1 and y0 == y1: | ||
| break | ||
| e2 = 2 * err | ||
| if e2 > -dy: | ||
| err -= dy | ||
| x0 += sx | ||
| if e2 < dx: | ||
| err += dx | ||
| y0 += sy | ||
| return True | ||
|
|
||
|
|
||
| def verify_node(self, node): | ||
| px = self.calc_grid_position(node.x, self.min_x) | ||
| py = self.calc_grid_position(node.y, self.min_y) | ||
|
|
||
| if px < self.min_x: | ||
| return False | ||
| elif py < self.min_y: | ||
| return False | ||
| elif px >= self.max_x: | ||
| return False | ||
| elif py >= self.max_y: | ||
| return False | ||
|
|
||
| # collision check | ||
| if self.obstacle_map[node.x][node.y]: | ||
| return False | ||
|
|
||
| return True | ||
|
|
||
| def calc_obstacle_map(self, ox, oy): | ||
|
|
||
| self.min_x = round(min(ox)) | ||
| self.min_y = round(min(oy)) | ||
| self.max_x = round(max(ox)) | ||
| self.max_y = round(max(oy)) | ||
| print("min_x:", self.min_x) | ||
| print("min_y:", self.min_y) | ||
| print("max_x:", self.max_x) | ||
| print("max_y:", self.max_y) | ||
|
|
||
| self.x_width = round((self.max_x - self.min_x) / self.resolution) | ||
| self.y_width = round((self.max_y - self.min_y) / self.resolution) | ||
| print("x_width:", self.x_width) | ||
| print("y_width:", self.y_width) | ||
|
|
||
| # obstacle map generation | ||
| self.obstacle_map = [[False for _ in range(self.y_width)] | ||
| for _ in range(self.x_width)] | ||
| for ix in range(self.x_width): | ||
| x = self.calc_grid_position(ix, self.min_x) | ||
| for iy in range(self.y_width): | ||
| y = self.calc_grid_position(iy, self.min_y) | ||
| for iox, ioy in zip(ox, oy): | ||
| d = math.hypot(iox - x, ioy - y) | ||
| if d <= self.rr: | ||
| self.obstacle_map[ix][iy] = True | ||
| break | ||
|
|
||
| @staticmethod | ||
| def get_motion_model(): | ||
| # dx, dy, cost | ||
| motion = [[1, 0, 1], | ||
| [0, 1, 1], | ||
| [-1, 0, 1], | ||
| [0, -1, 1], | ||
| [-1, -1, math.sqrt(2)], | ||
| [-1, 1, math.sqrt(2)], | ||
| [1, -1, math.sqrt(2)], | ||
| [1, 1, math.sqrt(2)]] | ||
|
|
||
| return motion | ||
|
|
||
|
|
||
| def main(): | ||
| print(__file__ + " start!!") | ||
|
|
||
| # start and goal position | ||
| sx = 10.0 # [m] | ||
| sy = 10.0 # [m] | ||
| gx = 50.0 # [m] | ||
| gy = 50.0 # [m] | ||
| grid_size = 2.0 # [m] | ||
| robot_radius = 1.0 # [m] | ||
|
|
||
| # set obstacle positions | ||
| ox, oy = [], [] | ||
| for i in range(-10, 60): | ||
| ox.append(i) | ||
| oy.append(-10.0) | ||
| for i in range(-10, 60): | ||
| ox.append(60.0) | ||
| oy.append(i) | ||
| for i in range(-10, 61): | ||
| ox.append(i) | ||
| oy.append(60.0) | ||
| for i in range(-10, 61): | ||
| ox.append(-10.0) | ||
| oy.append(i) | ||
| for i in range(-10, 40): | ||
| ox.append(20.0) | ||
| oy.append(i) | ||
| for i in range(0, 40): | ||
| ox.append(40.0) | ||
| oy.append(60.0 - i) | ||
|
|
||
| if show_animation: # pragma: no cover | ||
| plt.plot(ox, oy, ".k") | ||
| plt.plot(sx, sy, "og") | ||
| plt.plot(gx, gy, "xb") | ||
| plt.grid(True) | ||
| plt.axis("equal") | ||
|
|
||
| theta_star = ThetaStarPlanner(ox, oy, grid_size, robot_radius) | ||
| rx, ry = theta_star.planning(sx, sy, gx, gy) | ||
|
|
||
| if show_animation: # pragma: no cover | ||
| plt.plot(rx, ry, "-r") | ||
| plt.pause(0.001) | ||
| plt.show() | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| main() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| import conftest | ||
| from PathPlanning.ThetaStar import theta_star as m | ||
|
|
||
|
|
||
| def test_1(): | ||
| m.show_animation = False | ||
| m.main() | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| conftest.run_this_test(__file__) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.