Spaces:
Sleeping
Sleeping
File size: 7,675 Bytes
24c5c6a 301ba09 8aa6c67 24c5c6a 301ba09 24c5c6a 126dbe4 24c5c6a 301ba09 24c5c6a 8aa6c67 301ba09 24c5c6a 301ba09 24c5c6a 301ba09 24c5c6a 301ba09 24c5c6a 301ba09 24c5c6a 301ba09 24c5c6a 301ba09 8aa6c67 83d87f5 8aa6c67 301ba09 8aa6c67 301ba09 8aa6c67 301ba09 8aa6c67 301ba09 8aa6c67 301ba09 8aa6c67 301ba09 8aa6c67 301ba09 8aa6c67 301ba09 24c5c6a 301ba09 24c5c6a 301ba09 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
import torch
from torch.nn import Linear
from torch_geometric.nn import HGTConv, MLP
import pandas as pd
import yaml
import os
from datasets import load_dataset
import gdown
class ProtHGT(torch.nn.Module):
def __init__(self, data,hidden_channels, num_heads, num_layers, mlp_hidden_layers, mlp_dropout):
super().__init__()
self.lin_dict = torch.nn.ModuleDict()
for node_type in data.node_types:
input_dim = data[node_type].x.size(1) # Get actual input dimension from data
self.lin_dict[node_type] = Linear(input_dim, hidden_channels)
self.convs = torch.nn.ModuleList()
for _ in range(num_layers):
conv = HGTConv(hidden_channels, hidden_channels, data.metadata(), num_heads, group='sum')
self.convs.append(conv)
self.mlp = MLP(mlp_hidden_layers , dropout=mlp_dropout, norm=None)
def generate_embeddings(self, x_dict, edge_index_dict):
# Generate updated embeddings through the HGT layers
x_dict = {
node_type: self.lin_dict[node_type](x).relu_()
for node_type, x in x_dict.items()
}
for conv in self.convs:
x_dict = conv(x_dict, edge_index_dict)
return x_dict
def forward(self, x_dict, edge_index_dict, tr_edge_label_index, target_type, test=False):
# Get updated embeddings
x_dict = self.generate_embeddings(x_dict, edge_index_dict)
# Make predictions
row, col = tr_edge_label_index
z = torch.cat([x_dict["Protein"][row], x_dict[target_type][col]], dim=-1)
return self.mlp(z).view(-1), x_dict
def _load_data(heterodata, protein_ids, go_category=None):
"""Process the loaded heterodata for specific proteins and GO categories."""
# Get protein indices for all input proteins
protein_indices = [heterodata['Protein']['id_mapping'][pid] for pid in protein_ids]
# Create edge indices for prediction
categories = [go_category] if go_category else ['GO_term_F', 'GO_term_P', 'GO_term_C']
for category in categories:
# Create pairs for all proteins with all GO terms
n_terms = len(heterodata[category]['id_mapping'])
protein_indices_repeated = torch.tensor(protein_indices).repeat_interleave(n_terms)
term_indices = torch.arange(n_terms).repeat(len(protein_indices))
edge_index = torch.stack([protein_indices_repeated, term_indices])
heterodata.edge_index_dict[('Protein', 'protein_function', category)] = edge_index
return heterodata
def get_available_proteins(protein_list_file='data/available_proteins.txt'):
with open(protein_list_file, 'r') as file:
return [line.strip() for line in file.readlines()]
def _generate_predictions(heterodata, model, target_type):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.eval()
heterodata = heterodata.to(device)
with torch.no_grad():
edge_label_index = heterodata.edge_index_dict[('Protein', 'protein_function', target_type)]
predictions, _ = model(heterodata.x_dict, heterodata.edge_index_dict, edge_label_index, target_type)
predictions = torch.sigmoid(predictions)
return predictions.cpu()
def _create_prediction_df(predictions, heterodata, protein_ids, go_category):
go_category_dict = {
'GO_term_F': 'Molecular Function',
'GO_term_P': 'Biological Process',
'GO_term_C': 'Cellular Component'
}
# Create a list to store individual protein predictions
all_predictions = []
# Number of GO terms for this category
n_go_terms = len(heterodata[go_category]['id_mapping'])
# Process predictions for each protein
for i, protein_id in enumerate(protein_ids):
# Get the slice of predictions for this protein
protein_predictions = predictions[i * n_go_terms:(i + 1) * n_go_terms]
prediction_df = pd.DataFrame({
'Protein': protein_id,
'GO_category': go_category_dict[go_category],
'GO_term': list(heterodata[go_category]['id_mapping'].keys()),
'Probability': protein_predictions.numpy()
})
all_predictions.append(prediction_df)
# Combine all predictions
combined_df = pd.concat(all_predictions, ignore_index=True)
combined_df.sort_values(by=['Protein', 'Probability'], ascending=[True, False], inplace=True)
combined_df.reset_index(drop=True, inplace=True)
return combined_df
def generate_prediction_df(protein_ids, model_paths, model_config_paths, go_category):
all_predictions = []
# Convert single protein ID to list if necessary
if isinstance(protein_ids, str):
protein_ids = [protein_ids]
# Load dataset once
# heterodata = load_dataset('HUBioDataLab/ProtHGT-KG', data_files="prothgt-kg.json.gz")
print('Loading data...')
file_id = "18u1o2sm8YjMo9joFw4Ilwvg0-rUU0PXK"
output = "data/prothgt-kg.pt"
url = f"https://drive.google.com/uc?id={file_id}"
print(f"Downloading file from {url}...")
try:
gdown.download(url, output, quiet=False)
print(f"File downloaded to {output}")
except Exception as e:
print(f"Error downloading file: {e}")
raise
heterodata = torch.load(output)
print(heterodata.edge_types)
# Remove unnecessary edge types
edge_types_to_remove = [
('Protein', 'protein_function', 'GO_term_F'),
('Protein', 'protein_function', 'GO_term_P'),
('Protein', 'protein_function', 'GO_term_C'),
('GO_term_F', 'rev_protein_function', 'Protein'),
('GO_term_P', 'rev_protein_function', 'Protein'),
('GO_term_C', 'rev_protein_function', 'Protein')
]
for edge_type in edge_types_to_remove:
if edge_type in heterodata.edge_index_dict:
del heterodata.edge_index_dict[edge_type]
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
for go_cat, model_config_path, model_path in zip(go_category, model_config_paths, model_paths):
print(f'Generating predictions for {go_cat}...')
# Process data for current GO category
processed_data = _load_data(heterodata, protein_ids, go_cat)
# Load model config
with open(model_config_path, 'r') as file:
model_config = yaml.safe_load(file)
# Initialize model with configuration
model = ProtHGT(
processed_data,
hidden_channels=model_config['hidden_channels'][0],
num_heads=model_config['num_heads'],
num_layers=model_config['num_layers'],
mlp_hidden_layers=model_config['hidden_channels'][1],
mlp_dropout=model_config['mlp_dropout']
)
# Load model weights
model.load_state_dict(torch.load(model_path, map_location=device))
print(f'Loaded model weights from {model_path}')
# Generate predictions
predictions = _generate_predictions(processed_data, model, go_cat)
prediction_df = _create_prediction_df(predictions, processed_data, protein_ids, go_cat)
all_predictions.append(prediction_df)
# Clean up memory
del processed_data
del model
del predictions
torch.cuda.empty_cache() # Clear CUDA cache if using GPU
del heterodata
# Combine all predictions
final_df = pd.concat(all_predictions, ignore_index=True)
# Clean up
del all_predictions
torch.cuda.empty_cache()
return final_df
|