# Page-header residue from the hosting site ("Spaces: Running"); not part of the module.
import numpy as np
import pandas as pd
import tensorflow as tf

from utils import mol_to_geognn_graph_data_MMFF3d as smiles2adjoin
# Token vocabulary: atom symbol or special token -> integer id.
# Id 0 is the padding token; ids 14-16 are the special <unk>/<mask>/<global>
# tokens used by the dataset classes below.
str2num = {
    '<pad>': 0,
    'H': 1,
    'C': 2,
    'N': 3,
    'O': 4,
    'F': 5,
    'S': 6,
    'Cl': 7,
    'P': 8,
    'Br': 9,
    'B': 10,
    'I': 11,
    'Si': 12,
    'Se': 13,
    '<unk>': 14,
    '<mask>': 15,
    '<global>': 16,
}

# Inverse vocabulary: integer id -> token.
num2str = {idx: token for token, idx in str2num.items()}
class Graph_Bert_Dataset(object):
    """Masked-atom pre-training input pipeline.

    Reads a table whose ``smiles_field`` column holds a stringified atom list
    and whose ``adj`` column holds the path of a saved NumPy adjacency matrix,
    and turns it into two padded ``tf.data`` datasets: a random 90 % sample of
    the rows and the remaining rows.
    """

    def __init__(self, path, smiles_field=['0'], adj=['1'], addH=True):
        """Load the table at ``path``.

        Args:
            path: CSV file, or a tab-separated file when the name ends in
                ``.txt`` or ``.tsv``.
            smiles_field: Column label(s) holding the stringified atom list.
            adj: Column label(s) holding the adjacency-matrix file path.
            addH: Stored on the instance; not read by this class.
        """
        # NOTE: the list defaults are only read, never mutated, so sharing
        # them between instances is harmless.
        if path.endswith(('.txt', '.tsv')):
            # Tab-separated, like the other loaders in this file. The former
            # separator '\n\t' can never match inside a single line, so every
            # row collapsed into one column.
            self.df = pd.read_csv(path, sep='\t')
        else:
            self.df = pd.read_csv(path)
        self.smiles_field = smiles_field
        self.adj = adj
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH

    def get_data(self):
        """Build the two datasets.

        Returns:
            ``(dataset1, dataset2)``: ``dataset1`` batches a random 90 %
            sample of the rows (batch size 256), ``dataset2`` batches the
            remaining rows (batch size 512). The split is drawn anew on every
            call (no seed is set here).
        """
        data = self.df
        sampled_index = data.sample(frac=0.9).index
        in_sample = data.index.isin(sampled_index)
        self.dataset1 = self._build_dataset(data[in_sample], 256)
        self.dataset2 = self._build_dataset(data[~in_sample], 512)
        return self.dataset1, self.dataset2

    def _build_dataset(self, frame, batch_size):
        """Map ``frame`` rows through the masking function and pad-batch them."""
        dataset = tf.data.Dataset.from_tensor_slices(
            (frame[self.smiles_field], frame[self.adj]))
        padded_shapes = (
            tf.TensorShape([None]),
            tf.TensorShape([None, None]),
            tf.TensorShape([None]),
            tf.TensorShape([None]),
        )
        return (dataset.map(self.tf_numerical_smiles)
                .padded_batch(batch_size, padded_shapes=padded_shapes)
                .prefetch(50))

    @staticmethod
    def _parse_atom_field(text):
        """Split a string such as ``['C' 'O']`` into ``['C', 'O']``.

        Line breaks are removed, the square brackets become spaces, the text
        is split on single quotes and the single-space separators are dropped.
        """
        cleaned = text.replace('\n', '').replace('[', ' ').replace(']', ' ')
        return [piece for piece in cleaned.split("'") if piece != ' ']

    def numerical_smiles(self, atom, adj):
        """Turn one table row into masked model inputs.

        Args:
            atom: Length-1 sequence holding the encoded atom-list string.
            adj: Length-1 sequence holding the encoded path of a ``.npy``
                adjacency matrix.

        Returns:
            ``(x, adjoin_matrix, y, weight)`` where ``y`` holds the original
            token ids (``<global>`` first), ``x`` the ids after masking,
            ``weight`` is 1.0 at the positions selected for masking, and
            ``adjoin_matrix`` is the loaded matrix extended by a leading row
            and column of ones with every zero replaced by ``-1e9``.
        """
        atom_text = np.array(atom)[0].decode()
        atoms_list = ['<global>'] + self._parse_atom_field(atom_text)
        loaded_adj = np.load(np.array(adj)[0].decode())

        unk_id = self.vocab['<unk>']
        nums_list = [self.vocab.get(tok, unk_id) for tok in atoms_list]
        n_tokens = len(nums_list)

        # Row/column 0 belongs to <global>, which is connected to every token.
        adjoin_matrix = np.ones((n_tokens, n_tokens))
        adjoin_matrix[1:, 1:] = loaded_adj
        adjoin_matrix[adjoin_matrix == 0] = -1e9

        # Select 15 % of the positions (at least one) among the non-global
        # tokens; the "+ 1" shifts the permutation past position 0.
        n_selected = max(int(n_tokens * 0.15), 1)
        choices = np.random.permutation(n_tokens - 1)[:n_selected] + 1

        y = np.array(nums_list).astype('int64')
        weight = np.zeros(n_tokens)
        for pos in choices:
            rand = np.random.rand()
            weight[pos] = 1
            if rand < 0.8:
                # 80 %: replace with the mask token.
                nums_list[pos] = self.vocab['<mask>']
            elif rand < 0.9:
                # 10 %: replace with a random id in 1..14.
                nums_list[pos] = int(np.random.rand() * 14 + 1)
            # Remaining 10 %: keep the original token.

        x = np.array(nums_list).astype('int64')
        weight = weight.astype('float32')
        return x, adjoin_matrix, y, weight

    def tf_numerical_smiles(self, atom, adj):
        """Wrap :meth:`numerical_smiles` in ``tf.py_function`` for ``Dataset.map``."""
        x, adjoin_matrix, y, weight = tf.py_function(
            self.numerical_smiles, (atom, adj),
            [tf.int64, tf.float32, tf.int64, tf.float32])
        # Declare the ranks explicitly so padded_batch can pad the outputs.
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        y.set_shape([None])
        weight.set_shape([None])
        return x, adjoin_matrix, y, weight
