PySR / pysr /feynman_problems.py
MilesCranmer's picture
Rename to feynman_problems.py
cc26e99
raw
history blame
6.08 kB
import numpy as np
import csv
import traceback
from sr import pysr, best
class Problem:
"""
Problem API to work with PySR.
Has attributes: X, y as pysr accepts, form which is a string representing the correct equation and variable_names
Should be able to call pysr(problem.X, problem.y, var_names=problem.var_names) and have it work
"""
def __init__(self, X, y, form=None, variable_names=None):
self.X = X
self.y = y
self.form = form
self.variable_names = variable_names
class FeynmanProblem(Problem):
"""
Stores the data for the problems from the 100 Feynman Equations on Physics.
This is the benchmark used in the AI Feynman Paper
"""
def __init__(self, row, gen=False, dp=500):
"""
row: a row read as a dict from the FeynmanEquations dataset provided in the datasets folder of the repo
gen: If true the problem will have dp X and y values randomly generated else they will be None
"""
self.eq_id = row['Filename']
#self.form = row['Formula']
self.n_vars = int(row['# variables'])
super(FeynmanProblem, self).__init__(None, None, form=row['Formula'],
variable_names=[row[f'v{i + 1}_name'] for i in range(self.n_vars)])
#self.var_names = [row[f'v{i+1}_name'] for i in range(self.n_vars)]
self.low = [float(row[f'v{i+1}_low']) for i in range(self.n_vars)]
self.high = [float(row[f'v{i+1}_high']) for i in range(self.n_vars)]
self.dp = dp#int(row[f'datapoints'])
#self.X = None
#self.Y = None
if gen:
self.X = np.random.uniform(0.01, 25, size=(self.dp, self.n_vars))
d = {}
for var in range(len(self.variable_names)):
d[self.variable_names[var]] = self.X[:, var]
d['exp'] = np.exp
d['sqrt'] = np.sqrt
d['pi'] = np.pi
d['cos'] = np.cos
d['sin'] = np.sin
d['tan'] = np.tan
d['tanh'] = np.tanh
d['ln'] = np.log
d['log'] = np.log # Quite sure the Feynman dataset has no base 10 logs
d['arcsin'] = np.arcsin
self.y = eval(self.form,d)
return
def __str__(self):
return f"Feynman Equation: {self.eq_id}|Form: {self.form}"
def __repr__(self):
return str(self)
def mk_problems(first=100, gen=False, dp=500, data_dir="datasets/FeynmanEquations.csv"):
"""
first: the first "first" equations from the dataset will be made into problems
data_dir: the path pointing to the Feynman Equations csv
returns: list of FeynmanProblems
"""
ret = []
with open(data_dir) as csvfile:
ind = 0
reader = csv.DictReader(csvfile)
for i, row in enumerate(reader):
if ind > first:
break
if row['Filename'] == '': continue
try:
p = FeynmanProblem(row, gen=gen, dp=dp)
ret.append(p)
except Exception as e:
traceback.print_exc()
#print(row)
print(f"FAILED ON ROW {i}")
ind += 1
return ret
def run_on_problem(problem, verbosity=0, multiprocessing=True):
"""
Takes in a problem and returns a tuple: (equations, best predicted equation, actual equation)
"""
from time import time
starting = time()
equations = pysr(problem.X, problem.y, variable_names=problem.variable_names, verbosity=verbosity,)
timing = time()-starting
others = {"time": timing, "problem": problem}
if not multiprocessing:
others['equations'] = equations
return str(best(equations)), problem.form, others
def do_feynman_experiments_parallel(first=100, verbosity=0, dp=500, output_file_path="experiments/FeynmanExperiment.csv", data_dir="datasets/FeynmanEquations.csv"):
import multiprocessing as mp
from tqdm import tqdm
problems = FeynmanProblem.mk_problems(first=first, gen=True, dp=dp, data_dir=data_dir)
ids = []
predictions = []
true_equations = []
time_takens = []
pool = mp.Pool()
results = []
with tqdm(total=len(problems)) as pbar:
for i, res in enumerate(pool.imap(run_on_problem, problems)):
results.append(res)
pbar.update()
for res in results:
prediction, true_equation, others = res
problem = others['problem']
ids.append(problem.eq_id)
predictions.append(prediction)
true_equations.append(true_equation)
time_takens.append(others['time'])
with open(output_file_path, 'a') as f:
writer = csv.writer(f, delimiter=',')
writer.writerow(['ID', 'Predicted', 'True', 'Time'])
for i in range(len(ids)):
writer.writerow([ids[i], predictions[i], true_equations[i], time_takens[i]])
return
def do_feynman_experiments(first=100, verbosity=0, dp=500, output_file_path="experiments/FeynmanExperiment.csv", data_dir="datasets/FeynmanEquations.csv"):
from tqdm import tqdm
problems = FeynmanProblem.mk_problems(first=first, gen=True, dp=dp, data_dir=data_dir)
indx = range(len(problems))
ids = []
predictions = []
true_equations = []
time_takens = []
for problem in tqdm(problems):
prediction, true_equation, others = run_on_problem(problem, verbosity)
ids.append(problem.eq_id)
predictions.append(prediction)
true_equations.append(true_equation)
time_takens.append(others['time'])
with open(output_file_path, 'a') as f:
writer = csv.writer(f, delimiter=',')
writer.writerow(['ID', 'Predicted', 'True', 'Time'])
for i in range(len(ids)):
writer.writerow([ids[i], predictions[i], true_equations[i], time_takens[i]])
return
if __name__ == "__main__":
do_feynman_experiments_parallel(first=10)