# --------------------------------------------------------
# InternVL
# Copyright (c) 2024 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
import argparse
import base64
import datetime
import hashlib
import json
import os
import random
import re
import sys
# from streamlit_js_eval import streamlit_js_eval
from functools import partial
from io import BytesIO
import cv2
import numpy as np
import requests
import streamlit as st
from constants import LOGDIR, server_error_msg
from library import Library
from PIL import Image, ImageDraw, ImageFont
from streamlit_image_select import image_select
# Parse the script's own command-line options (everything after the program
# name) and expose each value as a module-level setting.
custom_args = sys.argv[1:]
parser = argparse.ArgumentParser()
parser.add_argument(
    '--controller_url',
    type=str,
    default='http://10.140.60.209:10075',
    help='url of the controller',
)
parser.add_argument(
    '--sd_worker_url',
    type=str,
    default='http://0.0.0.0:40006',
    help='url of the stable diffusion worker',
)
parser.add_argument(
    '--max_image_limit',
    type=int,
    default=4,
    help='maximum number of images',
)
args = parser.parse_args(custom_args)
controller_url, sd_worker_url, max_image_limit = (
    args.controller_url,
    args.sd_worker_url,
    args.max_image_limit,
)
print('args:', args)
def get_model_list():
    """Ask the controller to refresh its workers and return the model list.

    Returns:
        The value of the ``models`` field in the JSON body returned by the
        controller's ``/list_models`` endpoint.

    Raises:
        AssertionError: If ``/refresh_all_workers`` does not answer with
            HTTP 200.
    """
    refresh = requests.post(controller_url + '/refresh_all_workers')
    # Raised explicitly rather than through an `assert` statement so the check
    # is not stripped under `python -O`; the exception type stays the same.
    if refresh.status_code != 200:
        raise AssertionError(
            f'/refresh_all_workers returned HTTP {refresh.status_code}')
    listing = requests.post(controller_url + '/list_models')
    return listing.json()['models']
def load_upload_file_and_show():
    """Decode the uploaded files, preview them and archive them on disk.

    Reads the module-level ``uploaded_files`` and ``upload_image_preview``.
    Each upload is decoded with OpenCV, converted from BGR to RGB and wrapped
    in a PIL image. The images are rendered through ``Library`` inside the
    preview placeholder and then saved as
    ``LOGDIR/serve_images/<YYYY-MM-DD>/<md5 of the pixel data>.jpg`` unless a
    file with that name already exists.

    Returns:
        The list of decoded PIL images; an empty list when ``uploaded_files``
        is None.
    """
    # Bound before the None check so the function always has a list to return.
    images = []
    if uploaded_files is None:
        return images

    for file in uploaded_files:
        file_bytes = np.asarray(bytearray(file.read()), dtype=np.uint8)
        img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
        # OpenCV decodes to BGR channel order; PIL expects RGB.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        images.append(Image.fromarray(img))

    with upload_image_preview.container():
        Library(images)

    # One dated directory per call; files are named after the MD5 of the raw
    # pixel bytes, so an identical image is written only once per day.
    today = datetime.datetime.now()
    day_dir = os.path.join(
        LOGDIR, 'serve_images', f'{today.year}-{today.month:02d}-{today.day:02d}')
    for image in images:
        digest = hashlib.md5(image.tobytes()).hexdigest()
        filename = os.path.join(day_dir, f'{digest}.jpg')
        if not os.path.isfile(filename):
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            image.save(filename)
    return images
def get_selected_worker_ip():
    """Return the address of a worker for the currently selected model.

    Posts the module-level ``selected_model`` to the controller's
    ``/get_worker_address`` endpoint and returns the ``address`` field of the
    JSON reply.
    """
    payload = {'model': selected_model}
    reply = requests.post(controller_url + '/get_worker_address', json=payload)
    return reply.json()['address']
