|
import jupyter_client |
|
import re |
|
|
|
|
|
def delete_color_control_char(string): |
|
ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]') |
|
return ansi_escape.sub('', string) |
|
|
|
|
|
class JupyterKernel: |
|
def __init__(self, work_dir): |
|
self.kernel_manager, self.kernel_client = jupyter_client.manager.start_new_kernel(kernel_name='python3') |
|
self.work_dir = work_dir |
|
self.interrupt_signal = False |
|
self._create_work_dir() |
|
self.available_functions = { |
|
'execute_code': self.execute_code, |
|
'python': self.execute_code |
|
} |
|
|
|
def execute_code_(self, code): |
|
msg_id = self.kernel_client.execute(code) |
|
|
|
|
|
msg_list = [] |
|
while True: |
|
try: |
|
iopub_msg = self.kernel_client.get_iopub_msg(timeout=1) |
|
msg_list.append(iopub_msg) |
|
if iopub_msg['msg_type'] == 'status' and iopub_msg['content'].get('execution_state') == 'idle': |
|
break |
|
except: |
|
if self.interrupt_signal: |
|
self.kernel_manager.interrupt_kernel() |
|
self.interrupt_signal = False |
|
continue |
|
|
|
all_output = [] |
|
for iopub_msg in msg_list: |
|
if iopub_msg['msg_type'] == 'stream': |
|
if iopub_msg['content'].get('name') == 'stdout': |
|
output = iopub_msg['content']['text'] |
|
all_output.append(('stdout', output)) |
|
elif iopub_msg['msg_type'] == 'execute_result': |
|
if 'data' in iopub_msg['content']: |
|
if 'text/plain' in iopub_msg['content']['data']: |
|
output = iopub_msg['content']['data']['text/plain'] |
|
all_output.append(('execute_result_text', output)) |
|
if 'text/html' in iopub_msg['content']['data']: |
|
output = iopub_msg['content']['data']['text/html'] |
|
all_output.append(('execute_result_html', output)) |
|
if 'image/png' in iopub_msg['content']['data']: |
|
output = iopub_msg['content']['data']['image/png'] |
|
all_output.append(('execute_result_png', output)) |
|
if 'image/jpeg' in iopub_msg['content']['data']: |
|
output = iopub_msg['content']['data']['image/jpeg'] |
|
all_output.append(('execute_result_jpeg', output)) |
|
elif iopub_msg['msg_type'] == 'display_data': |
|
if 'data' in iopub_msg['content']: |
|
if 'text/plain' in iopub_msg['content']['data']: |
|
output = iopub_msg['content']['data']['text/plain'] |
|
all_output.append(('display_text', output)) |
|
if 'text/html' in iopub_msg['content']['data']: |
|
output = iopub_msg['content']['data']['text/html'] |
|
all_output.append(('display_html', output)) |
|
if 'image/png' in iopub_msg['content']['data']: |
|
output = iopub_msg['content']['data']['image/png'] |
|
all_output.append(('display_png', output)) |
|
if 'image/jpeg' in iopub_msg['content']['data']: |
|
output = iopub_msg['content']['data']['image/jpeg'] |
|
all_output.append(('display_jpeg', output)) |
|
elif iopub_msg['msg_type'] == 'error': |
|
if 'traceback' in iopub_msg['content']: |
|
output = '\n'.join(iopub_msg['content']['traceback']) |
|
all_output.append(('error', output)) |
|
|
|
return all_output |
|
|
|
def execute_code(self, code): |
|
text_to_gpt = [] |
|
content_to_display = self.execute_code_(code) |
|
for mark, out_str in content_to_display: |
|
if mark in ('stdout', 'execute_result_text', 'display_text'): |
|
text_to_gpt.append(out_str) |
|
elif mark in ('execute_result_png', 'execute_result_jpeg', 'display_png', 'display_jpeg'): |
|
text_to_gpt.append('[image]') |
|
elif mark == 'error': |
|
text_to_gpt.append(delete_color_control_char(out_str)) |
|
|
|
return '\n'.join(text_to_gpt), content_to_display |
|
|
|
def _create_work_dir(self): |
|
|
|
init_code = f"import os\n" \ |
|
f"if not os.path.exists('{self.work_dir}'):\n" \ |
|
f" os.mkdir('{self.work_dir}')\n" \ |
|
f"os.chdir('{self.work_dir}')\n" \ |
|
f"del os" |
|
self.execute_code_(init_code) |
|
|
|
def send_interrupt_signal(self): |
|
self.interrupt_signal = True |
|
|
|
def restart_jupyter_kernel(self): |
|
self.kernel_client.shutdown() |
|
self.kernel_manager, self.kernel_client = jupyter_client.manager.start_new_kernel(kernel_name='python3') |
|
self.interrupt_signal = False |
|
self._create_work_dir() |
|
|