# Hugging Face Spaces page residue (Space status when captured: Sleeping).
import gradio as gr | |
import torch | |
from fuse.data.tokenizers.modular_tokenizer.op import ModularTokenizerOp | |
from mammal.examples.dti_bindingdb_kd.task import DtiBindingdbKdTask | |
from mammal.keys import * | |
from mammal.model import Mammal | |
from abc import ABC, abstractmethod | |
class MammalObjectBroker:
    """Lazy holder for a Mammal model and its tokenizer op.

    Constructing a broker loads nothing. The model and the tokenizer are
    fetched with ``from_pretrained(model_path)`` on first access and cached
    on the instance afterwards.
    """

    def __init__(self, model_path: str, name: str = None, task_list: list[str] = None) -> None:
        """Record what to load and which tasks the model serves.

        Args:
            model_path: identifier handed to ``from_pretrained`` for both the
                model and the tokenizer op.
            name: name of this broker; falls back to ``model_path``.
            task_list: names of the tasks this model supports; falls back to
                an empty list.
        """
        self.model_path = model_path
        self.name = model_path if name is None else name
        # ``tasks`` must always exist: the previous fallback assigned
        # ``self.task`` (typo), leaving ``tasks`` undefined for brokers built
        # without a task list. The list is copied so the caller's list is not
        # shared with the broker.
        self.tasks = [] if task_list is None else list(task_list)
        self._model = None
        self._tokenizer_op = None

    # ``model`` and ``tokenizer_op`` are read as attributes by the tasks
    # (e.g. ``model_holder.model.device``), so they are exposed as properties.
    @property
    def model(self) -> "Mammal":
        """Return the model, loading it and switching it to eval mode on first use."""
        if self._model is None:
            self._model = Mammal.from_pretrained(self.model_path)
            self._model.eval()
        return self._model

    @property
    def tokenizer_op(self) -> "ModularTokenizerOp":
        """Return the tokenizer op, loading it on first use."""
        if self._tokenizer_op is None:
            self._tokenizer_op = ModularTokenizerOp.from_pretrained(self.model_path)
        return self._tokenizer_op
class MammalTask(ABC):
    """Base class for a task that can be shown as a Gradio demo.

    Every hook below raises ``NotImplementedError``; none is marked with
    ``@abstractmethod``, so subclasses can be instantiated even when they
    override only some of the hooks.
    """

    def __init__(self, name: str) -> None:
        """Store the task name; description and demo start out unset."""
        self._demo = None
        self.description = None
        self.name = name

    def crate_sample_dict(self, sample_inputs: dict, model_holder: MammalObjectBroker) -> dict:
        """Turn raw inputs into a sample_dict for feeding into the model.

        Args:
            sample_inputs (dict): task-specific inputs.
            model_holder (MammalObjectBroker): holder of the model/tokenizer to use.

        Returns:
            dict: sample_dict for feeding into model

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def run_model(self, sample_dict, model: Mammal):
        """Run ``model`` on ``sample_dict``; must be overridden by subclasses."""
        raise NotImplementedError()

    def create_demo(self, model_name_widget: gr.component) -> gr.Group:
        """Create the Gradio demo group for this task.

        Args:
            model_name_widget (gr.Component): widget holding the model name to
                use. It is needed to create Gradio actions that take the
                current model name as an input.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def demo(self, model_name_widgit: gr.component = None):
        """Return this task's demo group, building it on the first call.

        The group is cached, so ``model_name_widgit`` is only used by the call
        that actually builds the demo; later calls ignore it.
        """
        if self._demo is not None:
            return self._demo
        self._demo = self.create_demo(model_name_widget=model_name_widgit)
        return self._demo

    def decode_output(self, batch_dict, model: Mammal):
        """Decode the model output; must be overridden by subclasses."""
        raise NotImplementedError()
