import os
import sys
from typing import Dict, List, Optional
import numpy as np
from ffio import FFReader
import cv2
import torch
from .camera_utils import CameraParams, ViewSynthesis
from .BaseSensor import BaseSensor
from .Camera import Camera
from ..Entity import Entity
from ...utils import logging, misc
try:
from metavision_core.event_io.raw_reader import RawReader
except ImportError:
logging.warning(
'Fail to import module for event camera. Remember to do ' +
'source <some-dir>/openeb/build/utils/scripts/setup_env.sh' +
'Can ignore this if not using it')
[docs]class EventCamera(BaseSensor):
""" A event camera sensor object that synthesizes event data locally around the RGB
dataset given a viewpoint (potentially different from the dataset) and timestamp with
video interpolation and an event emission model.
Args:
attach_to (Entity): A parent object (car) to be attached to.
config (Dict): Configuration of the sensor. An example (default) config is,
>>> DEFAULT_CONFIG = {
'rig_path': None,
# Event camera
'name': 'event_camera_front',
'original_size': (480, 640),
'size': (240, 320),
'optical_flow_root': '../data_prep/Super-SloMo',
'checkpoint': '../data_prep/Super-SloMo/ckpt/SuperSloMo.ckpt',
'lambda_flow': 0.5,
'max_sf': 16,
'use_gpu': True,
'positive_threshold': 0.1,
'sigma_positive_threshold': 0.02,
'negative_threshold': -0.1,
'sigma_negative_threshold': 0.02,
'reproject_pixel': False,
'subsampling_ratio': 0.5,
# RGB rendering
'base_camera_name': 'camera_front',
'base_size': (600, 960),
'depth_mode': 'FIXED_PLANE',
'use_lighting': False,
}
Note that event camera simulation requires third-party dependence and pretrained
checkpoint for video interpolation.
"""
DEFAULT_CONFIG = {
'rig_path': None,
# Event camera
'name': 'event_camera_front',
'original_size': (480, 640),
'size': (240, 320),
'optical_flow_root': '../data_prep/Super-SloMo',
'checkpoint': '../data_prep/Super-SloMo/ckpt/SuperSloMo.ckpt',
'lambda_flow': 0.5,
'max_sf': 16,
'use_gpu': True,
'positive_threshold': 0.1,
'sigma_positive_threshold': 0.02,
'negative_threshold': -0.1,
'sigma_negative_threshold': 0.02,
'reproject_pixel': False,
'subsampling_ratio': 0.5,
# RGB rendering
'base_camera_name': 'camera_front',
'base_size': (600, 960),
'depth_mode': 'FIXED_PLANE',
'use_lighting': False,
}
def __init__(self, attach_to: Entity, config: Dict) -> None:
super(EventCamera, self).__init__(attach_to, config)
self._config['rig_path'] = os.path.expanduser(self._config['rig_path'])
self._camera_param: CameraParams = CameraParams(
self._config['rig_path'], self.name)
self._camera_param.resize(*self._config['size'])
self._base_camera_param: CameraParams = CameraParams(
self._config['rig_path'], self._config['base_camera_name'])
self._base_camera_param.resize(*self._config['base_size'])
self._config['use_synthesizer'] = self._config.get(
'use_synthesizer', True)
if self._config['use_synthesizer']:
self._streams: Dict[str, FFReader] = dict()
vs_cam_param = self._base_camera_param if \
self.config['reproject_pixel'] else self._camera_param
self._view_synthesis: ViewSynthesis = ViewSynthesis(
vs_cam_param, self._config, init_with_bg_mesh=False)
# load video interpolation model
sys.path.append(os.path.abspath(self._config['optical_flow_root']))
from slowmo_warp import SlowMoWarp
self._config['checkpoint'] = os.path.abspath(
self._config['checkpoint'])
height = self._config['base_size'][0] if \
self.config['reproject_pixel'] else self._config['size'][0]
width = self._config['base_size'][1] if \
self.config['reproject_pixel'] else self._config['size'][1]
self._interp = SlowMoWarp(height=height,
width=width,
checkpoint=self._config['checkpoint'],
lambda_flow=self._config['lambda_flow'],
cuda=self._config['use_gpu'])
self._prev_frame: np.ndarray = None
self._prev_timestamp: float = None
else:
self._streams: Dict[str, RawReader] = dict()
self._prev_timestamp: float = None
[docs] def reset(self) -> None:
""" Reset Event camera sensor by initiating RGB data stream (as it's simulated
using RGB data) based on current reference pointer to the dataset.
"""
logging.info(f'Event camera ({self.id}) reset')
if self._config['use_synthesizer']:
# Close stream already open
for stream in self.streams.values():
stream.close()
# Fetch video stream from the associated trace. This is mostly the same as how we handle
# video stream in Camera object. The difference and the reason of not sharing with
# Camera is (1) EventCamera can be a standalone object to be initiated and more
# importantly (2) TODO
multi_sensor = self.parent.trace.multi_sensor
if self.name == multi_sensor.main_event_camera:
for camera_name in multi_sensor.camera_names: # base cameras are regular cameras
# get video stream
video_path = os.path.join(self.parent.trace.trace_path,
camera_name + '.avi')
cam_h, cam_w = self.base_camera_param.get_height(
), self.base_camera_param.get_width()
stream = FFReader(video_path,
custom_size=(cam_h, cam_w),
verbose=False)
self._streams[camera_name] = stream
# seek based on timestamp
frame_num = self.parent.trace.good_frames[camera_name][
self.parent.segment_index][self.parent.frame_index]
seek_sec = self._streams[camera_name].frame_to_secs(
frame_num)
self._streams[camera_name].seek(seek_sec)
else: # use shared streams from the main camera
main_name = multi_sensor.main_event_camera
main_sensor = [
_s for _s in self.parent.sensors if _s.name == main_name
]
assert len(
main_sensor) == 1, 'Cannot find main sensor {}'.format(
main_name)
main_sensor = main_sensor[0]
assert isinstance(main_sensor,
Camera), 'Main sensor is not Camera object'
self._streams = main_sensor.streams
# Add background mesh for all video stream in view synthesis. Note that we use camera
# parameters from the base camera since 2D-to-3D part is determined by the regular RGB
# video streams. Reprojecting back to 2D is then be handled by event camera
# configuration
parent_sensor_dict = {_s.name: _s for _s in self.parent.sensors}
for camera_name in self.streams.keys():
if camera_name not in self.view_synthesis.bg_mesh_names:
if camera_name in parent_sensor_dict.keys():
camera_param = parent_sensor_dict[
camera_name].camera_param
else:
camera_param = CameraParams(self._config['rig_path'],
camera_name)
camera_param.resize(*self._config['base_size'])
self.view_synthesis.add_bg_mesh(camera_param)
# Reset previous frame
self._prev_frame = None
if self.config['reproject_pixel']:
cam_h = self.camera_param.get_height()
cam_w = self.camera_param.get_width()
world_rays = self.view_synthesis._world_rays[
self.base_camera_param.name]
K = self.camera_param.get_K()
self._uv = np.matmul(K, world_rays).T[:, :2][:, ::-1]
else:
# TODO: check if we need to close event data stream
logging.debug(
'Not sure if it is ok to not close the event data stream')
raw_path = os.path.join(self.parent.trace.trace_path,
f'{self.name}.raw')
stream = RawReader(raw_path)
self._streams[self.name] = stream
# Reset previous timestamp
self._prev_timestamp = None
[docs] def capture(self,
timestamp: float,
update_rgb_frame_only: Optional[bool] = False) -> np.ndarray:
""" Synthesize event data based on current timestamp and transformation
between the novel viewpoint to be simulated and the nominal viewpoint from
the pre-collected RGB dataset. In a very high level, it basically performs
video interpolation across consecutive RGB frames, extract events with an
event emission model, projects the events to the event camera space. Note
that the simulation is running a deep network (SuperSloMo) for video interpolation.
Args:
timestamp (float): Timestamp that allows to retrieve a pointer to
the dataset for data-driven simulation (synthesizing RGB image
from real RGB video).
Returns:
np.ndarray: A synthesized event data.
"""
logging.info(f'Event camera ({self.id}) capture')
if self._config['use_synthesizer']:
# Get frame at the closest smaller timestamp from dataset. In event camera simulation,
# we don't use flow interpolation since event generation is already continuous-time.
multi_sensor = self.parent.trace.multi_sensor
if self.name == multi_sensor.main_event_camera:
all_frame_nums = multi_sensor.get_frames_from_times(
[timestamp], fetch_smaller=False)
for camera_name in multi_sensor.camera_names:
# rgb camera stream
stream = self.streams[camera_name]
frame_num = all_frame_nums[camera_name][0]
if frame_num < stream.frame_num:
seek_sec = stream.frame_to_secs(frame_num)
stream.seek(seek_sec)
while stream.frame_num != frame_num:
stream.read()
frames = dict()
for camera_name in multi_sensor.camera_names:
frames[camera_name] = self.streams[camera_name].image.copy()
# Synthesis by rendering with compensation of relative pose between event camera
# parameters and the base (RGB) camera parameters since we set the view synthesizer
# with event camera parameters and project frame to 3D meshes using base camera
# parameters
lat, long, yaw = self.parent.relative_state.numpy()
trans = np.array([lat, 0., -long])
rot = np.array([0., yaw, 0.])
rendered_frame, _ = self.view_synthesis.synthesize(
trans, rot, frames)
# work with RGB instead of BGR; need copy otherwise fail at converting to tensor
rendered_frame = rendered_frame[:, :, ::-1].copy()
if update_rgb_frame_only:
self._prev_frame = rendered_frame
self._prev_timestamp = timestamp
else:
events = [[], []]
if self.prev_frame is not None:
with torch.no_grad():
out = self._interp.forward_warp(
self.prev_frame,
rendered_frame,
max_sf=self.config['max_sf'],
use_max_flow=True)
flow = list(
map(lambda _x: _x.cpu().numpy()[0].transpose(1, 2, 0),
out['flow']))
interp = [self.prev_frame] + out['interpolated']
if len(interp) >= 2:
last_interp_frame = cv2.cvtColor(
interp[0], cv2.COLOR_RGB2GRAY)
dt = (timestamp - self.prev_timestamp) / out['sf']
event_timestamp = self.prev_timestamp
for interp_frame in interp[1:]:
interp_frame = cv2.cvtColor(
interp_frame, cv2.COLOR_RGB2GRAY)
log_d_interp = np.log(interp_frame / 255. + 1e-8) - \
np.log(last_interp_frame / 255. + 1e-8)
sample_fn = lambda mu, sig: np.clip(
np.random.normal(mu, sig), 0., 1.)
positive_C = sample_fn(
self.config['positive_threshold'],
self.config['sigma_positive_threshold'])
negative_C = -sample_fn(
-self.config['negative_threshold'],
self.config['sigma_negative_threshold'])
if self.config['reproject_pixel']:
cam_h = self.camera_param.get_height()
cam_w = self.camera_param.get_width()
def extract_uv(_mask):
_fl_mask = _mask.flatten()
_uv = self._uv[_fl_mask]
_mode = 1
if _mode == 0:
_uv = np.round(_uv).astype(np.int)
elif _mode == 1:
_uv_floor = np.floor(_uv)
_uv_ceil = np.ceil(_uv)
diff_floor = np.abs(_uv - _uv_floor)
diff_ceil = np.abs(_uv - _uv_ceil)
n_uv = _uv.shape[0]
zeros = np.zeros((n_uv, ))
_uv_score = np.concatenate([
np.stack([
_uv_floor[:, 0],
_uv_floor[:, 1], zeros
],
axis=1),
np.stack([
_uv_floor[:, 0],
_uv_ceil[:, 1], zeros
],
axis=1),
np.stack([
_uv_ceil[:, 0],
_uv_floor[:, 1], zeros
],
axis=1),
np.stack([
_uv_ceil[:, 0], _uv_ceil[:, 1],
zeros
],
axis=1)
],
axis=0)
_uv_score[:n_uv,
2] = diff_ceil[:,
0] * diff_ceil[:,
1]
_uv_score[
n_uv:2 * n_uv,
2] = diff_ceil[:,
0] * diff_floor[:,
1]
_uv_score[
2 * n_uv:3 * n_uv,
2] = diff_floor[:,
0] * diff_ceil[:,
1]
_uv_score[
3 * n_uv:,
2] = diff_floor[:,
0] * diff_floor[:,
1]
_uv = _uv_score[
_uv_score[:, 2] >= 0.2, :2].astype(
np.int)
else:
_uv_floor = np.floor(_uv)
_uv_ceil = np.ceil(_uv)
_uv = np.concatenate([
np.stack([
_uv_floor[:, 0], _uv_floor[:,
1]
],
axis=1),
np.stack([
_uv_floor[:, 0], _uv_ceil[:, 1]
],
axis=1),
np.stack([
_uv_ceil[:, 0], _uv_floor[:, 1]
],
axis=1),
np.stack([
_uv_ceil[:, 0], _uv_ceil[:, 1]
],
axis=1)
],
axis=0)
_uv = _uv.astype(np.int)
_valid_mask = (_uv[:,0] > 0) & (_uv[:,0] < cam_h) & \
(_uv[:,1] > 0) & (_uv[:,1] < cam_w)
_uv = _uv[_valid_mask]
return _uv
positive_uv = extract_uv(
log_d_interp >= positive_C)
negative_uv = extract_uv(
log_d_interp <= negative_C)
else:
positive_uv = np.argwhere(
log_d_interp >= positive_C)
negative_uv = np.argwhere(
log_d_interp <= negative_C)
event_timestamp += dt
event_timestamp_us = (event_timestamp *
1e6).astype(np.int64)
positive_events = np.concatenate([
positive_uv,
np.tile([event_timestamp_us, 1],
(positive_uv.shape[0], 1))
],
axis=1)
negative_events = np.concatenate([
negative_uv,
np.tile([event_timestamp_us, 1],
(negative_uv.shape[0], 0))
],
axis=1)
events[0].append(positive_events)
events[1].append(negative_events)
last_interp_frame = interp_frame
self._prev_frame = rendered_frame
self._prev_timestamp = timestamp
return events
else:
if self._prev_timestamp is not None:
stream = self.streams[self.name]
multi_sensor = self.parent.trace.multi_sensor
ref_sensor = multi_sensor.master_sensor
# TODO: didn't do dedicated alignment between ros clock and event camera clock
logging.debug(
'No dedicated alignment between ros and event camera clocks'
)
offset_timestamp = multi_sensor.get_time_from_frame_num(
ref_sensor, 0)
event_timestamp_us = int(
(self._prev_timestamp - offset_timestamp) * 1e6)
if event_timestamp_us < stream.current_time:
stream.reset() # cannot seek backward
stream.seek_time(event_timestamp_us)
dt_us = int((timestamp - self._prev_timestamp) * 1e6)
events = stream.load_delta_t(dt_us)
if stream.is_done():
self.parent._done = True
ori_cam_h, ori_cam_w = self.config['original_size']
cam_h = self.camera_param.get_height()
cam_w = self.camera_param.get_width()
# NOTE: do subsampling otherwise too many events left
subsample_factor = (cam_h * cam_w) / float(
ori_cam_h * ori_cam_w) * self.config['subsampling_ratio']
n_events = events['x'].shape[0]
subsample_mask = np.random.choice(np.arange(n_events),
size=int(n_events *
subsample_factor),
replace=False)
events_x = events['x'][subsample_mask]
events_y = events['y'][subsample_mask]
events_p = events['p'][subsample_mask]
mul_x = cam_w / float(ori_cam_w)
mul_y = cam_h / float(ori_cam_h)
events_x = (events_x * mul_x).astype(np.int)
events_y = (events_y * mul_y).astype(np.int)
positive_mask = events_p == 1
negative_mask = events_p == 0
positive_events = np.array(
[events_y[positive_mask], events_x[positive_mask]]).T
negative_events = np.array(
[events_y[negative_mask], events_x[negative_mask]]).T
events = [[positive_events], [negative_events]]
else:
events = [[], []]
self._prev_timestamp = timestamp
return events
@property
def config(self) -> Dict:
""" Configuration of this sensor. """
return self._config
@property
def streams(self) -> Dict[str, FFReader]:
""" Data stream of RGB image/video dataset to be simulated from. """
return self._streams
@property
def camera_param(self) -> CameraParams:
""" Camera parameters of the virtual event camera. """
return self._camera_param
@property
def base_camera_param(self) -> CameraParams:
""" Camera parameters of the RGB camera. """
return self._base_camera_param
@property
def view_synthesis(self) -> ViewSynthesis:
""" View synthesizer object. """
return self._view_synthesis
@property
def prev_frame(self) -> np.ndarray:
""" Previous RGB frame. """
return self._prev_frame
@property
def prev_timestamp(self) -> float:
""" Previous timestamp. """
return self._prev_timestamp