"""tf.data pipelines that turn atom lists and adjacency matrices into model inputs.

Every pipeline yields a token-id vector ``x`` whose first entry is a prefix
("global") token, plus a square matrix with one extra leading row/column for
that prefix token.
"""
import numpy as np
import pandas as pd
import tensorflow as tf

from utils import mol_to_geognn_graph_data_MMFF3d as smiles2adjoin

# Special vocabulary entries. The ids (0, 14, 15, 16) are what matters to the
# model; the names only need to be distinct so that each id stays reachable.
# NOTE(review): the role assignment (unknown=14, mask=15, prefix=16) follows the
# MG-BERT vocabulary layout -- confirm against any checkpoint trained with this file.
PAD_TOKEN = '<pad>'
UNK_TOKEN = '<unk>'
MASK_TOKEN = '<mask>'
GLOBAL_TOKEN = '<global>'

str2num = {PAD_TOKEN: 0, 'H': 1, 'C': 2, 'N': 3, 'O': 4, 'F': 5, 'S': 6, 'Cl': 7,
           'P': 8, 'Br': 9, 'B': 10, 'I': 11, 'Si': 12, 'Se': 13,
           UNK_TOKEN: 14, MASK_TOKEN: 15, GLOBAL_TOKEN: 16}
num2str = {i: j for j, i in str2num.items()}


def _read_table(path, tsv_key=None, csv_key=None):
    """Read *path* as tab-separated when it ends in .txt/.tsv, else as CSV.

    The extension test is made on *path* as given. When a key is supplied for
    the detected flavour, ``path.format(key)`` is the file actually opened.
    """
    is_tsv = path.endswith(('.txt', '.tsv'))
    key = tsv_key if is_tsv else csv_key
    target = path if key is None else path.format(key)
    if is_tsv:
        return pd.read_csv(target, sep='\t')
    return pd.read_csv(target)


def _split_frame(data, frac=0.9):
    """Randomly split *data* into (sampled ``frac`` of rows, remaining rows)."""
    picked = data.index.isin(data.sample(frac=frac).index)
    return data[picked], data[~picked]


def _parse_atom_field(atom):
    """Recover atom symbols from a length-1 tensor holding a quoted-list string.

    Newlines are dropped, brackets become spaces, the text is split on single
    quotes and the single-space separators left between symbols are discarded.
    """
    text = np.array(atom)[0].decode()
    text = text.replace('\n', '').replace('[', ' ').replace(']', ' ')
    return [piece for piece in text.split("'") if piece != ' ']


def _encode_atoms(atoms_list):
    """Prefix the global token and map symbols to ids (unknown symbols -> UNK id).

    Returns ``(tokens, ids)`` where ``tokens`` includes the prefix token.
    """
    tokens = [GLOBAL_TOKEN] + list(atoms_list)
    unknown_id = str2num[UNK_TOKEN]
    return tokens, [str2num.get(token, unknown_id) for token in tokens]


def _masked_adjacency(adjoin_matrix, size):
    """Embed *adjoin_matrix* in a ``size x size`` matrix of ones, zeros -> -1e9.

    Row/column 0 belongs to the prefix token and stays at 1. Non-zero entries
    of *adjoin_matrix* are kept as they are.
    """
    padded = np.ones((size, size))
    padded[1:, 1:] = adjoin_matrix
    padded[padded == 0] = -1e9
    return padded


def _regression_shapes():
    """Padded shapes for (x, adjoin_matrix, y) regression batches."""
    return (tf.TensorShape([None]), tf.TensorShape([None, None]), tf.TensorShape([1]))


def _wrap_regression(numerical_fn, inputs):
    """Run *numerical_fn* through tf.py_function and restore static shapes."""
    x, adjoin_matrix, y = tf.py_function(numerical_fn, inputs,
                                         [tf.int64, tf.float32, tf.float32])
    x.set_shape([None])
    adjoin_matrix.set_shape([None, None])
    y.set_shape([None])
    return x, adjoin_matrix, y


