"""Gradio front-end for interactive symbolic regression with PySR.

The UI lets the user either generate synthetic 1-D data from a test
equation or upload a CSV, then runs PySR in a subprocess and live-streams
the Pareto front of discovered equations back into the interface.
"""
import multiprocessing as mp
import os
import shutil
import tempfile
import time
from pathlib import Path

import gradio as gr
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

plt.ioff()
plt.rcParams["font.family"] = [
    "IBM Plex Mono",
    # Fallback fonts:
    "DejaVu Sans Mono",
    "Courier New",
    "monospace",
]


def empty_df() -> pd.DataFrame:
    """Return a fresh, empty hall-of-fame table (new object on each call)."""
    return pd.DataFrame(
        {
            "equation": [],
            "loss": [],
            "complexity": [],
        }
    )


test_equations = ["sin(2*x)/x + 0.1*x"]


def generate_data(s: str, num_points: int, noise_level: float, data_seed: int):
    """Build a noisy 1-D dataset by evaluating the equation string ``s``.

    Parameters
    ----------
    s : str
        Equation in ``x`` that may use ``sin``/``cos``/``exp``/``log``/``tan``
        and ``^`` for exponentiation.
    num_points : int
        Number of x samples drawn uniformly from [-10, 10].
    noise_level : float
        Standard deviation of the Gaussian noise added to y.
    data_seed : int
        Seed for the random state, so draws are reproducible.

    Returns
    -------
    (X, y) : a one-column DataFrame of x values, and the noisy y ndarray.
    """
    rstate = np.random.RandomState(data_seed)
    x = rstate.uniform(-10, 10, num_points)
    # Rewrite the human-friendly equation into a numpy expression.
    for k, v in {
        "sin": "np.sin",
        "cos": "np.cos",
        "exp": "np.exp",
        "log": "np.log",
        "tan": "np.tan",
        "^": "**",
    }.items():
        s = s.replace(k, v)
    # SECURITY: eval() executes arbitrary code. Here ``s`` originates from the
    # fixed ``test_equations`` Radio choices, so it is not attacker-controlled,
    # but never point this at untrusted input.
    y = eval(s)
    noise = rstate.normal(0, noise_level, y.shape)
    y_noisy = y + noise
    return pd.DataFrame({"x": x}), y_noisy


