Source code for highway_env.envs.common.observation

from typing import List, Dict, TYPE_CHECKING, Optional
from gym import spaces
import numpy as np
import pandas as pd

from highway_env import utils
from highway_env.envs.common.finite_mdp import compute_ttc_grid
from highway_env.road.lane import AbstractLane
from highway_env.vehicle.controller import MDPVehicle

if TYPE_CHECKING:
    from highway_env.envs.common.abstract import AbstractEnv


class ObservationType:

    """Common interface of the observation types defined in this module."""

    def space(self) -> spaces.Space:
        """Return the observation space; the base implementation always raises."""
        raise NotImplementedError()

    def observe(self):
        """Return an observation of the environment state; the base implementation always raises."""
        raise NotImplementedError()
class GrayscaleObservation(ObservationType):

    """
    An observation class that collects directly what the simulator renders.

    Every call to observe() renders the environment, converts the frame to grayscale and
    pushes it onto a stack of the most recent frames, as in the nature DQN.

    Specific keys are expected in the configuration dictionary passed.
    Example of observation dictionary in the environment config:
        "observation": {
            "type": "GrayscaleObservation",
            "weights": [0.2989, 0.5870, 0.1140],  # weights for RGB conversion
            "stack_size": 4,
            "observation_shape": (84, 84)
        }

    Also, the screen_height and screen_width of the environment should match the
    expected observation_shape.
    """

    def __init__(self, env: 'AbstractEnv', config: dict) -> None:
        """
        :param env: The environment to observe
        :param config: The observation configuration; "observation_shape" and "stack_size"
                       are read here, "weights" is read when a frame is converted
        """
        self.env = env
        self.config = config
        frame_shape = config["observation_shape"]
        self.observation_shape = frame_shape
        # The stacked frames are stored along a trailing axis
        self.shape = frame_shape + (config["stack_size"], )
        self.state = np.zeros(self.shape)

    def space(self) -> spaces.Space:
        """Return a Box of the stacked-frames shape, or a bare Space on AttributeError."""
        # NOTE(review): the declared bounds are [0, 1]; if the renderer returns 0-255 pixel
        # values the observations will exceed them -- confirm against env.render.
        try:
            box = spaces.Box(shape=self.shape, low=0, high=1, dtype=np.float32)
        except AttributeError:
            return spaces.Space()
        return box

    def observe(self) -> np.ndarray:
        """Render a new frame, append it to the stack and return the whole stack."""
        frame = np.reshape(self._record_to_grayscale(), self.observation_shape)
        # Shift every stored frame one slot towards the front; the slot that wraps around
        # to the end is overwritten with the new frame.
        shifted = np.roll(self.state, -1, axis=-1)
        shifted[:, :, -1] = frame
        self.state = shifted
        return self.state

    def _record_to_grayscale(self) -> np.ndarray:
        """Render the environment as an RGB array and reduce it with the configured weights."""
        rgb_frame = self.env.render('rgb_array')
        # Only the first three channels are used
        colour_channels = rgb_frame[..., :3]
        return np.dot(colour_channels, self.config['weights'])
class TimeToCollisionObservation(ObservationType):

    """Observe a time-to-collision grid cropped around the ego-vehicle's speed and lane indexes."""

    def __init__(self, env: 'AbstractEnv', horizon: int = 10, **kwargs: dict) -> None:
        """
        :param env: The environment to observe
        :param horizon: The horizon forwarded to compute_ttc_grid
        :param kwargs: Ignored; accepted so that a whole configuration dict can be unpacked
        """
        self.env = env
        self.horizon = horizon

    def space(self) -> spaces.Space:
        """Return a Box shaped like a sample observation, or a bare Space on AttributeError."""
        try:
            sample_shape = self.observe().shape
            return spaces.Box(shape=sample_shape, low=0, high=1, dtype=np.float32)
        except AttributeError:
            return spaces.Space()

    def observe(self) -> np.ndarray:
        """Compute the time-to-collision grid and crop a 3x3 window (axes 0 and 1) around the ego-vehicle."""
        ttc = compute_ttc_grid(self.env,
                               time_quantization=1/self.env.config["policy_frequency"],
                               horizon=self.horizon)
        window = 3
        half_window = window // 2

        # Axis 1 is indexed by the ego lane: pad it with ones on both sides, then crop
        ones = np.ones(np.shape(ttc))
        lane_padded = np.concatenate([ones, ttc, ones], axis=1)
        lane_centre = ttc.shape[1] + self.env.vehicle.lane_index[2]
        lane_cropped = lane_padded[:, lane_centre - half_window:lane_centre + half_window + 1, :]

        # Axis 0 is indexed by the ego speed: pad it by repeating the first and last rows.
        # The fancy-indexed += is kept as is: with a single row, indexes 0 and -1 alias
        # the same element and the increment is applied only once.
        repeats = np.ones(lane_cropped.shape[0])
        repeats[np.array([0, -1])] += lane_cropped.shape[0]
        speed_padded = np.repeat(lane_cropped, repeats.astype(int), axis=0)
        speed_centre = ttc.shape[0] + self.env.vehicle.speed_index
        return speed_padded[speed_centre - half_window:speed_centre + half_window + 1, :, :]