class Graph_Regression_Dataset_test(object):
    """Regression test-set pipeline that builds graphs directly from SMILES."""

    def __init__(self, path, smiles_field='SMILES', label_field='PCE',
                 normalize=False, max_len=1000, addH=True):
        """Load the test table.

        Args:
            path: Format template; ``path.format('test')`` is the file that
                is read (tab-separated when ``path`` ends in ``.txt`` or
                ``.tsv``, comma-separated otherwise).
            smiles_field: Name of the SMILES column.
            label_field: Name of the regression-target column.
            normalize: When true, rescale the labels to ``[-0.5, 0.5]`` using
                the min and max of the loaded labels.
            max_len: Rows whose SMILES string is longer than this are dropped.
            addH: Stored on the instance; not read by this class.
        """
        if path.endswith(('.txt', '.tsv')):
            self.df = pd.read_csv(path.format('test'), sep='\t')
        else:
            self.df = pd.read_csv(path.format('test'))
        self.smiles_field = smiles_field
        self.label_field = label_field
        self.vocab = str2num
        self.devocab = num2str
        # .copy() so the normalisation below writes into an independent frame
        # rather than into a slice of the one returned by read_csv.
        keep = self.df[smiles_field].str.len() <= max_len
        self.df = self.df[keep].copy()
        self.addH = addH
        if normalize:
            self.max = self.df[self.label_field].max()
            self.min = self.df[self.label_field].min()
            self.df[self.label_field] = (
                (self.df[self.label_field] - self.min) / (self.max - self.min) - 0.5)
            self.value_range = self.max - self.min

    def get_data(self):
        """Return the padded dataset (batch size 64) over all loaded rows."""
        frame = self.df
        dataset = tf.data.Dataset.from_tensor_slices(
            (frame[self.smiles_field], frame[self.label_field]))
        self.dataset1 = dataset.map(self.tf_numerical_smiles).padded_batch(
            64,
            padded_shapes=(
                tf.TensorShape([None]),
                tf.TensorShape([None, None]),
                tf.TensorShape([1]),
            ))
        return self.dataset1

    def numerical_smiles(self, smiles, label):
        """Convert one ``(smiles, label)`` pair of eager tensors to arrays.

        Returns:
            ``(x, adjoin_matrix, y)``: int64 token ids with ``<global>``
            first, the adjacency matrix extended by a leading row and column
            of ones with zeros replaced by ``-1e9``, and the label as a
            length-1 float32 array.
        """
        smiles = smiles.numpy().decode()
        # The module imports the graph builder as ``smiles2adjoin``; the
        # previous spelling ``smiles2adjoins`` was an undefined name.
        atoms, raw_adj = smiles2adjoin(smiles)
        atoms_list = ['<global>'] + list(atoms)
        unk_id = str2num['<unk>']
        nums_list = [str2num.get(tok, unk_id) for tok in atoms_list]

        size = len(nums_list)
        adjoin_matrix = np.ones((size, size))
        adjoin_matrix[1:, 1:] = raw_adj
        adjoin_matrix[adjoin_matrix == 0] = -1e9

        x = np.array(nums_list).astype('int64')
        y = np.array([label]).astype('float32')
        return x, adjoin_matrix, y

    def tf_numerical_smiles(self, smiles, label):
        """Wrap :meth:`numerical_smiles` in ``tf.py_function`` for ``Dataset.map``."""
        x, adjoin_matrix, y = tf.py_function(
            self.numerical_smiles, [smiles, label],
            [tf.int64, tf.float32, tf.float32])
        # Declare the ranks explicitly so padded_batch can pad the outputs.
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        y.set_shape([None])
        return x, adjoin_matrix, y
