diff --git a/PathPlanning/Eta3SplinePath/eta3_spline_path.py b/PathPlanning/Eta3SplinePath/eta3_spline_path.py index efb90407..a9bcd351 100644 --- a/PathPlanning/Eta3SplinePath/eta3_spline_path.py +++ b/PathPlanning/Eta3SplinePath/eta3_spline_path.py @@ -152,10 +152,10 @@ class eta3_path_segment(object): + (10. * eta[1] - 2. * eta[3] + 1. / 6 * eta[5]) * sb \ - (2. * eta[1]**2 * kappa[2] - 1. / 6 * eta[1]**3 * kappa[3] - 1. / 2 * eta[1] * eta[3] * kappa[2]) * cb - - def s_dot(u): return np.linalg.norm(self.coeffs[:, 1:].dot( - np.array([1, 2. * u, 3. * u**2, 4. * u**3, 5. * u**4, 6. * u**5, 7. * u**6]))) - self.segment_length = quad(lambda u: s_dot(u), 0, 1)[0] + + self.s_dot = lambda u : max(np.linalg.norm(self.coeffs[:, 1:].dot(np.array([1, 2.*u, 3.*u**2, 4.*u**3, 5.*u**4, 6.*u**5, 7.*u**6]))), 1e-6) + self.f_length = lambda ue: quad(lambda u: self.s_dot(u), 0, ue) + self.segment_length = self.f_length(1)[0] """ eta3_path_segment::calc_point @@ -170,6 +170,21 @@ class eta3_path_segment(object): assert(u >= 0 and u <= 1) return self.coeffs.dot(np.array([1, u, u**2, u**3, u**4, u**5, u**6, u**7])) + """ + eta3_path_segment::calc_deriv + + input + u - parametric representation of a point along the segment, 0 <= u <= 1 + returns + (d^nx/du^n,d^ny/du^n) of point along the segment, for 0 < n <= 2 + """ + def calc_deriv(self, u, order=1): + assert(u >= 0 and u <= 1) + assert(order > 0 and order <= 2) + if order == 1: + return self.coeffs[:, 1:].dot(np.array([1, 2.*u, 3.*u**2, 4.*u**3, 5.*u**4, 6.*u**5, 7.*u**6])) + else: + return self.coeffs[:, 2:].dot(np.array([2, 6.*u, 12.*u**2, 20.*u**3, 30.*u**4, 42.*u**5])) def test1(): diff --git a/PathPlanning/Eta3SplineTrajectory/eta3_spline_trajectory.py b/PathPlanning/Eta3SplineTrajectory/eta3_spline_trajectory.py new file mode 100644 index 00000000..cb1b06f2 --- /dev/null +++ b/PathPlanning/Eta3SplineTrajectory/eta3_spline_trajectory.py @@ -0,0 +1,431 @@ +""" + +\eta^3 polynomials trajectory planner + +author: Joe Dinius, Ph.D (https://jwdinius.github.io) + Atsushi Sakai (@Atsushi_twi) + +Refs: +- https://jwdinius.github.io/blog/2018/eta3traj +- [\eta^3-Splines for the Smooth Path Generation of Wheeled Mobile Robots](https://ieeexplore.ieee.org/document/4339545/) + +""" + +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.collections import LineCollection +import sys +import os +sys.path.append(os.path.relpath("../Eta3SplinePath")) +from eta3_spline_path import eta3_path, eta3_path_segment + +show_animation = True + + +class MaxVelocityNotReached(Exception): + def __init__(self, actual_vel, max_vel): + self.message = 'Actual velocity {} does not equal desired max velocity {}!'.format(actual_vel, max_vel) + + +class eta3_trajectory(eta3_path): + """ + eta3_trajectory + + input + segments: list of `eta3_trajectory_segment` instances defining a continuous trajectory + """ + def __init__(self, segments, max_vel, v0=0.0, a0=0.0, max_accel=2.0, max_jerk=5.0): + # ensure that all inputs obey the assumptions of the model + assert max_vel > 0 and v0 >= 0 and a0 >= 0 and max_accel > 0 and max_jerk > 0 \ + and a0 <= max_accel and v0 <= max_vel + super(eta3_trajectory, self).__init__(segments=segments) + self.total_length = sum([s.segment_length for s in self.segments]) + self.max_vel = float(max_vel) + self.v0 = float(v0) + self.a0 = float(a0) + self.max_accel = float(max_accel) + self.max_jerk = float(max_jerk) + length_array = np.array([s.segment_length for s in self.segments]) + # add a zero to the beginning for finding the correct segment_id + self.cum_lengths = np.concatenate((np.array([0]), np.cumsum(length_array))) + ## compute velocity profile on top of the path + self.velocity_profile() + self.ui_prev = 0 + self.prev_seg_id = 0 + + def velocity_profile(self): + ''' /~~~~~----------------~~~~~\ + / \ + / \ + / \ + / \ + (v=v0, a=a0) ~~~~~ \ + \ + \~~~~~ (vf=0, af=0) + pos.|pos.|neg.| cruise at |neg.| neg. |neg. + max |max.|max.| max. |max.| max. |max. + jerk|acc.|jerk| velocity |jerk| acc. |jerk + index 0 1 2 3 (optional) 4 5 6 + ''' + # delta_a: accel change from initial position to end of maximal jerk section + delta_a = self.max_accel - self.a0 + # t_s1: time of traversal of maximal jerk section + t_s1 = delta_a / self.max_jerk + # v_s1: velocity at the end of the maximal jerk section + v_s1 = self.v0 + self.a0 * t_s1 + self.max_jerk * t_s1**2 / 2. + # s_s1: length of the maximal jerk section + s_s1 = self.v0 * t_s1 + self.a0 * t_s1**2 / 2. + self.max_jerk * t_s1**3 / 6. + # t_sf: time of traversal of final section, which is also maximal jerk, but has final velocity 0 + t_sf = self.max_accel / self.max_jerk + # v_sf: velocity at beginning of final section + v_sf = self.max_jerk * t_sf**2 / 2. + # s_sf: length of final section + s_sf = self.max_jerk * t_sf**3 / 6. + # solve for the maximum achievable velocity based on the kinematic limits imposed by max_accel and max_jerk + # this leads to a quadratic equation in v_max: a*v_max**2 + b*v_max + c = 0 + a = 1 / self.max_accel + b = 3. * self.max_accel / (2. * self.max_jerk) + v_s1 / self.max_accel - (self.max_accel**2 / self.max_jerk + v_s1) / self.max_accel + c = s_s1 + s_sf - self.total_length - 7. * self.max_accel**3 / (3. * self.max_jerk**2) \ + - v_s1 * (self.max_accel / self.max_jerk + v_s1 / self.max_accel) \ + + (self.max_accel**2 / self.max_jerk + v_s1 / self.max_accel)**2 / (2. * self.max_accel) + v_max = ( -b + np.sqrt(b**2 - 4. * a * c) ) / (2. * a) + + # v_max represents the maximum velocity that could be attained if there was no cruise period + # (i.e. driving at constant speed without accelerating or jerking) + # if this velocity is less than our desired max velocity, the max velocity needs to be updated + # XXX the way to handle this `if` condition needs to be more thoroughly worked through + if self.max_vel > v_max: + # when this condition is tripped, there will be no cruise period (s_cruise=0) + self.max_vel = v_max + + # setup arrays to store values at END of trajectory sections + self.times = np.zeros((7,)) + self.vels = np.zeros((7,)) + self.seg_lengths = np.zeros((7,)) + + # Section 0: max jerk up to max acceleration + index = 0 + self.times[0] = t_s1 + self.vels[0] = v_s1 + self.seg_lengths[0] = s_s1 + + # Section 1: accelerate at max_accel + index = 1 + # compute change in velocity over the section + delta_v = (self.max_vel - self.max_jerk * (self.max_accel / self.max_jerk)**2 / 2.) - self.vels[index-1] + self.times[index] = delta_v / self.max_accel + self.vels[index] = self.vels[index-1] + self.max_accel * self.times[index] + self.seg_lengths[index] = self.vels[index-1] * self.times[index] + self.max_accel * self.times[index]**2 / 2. + + # Section 2: decrease acceleration (down to 0) until max speed is hit + index = 2 + self.times[index] = self.max_accel / self.max_jerk + self.vels[index] = self.vels[index-1] + self.max_accel * self.times[index] \ + - self.max_jerk * self.times[index]**2 / 2. + + # as a check, the velocity at the end of the section should be self.max_vel + if not np.isclose(self.vels[index], self.max_vel): + raise MaxVelocityNotReached(self.vels[index], self.max_vel) + + self.seg_lengths[index] = self.vels[index-1] * self.times[index] + self.max_accel * self.times[index]**2 / 2. \ + - self.max_jerk * self.times[index]**3 / 6. + + # Section 3: will be done last + + # Section 4: apply min jerk until min acceleration is hit + index = 4 + self.times[index] = self.max_accel / self.max_jerk + self.vels[index] = self.max_vel - self.max_jerk * self.times[index]**2 / 2. + self.seg_lengths[index] = self.max_vel * self.times[index] - self.max_jerk * self.times[index]**3 / 6. + + # Section 5: continue deceleration at max rate + index = 5 + # compute velocity change over sections + delta_v = self.vels[index-1] - v_sf + self.times[index] = delta_v / self.max_accel + self.vels[index] = self.vels[index-1] - self.max_accel * self.times[index] + self.seg_lengths[index] = self.vels[index-1] * self.times[index] - self.max_accel * self.times[index]**2 / 2. + + # Section 6(final): max jerk to get to zero velocity and zero acceleration simultaneously + index = 6 + self.times[index] = t_sf + self.vels[index] = self.vels[index-1] - self.max_jerk * t_sf**2 / 2. + + try: + assert np.isclose(self.vels[index], 0) + except AssertionError as e: + print('The final velocity {} is not zero'.format(self.vels[index])) + raise e + + self.seg_lengths[index] = s_sf + + if self.seg_lengths.sum() < self.total_length: + index = 3 + # the length of the cruise section is whatever length hasn't already been accounted for + # NOTE: the total array self.seg_lengths is summed because the entry for the cruise segment is + # initialized to 0! + self.seg_lengths[index] = self.total_length - self.seg_lengths.sum() + self.vels[index] = self.max_vel + self.times[index] = self.seg_lengths[index] / self.max_vel + + # make sure that all of the times are positive, otherwise the kinematic limits + # chosen cannot be enforced on the path + assert(np.all(self.times >= 0)) + self.total_time = self.times.sum() + + def get_interp_param(self, seg_id, s, ui, tol=0.001): + f = lambda u: self.segments[seg_id].f_length(u)[0] - s + fprime = lambda u: self.segments[seg_id].s_dot(u) + while (ui >= 0 and ui <= 1) and abs(f(ui)) > tol: + ui -= f(ui) / fprime(ui) + ui = max(0, min(ui, 1)) + return ui + + def calc_traj_point(self, time): + # compute velocity at time + if time <= self.times[0]: + linear_velocity = self.v0 + self.max_jerk * time**2 / 2. + s = self.v0 * time + self.max_jerk * time**3 / 6 + linear_accel = self.max_jerk * time + elif time <= self.times[:2].sum(): + delta_t = time - self.times[0] + linear_velocity = self.vels[0] + self.max_accel * delta_t + s = self.seg_lengths[0] + self.vels[0] * delta_t + self.max_accel * delta_t**2 / 2. + linear_accel = self.max_accel + elif time <= self.times[:3].sum(): + delta_t = time - self.times[:2].sum() + linear_velocity = self.vels[1] + self.max_accel * delta_t - self.max_jerk * delta_t**2 / 2. + s = self.seg_lengths[:2].sum() + self.vels[1] * delta_t + self.max_accel * delta_t**2 /2. \ + - self.max_jerk * delta_t**3 / 6. + linear_accel = self.max_accel - self.max_jerk * delta_t + elif time <= self.times[:4].sum(): + delta_t = time - self.times[:3].sum() + linear_velocity = self.vels[3] + s = self.seg_lengths[:3].sum() + self.vels[3] * delta_t + linear_accel = 0. + elif time <= self.times[:5].sum(): + delta_t = time - self.times[:4].sum() + linear_velocity = self.vels[3] - self.max_jerk * delta_t**2 / 2. + s = self.seg_lengths[:4].sum() + self.vels[3] * delta_t - self.max_jerk * delta_t**3 / 6. + linear_accel = -self.max_jerk * delta_t + elif time <= self.times[:-1].sum(): + delta_t = time - self.times[:5].sum() + linear_velocity = self.vels[4] - self.max_accel * delta_t + s = self.seg_lengths[:5].sum() + self.vels[4] * delta_t - self.max_accel * delta_t**2 / 2. + linear_accel = -self.max_accel + elif time < self.times.sum(): + delta_t = time - self.times[:-1].sum() + linear_velocity = self.vels[5] - self.max_accel * delta_t + self.max_jerk * delta_t**2 / 2. + s = self.seg_lengths[:-1].sum() + self.vels[5] * delta_t - self.max_accel * delta_t**2 / 2. \ + + self.max_jerk * delta_t**3 / 6. + linear_accel = -self.max_accel + self.max_jerk*delta_t + else: + linear_velocity = 0. + s = self.total_length + linear_accel = 0. + seg_id = np.max(np.argwhere(self.cum_lengths <= s)) + + # will happen at the end of the segment + if seg_id == len(self.segments): + seg_id -= 1 + ui = 1 + else: + # compute interpolation parameter using length from current segment's starting point + curr_segment_length = s - self.cum_lengths[seg_id] + ui = self.get_interp_param(seg_id=seg_id, s=curr_segment_length, ui=self.ui_prev) + + if not seg_id == self.prev_seg_id: + self.ui_prev = 0 + else: + self.ui_prev = ui + + self.prev_seg_id = seg_id + # TODO(jwd): normalize! + # compute angular velocity of current point= (ydd*xd - xdd*yd) / (xd**2 + yd**2) + d = self.segments[seg_id].calc_deriv(ui, order=1) + dd = self.segments[seg_id].calc_deriv(ui, order=2) + # su - the rate of change of arclength wrt u + su = self.segments[seg_id].s_dot(ui) + if not np.isclose(su, 0.) and not np.isclose(linear_velocity, 0.): + # ut - time-derivative of interpolation parameter u + ut = linear_velocity / su + # utt - time-derivative of ut + utt = linear_accel / su - (d[0] * dd[0] + d[1] * dd[1]) / su**2 * ut + xt = d[0] * ut + yt = d[1] * ut + xtt = dd[0] * ut**2 + d[0] * utt + ytt = dd[1] * ut**2 + d[1] * utt + angular_velocity = (ytt * xt - xtt * yt) / linear_velocity**2 + else: + angular_velocity = 0. + + # combine path point with orientation and velocities + pos = self.segments[seg_id].calc_point(ui) + state = np.array([pos[0], pos[1], np.arctan2(d[1], d[0]), linear_velocity, angular_velocity]) + return state + + +def test1(max_vel=0.5): + + for i in range(10): + trajectory_segments = [] + # segment 1: lane-change curve + start_pose = [0, 0, 0] + end_pose = [4, 3.0, 0] + # NOTE: The ordering on kappa is [kappa_A, kappad_A, kappa_B, kappad_B], with kappad_* being the curvature derivative + kappa = [0, 0, 0, 0] + eta = [i, i, 0, 0, 0, 0] + trajectory_segments.append(eta3_path_segment( + start_pose=start_pose, end_pose=end_pose, eta=eta, kappa=kappa)) + + traj = eta3_trajectory(trajectory_segments, max_vel=max_vel, max_accel=0.5) + + # interpolate at several points along the path + times = np.linspace(0, traj.total_time, 101) + state = np.empty((5, times.size)) + for j, t in enumerate(times): + state[:, j] = traj.calc_traj_point(t) + + if show_animation: + # plot the path + plt.plot(state[0, :], state[1, :]) + plt.pause(1.0) + + plt.show() + if show_animation: + plt.close("all") + + +def test2(max_vel=0.5): + + for i in range(10): + trajectory_segments = [] + # segment 1: lane-change curve + start_pose = [0, 0, 0] + end_pose = [4, 3.0, 0] + # NOTE: The ordering on kappa is [kappa_A, kappad_A, kappa_B, kappad_B], with kappad_* being the curvature derivative + kappa = [0, 0, 0, 0] + # NOTE: INTEGRATOR ERROR EXPLODES WHEN eta[:1] IS ZERO! + # was: eta = [0, 0, (i - 5) * 20, (5 - i) * 20, 0, 0], now is: + eta = [0.1, 0.1, (i - 5) * 20, (5 - i) * 20, 0, 0] + trajectory_segments.append(eta3_path_segment( + start_pose=start_pose, end_pose=end_pose, eta=eta, kappa=kappa)) + + traj = eta3_trajectory(trajectory_segments, max_vel=max_vel, max_accel=0.5) + + # interpolate at several points along the path + times = np.linspace(0, traj.total_time, 101) + state = np.empty((5, times.size)) + for j, t in enumerate(times): + state[:, j] = traj.calc_traj_point(t) + + if show_animation: + # plot the path + plt.plot(state[0, :], state[1, :]) + plt.pause(1.0) + + plt.show() + if show_animation: + plt.close("all") + + +def test3(max_vel=2.0): + trajectory_segments = [] + + # segment 1: lane-change curve + start_pose = [0, 0, 0] + end_pose = [4, 1.5, 0] + # NOTE: The ordering on kappa is [kappa_A, kappad_A, kappa_B, kappad_B], with kappad_* being the curvature derivative + kappa = [0, 0, 0, 0] + eta = [4.27, 4.27, 0, 0, 0, 0] + trajectory_segments.append(eta3_path_segment( + start_pose=start_pose, end_pose=end_pose, eta=eta, kappa=kappa)) + + # segment 2: line segment + start_pose = [4, 1.5, 0] + end_pose = [5.5, 1.5, 0] + kappa = [0, 0, 0, 0] + # NOTE: INTEGRATOR ERROR EXPLODES WHEN eta[:1] IS ZERO! + #was: eta = [0, 0, 0, 0, 0, 0], now is: + eta = [0.5, 0.5, 0, 0, 0, 0] + trajectory_segments.append(eta3_path_segment( + start_pose=start_pose, end_pose=end_pose, eta=eta, kappa=kappa)) + + # segment 3: cubic spiral + start_pose = [5.5, 1.5, 0] + end_pose = [7.4377, 1.8235, 0.6667] + kappa = [0, 0, 1, 1] + eta = [1.88, 1.88, 0, 0, 0, 0] + trajectory_segments.append(eta3_path_segment( + start_pose=start_pose, end_pose=end_pose, eta=eta, kappa=kappa)) + + # segment 4: generic twirl arc + start_pose = [7.4377, 1.8235, 0.6667] + end_pose = [7.8, 4.3, 1.8] + kappa = [1, 1, 0.5, 0] + eta = [7, 10, 10, -10, 4, 4] + trajectory_segments.append(eta3_path_segment( + start_pose=start_pose, end_pose=end_pose, eta=eta, kappa=kappa)) + + # segment 5: circular arc + start_pose = [7.8, 4.3, 1.8] + end_pose = [5.4581, 5.8064, 3.3416] + kappa = [0.5, 0, 0.5, 0] + eta = [2.98, 2.98, 0, 0, 0, 0] + trajectory_segments.append(eta3_path_segment( + start_pose=start_pose, end_pose=end_pose, eta=eta, kappa=kappa)) + + # construct the whole path + traj = eta3_trajectory(trajectory_segments, max_vel=max_vel, max_accel=0.5, max_jerk=1) + + # interpolate at several points along the path + times = np.linspace(0, traj.total_time, 1001) + state = np.empty((5, times.size)) + for i, t in enumerate(times): + state[:, i] = traj.calc_traj_point(t) + + # plot the path + + if show_animation: + fig, ax = plt.subplots() + x, y = state[0, :], state[1, :] + points = np.array([x, y]).T.reshape(-1, 1, 2) + segs = np.concatenate([points[:-1], points[1:]], axis=1) + lc = LineCollection(segs, cmap=plt.get_cmap('inferno')) + ax.set_xlim(np.min(x)-1, np.max(x)+1) + ax.set_ylim(np.min(y)-1, np.max(y)+1) + lc.set_array(state[3, :]) + lc.set_linewidth(3) + ax.add_collection(lc) + axcb = fig.colorbar(lc) + axcb.set_label('velocity(m/s)') + ax.set_title('Trajectory') + plt.xlabel('x') + plt.ylabel('y') + plt.pause(1.0) + + fig1, ax1 = plt.subplots() + ax1.plot(times, state[3, :], 'b-') + ax1.set_xlabel('time(s)') + ax1.set_ylabel('velocity(m/s)', color='b') + ax1.tick_params('y', colors='b') + ax1.set_title('Control') + ax2 = ax1.twinx() + ax2.plot(times, state[4, :], 'r-') + ax2.set_ylabel('angular velocity(rad/s)', color='r') + ax2.tick_params('y', colors='r') + fig.tight_layout() + plt.show() + + +def main(): + """ + recreate path from reference (see Table 1) + """ + #test1() + #test2() + test3() + + +if __name__ == '__main__': + main()