import numpy as np
from typing import Tuple
from gym.envs.registration import register

from highway_env import utils
from highway_env.envs.common.abstract import AbstractEnv
from highway_env.envs.common.action import Action
from highway_env.road.road import Road, RoadNetwork
from highway_env.vehicle.controller import MDPVehicle, ControlledVehicle


class HighwayEnv(AbstractEnv):
    """
    A highway driving environment.

    The vehicle is driving on a straight highway with several lanes, and is rewarded for reaching a high
    speed, staying on the rightmost lanes and avoiding collisions.
    """

    RIGHT_LANE_REWARD: float = 0.1
    """The reward received when driving on the right-most lanes, linearly mapped to zero for other lanes."""

    HIGH_SPEED_REWARD: float = 0.4
    """The reward received when driving at full speed, linearly mapped to zero for lower speeds."""

    LANE_CHANGE_REWARD: float = 0
    """The reward received at each lane change action."""

    def default_config(self) -> dict:
        config = super().default_config()
        config.update({
            "observation": {
                "type": "Kinematics"
            },
            "action": {
                "type": "DiscreteMetaAction",
            },
            "lanes_count": 4,
            "vehicles_count": 50,
            "duration": 40,  # [s]
            "initial_spacing": 2,
            "collision_reward": -1  # The reward received when colliding with a vehicle.
        })
        return config
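
    # Usage sketch: any of the keys above can be overridden before the first reset,
    # here via `configure`, which AbstractEnv provides to merge a dict into
    # `self.config` (a minimal sketch, assuming that inherited method):
    #
    #     env = HighwayEnv()
    #     env.configure({"lanes_count": 3, "vehicles_count": 20})
    #     env.reset()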

    def reset(self) -> np.ndarray:
        self._create_road()
        self._create_vehicles()
        self.steps = 0
        return super().reset()

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, dict]:
        # Count policy steps, used by _is_terminal() for the time-out condition.
        self.steps += 1
        return super().step(action)

    def _create_road(self) -> None:
        """Create a road composed of straight adjacent lanes."""
        self.road = Road(network=RoadNetwork.straight_road_network(self.config["lanes_count"]),
                         np_random=self.np_random, record_history=self.config["show_trajectories"])

    def _create_vehicles(self) -> None:
        """Create some new random vehicles of a given type, and add them on the road."""
        # The ego-vehicle, created at 25 m/s and controlled through the action space.
        self.vehicle = self.action_type.vehicle_class.create_random(self.road, 25,
                                                                    spacing=self.config["initial_spacing"])
        self.road.vehicles.append(self.vehicle)
        # The other vehicles, whose behaviour class is resolved from a dotted path string.
        vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
        for _ in range(self.config["vehicles_count"]):
            self.road.vehicles.append(vehicles_type.create_random(self.road))
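
    # `other_vehicles_type` holds a dotted path string, typically the default set in
    # AbstractEnv, "highway_env.vehicle.behavior.IDMVehicle". A minimal sketch of how
    # such a path is resolved into a class, assuming utils.class_from_path behaves like:
    #
    #     import importlib
    #
    #     def class_from_path(path: str) -> type:
    #         module_name, class_name = path.rsplit(".", 1)
    #         return getattr(importlib.import_module(module_name), class_name)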

    def _reward(self, action: Action) -> float:
        """
        The reward is defined to foster driving at high speed, on the rightmost lanes, and to avoid collisions.

        :param action: the last action performed
        :return: the corresponding reward
        """
        neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)
        lane = self.vehicle.target_lane_index[2] if isinstance(self.vehicle, ControlledVehicle) \
            else self.vehicle.lane_index[2]
        speed = self.vehicle.speed_index if isinstance(self.vehicle, MDPVehicle) \
            else MDPVehicle.speed_to_index(self.vehicle.speed)
        # max(len(neighbours) - 1, 1) guards against a division by zero on single-lane roads.
        reward = \
            + self.config["collision_reward"] * self.vehicle.crashed \
            + self.RIGHT_LANE_REWARD * lane / max(len(neighbours) - 1, 1) \
            + self.HIGH_SPEED_REWARD * speed / (MDPVehicle.SPEED_COUNT - 1)
        # Rescale the raw reward to [0, 1].
        reward = utils.lmap(reward,
                            [self.config["collision_reward"], self.HIGH_SPEED_REWARD + self.RIGHT_LANE_REWARD],
                            [0, 1])
        reward = 0 if not self.vehicle.on_road else reward
        return reward
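
    # Worked example of the normalisation above, with the default coefficients:
    # driving crash-free at full speed on the rightmost of several lanes yields a
    # raw reward of 0.1 + 0.4 = 0.5; lmap then maps it linearly from
    # [collision_reward, HIGH_SPEED_REWARD + RIGHT_LANE_REWARD] = [-1, 0.5] to
    # [0, 1], i.e. (0.5 - (-1)) / (0.5 - (-1)) = 1.0. A crash at the lowest speed
    # in the leftmost lane gives a raw reward of -1, which maps to 0.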

    def _is_terminal(self) -> bool:
        """The episode is over if the ego vehicle crashed or the time is out."""
        return self.vehicle.crashed or self.steps >= self.config["duration"]

    def _cost(self, action: int) -> float:
        """The cost signal is the occurrence of collision."""
        return float(self.vehicle.crashed)


register(
    id='highway-v0',
    entry_point='highway_env.envs:HighwayEnv',
)
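

# A minimal usage sketch (an illustrative demo, not part of the library: it relies
# only on the registration above and on the standard gym API):
if __name__ == "__main__":
    import gym

    env = gym.make("highway-v0")
    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()  # a random DiscreteMetaAction index
        obs, reward, done, info = env.step(action)
    env.close()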