from typing import List, Dict, Any, Optional, Callable
from vista.entities.sensors.EventCamera import EventCamera
import numpy as np
from collections import deque
from .Dynamics import State, StateDynamics, curvature2steering, curvature2tireangle, \
tireangle2curvature, update_with_perfect_controller
from ..Entity import Entity
from ..sensors import BaseSensor, Camera, Lidar, EventCamera
from ...core import World, Trace
from ...utils import transform, logging, misc
[docs]class Car(Entity):
""" The class of a car agent. This object lives in the :class:`World` and is attached
to a trace object that provides pointers to dataset for data-driven simulation and zero
or one or more sensor objects that synthesize sensory measurement. The update of vehicle
state is handled in this object.
Args:
world (World): The world that this agent lives in.
car_config (Dict): Configuration of the car. An example (default) is,
>>> DEFAULT_CONFIG = {
'length': 5.,
'width': 2.,
'wheel_base': 2.78,
'steering_ratio': 14.7,
'lookahead_road': False,
'road_buffer_size': 200,
}
Example Usage (always make sure reset is called first to initialize vehicle state
or pointer to the dataset for data-driven simulation)::
>>> car = world.spawn_agent(car_config)
>>> car.spawn_camera(camera_config)
>>> world.reset()
>>> car.step_dynamics(action) # update vehicle states
>>> car.step_sensors() # do sensor capture
>>> observation = car.observations # fetch sensory measurement
>>> car.step_dataset() # simply get next frame in the dataset without synthesis
"""
DEFAULT_CONFIG = {
'length': 5.,
'width': 2.,
'wheel_base': 2.78,
'steering_ratio': 14.7,
'lookahead_road': False,
'road_buffer_size': 200,
}
def __init__(self, world: World, car_config: Dict) -> None:
super(Car, self).__init__()
# Car configuration
car_config = misc.merge_dict(car_config, self.DEFAULT_CONFIG)
self._config = car_config
# Pointer to the parent vista.World object where the agent lives
self._parent: World = world
# The trace to be associated with, updated every reset
self._trace: Trace = None
# A list of sensors attached to this agent (List[vista.Sensor]).
self._sensors: List[BaseSensor] = []
# State dynamics for tracking the virtual and human agent
self._relative_state: State = State()
self._ego_dynamics: StateDynamics = StateDynamics()
self._human_dynamics: StateDynamics = StateDynamics()
# Properties of a car
self._length: float = self.config['length']
self._width: float = self.config['width']
self._wheel_base: float = self.config['wheel_base']
self._steering_ratio: float = self.config['steering_ratio']
self._speed: float = 0.
self._curvature: float = 0.
self._steering: float = 0.
self._tire_angle: float = 0.
self._human_speed: float = 0.
self._human_curvature: float = 0.
self._human_steering: float = 0.
self._human_tire_angle: float = 0.
self._timestamp: float = 0.
self._frame_number: int = 0
self._trace_index: int = 0
self._segment_index: int = 0
self._frame_index: int = 0
self._observations: Dict[str, Any] = dict()
self._done: bool = False
# Privileged information
if self.config['lookahead_road']:
road_buffer_size = self.config['road_buffer_size']
self._road: deque[np.ndarray] = deque(maxlen=road_buffer_size)
self._road_frame_idcs: deque[int] = deque(maxlen=road_buffer_size)
self._road_dynamics: StateDynamics = StateDynamics()
[docs] def spawn_camera(self, config: Dict) -> Camera:
""" Spawn and attach a camera to this car.
Args:
config (Dict): Configuration of camera and rendering. For
more details, please check the doc of
:class:`Camera` sensor.
Returns:
Camera: a vista camera sensor object spawned.
"""
name = config.get('name', None)
logging.info(f'Spawn a new camera {name} in car ({self.id})')
cam = Camera(attach_to=self, config=config)
self._sensors.append(cam)
return cam
[docs] def spawn_lidar(self, config: Dict) -> Lidar:
""" Spawn and attach a LiDAR to this car.
Args:
config (Dict): Configuration of LiDAR. For more details,
please check the doc of :class:`Lidar` sensor.
Returns:
Lidar: a vista Lidar sensor object spawned.
"""
name = config.get('name', None)
logging.info(f'Spawn a new lidar {name} in car ({self.id})')
lidar = Lidar(attach_to=self, config=config)
self._sensors.append(lidar)
return lidar
[docs] def spawn_event_camera(self, config: Dict) -> EventCamera:
""" Spawn and attach an event camera to this car.
Args:
config (Dict): Configuration of event camera. For more details,
please check the doc of :class:`EventCamera` sensor.
Returns:
EventCamera: a vista event camera sensor object spawned.
"""
name = config.get('name', None)
logging.info(f'Spawn a new event camera {name} in car ({self.id})')
event_cam = EventCamera(attach_to=self, config=config)
self._sensors.append(event_cam)
return event_cam
[docs] def reset(self,
trace_index: int,
segment_index: int,
frame_index: int,
initial_dynamics_fn: Optional[Callable] = None,
step_sensors: Optional[bool] = True) -> None:
""" Reset the car. This involves pointing to somewhere in the dataset for later-on
data-driven simulation, initializing vehicle state, and resetting all sensors attached
to this car. If ``lookahead_road = True``, the road cache will also be reset.
Args:
trace_index (int): A pointer to which trace to be simulated on.
segment_index (int): A pointer to which segment in a trace to be simulated on.
frame_index (int): A pointer to which frame in a segment to be simulated on.
initial_dynamics_fn (Callable): A function to initialize vehicle state. The
function takes x, y, yaw, steering (tire angle), and speed as inputs. Default
is set to ``None``, which initialize vehicle with the same state as the dataset.
step_sensors (bool): Whether to step sensor; default is set to ``True``.
"""
logging.info(f'Car ({self.id}) reset')
# Update pointers to dataset
self._trace = self.parent.traces[trace_index]
self._timestamp = self.trace.get_master_timestamp(
segment_index, frame_index)
self._frame_number = self.trace.get_master_frame_number(
segment_index, frame_index)
self._trace_index = trace_index
self._segment_index = segment_index
self._frame_index = frame_index
self._done = False
# Reset states and dynamics
self._human_speed = self.trace.f_speed(self.timestamp)
self._human_curvature = self.trace.f_curvature(self.timestamp)
self._human_steering = curvature2steering(self.human_curvature,
self.wheel_base,
self.steering_ratio)
self._human_tire_angle = curvature2tireangle(self.human_curvature,
self.wheel_base)
self.human_dynamics.update(0., 0., 0., self.human_tire_angle,
self.human_speed)
initial_dynamics = self.human_dynamics.numpy().copy()
if initial_dynamics_fn is not None:
initial_dynamics = initial_dynamics_fn(*initial_dynamics)
self.ego_dynamics.update(*initial_dynamics)
self._speed = self.human_speed
self._curvature = self.human_curvature
self._steering = self.human_steering
self._tire_angle = curvature2tireangle(self.curvature, self.wheel_base)
latlongyaw = transform.compute_relative_latlongyaw(
self.ego_dynamics.numpy()[:3],
self.human_dynamics.numpy()[:3])
self._relative_state.update(*latlongyaw)
# Reset for privileged information
if hasattr(self, '_road'):
self._road.clear()
self._road.append(self.human_dynamics.numpy())
self._road_dynamics = self.human_dynamics.copy()
self._road_frame_idcs.clear()
self._road_frame_idcs.append(self.frame_index)
self._update_road()
# Reset sensors
for sensor in self.sensors:
if isinstance(
sensor,
Camera) and self._trace.multi_sensor.main_camera is None:
self._trace.multi_sensor.set_main_sensor('camera', sensor.name)
elif isinstance(
sensor,
Lidar) and self._trace.multi_sensor.main_lidar is None:
self._trace.multi_sensor.set_main_sensor('lidar', sensor.name)
elif isinstance(
sensor, EventCamera
) and self._trace.multi_sensor.main_event_camera is None:
self._trace.multi_sensor.set_main_sensor(
'event_camera', sensor.name)
sensor.reset()
# Update observation
if step_sensors:
self.step_sensors()
[docs] def step_dataset(self, step_dynamics=True):
""" Step through the dataset without rendering. This is basically
fetching the next frame from the dataset. Normally, it is called
when doing imitation learning.
Args:
step_dynamics (bool): Whether to update vehicle state; default
is set to ``True``.
Raises:
NotImplementedError: if any attached sensor has no implemented
function for stepping through dataset.
"""
logging.info(f'Car ({self.id}) step based on dataset')
# Step by incrementing frame number
ts = self.timestamp
frame_index = self.frame_index + 1
exceed_end, self._timestamp = self.trace.get_master_timestamp(
self.segment_index, frame_index, check_end=True)
if exceed_end: # trigger trace done terminatal condition
self._done = True
logging.info(f'Car ({self.id}) exceed the end of trace')
else:
self._frame_index = frame_index
self._frame_number = self.trace.get_master_frame_number(
self.segment_index, self.frame_index)
self._human_speed = self.trace.f_speed(self.timestamp)
self._human_curvature = self.trace.f_curvature(self.timestamp)
self._human_steering = curvature2steering(self.human_curvature,
self.wheel_base,
self.steering_ratio)
self._human_tire_angle = curvature2tireangle(
self.human_curvature, self.wheel_base)
self._speed = self._human_speed
self._curvature = self._human_curvature
self._steering = self._human_steering
self._tire_angle = self._human_tire_angle
# Step human dynamics if queried
if step_dynamics:
current_state = [
curvature2tireangle(self.human_curvature, self.wheel_base),
self.human_speed
]
update_with_perfect_controller(current_state,
self.timestamp - ts,
self._human_dynamics)
self._ego_dynamics = self.human_dynamics.copy()
# Get image frame
self._observations = dict()
for sensor in self.sensors:
if type(sensor) not in [Camera, Lidar, EventCamera]:
raise NotImplementedError(
f'Sensor {sensor} is not supported in step dataset')
self._observations[sensor.name] = sensor.capture(
self.timestamp)
[docs] def step_dynamics(self,
action: np.ndarray,
dt: Optional[float] = 1 / 30.,
update_road: Optional[bool] = True) -> None:
""" Update vehicle state given control command based on vehicle dynamics
and update timestamp, which is then used to update pointer to the dataset
for data-driven simulation.
Args:
action (np.ndarray): Control command (curvature and speed).
dt (float): Elapsed time.
update_road (bool): Whether to update road cache; default is
set to ``True``.
"""
assert not self.done, 'Agent status is done. Please call reset first.'
logging.info(f'Car ({self.id}) step dynamics')
# Parse action
action = np.array(action).reshape(-1)
assert action.shape[0] == 2
desired_curvature, desired_speed = action
desired_tire_angle = curvature2tireangle(desired_curvature,
self.wheel_base)
# Run low-level controller and step vehicle dynamics
# TODO: non-perfect low-level controller
logging.debug('Using perfect low-level controller now')
desired_state = [desired_tire_angle, desired_speed]
update_with_perfect_controller(desired_state, dt, self._ego_dynamics)
# Update based on vehicle dynamics feedback
self._tire_angle = self.ego_dynamics.steering
self._speed = self.ego_dynamics.speed
self._curvature = tireangle2curvature(self.tire_angle, self.wheel_base)
self._steering = curvature2steering(self.curvature, self.wheel_base,
self.steering_ratio)
# Update human (reference) dynamics for assoication with the trace / dataset
human = self.human_dynamics.copy()
top2_closest = dict(dist=deque([np.inf, np.inf], maxlen=2),
dynamics=deque([None, None], maxlen=2),
timestamp=deque([None, None], maxlen=2),
index=deque([None, None], maxlen=2))
index = self.frame_index
ts = self.trace.get_master_timestamp(self.segment_index, index)
while True:
dist = np.linalg.norm(human.numpy()[:2] -
self.ego_dynamics.numpy()[:2])
if dist < top2_closest['dist'][1]:
if dist < top2_closest['dist'][0]:
top2_closest['dist'].appendleft(dist)
top2_closest['dynamics'].appendleft(human.copy())
top2_closest['timestamp'].appendleft(ts)
top2_closest['index'].appendleft(index)
else:
top2_closest['dist'][1] = dist
top2_closest['dynamics'][1] = human.copy()
top2_closest['timestamp'][1] = ts
top2_closest['index'][1] = index
else:
break
next_index = index + 1 * int(np.sign(dt))
exceed_end, next_ts = self.trace.get_master_timestamp(
self.segment_index, next_index, check_end=True)
if exceed_end: # trigger trace done terminatal condition
self._done = True
logging.info(f'Car ({self.id}) exceed the end of trace')
current_state = [
curvature2tireangle(self.trace.f_curvature(ts),
self.wheel_base),
self.trace.f_speed(ts)
]
update_with_perfect_controller(current_state, next_ts - ts, human)
index = next_index
ts = next_ts
self._human_dynamics = top2_closest['dynamics'][0].copy()
self._frame_index = top2_closest['index'][0]
self._frame_number = self.trace.get_master_frame_number(
self.segment_index, self.frame_index)
# Update timestamp based on where the car position is with respect to
# course distance of trace (may not be exactly the same as any of
# timestamps in the dataset)
latlongyaw_closest = transform.compute_relative_latlongyaw(
self.ego_dynamics.numpy()[:3],
top2_closest['dynamics'][0].numpy()[:3])
latlongyaw_second_closest = transform.compute_relative_latlongyaw(
self.ego_dynamics.numpy()[:3],
top2_closest['dynamics'][1].numpy()[:3])
ratio = abs(latlongyaw_second_closest[1]) / (
abs(latlongyaw_closest[1]) + abs(latlongyaw_second_closest[1]))
self._timestamp = (ratio * top2_closest['timestamp'][0] +
(1. - ratio) * top2_closest['timestamp'][1])
# Update human control based on current timestamp
self._human_speed = self.trace.f_speed(self.timestamp)
self._human_curvature = self.trace.f_curvature(self.timestamp)
self._human_steering = curvature2steering(self.human_curvature,
self.wheel_base,
self.steering_ratio)
self._human_tire_angle = curvature2tireangle(self.human_curvature,
self.wheel_base)
# Update relative transformation between human and ego dynamics
self._relative_state.update(*latlongyaw_closest)
# Update privileged information (in global coordinates)
if update_road and hasattr(self, '_road'):
self._update_road()
[docs] def step_sensors(self) -> None:
""" Update sensor measurement given current state of the vehicle. """
logging.info(f'Car ({self.id}) step sensors')
self._observations = dict()
for sensor in self.sensors:
self._observations[sensor.name] = sensor.capture(self.timestamp)
def _update_road(self) -> None:
exceed_end = False
get_timestamp = lambda _idx: self.trace.get_master_timestamp(
self.segment_index, _idx, check_end=True)
while self._road_frame_idcs[-1] < (
self.frame_index + self._road.maxlen / 2.) and not exceed_end:
exceed_end, ts = get_timestamp(self._road_frame_idcs[-1])
self._road_frame_idcs.append(self._road_frame_idcs[-1] + 1)
exceed_end, next_ts = get_timestamp(self._road_frame_idcs[-1])
state = [
curvature2tireangle(self.trace.f_curvature(ts),
self.wheel_base),
self.trace.f_speed(ts)
]
update_with_perfect_controller(state, next_ts - ts,
self._road_dynamics)
self._road.append(self._road_dynamics.numpy())
@property
def trace(self) -> Trace:
""" The :class:`Trace` currently associated with the car. """
return self._trace
@property
def sensors(self) -> List[BaseSensor]:
""" All sensors attached to this car. """
return self._sensors
@property
def relative_state(self) -> State:
""" Relative transform between ``ego_dynamics`` and ``human_dynamics``. """
return self._relative_state
@property
def ego_dynamics(self) -> StateDynamics:
""" Current simulated vehicle state. """
return self._ego_dynamics
@property
def human_dynamics(self) -> StateDynamics:
""" Vehicle state of the current pointer to the dataset (human trajectory). """
return self._human_dynamics
@property
def length(self) -> float:
""" Car length. """
return self._length
@property
def width(self) -> float:
""" Car width. """
return self._width
@property
def wheel_base(self) -> float:
""" Wheel base. """
return self._wheel_base
@property
def steering_ratio(self) -> float:
""" Steering ratio. """
return self._steering_ratio
@property
def speed(self) -> float:
""" Speed of simulated trajectory (this car) in current timestamp. """
return self._speed
@property
def curvature(self) -> float:
""" Curvature of simulated trajectory (this car) in current timestamp. """
return self._curvature
@property
def steering(self) -> float:
""" Steering angle of simulated trajectory (this car) in current timestamp. """
return self._steering
@property
def tire_angle(self) -> float:
""" Tire angle of simulated trajectory (this car) in current timestamp. """
return self._tire_angle
@property
def human_speed(self) -> float:
""" Speed of human trajectory in current timestamp. """
return self._human_speed
@property
def human_curvature(self) -> float:
""" Curvature of human trajectory in current timestamp. """
return self._human_curvature
@property
def human_steering(self) -> float:
""" Steering angle of human trajectory in current timestamp. """
return self._human_steering
@property
def human_tire_angle(self) -> float:
""" Tire angle of human trajectory in current timestamp. """
return self._human_tire_angle
@property
def timestamp(self) -> float:
""" Current timestamp (normally ROS timestamp). This serves as a
continuous pointer to the dataset as opposed to ``trace_index``,
``segment_index``, and ``frame_index``. """
return self._timestamp
@property
def frame_number(self) -> int:
""" Current frame number. Note that this is different from ``frame_index``
as it is a different pointer based on how we define frame in the ``master_sensor``
instead of a pointer to the dataset. There is only one unique pointer to the dataset,
which can be mapped to (potentially) different pointers to the frame number in
different sensors. """
return self._frame_number
@property
def trace_index(self) -> int:
""" Current pointer to the trace. """
return self._trace_index
@property
def segment_index(self) -> int:
""" Current pointer to the segment in the current trace. """
return self._segment_index
@property
def frame_index(self) -> int:
""" Current pointer to the frame in the current segment. """
return self._frame_index
@property
def observations(self) -> Dict[str, Any]:
""" Sensory measurement at current timestamp. """
return self._observations
@property
def done(self) -> bool:
""" Whether exceeding the end of the trace currently associated with the car. """
return self._done
@property
def road(self) -> np.ndarray:
""" Road cache if ``lookahead_road = True`` otherwise ``None``. """
return np.array(self._road) if hasattr(self, '_road') else None
@property
def config(self) -> Dict:
""" Configuration of this car. """
return self._config
def __repr__(self) -> str:
return f'<{self.__class__.__name__} (id={self.id})> ' + \
f'width: {self.width} ' + \
f'length: {self.length} ' + \
f'wheel_base: {self.wheel_base} ' + \
f'steering_ratio: {self.steering_ratio} ' + \
f'speed: {self.speed} ' + \
f'curvature: {self.curvature} '