# DeepAcceptor / dataset.py
import pandas as pd
import numpy as np
from utils import mol_to_geognn_graph_data_MMFF3d as smiles2adjoin  # smiles2adjoin(smiles) -> (atom symbol list, adjacency matrix)
import tensorflow as tf
str2num = {'<pad>':0 ,'H': 1, 'C': 2, 'N': 3, 'O': 4, 'F': 5, 'S': 6, 'Cl': 7, 'P': 8, 'Br': 9,
'B': 10,'I': 11,'Si':12,'Se':13,'<unk>':14,'<mask>':15,'<global>':16}
num2str = {i:j for j,i in str2num.items()}
class Graph_Bert_Dataset(object):
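    """Pretraining dataset: masked-atom prediction over precomputed molecular graphs.

    Expects a CSV/TSV whose smiles_field column holds printed atom-symbol lists and
    whose adj column holds paths to .npy adjacency matrices. get_data() returns a
    90/10 train/validation split as batched tf.data.Dataset objects yielding
    (x, adjoin_matrix, y, weight).
    """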
def __init__(self,path,smiles_field=['0'], adj=['1'],addH=True):
if path.endswith('.txt') or path.endswith('.tsv'):
            self.df = pd.read_csv(path, sep='\t')
else:
self.df = pd.read_csv(path)
self.smiles_field = smiles_field
self.adj = adj
self.vocab = str2num
self.devocab = num2str
self.addH = addH
def get_data(self):
data = self.df
train_idx = []
idx = data.sample(frac=0.9).index
train_idx.extend(idx)
data1 = data[data.index.isin(train_idx)]
data2 = data[~data.index.isin(train_idx)]
self.dataset1 = tf.data.Dataset.from_tensor_slices((data1[self.smiles_field],data1[self.adj]))
self.dataset1 = self.dataset1.map(self.tf_numerical_smiles).padded_batch(256, padded_shapes=(
tf.TensorShape([None]),tf.TensorShape([None,None]), tf.TensorShape([None]) ,tf.TensorShape([None]))).prefetch(50)
self.dataset2 = tf.data.Dataset.from_tensor_slices((data2[self.smiles_field],data2[self.adj]))
self.dataset2 = self.dataset2.map(self.tf_numerical_smiles).padded_batch(512, padded_shapes=(
tf.TensorShape([None]), tf.TensorShape([None, None]), tf.TensorShape([None]),
tf.TensorShape([None]))).prefetch(50)
return self.dataset1, self.dataset2
    def numerical_smiles(self, atom, adj):
        # The atom column stores the printed list of atom symbols; strip brackets and
        # newlines, then split on quotes to recover the individual symbols.
        atom = np.array(atom)
        atom = atom[0].decode()
        atom = atom.replace('\n', '')
        atom = atom.replace('[', ' ')
        atom = atom.replace(']', ' ')
        atom = atom.split("'")
        atoms_list = []
        for i in atom:
            if i not in [' ']:
                atoms_list.append(i)
        # The adj column stores the path to a .npy file holding the adjacency matrix.
        adj = np.array(adj)[0].decode()
        adjoin_matrix = np.load(adj)
        # Prepend the <global> supernode and turn the adjacency matrix into an
        # additive attention mask: disconnected entries (0) become -1e9.
        atoms_list = ['<global>'] + atoms_list
        nums_list = [str2num.get(i, str2num['<unk>']) for i in atoms_list]
        temp = np.ones((len(nums_list), len(nums_list)))
        temp[1:, 1:] = adjoin_matrix
        temp[np.where(temp == 0)] = -1e9
        adjoin_matrix = temp
#adjoin_matrix = (1 - temp) * (-1e9)
        # BERT-style masking: select ~15% of atom positions (at least one, never the
        # <global> token at index 0); 80% become <mask>, 10% a random vocabulary token,
        # 10% stay unchanged. weight marks the selected positions for the loss.
        choices = np.random.permutation(len(nums_list) - 1)[:max(int(len(nums_list) * 0.15), 1)] + 1
        y = np.array(nums_list).astype('int64')
        weight = np.zeros(len(nums_list))
        for i in choices:
            rand = np.random.rand()
            weight[i] = 1
            if rand < 0.8:
                nums_list[i] = str2num['<mask>']
            elif rand < 0.9:
                nums_list[i] = int(np.random.rand() * 14 + 1)
x = np.array(nums_list).astype('int64')
weight = weight.astype('float32')
return x, adjoin_matrix, y, weight
def tf_numerical_smiles(self, atom,adj):
#print(data)
# x,adjoin_matrix,y,weight = tf.py_function(self.balanced_numerical_smiles,
# [data], [tf.int64, tf.float32 ,tf.int64,tf.float32])
x, adjoin_matrix, y, weight = tf.py_function(self.numerical_smiles, (atom, adj),
[tf.int64, tf.float32, tf.int64, tf.float32])
x.set_shape([None])
adjoin_matrix.set_shape([None,None])
y.set_shape([None])
weight.set_shape([None])
return x, adjoin_matrix, y, weight
class Graph_Regression_Dataset_test(object):
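    """Regression test-set loader that featurizes SMILES on the fly with smiles2adjoin.

    Reads the file given by path.format('test'), keeps rows whose SMILES are at most
    max_len characters, optionally min-max normalizes the labels to [-0.5, 0.5], and
    yields batches of (x, adjoin_matrix, y).
    """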
def __init__(self,path,smiles_field='SMILES',label_field='PCE',normalize=False,max_len=1000,addH=True):
if path.endswith('.txt') or path.endswith('.tsv'):
self.df = pd.read_csv(path.format('test'),sep='\t')
else:
self.df = pd.read_csv(path.format('test'))
self.smiles_field = smiles_field
self.label_field = label_field
self.vocab = str2num
self.devocab = num2str
self.df = self.df[self.df[smiles_field].str.len()<=max_len]
self.addH = addH
if normalize:
self.max = self.df[self.label_field].max()
self.min = self.df[self.label_field].min()
self.df[self.label_field] = (self.df[self.label_field]-self.min)/(self.max-self.min)-0.5
self.value_range = self.max-self.min
def get_data(self):
train_data = self.df
self.dataset1 = tf.data.Dataset.from_tensor_slices((train_data[self.smiles_field], train_data[self.label_field]))
self.dataset1 = self.dataset1.map(self.tf_numerical_smiles).padded_batch(64, padded_shapes=(
tf.TensorShape([None]), tf.TensorShape([None,None]),tf.TensorShape([1])))
return self.dataset1
def numerical_smiles(self, smiles,label):
smiles = smiles.numpy().decode()
        atoms_list, adjoin_matrix = smiles2adjoin(smiles)
atoms_list = list(atoms_list)
atoms_list = ['<global>'] + atoms_list
nums_list = [str2num.get(i,str2num['<unk>']) for i in atoms_list]
temp = np.ones((len(nums_list),len(nums_list)))
temp[1:,1:] = adjoin_matrix
temp[np.where(temp == 0)]=-1e9
adjoin_matrix = temp
x = np.array(nums_list).astype('int64')
y = np.array([label]).astype('float32')
return x, adjoin_matrix,y
def tf_numerical_smiles(self, smiles,label):
x,adjoin_matrix,y = tf.py_function(self.numerical_smiles, [smiles,label], [tf.int64, tf.float32 ,tf.float32])
x.set_shape([None])
adjoin_matrix.set_shape([None,None])
y.set_shape([None])
return x, adjoin_matrix , y
class predict_smiles(object):
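    """Featurizes a single SMILES string with smiles2adjoin and wraps it as a
    one-element dataset of (x, adjoin_matrix, y) with a dummy label of 0, for
    running a trained regression model on new molecules.
    """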
def __init__(self,smiles ,normalize=False,max_len=1000,addH=True):
self.smiles_field = smiles
self.label_field = float(0)
self.vocab = str2num
self.devocab = num2str
#self.df = self.df[self.df[smiles_field].str.len()<=max_len]
self.addH = addH
        if normalize:
            # Note: inherited from the DataFrame-based datasets above; predict_smiles
            # has no self.df, so normalize=True is not supported here.
            self.max = self.df[self.label_field].max()
            self.min = self.df[self.label_field].min()
            self.df[self.label_field] = (self.df[self.label_field] - self.min) / (self.max - self.min) - 0.5
            self.value_range = self.max - self.min
def numerical_smiles(self, atoms_list,adj,label):
atom = np.array(atoms_list)
atoms_list = []
for i in atom:
if i not in [' ']:
atoms_list.append(str(i,encoding='utf-8'))
        label = np.array(label)
        adj = np.array(adj)
        adjoin_matrix = adj
atoms_list = ['<global>'] + atoms_list
nums_list = [str2num.get(i,str2num['<unk>']) for i in atoms_list]
#temp = np.ones((len(nums_list),len(nums_list)))
#temp[1:, 1:] = adjoin_matrix
#adjoin_matrix = (1-temp)*(-1e9)
temp = np.ones((len(nums_list),len(nums_list)))
temp[1:,1:] = adjoin_matrix
temp[np.where(temp == 0)]=-1e9
adjoin_matrix = temp
x = np.array(nums_list).astype('int64')
y = np.array([label]).astype('float32')
return x, adjoin_matrix,y
    def get_data(self):
        # Here self.smiles_field holds the SMILES string itself (not a column name):
        # featurize it with smiles2adjoin and wrap it as a one-element dataset.
        atom, adj = smiles2adjoin(self.smiles_field)
        atom = np.array(atom)
        atoms_list = []
        for i in atom:
            if i not in [' ']:
                atoms_list.append(i)
        adj = np.array(adj)
        adjoin_matrix = adj
self.dataset1 = tf.data.Dataset.from_tensors((atoms_list, adjoin_matrix, self.label_field))
self.dataset1 = self.dataset1.map(self.tf_numerical_smiles).cache().padded_batch(1, padded_shapes=(
tf.TensorShape([None]), tf.TensorShape([None,None]),tf.TensorShape([1])))
return self.dataset1
def tf_numerical_smiles(self, atoms_list,adj,label):
x,adjoin_matrix,y = tf.py_function(self.numerical_smiles, (atoms_list,adj,label), [tf.int64, tf.float32 ,tf.float32])
x.set_shape([None])
adjoin_matrix.set_shape([None,None])
y.set_shape([None])
return x, adjoin_matrix , y
class Graph_Regression_test(object):
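    """Validation-set loader for regression: reads path.format('val3') (TSV) or
    path.format('val/val') (CSV) with precomputed atom lists and .npy adjacency
    paths, and yields batches of (x, adjoin_matrix, y).
    """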
def __init__(self,path,smiles_field=['0'],adj = ['1'], label_field=['2'],normalize=False,max_len=1000,addH=True):
if path.endswith('.txt') or path.endswith('.tsv'):
# self.df = pd.read_csv(path.format('train3'),sep='\t')
#self.dt = pd.read_csv(path.format('test3'),sep='\t')
self.dv = pd.read_csv(path.format('val3'),sep='\t')
else:
#self.df = pd.read_csv(path.format('train/train'))
#self.dt = pd.read_csv(path.format('test/test'))
self.dv = pd.read_csv(path.format('val/val'))
self.smiles_field = smiles_field
self.adj = adj
self.label_field = label_field
self.vocab = str2num
self.devocab = num2str
#self.df = self.df[self.df[smiles_field].str.len()<=max_len]
self.addH = addH
        if normalize:
            # normalize against the loaded validation frame (self.dv); this class has no self.df
            self.max = self.dv[self.label_field].max()
            self.min = self.dv[self.label_field].min()
            self.dv[self.label_field] = (self.dv[self.label_field] - self.min) / (self.max - self.min) - 0.5
            self.value_range = self.max - self.min
def get_data(self):
train_data = self.dv
#idx = train_data.sample(frac=0.9).index
# train_idx = []
# #idx = train_data.sample(frac=0.9).index
# train_idx.extend(idx)
# data1 = train_data[train_data.index.isin(train_idx)]
# data2 = train_data[~train_data.index.isin(train_idx)]
self.dataset1 = tf.data.Dataset.from_tensor_slices((train_data[self.smiles_field],train_data[self.adj], train_data[self.label_field]))
self.dataset1 = self.dataset1.map(self.tf_numerical_smiles).cache().padded_batch(64, padded_shapes=(
tf.TensorShape([None]), tf.TensorShape([None,None]),tf.TensorShape([1]))).prefetch(100)
return self.dataset1
def numerical_smiles(self, atom,adj,label):
atom = np.array(atom)
atom = atom[0].decode()
atom = atom.replace('\n','')
atom = atom.replace('[',' ')
atom = atom.replace(']',' ')
atom = atom.split("'")
atoms_list = []
for i in atom:
if i not in [' ']:
atoms_list.append(i)
label = np.array(label)[0]
        adj = np.array(adj)[0].decode()
        adjoin_matrix = np.load(adj)  # adjacency matrix stored as a path to a .npy file
atoms_list = ['<global>'] + atoms_list
nums_list = [str2num.get(i,str2num['<unk>']) for i in atoms_list]
#temp = np.ones((len(nums_list),len(nums_list)))
#temp[1:, 1:] = adjoin_matrix
#adjoin_matrix = (1-temp)*(-1e9)
temp = np.ones((len(nums_list),len(nums_list)))
temp[1:,1:] = adjoin_matrix
temp[np.where(temp == 0)]=-1e9
adjoin_matrix = temp
x = np.array(nums_list).astype('int64')
y = np.array([label]).astype('float32')
return x, adjoin_matrix,y
def tf_numerical_smiles(self, smiles,adj,label):
x,adjoin_matrix,y = tf.py_function(self.numerical_smiles, (smiles,adj,label), [tf.int64, tf.float32 ,tf.float32])
x.set_shape([None])
adjoin_matrix.set_shape([None,None])
y.set_shape([None])
return x, adjoin_matrix , y
class Graph_Regression(object):
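    """Train/test loader for regression on precomputed graphs.

    Reads path.format('train3') / path.format('test3') for TSV input, or
    path.format('train/train') / path.format('test/test') for CSV input, and returns
    three datasets of (x, adjoin_matrix, y): train, test, and a copy of the test set
    in place of a separate validation split.
    """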
def __init__(self,path,smiles_field=['0'],adj = ['1'], label_field=['2'],normalize=False,max_len=1000,addH=True):
if path.endswith('.txt') or path.endswith('.tsv'):
self.df = pd.read_csv(path.format('train3'),sep='\t')
self.dt = pd.read_csv(path.format('test3'),sep='\t')
#self.dv = pd.read_csv(path.format('val3'),sep='\t')
else:
self.df = pd.read_csv(path.format('train/train'))
self.dt = pd.read_csv(path.format('test/test'))
#self.dv = pd.read_csv(path.format('val3'))
self.smiles_field = smiles_field
self.adj = adj
self.label_field = label_field
self.vocab = str2num
self.devocab = num2str
#self.df = self.df[self.df[smiles_field].str.len()<=max_len]
self.addH = addH
if normalize:
self.max = self.df[self.label_field].max()
self.min = self.df[self.label_field].min()
self.df[self.label_field] = (self.df[self.label_field]-self.min)/(self.max-self.min)-0.5
self.value_range = self.max-self.min
def get_data(self):
train_data = self.df
        test_data = self.dt
        data2 = test_data  # no separate validation file: dataset3 below mirrors dataset2
#idx = train_data.sample(frac=0.9).index
# train_idx = []
# #idx = train_data.sample(frac=0.9).index
# train_idx.extend(idx)
# data1 = train_data[train_data.index.isin(train_idx)]
# data2 = train_data[~train_data.index.isin(train_idx)]
self.dataset1 = tf.data.Dataset.from_tensor_slices((train_data[self.smiles_field],train_data[self.adj], train_data[self.label_field]))
self.dataset1 = self.dataset1.map(self.tf_numerical_smiles).cache().padded_batch(64, padded_shapes=(
tf.TensorShape([None]), tf.TensorShape([None,None]),tf.TensorShape([1]))).prefetch(100)
self.dataset2 = tf.data.Dataset.from_tensor_slices((test_data[self.smiles_field], test_data[self.adj],test_data[self.label_field]))
self.dataset2 = self.dataset2.map(self.tf_numerical_smiles).padded_batch(64, padded_shapes=(
tf.TensorShape([None]),tf.TensorShape([None,None]), tf.TensorShape([1]))).cache().prefetch(100)
self.dataset3 = tf.data.Dataset.from_tensor_slices((data2[self.smiles_field],test_data[self.adj], data2[self.label_field]))
self.dataset3 = self.dataset3.map(self.tf_numerical_smiles).padded_batch(64, padded_shapes=(
tf.TensorShape([None]), tf.TensorShape([None, None]), tf.TensorShape([1]))).cache().prefetch(100)
return self.dataset1,self.dataset2,self.dataset3
def numerical_smiles(self, atom,adj,label):
atom = np.array(atom)
atom = atom[0].decode()
atom = atom.replace('\n','')
atom = atom.replace('[',' ')
atom = atom.replace(']',' ')
atom = atom.split("'")
atoms_list = []
for i in atom:
if i not in [' ']:
atoms_list.append(i)
label = np.array(label)[0]
        adj = np.array(adj)[0].decode()
        adjoin_matrix = np.load(adj)  # adjacency matrix stored as a path to a .npy file
atoms_list = ['<global>'] + atoms_list
nums_list = [str2num.get(i,str2num['<unk>']) for i in atoms_list]
#temp = np.ones((len(nums_list),len(nums_list)))
#temp[1:, 1:] = adjoin_matrix
#adjoin_matrix = (1-temp)*(-1e9)
temp = np.ones((len(nums_list),len(nums_list)))
temp[1:,1:] = adjoin_matrix
temp[np.where(temp == 0)]=-1e9
adjoin_matrix = temp
x = np.array(nums_list).astype('int64')
y = np.array([label]).astype('float32')
return x, adjoin_matrix,y
def tf_numerical_smiles(self, smiles,adj,label):
x,adjoin_matrix,y = tf.py_function(self.numerical_smiles, (smiles,adj,label), [tf.int64, tf.float32 ,tf.float32])
x.set_shape([None])
adjoin_matrix.set_shape([None,None])
y.set_shape([None])
return x, adjoin_matrix , y
class Inference_Dataset(object):
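    """File-based inference dataset: featurizes a 'Smiles' column on the fly and
    yields (x, adjoin_matrix, smiles, atom_list) in batches of one.

    Note: this class is redefined below with the same name; the later, list-based
    Inference_Dataset is the definition in effect once the module is imported.
    """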
def __init__(self,path,smiles_field='Smiles',addH=True):
if path.endswith('.txt') or path.endswith('.tsv'):
self.df = pd.read_csv(path,sep='\t')
else:
self.df = pd.read_csv(path)
self.smiles_field = smiles_field
self.vocab = str2num
self.devocab = num2str
self.addH = addH
def get_data(self):
data = self.df
train_idx = []
idx = data.sample(frac=0.9).index
train_idx.extend(idx)
data1 = data[data.index.isin(train_idx)]
data2 = data[~data.index.isin(train_idx)]
print(len(data1))
self.dataset1 = tf.data.Dataset.from_tensor_slices(data1[self.smiles_field].tolist())
self.dataset1 = self.dataset1.map(self.tf_numerical_smiles).padded_batch(1, padded_shapes=(
tf.TensorShape([None]),tf.TensorShape([None,None]), tf.TensorShape([None]) ,tf.TensorShape([None]))).prefetch(50)
print(self.dataset1)
self.dataset2 = tf.data.Dataset.from_tensor_slices(data2[self.smiles_field].tolist())
self.dataset2 = self.dataset2.map(self.tf_numerical_smiles).padded_batch(1, padded_shapes=(
tf.TensorShape([None]), tf.TensorShape([None, None]), tf.TensorShape([None]),
tf.TensorShape([None]))).prefetch(50)
return self.dataset1, self.dataset2
def numerical_smiles(self, smiles):
smiles = smiles.numpy().decode()
        atoms_list, adjoin_matrix = smiles2adjoin(smiles, explicit_hydrogens=self.addH)
print(atoms_list)
atoms_list = ['<global>'] + atoms_list
nums_list = [str2num.get(i,str2num['<unk>']) for i in atoms_list]
temp = np.ones((len(nums_list),len(nums_list)))
temp[1:,1:] = adjoin_matrix
temp[np.where(temp == 0)]=-1e9
adjoin_matrix = temp
        # leftover from the pretraining dataset; choices and y are not used at inference
        choices = np.random.permutation(len(nums_list) - 1)[:max(int(len(nums_list) * 0.15), 1)] + 1
        y = np.array(nums_list).astype('int64')
x = np.array(nums_list).astype('int64')
return x, adjoin_matrix, [smiles],atoms_list
    def tf_numerical_smiles(self, data):
        # numerical_smiles returns (x, adjoin_matrix, [smiles], atoms_list), so the
        # py_function outputs are int64, float32, string, string.
        x, adjoin_matrix, smiles, atom_list = tf.py_function(self.numerical_smiles, [data],
                                                             [tf.int64, tf.float32, tf.string, tf.string])
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        smiles.set_shape([1])
        atom_list.set_shape([None])
        return x, adjoin_matrix, smiles, atom_list
class Inference_Dataset(object):
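    """List-based inference dataset: takes SMILES strings directly, featurizes them
    with smiles2adjoin, and yields batches of (x, adjoin_matrix, smiles, atom_list).
    """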
def __init__(self,sml_list,max_len=1000,addH=True):
self.vocab = str2num
self.devocab = num2str
self.sml_list = [i for i in sml_list if len(i)<max_len]
self.addH = addH
def get_data(self):
self.dataset = tf.data.Dataset.from_tensor_slices((self.sml_list,))
self.dataset = self.dataset.map(self.tf_numerical_smiles).padded_batch(64, padded_shapes=(
tf.TensorShape([None]), tf.TensorShape([None,None]),tf.TensorShape([1]),tf.TensorShape([None]))).cache().prefetch(20)
return self.dataset
def numerical_smiles(self, smiles):
smiles_origin = smiles
smiles = smiles.numpy().decode()
        atoms_list, adjoin_matrix = smiles2adjoin(smiles)
atoms_list = ['<global>'] + atoms_list
nums_list = [str2num.get(i,str2num['<unk>']) for i in atoms_list]
temp = np.ones((len(nums_list),len(nums_list)))
temp[1:,1:] = adjoin_matrix
adjoin_matrix = (1-temp)*(-1e9)
x = np.array(nums_list).astype('int64')
return x, adjoin_matrix,[smiles], atoms_list
def tf_numerical_smiles(self, smiles):
x,adjoin_matrix,smiles,atom_list = tf.py_function(self.numerical_smiles, [smiles], [tf.int64, tf.float32,tf.string, tf.string])
x.set_shape([None])
adjoin_matrix.set_shape([None,None])
smiles.set_shape([1])
atom_list.set_shape([None])
return x, adjoin_matrix,smiles,atom_list
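

# A minimal usage sketch (illustrative only): the file paths, column layout, and
# SMILES below are placeholders, not data shipped with this module.
if __name__ == '__main__':
    # Pretraining data: column '0' holds printed atom lists, column '1' holds
    # paths to .npy adjacency matrices.
    # pretrain_train, pretrain_val = Graph_Bert_Dataset('data/pretrain.csv').get_data()

    # Fine-tuning data: path is a template; '{}' is filled with 'train/train' and
    # 'test/test' (or 'train3'/'test3' for TSV input).
    # train_ds, test_ds, val_ds = Graph_Regression('data/{}.csv').get_data()

    # Scoring a single molecule with a dummy label of 0.
    ds = predict_smiles('CC1=CC=CC=C1').get_data()
    for x, adjoin_matrix, y in ds:
        print(x.shape, adjoin_matrix.shape, y.shape)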