r"""Flask proxy that forwards chat prompts to the DeGPT completion endpoint.

Example requests (the server listens on port 7860, see the bottom of the file):

    curl -X GET http://localhost:7860/api/models

    curl -X POST http://127.0.0.1:7860/hf/v1/chat/completions \
      -H "Content-Type: application/json" \
      -d '{ "prompt": "你是什么模型?" }'

Pass an optional "user_id" field in the POST body to keep a conversation
history across requests; without it every request is stateless.
"""
import json
import uuid  # noqa: F401 - no longer used below; retained as a file-level import

import requests
from flask import Flask, request, Response

app = Flask(__name__)

# Global dictionary holding each user's conversation history, keyed by user_id.
user_contexts = {}
MAX_HISTORY_LENGTH = 15  # maximum number of messages kept per user (system prompt included)

# Default upper bound, in seconds, for the upstream HTTP call so that a stalled
# upstream cannot block a worker forever.
REQUEST_TIMEOUT = 120

# Messages returned by chat_completion() on failure. They are plain text, not
# JSON, so the HTTP layer wraps them before sending them to a client.
REQUEST_FAILED_MESSAGE = "请求失败,请检查网络或参数配置。"
PARSE_FAILED_MESSAGE = "解析响应内容失败。"

JSON_CONTENT_TYPE = "application/json; charset=utf-8"


def get_models():
    """Return the list of available models as a JSON string."""
    models = {
        "object": "list",
        "data": [
            {"id": "Qwen2.5-72B", "object": "model", "created": 0, "owned_by": "Qwen"},
            {"id": "Llama-3.1-Nemotron-70B", "object": "model", "created": 0, "owned_by": "Nemotron"},
            {"id": "NVLM-D-72B", "object": "model", "created": 0, "owned_by": "NVIDIA"},
            {"id": "DeepSeek-Coder-V2", "object": "model", "created": 0, "owned_by": "DeepSeek"},
            {"id": "Qwen2.5-Coder-32B", "object": "model", "created": 0, "owned_by": "Qwen"},
        ]
    }
    return json.dumps(models)


def _discard_pending_message(user_id, pending):
    """Remove `pending` from the user's history if it is still the last entry.

    Used when the upstream call fails, so the history never ends with a user
    message that has no assistant reply (a retry would otherwise produce two
    consecutive user turns).
    """
    if user_id is None or pending is None:
        return
    history = user_contexts.get(user_id)
    if history and history[-1] is pending:
        history.pop()


def chat_completion(
        user_prompt,
        user_id: str = None,
        system_prompt="You are a helpful assistant.",
        model="Qwen2.5-72B",
        project="DecentralGPT",
        stream=False,
        temperature=0.3,
        max_tokens=1024,
        top_p=0.5,
        frequency_penalty=0,
        presence_penalty=0,
        timeout=REQUEST_TIMEOUT):
    """Send a prompt upstream and return the raw response body.

    Args:
        user_prompt: Text of the user's message.
        user_id: Key under which the conversation history is kept in
            `user_contexts`. When None, no history is read or stored.
        system_prompt: System message used when a new history is created (or
            for every stateless request).
        model, project, stream, temperature, max_tokens, top_p,
        frequency_penalty, presence_penalty: Forwarded unchanged in the
            request payload.
        timeout: Seconds to wait for the upstream server before giving up.

    Returns:
        The upstream response body (`response.text`) on success. On failure
        one of the plain-text messages `REQUEST_FAILED_MESSAGE` or
        `PARSE_FAILED_MESSAGE` is returned instead; no exception is raised
        for network or parsing problems.
    """
    url = 'https://usa-chat.degpt.ai/api/v0/chat/completion/proxy'
    headers = {
        'accept': 'application/json',
        'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
        'content-type': 'application/json',
        'dnt': '1',
        'origin': 'https://www.degpt.ai',
        'priority': 'u=1, i',
        'referer': 'https://www.degpt.ai/',
        # The original value was missing the opening double quote.
        'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-site',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
    }

    pending = None
    if user_id is not None:
        # Create the history on first use, then record the new user message.
        history = user_contexts.setdefault(
            user_id, [{"role": "system", "content": system_prompt}])
        pending = {"role": "user", "content": user_prompt}
        history.append(pending)
        # Trim the oldest non-system entries until the history fits; index 0
        # (the system prompt) is always preserved.
        while len(history) > MAX_HISTORY_LENGTH and len(history) > 2:
            del history[1]
        messages = history
    else:
        # Without a user_id no context is kept between calls.
        messages = [{"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}]

    payload = {
        "model": model,
        "messages": messages,
        "project": project,
        "stream": stream,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "top_p": top_p,
        "frequency_penalty": frequency_penalty,
        "presence_penalty": presence_penalty
    }

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.encoding = 'utf-8'
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        _discard_pending_message(user_id, pending)
        return REQUEST_FAILED_MESSAGE

    try:
        # Extract the assistant's reply so it can be added to the history.
        response_content = response.json()["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # ValueError covers a body that is not JSON; TypeError covers
        # null/scalar values where an object or list was expected.
        print(f"解析响应时出错: {e}")
        _discard_pending_message(user_id, pending)
        return PARSE_FAILED_MESSAGE

    if user_id is not None:
        user_contexts[user_id].append({"role": "assistant", "content": response_content})
    return response.text


def _json_response(body, status=200):
    """Wrap an already-serialized JSON string in a UTF-8 JSON response."""
    return Response(body, status=status, content_type=JSON_CONTENT_TYPE)


def _json_error(message, status):
    """Build a JSON error response; non-ASCII text is emitted unescaped."""
    body = json.dumps({"error": {"message": message}}, ensure_ascii=False)
    return _json_response(body, status=status)


@app.route('/api/models', methods=['GET'])
@app.route('/api/v1/models', methods=['GET'])
@app.route('/hf/v1/models', methods=['GET'])
def models():
    """Return the list of available models."""
    return _json_response(get_models())


@app.route('/api/chat/completion', methods=['POST'])
@app.route('/api/v1/chat/completions', methods=['POST'])
@app.route('/hf/v1/chat/completions', methods=['POST'])
def chat_completion_api():
    """Handle a chat request, keeping context when the client sends a user_id.

    Expects a JSON object with a "prompt" field and an optional "user_id"
    field. Responds with the upstream body on success, 400 for a malformed
    request and 502 when the upstream call or its parsing fails.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return _json_error("Request body must be a JSON object.", 400)

    user_prompt = data.get("prompt")
    if not user_prompt:
        return _json_error('Missing required field "prompt".', 400)

    # No invented id when the client sends none: the client could never reuse
    # it, and every such request would leave an orphaned history in memory.
    user_id = data.get("user_id")

    response_content = chat_completion(user_prompt, user_id=user_id)

    if response_content in (REQUEST_FAILED_MESSAGE, PARSE_FAILED_MESSAGE):
        # Failure messages are plain text; wrap them so the body is valid JSON.
        return _json_error(response_content, 502)

    # The upstream body is already a JSON string; pass it through unchanged so
    # non-ASCII text (e.g. Chinese) is not re-escaped.
    return _json_response(response_content)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)