def generate_response(messages):
    """Send the conversation to the selected worker and stream back its reply.

    The request starts with the system prompt (``persona_rec``) followed by
    the turns in *messages*. While the worker streams, the partial text is
    rendered into a fresh Streamlit placeholder with a trailing cursor mark;
    once the stream ends the final text is rendered without it.

    Args:
        messages: Iterable of dicts with ``role`` and ``content`` keys. A turn
            whose role is ``'user'`` may carry an ``image`` list, each item of
            which is encoded with ``pil_image_to_base64``. Any other role is
            forwarded as an assistant turn.

    Returns:
        The last text received from the worker (with the error code appended
        when the worker reported one). If the HTTP request fails, the server
        error message is shown in the placeholder and whatever text had been
        received so far (possibly ``''``) is returned.
    """
    send_messages = [{'role': 'system', 'content': persona_rec}]
    for message in messages:
        if message['role'] != 'user':
            send_messages.append({'role': 'assistant', 'content': message['content']})
            continue
        user_message = {'role': 'user', 'content': message['content']}
        # Test the image list itself (not the literal key name) so that an
        # empty list is not forwarded as an `image` field.
        if 'image' in message and len(message['image']) > 0:
            user_message['image'] = [
                pil_image_to_base64(image) for image in message['image']
            ]
        send_messages.append(user_message)

    pload = {
        'model': selected_model,
        'prompt': send_messages,
        'temperature': float(temperature),
        'top_p': float(top_p),
        'max_new_tokens': max_length,
        'max_input_tiles': max_input_tiles,
        'repetition_penalty': float(repetition_penalty),
    }
    worker_addr = get_selected_worker_ip()
    headers = {'User-Agent': 'InternVL-Chat Client'}
    placeholder, output = st.empty(), ''
    try:
        response = requests.post(worker_addr + '/worker_generate_stream',
                                 headers=headers, json=pload, stream=True, timeout=10)
        # Chunks in the response stream are split on NUL bytes.
        for chunk in response.iter_lines(decode_unicode=False, delimiter=b'\0'):
            if not chunk:
                continue
            data = json.loads(chunk.decode())
            if data['error_code'] == 0:
                output = data['text']
                # Phi3-3.8B will produce abnormal `�` output
                if '4B' in selected_model and '�' in output[-2:]:
                    output = output.replace('�', '')
                    break
                placeholder.markdown(output + '▌')
            else:
                output = data['text'] + f" (error_code: {data['error_code']})"
                placeholder.markdown(output)
        # Final render without the streaming cursor.
        placeholder.markdown(output)
    except requests.exceptions.RequestException:
        placeholder.markdown(server_error_msg)
    return output
def pil_image_to_base64(image):
    """Save *image* as PNG into memory and return the bytes as base64 text."""
    with BytesIO() as png_buffer:
        image.save(png_buffer, format='PNG')
        png_bytes = png_buffer.getvalue()
    encoded = base64.b64encode(png_bytes)
    return encoded.decode('utf-8')
def clear_chat_history():
    """Empty the stored messages and set ``image_select`` to -1 in the session state."""
    state = st.session_state
    state['messages'] = []
    state['image_select'] = -1
def clear_file_uploader():
    """Increment ``uploader_key`` in the session state, then rerun the script.

    The file uploader widget builds its key from this counter, so a new value
    gives the uploader a different key on the next run.
    """
    state = st.session_state
    state['uploader_key'] = state['uploader_key'] + 1
    st.rerun()
def combined_func(func_list):
    """Call every zero-argument callable in *func_list*, in order."""
    for callback in func_list:
        callback()
def show_one_or_multiple_images(message, total_image_num, is_input=True):
    """Render the images attached to *message*, optionally with a count caption.

    Nothing happens when *message* has no ``image`` key.

    Args:
        message: Mapping that may hold an ``image`` list.
        total_image_num: Image count before this message. The updated total
            is used only in the caption; it is not returned.
        is_input: When true and the message has at least one image, a caption
            with the per-message and total counts (in the UI language ``lan``)
            is written below the images.
    """
    if 'image' not in message:
        return

    images = message['image']
    uploaded = len(images)
    if is_input:
        total_image_num = total_image_num + uploaded
        single = uploaded == 1
        if lan != 'English':
            label = f"(在本次对话中,上传了{uploaded}张图片,总共上传了{total_image_num}张图片)"
        elif single and total_image_num == 1:
            label = f"(In this conversation, {uploaded} image was uploaded, {total_image_num} image in total)"
        elif single and total_image_num > 1:
            label = f"(In this conversation, {uploaded} image was uploaded, {total_image_num} images in total)"
        else:
            label = f"(In this conversation, {uploaded} images were uploaded, {total_image_num} images in total)"

    preview_slot = st.empty()
    with preview_slot.container():
        Library(images)
        if is_input and uploaded > 0:
            st.markdown(label)