class predict_smiles(object):
    """Input pipeline for predicting on a single SMILES string."""

    def __init__(self, smiles, normalize=False, max_len=1000, addH=True):
        """Store the molecule to predict on.

        Args:
            smiles: SMILES string handed to ``smiles2adjoin`` in
                :meth:`get_data`.
            normalize: Must be false. This class has no label table, so the
                min/max normalisation done by the other datasets has nothing
                to operate on.
            max_len: Accepted for signature compatibility; not used.
            addH: Stored on the instance; not read by this class.

        Raises:
            ValueError: If ``normalize`` is true.
        """
        if normalize:
            # This branch used to dereference ``self.df``, which this class
            # never creates, and failed with an opaque AttributeError.
            raise ValueError(
                'predict_smiles has no label table to normalize; '
                'pass normalize=False')
        self.smiles_field = smiles
        # Placeholder label so the output matches the (x, adjacency, y)
        # structure of the regression datasets.
        self.label_field = float(0)
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH

    def numerical_smiles(self, atoms_list, adj, label):
        """Convert atom symbols and an adjacency matrix to model inputs.

        Args:
            atoms_list: Sequence of atom symbols, as bytes (what
                ``tf.py_function`` delivers) or as ``str``.
            adj: Square adjacency matrix over the atoms.
            label: Scalar label.

        Returns:
            ``(x, adjoin_matrix, y)``: int64 token ids with ``<global>``
            first, the adjacency matrix extended by a leading row and column
            of ones with zeros replaced by ``-1e9``, and the label as a
            length-1 float32 array.
        """
        symbols = []
        for raw in np.array(atoms_list):
            symbol = raw.decode('utf-8') if isinstance(raw, bytes) else str(raw)
            # Decode first, then filter: comparing the raw bytes with the
            # str ' ' never matched, so the filter had no effect before.
            if symbol != ' ':
                symbols.append(symbol)

        label = np.array(label)
        adj = np.array(adj)

        tokens = ['<global>'] + symbols
        unk_id = self.vocab['<unk>']
        nums_list = [self.vocab.get(tok, unk_id) for tok in tokens]

        size = len(nums_list)
        adjoin_matrix = np.ones((size, size))
        adjoin_matrix[1:, 1:] = adj
        adjoin_matrix[adjoin_matrix == 0] = -1e9

        x = np.array(nums_list).astype('int64')
        y = np.array([label]).astype('float32')
        return x, adjoin_matrix, y

    def get_data(self):
        """Return a one-element dataset (batch size 1) for the stored SMILES."""
        atom, adj = smiles2adjoin(self.smiles_field)
        atoms_list = [symbol for symbol in np.array(atom) if symbol != ' ']
        adjoin_matrix = np.array(adj)
        dataset = tf.data.Dataset.from_tensors(
            (atoms_list, adjoin_matrix, self.label_field))
        self.dataset1 = dataset.map(self.tf_numerical_smiles).cache().padded_batch(
            1,
            padded_shapes=(
                tf.TensorShape([None]),
                tf.TensorShape([None, None]),
                tf.TensorShape([1]),
            ))
        return self.dataset1

    def tf_numerical_smiles(self, atoms_list, adj, label):
        """Wrap :meth:`numerical_smiles` in ``tf.py_function`` for ``Dataset.map``."""
        x, adjoin_matrix, y = tf.py_function(
            self.numerical_smiles, (atoms_list, adj, label),
            [tf.int64, tf.float32, tf.float32])
        # Declare the ranks explicitly so padded_batch can pad the outputs.
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        y.set_shape([None])
        return x, adjoin_matrix, y
