Source code for vista.entities.sensors.lidar_utils.LidarSynthesis

import importlib.resources as pkg_resources
import numpy as np
from scipy import interpolate
from typing import Tuple, Optional, Union
import torch
import warnings

import vista
from vista import resources
from vista.utils import transform, logging
from .Pointcloud import Pointcloud, Point
from .s2d_model import LidarModel

tensor_or_ndarray = Union[torch.Tensor, np.ndarray]


class LidarSynthesis:
    """ A Lidar synthesizer that simulates a point cloud from a novel viewpoint
    around a pre-collected Lidar sweep. At a high level, it involves
    (1) performing a rigid transformation on the point cloud based on the given
    viewpoint change, (2) projecting the 3D point cloud to a 2D image space with
    angle coordinates, (3) densifying the sparse 2D image, (4) culling occluded
    regions, (5) masking out some points/pixels to simulate the sparse pattern
    of a LiDAR sweep, and (6) reprojecting back to a 3D point cloud or rays.

    Args:
        input_yaw_fov (Tuple[float, float]): Input LiDAR field of view in the
            yaw axis; can be read from the ``params.xml`` file.
        input_pitch_fov (Tuple[float, float]): Input LiDAR field of view in the
            pitch axis; can be read from the ``params.xml`` file.
        yaw_fov (Tuple[float, float]): Output LiDAR field of view in the yaw
            axis; defaults to ``input_yaw_fov``.
        pitch_fov (Tuple[float, float]): Output LiDAR field of view in the
            pitch axis; defaults to ``input_pitch_fov``.
        yaw_res (float): Resolution in the yaw axis; default is ``0.1``.
        pitch_res (float): Resolution in the pitch axis; default is ``0.1``.
        culling_r (int): The radius (from the origin) for culling occluded
            points.
        load_model (bool): Whether to load the Lidar densifier model; defaults
            to ``True``.

    """
    def __init__(self,
                 input_yaw_fov: Tuple[float, float],
                 input_pitch_fov: Tuple[float, float],
                 yaw_fov: Optional[Tuple[float, float]] = None,
                 pitch_fov: Optional[Tuple[float, float]] = None,
                 yaw_res: float = 0.1,
                 pitch_res: float = 0.1,
                 culling_r: int = 1,
                 load_model: bool = True,
                 **kwargs):

        ### Basic properties required for setting up the synthesizer, including
        # the dimensionality and resolution of the image representation space
        self._res = np.array([yaw_res, pitch_res], dtype=np.float32)
        self._fov = np.array([input_yaw_fov, input_pitch_fov],
                             dtype=np.float32)
        self._fov_rad = self._fov * np.pi / 180.

        self._dims = (self._fov[:, 1] - self._fov[:, 0]) / self._res
        self._dims = np.ceil(self._dims).astype(int)[:, np.newaxis]

        yaw_fov = input_yaw_fov if yaw_fov is None else yaw_fov
        pitch_fov = input_pitch_fov if pitch_fov is None else pitch_fov
        self._out_fov = np.array([yaw_fov, pitch_fov], dtype=np.float32)
        self._out_fov_rad = self._out_fov * np.pi / 180.

        ### Culling occluded LiDAR
        # Create a list of offset coordinates within a radius R of the origin,
        # but excluding the origin itself.
        self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu:0'
        cull_axis = torch.arange(-culling_r, culling_r + 1)
        offsets = torch.meshgrid(cull_axis, cull_axis)
        offsets = torch.reshape(torch.stack(offsets, dim=-1), (-1, 2))  # (N, 2)
        offsets = offsets[torch.any(offsets != 0, dim=1)]  # remove origin
        offsets = offsets.to(self.device)
        self.offsets = offsets[None].type(torch.int32)  # expand dims and cast

        ### Rendering masks and neural network model for sparse -> dense
        try:  # importlib.resources.files requires Python >= 3.9
            rsrc_path = pkg_resources.files(resources)
        except AttributeError:
            with pkg_resources.path(vista, 'resources') as p:
                rsrc_path = p

        self.avg_mask = np.load(str(rsrc_path / "Lidar/avg_mask2.npy"))
        self.avg_mask_pt = torch.tensor(self.avg_mask).to(self.device)

        self.load_model = load_model
        self.render_model = None
        path = rsrc_path / "Lidar/LidarFillerDevens.pt"
        if path.is_file() and load_model:
            logging.debug(f"Loading Lidar model from {path}")
            state_dict = torch.load(path, map_location=self.device)
            self.render_model = LidarModel(layers=int(state_dict["layers"]),
                                           filters=int(state_dict["filters"]))
            self.render_model.load_state_dict(state_dict)
            self.render_model.to(self.device)
            self.render_model.eval()
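    # Example usage (a minimal sketch; the FOV values below are illustrative
    # placeholders rather than values read from an actual ``params.xml``):
    #
    #     synthesizer = LidarSynthesis(input_yaw_fov=(-180., 180.),
    #                                  input_pitch_fov=(-30., 10.),
    #                                  yaw_res=0.1,
    #                                  pitch_res=0.1,
    #                                  culling_r=1,
    #                                  load_model=True)
    #
    # With these settings the internal 2D image space has
    # ceil((fov_max - fov_min) / res) pixels per axis, i.e. 3600 (yaw) by
    # 400 (pitch) pixels here.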
    def synthesize(
        self,
        trans: np.ndarray,
        rot: np.ndarray,
        pcd: Pointcloud,
    ) -> Tuple[Pointcloud, tensor_or_ndarray]:
        """ Apply a rigid transformation to a dense pointcloud and return the
        resulting sparse pointcloud along with its dense 2D representation.

        Args:
            trans (np.ndarray): Translation vector.
            rot (np.ndarray): Rotation, converted to a rotation matrix via
                ``transform.rot2mat``.
            pcd (Pointcloud): Point cloud.

        Returns:
            Returns a tuple (``pointcloud``, ``array``), where ``pointcloud``
            is the synthesized point cloud with the viewpoint change given by
            the transform (``trans``, ``rot``), and ``array`` is the dense
            depth map in 2D image space.

        """
        # Rigid transform of points
        R = transform.rot2mat(rot)
        pcd = pcd.transform(R, trans)

        # Convert from new pointcloud to dense image
        sparse = self._pcd2sparse(pcd,
                                  channels=(Point.DEPTH, Point.INTENSITY,
                                            Point.MASK),
                                  return_as_tensor=True)

        # Find occlusions and cull them from the rendering
        occlusions = self._cull_occlusions(sparse[:, :, 0])
        sparse[occlusions[:, 0], occlusions[:, 1]] = np.nan

        # Densify the image before masking
        dense = self._sparse2dense(sparse, method="nn")

        # Sample the image to simulate active LiDAR using neural masking
        new_pcd = self._dense2pcd(dense)

        # Filter the new pointcloud to match the desired output lidar specs
        new_pcd = new_pcd[(new_pcd.yaw > self._out_fov_rad[0, 0])
                          & (new_pcd.yaw < self._out_fov_rad[0, 1])]
        new_pcd = new_pcd[(new_pcd.pitch > self._out_fov_rad[1, 0])
                          & (new_pcd.pitch < self._out_fov_rad[1, 1])]

        new_pcd = new_pcd.numpy()
        return (new_pcd, dense)
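    # Example usage (a sketch, assuming ``synthesizer`` was constructed as in
    # the example after ``__init__``, ``pcd`` is a pre-collected Pointcloud
    # sweep, and ``rot`` holds the rotation parameterization expected by
    # ``transform.rot2mat``; the numeric values are hypothetical):
    #
    #     trans = np.array([1.0, 0.0, 0.0])   # shift the viewpoint 1 m along x
    #     rot = np.array([0.0, 0.0, 0.05])    # small rotation
    #     new_pcd, dense = synthesizer.synthesize(trans, rot, pcd)
    #     # new_pcd: Pointcloud re-rendered from the shifted viewpoint
    #     # dense:   dense 2D depth/intensity image used during resampling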
    def _pcd2sparse(self,
                    pcd: Pointcloud,
                    channels: Point = Point.DEPTH,
                    return_as_tensor: bool = False) -> tensor_or_ndarray:
        """ Convert from pointcloud to sparse image in polar coordinates.
        Fill the image with the specified features of the data (-1 = binary). """
        if not isinstance(channels, (list, tuple)):
            channels = [channels]

        # Compute the values to fill and the indices where to fill them
        values = [pcd.get(channel) for channel in channels]
        values = np.stack(values, axis=1)
        inds = self._compute_sparse_inds(pcd)

        # Re-order to fill points with smallest depth last
        order = np.argsort(pcd.dist)[::-1]
        values = values[order]
        inds = inds[:, order]

        # Create the image and fill it
        img = np.empty((self._dims[1, 0], self._dims[0, 0], len(channels)),
                       np.float32)
        img.fill(np.nan)
        img[-inds[1], inds[0], :] = values
        return torch.tensor(img).to(self.device) if return_as_tensor else img

    def _cull_occlusions(
        self,
        sparse: tensor_or_ndarray,
        depth_slack: float = 0.1,
    ) -> tensor_or_ndarray:
        # Coordinates where we have depth samples
        coords = torch.stack(torch.where(sparse > 0)).T

        # At each location, also compute coordinates for all of its neighbors
        samples = coords[:, None, :] + self.offsets  # (N, M, 2)

        # Collect the samples in each neighborhood
        samples[:, :, 0] = torch.clip(samples[:, :, 0], 0, sparse.shape[0] - 1)
        samples[:, :, 1] = torch.clip(samples[:, :, 1], 0, sparse.shape[1] - 1)
        neighbor_depth = sparse[samples[:, :, 0], samples[:, :, 1]]

        # For each location, compute the average depth of all neighbors
        valid = ~torch.isnan(neighbor_depth)
        scalar_zero = torch.zeros(1, 1).to(self.device)
        neighbor_depth = torch.where(valid, neighbor_depth, scalar_zero)
        avg_depth = (torch.sum(neighbor_depth, dim=1) /
                     torch.sum(valid.to(torch.float), dim=1))

        # Estimate whether the location is occluded by checking if its depth is
        # greater than that of its surroundings (i.e. it lies behind them).
        # Some amount of slack is allowed here to handle edge cases.
        my_depth = sparse[coords[:, 0], coords[:, 1]]
        occluded = (my_depth - depth_slack) > avg_depth

        # Return the coordinates in the depth image which are occluded and
        # should be disregarded
        occluded_coords = coords[occluded]
        return occluded_coords

    def _cull_occlusions_np(self,
                            sparse: np.ndarray,
                            depth_slack: float = 0.1) -> np.ndarray:
        coords = np.array(np.where(sparse > 0)).T
        samples = np.expand_dims(coords, 1) + self.offsets.cpu().numpy()
        samples[:, :, 0] = np.clip(samples[:, :, 0], 0, sparse.shape[0] - 1)
        samples[:, :, 1] = np.clip(samples[:, :, 1], 0, sparse.shape[1] - 1)
        neighbor_depth = sparse[samples[:, :, 0], samples[:, :, 1]]

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            avg_depth = np.nanmean(neighbor_depth, axis=1)

        my_depth = sparse[coords[:, 0], coords[:, 1]]

        # A point is occluded if it is farther than the average depth around it
        occluded = (my_depth - depth_slack) > avg_depth

        # Return the coordinates of the occluded points so they can be culled
        occluded_coords = coords[occluded]
        return occluded_coords

    def _sparse2dense(self,
                      sparse: tensor_or_ndarray,
                      method: str = "linear") -> tensor_or_ndarray:
        """ Convert from sparse image representation of pointcloud to dense.
""" if method == "nn": sparse[torch.isnan(sparse)] = 0.0 sparse = torch.nn.functional.pad(sparse, (0, 0, 4, 4, 4, 4)) sparse = sparse[None].permute(0, 3, 1, 2) # (1, 3, H+p, W+p) dense = self.render_model(sparse).detach() # (1, 2, H+p, W+p) dense = dense.permute((0, 2, 3, 1)) # (1, H+p, W+p, 2) dense = dense[0, 4:-4, 4:-4] # (H, W, 2) else: # mask all invalid values zs = np.ma.masked_invalid(sparse) # integer arrays for indexing grid_x, grid_y = np.meshgrid(np.arange(0, self._dims[0]), np.arange(0, self._dims[1])) # retrieve the valid, non-Nan, defined values valid_xs = grid_x[~zs.mask] valid_ys = grid_y[~zs.mask] valid_zs = zs[~zs.mask] # generate interpolated array of values dense = interpolate.griddata((valid_xs, valid_ys), valid_zs, tuple((grid_x, grid_y)), method=method) dense[np.isnan(dense)] = 0.0 return dense def _dense2pcd(self, dense: tensor_or_ndarray): """ Sample mask from network and render points from mask """ # TODO: load trained masking network and feed dense through # For now, simply load a mask prior from training data and sample mask_shape = self.avg_mask.shape if isinstance(dense, torch.Tensor): mask = self.avg_mask_pt > torch.rand(size=mask_shape).to( self.device) pitch, yaw = torch.where(mask) else: mask = self.avg_mask > np.random.uniform(size=mask_shape) pitch, yaw = np.where(mask) pitch, yaw = self._coords2angles(pitch, yaw) rays = self._angles2rays(pitch, yaw) sampled_depth_and_ints = dense[mask] dist = sampled_depth_and_ints[:, 0] intensity = None if dense.shape[-1] == 2: # intensity dimension intensity = sampled_depth_and_ints[:, 1] points = (dist * rays).T pcd = Pointcloud(points, intensity) return pcd def _coords2angles( self, pitch_coords: tensor_or_ndarray, yaw_coords: tensor_or_ndarray ) -> Tuple[tensor_or_ndarray, tensor_or_ndarray]: yaw = yaw_coords * (self._fov_rad[0, 1] - self._fov_rad[0, 0]) / \ self._dims[0, 0] + self._fov_rad[0, 0] pitch = pitch_coords * (self._fov_rad[1, 0] - self._fov_rad[1, 1]) / \ self._dims[1, 0] + self._fov_rad[1, 1] return pitch, yaw def _angles2rays(self, pitch: tensor_or_ndarray, yaw: tensor_or_ndarray) -> tensor_or_ndarray: with_torch = isinstance(pitch, torch.Tensor) cos = torch.cos if with_torch else np.cos sin = torch.sin if with_torch else np.sin stack = torch.stack if with_torch else np.array xyLen = cos(pitch) rays = stack([ \ xyLen * cos(yaw), xyLen * sin(yaw), sin(pitch)]) return rays def _compute_sparse_inds(self, pcd: Pointcloud) -> np.ndarray: """ Compute the indicies on the image representation which will be filled for a given pointcloud """ # project point cloud to 2D point map yaw = np.arctan2(pcd.y, pcd.x) pitch = np.arcsin(pcd.z / pcd.dist) angles = np.stack((yaw, pitch)) fov_range = self._fov_rad[:, [1]] - self._fov_rad[:, [0]] slope = self._dims / fov_range inds = slope * (angles - self._fov_rad[:, [0]]) inds = np.floor(inds).astype(np.int) np.clip(inds, np.zeros((2, 1)), self._dims - 1, out=inds) return inds