def _wrap_inference(numerical_fn, smiles):
    """Run *numerical_fn* through tf.py_function for (x, adj, smiles, atoms) output."""
    x, adjoin_matrix, smiles, atom_list = tf.py_function(
        numerical_fn, [smiles], [tf.int64, tf.float32, tf.string, tf.string])
    x.set_shape([None])
    adjoin_matrix.set_shape([None, None])
    smiles.set_shape([1])
    atom_list.set_shape([None])
    return x, adjoin_matrix, smiles, atom_list


class Graph_Bert_Dataset(object):
    """Masked-atom dataset built from a table of atom strings and .npy paths.

    ``smiles_field`` and ``adj`` are column selectors used to index the frame;
    the list defaults are only read, never mutated.
    """

    def __init__(self, path, smiles_field=['0'], adj=['1'], addH=True):
        # Tab-separated for .txt/.tsv so that both selected columns exist.
        self.df = _read_table(path)
        self.smiles_field = smiles_field
        self.adj = adj
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH

    def get_data(self):
        """Return (dataset1, dataset2): a random 90% / 10% split of the table."""
        sampled, remainder = _split_frame(self.df)
        self.dataset1 = self._pipeline(sampled, 256)
        self.dataset2 = self._pipeline(remainder, 512)
        return self.dataset1, self.dataset2

    def _pipeline(self, frame, batch_size):
        """Build the mapped, padded and prefetched dataset for *frame*."""
        slices = tf.data.Dataset.from_tensor_slices((frame[self.smiles_field], frame[self.adj]))
        shapes = (tf.TensorShape([None]), tf.TensorShape([None, None]),
                  tf.TensorShape([None]), tf.TensorShape([None]))
        batched = slices.map(self.tf_numerical_smiles).padded_batch(batch_size, padded_shapes=shapes)
        return batched.prefetch(50)

    def numerical_smiles(self, atom, adj):
        """Return (x, adjoin_matrix, y, weight) for one row.

        ``y`` holds the unmodified token ids, ``x`` the ids after corruption and
        ``weight`` is 1.0 at the positions that were selected for corruption.
        """
        atoms_list = _parse_atom_field(atom)
        # The adjacency column stores a path; the matrix itself lives in a .npy file.
        adjoin_matrix = np.load(np.array(adj)[0].decode())
        _, nums_list = _encode_atoms(atoms_list)
        n_tokens = len(nums_list)
        adjoin_matrix = _masked_adjacency(adjoin_matrix, n_tokens)

        # Select 15% of the tokens (at least one); the +1 skips the prefix token.
        n_selected = max(int(n_tokens * 0.15), 1)
        choices = np.random.permutation(n_tokens - 1)[:n_selected] + 1
        y = np.array(nums_list).astype('int64')
        weight = np.zeros(n_tokens)
        for position in choices:
            rand = np.random.rand()
            weight[position] = 1
            if rand < 0.8:
                nums_list[position] = str2num[MASK_TOKEN]
            elif rand < 0.9:
                # Replace with a random id in 1..14; otherwise the token is left unchanged.
                nums_list[position] = int(np.random.rand() * 14 + 1)

        x = np.array(nums_list).astype('int64')
        weight = weight.astype('float32')
        return x, adjoin_matrix, y, weight

    def tf_numerical_smiles(self, atom, adj):
        """Graph-mode wrapper around :meth:`numerical_smiles`."""
        x, adjoin_matrix, y, weight = tf.py_function(
            self.numerical_smiles, (atom, adj),
            [tf.int64, tf.float32, tf.int64, tf.float32])
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        y.set_shape([None])
        weight.set_shape([None])
        return x, adjoin_matrix, y, weight