class Graph_Regression_test(object):
    """Regression pipeline over the validation split only.

    Each row provides a stringified atom list, the path of a saved NumPy
    adjacency matrix and a label.
    """

    def __init__(self, path, smiles_field=['0'], adj=['1'], label_field=['2'],
                 normalize=False, max_len=1000, addH=True):
        """Load the validation table.

        Args:
            path: Format template. ``path.format('val3')`` is read as a
                tab-separated file when ``path`` ends in ``.txt``/``.tsv``;
                otherwise ``path.format('val/val')`` is read as CSV.
            smiles_field: Column label(s) holding the stringified atom list.
            adj: Column label(s) holding the adjacency-matrix file path.
            label_field: Column label(s) holding the regression target.
            normalize: When true, rescale the labels to ``[-0.5, 0.5]`` using
                the min and max of the loaded labels.
            max_len: Accepted for signature compatibility; not used.
            addH: Stored on the instance; not read by this class.
        """
        # NOTE: the list defaults are only read, never mutated.
        if path.endswith(('.txt', '.tsv')):
            self.dv = pd.read_csv(path.format('val3'), sep='\t')
        else:
            self.dv = pd.read_csv(path.format('val/val'))
        self.smiles_field = smiles_field
        self.adj = adj
        self.label_field = label_field
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH
        if normalize:
            # Only ``self.dv`` is loaded by this class; the branch used to
            # read ``self.df`` and failed with AttributeError.
            # NOTE(review): the statistics therefore come from the validation
            # labels themselves - confirm that this matches the scaling used
            # at training time.
            self.max = self.dv[self.label_field].max()
            self.min = self.dv[self.label_field].min()
            self.dv[self.label_field] = (
                (self.dv[self.label_field] - self.min) / (self.max - self.min) - 0.5)
            self.value_range = self.max - self.min

    def get_data(self):
        """Return the cached, padded dataset (batch size 64) over all rows."""
        frame = self.dv
        dataset = tf.data.Dataset.from_tensor_slices(
            (frame[self.smiles_field], frame[self.adj], frame[self.label_field]))
        padded_shapes = (
            tf.TensorShape([None]),
            tf.TensorShape([None, None]),
            tf.TensorShape([1]),
        )
        self.dataset1 = (dataset.map(self.tf_numerical_smiles)
                         .cache()
                         .padded_batch(64, padded_shapes=padded_shapes)
                         .prefetch(100))
        return self.dataset1

    @staticmethod
    def _parse_atom_field(text):
        """Split a string such as ``['C' 'O']`` into ``['C', 'O']``.

        Line breaks are removed, the square brackets become spaces, the text
        is split on single quotes and the single-space separators are dropped.
        """
        cleaned = text.replace('\n', '').replace('[', ' ').replace(']', ' ')
        return [piece for piece in cleaned.split("'") if piece != ' ']

    def numerical_smiles(self, atom, adj, label):
        """Turn one table row into model inputs.

        Args:
            atom: Length-1 sequence holding the encoded atom-list string.
            adj: Length-1 sequence holding the encoded path of a ``.npy``
                adjacency matrix.
            label: Length-1 sequence holding the label.

        Returns:
            ``(x, adjoin_matrix, y)``: int64 token ids with ``<global>``
            first, the loaded matrix extended by a leading row and column of
            ones with zeros replaced by ``-1e9``, and the label as a length-1
            float32 array.
        """
        atom_text = np.array(atom)[0].decode()
        tokens = ['<global>'] + self._parse_atom_field(atom_text)
        label = np.array(label)[0]
        loaded_adj = np.load(np.array(adj)[0].decode())

        unk_id = self.vocab['<unk>']
        nums_list = [self.vocab.get(tok, unk_id) for tok in tokens]

        size = len(nums_list)
        adjoin_matrix = np.ones((size, size))
        adjoin_matrix[1:, 1:] = loaded_adj
        adjoin_matrix[adjoin_matrix == 0] = -1e9

        x = np.array(nums_list).astype('int64')
        y = np.array([label]).astype('float32')
        return x, adjoin_matrix, y

    def tf_numerical_smiles(self, smiles, adj, label):
        """Wrap :meth:`numerical_smiles` in ``tf.py_function`` for ``Dataset.map``."""
        x, adjoin_matrix, y = tf.py_function(
            self.numerical_smiles, (smiles, adj, label),
            [tf.int64, tf.float32, tf.float32])
        # Declare the ranks explicitly so padded_batch can pad the outputs.
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        y.set_shape([None])
        return x, adjoin_matrix, y