def _greet_dispatch(
    file_input,
    force_run,
    test_equation,
    num_points,
    noise_level,
    data_seed,
    niterations,
    maxsize,
    binary_operators,
    unary_operators,
    plot_update_delay,
    parsimony,
    populations,
    population_size,
    ncycles_per_iteration,
    elementwise_loss,
    adaptive_parsimony_scaling,
    optimizer_algorithm,
    optimizer_iterations,
    batching,
    batch_size,
):
    """Load data, then spawn a process to run the greet function.

    Generator: while the PySR subprocess is alive, periodically yields the
    current Pareto front (a DataFrame with Complexity/Loss/Equation columns)
    so Gradio can live-update the results table.
    """
    if file_input is not None:
        # Validate the uploaded file before launching an expensive run.
        df = pd.read_csv(file_input)
        # NOTE(review): these ``return`` statements are inside a generator, so
        # their values (including the error message) never reach the Gradio
        # outputs — they just stop the stream. Confirm intended behavior.
        if len(df) == 0:
            return (
                empty_df(),
                "The file is empty!",
            )
        if len(df.columns) == 1:
            return (
                empty_df(),
                "The file has only one column!",
            )
        if len(df) > 10_000 and not force_run:
            return (
                empty_df(),
                "You have uploaded a file with more than 10,000 rows. "
                "This will take very long to run. "
                "Please upload a subsample of the data, "
                "or check the box 'Ignore Warnings'.",
            )

        # The rightmost column is the regression target; everything else is X.
        col_to_fit = df.columns[-1]
        y = np.array(df[col_to_fit])
        X = df.drop([col_to_fit], axis=1)
    else:
        X, y = generate_data(test_equation, num_points, noise_level, data_seed)

    with tempfile.TemporaryDirectory() as tmpdirname:
        base = Path(tmpdirname)
        equation_file = base / "hall_of_fame.csv"
        equation_file_bkup = base / "hall_of_fame.csv.bkup"
        # Run PySR in a separate process so the UI stays responsive and we can
        # poll its output file while it works.
        process = mp.Process(
            target=greet,
            kwargs=dict(
                X=X,
                y=y,
                niterations=niterations,
                maxsize=maxsize,
                binary_operators=binary_operators,
                unary_operators=unary_operators,
                equation_file=equation_file,
                parsimony=parsimony,
                populations=populations,
                population_size=population_size,
                ncycles_per_iteration=ncycles_per_iteration,
                elementwise_loss=elementwise_loss,
                adaptive_parsimony_scaling=adaptive_parsimony_scaling,
                optimizer_algorithm=optimizer_algorithm,
                optimizer_iterations=optimizer_iterations,
                batching=batching,
                batch_size=batch_size,
            ),
        )
        process.start()
        last_yield_time = None
        while process.is_alive():
            if equation_file_bkup.exists():
                try:
                    # Copy the backup first so we never read a file PySR is
                    # mid-write on. shutil.copyfile is portable (no shell) and
                    # safe for paths containing spaces, unlike `os.system("cp ...")`.
                    equation_file_copy = base / "hall_of_fame_copy.csv"
                    shutil.copyfile(equation_file_bkup, equation_file_copy)
                    equations = pd.read_csv(equation_file_copy)
                    # Ensure it is pareto dominated, with more complex expressions
                    # having higher loss. Otherwise remove those rows.
                    # TODO: Not sure why this occurs; could be the result of a late copy?
                    equations.sort_values("Complexity", ascending=True, inplace=True)
                    equations.reset_index(inplace=True)
                    bad_idx = []
                    min_loss = None
                    for i in equations.index:
                        if min_loss is None or equations.loc[i, "Loss"] < min_loss:
                            min_loss = float(equations.loc[i, "Loss"])
                        else:
                            bad_idx.append(i)
                    equations.drop(index=bad_idx, inplace=True)

                    # Throttle UI updates to at most one per plot_update_delay seconds.
                    while (
                        last_yield_time is not None
                        and time.time() - last_yield_time < plot_update_delay
                    ):
                        time.sleep(0.1)

                    yield equations[["Complexity", "Loss", "Equation"]]

                    last_yield_time = time.time()
                except pd.errors.EmptyDataError:
                    # Backup file existed but had no rows yet; try again later.
                    pass

        process.join()


def greet(
    *,
    X,
    y,
    **pysr_kwargs,
):
    """Fit a PySRRegressor to (X, y); runs inside the worker subprocess.

    ``pysr`` is imported lazily here so the (slow) Julia setup happens in the
    subprocess, not in the UI process.
    """
    import pysr

    model = pysr.PySRRegressor(
        progress=False,
        timeout_in_seconds=1000,
        **pysr_kwargs,
    )
    model.fit(X, y)

    return 0


def _data_layout():
    """Build the 'Example Data' / 'Upload Data' tabs; returns the components."""
    with gr.Tab("Example Data"):
        # Plot of the example data:
        with gr.Row():
            with gr.Column():
                example_plot = gr.Plot()
            with gr.Column():
                test_equation = gr.Radio(
                    test_equations, value=test_equations[0], label="Test Equation"
                )
                num_points = gr.Slider(
                    minimum=10,
                    maximum=1000,
                    value=200,
                    label="Number of Data Points",
                    step=1,
                )
                noise_level = gr.Slider(
                    minimum=0, maximum=1, value=0.05, label="Noise Level"
                )
                data_seed = gr.Number(value=0, label="Random Seed")
    with gr.Tab("Upload Data"):
        file_input = gr.File(label="Upload a CSV File")
        gr.Markdown(
            "The rightmost column of your CSV file will be used as the target variable."
        )

    return dict(
        file_input=file_input,
        test_equation=test_equation,
        num_points=num_points,
        noise_level=noise_level,
        data_seed=data_seed,
        example_plot=example_plot,
    )


