""" Taken from https://github.com/lucidrains/flamingo-pytorch """ import torch from einops import rearrange, repeat try: from einops_exts import rearrange_many except: pass from torch import einsum, nn def exists(val): return val is not None def FeedForward(dim, mult=4): inner_dim = int(dim * mult) return nn.Sequential( nn.LayerNorm(dim), nn.Linear(dim, inner_dim, bias=False), nn.GELU(), nn.Linear(inner_dim, dim, bias=False), ) class PerceiverAttention(nn.Module): def __init__(self, *, dim, dim_head=64, heads=8): super().__init__() self.scale = dim_head**-0.5 self.heads = heads inner_dim = dim_head * heads self.norm_media = nn.LayerNorm(dim) self.norm_latents = nn.LayerNorm(dim) self.to_q = nn.Linear(dim, inner_dim, bias=False) self.to_kv = nn.Linear(dim, inner_dim * 2, bias=False) self.to_out = nn.Linear(inner_dim, dim, bias=False) def forward(self, x, latents): """ Args: x (torch.Tensor): image features shape (b, T, n1, D) latent (torch.Tensor): latent features shape (b, T, n2, D) """ x = self.norm_media(x) latents = self.norm_latents(latents) h = self.heads q = self.to_q(latents) kv_input = torch.cat((x, latents), dim=-2) k, v = self.to_kv(kv_input).chunk(2, dim=-1) q, k, v = rearrange_many((q, k, v), "b t n (h d) -> b h t n d", h=h) q = q * self.scale # attention sim = einsum("... i d, ... j d -> ... i j", q, k) sim = sim - sim.amax(dim=-1, keepdim=True).detach() attn = sim.softmax(dim=-1) out = einsum("... i j, ... j d -> ... i d", attn, v) out = rearrange(out, "b h t n d -> b t n (h d)", h=h) return self.to_out(out) class PerceiverResamplerModule(nn.Module): def __init__( self, *, dim, depth=6, dim_head=64, heads=8, num_latents=64, max_num_media=None, max_num_frames=None, ff_mult=4, ): super().__init__() self.latents = nn.Parameter(torch.randn(num_latents, dim)) self.frame_embs = nn.Parameter(torch.randn(max_num_frames, dim)) if exists(max_num_frames) else None self.media_time_embs = nn.Parameter(torch.randn(max_num_media, 1, dim)) if exists(max_num_media) else None self.layers = nn.ModuleList([]) for _ in range(depth): self.layers.append( nn.ModuleList( [ PerceiverAttention(dim=dim, dim_head=dim_head, heads=heads), FeedForward(dim=dim, mult=ff_mult) if ff_mult > 0 else nn.Identity(), ] ) ) self.norm = nn.LayerNorm(dim) def forward(self, x): """ Args: x (torch.Tensor): image features shape (b, T, F, v, D) Returns: shape (b, T, n, D) where n is self.num_latents """ b, T, F, v = x.shape[:4] # frame and media time embeddings if exists(self.frame_embs): frame_embs = repeat(self.frame_embs[:F], "F d -> b T F v d", b=b, T=T, v=v) x = x + frame_embs x = rearrange(x, "b T F v d -> b T (F v) d") # flatten the frame and spatial dimensions if exists(self.media_time_embs): x = x + self.media_time_embs[:T] # blocks latents = repeat(self.latents, "n d -> b T n d", b=b, T=T) for attn, ff in self.layers: latents = attn(x, latents) + latents latents = ff(latents) + latents return self.norm(latents) class PerceiverResampler(nn.Module): def __init__(self, model_args, vision_tower): super().__init__() self.depth = model_args.mm_perceiver_depth self.num_latents = model_args.mm_perceiver_latents self.ff_mult = model_args.mm_perceiver_ff_mult self.pretrained = model_args.mm_perceiver_pretrained self.perceiver = PerceiverResamplerModule(dim=vision_tower.hidden_size, depth=self.depth, num_latents=self.num_latents, ff_mult=self.ff_mult) if self.pretrained is not None: self.load_state_dict(torch.load(self.pretrained)) def forward(self, image_features, *args, **kwargs): return self.perceiver(image_features[:, None, None]).squeeze(1) @property def config(self): return { "mm_resampler_type": "perceiver", "mm_perceiver_depth": self.depth, "mm_perceiver_latents": self.num_latents, "mm_perceiver_ff_mult": self.ff_mult, "mm_perceiver_pretrained": self.pretrained, }