Blog: Let’s be A* — Learn and Code a Path Planning algorithm to fly a Drone — Part II


In this part, we will build on theoretical base we learnt in part 1 and code the A* algorithm in Python.

Go to the profile of Percy Jaiswal
  • Unity Simulator, Python Environment Installation and Starter Files
    Before beginning to code anything, we need to get our setup and tools installed. We begin this by downloading required Unity simulator that’s appropriate for your operating system from here. Next, we get required Python libraries installed. I would encourage you to create a separate Python environment (in any IDE of your choice) so as to keep it clean and separate. Follow installation instructions from this github repo. Finally, we download this github repo to get our start files. Make sure you got your setup correct by checking if you are able to run instructions provided in Step# 4 of above provided github repo for starter files.
  • Starter Code
    To begin with this project, we are provided with two scripts, motion_planning.py and planning_utils.py. Here you’ll also find a file called colliders.csv, which contains the 2.5D map of the simulator environment. A 2.5D map is nothing but a 2D map (x, y location) with additional ‘height’ information for the obstacle. We will explore this file little more once we begin coding.
     The main function in motion_planning.py establishes a “Mavlink” connection with simulator and creates a ‘drone’ object of ‘MotionPlanning’ class. Motion Planning class contains lot of built in functions already prepared for us. It contains callback functions ‘local_position_callback’, ‘velocity_callback’, ‘state_callback’.
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--port', type=int, default=5760, help='Port number')
parser.add_argument('--host', type=str, default='127.0.0.1', help="host address, i.e. '127.0.0.1'")
args = parser.parse_args()
conn = MavlinkConnection('tcp:{0}:{1}'.format(args.host, args.port), timeout=60)
drone = MotionPlanning(conn)
time.sleep(1)
drone.start()

What are callback functions? A callback function is intended to be executed only when a particular event triggers it. For e.g., if you look at code line ‘self.register_callback(MsgID.LOCAL_POSITION, self.local_position_callback)’ what it is doing is registering MsgID.LOCAL_POSITION and self.local_position_callback as key-value pair in a python dictionary (implemented inside ‘self.register_callback’ function). Now whenever value in ‘MsgID.LOCAL_POSITION’ variable changes, it will trigger a call to ‘self.local_position_callback’ function. Hence it’s called a callback function. Because the function is not called during normal ‘sequential’ flow of program, but instead it gets called only when a particular event has happened, which is change in value of ‘MsgID.LOCAL_POSITION’ in this particular case.

class MotionPlanning(Drone):
def __init__(self, connection):
super().__init__(connection)
 self.target_position = np.array([0.0, 0.0, 0.0])
self.waypoints = []
self.in_mission = True
self.check_state = {}
 # initial state
self.flight_state = States.MANUAL
 # register all your callbacks here
self.register_callback(MsgID.LOCAL_POSITION, self.local_position_callback)
self.register_callback(MsgID.LOCAL_VELOCITY, self.velocity_callback)
self.register_callback(MsgID.STATE, self.state_callback)

This all callback functions form the core structure of overall program of flying drone. Various operations like take-off, landing, transitioning to waypoint following state, arming and disarming drone along with required callback functions are taken care of in this starter code. You can go over motion_planning.py file and understand this functions, but as the primary goal of this article is to explain the code for ‘plan_path’ function, we will not dwell too much on starter code. Once a drone has been armed, it has taken off successfully and transitioned into waypoint following state, we need to start sending it one waypoint at a time, which will be generated by our ‘plan_path’ function before drone even takes off.

Once we make sure our drone is armed (basically a system check to make sure all propellers and other systems are operating as per expectation), the first thing we done before taking off is prepare a valid path to follow. To begin simple, we plan to fly at constant height. We will also be keeping a ‘Safety_distance’ from obstacles in our map / world.

TARGET_ALTITUDE = 5
SAFETY_DISTANCE = 5

  • Creat_Grid
    In order to create a path, we need a map, and that is provided to us in ‘colliders.csv’ file. Opening the file, we can see that first row contains ‘lat0’ and ‘lon0’ values. These are home location or 0, 0, 0 for our map.

Following lines read home latitude and longitude provided in 1st row of colliders.csv and set it as our home position.

# TODO: read lat0, lon0 from colliders into floating point values
lat0_lon0 = pd.read_csv('colliders.csv', nrows = 1, header = None)
lat0, lon0 = lat0_lon0.iloc[0,0], lat0_lon0.iloc[0,1]
_, lat0 = lat0.split()
_, lon0 = lon0.split()
lat0 = np.float64(lat0)
lon0 = np.float64(lon0)
# TODO: set home position to (lat0, lon0, 0)
self.set_home_position(lon0, lat0, 0)

When then check our current global position and convert it into local position, which will be with respect to home position we set earlier. We then print our current local and global position along with global home position value.

# TODO: retrieve current global position 
# TODO: convert to current local position using global_to_local()
current_local_pos = global_to_local(self.global_position, self.global_home)

# Me: checking current local position
print ('current local position {0} and {1}'.format(current_local_pos[0], current_local_pos[1]))