# Module-level registries; both start empty and are filled by plain item
# assignment further down the file.
all_tasks = {}
all_models = {}
class PpiTask(MammalTask):
    """Protein-protein interaction (PPI) demo task.

    Builds a two-protein prompt, runs ``model.generate`` on it and reports the
    decoded output together with the score of the ``<1>`` token.
    """

    def __init__(self):
        super().__init__(name="Protein-Protein Interaction")
        self.description = "Protein-Protein Interaction (PPI)"
        # Default sequences pre-filled in the two demo text boxes.
        self.examples = {
            "protein_calmodulin": "MADQLTEEQIAEFKEAFSLFDKDGDGTITTKELGTVMRSLGQNPTEAELQDMISELDQDGFIDKEDLHDGDGKISFEEFLNLVNKEMTADVDGDGQVNYEEFVTMMTSK",
            "protein_calcineurin": "MSSKLLLAGLDIERVLAEKNFYKEWDTWIIEAMNVGDEEVDRIKEFKEDEIFEEAKTLGTAEMQEYKKQKLEEAIEGAFDIFDKDGNGYISAAELRHVMTNLGEKLTDEEVDEMIRQMWDQNGDWDRIKELKFGEIKKLSAKDTRGTIFIKVFENLGTGVDSEYEDVSKYMLKHQ",
        }
        # Must be an f-string: the heading embeds the task description. As a
        # plain string the literal text "{self.description}" was displayed.
        self.markup_text = (
            f"\n# Mammal based {self.description} demonstration\n"
            "Given two protein sequences, estimate if the proteins interact or not."
        )

    @staticmethod
    def positive_token_id(model_holder: MammalObjectBroker):
        """Return the id of the token for positive binding.

        Declared static because it takes no ``self``; this keeps both
        ``self.positive_token_id(holder)`` and ``PpiTask.positive_token_id(holder)``
        working.

        Args:
            model_holder (MammalObjectBroker): holder of the tokenizer to query.

        Returns:
            int: id of positive binding token
        """
        return model_holder.tokenizer_op.get_token_id("<1>")

    def generate_prompt(self, prot1, prot2):
        """Format the prompt to match the pre-training syntax.

        Args:
            prot1 (str): sequence of protein number 1
            prot2 (str): sequence of protein number 2

        Returns:
            str: prompt
        """
        # Every fragment holding a placeholder needs its own ``f`` prefix;
        # adjacent literals are concatenated, but the prefix is not shared.
        return (
            "<@TOKENIZER-TYPE=AA><BINDING_AFFINITY_CLASS><SENTINEL_ID_0>"
            "<MOLECULAR_ENTITY><MOLECULAR_ENTITY_GENERAL_PROTEIN>"
            f"<SEQUENCE_NATURAL_START>{prot1}<SEQUENCE_NATURAL_END>"
            "<MOLECULAR_ENTITY><MOLECULAR_ENTITY_GENERAL_PROTEIN>"
            f"<SEQUENCE_NATURAL_START>{prot2}<SEQUENCE_NATURAL_END><EOS>"
        )

    def crate_sample_dict(self, sample_inputs: dict, model_holder: MammalObjectBroker):
        """Build and tokenize the sample_dict for one pair of proteins.

        Args:
            sample_inputs (dict): mapping with the keys ``prot1`` and ``prot2``.
            model_holder (MammalObjectBroker): holder of the tokenizer to use.

        Returns:
            dict: sample_dict holding the prompt string, token ids and
            attention mask (the last two as tensors).
        """
        sample_dict = dict()
        # ``**`` passes the sequences by keyword; ``*`` would pass the dict's
        # keys ("prot1", "prot2") instead of the sequences themselves.
        sample_dict[ENCODER_INPUTS_STR] = self.generate_prompt(**sample_inputs)

        # Tokenize
        sample_dict = model_holder.tokenizer_op(
            sample_dict=sample_dict,
            key_in=ENCODER_INPUTS_STR,
            key_out_tokens_ids=ENCODER_INPUTS_TOKENS,
            key_out_attention_mask=ENCODER_INPUTS_ATTENTION_MASK,
        )
        sample_dict[ENCODER_INPUTS_TOKENS] = torch.tensor(
            sample_dict[ENCODER_INPUTS_TOKENS]
        )
        sample_dict[ENCODER_INPUTS_ATTENTION_MASK] = torch.tensor(
            sample_dict[ENCODER_INPUTS_ATTENTION_MASK]
        )
        return sample_dict

    def run_model(self, sample_dict, model: Mammal):
        """Generate a prediction for a single sample and return the batch dict."""
        batch_dict = model.generate(
            [sample_dict],
            output_scores=True,
            return_dict_in_generate=True,
            max_new_tokens=5,
        )
        return batch_dict

    def decode_output(self, batch_dict, model_holder: MammalObjectBroker):
        """Extract the decoded text and the positive-class score.

        Returns:
            tuple: ``(generated_output, score)``.
        """
        generated_output = model_holder.tokenizer_op._tokenizer.decode(batch_dict[CLS_PRED][0])
        # NOTE(review): index [0][1] presumably selects the first sample and the
        # generated position that holds the class token - confirm.
        score = batch_dict["model.out.scores"][0][1][self.positive_token_id(model_holder)].item()
        return generated_output, score

    def create_and_run_prompt(self, model_name, protein1, protein2):
        """Gradio callback: run the named model on two protein sequences.

        Returns:
            tuple: ``(prompt, generated_output, score)``.
        """
        model_holder = all_models[model_name]
        sample_inputs = {
            "prot1": protein1,
            "prot2": protein2,
        }
        sample_dict = self.crate_sample_dict(sample_inputs=sample_inputs, model_holder=model_holder)
        prompt = sample_dict[ENCODER_INPUTS_STR]
        batch_dict = self.run_model(sample_dict=sample_dict, model=model_holder.model)
        res = prompt, *self.decode_output(batch_dict, model_holder=model_holder)
        return res

    def create_demo(self, model_name_widget: gr.component):
        """Create the (initially hidden) Gradio group for the PPI demo.

        Args:
            model_name_widget: widget holding the name of the model to use; it
                is wired in as the first input of the run button.

        Returns:
            gr.Group: the demo group.
        """
        with gr.Group() as demo:
            gr.Markdown(self.markup_text)
            with gr.Row():
                prot1 = gr.Textbox(
                    label="Protein 1 sequence",
                    # info="standard",
                    interactive=True,
                    lines=3,
                    value=self.examples["protein_calmodulin"],
                )
                prot2 = gr.Textbox(
                    label="Protein 2 sequence",
                    # info="standard",
                    interactive=True,
                    lines=3,
                    value=self.examples["protein_calcineurin"],
                )
            with gr.Row():
                run_mammal: gr.Button = gr.Button(
                    "Run Mammal prompt for Protein-Protein Interaction", variant="primary"
                )
            with gr.Row():
                prompt_box = gr.Textbox(label="Mammal prompt", lines=5)
            with gr.Row():
                decoded = gr.Textbox(label="Mammal output")
            # Created here, directly inside the group, exactly where the
            # inline ``gr.Number(...)`` in the outputs list used to be built.
            score_box = gr.Number(label="PPI score")
            run_mammal.click(
                fn=self.create_and_run_prompt,
                inputs=[model_name_widget, prot1, prot2],
                outputs=[prompt_box, decoded, score_box],
            )
            with gr.Row():
                gr.Markdown(
                    "```<SENTINEL_ID_0>``` contains the binding affinity class, which is ```<1>``` for interacting and ```<0>``` for non-interacting"
                )
        # Hidden until the task is picked in the application's task dropdown.
        demo.visible = False
        return demo
