Source code for vista.utils.misc

from typing import Optional, Dict, Union, List, Tuple, Any
import numpy as np
import cv2
from shapely.geometry import box as Box
from shapely import affinity

from . import transform
from ..entities.agents import Car
from ..entities.agents.Dynamics import StateDynamics

Vec = Union[List, Tuple, np.ndarray]


def agent2poly(agent: Car,
               ref_dynamics: Optional[StateDynamics] = None) -> Box:
    """ Convert an agent object to a polygon w.r.t. a reference dynamics.

    Args:
        agent (Car): An agent with valid dynamics (pose and vehicle state).
        ref_dynamics (StateDynamics): A reference dynamics for computing the
            polygon representation of the agent. Defaults to ``None``, which
            uses the agent's human dynamics.

    Returns:
        Box: A polygon that describes the agent.

    """
    ref_dynamics = agent.human_dynamics if ref_dynamics is None else ref_dynamics
    rel_pose = transform.compute_relative_latlongyaw(
        agent.ego_dynamics.numpy()[:3], ref_dynamics.numpy()[:3])
    poly = Box(rel_pose[0] - agent.width / 2., rel_pose[1] - agent.length / 2.,
               rel_pose[0] + agent.width / 2., rel_pose[1] + agent.length / 2.)
    poly = affinity.rotate(poly, np.degrees(rel_pose[2]))
    return poly

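# Illustrative sketch (not part of the original module; pose and footprint
# values are made up): the construction above reduces to building an
# axis-aligned box around the relative (lat, long) position, then rotating
# it by the relative yaw:
#
#     from shapely.geometry import box as Box
#     from shapely import affinity
#     import numpy as np
#
#     rel_pose = [1.0, 5.0, 0.3]      # hypothetical (lat, long, yaw [rad])
#     width, length = 2.0, 4.8        # hypothetical vehicle footprint [m]
#     poly = Box(rel_pose[0] - width / 2., rel_pose[1] - length / 2.,
#                rel_pose[0] + width / 2., rel_pose[1] + length / 2.)
#     poly = affinity.rotate(poly, np.degrees(rel_pose[2]))  # about centroid
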
def merge_dict(dict1: Dict, dict2: Dict) -> Dict:
    """ Merge two dictionaries, where ``dict1`` has higher priority on
    duplicate keys.

    Args:
        dict1 (Dict): The first dictionary.
        dict2 (Dict): The second dictionary.

    Returns:
        Dict: The merged dictionary.

    """
    return dict(list(dict2.items()) + list(dict1.items()))

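# Example with made-up values: the items of `dict1` are appended last, and
# later entries overwrite earlier ones when `dict()` consumes the list, so
# `dict1` wins on key collisions:
#
#     merge_dict({'a': 1}, {'a': 0, 'b': 2})   # -> {'a': 1, 'b': 2}
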
def fetch_agent_info(agent: Car) -> Dict[str, Any]:
    """ Get info from an agent object.

    Args:
        agent (Car): The agent to extract information from.

    Returns:
        Dict: A dictionary containing various information about the agent.

    """
    info = dict(
        trace_path=agent.trace.trace_path,
        relative_state=agent.relative_state.numpy(),
        ego_dynamics=agent.ego_dynamics.numpy(),
        human_dynamics=agent.human_dynamics.numpy(),
        length=agent.length,
        width=agent.width,
        wheel_base=agent.wheel_base,
        steering_ratio=agent.steering_ratio,
        speed=agent.speed,
        curvature=agent.curvature,
        steering=agent.steering,
        tire_angle=agent.tire_angle,
        human_speed=agent.human_speed,
        human_curvature=agent.human_curvature,
        human_steering=agent.human_steering,
        human_tire_angle=agent.human_tire_angle,
        timestamp=agent.timestamp,
        frame_number=agent.frame_number,
        trace_index=agent.trace_index,
        segment_index=agent.segment_index,
        frame_index=agent.frame_index,
        trace_done=agent.done,
    )
    return info

def img2flow(img: np.ndarray,
             mag_minmax: Vec,
             flow_size: Optional[Vec] = None) -> np.ndarray:
    """ Convert an HSV-encoded flow image to optical flow.

    Args:
        img (np.ndarray): An image with channel order BGR.
        mag_minmax (Vec): The minimum and maximum used when normalizing the
            flow magnitude to [0, 1].
        flow_size (Vec): Size of the output flow array. If set, resize the
            image before converting to flow; defaults to ``None``.

    Returns:
        np.ndarray: An HxWx2 array holding the x- and y-components of the
        flow.

    """
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)  # assume image is BGR
    if flow_size is not None:
        hsv = cv2.resize(hsv, flow_size[::-1])
    # OpenCV hue spans [0, 180) for [0, 360) degrees; convert to radians
    ang = hsv[..., 0] * 2. * np.pi / 180
    # saturation in [0, 255] encodes magnitude in [mag_min, mag_max]
    mag = hsv[..., 1] / 255. * (mag_minmax[1] - mag_minmax[0]) + mag_minmax[0]
    flow = np.stack(cv2.polarToCart(mag, ang), axis=-1)
    return flow

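# Worked single-pixel decode (illustrative, values made up): with hue = 45,
# saturation = 255, and mag_minmax = (0., 10.), the angle is 45 * 2 = 90 deg
# (pi / 2 rad) and the magnitude is 255 / 255 * 10 = 10, so polarToCart
# yields flow components (x, y) ~= (0., 10.):
#
#     import numpy as np
#     import cv2
#
#     mag = np.float32([[10.]])
#     ang = np.float32([[np.pi / 2]])
#     x, y = cv2.polarToCart(mag, ang)   # x ~= [[0.]], y ~= [[10.]]
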
def biinterp(I0: np.ndarray, I1: np.ndarray, F_0_1: np.ndarray,
             F_1_0: np.ndarray, ts: float, t0: float,
             t1: float) -> np.ndarray:
    """ Interpolate a frame with bidirectional flow.

    Args:
        I0 (np.ndarray): An RGB image at time `t0`.
        I1 (np.ndarray): An RGB image at time `t1`.
        F_0_1 (np.ndarray): The flow from time `t0` to `t1`.
        F_1_0 (np.ndarray): The flow from time `t1` to `t0`.
        ts (float): The timestamp to interpolate to.
        t0 (float): The timestamp of `I0`.
        t1 (float): The timestamp of `I1`.

    Returns:
        np.ndarray: An interpolated RGB image at time `ts`.

    """
    t = (ts - t0) / (t1 - t0)
    temp = -t * (1 - t)
    fCoeff = [temp, t * t, (1 - t) * (1 - t), temp]
    F_t_0 = fCoeff[0] * F_0_1 + fCoeff[1] * F_1_0
    F_t_1 = fCoeff[2] * F_0_1 + fCoeff[3] * F_1_0
    g_I0_F_t_0 = flow_backwarp(I0, F_t_0)
    g_I1_F_t_1 = flow_backwarp(I1, F_t_1)
    wCoeff = [1 - t, t]
    out = wCoeff[0] * g_I0_F_t_0 + wCoeff[1] * g_I1_F_t_1
    return out

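# The intermediate flows appear to follow the quadratic approximation used in
# Super SloMo (Jiang et al., 2018), which assumes locally linear motion:
#
#     F_t->0 = -(1 - t) * t * F_0->1 + t^2 * F_1->0
#     F_t->1 = (1 - t)^2 * F_0->1 - t * (1 - t) * F_1->0
#
# Sanity check at the midpoint (illustrative): for t = 0.5 every coefficient
# has magnitude 0.25, so F_t->0 = 0.25 * (F_1->0 - F_0->1), and the two
# backwarped frames are blended with equal weights (0.5, 0.5).
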
def flow_backwarp(img: np.ndarray,
                  flow: np.ndarray,
                  use_pytorch: bool = False) -> np.ndarray:
    """ Warp an image based on optical flow.

    Args:
        img (np.ndarray): An image to be warped.
        flow (np.ndarray): Optical flow used to warp the image.
        use_pytorch (bool): Whether to use pytorch for warping; defaults
            to ``False``.

    Returns:
        np.ndarray: A warped image.

    """
    H, W = img.shape[:2]
    gridX, gridY = np.meshgrid(np.arange(W), np.arange(H))
    u = flow[:, :, 0]
    v = flow[:, :, 1]
    # backward warping: each output pixel samples `img` at its flow target
    x = gridX + u
    y = gridY + v
    if use_pytorch:
        import torch
        # grid_sample expects sampling coordinates normalized to [-1, 1]
        x = 2 * (x / W - 0.5)
        y = 2 * (y / H - 0.5)
        grid = torch.from_numpy(np.stack((x, y), axis=2))
        img = torch.from_numpy(img / 255.).permute(2, 0, 1)
        out = torch.nn.functional.grid_sample(img[None, ...],
                                              grid[None, ...],
                                              align_corners=True)
        out = (out[0].permute(1, 2, 0).cpu().numpy() * 255.).astype(np.uint8)
    else:
        # normalize to [0, 1] as float32 before remapping
        img = img.astype(np.float32) / 255.
        out = cv2.remap(img, x.astype(np.float32), y.astype(np.float32),
                        cv2.INTER_LINEAR)
        out = np.clip(out * 255, 0, 255).astype(np.uint8)
    return out

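# Minimal sanity-check sketch (not part of the original module; values made
# up): with a constant flow of +1 pixel in x, each output pixel samples one
# pixel to its right, so image content shifts one pixel to the left:
#
#     import numpy as np
#
#     img = np.zeros((4, 4, 3), dtype=np.uint8)
#     img[:, 2] = 255                         # white stripe at column 2
#     flow = np.zeros((4, 4, 2), dtype=np.float32)
#     flow[..., 0] = 1.                       # u = +1 everywhere
#     out = flow_backwarp(img, flow)          # stripe now at column 1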