|
from jupyter_client import KernelManager |
|
import threading |
|
import re |
|
from utils.const import * |
|
|
|
|
|
class JupyterNotebook: |
|
def __init__(self): |
|
self.km = KernelManager() |
|
self.km.start_kernel() |
|
self.kc = self.km.client() |
|
_ = self.add_and_run(TOOLS_CODE) |
|
|
|
def clean_output(self, outputs): |
|
outputs_only_str = list() |
|
for i in outputs: |
|
if type(i) == dict: |
|
if "text/plain" in list(i.keys()): |
|
outputs_only_str.append(i["text/plain"]) |
|
elif type(i) == str: |
|
outputs_only_str.append(i) |
|
elif type(i) == list: |
|
error_msg = "\n".join(i) |
|
error_msg = re.sub(r"\x1b\[.*?m", "", error_msg) |
|
outputs_only_str.append(error_msg) |
|
|
|
return "\n".join(outputs_only_str).strip() |
|
|
|
def add_and_run(self, code_string): |
|
|
|
def run_code_in_thread(): |
|
nonlocal outputs, error_flag |
|
|
|
|
|
msg_id = self.kc.execute(code_string) |
|
|
|
while True: |
|
try: |
|
msg = self.kc.get_iopub_msg(timeout=20) |
|
|
|
msg_type = msg["header"]["msg_type"] |
|
content = msg["content"] |
|
|
|
if msg_type == "execute_result": |
|
outputs.append(content["data"]) |
|
elif msg_type == "stream": |
|
outputs.append(content["text"]) |
|
elif msg_type == "error": |
|
error_flag = True |
|
outputs.append(content["traceback"]) |
|
|
|
|
|
if msg_type == "status" and content["execution_state"] == "idle": |
|
break |
|
except: |
|
break |
|
|
|
outputs = [] |
|
error_flag = False |
|
|
|
|
|
thread = threading.Thread(target=run_code_in_thread) |
|
thread.start() |
|
|
|
|
|
thread.join(timeout=20) |
|
|
|
|
|
if thread.is_alive(): |
|
outputs = ["Execution timed out."] |
|
|
|
error_flag = "Timeout" |
|
|
|
return self.clean_output(outputs), error_flag |
|
|
|
def close(self): |
|
"""Shutdown the kernel.""" |
|
self.km.shutdown_kernel() |
|
|
|
def __deepcopy__(self, memo): |
|
if id(self) in memo: |
|
return memo[id(self)] |
|
new_copy = type(self)() |
|
memo[id(self)] = new_copy |
|
return new_copy |
|
|