print('global home {0}, position {1}, local position {2}'.format(self.global_home, self.global_position,
self.local_position))

Next we work on defining our map in grid format that we studied in part 1 of this series. We first read complete data from colliders.csv, and then pass that numpy array to function called ‘create_grid’.

# Read in obstacle map
data = np.loadtxt('colliders.csv', delimiter=',', dtype='Float64', skiprows=3)

# Define a grid for a particular altitude and safety margin around obstacles
grid, north_offset, east_offset = create_grid(data, TARGET_ALTITUDE, SAFETY_DISTANCE)

Before we understand how ‘create_grid’ will create our grid map, we need to understand the format in which data is provided in colliders.csv. If we look at the heading for each column (as shown in row2 of image2), we see that we are given 3 positional values of a point — its x, y and z coordinates, and 3 distance values. The x, y and z coordinates are coordinates of a center point for an obstacle and 3 distances are its width, length and height values. Actually they are half of height, width and length, as they are distance from center point to one side of obstacle. Below diagram explains how both center point and dimensions values for obstacles are provided.

We will be creating maps similar to 2.5D structure explained in part 1. In fact, we will be going one step further. We have already fixed the flying altitude for our drone with ‘TARGET_ALTITUDE’ variable. So while creating map, we will only include an obstacle in our map if its height (PozZ + 2 * halfSizeZ) is greater than ‘TARGET_ALTITUDE’. If height of the obstacle is less than ‘TARGET_ALTITUDE’ we will just assume it to be a free space as our drone can fly over it. This approach greatly reduces the space and processing complexity that our map needs.

With height parameter sorted out, we need to determine size of the map which we need to create. For this, we first determine the closest and furthest point from our origin in both X and Y direction. Consider X direction to be North and Y direction to be East, we can get value of closet and furthest point in both directions as follows:

# minimum and maximum north coordinates
north_min = np.floor(np.min(data[:, 0] - data[:, 3]))
north_max = np.ceil(np.max(data[:, 0] + data[:, 3]))
# minimum and maximum east coordinates
east_min = np.floor(np.min(data[:, 1] - data[:, 4]))
east_max = np.ceil(np.max(data[:, 1] + data[:, 4]))
# given the minimum and maximum coordinates we can
# calculate the size of the grid.
north_size = int(np.ceil((north_max - north_min + 1)))
east_size = int(np.ceil((east_max - east_min + 1)))

And finally we initialize our grid map as numpy zeros array

# Initialize an empty grid
grid = np.zeros((north_size, east_size))

Next, we will iterate over each obstacle from our colliders.csv data, which we have already captured in variable ‘data’. We will first compare whether obstacle’s height is above ‘TARGET_ALTITUDE’, and if it is, then we will mark the area occupied by the obstacle + a safety distance we configured in variable ‘SAFETY_DISTANCE’ as 1 in our grid world. Finally, we return our grid world along with north_min and east_min values.

# Populate the grid with obstacles
for i in range(data.shape[0]):
north, east, alt, d_north, d_east, d_alt = data[i, :]
if alt + d_alt + safety_distance > drone_altitude:
obstacle = [
int(np.clip(north - d_north - safety_distance - north_min, 0, north_size-1)),
int(np.clip(north + d_north + safety_distance - north_min, 0, north_size-1)),
int(np.clip(east - d_east - safety_distance - east_min, 0, east_size-1)),
int(np.clip(east + d_east + safety_distance - east_min, 0, east_size-1)),
]
grid[obstacle[0]:obstacle[1]+1, obstacle[2]:obstacle[3]+1] = 1
return grid, int(north_min), int(east_min)

A small caveat which we need to understand is, the obstacles in colliders and our current local location are with respect to global home, which we had configured in with help of lat0 and lon0 variables. Whereas, the grid world which we create in create_grid function has origin at first obstacle point. Remember how we had calculated north min & max and east min & max and then north and east size to create a grid. Well, because of that there is an offset between colliders & our drone’s positional values and corresponding same position in our grid world. Following diagram will help us understand it better.

So every time we are trying to map a value from our drone’s positional sensor to grid world, we need to subtract north and east offset respectively. This offset value is nothing but north_min and east_min returned by create_grid function. We set our current position in grid_world as

# TODO: convert start position to current position rather than map center
grid_start = (int(current_local_pos[0] - north_offset), int(current_local_pos[1] - east_offset))

Assign a goal location of our choice

# Take GPS co-ordinates as Grid goal -Me
grid_goal = (-122.396582, 37.795714, 0)
grid_goal = global_to_local(grid_goal, self.global_home)
grid_goal = (int(grid_goal[0] - north_offset), int(grid_goal[1] - east_offset))

Print our start and goal location and off we go on our way to be A*!!

