import numpy as np
from gym.envs.registration import register
from highway_env import utils
from highway_env.envs.common.abstract import AbstractEnv
from highway_env.road.lane import LineType, StraightLane, SineLane
from highway_env.road.road import Road, RoadNetwork
from highway_env.vehicle.controller import ControlledVehicle, MDPVehicle
from highway_env.road.objects import Obstacle
[docs]class MergeEnv(AbstractEnv):
"""
A highway merge negotiation environment.
The ego-vehicle is driving on a highway and approached a merge, with some vehicles incoming on the access ramp.
It is rewarded for maintaining a high speed and avoiding collisions, but also making room for merging
vehicles.
"""
COLLISION_REWARD: float = -1
RIGHT_LANE_REWARD: float = 0.1
HIGH_SPEED_REWARD: float = 0.2
MERGING_SPEED_REWARD: float = -0.5
LANE_CHANGE_REWARD: float = -0.05
[docs] def _reward(self, action: int) -> float:
"""
The vehicle is rewarded for driving with high speed on lanes to the right and avoiding collisions
But an additional altruistic penalty is also suffered if any vehicle on the merging lane has a low speed.
:param action: the action performed
:return: the reward of the state-action transition
"""
action_reward = {0: self.LANE_CHANGE_REWARD,
1: 0,
2: self.LANE_CHANGE_REWARD,
3: 0,
4: 0}
reward = self.COLLISION_REWARD * self.vehicle.crashed \
+ self.RIGHT_LANE_REWARD * self.vehicle.lane_index[2] / 1 \
+ self.HIGH_SPEED_REWARD * self.vehicle.speed_index / (self.vehicle.SPEED_COUNT - 1)
# Altruistic penalty
for vehicle in self.road.vehicles:
if vehicle.lane_index == ("b", "c", 2) and isinstance(vehicle, ControlledVehicle):
reward += self.MERGING_SPEED_REWARD * \
(vehicle.target_speed - vehicle.speed) / vehicle.target_speed
return utils.lmap(action_reward[action] + reward,
[self.COLLISION_REWARD + self.MERGING_SPEED_REWARD,
self.HIGH_SPEED_REWARD + self.RIGHT_LANE_REWARD],
[0, 1])
[docs] def _is_terminal(self) -> bool:
"""The episode is over when a collision occurs or when the access ramp has been passed."""
return self.vehicle.crashed or self.vehicle.position[0] > 370
[docs] def reset(self) -> np.ndarray:
self._make_road()
self._make_vehicles()
return super().reset()
[docs] def _make_road(self) -> None:
"""
Make a road composed of a straight highway and a merging lane.
:return: the road
"""
net = RoadNetwork()
# Highway lanes
ends = [150, 80, 80, 150] # Before, converging, merge, after
c, s, n = LineType.CONTINUOUS_LINE, LineType.STRIPED, LineType.NONE
y = [0, StraightLane.DEFAULT_WIDTH]
line_type = [[c, s], [n, c]]
line_type_merge = [[c, s], [n, s]]
for i in range(2):
net.add_lane("a", "b", StraightLane([0, y[i]], [sum(ends[:2]), y[i]], line_types=line_type[i]))
net.add_lane("b", "c", StraightLane([sum(ends[:2]), y[i]], [sum(ends[:3]), y[i]], line_types=line_type_merge[i]))
net.add_lane("c", "d", StraightLane([sum(ends[:3]), y[i]], [sum(ends), y[i]], line_types=line_type[i]))
# Merging lane
amplitude = 3.25
ljk = StraightLane([0, 6.5 + 4 + 4], [ends[0], 6.5 + 4 + 4], line_types=[c, c], forbidden=True)
lkb = SineLane(ljk.position(ends[0], -amplitude), ljk.position(sum(ends[:2]), -amplitude),
amplitude, 2 * np.pi / (2*ends[1]), np.pi / 2, line_types=[c, c], forbidden=True)
lbc = StraightLane(lkb.position(ends[1], 0), lkb.position(ends[1], 0) + [ends[2], 0],
line_types=[n, c], forbidden=True)
net.add_lane("j", "k", ljk)
net.add_lane("k", "b", lkb)
net.add_lane("b", "c", lbc)
road = Road(network=net, np_random=self.np_random, record_history=self.config["show_trajectories"])
road.objects.append(Obstacle(road, lbc.position(ends[2], 0)))
self.road = road
[docs] def _make_vehicles(self) -> None:
"""
Populate a road with several vehicles on the highway and on the merging lane, as well as an ego-vehicle.
:return: the ego-vehicle
"""
road = self.road
ego_vehicle = self.action_type.vehicle_class(road,
road.network.get_lane(("a", "b", 1)).position(30, 0),
speed=30)
road.vehicles.append(ego_vehicle)
other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", 0)).position(90, 0), speed=29))
road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", 1)).position(70, 0), speed=31))
road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", 0)).position(5, 0), speed=31.5))
merging_v = other_vehicles_type(road, road.network.get_lane(("j", "k", 0)).position(110, 0), speed=20)
merging_v.target_speed = 30
road.vehicles.append(merging_v)
self.vehicle = ego_vehicle
register(
id='merge-v0',
entry_point='highway_env.envs:MergeEnv',
)