Source code for vista.tasks.lane_following

from typing import Optional, List, Dict, Any
import numpy as np

from .. import World
from ..utils import logging, misc


def default_terminal_condition(task, agent_id, **kwargs):
    """ An example definition of terminal condition. """

    agent = [_a for _a in task.world.agents if _a.id == agent_id][0]

    def _check_out_of_lane():
        road_half_width = agent.trace.road_width / 2.
        return np.abs(agent.relative_state.x) > road_half_width

    def _check_exceed_max_rot():
        maximal_rotation = np.pi / 10.
        return np.abs(agent.relative_state.theta) > maximal_rotation

    out_of_lane = _check_out_of_lane()
    exceed_max_rot = _check_exceed_max_rot()
    done = out_of_lane or exceed_max_rot or agent.done
    other_info = {
        'done': done,
        'out_of_lane': out_of_lane,
        'exceed_max_rot': exceed_max_rot,
    }

    return done, other_info
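

# Example: a custom terminal condition with the same signature
# (task, agent_id, **kwargs) -> (done, info), usable via
# ``task_config={'terminal_condition': ...}``. A minimal sketch only: the
# ``max_distance`` cap is an illustrative value, not part of the default
# configuration, and it relies on the ``_distance`` bookkeeping kept by
# ``LaneFollowing`` further down in this module.
def distance_capped_terminal_condition(task, agent_id, max_distance=500.,
                                        **kwargs):
    # Reuse the default out-of-lane / rotation checks above.
    done, info = default_terminal_condition(task, agent_id, **kwargs)
    # Additionally terminate once the ego car has traveled ``max_distance``.
    reached_cap = task._distance >= max_distance
    done = done or reached_cap
    info['done'] = done
    info['reached_max_distance'] = reached_cap
    return done, info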


def default_reward_fn(task, agent_id, **kwargs):
    """ An example definition of reward function. """
    reward = -1 if kwargs['done'] else 0  # simply encourage survival

    return reward, {}
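

# Example: a custom reward that encourages staying close to the lane center,
# usable via ``task_config={'reward_fn': lane_centering_reward_fn}``. A
# minimal sketch; it assumes the same agent attributes used by the defaults
# above (``relative_state.x`` and ``trace.road_width``).
def lane_centering_reward_fn(task, agent_id, **kwargs):
    agent = [_a for _a in task.world.agents if _a.id == agent_id][0]
    road_half_width = agent.trace.road_width / 2.
    # Normalized lateral offset in [0, 1]; reward is 1 at the lane center and
    # decays linearly toward 0 at the lane boundary (0 on termination).
    lateral_offset = np.abs(agent.relative_state.x) / road_half_width
    reward = 0. if kwargs['done'] else float(1. - np.clip(lateral_offset, 0., 1.))
    return reward, {}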


class LaneFollowing:
    """ This class defines a simple lane following task in Vista. It handles
    vehicle state update of the ego car, rendering of specified sensors, and
    determining reward and terminal condition. The default terminal condition
    is set to (1) being out of lane, (2) exceeding maximal rotation, or
    (3) reaching the end of the trace.

    Args:
        trace_paths (List[str]): A list of trace paths.
        trace_config (Dict): Configuration of the trace.
        car_config (Dict): Configuration of the ego car.
        sensors_configs (List[Dict]): Configurations of the sensors on the
            ego car.
        task_config (Dict): Configuration of the task, which specifies the
            reward function and terminal condition. For more details, please
            check the source code.
        logging_level (str): Logging level (``DEBUG``, ``INFO``, ``WARNING``,
            ``ERROR``, ``CRITICAL``); default set to ``WARNING``.

    """
    DEFAULT_CONFIG = {
        'reward_fn': default_reward_fn,
        'terminal_condition': default_terminal_condition,
    }

    def __init__(self,
                 trace_paths: List[str],
                 trace_config: Dict,
                 car_config: Dict,
                 sensors_configs: Optional[List[Dict]] = [],
                 task_config: Optional[Dict] = dict(),
                 logging_level: Optional[str] = 'WARNING'):
        logging.setLevel(getattr(logging, logging_level))
        self._config = misc.merge_dict(task_config, self.DEFAULT_CONFIG)
        self._world: World = World(trace_paths, trace_config)

        agent = self._world.spawn_agent(car_config)
        for sensor_config in sensors_configs:
            sensor_type = sensor_config.pop('type')
            if sensor_type == 'camera':
                agent.spawn_camera(sensor_config)
            elif sensor_type == 'event_camera':
                agent.spawn_event_camera(sensor_config)
            elif sensor_type == 'lidar':
                agent.spawn_lidar(sensor_config)
            else:
                raise NotImplementedError(
                    f'Unrecognized sensor type {sensor_type}')

        self._distance = 0
        self._prev_xy = np.zeros((2, ))
    def reset(self):
        """ Reset the environment. This resets the :class:`World` object in
        Vista, which resets all (here, the only) agents in the world.

        Returns:
            Dict: A dictionary with keys as agent IDs and values as
            observations for each agent, which is also a dictionary with keys
            as sensor IDs and values as sensory measurements.

        """
        self._world.reset()
        agent = self._world.agents[0]
        observations = self._append_agent_id(agent.observations)
        self._distance = 0
        self._prev_xy = np.zeros((2, ))

        return observations
    def step(self, action, dt=1 / 30.):
        """ Step the environment. This involves updating the agent's state
        based on the given action and determining reward and termination.

        Args:
            action (Dict[str, np.ndarray]):
                A dictionary with keys as agent IDs and values as actions to
                be executed to interact with the environment and other agents.
            dt (float): Elapsed time in seconds; default set to 1/30.

        Returns:
            Return a tuple (``dict_a``, ``dict_b``, ``dict_c``, ``dict_d``),
            where ``dict_a`` is the observation, ``dict_b`` is the reward,
            ``dict_c`` is whether the episode terminates, and ``dict_d`` is
            additional information for every agent; the keys of every
            dictionary are agent IDs.

        """
        # Step agent and get observation
        agent = self._world.agents[0]
        action = np.array([action[agent.id][0], agent.human_speed])
        agent.step_dynamics(action, dt=dt)
        agent.step_sensors()
        observations = agent.observations

        # Define terminal condition
        done, info_from_terminal_condition = self.config['terminal_condition'](
            self, agent.id)

        # Define reward
        reward, _ = self.config['reward_fn'](self, agent.id,
                                             **info_from_terminal_condition)

        # Get info
        info = misc.fetch_agent_info(agent)
        info['out_of_lane'] = info_from_terminal_condition['out_of_lane']
        info['exceed_max_rot'] = info_from_terminal_condition['exceed_max_rot']
        current_xy = agent.ego_dynamics.numpy()[:2]
        self._distance += np.linalg.norm(current_xy - self._prev_xy)
        self._prev_xy = current_xy
        info['distance'] = self._distance

        # Pack output
        observations, reward, done, info = map(
            self._append_agent_id, [observations, reward, done, info])

        return observations, reward, done, info
    def set_seed(self, seed) -> None:
        """ Set random seed.

        Args:
            seed (int): Random seed.

        """
        self._seed = seed
        self._rng = np.random.default_rng(self.seed)
        self.world.set_seed(seed)
    def _append_agent_id(self, data):
        agent = self._world.agents[0]
        return {agent.id: data}

    @property
    def config(self) -> Dict:
        """ Configuration of this task. """
        return self._config

    @property
    def world(self) -> World:
        """ :class:`World` of this task. """
        return self._world

    @property
    def seed(self) -> int:
        """ Random seed for the task and the associated :class:`World`. """
        return self._seed
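

# Example usage: a minimal rollout loop with random steering. A sketch only;
# the trace path, ``road_width``, and car config keys below are hypothetical
# placeholders, not values shipped with this module.
if __name__ == '__main__':
    task = LaneFollowing(
        trace_paths=['/path/to/trace'],  # hypothetical trace directory
        trace_config={'road_width': 4.0},  # assumed config key
        car_config={'length': 5.0, 'width': 2.0},  # assumed config keys
        sensors_configs=[],  # no sensors in this sketch
        logging_level='WARNING')
    observations = task.reset()
    agent_id = list(observations.keys())[0]
    for _ in range(100):
        # Steering-only action; speed follows the human trajectory (see step()).
        action = {agent_id: np.array([np.random.uniform(-0.1, 0.1)])}
        observations, reward, done, info = task.step(action, dt=1 / 30.)
        if done[agent_id]:
            break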