class Graph_Regression_Dataset_test(object):
    """Regression dataset that derives atoms and adjacency from SMILES strings."""

    def __init__(self, path, smiles_field='SMILES', label_field='PCE', normalize=False,
                 max_len=1000, addH=True):
        self.df = _read_table(path, tsv_key='test', csv_key='test')
        self.smiles_field = smiles_field
        self.label_field = label_field
        self.vocab = str2num
        self.devocab = num2str
        self.df = self.df[self.df[smiles_field].str.len() <= max_len]
        self.addH = addH
        if normalize:
            # Min-max scale the labels into [-0.5, 0.5].
            self.max = self.df[self.label_field].max()
            self.min = self.df[self.label_field].min()
            self.df[self.label_field] = (
                (self.df[self.label_field] - self.min) / (self.max - self.min) - 0.5)
            self.value_range = self.max - self.min

    def get_data(self):
        """Return the whole table as one padded dataset with batch size 64."""
        frame = self.df
        slices = tf.data.Dataset.from_tensor_slices(
            (frame[self.smiles_field], frame[self.label_field]))
        self.dataset1 = slices.map(self.tf_numerical_smiles).padded_batch(
            64, padded_shapes=_regression_shapes())
        return self.dataset1

    def numerical_smiles(self, smiles, label):
        """Return (x, adjoin_matrix, y) for one SMILES string and its label."""
        smiles = smiles.numpy().decode()
        # Uses the helper under the name it is actually imported as.
        atoms_list, adjoin_matrix = smiles2adjoin(smiles)
        _, nums_list = _encode_atoms(atoms_list)
        adjoin_matrix = _masked_adjacency(adjoin_matrix, len(nums_list))
        x = np.array(nums_list).astype('int64')
        y = np.array([label]).astype('float32')
        return x, adjoin_matrix, y

    def tf_numerical_smiles(self, smiles, label):
        """Graph-mode wrapper around :meth:`numerical_smiles`."""
        return _wrap_regression(self.numerical_smiles, [smiles, label])


class predict_smiles(object):
    """Single-molecule dataset for prediction; the label is a constant 0.0."""

    def __init__(self, smiles, normalize=False, max_len=1000, addH=True):
        if normalize:
            # There is no label column here, hence nothing to compute min/max from.
            raise ValueError('predict_smiles holds a single SMILES string without '
                             'labels; normalize=True is not supported')
        self.smiles_field = smiles
        self.label_field = float(0)
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH

    def numerical_smiles(self, atoms_list, adj, label):
        """Return (x, adjoin_matrix, y) from a tensor of atom symbols and a matrix."""
        # Symbols arrive as bytes; single-space entries were removed in get_data.
        symbols = [str(raw, encoding='utf-8') for raw in np.array(atoms_list)]
        label = np.array(label)
        _, nums_list = _encode_atoms(symbols)
        adjoin_matrix = _masked_adjacency(np.array(adj), len(nums_list))
        x = np.array(nums_list).astype('int64')
        y = np.array([label]).astype('float32')
        return x, adjoin_matrix, y

    def get_data(self):
        """Return a one-element dataset (batch size 1) for the stored SMILES."""
        atom, adj = smiles2adjoin(self.smiles_field)
        atoms_list = [symbol for symbol in np.array(atom) if symbol != ' ']
        adjoin_matrix = np.array(adj)
        single = tf.data.Dataset.from_tensors((atoms_list, adjoin_matrix, self.label_field))
        self.dataset1 = single.map(self.tf_numerical_smiles).cache().padded_batch(
            1, padded_shapes=_regression_shapes())
        return self.dataset1

    def tf_numerical_smiles(self, atoms_list, adj, label):
        """Graph-mode wrapper around :meth:`numerical_smiles`."""
        return _wrap_regression(self.numerical_smiles, (atoms_list, adj, label))


