|
import os
|
|
import random
|
|
import argparse
|
|
import pandas as pd
|
|
import multiprocessing as mp
|
|
from foldseek_util import get_struc_seq
|
|
|
|
|
|
def parse_args():
    """Parse the command-line options of this script.

    Recognised options:
        --file_path          (str, required) input table with UniProt IDs.
        --sheet_name         (str, default "Sheet1") Excel sheet to read.
        --pdb_dir            (str) directory holding the PDB files.
        --uniprotid_column   (str, optional) column with one ID per row.
        --uniprotids_column  (str, optional) column with ';'-separated IDs.
        --num_processes      (int, default 2) worker process count.

    Returns:
        argparse.Namespace: the parsed options, read from ``sys.argv``.
    """
    # Each entry is (flag, keyword arguments for ``add_argument``); the order
    # here is the order in which options appear in ``--help``.
    option_table = (
        (
            "--file_path",
            {
                "type": str,
                "required": True,
                "help": "Path to the file containing uniprotid information.",
            },
        ),
        (
            "--sheet_name",
            {
                "type": str,
                "default": "Sheet1",
                "help": "Name of the sheet to read (for Excel files). Default is 'Sheet1'.",
            },
        ),
        (
            "--pdb_dir",
            {
                "type": str,
                "default": "pdb_files/UP000000589_10090_MOUSE_v4",
                "help": "Directory containing PDB files.",
            },
        ),
        (
            "--uniprotid_column",
            {
                "type": str,
                "help": "Name of the column containing UniprotID information.",
            },
        ),
        (
            "--uniprotids_column",
            {
                "type": str,
                "help": "Name of the column containing multiple UniprotIDs (separated by semicolons). The first ID will be used.",
            },
        ),
        (
            "--num_processes",
            {
                "type": int,
                "default": 2,
                "help": "Number of processes to use for multiprocessing. Default is 2.",
            },
        ),
    )

    cli = argparse.ArgumentParser()
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
|
|
|
|
|
def validate_columns(cfg, df):
    """Normalise the UniProt ID column and drop rows that have no ID.

    When ``cfg.uniprotids_column`` is set, a new ``"uniprotid"`` column is
    derived from it: the first ``;``-separated entry is taken, anything from
    the first ``-`` onwards is cut off, and surrounding whitespace is
    stripped. ``cfg.uniprotid_column`` is then overwritten with
    ``"uniprotid"`` so that callers read the derived column.

    Args:
        cfg: Options object exposing ``uniprotid_column`` and
            ``uniprotids_column``. It is mutated as described above.
        df: Input ``pandas.DataFrame``. It is not modified in place; a new
            frame is returned.

    Returns:
        pandas.DataFrame: rows with a usable ID in ``cfg.uniprotid_column``,
        re-indexed from 0.

    Raises:
        ValueError: if neither column option is provided (``None`` or empty).
    """
    # Treat an empty option string like a missing one: otherwise it slips past
    # the check and fails later with an opaque KeyError from pandas.
    if not cfg.uniprotid_column and not cfg.uniprotids_column:
        raise ValueError("Either --uniprotid_column or --uniprotids_column must be provided.")

    if cfg.uniprotids_column:
        df = df.dropna(subset=[cfg.uniprotids_column]).reset_index(drop=True)

        def first_accession(cell):
            # str() keeps non-string cells from crashing on .split().
            accession = str(cell).split(";")[0].split("-")[0].strip()
            # An empty ID (e.g. the cell ";X") is returned as None so that the
            # final dropna removes the row; an empty string would otherwise
            # be a substring of every file name.
            return accession or None

        df["uniprotid"] = df[cfg.uniprotids_column].apply(first_accession)
        cfg.uniprotid_column = "uniprotid"

    return df.dropna(subset=[cfg.uniprotid_column]).reset_index(drop=True)
