import os
import numpy as np
from typing import Dict, List, Optional
from collections import OrderedDict
import h5py
from . import TopicNames
from ...utils import logging
[docs]class MultiSensor:
""" This class handles synchronization across multiple sensors in a trace. It basically
reads timestamps files associated with every sensors existing in the trace and construct
several helper functions for conversion between an unified timestamp used in the simulator
and timestamp (or frame number, etc) of different sensors.
Args:
trace_dir (str): Directory to a trace.
master_sensor (str): Name of the master sensor.
Raises:
AssertionError: (1) The name of the master sensor not included in a predefined
set of topic names specified in (2) No timestamp data for the master sensor.
"""
def __init__(
self,
trace_dir: str,
master_sensor: Optional[str] = TopicNames.master_topic) -> None:
self._trace_dir: str = trace_dir
self._master_sensor: str = master_sensor
# Get frame-to-timestamp mapping for every sensors
self._sensor_frame_to_time: Dict = dict()
sensor_topic_names = [TopicNames.lidar_3d] + list(TopicNames.cameras)
assert master_sensor in sensor_topic_names, \
f'Master sensor {master_sensor} not in topic names' + \
f'{sensor_topic_names}. Please check camera config or TopicNames.py'
for fname in os.listdir(self._trace_dir):
sensor_name, ext = os.path.splitext(fname)
if sensor_name in sensor_topic_names:
fpath = os.path.join(self._trace_dir, fname)
if ext == '.csv':
data = np.genfromtxt(fpath,
delimiter=',',
skip_header=1,
dtype=np.float64)
frames, times = (data[:, 0], data[:, 1])
elif ext == ".h5":
f = h5py.File(fpath, "r")
times = f["timestamp"][:, 0]
frames = np.arange(times.shape[0])
else:
continue # Not implemented yet...
frame_to_time = OrderedDict()
for i in range(len(frames)):
frame_to_time[int(frames[i])] = times[i]
self._sensor_frame_to_time[sensor_name] = frame_to_time
self._sensor_names: List[str] = list(self._sensor_frame_to_time.keys())
assert master_sensor in self.sensor_names, \
f'No timestamp data for the master sensor {master_sensor}'
[docs] def get_time_from_frame_num(self, sensor: str, frame_num: int) -> float:
""" Compute the timestamp associated to a frame in a video return
None if we dont have information about that frame.
Args:
sensor (str): Sensor name.
frame_num (int): Frame number.
Returns:
float: Timestamp associated with the given sensor and frame number.
"""
return self._sensor_frame_to_time[sensor].get(frame_num, None)
[docs] def get_frames_from_times(
self,
timestamps: List[float],
fetch_smaller: Optional[bool] = False) -> Dict[str, List[int]]:
""" Takes in a list of timestamps and returns corresponding frame
numbers for each sensor. Note that since sensors are not necessarily
sync'ed, the returned frame numbers are the one with the closest
(smaller) timestamps.
Args:
timestamps (list): A list of timestamps.
fetch_smaller (bool): Whether to fetch the closes and smaller timestamps.
Returns:
dict: Corresponding frame numbers for all sensors.
"""
frames = dict()
timestamps = np.array(timestamps)
for sensor in self.sensor_names:
frame_to_time = self._sensor_frame_to_time[sensor]
frames[sensor] = []
pointer = 0
for ts in timestamps:
while pointer < len(frame_to_time) - 1:
if ts >= frame_to_time[pointer] and ts < frame_to_time[
pointer + 1]:
if fetch_smaller:
frames[sensor].append(pointer)
else:
if np.abs(frame_to_time[pointer] - ts) >= \
np.abs(ts - frame_to_time[pointer + 1]):
frames[sensor].append(pointer + 1)
else:
frames[sensor].append(pointer)
break
else:
pointer += 1
return frames
[docs] def get_master_timestamps(self) -> List[float]:
""" Get all timestamps of the main sensor.
Returns:
list: A list of timestamp.
"""
# using values() works since it's an ordered dict
timestamps = list(
self._sensor_frame_to_time[self._master_sensor].values())
return timestamps
[docs] def set_main_sensor(self, sensor_type: str, sensor_name: str) -> None:
""" Set main sensor based on sensor's type and name.
Args:
sensor_type (str): Type of the sensor to be set (camera, lidar, or event camera).
sensor_name (str): Name of the sensor to be set.
"""
assert sensor_type in ['camera', 'lidar', 'event_camera']
setattr(self, '_main_{}'.format(sensor_type), sensor_name)
@property
def sensor_names(self) -> List[str]:
""" A list of all sensors' names. """
return self._sensor_names
@property
def camera_names(self) -> List[str]:
""" A list of RGB cameras' names. """
logging.debug('Hacky way to include RGB camera with name front_center')
return [
_x for _x in self._sensor_names
if 'camera' in _x or _x == 'front_center'
]
@property
def main_camera(self) -> str:
""" The main RGB camera object. """
return self._main_camera if hasattr(self, '_main_camera') else None
@property
def lidar_names(self) -> List[str]:
""" A list of LiDARs' names. """
return [_x for _x in self._sensor_names if 'lidar' in _x]
@property
def main_lidar(self) -> str:
""" The main LiDAR object """
return self._main_lidar if hasattr(self, '_main_lidar') else None
@property
def main_event_camera(self) -> str:
""" The main event camera object. """
return self._main_event_camera if hasattr(
self, '_main_event_camera') else None
@property
def master_sensor(self) -> str:
""" The name of the master sensor. """
return self._master_sensor