class _FileBackedRegression(object):
    """Shared pieces for regression datasets stored as (atom string, .npy path, label)."""

    def _configure(self, smiles_field, adj, label_field, addH):
        """Store the column selectors and vocabulary on the instance."""
        self.smiles_field = smiles_field
        self.adj = adj
        self.label_field = label_field
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH

    def _normalize_labels(self, frame):
        """Min-max scale the label column(s) of *frame* in place into [-0.5, 0.5]."""
        self.max = frame[self.label_field].max()
        self.min = frame[self.label_field].min()
        frame[self.label_field] = (
            (frame[self.label_field] - self.min) / (self.max - self.min) - 0.5)
        self.value_range = self.max - self.min

    def _pipeline(self, frame, cache_before_batch):
        """Build a batch-64 dataset; ``cache`` goes before or after batching."""
        slices = tf.data.Dataset.from_tensor_slices(
            (frame[self.smiles_field], frame[self.adj], frame[self.label_field]))
        mapped = slices.map(self.tf_numerical_smiles)
        shapes = _regression_shapes()
        if cache_before_batch:
            batched = mapped.cache().padded_batch(64, padded_shapes=shapes)
        else:
            batched = mapped.padded_batch(64, padded_shapes=shapes).cache()
        return batched.prefetch(100)

    def numerical_smiles(self, atom, adj, label):
        """Return (x, adjoin_matrix, y) for one table row."""
        atoms_list = _parse_atom_field(atom)
        label = np.array(label)[0]
        # The adjacency column stores a path; the matrix itself lives in a .npy file.
        adjoin_matrix = np.load(np.array(adj)[0].decode())
        _, nums_list = _encode_atoms(atoms_list)
        adjoin_matrix = _masked_adjacency(adjoin_matrix, len(nums_list))
        x = np.array(nums_list).astype('int64')
        y = np.array([label]).astype('float32')
        return x, adjoin_matrix, y

    def tf_numerical_smiles(self, smiles, adj, label):
        """Graph-mode wrapper around :meth:`numerical_smiles`."""
        return _wrap_regression(self.numerical_smiles, (smiles, adj, label))


class Graph_Regression_test(_FileBackedRegression):
    """Regression dataset over the validation file only ('val3' / 'val/val')."""

    def __init__(self, path, smiles_field=['0'], adj=['1'], label_field=['2'],
                 normalize=False, max_len=1000, addH=True):
        self.dv = _read_table(path, tsv_key='val3', csv_key='val/val')
        self._configure(smiles_field, adj, label_field, addH)
        if normalize:
            # Only the validation frame is loaded, so that is what gets scaled.
            # NOTE(review): scaling with this split's own min/max -- confirm this
            # matches the statistics used at training time.
            self._normalize_labels(self.dv)

    def get_data(self):
        """Return the validation table as a cached, padded dataset."""
        self.dataset1 = self._pipeline(self.dv, cache_before_batch=True)
        return self.dataset1


class Graph_Regression(_FileBackedRegression):
    """Regression dataset over the train and test files."""

    def __init__(self, path, smiles_field=['0'], adj=['1'], label_field=['2'],
                 normalize=False, max_len=1000, addH=True):
        self.df = _read_table(path, tsv_key='train3', csv_key='train/train')
        self.dt = _read_table(path, tsv_key='test3', csv_key='test/test')
        self._configure(smiles_field, adj, label_field, addH)
        if normalize:
            # Only the training frame is scaled; the test frame is left untouched.
            self._normalize_labels(self.df)

    def get_data(self):
        """Return (train, test, test) datasets; the last two read the same frame."""
        self.dataset1 = self._pipeline(self.df, cache_before_batch=True)
        self.dataset2 = self._pipeline(self.dt, cache_before_batch=False)
        self.dataset3 = self._pipeline(self.dt, cache_before_batch=False)
        return self.dataset1, self.dataset2, self.dataset3


