Source code for vista.entities.sensors.Camera

import copy
import os
import glob
from typing import Any, Dict
import numpy as np
import h5py
from ffio import FFReader
import pyrender

from .camera_utils import CameraParams, ViewSynthesis, ZNEAR, ZFAR
from .BaseSensor import BaseSensor
from ..Entity import Entity
from ...utils import logging, misc, transform


class Camera(BaseSensor):
    """ An RGB camera sensor object that synthesizes RGB images locally
    around the dataset, given a viewpoint (potentially different from the
    one in the dataset) and a timestamp.

    Args:
        attach_to (Entity): A parent object (Car) to be attached to.
        config (Dict): Configuration of the sensor. An example (default) is,

            >>> DEFAULT_CONFIG = {
            ...     'depth_mode': 'FIXED_PLANE',
            ...     'znear': ZNEAR,
            ...     'zfar': ZFAR,
            ...     'use_lighting': False,
            ...     'directional_light_intensity': 10,
            ...     'recoloring_factor': 0.5,
            ...     'use_synthesizer': True,
            ... }

            Check the :class:`ViewSynthesis` object for more details about
            the configuration.

    """
    DEFAULT_CONFIG = {
        'name': 'camera_front',
        'depth_mode': 'FIXED_PLANE',
        'znear': ZNEAR,
        'zfar': ZFAR,
        'use_lighting': False,
        'directional_light_intensity': 10,
        'recoloring_factor': 0.5,
        'use_synthesizer': True,
    }

    def __init__(self, attach_to: Entity, config: Dict) -> None:
        super(Camera, self).__init__(attach_to, config)

        # Define the CameraParams object for each physical camera in each trace
        self._input_cams = {
            trace: CameraParams(trace.param_file, self.name)
            for trace in attach_to.parent.traces
        }
        for _cp in self._input_cams.values():
            _cp.resize(*config['size'])

        # Now define the virtual camera in VISTA (defaults copied from the
        # first trace)
        first_trace = attach_to.parent.traces[0]
        self._virtual_cam = copy.deepcopy(self._input_cams[first_trace])
        self._virtual_cam.resize(*config['size'])

        self._streams: Dict[str, FFReader] = dict()
        self._flow_streams: Dict[str, Dict[str, FFReader]] = dict()
        self._flow_meta: Dict[str, h5py.File] = dict()

        if self._config.get('use_synthesizer', True):
            self._view_synthesis: ViewSynthesis = ViewSynthesis(
                self._virtual_cam, self._config)
        else:
            self._view_synthesis = None
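
    # Illustrative usage sketch (not part of the original source). A Camera is
    # normally created through its parent Car rather than instantiated
    # directly; `vista.World`, `spawn_agent`, and `spawn_camera` follow the
    # public VISTA API, while the trace path and config values below are
    # placeholder assumptions.
    #
    # >>> import vista
    # >>> world = vista.World(['<path/to/trace>'])
    # >>> car = world.spawn_agent(config={'length': 5.0, 'width': 2.0})
    # >>> camera = car.spawn_camera(config={'size': (200, 320)})
    # >>> world.reset()
    # >>> rgb = camera.capture(car.timestamp)  # -> np.ndarray (200, 320, 3)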

    def reset(self) -> None:
        """ Reset the RGB camera sensor by initiating an RGB data stream
        based on the current reference pointer to the dataset. """
        logging.info(f'Camera ({self.id}) reset')

        # Close any streams that are already open
        for stream in self.streams.values():
            stream.close()
        for flow_stream in [
                _vv for _v in self.flow_streams.values()
                for _vv in _v.values()
        ]:
            flow_stream.close()

        # Fetch video stream from the associated trace. All video streams are
        # handled by the main camera and shared across all cameras in an agent.
        multi_sensor = self.parent.trace.multi_sensor
        trace_camera = self._input_cams[self.parent.trace]
        if self.name == multi_sensor.main_camera:
            for camera_name in multi_sensor.camera_names:
                # Get video stream
                video_path = os.path.join(self.parent.trace.trace_path,
                                          camera_name + '.avi')
                cam_h, cam_w = (trace_camera.get_height(),
                                trace_camera.get_width())
                stream = FFReader(video_path,
                                  custom_size=(cam_h, cam_w),
                                  verbose=False)
                self._streams[camera_name] = stream

                # Seek based on timestamp
                frame_num = self.parent.trace.good_frames[camera_name][
                    self.parent.segment_index][self.parent.frame_index]
                seek_sec = self._streams[camera_name].frame_to_secs(frame_num)
                self._streams[camera_name].seek(seek_sec)

                # Get optical flow stream (if flow data exists for the trace)
                flow_meta_path = os.path.join(self.parent.trace.trace_path,
                                              camera_name + '_flow_meta.h5')
                if os.path.exists(flow_meta_path):
                    self._flow_meta[camera_name] = h5py.File(
                        flow_meta_path, 'r')
                    self._flow_streams[camera_name] = dict()
                    for flow_name in self.flow_meta[camera_name].keys():
                        flow_path = os.path.join(
                            self.parent.trace.trace_path,
                            camera_name + '_flow_{}.mp4'.format(flow_name))
                        flow_stream = FFReader(flow_path, verbose=False)
                        flow_frame_num = frame_num  # flow for (frame_num, frame_num + 1)
                        flow_seek_sec = flow_stream.frame_to_secs(
                            flow_frame_num)
                        flow_stream.seek(flow_seek_sec)
                        self._flow_streams[camera_name][
                            flow_name] = flow_stream
                else:
                    logging.warning('No flow data')
        else:  # use shared streams from the main camera
            main_name = multi_sensor.main_camera
            main_sensor = [
                _s for _s in self.parent.sensors if _s.name == main_name
            ]
            assert len(main_sensor) == 1, \
                'Cannot find main sensor {}'.format(main_name)
            main_sensor = main_sensor[0]
            assert isinstance(main_sensor, Camera), \
                'Main sensor is not a Camera object'
            self._streams = main_sensor.streams
            self._flow_streams = main_sensor.flow_streams
            self._flow_meta = main_sensor.flow_meta

        # Add a background mesh for every video stream in view synthesis
        if self.view_synthesis is not None:
            parent_sensor_dict = {_s.name: _s for _s in self.parent.sensors}
            for camera_name in self.streams.keys():
                if camera_name not in self.view_synthesis.bg_mesh_names:
                    if camera_name in parent_sensor_dict.keys():
                        camera_param = parent_sensor_dict[
                            camera_name]._input_cams[self.parent.trace]
                    else:
                        camera_param = self._virtual_cam
                    self.view_synthesis.add_bg_mesh(camera_param)
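
    # A minimal sketch of the frame-accurate seek pattern used in reset() and
    # capture(), assuming only the FFReader methods already used in this file
    # (frame_to_secs, seek, read, frame_num, image); the video path is a
    # placeholder.
    #
    # >>> stream = FFReader('<trace_path>/camera_front.avi', verbose=False)
    # >>> target_frame = 120
    # >>> stream.seek(stream.frame_to_secs(target_frame))  # coarse seek
    # >>> while stream.frame_num != target_frame:
    # ...     stream.read()  # advance frame-by-frame to the exact target
    # >>> frame = stream.image.copy()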

    def capture(self, timestamp: float, **kwargs) -> np.ndarray:
        """ Synthesize an RGB image based on the current timestamp and the
        transformation between the novel viewpoint to be simulated and the
        nominal viewpoint from the pre-collected dataset. Note that if
        optical flow data exists in the trace directory, the :class:`Camera`
        object uses the optical flow to interpolate across frames to the
        exact timestamp, as opposed to retrieving the RGB frame with the
        closest timestamp in the dataset.

        Args:
            timestamp (float): Timestamp used to retrieve a pointer into
                the dataset for data-driven simulation (synthesizing the
                RGB image from real RGB video).

        Returns:
            np.ndarray: A synthesized RGB image.

        """
        logging.info(f'Camera ({self.id}) capture')

        # Get frames at the closest smaller timestamp from the dataset. Only
        # the main camera advances the (shared) streams.
        multi_sensor = self.parent.trace.multi_sensor
        if self.name == multi_sensor.main_camera:
            fetch_smaller = self.flow_streams != dict()  # only when using optical flow
            all_frame_nums = multi_sensor.get_frames_from_times([timestamp],
                                                                fetch_smaller)
            for camera_name in multi_sensor.camera_names:
                # RGB camera stream
                stream = self.streams[camera_name]
                frame_num = all_frame_nums[camera_name][0]

                if frame_num < stream.frame_num:
                    seek_sec = stream.frame_to_secs(frame_num)
                    stream.seek(seek_sec)

                while stream.frame_num != frame_num:
                    stream.read()

                # Flow stream
                if self.flow_streams != dict():
                    flow_frame_num = frame_num  # flow for (frame_num, frame_num + 1)
                    for flow_stream in self.flow_streams[camera_name].values():
                        if flow_frame_num < flow_stream.frame_num:
                            flow_seek_sec = flow_stream.frame_to_secs(
                                flow_frame_num)
                            flow_stream.seek(flow_seek_sec)

                        while flow_stream.frame_num != flow_frame_num:
                            flow_stream.read()

        frames = dict()
        for camera_name in multi_sensor.camera_names:
            frames[camera_name] = self.streams[camera_name].image.copy()

        # Interpolate frame at the exact timestamp
        if self.flow_streams != dict():
            for camera_name in self.flow_streams.keys():
                frame = frames[camera_name]
                flow = dict()
                for flow_name, flow_minmax in self.flow_meta[
                        camera_name].items():
                    flow_stream = self.flow_streams[camera_name][flow_name]
                    flow[flow_name] = misc.img2flow(
                        flow_stream.image.copy(),
                        flow_minmax[int(flow_stream.frame_num)],
                        frame.shape[:2])
                frame_num = int(self.streams[camera_name].frame_num)
                curr_ref_ts = multi_sensor.get_time_from_frame_num(
                    camera_name, frame_num)
                next_ref_ts = multi_sensor.get_time_from_frame_num(
                    camera_name, frame_num + 1)
                logging.warning(
                    'Stream frame number exceeds target by 1 non-intentionally')
                self.streams[camera_name].read(
                )  # NOTE: stream frame number exceeds target by 1 here
                next_frame = self.streams[camera_name].image.copy()
                frames[camera_name] = misc.biinterp(frame, next_frame,
                                                    flow['forward'],
                                                    flow['backward'],
                                                    timestamp, curr_ref_ts,
                                                    next_ref_ts)

        # Synthesize by rendering
        if self.view_synthesis is not None:
            latlongyaw = self.parent.relative_state.numpy()
            trans, rot = transform.latlongyaw2vec(latlongyaw)
            rendered_frame, _ = self.view_synthesis.synthesize(
                trans, rot, frames)
        else:
            rendered_frame = frames[self.name]

        return rendered_frame
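
    # A sketch of how a novel viewpoint reaches capture(): stepping the parent
    # car off the recorded trajectory updates relative_state, and the next
    # capture() renders from the displaced viewpoint. `step_dynamics` follows
    # the public VISTA Car API; the (curvature, speed) action values are
    # placeholders.
    #
    # >>> car.step_dynamics(action=np.array([0.05, 8.0]))  # curvature, speed
    # >>> off_trajectory_view = camera.capture(car.timestamp)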

    def update_scene_object(self, name: str, scene_object: pyrender.Mesh,
                            pose: Any) -> None:
        """ Update a pyrender mesh object in the scene for rendering.

        Args:
            name (str): Name of the scene object.
            scene_object (pyrender.Mesh): The scene object.
            pose (Any): The pose of the scene object.

        """
        trans, rotvec = transform.latlongyaw2vec(pose)
        quat = transform.euler2quat(rotvec)
        self.view_synthesis.update_object_node(name, scene_object, trans,
                                               quat)
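
    # A hedged sketch of placing a virtual object in the rendered scene,
    # assuming a trimesh-backed pyrender.Mesh and a (lat, long, yaw) pose as
    # consumed by transform.latlongyaw2vec; the object name and box extents
    # are placeholders.
    #
    # >>> import trimesh
    # >>> box = pyrender.Mesh.from_trimesh(
    # ...     trimesh.creation.box(extents=(1.0, 1.0, 1.0)))
    # >>> camera.update_scene_object('obstacle', box, pose=[0.0, 8.0, 0.0])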

    @property
    def config(self) -> Dict:
        """ Configuration of the RGB camera sensor. """
        return self._config

    @property
    def camera_param(self) -> CameraParams:
        """ Camera parameters of the virtual camera. """
        return self._virtual_cam

    @property
    def streams(self) -> Dict[str, FFReader]:
        """ Data stream of RGB image/video dataset to be simulated from. """
        return self._streams

    @property
    def flow_streams(self) -> Dict[str, Dict[str, FFReader]]:
        """ Data streams of optical flow (if any). """
        return self._flow_streams

    @property
    def flow_meta(self) -> Dict[str, h5py.File]:
        """ Metadata of optical flow (if any). """
        return self._flow_meta

    @property
    def view_synthesis(self) -> ViewSynthesis:
        """ View synthesizer object. """
        return self._view_synthesis

    def __repr__(self) -> str:
        return f'<{self.__class__.__name__} (id={self.id})> ' + \
               f'name: {self.name} ' + \
               f'size: {self._virtual_cam.get_height()}x{self._virtual_cam.get_width()} ' + \
               f'#streams: {len(self.streams)} '