def find_bounding_boxes(response):
pattern = re.compile(r'\s*(.*?)\s*\s*","Image-2:
","Image-3:
" 等等。
2. 用户提问关联:用户的提问可能会具体指向某一张编号的图像,请仔细辨别用户问题中提到的图像编号。
请尽可能详细地回答用户的问题。"""
# Sidebar: UI language, model choice, system prompt, sampling options and the
# image uploader. Each widget value is bound to a module-level name that the
# functions in this file read (lan, selected_model, persona_rec, ...).
with st.sidebar:
    model_list = get_model_list()
    # "[![Open in GitHub](https://github.com/codespaces/badge.svg)](https://github.com/OpenGVLab/InternVL)"
    lan = st.selectbox('#### Language / 语言', ['English', '中文'], on_change=st.rerun)
    # Both languages build the same widgets in the same order; only the
    # visible texts differ, so they are selected once here. The upload help
    # reports the configured `max_image_limit` instead of a fixed number.
    if lan == 'English':
        ui_text = {
            'subheader': 'Models and parameters',
            'model': 'Choose a InternVL2 chat model',
            'prompt_expander': '🤖 System Prompt',
            'prompt_label': 'System Prompt',
            'prompt_help': 'System prompt is a pre-defined message used to instruct the assistant at the beginning of a conversation.',
            'options_expander': '🔥 Advanced Options',
            'repetition_penalty': 'repetition_penalty',
            'max_length': 'max_length',
            'max_input_tiles': 'max_input_tiles (control image resolution)',
            'upload_label': 'Upload files',
            'upload_help': f'You can upload multiple images (max to {max_image_limit}) or a single video.',
        }
        # The logo is only set for the English UI.
        st.logo(logo_code, link='https://github.com/OpenGVLab/InternVL', icon_image=logo_code)
    else:
        ui_text = {
            'subheader': '模型和参数',
            'model': '选择一个 InternVL2 对话模型',
            'prompt_expander': '🤖 系统提示',
            'prompt_label': '系统提示',
            'prompt_help': '系统提示是在对话开始时用于指示助手的预定义消息。',
            'options_expander': '🔥 高级选项',
            'repetition_penalty': '重复惩罚',
            'max_length': '最大输出长度',
            'max_input_tiles': '最大图像块数 (控制图像分辨率)',
            'upload_label': '上传文件',
            'upload_help': f'你可以上传多张图像(最多{max_image_limit}张)或者一个视频。',
        }

    st.subheader(ui_text['subheader'])
    # Switching models clears the chat history.
    selected_model = st.sidebar.selectbox(ui_text['model'], model_list, key='selected_model',
                                          on_change=clear_chat_history)
    with st.expander(ui_text['prompt_expander']):
        persona_rec = st.text_area(ui_text['prompt_label'], value=system_message,
                                   help=ui_text['prompt_help'],
                                   height=200)
    with st.expander(ui_text['options_expander']):
        temperature = st.slider('temperature', min_value=0.0, max_value=1.0, value=0.8, step=0.1)
        top_p = st.slider('top_p', min_value=0.0, max_value=1.0, value=0.7, step=0.1)
        repetition_penalty = st.slider(ui_text['repetition_penalty'], min_value=1.0, max_value=1.5, value=1.1, step=0.02)
        max_length = st.slider(ui_text['max_length'], min_value=0, max_value=4096, value=2048, step=128)
        max_input_tiles = st.slider(ui_text['max_input_tiles'], min_value=1, max_value=24, value=12, step=1)
    # Placeholder that load_upload_file_and_show() fills with the preview.
    upload_image_preview = st.empty()
    uploaded_files = st.file_uploader(ui_text['upload_label'], accept_multiple_files=True,
                                      type=['png', 'jpg', 'jpeg', 'webp'],
                                      help=ui_text['upload_help'],
                                      key=f'uploader_{st.session_state.uploader_key}',
                                      on_change=st.rerun)
    uploaded_pil_images = load_upload_file_and_show()
gradient_text_html = """