Spaces:
Running
on
A10G
Running
on
A10G
from typing import Optional | |
import torch | |
from torch import nn | |
from torch.nn import functional as F | |
import numpy as np | |
from dataclasses import dataclass | |
from .transformer import ( | |
LayerNormFp32, | |
LayerNorm, | |
QuickGELU, | |
MultimodalTransformer, | |
) | |
from .model import CLIPTextCfg, CLIPVisionCfg, _build_vision_tower, _build_text_tower | |
try: | |
from transformers import ( | |
BeamSearchScorer, | |
LogitsProcessorList, | |
TopPLogitsWarper, | |
TopKLogitsWarper, | |
RepetitionPenaltyLogitsProcessor, | |
MinLengthLogitsProcessor, | |
MaxLengthCriteria, | |
StoppingCriteriaList | |
) | |
GENERATION_TYPES = { | |
"top_k": TopKLogitsWarper, | |
"top_p": TopPLogitsWarper, | |
"beam_search": "beam_search" | |
} | |
_has_transformers = True | |
except ImportError as e: | |
GENERATION_TYPES = { | |
"top_k": None, | |
"top_p": None, | |
"beam_search": "beam_search" | |
} | |
_has_transformers = False | |
class MultimodalCfg(CLIPTextCfg): | |
mlp_ratio: int = 4 | |
dim_head: int = 64 | |
heads: int = 8 | |
n_queries: int = 256 | |
attn_pooler_heads: int = 8 | |
def _build_text_decoder_tower( | |
embed_dim, | |
multimodal_cfg, | |
quick_gelu: bool = False, | |
cast_dtype: Optional[torch.dtype] = None, | |
): | |
multimodal_cfg = MultimodalCfg(**multimodal_cfg) if isinstance(multimodal_cfg, dict) else multimodal_cfg | |
act_layer = QuickGELU if quick_gelu else nn.GELU | |
norm_layer = ( | |
LayerNormFp32 if cast_dtype in (torch.float16, torch.bfloat16) else LayerNorm | |
) | |
decoder = MultimodalTransformer( | |
context_length=multimodal_cfg.context_length, | |
width=multimodal_cfg.width, | |
heads=multimodal_cfg.heads, | |
layers=multimodal_cfg.layers, | |
ls_init_value=multimodal_cfg.ls_init_value, | |
output_dim=embed_dim, | |
act_layer=act_layer, | |
norm_layer=norm_layer, | |
) | |
return decoder | |
class CoCa(nn.Module): | |
def __init__( | |
self, | |
embed_dim, | |
multimodal_cfg: MultimodalCfg, | |
text_cfg: CLIPTextCfg, | |
vision_cfg: CLIPVisionCfg, | |
quick_gelu: bool = False, | |
cast_dtype: Optional[torch.dtype] = None, | |
pad_id: int = 0, | |
): | |
super().__init__() | |
multimodal_cfg = MultimodalCfg(**multimodal_cfg) if isinstance(multimodal_cfg, dict) else multimodal_cfg | |
text_cfg = CLIPTextCfg(**text_cfg) if isinstance(text_cfg, dict) else text_cfg | |
vision_cfg = CLIPVisionCfg(**vision_cfg) if isinstance(vision_cfg, dict) else vision_cfg | |
self.text = _build_text_tower( | |
embed_dim=embed_dim, | |
text_cfg=text_cfg, | |
quick_gelu=quick_gelu, | |
cast_dtype=cast_dtype, | |
) | |
vocab_size = ( | |
text_cfg.vocab_size # for hf models | |
if hasattr(text_cfg, "hf_model_name") and text_cfg.hf_model_name is not None | |
else text_cfg.vocab_size | |
) | |
self.visual = _build_vision_tower( | |
embed_dim=embed_dim, | |
vision_cfg=vision_cfg, | |
quick_gelu=quick_gelu, | |
cast_dtype=cast_dtype, | |
) | |
self.text_decoder = _build_text_decoder_tower( | |
vocab_size, | |
multimodal_cfg=multimodal_cfg, | |
quick_gelu=quick_gelu, | |
cast_dtype=cast_dtype, | |
) | |
self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07)) | |
self.pad_id = pad_id | |
def set_grad_checkpointing(self, enable=True): | |
self.visual.set_grad_checkpointing(enable) | |
self.text.set_grad_checkpointing(enable) | |
self.text_decoder.set_grad_checkpointing(enable) | |
def _encode_image(self, images, normalize=True): | |
image_latent, tokens_embs = self.visual(images) | |
image_latent = F.normalize(image_latent, dim=-1) if normalize else image_latent | |
return image_latent, tokens_embs | |
def _encode_text(self, text, normalize=True, embed_cls=True): | |
text = text[:, :-1] if embed_cls else text # make space for CLS token | |
text_latent, token_emb = self.text(text) | |
text_latent = F.normalize(text_latent, dim=-1) if normalize else text_latent | |
return text_latent, token_emb | |
def encode_image(self, images, normalize=True): | |
image_latent, _ = self._encode_image(images, normalize=normalize) | |
return image_latent | |
def encode_text(self, text, normalize=True, embed_cls=True): | |
text_latent, _ = self._encode_text(text, normalize=normalize, embed_cls=embed_cls) | |
return text_latent | |
def forward(self, image, text, embed_cls=True, image_latent=None, image_embs=None): | |
text_latent, token_embs = self._encode_text(text, embed_cls=embed_cls) | |
if image_latent is None or image_embs is None: | |
image_latent, image_embs = self._encode_image(image) | |
# TODO: add assertion to avoid bugs? | |
labels = text[:, -token_embs.shape[1]:] | |
logits = self.text_decoder(image_embs, token_embs) | |
return { | |
"image_features": image_latent, | |
"text_features": text_latent, | |
"logits": logits, | |
"labels": labels, | |
"logit_scale": self.logit_scale.exp() | |
} | |
def generate( | |
self, | |
image, | |
text=None, | |
seq_len=30, | |
max_seq_len=77, | |
temperature=1., | |
generation_type="beam_search", | |
top_p=0.1, # keep tokens in the 1 - top_p quantile | |
top_k=1, # keeps the top_k most probable tokens | |
pad_token_id=None, | |
eos_token_id=None, | |
sot_token_id=None, | |
num_beams=6, | |
num_beam_groups=3, | |
min_seq_len=5, | |
stopping_criteria=None, | |
repetition_penalty=1.0, | |
fixed_output_length=False # if True output.shape == (batch_size, seq_len) | |
): | |
# taking many ideas and components from HuggingFace GenerationMixin | |
# https://huggingface.co/docs/transformers/main/en/main_classes/text_generation | |
assert _has_transformers, "Please install transformers for generate functionality. `pip install transformers`." | |
assert seq_len > min_seq_len, "seq_len must be larger than min_seq_len" | |
with torch.no_grad(): | |
sot_token_id = 49406 if sot_token_id is None else sot_token_id | |
eos_token_id = 49407 if eos_token_id is None else eos_token_id | |
pad_token_id = self.pad_id if pad_token_id is None else pad_token_id | |
logit_processor = LogitsProcessorList( | |
[ | |
MinLengthLogitsProcessor(min_seq_len, eos_token_id), | |
RepetitionPenaltyLogitsProcessor(repetition_penalty), | |
] | |
) | |
if stopping_criteria is None: | |
stopping_criteria = [MaxLengthCriteria(max_length=seq_len)] | |
stopping_criteria = StoppingCriteriaList( | |
stopping_criteria | |
) | |
device = image.device | |
if generation_type == "beam_search": | |
output = self._generate_beamsearch( | |
image_inputs = image, | |
pad_token_id=pad_token_id, | |
eos_token_id=eos_token_id, | |
sot_token_id=sot_token_id, | |
num_beams=num_beams, | |
num_beam_groups=num_beam_groups, | |
min_seq_len=min_seq_len, | |
stopping_criteria=stopping_criteria, | |
logit_processor=logit_processor, | |
) | |
if fixed_output_length and output.shape[1] < seq_len: | |
return torch.cat( | |
(output, torch.ones(output.shape[0], seq_len-output.shape[1], device=device, dtype=output.dtype) * self.pad_id), | |
dim=1 | |
) | |
return output | |
elif generation_type == "top_p": | |
logit_warper = GENERATION_TYPES[generation_type](top_p) | |
elif generation_type == "top_k": | |
logit_warper = GENERATION_TYPES[generation_type](top_k) | |
else: | |
raise ValueError( | |
f"generation_type has to be one of " | |
f"{'| ' + ' | '.join(list(GENERATION_TYPES.keys())) + ' |'}." | |
) | |
image_latent, image_embs = self._encode_image(image) | |
if text is None: | |
text = torch.ones((image.shape[0], 1), device=device, dtype=torch.long) * sot_token_id | |
was_training = self.training | |
num_dims = len(text.shape) | |
if num_dims == 1: | |
text = text[None, :] | |
cur_len = text.shape[1] | |
self.eval() | |
out = text | |
while True: | |
x = out[:, -max_seq_len:] | |
cur_len = x.shape[1] | |
logits = self(image, x, image_latent=image_latent, image_embs=image_embs, embed_cls=False)["logits"][:, -1] | |
mask = (out[:, -1] == eos_token_id) | (out[:, -1] == pad_token_id) | |
sample = torch.ones((out.shape[0], 1), device=device, dtype=torch.long) * pad_token_id | |
if mask.all(): | |
if not fixed_output_length: | |
break | |
else: | |
logits = logits[~mask, :] | |
filtered_logits = logit_processor(x[~mask, :], logits) | |
filtered_logits = logit_warper(x[~mask, :], filtered_logits) | |
probs = F.softmax(filtered_logits / temperature, dim=-1) | |
if (cur_len + 1 == seq_len): | |
sample[~mask, :] = torch.ones((sum(~mask), 1), device=device, dtype=torch.long) * eos_token_id | |
else: | |
sample[~mask, :] = torch.multinomial(probs, 1) | |
out = torch.cat((out, sample), dim=-1) | |
cur_len += 1 | |
if stopping_criteria(out, None): | |
break | |
if num_dims == 1: | |
out = out.squeeze(0) | |
self.train(was_training) | |
return out | |
def _generate_beamsearch( | |
self, | |
image_inputs, | |
pad_token_id=None, | |
eos_token_id=None, | |
sot_token_id=None, | |
num_beams=6, | |
num_beam_groups=3, | |
min_seq_len=5, | |
stopping_criteria=None, | |
logit_processor=None, | |
logit_warper=None, | |
): | |
device = image_inputs.device | |
batch_size = image_inputs.shape[0] | |
image_inputs = torch.repeat_interleave(image_inputs, num_beams, dim=0) | |
image_latent, image_embs = self._encode_image(image_inputs) | |
input_ids = torch.ones((batch_size * num_beams, 1), device=device, dtype=torch.long) | |
input_ids = input_ids * sot_token_id | |
beam_scorer = BeamSearchScorer( | |
batch_size=batch_size, | |
num_beams=num_beams, | |
device=device, | |
num_beam_groups=num_beam_groups, | |
) | |
# instantiate logits processors | |
logits_processor = ( | |
LogitsProcessorList([MinLengthLogitsProcessor(min_seq_len, eos_token_id=eos_token_id)]) | |
if logit_processor is None | |
else logit_processor | |
) | |
batch_size = len(beam_scorer._beam_hyps) | |
num_beams = beam_scorer.num_beams | |
num_beam_groups = beam_scorer.num_beam_groups | |
num_sub_beams = num_beams // num_beam_groups | |
batch_beam_size, cur_len = input_ids.shape | |
beam_indices = None | |
if num_beams * batch_size != batch_beam_size: | |
raise ValueError( | |
f"Batch dimension of `input_ids` should be {num_beams * batch_size}, but is {batch_beam_size}." | |
) | |
beam_scores = torch.full((batch_size, num_beams), -1e9, dtype=torch.float, device=device) | |
# initialise score of first beam of each group with 0 and the rest with 1e-9. This ensures that the beams in | |
# the same group don't produce same tokens everytime. | |
beam_scores[:, ::num_sub_beams] = 0 | |
beam_scores = beam_scores.view((batch_size * num_beams,)) | |
while True: | |
# predicted tokens in cur_len step | |
current_tokens = torch.zeros(batch_size * num_beams, dtype=input_ids.dtype, device=device) | |
# indices which will form the beams in the next time step | |
reordering_indices = torch.zeros(batch_size * num_beams, dtype=torch.long, device=device) | |
# do one decoder step on all beams of all sentences in batch | |
model_inputs = prepare_inputs_for_generation(input_ids=input_ids, image_inputs=image_inputs) | |
outputs = self( | |
model_inputs['images'], | |
model_inputs['text'], | |
embed_cls=False, | |
image_latent=image_latent, | |
image_embs=image_embs | |
) | |
for beam_group_idx in range(num_beam_groups): | |
group_start_idx = beam_group_idx * num_sub_beams | |
group_end_idx = min(group_start_idx + num_sub_beams, num_beams) | |
group_size = group_end_idx - group_start_idx | |
# indices of beams of current group among all sentences in batch | |
batch_group_indices = [] | |
for batch_idx in range(batch_size): | |
batch_group_indices.extend( | |
[batch_idx * num_beams + idx for idx in range(group_start_idx, group_end_idx)] | |
) | |
group_input_ids = input_ids[batch_group_indices] | |
# select outputs of beams of currentg group only | |
next_token_logits = outputs['logits'][batch_group_indices, -1, :] | |
vocab_size = next_token_logits.shape[-1] | |
next_token_scores_processed = logits_processor( | |
group_input_ids, next_token_logits, current_tokens=current_tokens, beam_group_idx=beam_group_idx | |
) | |
next_token_scores = next_token_scores_processed + beam_scores[batch_group_indices].unsqueeze(-1) | |
next_token_scores = next_token_scores.expand_as(next_token_scores_processed) | |
# reshape for beam search | |
next_token_scores = next_token_scores.view(batch_size, group_size * vocab_size) | |
next_token_scores, next_tokens = torch.topk( | |
next_token_scores, 2 * group_size, dim=1, largest=True, sorted=True | |
) | |
next_indices = torch.div(next_tokens, vocab_size, rounding_mode="floor") | |
next_tokens = next_tokens % vocab_size | |
# stateless | |
process_beam_indices = sum(beam_indices, ()) if beam_indices is not None else None | |
beam_outputs = beam_scorer.process( | |
group_input_ids, | |
next_token_scores, | |
next_tokens, | |
next_indices, | |
pad_token_id=pad_token_id, | |
eos_token_id=eos_token_id, | |
beam_indices=process_beam_indices, | |
) | |
beam_scores[batch_group_indices] = beam_outputs["next_beam_scores"] | |
beam_next_tokens = beam_outputs["next_beam_tokens"] | |
beam_idx = beam_outputs["next_beam_indices"] | |
input_ids[batch_group_indices] = group_input_ids[beam_idx] | |
group_input_ids = torch.cat([group_input_ids[beam_idx, :], beam_next_tokens.unsqueeze(-1)], dim=-1) | |
current_tokens[batch_group_indices] = group_input_ids[:, -1] | |
# (beam_idx // group_size) -> batch_idx | |
# (beam_idx % group_size) -> offset of idx inside the group | |
reordering_indices[batch_group_indices] = ( | |
num_beams * torch.div(beam_idx, group_size, rounding_mode="floor") + group_start_idx + (beam_idx % group_size) | |
) | |
input_ids = torch.cat([input_ids, current_tokens.unsqueeze(-1)], dim=-1) | |
# increase cur_len | |
cur_len = cur_len + 1 | |
if beam_scorer.is_done or stopping_criteria(input_ids, None): | |
break | |
final_beam_indices = sum(beam_indices, ()) if beam_indices is not None else None | |
sequence_outputs = beam_scorer.finalize( | |
input_ids, | |
beam_scores, | |
next_tokens, | |
next_indices, | |
pad_token_id=pad_token_id, | |
eos_token_id=eos_token_id, | |
max_length=stopping_criteria.max_length, | |
beam_indices=final_beam_indices, | |
) | |
return sequence_outputs['sequences'] | |
def prepare_inputs_for_generation(input_ids, image_inputs, past=None, **kwargs): | |
if past: | |
input_ids = input_ids[:, -1].unsqueeze(-1) | |
attention_mask = kwargs.get("attention_mask", None) | |
position_ids = kwargs.get("position_ids", None) | |
if attention_mask is not None and position_ids is None: | |
# create position_ids on the fly for batch generation | |
position_ids = attention_mask.long().cumsum(-1) - 1 | |
position_ids.masked_fill_(attention_mask == 0, 1) | |
else: | |
position_ids = None | |
return { | |
"text": input_ids, | |
"images": image_inputs, | |
"past_key_values": past, | |
"position_ids": position_ids, | |
"attention_mask": attention_mask, | |
} | |