Source code for highway_env.vehicle.behavior

from typing import Tuple, Union

import numpy as np

from highway_env.road.road import Road, Route, LaneIndex
from highway_env.types import Vector
from highway_env.vehicle.controller import ControlledVehicle
from highway_env import utils
from highway_env.vehicle.kinematics import Vehicle
from highway_env.road.objects import RoadObject


[docs]class IDMVehicle(ControlledVehicle): """ A vehicle using both a longitudinal and a lateral decision policies. - Longitudinal: the IDM model computes an acceleration given the preceding vehicle's distance and speed. - Lateral: the MOBIL model decides when to change lane by maximizing the acceleration of nearby vehicles. """ # Longitudinal policy parameters ACC_MAX = 6.0 # [m/s2] """Maximum acceleration.""" COMFORT_ACC_MAX = 3.0 # [m/s2] """Desired maximum acceleration.""" COMFORT_ACC_MIN = -5.0 # [m/s2] """Desired maximum deceleration.""" DISTANCE_WANTED = 5.0 + ControlledVehicle.LENGTH # [m] """Desired jam distance to the front vehicle.""" TIME_WANTED = 1.5 # [s] """Desired time gap to the front vehicle.""" DELTA = 4.0 # [] """Exponent of the velocity term.""" # Lateral policy parameters POLITENESS = 0. # in [0, 1] LANE_CHANGE_MIN_ACC_GAIN = 0.2 # [m/s2] LANE_CHANGE_MAX_BRAKING_IMPOSED = 2.0 # [m/s2] LANE_CHANGE_DELAY = 1.0 # [s]
[docs] def __init__(self, road: Road, position: Vector, heading: float = 0, speed: float = 0, target_lane_index: int = None, target_speed: float = None, route: Route = None, enable_lane_change: bool = True, timer: float = None): super().__init__(road, position, heading, speed, target_lane_index, target_speed, route) self.enable_lane_change = enable_lane_change self.timer = timer or (np.sum(self.position)*np.pi) % self.LANE_CHANGE_DELAY
[docs] def randomize_behavior(self): pass
[docs] @classmethod def create_from(cls, vehicle: ControlledVehicle) -> "IDMVehicle": """ Create a new vehicle from an existing one. The vehicle dynamics and target dynamics are copied, other properties are default. :param vehicle: a vehicle :return: a new vehicle at the same dynamical state """ v = cls(vehicle.road, vehicle.position, heading=vehicle.heading, speed=vehicle.speed, target_lane_index=vehicle.target_lane_index, target_speed=vehicle.target_speed, route=vehicle.route, timer=getattr(vehicle, 'timer', None)) return v
[docs] def act(self, action: Union[dict, str] = None): """ Execute an action. For now, no action is supported because the vehicle takes all decisions of acceleration and lane changes on its own, based on the IDM and MOBIL models. :param action: the action """ if self.crashed: return action = {} front_vehicle, rear_vehicle = self.road.neighbour_vehicles(self) # Lateral: MOBIL self.follow_road() if self.enable_lane_change: self.change_lane_policy() action['steering'] = self.steering_control(self.target_lane_index) action['steering'] = np.clip(action['steering'], -self.MAX_STEERING_ANGLE, self.MAX_STEERING_ANGLE) # Longitudinal: IDM action['acceleration'] = self.acceleration(ego_vehicle=self, front_vehicle=front_vehicle, rear_vehicle=rear_vehicle) # action['acceleration'] = self.recover_from_stop(action['acceleration']) action['acceleration'] = np.clip(action['acceleration'], -self.ACC_MAX, self.ACC_MAX) Vehicle.act(self, action) # Skip ControlledVehicle.act(), or the command will be overriden.
[docs] def step(self, dt: float): """ Step the simulation. Increases a timer used for decision policies, and step the vehicle dynamics. :param dt: timestep """ self.timer += dt super().step(dt)
[docs] def acceleration(self, ego_vehicle: ControlledVehicle, front_vehicle: Vehicle = None, rear_vehicle: Vehicle = None) -> float: """ Compute an acceleration command with the Intelligent Driver Model. The acceleration is chosen so as to: - reach a target speed; - maintain a minimum safety distance (and safety time) w.r.t the front vehicle. :param ego_vehicle: the vehicle whose desired acceleration is to be computed. It does not have to be an IDM vehicle, which is why this method is a class method. This allows an IDM vehicle to reason about other vehicles behaviors even though they may not IDMs. :param front_vehicle: the vehicle preceding the ego-vehicle :param rear_vehicle: the vehicle following the ego-vehicle :return: the acceleration command for the ego-vehicle [m/s2] """ if not ego_vehicle or isinstance(ego_vehicle, RoadObject): return 0 ego_target_speed = utils.not_zero(getattr(ego_vehicle, "target_speed", 0)) acceleration = self.COMFORT_ACC_MAX * ( 1 - np.power(max(ego_vehicle.speed, 0) / ego_target_speed, self.DELTA)) if front_vehicle: d = ego_vehicle.lane_distance_to(front_vehicle) acceleration -= self.COMFORT_ACC_MAX * \ np.power(self.desired_gap(ego_vehicle, front_vehicle) / utils.not_zero(d), 2) return acceleration
[docs] def desired_gap(self, ego_vehicle: Vehicle, front_vehicle: Vehicle = None) -> float: """ Compute the desired distance between a vehicle and its leading vehicle. :param ego_vehicle: the vehicle being controlled :param front_vehicle: its leading vehicle :return: the desired distance between the two [m] """ d0 = self.DISTANCE_WANTED tau = self.TIME_WANTED ab = -self.COMFORT_ACC_MAX * self.COMFORT_ACC_MIN dv = ego_vehicle.speed - front_vehicle.speed d_star = d0 + ego_vehicle.speed * tau + ego_vehicle.speed * dv / (2 * np.sqrt(ab)) return d_star
[docs] def maximum_speed(self, front_vehicle: Vehicle = None) -> Tuple[float, float]: """ Compute the maximum allowed speed to avoid Inevitable Collision States. Assume the front vehicle is going to brake at full deceleration and that it will be noticed after a given delay, and compute the maximum speed which allows the ego-vehicle to brake enough to avoid the collision. :param front_vehicle: the preceding vehicle :return: the maximum allowed speed, and suggested acceleration """ if not front_vehicle: return self.target_speed d0 = self.DISTANCE_WANTED a0 = self.COMFORT_ACC_MIN a1 = self.COMFORT_ACC_MIN tau = self.TIME_WANTED d = max(self.lane_distance_to(front_vehicle) - self.LENGTH / 2 - front_vehicle.LENGTH / 2 - d0, 0) v1_0 = front_vehicle.speed delta = 4 * (a0 * a1 * tau) ** 2 + 8 * a0 * (a1 ** 2) * d + 4 * a0 * a1 * v1_0 ** 2 v_max = -a0 * tau + np.sqrt(delta) / (2 * a1) # Speed control self.target_speed = min(self.maximum_speed(front_vehicle), self.target_speed) acceleration = self.speed_control(self.target_speed) return v_max, acceleration
[docs] def change_lane_policy(self) -> None: """ Decide when to change lane. Based on: - frequency; - closeness of the target lane; - MOBIL model. """ # If a lane change already ongoing if self.lane_index != self.target_lane_index: # If we are on correct route but bad lane: abort it if someone else is already changing into the same lane if self.lane_index[:2] == self.target_lane_index[:2]: for v in self.road.vehicles: if v is not self \ and v.lane_index != self.target_lane_index \ and isinstance(v, ControlledVehicle) \ and v.target_lane_index == self.target_lane_index: d = self.lane_distance_to(v) d_star = self.desired_gap(self, v) if 0 < d < d_star: self.target_lane_index = self.lane_index break return # else, at a given frequency, if not utils.do_every(self.LANE_CHANGE_DELAY, self.timer): return self.timer = 0 # decide to make a lane change for lane_index in self.road.network.side_lanes(self.lane_index): # Is the candidate lane close enough? if not self.road.network.get_lane(lane_index).is_reachable_from(self.position): continue # Does the MOBIL model recommend a lane change? if self.mobil(lane_index): self.target_lane_index = lane_index
[docs] def mobil(self, lane_index: LaneIndex) -> bool: """ MOBIL lane change model: Minimizing Overall Braking Induced by a Lane change The vehicle should change lane only if: - after changing it (and/or following vehicles) can accelerate more; - it doesn't impose an unsafe braking on its new following vehicle. :param lane_index: the candidate lane for the change :return: whether the lane change should be performed """ # Is the maneuver unsafe for the new following vehicle? new_preceding, new_following = self.road.neighbour_vehicles(self, lane_index) new_following_a = self.acceleration(ego_vehicle=new_following, front_vehicle=new_preceding) new_following_pred_a = self.acceleration(ego_vehicle=new_following, front_vehicle=self) if new_following_pred_a < -self.LANE_CHANGE_MAX_BRAKING_IMPOSED: return False # Do I have a planned route for a specific lane which is safe for me to access? old_preceding, old_following = self.road.neighbour_vehicles(self) self_pred_a = self.acceleration(ego_vehicle=self, front_vehicle=new_preceding) if self.route and self.route[0][2]: # Wrong direction if np.sign(lane_index[2] - self.target_lane_index[2]) != np.sign(self.route[0][2] - self.target_lane_index[2]): return False # Unsafe braking required elif self_pred_a < -self.LANE_CHANGE_MAX_BRAKING_IMPOSED: return False # Is there an acceleration advantage for me and/or my followers to change lane? else: self_a = self.acceleration(ego_vehicle=self, front_vehicle=old_preceding) old_following_a = self.acceleration(ego_vehicle=old_following, front_vehicle=self) old_following_pred_a = self.acceleration(ego_vehicle=old_following, front_vehicle=old_preceding) jerk = self_pred_a - self_a + self.POLITENESS * (new_following_pred_a - new_following_a + old_following_pred_a - old_following_a) if jerk < self.LANE_CHANGE_MIN_ACC_GAIN: return False # All clear, let's go! return True
[docs] def recover_from_stop(self, acceleration: float) -> float: """ If stopped on the wrong lane, try a reversing maneuver. :param acceleration: desired acceleration from IDM :return: suggested acceleration to recover from being stuck """ stopped_speed = 5 safe_distance = 200 # Is the vehicle stopped on the wrong lane? if self.target_lane_index != self.lane_index and self.speed < stopped_speed: _, rear = self.road.neighbour_vehicles(self) _, new_rear = self.road.neighbour_vehicles(self, self.road.network.get_lane(self.target_lane_index)) # Check for free room behind on both lanes if (not rear or rear.lane_distance_to(self) > safe_distance) and \ (not new_rear or new_rear.lane_distance_to(self) > safe_distance): # Reverse return -self.COMFORT_ACC_MAX / 2 return acceleration
[docs]class LinearVehicle(IDMVehicle): """A Vehicle whose longitudinal and lateral controllers are linear with respect to parameters.""" ACCELERATION_PARAMETERS = [0.3, 0.3, 2.0] STEERING_PARAMETERS = [ControlledVehicle.KP_HEADING, ControlledVehicle.KP_HEADING * ControlledVehicle.KP_LATERAL] ACCELERATION_RANGE = np.array([0.5*np.array(ACCELERATION_PARAMETERS), 1.5*np.array(ACCELERATION_PARAMETERS)]) STEERING_RANGE = np.array([np.array(STEERING_PARAMETERS) - np.array([0.07, 1.5]), np.array(STEERING_PARAMETERS) + np.array([0.07, 1.5])]) TIME_WANTED = 2.5
[docs] def __init__(self, road: Road, position: Vector, heading: float = 0, speed: float = 0, target_lane_index: int = None, target_speed: float = None, route: Route = None, enable_lane_change: bool = True, timer: float = None, data: dict = None): super().__init__(road, position, heading, speed, target_lane_index, target_speed, route, enable_lane_change, timer) self.data = data if data is not None else {} self.collecting_data = True
[docs] def act(self, action: Union[dict, str] = None): if self.collecting_data: self.collect_data() super().act(action)
[docs] def randomize_behavior(self): ua = self.road.np_random.uniform(size=np.shape(self.ACCELERATION_PARAMETERS)) self.ACCELERATION_PARAMETERS = self.ACCELERATION_RANGE[0] + ua*(self.ACCELERATION_RANGE[1] - self.ACCELERATION_RANGE[0]) ub = self.road.np_random.uniform(size=np.shape(self.STEERING_PARAMETERS)) self.STEERING_PARAMETERS = self.STEERING_RANGE[0] + ub*(self.STEERING_RANGE[1] - self.STEERING_RANGE[0])
[docs] def acceleration(self, ego_vehicle: ControlledVehicle, front_vehicle: Vehicle = None, rear_vehicle: Vehicle = None) -> float: """ Compute an acceleration command with a Linear Model. The acceleration is chosen so as to: - reach a target speed; - reach the speed of the leading (resp following) vehicle, if it is lower (resp higher) than ego's; - maintain a minimum safety distance w.r.t the leading vehicle. :param ego_vehicle: the vehicle whose desired acceleration is to be computed. It does not have to be an Linear vehicle, which is why this method is a class method. This allows a Linear vehicle to reason about other vehicles behaviors even though they may not Linear. :param front_vehicle: the vehicle preceding the ego-vehicle :param rear_vehicle: the vehicle following the ego-vehicle :return: the acceleration command for the ego-vehicle [m/s2] """ return float(np.dot(self.ACCELERATION_PARAMETERS, self.acceleration_features(ego_vehicle, front_vehicle, rear_vehicle)))
[docs] def acceleration_features(self, ego_vehicle: ControlledVehicle, front_vehicle: Vehicle = None, rear_vehicle: Vehicle = None) -> np.ndarray: vt, dv, dp = 0, 0, 0 if ego_vehicle: vt = ego_vehicle.target_speed - ego_vehicle.speed d_safe = self.DISTANCE_WANTED + np.maximum(ego_vehicle.speed, 0) * self.TIME_WANTED if front_vehicle: d = ego_vehicle.lane_distance_to(front_vehicle) dv = min(front_vehicle.speed - ego_vehicle.speed, 0) dp = min(d - d_safe, 0) return np.array([vt, dv, dp])
[docs] def steering_control(self, target_lane_index: LaneIndex) -> float: """ Linear controller with respect to parameters. Overrides the non-linear controller ControlledVehicle.steering_control() :param target_lane_index: index of the lane to follow :return: a steering wheel angle command [rad] """ return float(np.dot(np.array(self.STEERING_PARAMETERS), self.steering_features(target_lane_index)))
[docs] def steering_features(self, target_lane_index: LaneIndex) -> np.ndarray: """ A collection of features used to follow a lane :param target_lane_index: index of the lane to follow :return: a array of features """ lane = self.road.network.get_lane(target_lane_index) lane_coords = lane.local_coordinates(self.position) lane_next_coords = lane_coords[0] + self.speed * self.PURSUIT_TAU lane_future_heading = lane.heading_at(lane_next_coords) features = np.array([utils.wrap_to_pi(lane_future_heading - self.heading) * self.LENGTH / utils.not_zero(self.speed), -lane_coords[1] * self.LENGTH / (utils.not_zero(self.speed) ** 2)]) return features
[docs] def longitudinal_structure(self): # Nominal dynamics: integrate speed A = np.array([ [0, 0, 1, 0], [0, 0, 0, 1], [0, 0, 0, 0], [0, 0, 0, 0] ]) # Target speed dynamics phi0 = np.array([ [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, -1, 0], [0, 0, 0, -1] ]) # Front speed control phi1 = np.array([ [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, -1, 1], [0, 0, 0, 0] ]) # Front position control phi2 = np.array([ [0, 0, 0, 0], [0, 0, 0, 0], [-1, 1, -self.TIME_WANTED, 0], [0, 0, 0, 0] ]) # Disable speed control front_vehicle, _ = self.road.neighbour_vehicles(self) if not front_vehicle or self.speed < front_vehicle.speed: phi1 *= 0 # Disable front position control if front_vehicle: d = self.lane_distance_to(front_vehicle) if d != self.DISTANCE_WANTED + self.TIME_WANTED * self.speed: phi2 *= 0 else: phi2 *= 0 phi = np.array([phi0, phi1, phi2]) return A, phi
[docs] def lateral_structure(self): A = np.array([ [0, 1], [0, 0] ]) phi0 = np.array([ [0, 0], [0, -1] ]) phi1 = np.array([ [0, 0], [-1, 0] ]) phi = np.array([phi0, phi1]) return A, phi
[docs] def collect_data(self): """Store features and outputs for parameter regression.""" self.add_features(self.data, self.target_lane_index)
[docs] def add_features(self, data, lane_index, output_lane=None): front_vehicle, rear_vehicle = self.road.neighbour_vehicles(self) features = self.acceleration_features(self, front_vehicle, rear_vehicle) output = np.dot(self.ACCELERATION_PARAMETERS, features) if "longitudinal" not in data: data["longitudinal"] = {"features": [], "outputs": []} data["longitudinal"]["features"].append(features) data["longitudinal"]["outputs"].append(output) if output_lane is None: output_lane = lane_index features = self.steering_features(lane_index) out_features = self.steering_features(output_lane) output = np.dot(self.STEERING_PARAMETERS, out_features) if "lateral" not in data: data["lateral"] = {"features": [], "outputs": []} data["lateral"]["features"].append(features) data["lateral"]["outputs"].append(output)
[docs]class AggressiveVehicle(LinearVehicle): LANE_CHANGE_MIN_ACC_GAIN = 1.0 # [m/s2] MERGE_ACC_GAIN = 0.8 MERGE_VEL_RATIO = 0.75 MERGE_TARGET_VEL = 30 ACCELERATION_PARAMETERS = [MERGE_ACC_GAIN / ((1 - MERGE_VEL_RATIO) * MERGE_TARGET_VEL), MERGE_ACC_GAIN / (MERGE_VEL_RATIO * MERGE_TARGET_VEL), 0.5]
[docs]class DefensiveVehicle(LinearVehicle): LANE_CHANGE_MIN_ACC_GAIN = 1.0 # [m/s2] MERGE_ACC_GAIN = 1.2 MERGE_VEL_RATIO = 0.75 MERGE_TARGET_VEL = 30 ACCELERATION_PARAMETERS = [MERGE_ACC_GAIN / ((1 - MERGE_VEL_RATIO) * MERGE_TARGET_VEL), MERGE_ACC_GAIN / (MERGE_VEL_RATIO * MERGE_TARGET_VEL), 2.0]