Source code for highway_env.envs.highway_env

import numpy as np
from typing import Tuple
from gym.envs.registration import register

from highway_env import utils
from highway_env.envs.common.abstract import AbstractEnv
from highway_env.envs.common.action import Action
from highway_env.road.road import Road, RoadNetwork
from highway_env.vehicle.controller import MDPVehicle, ControlledVehicle


class HighwayEnv(AbstractEnv):
    """
    A highway driving environment.

    The vehicle is driving on a straight highway with several lanes, and is rewarded
    for reaching a high speed, staying on the rightmost lanes and avoiding collisions.
    """

    RIGHT_LANE_REWARD: float = 0.1
    """The reward received when driving on the right-most lanes, linearly mapped to zero for other lanes."""

    HIGH_SPEED_REWARD: float = 0.4
    """The reward received when driving at full speed, linearly mapped to zero for lower speeds."""

    LANE_CHANGE_REWARD: float = 0
    """The reward received at each lane change action."""
    def default_config(self) -> dict:
        config = super().default_config()
        config.update({
            "observation": {
                "type": "Kinematics"
            },
            "action": {
                "type": "DiscreteMetaAction",
            },
            "lanes_count": 4,
            "vehicles_count": 50,
            "duration": 40,  # [s]
            "initial_spacing": 2,
            "collision_reward": -1  # The reward received when colliding with a vehicle.
        })
        return config
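    # Note: keys referenced below but not set here, such as "other_vehicles_type"
    # and "show_trajectories", are inherited from AbstractEnv.default_config().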
    def reset(self) -> np.ndarray:
        self._create_road()
        self._create_vehicles()
        self.steps = 0
        return super().reset()
    def step(self, action: int) -> Tuple[np.ndarray, float, bool, dict]:
        self.steps += 1
        return super().step(action)
    def _create_road(self) -> None:
        """Create a road composed of straight adjacent lanes."""
        self.road = Road(network=RoadNetwork.straight_road_network(self.config["lanes_count"]),
                         np_random=self.np_random,
                         record_history=self.config["show_trajectories"])
    def _create_vehicles(self) -> None:
        """Create some new random vehicles of a given type, and add them on the road."""
        self.vehicle = self.action_type.vehicle_class.create_random(self.road, 25,
                                                                    spacing=self.config["initial_spacing"])
        self.road.vehicles.append(self.vehicle)

        vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
        for _ in range(self.config["vehicles_count"]):
            self.road.vehicles.append(vehicles_type.create_random(self.road))
    def _reward(self, action: Action) -> float:
        """
        The reward is defined to foster driving at high speed, on the rightmost lanes,
        and to avoid collisions.

        :param action: the last action performed
        :return: the corresponding reward
        """
        neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)
        lane = self.vehicle.target_lane_index[2] if isinstance(self.vehicle, ControlledVehicle) \
            else self.vehicle.lane_index[2]
        speed = self.vehicle.speed_index if isinstance(self.vehicle, MDPVehicle) \
            else MDPVehicle.speed_to_index(self.vehicle.speed)
        reward = \
            + self.config["collision_reward"] * self.vehicle.crashed \
            + self.RIGHT_LANE_REWARD * lane / (len(neighbours) - 1) \
            + self.HIGH_SPEED_REWARD * speed / (MDPVehicle.SPEED_COUNT - 1)
        reward = utils.lmap(reward,
                            [self.config["collision_reward"], self.HIGH_SPEED_REWARD + self.RIGHT_LANE_REWARD],
                            [0, 1])
        reward = 0 if not self.vehicle.on_road else reward
        return reward
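    # Illustrative check of the scaling above (assuming the default 4 lanes and
    # MDPVehicle.SPEED_COUNT == 3): an uncrashed ego vehicle in the rightmost lane
    # (lane == 3) at the top speed index (speed == 2) gets
    # -1 * 0 + 0.1 * 3/3 + 0.4 * 2/2 = 0.5, which lmap then rescales from the
    # interval [-1, 0.5] onto [0, 1], i.e. a maximal reward of 1.0.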
    def _is_terminal(self) -> bool:
        """The episode is over if the ego vehicle crashed or the time is out."""
        return self.vehicle.crashed or self.steps >= self.config["duration"]
    def _cost(self, action: int) -> float:
        """The cost signal is the occurrence of collision."""
        return float(self.vehicle.crashed)
register(
    id='highway-v0',
    entry_point='highway_env.envs:HighwayEnv',
)
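Since the module registers the environment as 'highway-v0' on import, it can be used through the standard gym interface. A minimal usage sketch (not part of this module; the random policy and the config overrides are placeholders, and the old-style 4-tuple step API shown here matches the signatures above):

import gym
import highway_env  # noqa: F401  (importing the package runs register() above)

env = gym.make('highway-v0')
env.configure({"lanes_count": 3, "vehicles_count": 20})  # optional config overrides
obs = env.reset()
done = False
while not done:
    action = env.action_space.sample()  # replace with a trained policy
    obs, reward, done, info = env.step(action)
env.close()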