# NOTE(review): this class is shadowed by the second ``Inference_Dataset``
# definition below, so it cannot be reached by name after import.
class Inference_Dataset(object):
    """Inference dataset that reads SMILES strings from a table column."""

    def __init__(self, path, smiles_field='Smiles', addH=True):
        self.df = _read_table(path)
        self.smiles_field = smiles_field
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH

    def get_data(self):
        """Return (dataset1, dataset2): a random 90% / 10% split, batch size 1."""
        sampled, remainder = _split_frame(self.df)
        print(len(sampled))
        self.dataset1 = self._pipeline(sampled)
        print(self.dataset1)
        self.dataset2 = self._pipeline(remainder)
        return self.dataset1, self.dataset2

    def _pipeline(self, frame):
        """Build the mapped, padded and prefetched dataset for *frame*."""
        slices = tf.data.Dataset.from_tensor_slices(frame[self.smiles_field].tolist())
        shapes = (tf.TensorShape([None]), tf.TensorShape([None, None]),
                  tf.TensorShape([None]), tf.TensorShape([None]))
        return slices.map(self.tf_numerical_smiles).padded_batch(
            1, padded_shapes=shapes).prefetch(50)

    def numerical_smiles(self, smiles):
        """Return (x, adjoin_matrix, [smiles], tokens) for one SMILES string."""
        smiles = smiles.numpy().decode()
        # NOTE(review): ``explicit_hydrogens`` is passed as before -- confirm the
        # imported helper accepts this keyword.
        atoms_list, adjoin_matrix = smiles2adjoin(smiles, explicit_hydrogens=self.addH)
        print(atoms_list)
        tokens, nums_list = _encode_atoms(atoms_list)
        adjoin_matrix = _masked_adjacency(adjoin_matrix, len(nums_list))
        x = np.array(nums_list).astype('int64')
        return x, adjoin_matrix, [smiles], tokens

    def tf_numerical_smiles(self, data):
        """Graph-mode wrapper; output dtypes match what numerical_smiles returns."""
        return _wrap_inference(self.numerical_smiles, data)


class Inference_Dataset(object):
    """Inference dataset built from an in-memory list of SMILES strings."""

    def __init__(self, sml_list, max_len=1000, addH=True):
        self.vocab = str2num
        self.devocab = num2str
        self.sml_list = [i for i in sml_list if len(i) < max_len]
        self.addH = addH

    # NOTE(review): the text between the length filter above and the token
    # prefixing in numerical_smiles was missing from the file. get_data and the
    # first two statements of numerical_smiles are reconstructed from the
    # output contract of tf_numerical_smiles -- confirm the batch size, cache
    # and prefetch settings against the upstream version.
    def get_data(self):
        """Return one padded dataset over all retained SMILES strings."""
        slices = tf.data.Dataset.from_tensor_slices(self.sml_list)
        shapes = (tf.TensorShape([None]), tf.TensorShape([None, None]),
                  tf.TensorShape([1]), tf.TensorShape([None]))
        self.dataset = slices.map(self.tf_numerical_smiles).padded_batch(
            64, padded_shapes=shapes).cache().prefetch(20)
        return self.dataset

    def numerical_smiles(self, smiles):
        """Return (x, adjoin_matrix, [smiles], tokens) for one SMILES string."""
        smiles = smiles.numpy().decode()
        atoms_list, adjoin_matrix = smiles2adjoin(smiles, explicit_hydrogens=self.addH)
        tokens, nums_list = _encode_atoms(atoms_list)
        size = len(nums_list)
        temp = np.ones((size, size))
        temp[1:, 1:] = adjoin_matrix
        # Unlike the other classes, this variant maps entries equal to 1 to 0
        # and entries equal to 0 to -1e9.
        adjoin_matrix = (1 - temp) * (-1e9)
        x = np.array(nums_list).astype('int64')
        return x, adjoin_matrix, [smiles], tokens

    def tf_numerical_smiles(self, smiles):
        """Graph-mode wrapper around :meth:`numerical_smiles`."""
        return _wrap_inference(self.numerical_smiles, smiles)