import os import sys import json import math import string import random import argparse import logging from typing import List, Tuple, Optional, AsyncGenerator import aiohttp import uvicorn import requests import gradio as gr from fastapi import FastAPI, Request from starlette.responses import HTMLResponse # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) # Environment Variables API_BASE = os.getenv("API_BASE", "env") API_KEY = os.getenv("API_KEY") OAI_API_KEY = os.getenv("OPENAI_API_KEY") BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") DEF_MODELS = [ "chatgpt-4o-latest", "gpt-4-0125-preview", "gpt-4-0613", "gpt-4-1106-preview", "gpt-4-turbo-2024-04-09", "gpt-4-turbo-preview", "gpt-4-turbo", "gpt-4", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-2024-11-20", "gpt-4o-mini-2024-07-18", "gpt-4o-mini", "gpt-4o" ] models = [] model_list = {} # Exception for API Key handling class APIKeyError(Exception): pass def get_api_key(call: str = 'api_key') -> str: key = API_KEY if call == 'api_key' else (OAI_API_KEY if call == 'oai_api_key' else API_KEY) if ',' in key: selected_key = random.choice(key.split(',')) logger.debug(f"Selected API key: {selected_key}") return selected_key return key def encode_chat(messages: List[dict]) -> str: encoded = "\n".join( f"<|im_start|>{msg['role']}{f' [{msg['name']}]' if 'name' in msg else ''}\n{msg['content']}<|end_of_text|>" for msg in messages ) logger.debug(f"Encoded chat: {encoded}") return encoded def check_models(): global BASE_URL, API_KEY if API_BASE == "env": try: response = requests.get( f"{BASE_URL}/models", headers={"Authorization": f"Bearer {get_api_key()}"} ) response.raise_for_status() data = response.json() if 'data' not in data: logger.warning("No 'data' in response. Falling back to default BASE_URL and API_KEY.") BASE_URL = "https://api.openai.com/v1" API_KEY = OAI_API_KEY else: logger.info("Successfully fetched models from API_BASE.") except requests.RequestException as e: logger.error(f"Error testing API endpoint: {e}. Falling back to default BASE_URL and API_KEY.") BASE_URL = "https://api.openai.com/v1" API_KEY = OAI_API_KEY else: BASE_URL = "https://api.openai.com/v1" API_KEY = OAI_API_KEY logger.info("Using default BASE_URL and OAI_API_KEY.") def load_models(): global models, model_list models = sorted(DEF_MODELS) model_list = { "object": "list", "data": [{"id": model_id, "object": "model", "created": 0, "owned_by": "system"} for model_id in models] } logger.info(f"Loaded models: {models}") def handle_api_keys(): global API_KEY valid_keys = [] keys = API_KEY.split(',') if ',' in API_KEY else [API_KEY] for key in keys: try: response = requests.get( f"{BASE_URL}/models", headers={"Authorization": f"Bearer {key.strip()}"} ) response.raise_for_status() if 'data' in response.json(): valid_keys.append(key.strip()) logger.debug(f"Valid API key: {key.strip()}") else: logger.warning(f"API key {key.strip()} is invalid.") except requests.RequestException as e: logger.error(f"API key {key.strip()} is not valid or an error occurred: {e}") if not valid_keys: raise APIKeyError("No valid API keys are available.") API_KEY = ",".join(valid_keys) logger.info(f"Using API keys: {API_KEY}") def moderate(messages: List[dict]) -> Optional[dict]: try: response = requests.post( f"{BASE_URL}/moderations", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {get_api_key('api_key')}" }, json={"input": encode_chat(messages)} ) response.raise_for_status() moderation_result = response.json() logger.debug(f"Moderation result: {moderation_result}") except requests.RequestException as e: logger.error(f"Moderation request failed: {e}. Trying fallback URL.") try: response = requests.post( "https://api.openai.com/v1/moderations", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {get_api_key('oai_api_key')}" }, json={"input": encode_chat(messages)} ) response.raise_for_status() moderation_result = response.json() logger.debug(f"Moderation result from fallback: {moderation_result}") except requests.RequestException as ex: logger.error(f"Fallback moderation request failed: {ex}") return None try: if isinstance(moderation_result, list): flagged = any(result.get("flagged", False) for result in moderation_result) else: flagged = moderation_result.get("flagged", False) if flagged: logger.info("Content flagged by moderation.") return moderation_result except KeyError as e: logger.error(f"Key error during moderation processing: {e}") return None return None async def stream_chat(params: dict): async with aiohttp.ClientSession() as session: for attempt, url in enumerate([f"{BASE_URL}/chat/completions", "https://api.openai.com/v1/chat/completions"], start=1): try: async with session.post( url, headers={ "Authorization": f"Bearer {get_api_key('api_key')}", "Content-Type": "application/json" }, json=params ) as resp: resp.raise_for_status() async for line in resp.content: if line: line_str = line.decode('utf-8').strip() if line_str.startswith("data: "): line_str = line_str[6:] if line_str == "[DONE]": break try: message = json.loads(line_str) yield message except json.JSONDecodeError: logger.warning("Failed to decode JSON from line.") continue break # Successful request, exit the loop except aiohttp.ClientError as e: logger.error(f"Stream chat request failed on attempt {attempt}: {e}") if attempt == 2: return def rnd(length: int = 8) -> str: result = ''.join(random.choices(string.ascii_letters + string.digits, k=length)) logger.debug(f"Generated random string: {result}") return result async def respond( message: str, history: List[Tuple[str, str]], model_name: str, max_tokens: int, temperature: float, top_p: float, ) -> AsyncGenerator[str, None]: messages = [] for user_msg, assistant_msg in history: if user_msg: messages.append({"role": "user", "content": user_msg}) if assistant_msg: messages.append({"role": "assistant", "content": assistant_msg}) if message: messages.append({"role": "user", "content": message}) moderation = moderate(messages) if moderation: reasons = [] categories = moderation[0].get('categories', {}) if isinstance(moderation, list) else moderation.get('categories', {}) for category, flagged in categories.items(): if flagged: reasons.append(category) if reasons: response = "[MODERATION] I'm sorry, but I can't assist with that.\n\nReasons:\n```\n" + "\n".join(f"{i+1}. {reason}" for i, reason in enumerate(reasons)) + "\n```" else: response = "[MODERATION] I'm sorry, but I can't assist with that." logger.info("Message flagged by moderation.") yield response return params = { "model": model_name, "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "top_p": top_p, "user": rnd(), "stream": True } response_text = "" async for token in stream_chat(params): if token and 'choices' in token: delta = token['choices'][0].get('delta', {}) content = delta.get("content", delta.get("refusal", "")) response_text += content yield response_text def create_gradio_interface() -> gr.ChatInterface: return gr.ChatInterface( respond, title="gpt-4o-mini", description="The chat is back online for a not-so-long time.", additional_inputs=[ gr.Dropdown(choices=models, value="gpt-4o-mini", label="Model"), gr.Slider(minimum=1, maximum=4096, value=4096, step=1, label="Max new tokens"), gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.05, label="Temperature"), gr.Slider(minimum=0.05, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"), ], css="footer{display:none !important}", head="""""" ) def create_fastapi_app() -> FastAPI: app = FastAPI() @app.get("/declined") def declined(): return HTMLResponse(content=""" Declined

Ok, you can go back to Hugging Face. I just didn't have any idea how to handle decline so you are redirected here.


Go back """) gradio_app = create_gradio_interface() app = gr.mount_gradio_app(app, gradio_app, path="/") return app class ArgParser(argparse.ArgumentParser): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.add_argument("-s", "--server", type=str, default="0.0.0.0", help="Server host.") self.add_argument("-p", "--port", type=int, default=7860, help="Server port.") self.add_argument("-d", "--dev", action="store_true", help="Run in development mode.") self.args = self.parse_args(sys.argv[1:]) def main(): try: handle_api_keys() load_models() check_models() except APIKeyError as e: logger.critical(e) sys.exit(1) app = create_fastapi_app() args = ArgParser().args uvicorn.run( "main:app", host=args.server, port=args.port, reload=args.dev ) if __name__ == "__main__": main()