class Graph_Regression(object):
    """Regression pipeline over the train and test splits.

    Each row provides a stringified atom list, the path of a saved NumPy
    adjacency matrix and a label.
    """

    def __init__(self, path, smiles_field=['0'], adj=['1'], label_field=['2'],
                 normalize=False, max_len=1000, addH=True):
        """Load the train and test tables.

        Args:
            path: Format template. When it ends in ``.txt``/``.tsv`` the
                tab-separated files ``path.format('train3')`` and
                ``path.format('test3')`` are read; otherwise the CSV files
                ``path.format('train/train')`` and
                ``path.format('test/test')``.
            smiles_field: Column label(s) holding the stringified atom list.
            adj: Column label(s) holding the adjacency-matrix file path.
            label_field: Column label(s) holding the regression target.
            normalize: When true, rescale the *training* labels to
                ``[-0.5, 0.5]`` using their own min and max.
            max_len: Accepted for signature compatibility; not used.
            addH: Stored on the instance; not read by this class.
        """
        # NOTE: the list defaults are only read, never mutated.
        if path.endswith(('.txt', '.tsv')):
            self.df = pd.read_csv(path.format('train3'), sep='\t')
            self.dt = pd.read_csv(path.format('test3'), sep='\t')
        else:
            self.df = pd.read_csv(path.format('train/train'))
            self.dt = pd.read_csv(path.format('test/test'))
        self.smiles_field = smiles_field
        self.adj = adj
        self.label_field = label_field
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH
        if normalize:
            # NOTE(review): only the training labels are rescaled; the test
            # labels in ``self.dt`` stay on their original scale - confirm
            # this is intended.
            self.max = self.df[self.label_field].max()
            self.min = self.df[self.label_field].min()
            self.df[self.label_field] = (
                (self.df[self.label_field] - self.min) / (self.max - self.min) - 0.5)
            self.value_range = self.max - self.min

    def get_data(self):
        """Build the datasets.

        Returns:
            ``(dataset1, dataset2, dataset3)``: the training set followed by
            two datasets that are both built from the test table (no separate
            validation table is loaded by this class). All use batch size 64.
        """
        train_data = self.df
        test_data = self.dt
        padded_shapes = (
            tf.TensorShape([None]),
            tf.TensorShape([None, None]),
            tf.TensorShape([1]),
        )

        # Training set: cache the individual examples, then batch.
        self.dataset1 = (self._slices(train_data)
                         .map(self.tf_numerical_smiles)
                         .cache()
                         .padded_batch(64, padded_shapes=padded_shapes)
                         .prefetch(100))
        # Evaluation sets: batch first, then cache the finished batches.
        self.dataset2 = (self._slices(test_data)
                         .map(self.tf_numerical_smiles)
                         .padded_batch(64, padded_shapes=padded_shapes)
                         .cache()
                         .prefetch(100))
        self.dataset3 = (self._slices(test_data)
                         .map(self.tf_numerical_smiles)
                         .padded_batch(64, padded_shapes=padded_shapes)
                         .cache()
                         .prefetch(100))
        return self.dataset1, self.dataset2, self.dataset3

    def _slices(self, frame):
        """Return a row-wise dataset of (atoms, adjacency path, label) from ``frame``."""
        return tf.data.Dataset.from_tensor_slices(
            (frame[self.smiles_field], frame[self.adj], frame[self.label_field]))

    @staticmethod
    def _parse_atom_field(text):
        """Split a string such as ``['C' 'O']`` into ``['C', 'O']``.

        Line breaks are removed, the square brackets become spaces, the text
        is split on single quotes and the single-space separators are dropped.
        """
        cleaned = text.replace('\n', '').replace('[', ' ').replace(']', ' ')
        return [piece for piece in cleaned.split("'") if piece != ' ']

    def numerical_smiles(self, atom, adj, label):
        """Turn one table row into model inputs.

        Args:
            atom: Length-1 sequence holding the encoded atom-list string.
            adj: Length-1 sequence holding the encoded path of a ``.npy``
                adjacency matrix.
            label: Length-1 sequence holding the label.

        Returns:
            ``(x, adjoin_matrix, y)``: int64 token ids with ``<global>``
            first, the loaded matrix extended by a leading row and column of
            ones with zeros replaced by ``-1e9``, and the label as a length-1
            float32 array.
        """
        atom_text = np.array(atom)[0].decode()
        tokens = ['<global>'] + self._parse_atom_field(atom_text)
        label = np.array(label)[0]
        loaded_adj = np.load(np.array(adj)[0].decode())

        unk_id = self.vocab['<unk>']
        nums_list = [self.vocab.get(tok, unk_id) for tok in tokens]

        size = len(nums_list)
        adjoin_matrix = np.ones((size, size))
        adjoin_matrix[1:, 1:] = loaded_adj
        adjoin_matrix[adjoin_matrix == 0] = -1e9

        x = np.array(nums_list).astype('int64')
        y = np.array([label]).astype('float32')
        return x, adjoin_matrix, y

    def tf_numerical_smiles(self, smiles, adj, label):
        """Wrap :meth:`numerical_smiles` in ``tf.py_function`` for ``Dataset.map``."""
        x, adjoin_matrix, y = tf.py_function(
            self.numerical_smiles, (smiles, adj, label),
            [tf.int64, tf.float32, tf.float32])
        # Declare the ranks explicitly so padded_batch can pad the outputs.
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        y.set_shape([None])
        return x, adjoin_matrix, y