|
|
|
|
|
|
def find_pdb_files(pdb_dir, uniprot_ids):
    """Find, for each UniProt ID, a matching file name in ``pdb_dir``.

    Matching is done in two steps, always preferring the alphabetically
    first candidate:

    1. The ID equals one of the file name's tokens, where tokens are the
       pieces between ``-``, ``_`` and ``.`` (so ``P12345`` matches
       ``AF-P12345-F1-model_v4.pdb`` but not a file for ``A0P12345``).
    2. Otherwise, fall back to plain substring matching, which keeps
       earlier behaviour for file names that do not use those delimiters.

    Args:
        pdb_dir: Directory whose entries are searched (not recursive).
        uniprot_ids: Iterable of IDs to look up.

    Returns:
        list: one entry per input ID, in order. Each entry is a file name
        relative to ``pdb_dir``, or ``None`` when nothing matched or the ID
        is empty / not a string.
    """
    # Sort once up front instead of once per ID.
    pdb_files = sorted(os.listdir(pdb_dir))

    # token -> first (alphabetical) file name containing that exact token.
    first_file_by_token = {}
    for file_name in pdb_files:
        for token in file_name.replace("_", "-").replace(".", "-").split("-"):
            if token:
                first_file_by_token.setdefault(token, file_name)

    pdb_paths = []
    for uniprot_id in uniprot_ids:
        # An empty string is a substring of every name, and non-strings
        # (e.g. NaN) cannot be matched at all: report both as "no file".
        if not isinstance(uniprot_id, str) or not uniprot_id:
            pdb_paths.append(None)
            continue

        match = first_file_by_token.get(uniprot_id)
        if match is None:
            match = next((name for name in pdb_files if uniprot_id in name), None)
        pdb_paths.append(match)

    return pdb_paths
|
|
|
|
|
|
def get_foldseek_seq(pdb_path, cfg):
    """Run ``get_struc_seq`` on one PDB file and return its chain "A" entry.

    Args:
        pdb_path: File name relative to ``cfg.pdb_dir``.
        cfg: Options object; only ``cfg.pdb_dir`` is read.

    Returns:
        Whatever ``get_struc_seq`` stores under key ``"A"``. The script's
        entry point unpacks it as three values (aa, foldseek, aa_foldseek).
    """
    structure_file = os.path.join(cfg.pdb_dir, pdb_path)
    # A random tag is passed as process_id for each call; presumably it keeps
    # concurrent workers' scratch output apart (verify in foldseek_util).
    job_tag = random.randint(0, 10000000)
    per_chain = get_struc_seq(
        "bin/foldseek",
        structure_file,
        ["A"],
        process_id=job_tag,
    )
    return per_chain["A"]
|
|
|
|
|
|
def main():
    """Annotate a table of UniProt IDs with Foldseek sequences.

    Steps:
        1. Read the input table (Excel for ``.xls``/``.xlsx``, CSV otherwise).
        2. Normalise the ID column and drop rows without an ID.
        3. Attach the matching PDB file name; drop rows without one and
           duplicate IDs (first occurrence wins).
        4. Run ``get_foldseek_seq`` for every remaining row in a process pool.
        5. Write the table, extended with ``aa``, ``foldseek`` and
           ``aa_foldseek`` columns, to ``<input path without extension>_foldseek.csv``.
    """
    config = parse_args()

    # Compare the extension case-insensitively so "DATA.XLSX" is read as Excel.
    if config.file_path.lower().endswith((".xls", ".xlsx")):
        df = pd.read_excel(config.file_path, sheet_name=config.sheet_name)
    else:
        df = pd.read_csv(config.file_path)

    # validate_columns already drops rows with a missing ID and may point
    # config.uniprotid_column at a derived column, so no second dropna here.
    df = validate_columns(config, df)

    uniprot_ids = df[config.uniprotid_column].tolist()
    df["pdb_path"] = find_pdb_files(config.pdb_dir, uniprot_ids)
    df = df.dropna(subset=["pdb_path"]).reset_index(drop=True)
    df = df.drop_duplicates(subset=[config.uniprotid_column]).reset_index(drop=True)

    # multiprocessing pickles the callable it sends to workers, and lambdas
    # cannot be pickled. Pass the module-level function with explicit
    # (pdb_path, config) argument tuples through starmap instead.
    jobs = [(pdb_path, config) for pdb_path in df["pdb_path"].tolist()]
    if jobs:
        with mp.Pool(config.num_processes) as pool:
            output = pool.starmap(get_foldseek_seq, jobs)
        aa, foldseek, aa_foldseek = (list(column) for column in zip(*output))
    else:
        # Nothing matched: zip(*[]) cannot be unpacked into three names, so
        # write an empty table with the expected columns instead.
        aa, foldseek, aa_foldseek = [], [], []

    df["aa"] = aa
    df["foldseek"] = foldseek
    df["aa_foldseek"] = aa_foldseek

    # splitext removes only the final extension. Splitting on the first "."
    # turned "./data/x.csv" into "_foldseek.csv".
    output_stem = os.path.splitext(config.file_path)[0]
    df.to_csv(f"{output_stem}_foldseek.csv", index=False)


if __name__ == "__main__":
    main()
|
|
|