File size: 5,010 Bytes
6fcd376 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
import queue
import re

import jupyter_client
def delete_color_control_char(string):
    """Return ``string`` with ANSI CSI escape sequences stripped out.

    A sequence is matched when it starts with either the single character
    ``0x9B`` or the two-character form ``ESC [``, followed by optional
    parameter characters (``0``-``?``), optional intermediate characters
    (space-``/``) and one final character (``@``-``~``).  Colour codes in
    kernel tracebacks have this shape.
    """
    return re.sub(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]', '', string)
class JupyterKernel:
    """Run code in a ``python3`` Jupyter kernel and collect its outputs.

    The kernel is started on construction and its working directory is set
    to ``work_dir``.  Outputs are returned as ``(tag, payload)`` tuples; the
    possible tags are ``stdout``, ``error`` and
    ``{execute_result,display}_{text,html,png,jpeg}``.
    """

    # Rich-output MIME types that are harvested, in emission order, mapped to
    # the suffix used in the output tag.
    _MIME_SUFFIXES = (
        ('text/plain', 'text'),
        ('text/html', 'html'),
        ('image/png', 'png'),
        ('image/jpeg', 'jpeg'),
    )
    # iopub message types carrying a MIME bundle, mapped to their tag prefix.
    _RICH_PREFIXES = {
        'execute_result': 'execute_result',
        'display_data': 'display',
    }
    # Tags whose payload is forwarded verbatim in the text summary.
    _TEXT_MARKS = frozenset(('stdout', 'execute_result_text', 'display_text'))
    # Tags replaced by the '[image]' placeholder in the text summary.
    _IMAGE_MARKS = frozenset((
        'execute_result_png', 'execute_result_jpeg',
        'display_png', 'display_jpeg',
    ))

    def __init__(self, work_dir):
        """Start a kernel and make ``work_dir`` its current directory."""
        self.kernel_manager, self.kernel_client = jupyter_client.manager.start_new_kernel(kernel_name='python3')
        self.work_dir = work_dir
        # Set by send_interrupt_signal(); consumed by execute_code_().
        self.interrupt_signal = False
        self._create_work_dir()
        self.available_functions = {
            'execute_code': self.execute_code,
            'python': self.execute_code
        }

    def execute_code_(self, code):
        """Execute ``code`` in the kernel and return its raw outputs.

        Blocks until the kernel reports ``idle`` for this request.  If
        ``interrupt_signal`` is set while waiting, the kernel is interrupted
        once and the flag is cleared.

        Returns:
            A list of ``(tag, payload)`` tuples in arrival order.
        """
        msg_id = self.kernel_client.execute(code)
        msg_list = []
        while True:
            # Check on every pass, not only on timeouts, so that a cell which
            # produces output continuously can still be interrupted.
            if self.interrupt_signal:
                self.kernel_manager.interrupt_kernel()
                self.interrupt_signal = False
            try:
                iopub_msg = self.kernel_client.get_iopub_msg(timeout=1)
            except queue.Empty:
                # Nothing arrived within the timeout; keep waiting.
                continue
            # Ignore traffic that belongs to another request (for example the
            # status messages left over from kernel start-up); otherwise a
            # stale 'idle' would end this loop before the code has run.
            parent = iopub_msg.get('parent_header') or {}
            if parent.get('msg_id') != msg_id:
                continue
            msg_list.append(iopub_msg)
            if (iopub_msg['msg_type'] == 'status'
                    and iopub_msg['content'].get('execution_state') == 'idle'):
                break

        all_output = []
        for iopub_msg in msg_list:
            all_output.extend(self._extract_outputs(iopub_msg))
        return all_output

    @classmethod
    def _extract_outputs(cls, iopub_msg):
        """Convert one iopub message into a list of ``(tag, payload)`` tuples."""
        msg_type = iopub_msg['msg_type']
        content = iopub_msg['content']
        if msg_type == 'stream':
            # Only stdout is captured; other streams are dropped.
            if content.get('name') == 'stdout':
                return [('stdout', content['text'])]
            return []
        if msg_type in cls._RICH_PREFIXES:
            prefix = cls._RICH_PREFIXES[msg_type]
            data = content.get('data') or {}
            return [
                (f'{prefix}_{suffix}', data[mime])
                for mime, suffix in cls._MIME_SUFFIXES
                if mime in data
            ]
        if msg_type == 'error' and 'traceback' in content:
            return [('error', '\n'.join(content['traceback']))]
        return []

    def execute_code(self, code):
        """Execute ``code`` and return ``(text_summary, raw_outputs)``.

        ``raw_outputs`` is the list produced by :meth:`execute_code_`.
        ``text_summary`` joins, with newlines, the stdout and plain-text
        payloads, an ``'[image]'`` placeholder per PNG/JPEG output, and
        tracebacks with ANSI colour codes removed.  HTML outputs are left
        out of the summary.
        """
        text_to_gpt = []
        content_to_display = self.execute_code_(code)
        for mark, out_str in content_to_display:
            if mark in self._TEXT_MARKS:
                text_to_gpt.append(out_str)
            elif mark in self._IMAGE_MARKS:
                text_to_gpt.append('[image]')
            elif mark == 'error':
                text_to_gpt.append(delete_color_control_char(out_str))
        return '\n'.join(text_to_gpt), content_to_display

    def _create_work_dir(self):
        """Create ``work_dir`` inside the kernel if needed and chdir into it."""
        # Embed the path as a real Python string literal so that quotes and
        # backslashes (e.g. Windows paths) cannot corrupt the generated code.
        work_dir_literal = repr(str(self.work_dir))
        init_code = (
            "import os\n"
            f"os.makedirs({work_dir_literal}, exist_ok=True)\n"
            f"os.chdir({work_dir_literal})\n"
            "del os"
        )
        self.execute_code_(init_code)

    def send_interrupt_signal(self):
        """Request that the currently running execution be interrupted."""
        self.interrupt_signal = True

    def restart_jupyter_kernel(self):
        """Discard the current kernel and start a fresh one in ``work_dir``."""
        # Close the client's channels and let the manager terminate and reap
        # the old kernel process, so neither sockets nor the process leak.
        self.kernel_client.stop_channels()
        self.kernel_manager.shutdown_kernel(now=True)
        self.kernel_manager, self.kernel_client = jupyter_client.manager.start_new_kernel(kernel_name='python3')
        self.interrupt_signal = False
        self._create_work_dir()
|