# Create the single PPI task instance and register it under its own name.
ppi_task = PpiTask()
all_tasks.update({ppi_task.name: ppi_task})
class DtiTask(MammalTask):
    """Drug-target binding affinity demo task.

    Delegates preprocessing and output processing to ``DtiBindingdbKdTask`` and
    runs the model through ``forward_encoder_only``.
    """

    def __init__(self):
        super().__init__(name="Drug-Target Binding Affinity")
        self.description = "Drug-Target Binding Affinity (tdi)"
        # Default inputs pre-filled in the two demo text boxes.
        self.examples = {
            "target_seq": "NLMKRCTRGFRKLGKCTTLEEEKCKTLYPRGQCTCSDSKMNTHSCDCKSC",
            "drug_seq": "CC(=O)NCCC1=CNc2c1cc(OC)cc2",
        }
        self.markup_text = (
            "\n# Mammal based Target-Drug binding affinity demonstration\n"
            "Given a protein sequence and a drug (in SMILES), estimate the binding affinity.\n"
        )

    def crate_sample_dict(self, sample_inputs: dict, model_holder: MammalObjectBroker):
        """Convert sample_inputs to a sample_dict, including creating a proper prompt.

        Args:
            sample_inputs (dict): dictionary containing the inputs to the model
                (keys ``target_seq`` and ``drug_seq``).
            model_holder (MammalObjectBroker): model holder

        Returns:
            dict: sample_dict for feeding into model
        """
        # Work on a copy so the caller's dict is not modified here.
        sample_dict = dict(sample_inputs)
        sample_dict = DtiBindingdbKdTask.data_preprocessing(
            sample_dict=sample_dict,
            tokenizer_op=model_holder.tokenizer_op,
            target_sequence_key="target_seq",
            drug_sequence_key="drug_seq",
            norm_y_mean=None,
            norm_y_std=None,
            device=model_holder.model.device,
        )
        return sample_dict

    def run_model(self, sample_dict, model: Mammal):
        """Run the encoder-only forward pass on a single sample."""
        batch_dict = model.forward_encoder_only([sample_dict])
        return batch_dict

    def decode_output(self, batch_dict, model_holder):
        """Post-process the model output into ``(output key, affinity value)``.

        ``model_holder`` is accepted for signature parity with the other tasks
        and is not used.
        """
        # NOTE(review): the mean/std are presumably the label normalization
        # statistics the model was trained with - confirm before changing.
        batch_dict = DtiBindingdbKdTask.process_model_output(
            batch_dict,
            scalars_preds_processed_key="model.out.dti_bindingdb_kd",
            norm_y_mean=5.79384684128215,
            norm_y_std=1.33808027428196,
        )
        ans = (
            "model.out.dti_bindingdb_kd",
            float(batch_dict["model.out.dti_bindingdb_kd"][0]),
        )
        return ans

    def create_and_run_prompt(self, model_name, target_seq, drug_seq):
        """Gradio callback: run the named model on a target/drug pair.

        Returns:
            tuple: ``(prompt, output key, binding affinity)``.
        """
        model_holder = all_models[model_name]
        inputs = {
            "target_seq": target_seq,
            "drug_seq": drug_seq,
        }
        sample_dict = self.crate_sample_dict(sample_inputs=inputs, model_holder=model_holder)
        prompt = sample_dict[ENCODER_INPUTS_STR]
        batch_dict = self.run_model(sample_dict=sample_dict, model=model_holder.model)
        res = prompt, *self.decode_output(batch_dict, model_holder=model_holder)
        return res

    def create_demo(self, model_name_widget):
        """Create the (initially hidden) Gradio group for the DTI demo.

        Args:
            model_name_widget: widget holding the name of the model to use; it
                is wired in as the first input of the run button.

        Returns:
            gr.Group: the demo group.
        """
        with gr.Group() as demo:
            gr.Markdown(self.markup_text)
            with gr.Row():
                target_textbox = gr.Textbox(
                    label="target sequence",
                    # info="standard",
                    interactive=True,
                    lines=3,
                    value=self.examples["target_seq"],
                )
                drug_textbox = gr.Textbox(
                    label="Drug sequance (in SMILES)",
                    # info="standard",
                    interactive=True,
                    lines=3,
                    value=self.examples["drug_seq"],
                )
            with gr.Row():
                # The label used to read "...for Protein-Protein Interaction",
                # copied from the PPI demo; derive it from this task's name.
                run_mammal = gr.Button(
                    f"Run Mammal prompt for {self.name}", variant="primary"
                )
            with gr.Row():
                prompt_box = gr.Textbox(label="Mammal prompt", lines=5)
            with gr.Row():
                decoded = gr.Textbox(label="Mammal output key")
            # Created here, directly inside the group, exactly where the
            # inline ``gr.Number(...)`` in the outputs list used to be built.
            affinity_box = gr.Number(label="binding affinity")
            run_mammal.click(
                fn=self.create_and_run_prompt,
                inputs=[model_name_widget, target_textbox, drug_textbox],
                outputs=[prompt_box, decoded, affinity_box],
            )
        # Hidden until the task is picked in the application's task dropdown.
        demo.visible = False
        return demo