class Inference_Dataset(object):
    """Inference pipeline that reads SMILES strings from a table.

    NOTE: a second class named ``Inference_Dataset`` is defined later in this
    file and rebinds the name, so this definition is only reachable through a
    reference taken before that point.
    """

    def __init__(self, path, smiles_field='Smiles', addH=True):
        """Load the table at ``path``.

        Args:
            path: CSV file, or a tab-separated file when the name ends in
                ``.txt`` or ``.tsv``.
            smiles_field: Name of the SMILES column.
            addH: Forwarded to the graph builder as ``explicit_hydrogens``.
        """
        if path.endswith(('.txt', '.tsv')):
            self.df = pd.read_csv(path, sep='\t')
        else:
            self.df = pd.read_csv(path)
        self.smiles_field = smiles_field
        self.vocab = str2num
        self.devocab = num2str
        self.addH = addH

    def get_data(self):
        """Build two batch-size-1 datasets.

        Returns:
            ``(dataset1, dataset2)``: a random 90 % sample of the rows and the
            remaining rows. The split is drawn anew on every call.
        """
        data = self.df
        sampled_index = data.sample(frac=0.9).index
        in_sample = data.index.isin(sampled_index)
        data1 = data[in_sample]
        data2 = data[~in_sample]
        padded_shapes = (
            tf.TensorShape([None]),
            tf.TensorShape([None, None]),
            tf.TensorShape([None]),
            tf.TensorShape([None]),
        )

        print(len(data1))
        self.dataset1 = (
            tf.data.Dataset.from_tensor_slices(data1[self.smiles_field].tolist())
            .map(self.tf_numerical_smiles)
            .padded_batch(1, padded_shapes=padded_shapes)
            .prefetch(50))
        print(self.dataset1)
        self.dataset2 = (
            tf.data.Dataset.from_tensor_slices(data2[self.smiles_field].tolist())
            .map(self.tf_numerical_smiles)
            .padded_batch(1, padded_shapes=padded_shapes)
            .prefetch(50))
        return self.dataset1, self.dataset2

    def numerical_smiles(self, smiles):
        """Convert one SMILES eager tensor to model inputs.

        Returns:
            ``(x, adjoin_matrix, [smiles], atoms_list)``: int64 token ids with
            ``<global>`` first, the adjacency matrix extended by a leading row
            and column of ones with zeros replaced by ``-1e9``, the decoded
            SMILES wrapped in a list, and the token strings.
        """
        smiles = smiles.numpy().decode()
        # The module imports the graph builder as ``smiles2adjoin``; the
        # previous spelling ``smiles2adjoins`` was an undefined name.
        # NOTE(review): assumes the builder accepts ``explicit_hydrogens`` -
        # confirm against utils.
        atoms_list, raw_adj = smiles2adjoin(smiles, explicit_hydrogens=self.addH)
        print(atoms_list)
        atoms_list = ['<global>'] + list(atoms_list)
        unk_id = str2num['<unk>']
        nums_list = [str2num.get(tok, unk_id) for tok in atoms_list]

        size = len(nums_list)
        adjoin_matrix = np.ones((size, size))
        adjoin_matrix[1:, 1:] = raw_adj
        adjoin_matrix[adjoin_matrix == 0] = -1e9

        x = np.array(nums_list).astype('int64')
        return x, adjoin_matrix, [smiles], atoms_list

    def tf_numerical_smiles(self, data):
        """Wrap :meth:`numerical_smiles` in ``tf.py_function`` for ``Dataset.map``."""
        # The last two outputs are strings (the SMILES and the token list).
        # They were previously declared int64/float32 and unpacked into names
        # that the rest of the method never used.
        x, adjoin_matrix, smiles, atom_list = tf.py_function(
            self.numerical_smiles, [data],
            [tf.int64, tf.float32, tf.string, tf.string])
        # Declare the ranks explicitly so padded_batch can pad the outputs.
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        smiles.set_shape([1])
        atom_list.set_shape([None])
        return x, adjoin_matrix, smiles, atom_list