def _settings_layout():
    """Build the PySR/Gradio settings tabs; returns the components by name."""
    with gr.Tab("Basic Settings"):
        binary_operators = gr.CheckboxGroup(
            choices=["+", "-", "*", "/", "^", "max", "min", "mod", "cond"],
            label="Binary Operators",
            value=["+", "-", "*", "/"],
        )
        unary_operators = gr.CheckboxGroup(
            choices=[
                "sin",
                "cos",
                "tan",
                "exp",
                "log",
                "square",
                "cube",
                "sqrt",
                "abs",
                "erf",
                "relu",
                "round",
                "sign",
            ],
            label="Unary Operators",
            value=["sin"],
        )
        niterations = gr.Slider(
            minimum=1,
            maximum=1000,
            value=40,
            label="Number of Iterations",
            step=1,
        )
        maxsize = gr.Slider(
            minimum=7,
            maximum=100,
            value=20,
            label="Maximum Complexity",
            step=1,
        )
        parsimony = gr.Number(
            value=0.0032,
            label="Parsimony Coefficient",
        )
    with gr.Tab("Advanced Settings"):
        populations = gr.Slider(
            minimum=2,
            maximum=100,
            value=15,
            label="Number of Populations",
            step=1,
        )
        population_size = gr.Slider(
            minimum=2,
            maximum=1000,
            value=33,
            label="Population Size",
            step=1,
        )
        ncycles_per_iteration = gr.Number(
            value=550,
            label="Cycles per Iteration",
        )
        elementwise_loss = gr.Radio(
            ["L2DistLoss()", "L1DistLoss()", "LogitDistLoss()", "HuberLoss()"],
            value="L2DistLoss()",
            label="Loss Function",
        )
        adaptive_parsimony_scaling = gr.Number(
            value=20.0,
            label="Adaptive Parsimony Scaling",
        )
        optimizer_algorithm = gr.Radio(
            ["BFGS", "NelderMead"],
            value="BFGS",
            label="Optimizer Algorithm",
        )
        optimizer_iterations = gr.Slider(
            minimum=1,
            maximum=100,
            value=8,
            label="Optimizer Iterations",
            step=1,
        )
        # Bool:
        batching = gr.Checkbox(
            value=False,
            label="Batching",
        )
        batch_size = gr.Slider(
            minimum=2,
            maximum=1000,
            value=50,
            label="Batch Size",
            step=1,
        )
    with gr.Tab("Gradio Settings"):
        plot_update_delay = gr.Slider(
            minimum=1,
            maximum=100,
            value=3,
            label="Plot Update Delay",
        )
        force_run = gr.Checkbox(
            value=False,
            label="Ignore Warnings",
        )

    return dict(
        binary_operators=binary_operators,
        unary_operators=unary_operators,
        niterations=niterations,
        maxsize=maxsize,
        force_run=force_run,
        plot_update_delay=plot_update_delay,
        parsimony=parsimony,
        populations=populations,
        population_size=population_size,
        ncycles_per_iteration=ncycles_per_iteration,
        elementwise_loss=elementwise_loss,
        adaptive_parsimony_scaling=adaptive_parsimony_scaling,
        optimizer_algorithm=optimizer_algorithm,
        optimizer_iterations=optimizer_iterations,
        batching=batching,
        batch_size=batch_size,
    )