class KinematicObservation(ObservationType):

    """Observe the kinematics of nearby vehicles."""

    FEATURES: List[str] = ['presence', 'x', 'y', 'vx', 'vy']

    def __init__(self, env: 'AbstractEnv',
                 features: Optional[List[str]] = None,
                 vehicles_count: int = 5,
                 features_range: Optional[Dict[str, List[float]]] = None,
                 absolute: bool = False,
                 order: str = "sorted",
                 normalize: bool = True,
                 clip: bool = True,
                 see_behind: bool = False,
                 observe_intentions: bool = False,
                 **kwargs: dict) -> None:
        """
        :param env: The environment to observe
        :param features: Names of features used in the observation
        :param vehicles_count: Number of observed vehicles
        :param features_range: A [min, max] range per feature name, used for normalization;
                               default ranges are built on first use when not provided
        :param absolute: Use absolute coordinates
        :param order: Order of observed vehicles. Values: sorted, shuffled
        :param normalize: Should the observation be normalized
        :param clip: Should the value be clipped in the desired range
        :param see_behind: Should the observation contains the vehicles behind
        :param observe_intentions: Observe the destinations of other vehicles
        :param kwargs: Ignored; accepted so that a whole configuration dict can be unpacked
        """
        self.env = env
        self.features = features or self.FEATURES
        self.vehicles_count = vehicles_count
        self.features_range = features_range
        self.absolute = absolute
        self.order = order
        self.normalize = normalize
        self.clip = clip
        self.see_behind = see_behind
        self.observe_intentions = observe_intentions

    def space(self) -> spaces.Space:
        """Return a Box with one row per observed vehicle and one column per feature."""
        return spaces.Box(shape=(self.vehicles_count, len(self.features)), low=-1, high=1, dtype=np.float32)

    def normalize_obs(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize the observation values.

        For now, assume that the road is straight along the x axis.
        The default feature ranges are computed on the first call and stored on the instance.

        :param Dataframe df: observation data, modified in place
        :return: the same dataframe, with every ranged feature mapped to [-1, 1]
        """
        if not self.features_range:
            side_lanes = self.env.road.network.all_side_lanes(self.env.vehicle.lane_index)
            self.features_range = {
                "x": [-5.0 * MDPVehicle.SPEED_MAX, 5.0 * MDPVehicle.SPEED_MAX],
                "y": [-AbstractLane.DEFAULT_WIDTH * len(side_lanes), AbstractLane.DEFAULT_WIDTH * len(side_lanes)],
                "vx": [-2*MDPVehicle.SPEED_MAX, 2*MDPVehicle.SPEED_MAX],
                "vy": [-2*MDPVehicle.SPEED_MAX, 2*MDPVehicle.SPEED_MAX]
            }
        for feature, f_range in self.features_range.items():
            if feature in df:
                df[feature] = utils.lmap(df[feature], [f_range[0], f_range[1]], [-1, 1])
                if self.clip:
                    df[feature] = np.clip(df[feature], -1, 1)
        return df

    def observe(self) -> np.ndarray:
        """
        Build the observation array: ego-vehicle first, then nearby vehicles, then zero padding.

        :return: an array with vehicles_count rows and one column per feature
        """
        # Add ego-vehicle
        frames = [pd.DataFrame.from_records([self.env.vehicle.to_dict()])[self.features]]

        # Add nearby traffic
        close_vehicles = self.env.road.close_vehicles_to(self.env.vehicle,
                                                         self.env.PERCEPTION_DISTANCE,
                                                         count=self.vehicles_count - 1,
                                                         see_behind=self.see_behind)
        if close_vehicles:
            origin = self.env.vehicle if not self.absolute else None
            frames.append(pd.DataFrame.from_records(
                [v.to_dict(origin, observe_intentions=self.observe_intentions)
                 for v in close_vehicles[-self.vehicles_count + 1:]])[self.features])

        # DataFrame.append was removed in pandas 2.0; pd.concat is the supported equivalent
        df = pd.concat(frames, ignore_index=True)

        # Normalize and clip
        if self.normalize:
            df = self.normalize_obs(df)

        # Fill missing rows
        if df.shape[0] < self.vehicles_count:
            rows = np.zeros((self.vehicles_count - df.shape[0], len(self.features)))
            df = pd.concat([df, pd.DataFrame(data=rows, columns=self.features)], ignore_index=True)

        # Reorder
        df = df[self.features]
        obs = df.values.copy()
        if self.order == "shuffled":
            # The ego-vehicle stays in the first row; only the other rows are shuffled
            self.env.np_random.shuffle(obs[1:])
        return obs
class OccupancyGridObservation(ObservationType):

    """Observe an occupancy grid of nearby vehicles."""

    FEATURES: List[str] = ['presence', 'vx', 'vy']
    GRID_SIZE: List[List[float]] = [[-5.5*5, 5.5*5], [-5.5*5, 5.5*5]]
    GRID_STEP: List[int] = [5, 5]

    def __init__(self,
                 env: 'AbstractEnv',
                 features: Optional[List[str]] = None,
                 grid_size: Optional[List[List[float]]] = None,
                 grid_step: Optional[List[int]] = None,
                 features_range: Optional[Dict[str, List[float]]] = None,
                 absolute: bool = False,
                 **kwargs: dict) -> None:
        """
        :param env: The environment to observe
        :param features: Names of features used in the observation; defaults to FEATURES
        :param grid_size: The [min, max] extent of the grid along x and y; defaults to GRID_SIZE
        :param grid_step: The cell size along x and y; defaults to GRID_STEP
        :param features_range: A [min, max] range per feature name, used for normalization;
                               default ranges are built on first use when not provided
        :param absolute: Use absolute coordinates (not implemented: observe() raises)
        :param kwargs: Ignored; accepted so that a whole configuration dict can be unpacked
        """
        self.env = env
        self.features = features if features is not None else self.FEATURES
        self.grid_size = np.array(grid_size) if grid_size is not None else np.array(self.GRID_SIZE)
        self.grid_step = np.array(grid_step) if grid_step is not None else np.array(self.GRID_STEP)
        # Use the resolved self.grid_step: the raw argument is None when the default is wanted.
        # The builtin int replaces the np.int alias, which recent NumPy releases no longer provide.
        grid_shape = np.asarray(np.floor((self.grid_size[:, 1] - self.grid_size[:, 0]) / self.grid_step),
                                dtype=int)
        self.grid = np.zeros((len(self.features), *grid_shape))
        self.features_range = features_range
        self.absolute = absolute

    def space(self) -> spaces.Space:
        """Return a Box with the shape of the grid: one layer per feature."""
        return spaces.Box(shape=self.grid.shape, low=-1, high=1, dtype=np.float32)

    def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize the observation values.

        For now, assume that the road is straight along the x axis.
        The default feature ranges are computed on the first call and stored on the instance.

        :param Dataframe df: observation data, modified in place
        :return: the same dataframe, with every ranged feature mapped to [-1, 1]
        """
        if not self.features_range:
            self.features_range = {
                "vx": [-2*MDPVehicle.SPEED_MAX, 2*MDPVehicle.SPEED_MAX],
                "vy": [-2*MDPVehicle.SPEED_MAX, 2*MDPVehicle.SPEED_MAX]
            }
        for feature, f_range in self.features_range.items():
            if feature in df:
                df[feature] = utils.lmap(df[feature], [f_range[0], f_range[1]], [-1, 1])
        return df

    def observe(self) -> np.ndarray:
        """
        Fill the grid with the features of every vehicle on the road and return a clipped copy.

        :return: the grid, clipped to [-1, 1]
        :raises NotImplementedError: if absolute coordinates were requested
        """
        if self.absolute:
            raise NotImplementedError()

        # Add nearby traffic
        self.grid.fill(0)
        df = pd.DataFrame.from_records(
            [v.to_dict(self.env.vehicle) for v in self.env.road.vehicles])
        # Normalize
        df = self.normalize(df)
        # Fill-in features: the cell only depends on the vehicle, so it is computed once per vehicle
        for _, vehicle in df.iterrows():
            x, y = vehicle["x"], vehicle["y"]
            # Recover unnormalized coordinates for cell index
            if "x" in self.features_range:
                x = utils.lmap(x, [-1, 1], [self.features_range["x"][0], self.features_range["x"][1]])
            if "y" in self.features_range:
                y = utils.lmap(y, [-1, 1], [self.features_range["y"][0], self.features_range["y"][1]])
            # Floor rather than truncate: int() rounds towards zero and would map positions
            # up to one step below the grid's lower bound into the first cell.
            cell = (int(np.floor((x - self.grid_size[0, 0]) / self.grid_step[0])),
                    int(np.floor((y - self.grid_size[1, 0]) / self.grid_step[1])))
            # NOTE(review): the y index addresses the second axis and the x index the third,
            # whereas the grid is allocated as (features, x cells, y cells); this is only
            # consistent for square grids -- confirm the intended layout.
            if 0 <= cell[1] < self.grid.shape[-2] and 0 <= cell[0] < self.grid.shape[-1]:
                for layer, feature in enumerate(self.features):
                    self.grid[layer, cell[1], cell[0]] = vehicle[feature]
        # Clip
        return np.clip(self.grid, -1, 1)
class KinematicsGoalObservation(KinematicObservation):

    """Observe the scaled features of the ego-vehicle together with those of the goal."""

    def __init__(self, env: 'AbstractEnv', scales: List[float], **kwargs: dict) -> None:
        """
        :param env: The environment to observe
        :param scales: The divisors applied to the observed features
        :param kwargs: Forwarded to KinematicObservation
        """
        self.scales = np.array(scales)
        super().__init__(env, **kwargs)

    def space(self) -> spaces.Space:
        """Return a Dict of unbounded Boxes shaped like a sample observation, or a bare Space on AttributeError."""
        try:
            sample = self.observe()
            return spaces.Dict({
                key: spaces.Box(-np.inf, np.inf, shape=sample[key].shape, dtype=np.float32)
                for key in ("desired_goal", "achieved_goal", "observation")
            })
        except AttributeError:
            return spaces.Space()

    def observe(self) -> Dict[str, np.ndarray]:
        """Return the scaled ego-vehicle features (as observation and achieved goal) and the scaled goal features."""

        def flat_features(entity) -> np.ndarray:
            # Select the configured features of a single entity as a flat array
            return np.ravel(pd.DataFrame.from_records([entity.to_dict()])[self.features])

        ego = flat_features(self.env.vehicle)
        goal = flat_features(self.env.goal)
        # "observation" and "achieved_goal" hold equal values in two distinct arrays
        return {
            "observation": ego / self.scales,
            "achieved_goal": ego / self.scales,
            "desired_goal": goal / self.scales
        }
class AttributesObservation(ObservationType):

    """Observe a configured list of attributes read directly from the environment."""

    def __init__(self, env: 'AbstractEnv', attributes: List[str], **kwargs: dict) -> None:
        """
        :param env: The environment to observe
        :param attributes: Names of the environment attributes to observe
        :param kwargs: Ignored; accepted so that a whole configuration dict can be unpacked
        """
        self.env = env
        self.attributes = attributes

    def space(self) -> spaces.Space:
        """Return a Dict with one unbounded Box per attribute, or a bare Space on AttributeError."""
        try:
            sample = self.observe()
            boxes = {}
            for name in self.attributes:
                boxes[name] = spaces.Box(-np.inf, np.inf, shape=sample[name].shape, dtype=np.float32)
            return spaces.Dict(boxes)
        except AttributeError:
            return spaces.Space()

    def observe(self) -> Dict[str, np.ndarray]:
        """Return the current value of every configured attribute, keyed by attribute name."""
        values = {}
        for name in self.attributes:
            values[name] = getattr(self.env, name)
        return values
def observation_factory(env: 'AbstractEnv', config: dict) -> ObservationType:
    """
    Build the observation type selected by config["type"].

    :param env: The environment to observe
    :param config: The observation configuration; its "type" entry selects the class
    :return: the observation instance
    :raises ValueError: if the type is not one of the known names
    """
    # Each builder is lazy so that only the selected class gets instantiated.
    # GrayscaleObservation receives the configuration as a single dict; every other
    # class receives it unpacked as keyword arguments.
    builders = (
        ("TimeToCollision", lambda: TimeToCollisionObservation(env, **config)),
        ("Kinematics", lambda: KinematicObservation(env, **config)),
        ("OccupancyGrid", lambda: OccupancyGridObservation(env, **config)),
        ("KinematicsGoal", lambda: KinematicsGoalObservation(env, **config)),
        ("GrayscaleObservation", lambda: GrayscaleObservation(env, config)),
        ("AttributesObservation", lambda: AttributesObservation(env, **config)),
    )
    requested = config["type"]
    for name, build in builders:
        if requested == name:
            return build()
    raise ValueError("Unknown observation type")