from keras import Model from keras.layers import Input from keras.layers import Multiply from keras.layers import Dense, Dropout, Activation, Flatten from keras.layers import Convolution1D, AveragePooling1D import pandas as pd import numpy as np import keras import requests from functools import reduce from operator import add from Bio.SeqRecord import SeqRecord from Bio.SeqFeature import SeqFeature, FeatureLocation from Bio.Seq import Seq from Bio import SeqIO ntmap = {'A': (1, 0, 0, 0), 'C': (0, 1, 0, 0), 'G': (0, 0, 1, 0), 'T': (0, 0, 0, 1) } def get_seqcode(seq): return np.array(reduce(add, map(lambda c: ntmap[c], seq.upper()))).reshape((1, len(seq), -1)) def Seq_DeepCpf1_model(input_shape): Seq_deepCpf1_Input_SEQ = Input(shape=input_shape) Seq_deepCpf1_C1 = Convolution1D(80, 5, activation='relu')(Seq_deepCpf1_Input_SEQ) Seq_deepCpf1_P1 = AveragePooling1D(2)(Seq_deepCpf1_C1) Seq_deepCpf1_F = Flatten()(Seq_deepCpf1_P1) Seq_deepCpf1_DO1 = Dropout(0.3)(Seq_deepCpf1_F) Seq_deepCpf1_D1 = Dense(80, activation='relu')(Seq_deepCpf1_DO1) Seq_deepCpf1_DO2 = Dropout(0.3)(Seq_deepCpf1_D1) Seq_deepCpf1_D2 = Dense(40, activation='relu')(Seq_deepCpf1_DO2) Seq_deepCpf1_DO3 = Dropout(0.3)(Seq_deepCpf1_D2) Seq_deepCpf1_D3 = Dense(40, activation='relu')(Seq_deepCpf1_DO3) Seq_deepCpf1_DO4 = Dropout(0.3)(Seq_deepCpf1_D3) Seq_deepCpf1_Output = Dense(1, activation='linear')(Seq_deepCpf1_DO4) Seq_deepCpf1 = Model(inputs=[Seq_deepCpf1_Input_SEQ], outputs=[Seq_deepCpf1_Output]) return Seq_deepCpf1 # seq-ca model (DeepCpf1) def DeepCpf1_model(input_shape): DeepCpf1_Input_SEQ = Input(shape=input_shape) DeepCpf1_C1 = Convolution1D(80, 5, activation='relu')(DeepCpf1_Input_SEQ) DeepCpf1_P1 = AveragePooling1D(2)(DeepCpf1_C1) DeepCpf1_F = Flatten()(DeepCpf1_P1) DeepCpf1_DO1 = Dropout(0.3)(DeepCpf1_F) DeepCpf1_D1 = Dense(80, activation='relu')(DeepCpf1_DO1) DeepCpf1_DO2 = Dropout(0.3)(DeepCpf1_D1) DeepCpf1_D2 = Dense(40, activation='relu')(DeepCpf1_DO2) DeepCpf1_DO3 = Dropout(0.3)(DeepCpf1_D2) DeepCpf1_D3_SEQ = Dense(40, activation='relu')(DeepCpf1_DO3) DeepCpf1_Input_CA = Input(shape=(1,)) DeepCpf1_D3_CA = Dense(40, activation='relu')(DeepCpf1_Input_CA) DeepCpf1_M = Multiply()([DeepCpf1_D3_SEQ, DeepCpf1_D3_CA]) DeepCpf1_DO4 = Dropout(0.3)(DeepCpf1_M) DeepCpf1_Output = Dense(1, activation='linear')(DeepCpf1_DO4) DeepCpf1 = Model(inputs=[DeepCpf1_Input_SEQ, DeepCpf1_Input_CA], outputs=[DeepCpf1_Output]) return DeepCpf1 def fetch_ensembl_transcripts(gene_symbol): url = f"https://rest.ensembl.org/lookup/symbol/homo_sapiens/{gene_symbol}?expand=1;content-type=application/json" response = requests.get(url) if response.status_code == 200: gene_data = response.json() if 'Transcript' in gene_data: return gene_data['Transcript'] else: print("No transcripts found for gene:", gene_symbol) return None else: print(f"Error fetching gene data from Ensembl: {response.text}") return None def fetch_ensembl_sequence(transcript_id): url = f"https://rest.ensembl.org/sequence/id/{transcript_id}?content-type=application/json" response = requests.get(url) if response.status_code == 200: sequence_data = response.json() if 'seq' in sequence_data: return sequence_data['seq'] else: print("No sequence found for transcript:", transcript_id) return None else: print(f"Error fetching sequence data from Ensembl: {response.text}") return None def find_crispr_targets(sequence, chr, start, end, strand, transcript_id, exon_id, pam="TTTN", target_length=34): targets = [] len_sequence = len(sequence) complement = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'} dnatorna = {'A': 'A', 'T': 'U', 'C': 'C', 'G': 'G'} for i in range(len_sequence - target_length + 1): target_seq = sequence[i:i + target_length] if target_seq[4:7] == 'TTT': if strand == -1: tar_start = end - i - target_length + 1 tar_end = end -i #seq_in_ref = ''.join([complement[base] for base in target_seq])[::-1] else: tar_start = start + i tar_end = start + i + target_length - 1 #seq_in_ref = target_seq gRNA = ''.join([dnatorna[base] for base in target_seq[8:28]]) targets.append([target_seq, gRNA, chr, str(tar_start), str(tar_end), str(strand), transcript_id, exon_id]) return targets def format_prediction_output(targets, model_path): # Loading weights for the model Seq_deepCpf1 = Seq_DeepCpf1_model(input_shape=(34, 4)) Seq_deepCpf1.load_weights(model_path) formatted_data = [] for target in targets: # Predict encoded_seq = get_seqcode(target[0]) prediction = float(list(Seq_deepCpf1.predict(encoded_seq)[0])[0]) if prediction > 100: prediction = 100 # Format output gRNA = target[1] chr = target[2] start = target[3] end = target[4] strand = target[5] transcript_id = target[6] exon_id = target[7] formatted_data.append([chr, start, end, strand, transcript_id, exon_id, target[0], gRNA, prediction]) return formatted_data def process_gene(gene_symbol, model_path): transcripts = fetch_ensembl_transcripts(gene_symbol) results = [] all_exons = [] all_gene_sequences = [] if transcripts: for i in range(len(transcripts)): Exons = transcripts[i]['Exon'] all_exons.append(Exons) transcript_id = transcripts[i]['id'] for j in range(len(Exons)): exon_id = Exons[j]['id'] gene_sequence = fetch_ensembl_sequence(exon_id) if gene_sequence: all_gene_sequences.append(gene_sequence) start = Exons[j]['start'] end = Exons[j]['end'] strand = Exons[j]['strand'] chr = Exons[j]['seq_region_name'] targets = find_crispr_targets(gene_sequence, chr, start, end, strand, transcript_id, exon_id) if targets: formatted_data = format_prediction_output(targets, model_path) results.append(formatted_data) # for data in formatted_data: # print(f"Chr: {data[0]}, Start: {data[1]}, End: {data[2]}, Strand: {data[3]}, target: {data[4]}, gRNA: {data[5]}, pred_Score: {data[6]}") else: print("Failed to retrieve gene sequence.") else: print("Failed to retrieve transcripts.") return results, all_gene_sequences, all_exons # def create_genbank_features(formatted_data): # features = [] # for data in formatted_data: # try: # # Attempt to convert start and end positions to integers # start = int(data[1]) # end = int(data[2]) # except ValueError as e: # # Log the error and skip this iteration if conversion fails # print(f"Error converting start/end to int: {data[1]}, {data[2]} - {e}") # continue # Skip this iteration # # # Proceed as normal if conversion is successful # strand = 1 if data[3] == '+' else -1 # location = FeatureLocation(start=start, end=end, strand=strand) # feature = SeqFeature(location=location, type="misc_feature", qualifiers={ # 'label': data[5], # gRNA as label # 'note': f"Prediction: {data[6]}" # Prediction score in note # }) # features.append(feature) # return features # # def generate_genbank_file_from_data(formatted_data, gene_sequence, gene_symbol, output_path): # features = create_genbank_features(formatted_data) # record = SeqRecord(Seq(gene_sequence), id=gene_symbol, name=gene_symbol, # description='CRISPR Cas12 predicted targets', features=features) # record.annotations["molecule_type"] = "DNA" # SeqIO.write(record, output_path, "genbank") # # def create_csv_from_df(df, output_path): # df.to_csv(output_path, index=False) # # def generate_bed_file_from_data(formatted_data, output_path): # with open(output_path, 'w') as bed_file: # for data in formatted_data: # try: # # Ensure data has the expected number of elements # if len(data) < 7: # raise ValueError("Incomplete data item") # # chrom = data[0] # start = data[1] # end = data[2] # strand = '+' if data[3] == '+' else '-' # gRNA = data[5] # score = data[6] # Ensure this index exists # # bed_file.write(f"{chrom}\t{start}\t{end}\t{gRNA}\t{score}\t{strand}\n") # except ValueError as e: # print(f"Skipping an item due to error: {e}") # continue