from fmpy import * from fmpy import read_model_description, extract from fmpy.fmi2 import FMU2Slave import numpy as np import shutil import pandas as pd import random import plotly.graph_objects as go import json from fmpy import * from fmpy import read_model_description, extract from fmpy.fmi2 import FMU2Slave import numpy as np import shutil import pandas as pd import random import plotly.graph_objects as go import json df_profile = pd.read_csv("profile_processed.csv") def getProfileFromID(id): return df_profile[df_profile.ID==id].iloc[0, 1:].to_list() def simulation(id, kp, ki, kd, bis_target=40, min_noise=50, max_noise=150): profile = getProfileFromID(id) age = profile[0] weight = profile[1] height = profile[2] gender = profile[3] vrs = {} fmu = 'Pharmacokinetics_4_comportmental_model_PI_ref_FMU_base4_OAAS_lnx.fmu' model_description = read_model_description(fmu) for variable in model_description.modelVariables: vrs[variable.name] = variable.valueReference start_time = 0.0 stop_time = 7000 step_size = 1 unzipdir = extract(fmu) fmu = FMU2Slave(guid=model_description.guid, unzipDirectory=unzipdir, modelIdentifier=model_description.coSimulation.modelIdentifier, instanceName='instance1') # initialize fmu.instantiate() fmu.setupExperiment(startTime=start_time) fmu.enterInitializationMode() fmu.exitInitializationMode() fmu.setReal([vrs["amesim_interface.Age_year"]], [age]) fmu.setReal([vrs["amesim_interface.BIS0"]], [95.6]) fmu.setReal([vrs["amesim_interface.BISmin"]], [8.9]) fmu.setReal([vrs["amesim_interface.Drug_concentration_mgmL"]], [20]) fmu.setReal([vrs["amesim_interface.EC50"]], [2.23]) fmu.setReal([vrs["amesim_interface.Gamma"]], [1.58]) fmu.setReal([vrs["amesim_interface.Gender_0male_1female"]], [gender]) fmu.setReal([vrs["amesim_interface.Height_cm"]], [height]) fmu.setReal([vrs["amesim_interface.Infusion_rate_mLh"]], [200]) fmu.setReal([vrs["amesim_interface.Weight_kg"]], [weight]) vr_input = vrs["amesim_interface.Infusion_rate_mLh"] vr_output = vrs["amesim_interface.BIS_Index"] rows = [] # list to record the results time = start_time infusion_rate = 200 i = 0 target = bis_target last_error = 0 # simulation loop impulsive_noise = random.randint(min_noise, max_noise) print("noise level:", impulsive_noise) while time < stop_time: if time >= 2.4e3 and time < 4.5e3: target = 60 p = 0 i = 0 if time >= 4.5e3: target = bis_target p = 0 i = 0 bis = fmu.getReal([int(vr_output)])[0] if time > step_size else 95.6 p = bis - target i = i + p d = p - last_error last_error = p infusion_rate = np.clip(kp*p + ki*i + kd*d, 0, 200) # add impulsive noise to infusion rate n = time // 100 start = 100 * n end = start + 50 if (time > start and time < end and n % 15 == 0): infusion_rate += impulsive_noise fmu.setReal([vr_input], [int(infusion_rate)]) # perform one step fmu.doStep(currentCommunicationPoint=time, communicationStepSize=step_size) # advance the time time += step_size # get the values for 'inputs' and 'outputs[4]' inputs, outputs = fmu.getReal([int(vr_input), int(vr_output)]) # append the results rows.append((time, bis, inputs)) fmu.terminate() fmu.freeInstance() shutil.rmtree(unzipdir, ignore_errors=True) result = np.array(rows, dtype=np.dtype([('time', np.float64), ('BIS', np.float64), ('Infusion', np.float64)])) return result, impulsive_noise def plot_result(result, show_original): df = pd.DataFrame(result) trace1 = go.Scatter(x=df.index, y=df['BIS'], mode='lines', name='BIS') fig1 = go.Figure(data=trace1) fig1.update_layout(height=400, width=1200, title_text="BIS evolution") # Add a line trace for column_2 in the second subplot trace2 = go.Scatter(x=df.index, y=df['Infusion'], mode='lines', name='Infusion rate') fig2 = go.Figure(data=trace2) fig2.update_layout(height=400, width=1200, title_text="Infusion rate evolution") if show_original: result_baseline = np.load("result_impulsive.npy") df_original = pd.DataFrame(result_baseline) fig1.add_trace(go.Scatter(x=df_original.index, y=df_original['BIS'], mode='lines', name='BIS original', line=dict(color="red"), opacity=0.5)) fig2.add_trace(go.Scatter(x=df_original.index, y=df_original['Infusion'], mode='lines', name='Infusion rate original', line=dict(color="red"), opacity=0.5)) else: np.save("result_impulsive.npy", result) return fig1, fig2 def gradio_display_profile(id): profile = getProfileFromID(id) gender = "Male" if profile[3] == 0 else "Female" data = {} data["age"] = [profile[0]] data["weight"] = [profile[1]] data["height"] = [profile[2]] data["gender"] = [gender] df = pd.DataFrame(data) return df def gradio_simulation(id, kp, ki, kd, show_original, bis_target, min_noise, max_noise): result, noise_level = simulation(id, kp, ki, kd, bis_target, min_noise, max_noise) fig1, fig2 = plot_result(result, show_original) return fig1, fig2, noise_level def gradio_save(id, kp, ki, kd, bis_target, min_noise, max_noise): result, noise_level = simulation(id, kp, ki, kd, bis_target, min_noise, max_noise) patient_profile = getProfileFromID(id) # Assuming patient_profile is a list of 4 integers, bis_trace is a list of 7000 floats, and kp, ki, kd are floats data = { 'inputs': { 'patient_profile': { 'age': patient_profile[0], 'weight': patient_profile[1], 'height': patient_profile[2], 'gender': patient_profile[3] }, 'bis_trace': result['BIS'].tolist(), 'noise_level': noise_level }, 'outputs': { 'kp': kp, 'ki': ki, 'kd': kd } } with open(f'saved_data/patient-{id}.json', 'w') as f: json.dump(data, f) return "Saved" import gradio as gr with gr.Blocks() as demo: with gr.Row(): with gr.Column(scale=1): gr.Markdown("# BIS Target") bis_target = gr.Slider(minimum=0, maximum=100, step=1, value=30, label="BIS target") gr.Markdown("# Impulsive noise range") min_noise = gr.Slider(minimum=0, maximum=50, step=1, value=50, label="noise min") max_noise = gr.Slider(minimum=0, maximum=150, step=1, value=150, label="noise max") gr.Markdown("# Patient profile") id = gr.Number(value=1, precision=0, label="Patient ID") profile_output = gr.Dataframe(value=gradio_display_profile(1), label="Patient profile") id.change(gradio_display_profile, inputs=[id], outputs=[profile_output]) # with gr.Blocks(): # with gr.Accordion("noise range"): # min_pul = gr.Slider(minimum=0, maximum=50, step=1, value=50, label="noise min") # max_pul = gr.Slider(minimum=0, maximum=150, step=1, value=150, label="noise max") gr.Markdown("# PID parameters") with gr.Blocks(): kp = gr.Slider(minimum=0, maximum=20, value=4, label="kp") ki = gr.Slider(minimum=0, maximum=1, value=0.01, label="ki") kd = gr.Slider(minimum=0, maximum=200, value=0, label="kd") button = gr.Button("Simulate") show_original = gr.Checkbox(label="Show original") gr.Markdown("# Save the best parameters") save_result = gr.Button("Save") save_output = gr.Textbox(label="Save status") with gr.Column(scale=5): plot1 = gr.Plot(label="BIS evolution") plot2 = gr.Plot(label="Infusion rate evolution") button.click(gradio_simulation, inputs=[id, kp, ki, kd, show_original, bis_target, min_noise, max_noise], outputs=[plot1, plot2]) save_result.click(gradio_save, inputs=[id, kp, ki, kd, bis_target, min_noise, max_noise], outputs=[save_output]) demo.launch()