class Inference_Dataset(object):
    """Inference pipeline over an in-memory list of SMILES strings."""

    def __init__(self, sml_list, max_len=1000, addH=True):
        """Keep the SMILES strings shorter than ``max_len`` characters.

        Args:
            sml_list: Iterable of SMILES strings.
            max_len: Exclusive upper bound on the string length.
            addH: Stored on the instance; not read by this class.
        """
        self.vocab = str2num
        self.devocab = num2str
        self.sml_list = [sml for sml in sml_list if len(sml) < max_len]
        self.addH = addH

    def get_data(self):
        """Return the padded, cached dataset (batch size 64)."""
        dataset = tf.data.Dataset.from_tensor_slices((self.sml_list,))
        padded_shapes = (
            tf.TensorShape([None]),
            tf.TensorShape([None, None]),
            tf.TensorShape([1]),
            tf.TensorShape([None]),
        )
        self.dataset = (dataset.map(self.tf_numerical_smiles)
                        .padded_batch(64, padded_shapes=padded_shapes)
                        .cache()
                        .prefetch(20))
        return self.dataset

    def numerical_smiles(self, smiles):
        """Convert one SMILES eager tensor to model inputs.

        Returns:
            ``(x, adjoin_matrix, [smiles], atoms_list)``: int64 token ids with
            ``<global>`` first, the adjacency matrix extended by a leading row
            and column of ones with zeros replaced by ``-1e9``, the decoded
            SMILES wrapped in a list, and the token strings.
        """
        smiles = smiles.numpy().decode()
        # The module imports the graph builder as ``smiles2adjoin``; the
        # previous spelling ``smiles2adjoins`` was an undefined name.
        atoms, raw_adj = smiles2adjoin(smiles)
        # list() so the concatenation also works when the builder returns an
        # array rather than a list.
        atoms_list = ['<global>'] + list(atoms)
        unk_id = str2num['<unk>']
        nums_list = [str2num.get(tok, unk_id) for tok in atoms_list]

        size = len(nums_list)
        adjoin_matrix = np.ones((size, size))
        adjoin_matrix[1:, 1:] = raw_adj
        # Same masking rule as the other dataset classes in this file (zeros
        # become -1e9, other entries are kept); this class previously used
        # (1 - m) * -1e9 instead.
        # NOTE(review): confirm this is the form the model expects.
        adjoin_matrix[adjoin_matrix == 0] = -1e9

        x = np.array(nums_list).astype('int64')
        return x, adjoin_matrix, [smiles], atoms_list

    def tf_numerical_smiles(self, smiles):
        """Wrap :meth:`numerical_smiles` in ``tf.py_function`` for ``Dataset.map``."""
        x, adjoin_matrix, smiles, atom_list = tf.py_function(
            self.numerical_smiles, [smiles],
            [tf.int64, tf.float32, tf.string, tf.string])
        # Declare the ranks explicitly so padded_batch can pad the outputs.
        x.set_shape([None])
        adjoin_matrix.set_shape([None, None])
        smiles.set_shape([1])
        atom_list.set_shape([None])
        return x, adjoin_matrix, smiles, atom_list