utils.py 12 KB


  1. # Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
  2. from typing import Tuple
  3. import torch
  4. import torch.nn.functional as F
  5. def select_closest_cond_frames(frame_idx, cond_frame_outputs, max_cond_frame_num):
  6. """
  7. Selects the closest conditioning frames to a given frame index.
  8. Args:
  9. frame_idx (int): Current frame index.
  10. cond_frame_outputs (Dict[int, Any]): Dictionary of conditioning frame outputs keyed by frame indices.
  11. max_cond_frame_num (int): Maximum number of conditioning frames to select.
  12. Returns:
  13. (Tuple[Dict[int, Any], Dict[int, Any]]): A tuple containing two dictionaries:
  14. - selected_outputs: Selected items from cond_frame_outputs.
  15. - unselected_outputs: Items not selected from cond_frame_outputs.
  16. Examples:
  17. >>> frame_idx = 5
  18. >>> cond_frame_outputs = {1: "a", 3: "b", 7: "c", 9: "d"}
  19. >>> max_cond_frame_num = 2
  20. >>> selected, unselected = select_closest_cond_frames(frame_idx, cond_frame_outputs, max_cond_frame_num)
  21. >>> print(selected)
  22. {3: 'b', 7: 'c'}
  23. >>> print(unselected)
  24. {1: 'a', 9: 'd'}
  25. """
  26. if max_cond_frame_num == -1 or len(cond_frame_outputs) <= max_cond_frame_num:
  27. selected_outputs = cond_frame_outputs
  28. unselected_outputs = {}
  29. else:
  30. assert max_cond_frame_num >= 2, "we should allow using 2+ conditioning frames"
  31. selected_outputs = {}
  32. # the closest conditioning frame before `frame_idx` (if any)
  33. idx_before = max((t for t in cond_frame_outputs if t < frame_idx), default=None)
  34. if idx_before is not None:
  35. selected_outputs[idx_before] = cond_frame_outputs[idx_before]
  36. # the closest conditioning frame after `frame_idx` (if any)
  37. idx_after = min((t for t in cond_frame_outputs if t >= frame_idx), default=None)
  38. if idx_after is not None:
  39. selected_outputs[idx_after] = cond_frame_outputs[idx_after]
  40. # add other temporally closest conditioning frames until reaching a total
  41. # of `max_cond_frame_num` conditioning frames.
  42. num_remain = max_cond_frame_num - len(selected_outputs)
  43. inds_remain = sorted(
  44. (t for t in cond_frame_outputs if t not in selected_outputs),
  45. key=lambda x: abs(x - frame_idx),
  46. )[:num_remain]
  47. selected_outputs.update((t, cond_frame_outputs[t]) for t in inds_remain)
  48. unselected_outputs = {t: v for t, v in cond_frame_outputs.items() if t not in selected_outputs}
  49. return selected_outputs, unselected_outputs
  50. def get_1d_sine_pe(pos_inds, dim, temperature=10000):
  51. """Generates 1D sinusoidal positional embeddings for given positions and dimensions."""
  52. pe_dim = dim // 2
  53. dim_t = torch.arange(pe_dim, dtype=torch.float32, device=pos_inds.device)
  54. dim_t = temperature ** (2 * (dim_t // 2) / pe_dim)
  55. pos_embed = pos_inds.unsqueeze(-1) / dim_t
  56. pos_embed = torch.cat([pos_embed.sin(), pos_embed.cos()], dim=-1)
  57. return pos_embed
  58. def init_t_xy(end_x: int, end_y: int):
  59. """Initializes 1D and 2D coordinate tensors for a grid of specified dimensions."""
  60. t = torch.arange(end_x * end_y, dtype=torch.float32)
  61. t_x = (t % end_x).float()
  62. t_y = torch.div(t, end_x, rounding_mode="floor").float()
  63. return t_x, t_y
  64. def compute_axial_cis(dim: int, end_x: int, end_y: int, theta: float = 10000.0):
  65. """Computes axial complex exponential positional encodings for 2D spatial positions in a grid."""
  66. freqs_x = 1.0 / (theta ** (torch.arange(0, dim, 4)[: (dim // 4)].float() / dim))
  67. freqs_y = 1.0 / (theta ** (torch.arange(0, dim, 4)[: (dim // 4)].float() / dim))
  68. t_x, t_y = init_t_xy(end_x, end_y)
  69. freqs_x = torch.outer(t_x, freqs_x)
  70. freqs_y = torch.outer(t_y, freqs_y)
  71. freqs_cis_x = torch.polar(torch.ones_like(freqs_x), freqs_x)
  72. freqs_cis_y = torch.polar(torch.ones_like(freqs_y), freqs_y)
  73. return torch.cat([freqs_cis_x, freqs_cis_y], dim=-1)
  74. def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):
  75. """Reshapes frequency tensor for broadcasting with input tensor, ensuring dimensional compatibility."""
  76. ndim = x.ndim
  77. assert 0 <= 1 < ndim
  78. assert freqs_cis.shape == (x.shape[-2], x.shape[-1])
  79. shape = [d if i >= ndim - 2 else 1 for i, d in enumerate(x.shape)]
  80. return freqs_cis.view(*shape)
  81. def apply_rotary_enc(
  82. xq: torch.Tensor,
  83. xk: torch.Tensor,
  84. freqs_cis: torch.Tensor,
  85. repeat_freqs_k: bool = False,
  86. ):
  87. """Applies rotary positional encoding to query and key tensors using complex-valued frequency components."""
  88. xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
  89. xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2)) if xk.shape[-2] != 0 else None
  90. freqs_cis = reshape_for_broadcast(freqs_cis, xq_)
  91. xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3)
  92. if xk_ is None:
  93. # no keys to rotate, due to dropout
  94. return xq_out.type_as(xq).to(xq.device), xk
  95. # repeat freqs along seq_len dim to match k seq_len
  96. if repeat_freqs_k:
  97. r = xk_.shape[-2] // xq_.shape[-2]
  98. freqs_cis = freqs_cis.repeat(*([1] * (freqs_cis.ndim - 2)), r, 1)
  99. xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3)
  100. return xq_out.type_as(xq).to(xq.device), xk_out.type_as(xk).to(xk.device)
  101. def window_partition(x, window_size):
  102. """
  103. Partitions input tensor into non-overlapping windows with padding if needed.
  104. Args:
  105. x (torch.Tensor): Input tensor with shape (B, H, W, C).
  106. window_size (int): Size of each window.
  107. Returns:
  108. (Tuple[torch.Tensor, Tuple[int, int]]): A tuple containing:
  109. - windows (torch.Tensor): Partitioned windows with shape (B * num_windows, window_size, window_size, C).
  110. - (Hp, Wp) (Tuple[int, int]): Padded height and width before partition.
  111. Examples:
  112. >>> x = torch.randn(1, 16, 16, 3)
  113. >>> windows, (Hp, Wp) = window_partition(x, window_size=4)
  114. >>> print(windows.shape, Hp, Wp)
  115. torch.Size([16, 4, 4, 3]) 16 16
  116. """
  117. B, H, W, C = x.shape
  118. pad_h = (window_size - H % window_size) % window_size
  119. pad_w = (window_size - W % window_size) % window_size
  120. if pad_h > 0 or pad_w > 0:
  121. x = F.pad(x, (0, 0, 0, pad_w, 0, pad_h))
  122. Hp, Wp = H + pad_h, W + pad_w
  123. x = x.view(B, Hp // window_size, window_size, Wp // window_size, window_size, C)
  124. windows = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(-1, window_size, window_size, C)
  125. return windows, (Hp, Wp)
  126. def window_unpartition(windows, window_size, pad_hw, hw):
  127. """
  128. Unpartitions windowed sequences into original sequences and removes padding.
  129. This function reverses the windowing process, reconstructing the original input from windowed segments
  130. and removing any padding that was added during the windowing process.
  131. Args:
  132. windows (torch.Tensor): Input tensor of windowed sequences with shape (B * num_windows, window_size,
  133. window_size, C), where B is the batch size, num_windows is the number of windows, window_size is
  134. the size of each window, and C is the number of channels.
  135. window_size (int): Size of each window.
  136. pad_hw (Tuple[int, int]): Padded height and width (Hp, Wp) of the input before windowing.
  137. hw (Tuple[int, int]): Original height and width (H, W) of the input before padding and windowing.
  138. Returns:
  139. (torch.Tensor): Unpartitioned sequences with shape (B, H, W, C), where B is the batch size, H and W
  140. are the original height and width, and C is the number of channels.
  141. Examples:
  142. >>> windows = torch.rand(32, 8, 8, 64) # 32 windows of size 8x8 with 64 channels
  143. >>> pad_hw = (16, 16) # Padded height and width
  144. >>> hw = (15, 14) # Original height and width
  145. >>> x = window_unpartition(windows, window_size=8, pad_hw=pad_hw, hw=hw)
  146. >>> print(x.shape)
  147. torch.Size([1, 15, 14, 64])
  148. """
  149. Hp, Wp = pad_hw
  150. H, W = hw
  151. B = windows.shape[0] // (Hp * Wp // window_size // window_size)
  152. x = windows.view(B, Hp // window_size, Wp // window_size, window_size, window_size, -1)
  153. x = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, Hp, Wp, -1)
  154. if Hp > H or Wp > W:
  155. x = x[:, :H, :W, :].contiguous()
  156. return x
  157. def get_rel_pos(q_size: int, k_size: int, rel_pos: torch.Tensor) -> torch.Tensor:
  158. """
  159. Extracts relative positional embeddings based on query and key sizes.
  160. Args:
  161. q_size (int): Size of the query.
  162. k_size (int): Size of the key.
  163. rel_pos (torch.Tensor): Relative position embeddings with shape (L, C), where L is the maximum relative
  164. distance and C is the embedding dimension.
  165. Returns:
  166. (torch.Tensor): Extracted positional embeddings according to relative positions, with shape (q_size,
  167. k_size, C).
  168. Examples:
  169. >>> q_size, k_size = 8, 16
  170. >>> rel_pos = torch.randn(31, 64) # 31 = 2 * max(8, 16) - 1
  171. >>> extracted_pos = get_rel_pos(q_size, k_size, rel_pos)
  172. >>> print(extracted_pos.shape)
  173. torch.Size([8, 16, 64])
  174. """
  175. max_rel_dist = int(2 * max(q_size, k_size) - 1)
  176. # Interpolate rel pos if needed.
  177. if rel_pos.shape[0] != max_rel_dist:
  178. # Interpolate rel pos.
  179. rel_pos_resized = F.interpolate(
  180. rel_pos.reshape(1, rel_pos.shape[0], -1).permute(0, 2, 1),
  181. size=max_rel_dist,
  182. mode="linear",
  183. )
  184. rel_pos_resized = rel_pos_resized.reshape(-1, max_rel_dist).permute(1, 0)
  185. else:
  186. rel_pos_resized = rel_pos
  187. # Scale the coords with short length if shapes for q and k are different.
  188. q_coords = torch.arange(q_size)[:, None] * max(k_size / q_size, 1.0)
  189. k_coords = torch.arange(k_size)[None, :] * max(q_size / k_size, 1.0)
  190. relative_coords = (q_coords - k_coords) + (k_size - 1) * max(q_size / k_size, 1.0)
  191. return rel_pos_resized[relative_coords.long()]
  192. def add_decomposed_rel_pos(
  193. attn: torch.Tensor,
  194. q: torch.Tensor,
  195. rel_pos_h: torch.Tensor,
  196. rel_pos_w: torch.Tensor,
  197. q_size: Tuple[int, int],
  198. k_size: Tuple[int, int],
  199. ) -> torch.Tensor:
  200. """
  201. Adds decomposed Relative Positional Embeddings to the attention map.
  202. This function calculates and applies decomposed Relative Positional Embeddings as described in the MVITv2
  203. paper. It enhances the attention mechanism by incorporating spatial relationships between query and key
  204. positions.
  205. Args:
  206. attn (torch.Tensor): Attention map with shape (B, q_h * q_w, k_h * k_w).
  207. q (torch.Tensor): Query tensor in the attention layer with shape (B, q_h * q_w, C).
  208. rel_pos_h (torch.Tensor): Relative position embeddings for height axis with shape (Lh, C).
  209. rel_pos_w (torch.Tensor): Relative position embeddings for width axis with shape (Lw, C).
  210. q_size (Tuple[int, int]): Spatial sequence size of query q as (q_h, q_w).
  211. k_size (Tuple[int, int]): Spatial sequence size of key k as (k_h, k_w).
  212. Returns:
  213. (torch.Tensor): Updated attention map with added relative positional embeddings, shape
  214. (B, q_h * q_w, k_h * k_w).
  215. Examples:
  216. >>> B, C, q_h, q_w, k_h, k_w = 1, 64, 8, 8, 8, 8
  217. >>> attn = torch.rand(B, q_h * q_w, k_h * k_w)
  218. >>> q = torch.rand(B, q_h * q_w, C)
  219. >>> rel_pos_h = torch.rand(2 * max(q_h, k_h) - 1, C)
  220. >>> rel_pos_w = torch.rand(2 * max(q_w, k_w) - 1, C)
  221. >>> q_size, k_size = (q_h, q_w), (k_h, k_w)
  222. >>> updated_attn = add_decomposed_rel_pos(attn, q, rel_pos_h, rel_pos_w, q_size, k_size)
  223. >>> print(updated_attn.shape)
  224. torch.Size([1, 64, 64])
  225. References:
  226. https://github.com/facebookresearch/mvit/blob/main/mvit/models/attention.py
  227. """
  228. q_h, q_w = q_size
  229. k_h, k_w = k_size
  230. Rh = get_rel_pos(q_h, k_h, rel_pos_h)
  231. Rw = get_rel_pos(q_w, k_w, rel_pos_w)
  232. B, _, dim = q.shape
  233. r_q = q.reshape(B, q_h, q_w, dim)
  234. rel_h = torch.einsum("bhwc,hkc->bhwk", r_q, Rh)
  235. rel_w = torch.einsum("bhwc,wkc->bhwk", r_q, Rw)
  236. attn = (attn.view(B, q_h, q_w, k_h, k_w) + rel_h[:, :, :, :, None] + rel_w[:, :, :, None, :]).view(
  237. B, q_h * q_w, k_h * k_w
  238. )
  239. return attn