import argparse from ast import parse import datetime import json import os import time import hashlib import re import gradio as gr import requests import random from filelock import FileLock from io import BytesIO from PIL import Image, ImageDraw, ImageFont from constants import LOGDIR from utils import ( build_logger, server_error_msg, violates_moderation, moderation_msg, load_image_from_base64, get_log_filename, ) from conversation import Conversation logger = build_logger("gradio_web_server", "gradio_web_server.log") headers = {"User-Agent": "InternVL-Chat Client"} no_change_btn = gr.Button() enable_btn = gr.Button(interactive=True) disable_btn = gr.Button(interactive=False) def write2file(path, content): lock = FileLock(f"{path}.lock") with lock: with open(path, "a") as fout: fout.write(content) def sort_models(models): def custom_sort_key(model_name): # InternVL-Chat-V1-5 should be the first item if model_name == "InternVL-Chat-V1-5": return (1, model_name) # 1 indicates highest precedence elif model_name.startswith("InternVL-Chat-V1-5-"): return (1, model_name) # 1 indicates highest precedence else: return (0, model_name) # 0 indicates normal order models.sort(key=custom_sort_key, reverse=True) try: # We have five InternVL-Chat-V1-5 models, randomly choose one to be the first first_three = models[:4] random.shuffle(first_three) models[:4] = first_three except: pass return models def get_model_list(): logger.info(f"Call `get_model_list`") ret = requests.post(args.controller_url + "/refresh_all_workers") logger.info(f"status_code from `get_model_list`: {ret.status_code}") assert ret.status_code == 200 ret = requests.post(args.controller_url + "/list_models") logger.info(f"status_code from `list_models`: {ret.status_code}") models = ret.json()["models"] models = sort_models(models) logger.info(f"Models (from {args.controller_url}): {models}") return models get_window_url_params = """ function() { const params = new URLSearchParams(window.location.search); url_params = Object.fromEntries(params); console.log(url_params); return url_params; } """ def init_state(state=None): if state is not None: del state return Conversation() def load_demo(url_params, request: gr.Request = None): if not request: logger.info(f"load_demo. ip: {request.client.host}. params: {url_params}") dropdown_update = gr.Dropdown(visible=True) if "model" in url_params: model = url_params["model"] if model in models: dropdown_update = gr.Dropdown(value=model, visible=True) state = init_state() return state, dropdown_update def load_demo_refresh_model_list(request: gr.Request = None): if not request: logger.info(f"load_demo. ip: {request.client.host}") models = get_model_list() state = init_state() dropdown_update = gr.Dropdown( choices=models, value=models[0] if len(models) > 0 else "" ) return state, dropdown_update def vote_last_response(state, liked, model_selector, request: gr.Request): conv_data = { "tstamp": round(time.time(), 4), "like": liked, "model": model_selector, "state": state.dict(), "ip": request.client.host, } write2file(get_log_filename(), json.dumps(conv_data) + "\n") def upvote_last_response(state, model_selector, request: gr.Request): logger.info(f"upvote. ip: {request.client.host}") vote_last_response(state, True, model_selector, request) textbox = gr.MultimodalTextbox(value=None, interactive=True) return (textbox,) + (disable_btn,) * 3 def downvote_last_response(state, model_selector, request: gr.Request): logger.info(f"downvote. ip: {request.client.host}") vote_last_response(state, False, model_selector, request) textbox = gr.MultimodalTextbox(value=None, interactive=True) return (textbox,) + (disable_btn,) * 3 def vote_selected_response( state, model_selector, request: gr.Request, data: gr.LikeData ): logger.info( f"Vote: {data.liked}, index: {data.index}, value: {data.value} , ip: {request.client.host}" ) conv_data = { "tstamp": round(time.time(), 4), "like": data.liked, "index": data.index, "model": model_selector, "state": state.dict(), "ip": request.client.host, } write2file(get_log_filename(), json.dumps(conv_data) + "\n") return def flag_last_response(state, model_selector, request: gr.Request): logger.info(f"flag. ip: {request.client.host}") vote_last_response(state, "flag", model_selector, request) textbox = gr.MultimodalTextbox(value=None, interactive=True) return (textbox,) + (disable_btn,) * 3 def regenerate(state, image_process_mode, request: gr.Request): logger.info(f"regenerate. ip: {request.client.host}") # state.messages[-1][-1] = None state.update_message(Conversation.ASSISTANT, None, -1) prev_human_msg = state.messages[-2] if type(prev_human_msg[1]) in (tuple, list): prev_human_msg[1] = (*prev_human_msg[1][:2], image_process_mode) state.skip_next = False textbox = gr.MultimodalTextbox(value=None, interactive=True) return (state, state.to_gradio_chatbot(), textbox) + (disable_btn,) * 5 def clear_history(request: gr.Request): logger.info(f"clear_history. ip: {request.client.host}") state = init_state() textbox = gr.MultimodalTextbox(value=None, interactive=True) return (state, state.to_gradio_chatbot(), textbox) + (disable_btn,) * 5 def change_system_prompt(state, system_prompt, request: gr.Request): logger.info(f"Change system prompt. ip: {request.client.host}") state.set_system_message(system_prompt) return state def add_text(state, message, system_prompt, model_selector, request: gr.Request): print(f"state: {state}") if not state: state, model_selector = load_demo_refresh_model_list(request) images = message.get("files", []) text = message.get("text", "").strip() logger.info(f"add_text. ip: {request.client.host}. len: {len(text)}") # import pdb; pdb.set_trace() textbox = gr.MultimodalTextbox(value=None, interactive=False) if len(text) <= 0 and len(images) == 0: state.skip_next = True return (state, state.to_gradio_chatbot(), textbox) + (no_change_btn,) * 5 if args.moderate: flagged = violates_moderation(text) if flagged: state.skip_next = True textbox = gr.MultimodalTextbox( value={"text": moderation_msg}, interactive=True ) return (state, state.to_gradio_chatbot(), textbox) + (no_change_btn,) * 5 images = [Image.open(path).convert("RGB") for path in images] if len(images) > 0 and len(state.get_images(source=state.USER)) > 0: state = init_state(state) state.set_system_message(system_prompt) state.append_message(Conversation.USER, text, images) state.skip_next = False return (state, state.to_gradio_chatbot(), textbox, model_selector) + ( disable_btn, ) * 5 def http_bot( state, model_selector, temperature, top_p, repetition_penalty, max_new_tokens, max_input_tiles, # bbox_threshold, # mask_threshold, request: gr.Request, ): logger.info(f"http_bot. ip: {request.client.host}") start_tstamp = time.time() model_name = model_selector if hasattr(state, "skip_next") and state.skip_next: # This generate call is skipped due to invalid inputs yield ( state, state.to_gradio_chatbot(), gr.MultimodalTextbox(interactive=False), ) + (no_change_btn,) * 5 return # Query worker address controller_url = args.controller_url ret = requests.post( controller_url + "/get_worker_address", json={"model": model_name} ) worker_addr = ret.json()["address"] logger.info(f"model_name: {model_name}, worker_addr: {worker_addr}") # No available worker if worker_addr == "": # state.messages[-1][-1] = server_error_msg state.update_message(Conversation.ASSISTANT, server_error_msg) yield ( state, state.to_gradio_chatbot(), gr.MultimodalTextbox(interactive=False), disable_btn, disable_btn, disable_btn, enable_btn, enable_btn, ) return all_images = state.get_images(source=state.USER) all_image_paths = [state.save_image(image) for image in all_images] # Make requests pload = { "model": model_name, "prompt": state.get_prompt(), "temperature": float(temperature), "top_p": float(top_p), "max_new_tokens": max_new_tokens, "max_input_tiles": max_input_tiles, # "bbox_threshold": bbox_threshold, # "mask_threshold": mask_threshold, "repetition_penalty": repetition_penalty, "images": f"List of {len(all_images)} images: {all_image_paths}", } logger.info(f"==== request ====\n{pload}") pload.pop("images") pload["prompt"] = state.get_prompt(inlude_image=True) state.append_message(Conversation.ASSISTANT, state.streaming_placeholder) yield ( state, state.to_gradio_chatbot(), gr.MultimodalTextbox(interactive=False), ) + (disable_btn,) * 5 try: # Stream output response = requests.post( worker_addr + "/worker_generate_stream", headers=headers, json=pload, stream=True, timeout=20, ) for chunk in response.iter_lines(decode_unicode=False, delimiter=b"\0"): if chunk: data = json.loads(chunk.decode()) if data["error_code"] == 0: if "text" in data: output = data["text"].strip() output += state.streaming_placeholder image = None if "image" in data: image = load_image_from_base64(data["image"]) _ = state.save_image(image) state.update_message(Conversation.ASSISTANT, output, image) yield ( state, state.to_gradio_chatbot(), gr.MultimodalTextbox(interactive=False), ) + (disable_btn,) * 5 else: output = ( f"**{data['text']}**" + f" (error_code: {data['error_code']})" ) state.update_message(Conversation.ASSISTANT, output, None) yield ( state, state.to_gradio_chatbot(), gr.MultimodalTextbox(interactive=True), ) + ( disable_btn, disable_btn, disable_btn, enable_btn, enable_btn, ) return except requests.exceptions.RequestException as e: state.update_message(Conversation.ASSISTANT, server_error_msg, None) yield ( state, state.to_gradio_chatbot(), gr.MultimodalTextbox(interactive=True), ) + ( disable_btn, disable_btn, disable_btn, enable_btn, enable_btn, ) return ai_response = state.return_last_message() state.end_of_current_turn() yield ( state, state.to_gradio_chatbot(), gr.MultimodalTextbox(interactive=True), ) + (enable_btn,) * 5 finish_tstamp = time.time() logger.info(f"{output}") data = { "tstamp": round(finish_tstamp, 4), "like": None, "model": model_name, "start": round(start_tstamp, 4), "finish": round(start_tstamp, 4), "state": state.dict(), "images": all_image_paths, "ip": request.client.host, } write2file(get_log_filename(), json.dumps(data) + "\n") title_html = """