matanninio's picture
both first demos now work
72dbfd7
raw
history blame
14.7 kB
import gradio as gr
import torch
from fuse.data.tokenizers.modular_tokenizer.op import ModularTokenizerOp
from mammal.examples.dti_bindingdb_kd.task import DtiBindingdbKdTask
from mammal.keys import *
from mammal.model import Mammal
from abc import ABC, abstractmethod
class MammalObjectBroker():
def __init__(self, model_path: str, name:str= None, task_list: list[str]=None) -> None:
self.model_path = model_path
if name is None:
name = model_path
self.name = name
if task_list is not None:
self.tasks=task_list
else:
self.task = []
self._model = None
self._tokenizer_op = None
@property
def model(self)-> Mammal:
if self._model is None:
self._model = Mammal.from_pretrained(self.model_path)
self._model.eval()
return self._model
@property
def tokenizer_op(self):
if self._tokenizer_op is None:
self._tokenizer_op = ModularTokenizerOp.from_pretrained(self.model_path)
return self._tokenizer_op
class MammalTask(ABC):
def __init__(self, name:str) -> None:
self.name = name
self.description = None
self._demo = None
# @abstractmethod
# def _generate_prompt(self, **kwargs) -> str:
# """Formatting prompt to match pre-training syntax
# Args:
# prot1 (_type_): _description_
# prot2 (_type_): _description_
# Raises:
# No: _description_
# """
# raise NotImplementedError()
@abstractmethod
def crate_sample_dict(self,sample_inputs: dict, model_holder:MammalObjectBroker) -> dict:
"""Formatting prompt to match pre-training syntax
Args:
prompt (str): _description_
Returns:
dict: sample_dict for feeding into model
"""
raise NotImplementedError()
# @abstractmethod
def run_model(self, sample_dict, model:Mammal):
raise NotImplementedError()
def create_demo(self, model_name_widget: gr.component) -> gr.Group:
"""create an gradio demo group
Args:
model_name_widgit (gr.Component): widget holding the model name to use. This is needed to create
gradio actions with the current model name as an input
Raises:
NotImplementedError: _description_
"""
raise NotImplementedError()
def demo(self,model_name_widgit:gr.component=None):
if self._demo is None:
model_name_widget:gr.component
self._demo = self.create_demo(model_name_widget=model_name_widgit)
return self._demo
@abstractmethod
def decode_output(self,batch_dict, model:Mammal):
raise NotImplementedError()
#self._setup()
# def _setup(self):
# pass
all_tasks = dict()
all_models= dict()
class PpiTask(MammalTask):
def __init__(self):
super().__init__(name="Protein-Protein Interaction")
self.description = "Protein-Protein Interaction (PPI)"
self.examples = {
"protein_calmodulin": "MADQLTEEQIAEFKEAFSLFDKDGDGTITTKELGTVMRSLGQNPTEAELQDMISELDQDGFIDKEDLHDGDGKISFEEFLNLVNKEMTADVDGDGQVNYEEFVTMMTSK",
"protein_calcineurin": "MSSKLLLAGLDIERVLAEKNFYKEWDTWIIEAMNVGDEEVDRIKEFKEDEIFEEAKTLGTAEMQEYKKQKLEEAIEGAFDIFDKDGNGYISAAELRHVMTNLGEKLTDEEVDEMIRQMWDQNGDWDRIKELKFGEIKKLSAKDTRGTIFIKVFENLGTGVDSEYEDVSKYMLKHQ",
}
self.markup_text = """
# Mammal based {self.description} demonstration
Given two protein sequences, estimate if the proteins interact or not."""
@staticmethod
def positive_token_id(model_holder: MammalObjectBroker):
"""token for positive binding
Args:
model (MammalTrainedModel): model holding tokenizer
Returns:
int: id of positive binding token
"""
return model_holder.tokenizer_op.get_token_id("<1>")
def generate_prompt(self, prot1, prot2):
"""Formatting prompt to match pre-training syntax
Args:
prot1 (str): sequance of protein number 1
prot2 (str): sequance of protein number 2
Returns:
str: prompt
"""
prompt = f"<@TOKENIZER-TYPE=AA><BINDING_AFFINITY_CLASS><SENTINEL_ID_0>"\
"<MOLECULAR_ENTITY><MOLECULAR_ENTITY_GENERAL_PROTEIN>"\
"<SEQUENCE_NATURAL_START>{prot1}<SEQUENCE_NATURAL_END>"\
"<MOLECULAR_ENTITY><MOLECULAR_ENTITY_GENERAL_PROTEIN>"\
"<SEQUENCE_NATURAL_START>{prot2}<SEQUENCE_NATURAL_END><EOS>"
return prompt
def crate_sample_dict(self,sample_inputs: dict, model_holder:MammalObjectBroker):
# Create and load sample
sample_dict = dict()
prompt = self.generate_prompt(*sample_inputs)
sample_dict[ENCODER_INPUTS_STR] = prompt
# Tokenize
sample_dict = model_holder.tokenizer_op(
sample_dict=sample_dict,
key_in=ENCODER_INPUTS_STR,
key_out_tokens_ids=ENCODER_INPUTS_TOKENS,
key_out_attention_mask=ENCODER_INPUTS_ATTENTION_MASK,
)
sample_dict[ENCODER_INPUTS_TOKENS] = torch.tensor(
sample_dict[ENCODER_INPUTS_TOKENS]
)
sample_dict[ENCODER_INPUTS_ATTENTION_MASK] = torch.tensor(
sample_dict[ENCODER_INPUTS_ATTENTION_MASK]
)
return sample_dict
def run_model(self, sample_dict, model: Mammal):
# Generate Prediction
batch_dict = model.generate(
[sample_dict],
output_scores=True,
return_dict_in_generate=True,
max_new_tokens=5,
)
return batch_dict
def decode_output(self,batch_dict, model_holder:MammalObjectBroker):
# Get output
generated_output = model_holder.tokenizer_op._tokenizer.decode(batch_dict[CLS_PRED][0])
score = batch_dict["model.out.scores"][0][1][self.positive_token_id(model_holder)].item()
return generated_output, score
def create_and_run_prompt(self,model_name,protein1, protein2):
model_holder = all_models[model_name]
sample_inputs = {"prot1":protein1,
"prot2":protein2
}
sample_dict = self.crate_sample_dict(sample_inputs=sample_inputs, model_holder=model_holder)
prompt = sample_dict[ENCODER_INPUTS_STR]
batch_dict = self.run_model(sample_dict=sample_dict, model=model_holder.model)
res = prompt, *self.decode_output(batch_dict,model_holder=model_holder)
return res
def create_demo(self,model_name_widget:gr.component):
# """
# ### Using the model from
# ```{model} ```
# """
with gr.Group() as demo:
gr.Markdown(self.markup_text)
with gr.Row():
prot1 = gr.Textbox(
label="Protein 1 sequence",
# info="standard",
interactive=True,
lines=3,
value=self.examples["protein_calmodulin"],
)
prot2 = gr.Textbox(
label="Protein 2 sequence",
# info="standard",
interactive=True,
lines=3,
value=self.examples["protein_calcineurin"],
)
with gr.Row():
run_mammal: gr.Button = gr.Button(
"Run Mammal prompt for Protein-Protein Interaction", variant="primary"
)
with gr.Row():
prompt_box = gr.Textbox(label="Mammal prompt", lines=5)
with gr.Row():
decoded = gr.Textbox(label="Mammal output")
run_mammal.click(
fn=self.create_and_run_prompt,
inputs=[model_name_widget, prot1, prot2],
outputs=[prompt_box, decoded, gr.Number(label="PPI score")],
)
with gr.Row():
gr.Markdown(
"```<SENTINEL_ID_0>``` contains the binding affinity class, which is ```<1>``` for interacting and ```<0>``` for non-interacting"
)
demo.visible = False
return demo
ppi_task = PpiTask()
all_tasks[ppi_task.name]=ppi_task
class DtiTask(MammalTask):
def __init__(self):
super().__init__(name="Drug-Target Binding Affinity")
self.description = "Drug-Target Binding Affinity (tdi)"
self.examples = {
"target_seq": "NLMKRCTRGFRKLGKCTTLEEEKCKTLYPRGQCTCSDSKMNTHSCDCKSC",
"drug_seq":"CC(=O)NCCC1=CNc2c1cc(OC)cc2"
}
self.markup_text = """
# Mammal based Target-Drug binding affinity demonstration
Given a protein sequence and a drug (in SMILES), estimate the binding affinity.
"""
def crate_sample_dict(self, sample_inputs:dict, model_holder:MammalObjectBroker):
"""convert sample_inputs to sample_dict including creating a proper prompt
Args:
sample_inputs (dict): dictionary containing the inputs to the model
model_holder (MammalObjectBroker): model holder
Returns:
dict: sample_dict for feeding into model
"""
sample_dict = dict(sample_inputs)
sample_dict = DtiBindingdbKdTask.data_preprocessing(
sample_dict=sample_dict,
tokenizer_op=model_holder.tokenizer_op,
target_sequence_key="target_seq",
drug_sequence_key="drug_seq",
norm_y_mean=None,
norm_y_std=None,
device=model_holder.model.device,
)
return sample_dict
def run_model(self, sample_dict, model: Mammal):
# Generate Prediction
batch_dict = model.forward_encoder_only([sample_dict])
return batch_dict
def decode_output(self,batch_dict, model_holder):
# Get output
batch_dict = DtiBindingdbKdTask.process_model_output(
batch_dict,
scalars_preds_processed_key="model.out.dti_bindingdb_kd",
norm_y_mean=5.79384684128215,
norm_y_std=1.33808027428196,
)
ans = (
"model.out.dti_bindingdb_kd",
float(batch_dict["model.out.dti_bindingdb_kd"][0]),
)
return ans
def create_and_run_prompt(self,model_name,target_seq, drug_seq):
model_holder = all_models[model_name]
inputs = {
"target_seq": target_seq,
"drug_seq": drug_seq,
}
sample_dict = self.crate_sample_dict(sample_inputs=inputs, model_holder=model_holder)
prompt=sample_dict[ENCODER_INPUTS_STR]
batch_dict = self.run_model(sample_dict=sample_dict, model=model_holder.model)
res = prompt, *self.decode_output(batch_dict,model_holder=model_holder)
return res
def create_demo(self,model_name_widget):
# """
# ### Using the model from
# ```{model} ```
# """
with gr.Group() as demo:
gr.Markdown(self.markup_text)
with gr.Row():
target_textbox = gr.Textbox(
label="target sequence",
# info="standard",
interactive=True,
lines=3,
value=self.examples["target_seq"],
)
drug_textbox = gr.Textbox(
label="Drug sequance (in SMILES)",
# info="standard",
interactive=True,
lines=3,
value=self.examples["drug_seq"],
)
with gr.Row():
run_mammal = gr.Button(
"Run Mammal prompt for Protein-Protein Interaction", variant="primary"
)
with gr.Row():
prompt_box = gr.Textbox(label="Mammal prompt", lines=5)
with gr.Row():
decoded = gr.Textbox(label="Mammal output key")
run_mammal.click(
fn=self.create_and_run_prompt,
inputs=[model_name_widget, target_textbox, drug_textbox],
outputs=[prompt_box, decoded, gr.Number(label="binding affinity")],
)
demo.visible = False
return demo
tdi_task = DtiTask()
all_tasks[tdi_task.name]=tdi_task
ppi_model = MammalObjectBroker(model_path="ibm/biomed.omics.bl.sm.ma-ted-458m", task_list=[ppi_task.name])
all_models[ppi_model.name]=ppi_model
tdi_model = MammalObjectBroker(model_path="ibm/biomed.omics.bl.sm.ma-ted-458m.dti_bindingdb_pkd", task_list=[tdi_task.name])
all_models[tdi_model.name]=tdi_model
def create_application():
def task_change(value):
visibility = [gr.update(visible=(task==value)) for task in all_tasks.keys()]
# all_tasks[task].demo().visible =
choices=[model_name for model_name, model in all_models.items() if value in model.tasks]
if choices:
return (gr.update(choices=choices, value=choices[0]),*visibility)
else:
return (gr.skip,*visibility)
# return model_name_dropdown
with gr.Blocks() as application:
task_dropdown = gr.Dropdown(choices=["select demo"] + list(all_tasks.keys()))
task_dropdown.interactive = True
model_name_dropdown = gr.Dropdown(choices=[model_name for model_name, model in all_models.items() if task_dropdown.value in model.tasks], interactive=True)
ppi_demo = all_tasks[ppi_task.name].demo(model_name_widgit = model_name_dropdown)
# ppi_demo.visible = True
dtb_demo = all_tasks[tdi_task.name].demo(model_name_widgit = model_name_dropdown)
task_dropdown.change(task_change,inputs=[task_dropdown],outputs=[model_name_dropdown]+[all_tasks[task].demo() for task in all_tasks])
# def set_demo_vis(main_text):
# main_text=main_text
# print(f"main text is {main_text}")
# return gr.Group(visible=True)
# #return gr.Group(visible=(main_text == "PPI"))
# # , gr.Group( visible=(main_text == "DTI") )
# task_dropdown.change(
# set_ppi_vis, inputs=task_dropdown, outputs=[ppi_demo]
# )
return application
full_demo=None
def main():
global full_demo
full_demo = create_application()
full_demo.launch(show_error=True, share=False)
if __name__ == "__main__":
main()