File size: 11,360 Bytes
acb7e86 818ca8a acb7e86 818ca8a acb7e86 71ad8b3 acb7e86 818ca8a 71ad8b3 acb7e86 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
from transformers import GPT2Config, AutoTokenizer, GPT2Config
from transformers import PretrainedConfig, PreTrainedModel
import transformers
from typing import Optional, Tuple, Callable, List
import torch
import torch.nn as nn
from transformers.modeling_utils import PreTrainedModel, PretrainedConfig
from .utils import CABlock, _GPT2LMHeadModel
from .configuration_prot2text import Prot2TextConfig
from transformers.generation.configuration_utils import GenerationConfig
from transformers.generation.logits_process import LogitsProcessorList
from transformers.generation.stopping_criteria import StoppingCriteriaList
class Prot2TextModel(PreTrainedModel):
config_class = Prot2TextConfig
_keys_to_ignore_on_load_missing = [r"transformer"]
base_model_prefix = "decoder"
def __init__(self, config):
super().__init__(config)
self.gpt_config = GPT2Config.from_dict(config.gpt_config)
# define the GPT2 decoder
self.decoder = _GPT2LMHeadModel(self.gpt_config)
# if using ESM to encode protein's sequence, define the ESM layer, the Projection layer and the fusion layer
if config.esm:
self.esm_config = PretrainedConfig.from_dict(config.esm_config)
self.esm = transformers.EsmModel(self.esm_config)
self.to_embedding = nn.Linear(self.esm_config.hidden_size, self.gpt_config.n_embd)
if config.cross_esm_graph and config.rgcn:
self.h = nn.ModuleList([CABlock(self.gpt_config, layer_idx=i) for i in range(4)])
self.ln_f = nn.LayerNorm(self.gpt_config.n_embd, eps=self.gpt_config.layer_norm_epsilon)
self.config = config
def get_encoder(self):
return self.encoder
def get_decoder(self):
return self.decoder
def get_input_embeddings(self):
if hasattr(self, "transformer"):
return self.transformer.wte
return self.decoder.transformer.wte
def warm_up(self, gpt_model=None, esm_model=None):
if esm_model is not None:
self.esm = transformers.EsmModel.from_pretrained(esm_model)
if gpt_model is not None:
self.decoder = _GPT2LMHeadModel.from_pretrained(gpt_model, add_cross_attention=True, use_cache=False)
self.decoder.resize_token_embeddings(self.gpt_config.vocab_size)
self.decoder.config = self.gpt_config
def forward(self,
encoder_input_ids: Optional[torch.LongTensor] = None,
edge_index: Optional[torch.LongTensor] = None,
batch: Optional[torch.LongTensor] = None,
x: Optional[torch.FloatTensor] = None,
edge_type: Optional[torch.LongTensor] = None,
decoder_input_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Tuple[Tuple[torch.Tensor]]] = None,
past_key_values_graph_esm: Optional[Tuple[Tuple[torch.Tensor]]] = None,
decoder_attention_mask: Optional[torch.FloatTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
token_type_ids: Optional[torch.LongTensor] = None,
position_ids: Optional[torch.LongTensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
encoder_hidden_states: Optional[torch.Tensor] = None,
encoder_attention_mask: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
get_graph_emb: Optional[bool] = False,
**delete_args,
):
use_cache = use_cache if use_cache is not None else self.gpt_config.use_cache
return_dict = return_dict if return_dict is not None else self.gpt_config.use_return_dict
if decoder_input_ids is not None and len(decoder_input_ids.size()) == 3:
decoder_input_ids = decoder_input_ids.squeeze(0)
if self.config.esm:
if self.config.prot2text_version=='1.0':
if encoder_input_ids.size()[1] != 1021:
raise ValueError("For this version of the model you need to PAD/Truncate the amino acid sequence for the ESM model to 1021")
esm_emb = self.esm(input_ids=encoder_input_ids, attention_mask=attention_mask, return_dict=return_dict).last_hidden_state
esm_emb = self.to_embedding(esm_emb)
graph_emb = esm_emb
else:
attention_mask = None
if self.config.prot2text_version=='1.0':
attention_mask = None
if get_graph_emb:
return graph_emb
transformer_outputs = self.decoder(input_ids=decoder_input_ids,
past_key_values=past_key_values,
attention_mask=decoder_attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
encoder_hidden_states=graph_emb,
encoder_attention_mask=attention_mask,
labels=labels,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
return transformer_outputs
@torch.no_grad()
def generate_protein_description(self,
protein_sequence=None,
tokenizer=None,
device='cpu',
streamer=None,
max_new_tokens=None,
do_sample=None,
top_p=None,
top_k=None,
temperature=None,
num_beams=1,
repetition_penalty=None
):
if self.config.esm and not self.config.rgcn and protein_sequence==None:
raise ValueError(
"The model you are trying to use is based only on protein sequence, please provide an amino-acid protein_sequence"
)
if self.config.esm:
esmtokenizer = AutoTokenizer.from_pretrained(self.config.esm_model_name)
seq = esmtokenizer([protein_sequence], add_special_tokens=True, truncation=True, max_length=1021, padding='max_length', return_tensors="pt")
inputs={}
inputs['encoder_input_ids'] = seq['input_ids']
inputs['attention_mask'] = seq['attention_mask']
inputs['decoder_input_ids'] = inputs['encoder_input_ids'][:,0:1].clone()
inputs['decoder_input_ids'][:,0] = tokenizer.bos_token_id
self.to(device)
inputs = {k: v.to(device=device, non_blocking=True) if hasattr(v, 'to') else v for k, v in inputs.items()}
encoder_state = dict()
encoder_state['hidden_states'] = self(**inputs, get_graph_emb=True, output_attentions=True)
if streamer is None:
generated = tokenizer.batch_decode(self.decoder.generate(input_ids=inputs['decoder_input_ids'], encoder_outputs=encoder_state, use_cache=True), skip_special_tokens=True)
return generated[0].replace('<|stop_token|>', '').replace('<|graph_token|>', '')
else:
return self.decoder.generate(input_ids=inputs['decoder_input_ids'],
encoder_outputs=encoder_state,
use_cache=True,
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=do_sample,
top_p=top_p,
top_k=top_k,
temperature=temperature,
num_beams=1,
repetition_penalty=repetition_penalty)
@torch.no_grad()
def generate(self,
inputs: Optional[torch.Tensor] = None,
generation_config: Optional[GenerationConfig] = None,
logits_processor: Optional[LogitsProcessorList] = None,
stopping_criteria: Optional[StoppingCriteriaList] = None,
prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None,
synced_gpus: Optional[bool] = None,
assistant_model: Optional["PreTrainedModel"] = None,
streamer: Optional["BaseStreamer"] = None,
**kwargs,
):
encoder_state = self(**kwargs, get_graph_emb=True)
input_ids = kwargs['decoder_input_ids']
attention_mask = kwargs['decoder_attention_mask']
kwargs['encoder_attention_mask'] = kwargs['attention_mask']
if not self.config.cross_esm_graph and self.config.rgcn and self.config.esm:
t_add = torch.ones((kwargs['encoder_attention_mask'].size(0), 1)).to(kwargs['encoder_attention_mask'].get_device())
kwargs['encoder_attention_mask'] = torch.cat((t_add, kwargs['encoder_attention_mask']), dim=1)
for key in ['edge_index', 'edge_type', 'x', 'encoder_input_ids', 'decoder_input_ids', 'decoder_attention_mask', 'batch', 'attention_mask', 'max_length',
'_num_nodes', 'node_id', 'name', 'sequence', 'distance_matrix', 'distance', 'coordinates', 'ptr', 'num_nodes',]:
if key in kwargs.keys():
kwargs.pop(key)
return self.decoder.generate(input_ids=input_ids,
generation_config=generation_config,
logits_processor=logits_processor,
stopping_criteria=stopping_criteria,
prefix_allowed_tokens_fn=prefix_allowed_tokens_fn,
synced_gpus=synced_gpus,
assistant_model=assistant_model,
streamer=streamer,
encoder_outputs={'hidden_states': encoder_state, 'attentions':0},
**kwargs
) |