# Create the single drug-target task instance and register it under its name.
tdi_task = DtiTask()
all_tasks.update({tdi_task.name: tdi_task})

# One broker per pretrained checkpoint, each tagged with the task it serves.
# No explicit name is given, so each broker is registered under its model path.
ppi_model = MammalObjectBroker(
    model_path="ibm/biomed.omics.bl.sm.ma-ted-458m",
    task_list=[ppi_task.name],
)
all_models.update({ppi_model.name: ppi_model})

tdi_model = MammalObjectBroker(
    model_path="ibm/biomed.omics.bl.sm.ma-ted-458m.dti_bindingdb_pkd",
    task_list=[tdi_task.name],
)
all_models.update({tdi_model.name: tdi_model})
def create_application():
    """Build the Gradio Blocks application hosting every registered task demo.

    The page has a task dropdown, a model dropdown and one demo group per
    entry of ``all_tasks``. Changing the task shows only the matching group and
    offers the models whose ``tasks`` list contains the selected task.

    Returns:
        gr.Blocks: the assembled (not yet launched) application.
    """

    def task_change(value):
        """Return updates for the model dropdown followed by every demo group."""
        # One visibility update per task, in ``all_tasks`` order - the same
        # order used for the ``outputs`` list below.
        visibility = [gr.update(visible=(task == value)) for task in all_tasks]
        choices = [model_name for model_name, model in all_models.items() if value in model.tasks]
        if choices:
            return (gr.update(choices=choices, value=choices[0]), *visibility)
        # ``gr.skip`` must be called: returning the function object itself is
        # not a valid "leave this output unchanged" value.
        return (gr.skip(), *visibility)

    with gr.Blocks() as application:
        task_dropdown = gr.Dropdown(
            choices=["select demo"] + list(all_tasks.keys()),
            interactive=True,
        )
        model_name_dropdown = gr.Dropdown(
            choices=[
                model_name
                for model_name, model in all_models.items()
                if task_dropdown.value in model.tasks
            ],
            interactive=True,
        )

        # Build (or fetch the cached) group of every registered task inside
        # this Blocks context, so each one is part of the page.
        task_demos = [
            task.demo(model_name_widgit=model_name_dropdown)
            for task in all_tasks.values()
        ]

        task_dropdown.change(
            task_change,
            inputs=[task_dropdown],
            outputs=[model_name_dropdown, *task_demos],
        )
    return application
# Module-level handle to the application; stays None until main() has run.
full_demo = None


def main():
    """Build the application, store it in ``full_demo`` and launch it."""
    global full_demo
    app = create_application()
    full_demo = app
    app.launch(show_error=True, share=False)


if __name__ == "__main__":
    main()