Source code for vista.entities.sensors.Lidar

import os
from typing import Dict, Any
import numpy as np
import h5py

from .BaseSensor import BaseSensor
from .lidar_utils import LidarSynthesis, Pointcloud
from ..Entity import Entity
from ...core import Trace
from ...utils import logging, parse_params


[docs]class Lidar(BaseSensor): """ A LiDAR sensor object that synthesizes LiDAR measurement locally around the dataset given a viewpoint (potentially different from the dataset) and timestamp. Args: attach_to (Entity): A car to be attached to. config (dict): Configuration of LiDAR sensor. An example (default) is, >>> DEFAULT_CONFIG = { 'name': 'lidar_3d', 'yaw_fov': None, 'pitch_fov': None, 'culling_r': 1, 'use_synthesizer': True, } Check :class:`Lidarsynthesis` object for more details about the configuration. """ DEFAULT_CONFIG = { 'name': 'lidar_3d', 'yaw_fov': None, 'pitch_fov': None, 'culling_r': 1, 'use_synthesizer': True, } def __init__(self, attach_to: Entity, config: Dict) -> None: super(Lidar, self).__init__(attach_to, config) logging.debug('Not actually streaming lidar data when reading') self._streams: Dict[str, h5py.File] = dict() # Initialize lidar novel view synthesis object if self.config['use_synthesizer']: # Make one view synthesizer for each trace within # attach_to.parent.traces (different traces may have different # input sensors specs). For each synthesizer, pass in the input # and output lidar params. Then during synthesis, use the # appropraite synthesizer. self._view_synthesizers = {} for trace in attach_to.parent.traces: pfile = parse_params.ParamsFile(trace.param_file) in_params, _ = pfile.parse_lidar(self.name) self._view_synthesizers[trace] = LidarSynthesis( load_model=True, input_yaw_fov=in_params['yaw_fov'], input_pitch_fov=in_params['pitch_fov'], **self.config) else: self._view_synthesizers = None
[docs] def reset(self) -> None: """ Reset LiDAR sensor by initiating LiDAR data stream based on current reference pointer to the dataset. """ logging.info(f'Lidar ({self.id}) reset') # Initiate lidar data stream based on current reference pointer to the # dataset. All data streams are handled by the main lidar and shared # across all Lidar objects in an agent. multi_sensor = self.parent.trace.multi_sensor if self.name == multi_sensor.main_lidar: for lidar_name in multi_sensor.lidar_names: fpath = os.path.join(self.parent.trace.trace_path, lidar_name + '.h5') stream = h5py.File(fpath, 'r') self._streams[lidar_name] = stream else: main_name = multi_sensor.main_lidar main_sensor = [ _s for _s in self.parent.sensors if _s.name == main_name ] assert len(main_sensor) == 1, \ f'Cannot find main sensor {main_name}' main_sensor = main_sensor[0] assert isinstance(main_sensor, Lidar), \ 'Main sensor is not Lidar object' self._streams = main_sensor.streams # reset lidar synthesis pass
[docs] def capture(self, timestamp: float, **kwargs) -> np.ndarray: """ Synthesize LiDAR point cloud based on current timestamp and transformation between the novel viewpoint to be simulated and the nominal viewpoint from the pre-collected dataset. Args: timestamp (float): Timestamp that allows to retrieve a pointer to the dataset for data-driven simulation (synthesizing point cloud from real LiDAR sweep). Returns: np.ndarray: Synthesized point cloud. """ logging.info(f'Lidar ({self.id}) capture') # Get frame at the closest smaller timestamp from dataset. multi_sensor = self.parent.trace.multi_sensor all_frame_nums = multi_sensor.get_frames_from_times([timestamp]) for lidar_name in multi_sensor.lidar_names: stream = self.streams[lidar_name] frame_num = all_frame_nums[lidar_name][0] xyz = stream['xyz'][frame_num] intensity = stream['intensity'][frame_num] pcd = Pointcloud(xyz, intensity) pcd = pcd[pcd.dist > 2.5] # TODO: when is it possible for there to be multiple (multi_sensor.lidar_names)? # Synthesis by rendering if self.config['use_synthesizer']: # self.parent.reslative_state.update(0, 0, yaw=np.sin(timestamp)) lat, long, yaw = self.parent.relative_state.numpy() logging.debug( f"state: {lat} {long} {yaw} \t timestamp {timestamp}") trans = np.array([-long, lat, 0]) rot = np.array([0., 0, yaw]) # TODO: should yaw be Y or Z? synthesizer = self._view_synthesizers[self.parent.trace] new_pcd, new_dense = synthesizer.synthesize(trans, rot, pcd) else: new_pcd = pcd return new_pcd
[docs] def update_scene_object(self, name: str, scene_object: Any, pose: Any) -> None: """ Adding virtual object in LiDAR synthesis is not yet implemented. """ raise NotImplementedError
@property def config(self) -> Dict: """ Configuration of the LiDAR sensor. """ return self._config @property def streams(self) -> Dict[str, h5py.File]: """ Data stream of LiDAR dataset to be simulated from. """ return self._streams @property def view_synthesis(self) -> LidarSynthesis: """ Wiew synthesizer object for the first trace. """ first = list(self._view_synthesizers.keys())[0] return self._view_synthesizers[first]