print('Local Start and Goal: ', grid_start, grid_goal)
path, _ = a_star(grid, heuristic, grid_start, grid_goal)
  • A* in the city
    We pass the grid we just now created in above section, a heuristic function and our start and goal location as arguments to a_star function. Hopefully the concepts we had gained in part 1 of this series will help us understand code for this function better. In fact let’s revisit the list of steps we had gone through while understanding Breadth First Algorithm. Knowing which step of our algorithm is performed in which set of lines will help us understand below code block better. As stated in Part 1, a A* algorithm adds ‘cost’ and ‘heuristic’ function on top of a BFS. So along with code for implementing below steps, it will include adding and accessing ‘cost’ and ‘heuristic’ functions too.
     The backbone of a breadth-first search consists of these basic steps:
     1. Add a node/vertex from the grid to a queue of nodes to be “visited”.
     2. Visit the topmost node in the queue, and mark it as such.
     3. If that node has any neighbors, check to see if they have been “visited” or not.
     4. Add any neighboring nodes that still need to be “visited” to the queue.
     5. Remove current node we’ve visited from the queue.
     We first declare few variables. We will be using a Priority Queue to store nodes we need to visit. A typical pattern for entries in priority queue is a tuple in the form: (priority_number, data). The lowest valued entries are retrieved first and as we will be using ‘cost to reach corresponding node’ value as ‘priority_number’, whenever we will ‘get’ (or pop) an element from this priority queue, we will get an element which has least ‘cost’ value. We start at our ‘start’ node.
path = []
path_cost = 0
queue = PriorityQueue()
queue.put((0, start))
visited = set(start)
branch = {}
found = False
# Check till we have searched all nodes or have found our ‘goal’
while not queue.empty():
item = queue.get() # Step2. Visit the topmost node in the queue
current_cost = item[0]
current_node = item[1]
 if current_node == goal:
print('Found a path.')
found = True
break

else:
#Step3. If that node has any neighbors, check to see if they have been “visited” or not.

for a in valid_actions(grid, current_node):
next_node = (current_node[0] + a.delta[0], current_node[1] + a.delta[1])
new_cost = current_cost + a.cost + h(next_node, goal)
#Step4. Add any neighboring nodes that still need to be “visited” to the queue.
if next_node not in visited:
visited.add(next_node)
queue.put((new_cost, next_node))
 branch[next_node] = (new_cost, current_node, a)

As we saw in part 1, whenever we reach a node in BFS algorithm, it is guaranteed to be the shortest path to that node from our ‘start’ location. So to keep track of path we are tracing, in a_star function, we will be maintaining a dictionary, for which ‘key’ will be the node we will be visiting next, and its value will be a tuple of current node, action to be taken from current node in order to reach next node and total cost to reach next node. This tuple (current node + action to take to reach next node) will represent the shortest path to reach next node coming from ‘start’ location.
 Now once we find ‘goal’ location, we will trace back the route to ‘start’ location with the help of this ‘branch’ dictionary. We first copy the value of total cost to reach ‘goal’ location. Next, as we know that node present in ‘value’ of dictionary lies on the shortest path, we will build our ‘path’ list as -> [goal, node present in value of branch[goal], …]. And we will keep tracing back our route till we find ‘start’ location, adding nodes to ‘path’ list.

if found:
# retrace steps
n = goal
path_cost = branch[n][0]
path.append(goal)
while branch[n][1] != start:
path.append(branch[n][1])
n = branch[n][1]
path.append(branch[n][1])
else:
print('**********************')
print('Failed to find a path!')
print('**********************')
return path[::-1], path_cost
  • Helper Functions
    Path Pruning: Once we have our path figured out, we will prune it by removing collinear points as explained in Collinearity Check section in Part 1. We take 3 consecutive points from our final path and check whether they lie on same line. We do this by calculating the area enclosed by selected 3 points. If it is very close to 0, we can assume that they lie on same line. We first convert a point in a 3 element (x-coordinate, y-coordinate, 1) to numpy array. We add this last element “1” so that we can arrange 3 vertices into a nice 3 x 3 matrix. After reshaping points in “points()” function, we create a matrix by concating them and then calculating the determinant of that matrix using a convenient np.linalg.det() function.
    Valid Actions: One function which was used by us in a_star is valid_actions (grid, current_node). This was used to carry out step# 3 of BFS algorithm, where we search for current node’s neighbours. We already have our action set (as explained in “Search space, Action set and Cost” section of Part1) and location of our current node. This function will return a subset of action set. It will check weather an action (i.e. moving up, down, left, right or diagonal) will lead us off grid or to an obstacle / No Fly zone node. And if that is the case, it will remove this action from list of valid actions feasible from current node.

Coming back to motion_planning.py, once we get pruned path, take each point from it, convert it into waypoint in local frame coordinate (by adding north and east offsets) and send it to autopilot using self.send_waypoints() function. And with that, we have finished coding our path planning A* algorithm. So without further ado, lets fire up Udacity’s drone simulator and run our motion_planning.py python file. You can find all the source code in my github repo here. If everything runs smoothly, you should be able to see your drone fly from a user configured starting and goal location like shown in below gif.

Hope you have enjoyed reading couple of articles in this series and now have a good understanding of fundamentals of A*.

If you found this article useful, you know what to do next 😊 Till next time….cheers!!

Source: Artificial Intelligence on Medium

(Visited 1 times, 1 visits today)

Leave a comment

Your email address will not be published. Required fields are marked *