import copy
import os
from typing import List, Tuple, Optional, Callable
import gym
from gym.utils import seeding
import numpy as np
from highway_env import utils
from highway_env.envs.common.action import action_factory, Action, DiscreteMetaAction, ActionType
from highway_env.envs.common.observation import observation_factory, ObservationType
from highway_env.envs.common.finite_mdp import finite_mdp
from highway_env.envs.common.graphics import EnvViewer
from highway_env.vehicle.behavior import IDMVehicle, LinearVehicle
from highway_env.vehicle.controller import MDPVehicle
Observation = np.ndarray
class AbstractEnv(gym.Env):
"""
A generic environment for various tasks involving a vehicle driving on a road.
    The environment contains a road populated with vehicles, and a controlled ego-vehicle that can change lanes and
    adjust its speed. The action space is fixed, but the observation space and reward function must be defined in each
    environment implementation.
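
    Example (an illustrative sketch; it assumes a concrete subclass registered under an id such as ``highway-v0``)::

        import gym
        import highway_env  # noqa: F401 -- importing the package registers its environments

        env = gym.make("highway-v0")
        obs = env.reset()
        done = False
        while not done:
            obs, reward, done, info = env.step(env.action_space.sample())
            env.render()
        env.close()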
"""
observation_type: ObservationType
action_type: ActionType
automatic_rendering_callback: Optional[Callable]
metadata = {'render.modes': ['human', 'rgb_array']}
PERCEPTION_DISTANCE = 6.0 * MDPVehicle.SPEED_MAX
"""The maximum distance of any vehicle present in the observation [m]"""
    def __init__(self, config: dict = None) -> None:
# Configuration
self.config = self.default_config()
if config:
self.config.update(config)
# Seeding
self.np_random = None
self.seed()
# Scene
self.road = None
self.vehicle = None
# Spaces
self.action_type = None
self.action_space = None
self.observation_type = None
self.observation_space = None
self.define_spaces()
# Running
self.time = 0 # Simulation time
self.steps = 0 # Actions performed
self.done = False
# Rendering
self.viewer = None
self.automatic_rendering_callback = None
self.should_update_rendering = True
self.rendering_mode = 'human'
self.enable_auto_render = False
self.reset()
    @classmethod
def default_config(cls) -> dict:
"""
Default environment configuration.
Can be overloaded in environment implementations, or by calling configure().
:return: a configuration dict
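
        Example of overloading in a subclass (a sketch; the overridden values are illustrative)::

            class MyEnv(AbstractEnv):
                @classmethod
                def default_config(cls) -> dict:
                    config = super().default_config()
                    config.update({"policy_frequency": 2})  # take two decisions per second
                    return config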
"""
return {
"observation": {
"type": "TimeToCollision"
},
"action": {
"type": "DiscreteMetaAction"
},
"simulation_frequency": 15, # [Hz]
"policy_frequency": 1, # [Hz]
"other_vehicles_type": "highway_env.vehicle.behavior.IDMVehicle",
"screen_width": 600, # [px]
"screen_height": 150, # [px]
"centering_position": [0.3, 0.5],
"scaling": 5.5,
"show_trajectories": False,
"render_agent": True,
"offscreen_rendering": os.environ.get("OFFSCREEN_RENDERING", "0") == "1",
"manual_control": False
}
    def seed(self, seed: int = None) -> List[int]:
self.np_random, seed = seeding.np_random(seed)
return [seed]
    def define_spaces(self) -> None:
self.observation_type = observation_factory(self, self.config["observation"])
self.observation_space = self.observation_type.space()
self.action_type = action_factory(self, self.config["action"])
self.action_space = self.action_type.space()
    def _reward(self, action: Action) -> float:
"""
Return the reward associated with performing a given action and ending up in the current state.
:param action: the last action performed
:return: the reward
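
        Example of a minimal override (a sketch; the shaping weights are illustrative)::

            def _reward(self, action: Action) -> float:
                # penalise collisions and reward high speed
                return -1.0 * self.vehicle.crashed + 0.1 * self.vehicle.speed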
"""
raise NotImplementedError
    def _is_terminal(self) -> bool:
"""
Check whether the current state is a terminal state
        :return: whether the current state is terminal
"""
raise NotImplementedError
    def _cost(self, action: Action) -> float:
"""
A constraint metric, for budgeted MDP.
If a constraint is defined, it must be used with an alternate reward that doesn't contain it as a penalty.
:param action: the last action performed
        :return: the constraint signal
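
        Example of a minimal override (a sketch, using the crash indicator as the constraint)::

            def _cost(self, action: Action) -> float:
                return float(self.vehicle.crashed)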
"""
raise NotImplementedError
    def reset(self) -> Observation:
"""
        Reset the environment to its initial configuration.
:return: the observation of the reset state
"""
self.time = 0
self.done = False
self.define_spaces()
return self.observation_type.observe()
    def step(self, action: Action) -> Tuple[Observation, float, bool, dict]:
"""
Perform an action and step the environment dynamics.
        The action is executed by the ego-vehicle, and all other vehicles on the road perform their default behaviour
        for several simulation timesteps until the next decision-making step.
:param action: the action performed by the ego-vehicle
:return: a tuple (observation, reward, terminal, info)
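
        Example (a sketch)::

            obs, reward, done, info = env.step(action)
            # info always contains "speed", "crashed" and "action",
            # plus "cost" whenever _cost() is implemented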
"""
if self.road is None or self.vehicle is None:
raise NotImplementedError("The road and vehicle must be initialized in the environment implementation")
self._simulate(action)
obs = self.observation_type.observe()
reward = self._reward(action)
terminal = self._is_terminal()
info = {
"speed": self.vehicle.speed,
"crashed": self.vehicle.crashed,
"action": action,
}
try:
info["cost"] = self._cost(action)
except NotImplementedError:
pass
return obs, reward, terminal, info
    def _simulate(self, action: Optional[Action] = None) -> None:
"""Perform several steps of simulation with constant action."""
for _ in range(int(self.config["simulation_frequency"] // self.config["policy_frequency"])):
# Forward action to the vehicle
if action is not None \
and not self.config["manual_control"] \
and self.time % int(self.config["simulation_frequency"] // self.config["policy_frequency"]) == 0:
self.action_type.act(action)
self.road.act()
self.road.step(1 / self.config["simulation_frequency"])
self.time += 1
# Automatically render intermediate simulation steps if a viewer has been launched
# Ignored if the rendering is done offscreen
self._automatic_rendering()
# Stop at terminal states
if self.done or self._is_terminal():
break
self.enable_auto_render = False
    def render(self, mode: str = 'human') -> Optional[np.ndarray]:
"""
Render the environment.
Create a viewer if none exists, and use it to render an image.
:param mode: the rendering mode
"""
self.rendering_mode = mode
if self.viewer is None:
self.viewer = EnvViewer(self)
self.enable_auto_render = not self.viewer.offscreen
# If the frame has already been rendered, do nothing
if self.should_update_rendering:
self.viewer.display()
if not self.viewer.offscreen:
self.viewer.handle_events()
if mode == 'rgb_array':
image = self.viewer.get_image()
return image
self.should_update_rendering = False
    def close(self) -> None:
"""
Close the environment.
Will close the environment viewer if it exists.
"""
self.done = True
if self.viewer is not None:
self.viewer.close()
self.viewer = None
    def get_available_actions(self) -> List[int]:
"""
Get the list of currently available actions.
Lane changes are not available on the boundary of the road, and speed changes are not available at
maximal or minimal speed.
:return: the list of available actions
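
        Example (a sketch; the indices shown assume the usual ``DiscreteMetaAction`` mapping)::

            actions = env.get_available_actions()
            # e.g. [1, 0, 3]: IDLE is always available; here LANE_LEFT and FASTER are too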
"""
if not isinstance(self.action_type, DiscreteMetaAction):
raise ValueError("Only discrete meta-actions can be unavailable.")
actions = [self.action_type.actions_indexes['IDLE']]
for l_index in self.road.network.side_lanes(self.vehicle.lane_index):
if l_index[2] < self.vehicle.lane_index[2] \
and self.road.network.get_lane(l_index).is_reachable_from(self.vehicle.position) \
and self.action_type.lateral:
actions.append(self.action_type.actions_indexes['LANE_LEFT'])
if l_index[2] > self.vehicle.lane_index[2] \
and self.road.network.get_lane(l_index).is_reachable_from(self.vehicle.position) \
and self.action_type.lateral:
actions.append(self.action_type.actions_indexes['LANE_RIGHT'])
if self.vehicle.speed_index < self.vehicle.SPEED_COUNT - 1 and self.action_type.longitudinal:
actions.append(self.action_type.actions_indexes['FASTER'])
if self.vehicle.speed_index > 0 and self.action_type.longitudinal:
actions.append(self.action_type.actions_indexes['SLOWER'])
return actions
    def _automatic_rendering(self) -> None:
"""
Automatically render the intermediate frames while an action is still ongoing.
        This allows rendering the whole video, and not only the single frames corresponding to agent decisions.
        If a callback has been set, use it to perform the rendering. This is useful for environment wrappers, such as
        a video-recording monitor, that need to access these intermediate renderings.
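
        Example (a sketch, with a hypothetical ``recorder`` object)::

            env.automatic_rendering_callback = recorder.capture_frame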
"""
if self.viewer is not None and self.enable_auto_render:
self.should_update_rendering = True
if self.automatic_rendering_callback is not None:
self.automatic_rendering_callback()
else:
self.render(self.rendering_mode)
    def simplify(self) -> 'AbstractEnv':
"""
Return a simplified copy of the environment where distant vehicles have been removed from the road.
        This is meant to lower the computational load of the policy while preserving the set of optimal actions.
:return: a simplified environment state
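
        Example (a sketch)::

            simplified_env = env.simplify()
            # plan on the lighter copy, then execute the chosen action in the original env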
"""
state_copy = copy.deepcopy(self)
state_copy.road.vehicles = [state_copy.vehicle] + state_copy.road.close_vehicles_to(
state_copy.vehicle, self.PERCEPTION_DISTANCE)
return state_copy
    def change_vehicles(self, vehicle_class_path: str) -> 'AbstractEnv':
"""
Change the type of all vehicles on the road
:param vehicle_class_path: The path of the class of behavior for other vehicles
Example: "highway_env.vehicle.behavior.IDMVehicle"
:return: a new environment with modified behavior model for other vehicles
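
        Example (a sketch, reusing the behaviour class mentioned above)::

            env_idm = env.change_vehicles("highway_env.vehicle.behavior.IDMVehicle")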
"""
vehicle_class = utils.class_from_path(vehicle_class_path)
env_copy = copy.deepcopy(self)
vehicles = env_copy.road.vehicles
for i, v in enumerate(vehicles):
if v is not env_copy.vehicle:
vehicles[i] = vehicle_class.create_from(v)
return env_copy
    def set_preferred_lane(self, preferred_lane: int = None) -> 'AbstractEnv':
env_copy = copy.deepcopy(self)
        if preferred_lane is not None:  # explicit None-check, so that lane index 0 is a valid preference
for v in env_copy.road.vehicles:
if isinstance(v, IDMVehicle):
v.route = [(lane[0], lane[1], preferred_lane) for lane in v.route]
                    # Vehicles with a lane preference are also less cautious
v.LANE_CHANGE_MAX_BRAKING_IMPOSED = 1000
return env_copy
    def set_route_at_intersection(self, _to: str) -> 'AbstractEnv':
env_copy = copy.deepcopy(self)
for v in env_copy.road.vehicles:
if isinstance(v, IDMVehicle):
v.set_route_at_intersection(_to)
return env_copy
    def set_vehicle_field(self, args: Tuple[str, object]) -> 'AbstractEnv':
field, value = args
env_copy = copy.deepcopy(self)
for v in env_copy.road.vehicles:
if isinstance(v, LinearVehicle):
setattr(v, field, value)
return env_copy
    def call_vehicle_method(self, args: Tuple[str, Tuple[object]]) -> 'AbstractEnv':
method, method_args = args
env_copy = copy.deepcopy(self)
for i, v in enumerate(env_copy.road.vehicles):
if hasattr(v, method):
env_copy.road.vehicles[i] = getattr(v, method)(*method_args)
return env_copy
    def randomize_behaviour(self) -> 'AbstractEnv':
env_copy = copy.deepcopy(self)
for v in env_copy.road.vehicles:
if isinstance(v, IDMVehicle):
v.randomize_behavior()
return env_copy
    def to_finite_mdp(self):
return finite_mdp(self, time_quantization=1/self.config["policy_frequency"])
    def __deepcopy__(self, memo):
"""Perform a deep copy but without copying the environment viewer."""
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
if k not in ['viewer', 'automatic_rendering_callback']:
setattr(result, k, copy.deepcopy(v, memo))
else:
setattr(result, k, None)
return result
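

# Illustrative sketch (assumes a registered concrete subclass such as "highway-v0").
# Thanks to __deepcopy__ above, copies never share a viewer and can be duplicated
# freely, e.g. by tree-search planners that branch on environment states.
if __name__ == "__main__":
    import gym
    import highway_env  # noqa: F401 -- importing the package registers its environments

    env = gym.make("highway-v0")
    env.reset()
    env.render()
    env_copy = copy.deepcopy(env)
    assert env_copy.viewer is None  # the viewer is deliberately not copied
    env.close()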