def main():
    """Assemble the Gradio Blocks UI, wire up callbacks, and launch the app."""
    blocks = {}
    with gr.Blocks() as demo:
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    blocks = {**blocks, **_data_layout()}
                with gr.Row():
                    blocks = {**blocks, **_settings_layout()}
            with gr.Column():
                with gr.Tab("Pareto Front"):
                    blocks["pareto"] = gr.Plot()
                with gr.Tab("Predictions"):
                    blocks["predictions_plot"] = gr.Plot()
                blocks["df"] = gr.Dataframe(
                    headers=["complexity", "loss", "equation"],
                    datatype=["number", "number", "str"],
                    wrap=True,
                    column_widths=[75, 75, 200],
                    interactive=False,
                )
                blocks["run"] = gr.Button()

        blocks["run"].click(
            _greet_dispatch,
            inputs=[
                blocks[k]
                for k in [
                    "file_input",
                    "force_run",
                    "test_equation",
                    "num_points",
                    "noise_level",
                    "data_seed",
                    "niterations",
                    "maxsize",
                    "binary_operators",
                    "unary_operators",
                    "plot_update_delay",
                    "parsimony",
                    "populations",
                    "population_size",
                    "ncycles_per_iteration",
                    "elementwise_loss",
                    "adaptive_parsimony_scaling",
                    "optimizer_algorithm",
                    "optimizer_iterations",
                    "batching",
                    "batch_size",
                ]
            ],
            outputs=blocks["df"],
        )

        # Any update to the equation choice will trigger a replot:
        eqn_components = [
            blocks["test_equation"],
            blocks["num_points"],
            blocks["noise_level"],
            blocks["data_seed"],
        ]
        for eqn_component in eqn_components:
            eqn_component.change(replot, eqn_components, blocks["example_plot"])

        # Update plot when dataframe is updated:
        blocks["df"].change(
            replot_pareto,
            inputs=[blocks["df"], blocks["maxsize"]],
            outputs=[blocks["pareto"]],
        )

        demo.load(replot, eqn_components, blocks["example_plot"])

    demo.launch(debug=True)


def replot_pareto(df, maxsize):
    """Draw the complexity-vs-loss Pareto front as a log-log line plot.

    Returns an (empty) figure when the table has not been populated with
    PySR results yet (no 'Equation' column).
    """
    fig, ax = plt.subplots(figsize=(6, 6), dpi=100)

    if len(df) == 0 or "Equation" not in df.columns:
        return fig

    # Plotting the data
    ax.loglog(
        df["Complexity"],
        df["Loss"],
        marker="o",
        linestyle="-",
        color="#333f48",
        linewidth=1.5,
        markersize=6,
    )

    # Set the axis limits
    ax.set_xlim(0.5, maxsize + 1)
    ytop = 2 ** (np.ceil(np.log2(df["Loss"].max())))
    # The 1e-20 offset guards log2(0) when the best loss is exactly zero.
    ybottom = 2 ** (np.floor(np.log2(df["Loss"].min() + 1e-20)))
    ax.set_ylim(ybottom, ytop)

    ax.grid(True, which="both", ls="--", linewidth=0.5, color="gray", alpha=0.5)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)

    # Range-frame the plot
    for direction in ["bottom", "left"]:
        ax.spines[direction].set_position(("outward", 10))

    # Delete far ticks
    ax.tick_params(axis="both", which="major", labelsize=10, direction="out", length=5)
    ax.tick_params(axis="both", which="minor", labelsize=8, direction="out", length=3)

    ax.set_xlabel("Complexity")
    ax.set_ylabel("Loss")
    fig.tight_layout(pad=2)

    return fig


def replot(test_equation, num_points, noise_level, data_seed):
    """Regenerate the example dataset and return a scatter plot of it."""
    X, y = generate_data(test_equation, num_points, noise_level, data_seed)
    x = X["x"]

    plt.rcParams["font.family"] = "IBM Plex Mono"
    fig, ax = plt.subplots(figsize=(6, 6), dpi=100)

    ax.scatter(x, y, alpha=0.7, edgecolors="w", s=50)

    ax.grid(True, which="both", ls="--", linewidth=0.5, color="gray", alpha=0.5)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)

    # Range-frame the plot
    for direction in ["bottom", "left"]:
        ax.spines[direction].set_position(("outward", 10))

    # Delete far ticks
    ax.tick_params(axis="both", which="major", labelsize=10, direction="out", length=5)
    ax.tick_params(axis="both", which="minor", labelsize=8, direction="out", length=3)

    ax.set_xlabel("x")
    ax.set_ylabel("y")
    fig.tight_layout(pad=2)

    return fig


if __name__ == "__main__":
    main()