import os from dotenv import load_dotenv load_dotenv() # 加载.env文件中的环境变量 import json import uuid import time import base64 import sys import inspect from loguru import logger import requests from flask import Flask, request, Response, jsonify, stream_with_context from curl_cffi import requests as curl_requests from werkzeug.middleware.proxy_fix import ProxyFix class Logger: def __init__(self, level="INFO", colorize=True, format=None): logger.remove() if format is None: format = ( "{time:YYYY-MM-DD HH:mm:ss} | " "{level: <8} | " "{extra[filename]}:{extra[function]}:{extra[lineno]} | " "{message}" ) logger.add( sys.stderr, level=level, format=format, colorize=colorize, backtrace=True, diagnose=True ) self.logger = logger def _get_caller_info(self): frame = inspect.currentframe() try: caller_frame = frame.f_back.f_back full_path = caller_frame.f_code.co_filename function = caller_frame.f_code.co_name lineno = caller_frame.f_lineno filename = os.path.basename(full_path) return { 'filename': filename, 'function': function, 'lineno': lineno } finally: del frame def info(self, message, source="API"): caller_info = self._get_caller_info() self.logger.bind(**caller_info).info(f"[{source}] {message}") def error(self, message, source="API"): caller_info = self._get_caller_info() if isinstance(message, Exception): self.logger.bind(**caller_info).exception(f"[{source}] {str(message)}") else: self.logger.bind(**caller_info).error(f"[{source}] {message}") def warning(self, message, source="API"): caller_info = self._get_caller_info() self.logger.bind(**caller_info).warning(f"[{source}] {message}") def debug(self, message, source="API"): caller_info = self._get_caller_info() self.logger.bind(**caller_info).debug(f"[{source}] {message}") async def request_logger(self, request): caller_info = self._get_caller_info() self.logger.bind(**caller_info).info(f"请求: {request.method} {request.path}", "Request") logger = Logger(level="INFO") CONFIG = { "MODELS": { 'grok-2': 'grok-latest', 'grok-2-imageGen': 'grok-latest', 'grok-2-search': 'grok-latest', "grok-3": "grok-3", "grok-3-search": "grok-3", "grok-3-imageGen": "grok-3", "grok-3-deepsearch": "grok-3", "grok-3-reasoning": "grok-3" }, "API": { "IS_CUSTOM_SSO": os.environ.get("IS_CUSTOM_SSO", "false").lower() == "true", "BASE_URL": "https://grok.com", "API_KEY": os.environ.get("API_KEY", "sk-123456"), "SIGNATURE_COOKIE": None, "PICGO_KEY": os.environ.get("PICGO_KEY") or None, "TUMY_KEY": os.environ.get("TUMY_KEY") or None, "RETRY_TIME": 1000, "PROXY": os.environ.get("PROXY") or None, "IS_TEMP_CONVERSATION": os.environ.get("IS_TEMP_CONVERSATION", "false").lower() == "true" }, "SERVER": { "PORT": int(os.environ.get("PORT", 7860)) # 修改默认端口为7860 }, "RETRY": { "MAX_ATTEMPTS": 2 }, "SHOW_THINKING": os.environ.get("SHOW_THINKING", "true").lower() == "true", "IS_THINKING": False, "IS_IMG_GEN": False, "IS_IMG_GEN2": False, "ISSHOW_SEARCH_RESULTS": os.environ.get("ISSHOW_SEARCH_RESULTS", "false").lower() == "true" } DEFAULT_HEADERS = { 'Accept': '*/*', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Content-Type': 'text/plain;charset=UTF-8', 'Connection': 'keep-alive', 'Origin': 'https://grok.com', 'Priority': 'u=1, i', 'Sec-Ch-Ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"', 'Sec-Ch-Ua-Mobile': '?0', 'Sec-Ch-Ua-Platform': '"Windows"', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'Baggage': 'sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c' } class AuthTokenManager: def __init__(self): self.token_model_map = {} self.expired_tokens = set() self.token_status_map = {} self.model_config = { "grok-2": { "RequestFrequency": 30, "ExpirationTime": 1 * 60 * 60 * 1000 # 1小时 }, "grok-3": { "RequestFrequency": 20, "ExpirationTime": 2 * 60 * 60 * 1000 # 2小时 }, "grok-3-deepsearch": { "RequestFrequency": 10, "ExpirationTime": 24 * 60 * 60 * 1000 # 24小时 }, "grok-3-reasoning": { "RequestFrequency": 10, "ExpirationTime": 24 * 60 * 60 * 1000 # 24小时 } } self.token_reset_switch = False self.token_reset_timer = None def add_token(self, token): sso = token.split("sso=")[1].split(";")[0] for model in self.model_config.keys(): if model not in self.token_model_map: self.token_model_map[model] = [] if sso not in self.token_status_map: self.token_status_map[sso] = {} existing_token_entry = next((entry for entry in self.token_model_map[model] if entry["token"] == token), None) if not existing_token_entry: self.token_model_map[model].append({ "token": token, "RequestCount": 0, "AddedTime": int(time.time() * 1000), "StartCallTime": None }) if model not in self.token_status_map[sso]: self.token_status_map[sso][model] = { "isValid": True, "invalidatedTime": None, "totalRequestCount": 0 } def set_token(self, token): models = list(self.model_config.keys()) self.token_model_map = {model: [{ "token": token, "RequestCount": 0, "AddedTime": int(time.time() * 1000), "StartCallTime": None }] for model in models} sso = token.split("sso=")[1].split(";")[0] self.token_status_map[sso] = {model: { "isValid": True, "invalidatedTime": None, "totalRequestCount": 0 } for model in models} def delete_token(self, token): try: sso = token.split("sso=")[1].split(";")[0] for model in self.token_model_map: self.token_model_map[model] = [entry for entry in self.token_model_map[model] if entry["token"] != token] if sso in self.token_status_map: del self.token_status_map[sso] logger.info(f"令牌已成功移除: {token}", "TokenManager") return True except Exception as error: logger.error(f"令牌删除失败: {str(error)}") return False def get_next_token_for_model(self, model_id): normalized_model = self.normalize_model_name(model_id) if normalized_model not in self.token_model_map or not self.token_model_map[normalized_model]: return None token_entry = self.token_model_map[normalized_model][0] if token_entry: if token_entry["StartCallTime"] is None: token_entry["StartCallTime"] = int(time.time() * 1000) if not self.token_reset_switch: self.start_token_reset_process() self.token_reset_switch = True token_entry["RequestCount"] += 1 if token_entry["RequestCount"] > self.model_config[normalized_model]["RequestFrequency"]: self.remove_token_from_model(normalized_model, token_entry["token"]) next_token_entry = self.token_model_map[normalized_model][0] if self.token_model_map[normalized_model] else None return next_token_entry["token"] if next_token_entry else None sso = token_entry["token"].split("sso=")[1].split(";")[0] if sso in self.token_status_map and normalized_model in self.token_status_map[sso]: if token_entry["RequestCount"] == self.model_config[normalized_model]["RequestFrequency"]: self.token_status_map[sso][normalized_model]["isValid"] = False self.token_status_map[sso][normalized_model]["invalidatedTime"] = int(time.time() * 1000) self.token_status_map[sso][normalized_model]["totalRequestCount"] += 1 return token_entry["token"] return None def remove_token_from_model(self, model_id, token): normalized_model = self.normalize_model_name(model_id) if normalized_model not in self.token_model_map: logger.error(f"模型 {normalized_model} 不存在", "TokenManager") return False model_tokens = self.token_model_map[normalized_model] token_index = next((i for i, entry in enumerate(model_tokens) if entry["token"] == token), -1) if token_index != -1: removed_token_entry = model_tokens.pop(token_index) self.expired_tokens.add(( removed_token_entry["token"], normalized_model, int(time.time() * 1000) )) if not self.token_reset_switch: self.start_token_reset_process() self.token_reset_switch = True logger.info(f"模型{model_id}的令牌已失效,已成功移除令牌: {token}", "TokenManager") return True logger.error(f"在模型 {normalized_model} 中未找到 token: {token}", "TokenManager") return False def get_expired_tokens(self): return list(self.expired_tokens) def normalize_model_name(self, model): if model.startswith('grok-') and 'deepsearch' not in model and 'reasoning' not in model: return '-'.join(model.split('-')[:2]) return model def get_token_count_for_model(self, model_id): normalized_model = self.normalize_model_name(model_id) return len(self.token_model_map.get(normalized_model, [])) def get_remaining_token_request_capacity(self): remaining_capacity_map = {} for model in self.model_config.keys(): model_tokens = self.token_model_map.get(model, []) model_request_frequency = self.model_config[model]["RequestFrequency"] total_used_requests = sum(token_entry.get("RequestCount", 0) for token_entry in model_tokens) remaining_capacity = (len(model_tokens) * model_request_frequency) - total_used_requests remaining_capacity_map[model] = max(0, remaining_capacity) return remaining_capacity_map def get_token_array_for_model(self, model_id): normalized_model = self.normalize_model_name(model_id) return self.token_model_map.get(normalized_model, []) def start_token_reset_process(self): def reset_expired_tokens(): now = int(time.time() * 1000) tokens_to_remove = set() for token_info in self.expired_tokens: token, model, expired_time = token_info expiration_time = self.model_config[model]["ExpirationTime"] if now - expired_time >= expiration_time: if not any(entry["token"] == token for entry in self.token_model_map.get(model, [])): if model not in self.token_model_map: self.token_model_map[model] = [] self.token_model_map[model].append({ "token": token, "RequestCount": 0, "AddedTime": now, "StartCallTime": None }) sso = token.split("sso=")[1].split(";")[0] if sso in self.token_status_map and model in self.token_status_map[sso]: self.token_status_map[sso][model]["isValid"] = True self.token_status_map[sso][model]["invalidatedTime"] = None self.token_status_map[sso][model]["totalRequestCount"] = 0 tokens_to_remove.add(token_info) self.expired_tokens -= tokens_to_remove for model in self.model_config.keys(): if model not in self.token_model_map: continue for token_entry in self.token_model_map[model]: if not token_entry.get("StartCallTime"): continue expiration_time = self.model_config[model]["ExpirationTime"] if now - token_entry["StartCallTime"] >= expiration_time: sso = token_entry["token"].split("sso=")[1].split(";")[0] if sso in self.token_status_map and model in self.token_status_map[sso]: self.token_status_map[sso][model]["isValid"] = True self.token_status_map[sso][model]["invalidatedTime"] = None self.token_status_map[sso][model]["totalRequestCount"] = 0 token_entry["RequestCount"] = 0 token_entry["StartCallTime"] = None import threading # 启动一个线程执行定时任务,每小时执行一次 def run_timer(): while True: reset_expired_tokens() time.sleep(3600) timer_thread = threading.Thread(target=run_timer) timer_thread.daemon = True timer_thread.start() def get_all_tokens(self): all_tokens = set() for model_tokens in self.token_model_map.values(): for entry in model_tokens: all_tokens.add(entry["token"]) return list(all_tokens) def get_token_status_map(self): return self.token_status_map class Utils: @staticmethod def organize_search_results(search_results): if not search_results or 'results' not in search_results: return '' results = search_results['results'] formatted_results = [] for index, result in enumerate(results): title = result.get('title', '未知标题') url = result.get('url', '#') preview = result.get('preview', '无预览内容') formatted_result = f"\r\n
资料[{index}]: {title}\r\n{preview}\r\n\n[Link]({url})\r\n
" formatted_results.append(formatted_result) return '\n\n'.join(formatted_results) @staticmethod def create_auth_headers(model): return token_manager.get_next_token_for_model(model) @staticmethod def get_proxy_options(): proxy = CONFIG["API"]["PROXY"] proxy_options = {} if proxy: logger.info(f"使用代理: {proxy}", "Server") proxy_options["proxies"] = {"https": proxy, "http": proxy} if proxy.startswith("socks5://"): proxy_options["proxies"] = {"https": proxy, "http": proxy} proxy_options["proxy_type"] = "socks5" return proxy_options class GrokApiClient: def __init__(self, model_id): if model_id not in CONFIG["MODELS"]: raise ValueError(f"不支持的模型: {model_id}") self.model_id = CONFIG["MODELS"][model_id] def process_message_content(self, content): if isinstance(content, str): return content return None def get_image_type(self, base64_string): mime_type = 'image/jpeg' if 'data:image' in base64_string: import re matches = re.search(r'data:([a-zA-Z0-9]+\/[a-zA-Z0-9-.+]+);base64,', base64_string) if matches: mime_type = matches.group(1) extension = mime_type.split('/')[1] file_name = f"image.{extension}" return { "mimeType": mime_type, "fileName": file_name } def upload_base64_image(self, base64_data, url): try: if 'data:image' in base64_data: image_buffer = base64_data.split(',')[1] else: image_buffer = base64_data image_info = self.get_image_type(base64_data) mime_type = image_info["mimeType"] file_name = image_info["fileName"] upload_data = { "rpc": "uploadFile", "req": { "fileName": file_name, "fileMimeType": mime_type, "content": image_buffer } } logger.info("发送图片请求", "Server") proxy_options = Utils.get_proxy_options() response = curl_requests.post( url, headers={ **DEFAULT_HEADERS, "Cookie": CONFIG["API"]["SIGNATURE_COOKIE"] }, json=upload_data, impersonate="chrome120", **proxy_options ) if response.status_code != 200: logger.error(f"上传图片失败,状态码:{response.status_code}", "Server") return '' result = response.json() logger.info(f"上传图片成功: {result}", "Server") return result.get("fileMetadataId", "") except Exception as error: logger.error(str(error), "Server") return '' def prepare_chat_request(self, request): if ((request["model"] == 'grok-2-imageGen' or request["model"] == 'grok-3-imageGen') and not CONFIG["API"]["PICGO_KEY"] and not CONFIG["API"]["TUMY_KEY"] and request.get("stream", False)): raise ValueError("该模型流式输出需要配置PICGO或者TUMY图床密钥!") todo_messages = request["messages"] if request["model"] in ['grok-2-imageGen', 'grok-3-imageGen', 'grok-3-deepsearch']: last_message = todo_messages[-1] if last_message["role"] != 'user': raise ValueError('此模型最后一条消息必须是用户消息!') todo_messages = [last_message] file_attachments = [] messages = '' last_role = None last_content = '' search = request["model"] in ['grok-2-search', 'grok-3-search'] # 移除标签及其内容和base64图片 def remove_think_tags(text): import re text = re.sub(r'[\s\S]*?<\/think>', '', text).strip() text = re.sub(r'!\[image\]\(data:.*?base64,.*?\)', '[图片]', text) return text def process_content(content): if isinstance(content, list): text_content = '' for item in content: if item["type"] == 'image_url': text_content += ("[图片]" if not text_content else '\n[图片]') elif item["type"] == 'text': text_content += (remove_think_tags(item["text"]) if not text_content else '\n' + remove_think_tags(item["text"])) return text_content elif isinstance(content, dict) and content is not None: if content["type"] == 'image_url': return "[图片]" elif content["type"] == 'text': return remove_think_tags(content["text"]) return remove_think_tags(self.process_message_content(content)) for current in todo_messages: role = 'assistant' if current["role"] == 'assistant' else 'user' is_last_message = current == todo_messages[-1] if is_last_message and "content" in current: if isinstance(current["content"], list): for item in current["content"]: if item["type"] == 'image_url': processed_image = self.upload_base64_image( item["image_url"]["url"], f"{CONFIG['API']['BASE_URL']}/api/rpc" ) if processed_image: file_attachments.append(processed_image) elif isinstance(current["content"], dict) and current["content"].get("type") == 'image_url': processed_image = self.upload_base64_image( current["content"]["image_url"]["url"], f"{CONFIG['API']['BASE_URL']}/api/rpc" ) if processed_image: file_attachments.append(processed_image) text_content = process_content(current.get("content", "")) if text_content or (is_last_message and file_attachments): if role == last_role and text_content: last_content += '\n' + text_content messages = messages[:messages.rindex(f"{role.upper()}: ")] + f"{role.upper()}: {last_content}\n" else: messages += f"{role.upper()}: {text_content or '[图片]'}\n" last_content = text_content last_role = role return { "temporary": CONFIG["API"].get("IS_TEMP_CONVERSATION", False), "modelName": self.model_id, "message": messages.strip(), "fileAttachments": file_attachments[:4], "imageAttachments": [], "disableSearch": False, "enableImageGeneration": True, "returnImageBytes": False, "returnRawGrokInXaiRequest": False, "enableImageStreaming": False, "imageGenerationCount": 1, "forceConcise": False, "toolOverrides": { "imageGen": request["model"] in ['grok-2-imageGen', 'grok-3-imageGen'], "webSearch": search, "xSearch": search, "xMediaSearch": search, "trendsSearch": search, "xPostAnalyze": search }, "enableSideBySide": True, "isPreset": False, "sendFinalMetadata": True, "customInstructions": "", "deepsearchPreset": "default" if request["model"] == 'grok-3-deepsearch' else "", "isReasoning": request["model"] == 'grok-3-reasoning' } class MessageProcessor: @staticmethod def create_chat_response(message, model, is_stream=False): base_response = { "id": f"chatcmpl-{uuid.uuid4()}", "created": int(time.time()), "model": model } if is_stream: return { **base_response, "object": "chat.completion.chunk", "choices": [{ "index": 0, "delta": { "content": message } }] } return { **base_response, "object": "chat.completion", "choices": [{ "index": 0, "message": { "role": "assistant", "content": message }, "finish_reason": "stop" }], "usage": None } def process_model_response(response, model): result = {"token": None, "imageUrl": None} if CONFIG["IS_IMG_GEN"]: if response.get("cachedImageGenerationResponse") and not CONFIG["IS_IMG_GEN2"]: result["imageUrl"] = response["cachedImageGenerationResponse"]["imageUrl"] return result if model == 'grok-2': result["token"] = response.get("token") elif model in ['grok-2-search', 'grok-3-search']: if response.get("webSearchResults") and CONFIG["ISSHOW_SEARCH_RESULTS"]: result["token"] = f"\r\n{Utils.organize_search_results(response['webSearchResults'])}\r\n" else: result["token"] = response.get("token") elif model == 'grok-3': result["token"] = response.get("token") elif model == 'grok-3-deepsearch': if response.get("messageStepId") and not CONFIG["SHOW_THINKING"]: return result if response.get("messageStepId") and not CONFIG["IS_THINKING"]: result["token"] = "" + response.get("token", "") CONFIG["IS_THINKING"] = True elif not response.get("messageStepId") and CONFIG["IS_THINKING"] and response.get("messageTag") == "final": result["token"] = "" + response.get("token", "") CONFIG["IS_THINKING"] = False elif (response.get("messageStepId") and CONFIG["IS_THINKING"] and response.get("messageTag") == "assistant") or response.get("messageTag") == "final": result["token"] = response.get("token") elif model == 'grok-3-reasoning': if response.get("isThinking") and not CONFIG["SHOW_THINKING"]: return result if response.get("isThinking") and not CONFIG["IS_THINKING"]: result["token"] = "" + response.get("token", "") CONFIG["IS_THINKING"] = True elif not response.get("isThinking") and CONFIG["IS_THINKING"]: result["token"] = "" + response.get("token", "") CONFIG["IS_THINKING"] = False else: result["token"] = response.get("token") return result def handle_image_response(image_url): max_retries = 2 retry_count = 0 image_base64_response = None while retry_count < max_retries: try: proxy_options = Utils.get_proxy_options() image_base64_response = curl_requests.get( f"https://assets.grok.com/{image_url}", headers={ **DEFAULT_HEADERS, "Cookie": CONFIG["API"]["SIGNATURE_COOKIE"] }, impersonate="chrome120", **proxy_options ) if image_base64_response.status_code == 200: break retry_count += 1 if retry_count == max_retries: raise Exception(f"上游服务请求失败! status: {image_base64_response.status_code}") time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * retry_count) except Exception as error: logger.error(str(error), "Server") retry_count += 1 if retry_count == max_retries: raise time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * retry_count) image_buffer = image_base64_response.content if not CONFIG["API"]["PICGO_KEY"] and not CONFIG["API"]["TUMY_KEY"]: base64_image = base64.b64encode(image_buffer).decode('utf-8') image_content_type = image_base64_response.headers.get('content-type', 'image/jpeg') return f"![image](data:{image_content_type};base64,{base64_image})" logger.info("开始上传图床", "Server") if CONFIG["API"]["PICGO_KEY"]: files = {'source': ('image.jpg', image_buffer, 'image/jpeg')} headers = { "X-API-Key": CONFIG["API"]["PICGO_KEY"] } response_url = requests.post( "https://www.picgo.net/api/1/upload", files=files, headers=headers ) if response_url.status_code != 200: return "生图失败,请查看PICGO图床密钥是否设置正确" else: logger.info("生图成功", "Server") result = response_url.json() return f"![image]({result['image']['url']})" elif CONFIG["API"]["TUMY_KEY"]: files = {'file': ('image.jpg', image_buffer, 'image/jpeg')} headers = { "Accept": "application/json", 'Authorization': f"Bearer {CONFIG['API']['TUMY_KEY']}" } response_url = requests.post( "https://tu.my/api/v1/upload", files=files, headers=headers ) if response_url.status_code != 200: return "生图失败,请查看TUMY图床密钥是否设置正确" else: try: result = response_url.json() logger.info("生图成功", "Server") return f"![image]({result['data']['links']['url']})" except Exception as error: logger.error(str(error), "Server") return "生图失败,请查看TUMY图床密钥是否设置正确" def handle_non_stream_response(response, model): try: logger.info("开始处理非流式响应", "Server") stream = response.iter_lines() full_response = "" CONFIG["IS_THINKING"] = False CONFIG["IS_IMG_GEN"] = False CONFIG["IS_IMG_GEN2"] = False for chunk in stream: if not chunk: continue try: line_json = json.loads(chunk.decode("utf-8").strip()) if line_json.get("error"): logger.error(json.dumps(line_json, indent=2), "Server") return json.dumps({"error": "RateLimitError"}) + "\n\n" response_data = line_json.get("result", {}).get("response") if not response_data: continue if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"): CONFIG["IS_IMG_GEN"] = True result = process_model_response(response_data, model) if result["token"]: full_response += result["token"] if result["imageUrl"]: CONFIG["IS_IMG_GEN2"] = True return handle_image_response(result["imageUrl"]) except json.JSONDecodeError: continue except Exception as e: logger.error(f"处理流式响应行时出错: {str(e)}", "Server") continue return full_response except Exception as error: logger.error(str(error), "Server") raise def handle_stream_response(response, model): def generate(): logger.info("开始处理流式响应", "Server") stream = response.iter_lines() CONFIG["IS_THINKING"] = False CONFIG["IS_IMG_GEN"] = False CONFIG["IS_IMG_GEN2"] = False for chunk in stream: if not chunk: continue try: line_json = json.loads(chunk.decode("utf-8").strip()) if line_json.get("error"): logger.error(json.dumps(line_json, indent=2), "Server") yield json.dumps({"error": "RateLimitError"}) + "\n\n" return response_data = line_json.get("result", {}).get("response") if not response_data: continue if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"): CONFIG["IS_IMG_GEN"] = True result = process_model_response(response_data, model) if result["token"]: # 修复的代码 - 使用临时变量 token_content = result["token"] response_json = json.dumps(MessageProcessor.create_chat_response(token_content, model, True)) yield f"data: {response_json}\n\n" if result["imageUrl"]: CONFIG["IS_IMG_GEN2"] = True image_data = handle_image_response(result["imageUrl"]) # 同样修复这里 response_json = json.dumps(MessageProcessor.create_chat_response(image_data, model, True)) yield f"data: {response_json}\n\n" except json.JSONDecodeError: continue except Exception as e: logger.error(f"处理流式响应行时出错: {str(e)}", "Server") continue yield "data: [DONE]\n\n" return generate() def initialization(): sso_array = os.environ.get("SSO", "").split(',') logger.info("开始加载令牌", "Server") for sso in sso_array: if sso: token_manager.add_token(f"sso-rw={sso};sso={sso}") logger.info(f"成功加载令牌: {json.dumps(token_manager.get_all_tokens(), indent=2)}", "Server") logger.info(f"令牌加载完成,共加载: {len(token_manager.get_all_tokens())}个令牌", "Server") if CONFIG["API"]["PROXY"]: logger.info(f"代理已设置: {CONFIG['API']['PROXY']}", "Server") logger.info("初始化完成", "Server") app = Flask(__name__) app.wsgi_app = ProxyFix(app.wsgi_app) @app.before_request def log_request_info(): logger.info(f"{request.method} {request.path}", "Request") @app.route('/get/tokens', methods=['GET']) def get_tokens(): auth_token = request.headers.get('Authorization', '').replace('Bearer ', '') if CONFIG["API"]["IS_CUSTOM_SSO"]: return jsonify({"error": '自定义的SSO令牌模式无法获取轮询sso令牌状态'}), 403 elif auth_token != CONFIG["API"]["API_KEY"]: return jsonify({"error": 'Unauthorized'}), 401 return jsonify(token_manager.get_token_status_map()) @app.route('/add/token', methods=['POST']) def add_token(): auth_token = request.headers.get('Authorization', '').replace('Bearer ', '') if CONFIG["API"]["IS_CUSTOM_SSO"]: return jsonify({"error": '自定义的SSO令牌模式无法添加sso令牌'}), 403 elif auth_token != CONFIG["API"]["API_KEY"]: return jsonify({"error": 'Unauthorized'}), 401 try: sso = request.json.get('sso') token_manager.add_token(f"sso-rw={sso};sso={sso}") return jsonify(token_manager.get_token_status_map().get(sso, {})), 200 except Exception as error: logger.error(str(error), "Server") return jsonify({"error": '添加sso令牌失败'}), 500 @app.route('/delete/token', methods=['POST']) def delete_token(): auth_token = request.headers.get('Authorization', '').replace('Bearer ', '') if CONFIG["API"]["IS_CUSTOM_SSO"]: return jsonify({"error": '自定义的SSO令牌模式无法删除sso令牌'}), 403 elif auth_token != CONFIG["API"]["API_KEY"]: return jsonify({"error": 'Unauthorized'}), 401 try: sso = request.json.get('sso') token_manager.delete_token(f"sso-rw={sso};sso={sso}") return jsonify({"message": '删除sso令牌成功'}), 200 except Exception as error: logger.error(str(error), "Server") return jsonify({"error": '删除sso令牌失败'}), 500 @app.route('/v1/models', methods=['GET']) def get_models(): return jsonify({ "object": "list", "data": [ { "id": model, "object": "model", "created": int(time.time()), "owned_by": "grok" } for model in CONFIG["MODELS"].keys() ] }) @app.route('/v1/chat/completions', methods=['POST']) def chat_completions(): try: auth_token = request.headers.get('Authorization', '').replace('Bearer ', '') if auth_token: if CONFIG["API"]["IS_CUSTOM_SSO"]: result = f"sso={auth_token};sso-rw={auth_token}" token_manager.set_token(result) elif auth_token != CONFIG["API"]["API_KEY"]: return jsonify({"error": 'Unauthorized'}), 401 else: return jsonify({"error": 'API_KEY缺失'}), 401 data = request.json model = data.get("model") stream = data.get("stream", False) retry_count = 0 grok_client = GrokApiClient(model) request_payload = grok_client.prepare_chat_request(data) while retry_count < CONFIG["RETRY"]["MAX_ATTEMPTS"]: retry_count += 1 CONFIG["API"]["SIGNATURE_COOKIE"] = Utils.create_auth_headers(model) if not CONFIG["API"]["SIGNATURE_COOKIE"]: raise ValueError('该模型无可用令牌') logger.info(f"当前令牌: {json.dumps(CONFIG['API']['SIGNATURE_COOKIE'], indent=2)}", "Server") logger.info(f"当前可用模型的全部可用数量: {json.dumps(token_manager.get_remaining_token_request_capacity(), indent=2)}", "Server") try: proxy_options = Utils.get_proxy_options() response = curl_requests.post( f"{CONFIG['API']['BASE_URL']}/rest/app-chat/conversations/new", headers={ "Accept": "text/event-stream", "Baggage": "sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c", "Content-Type": "text/plain;charset=UTF-8", "Connection": "keep-alive", "Cookie": CONFIG["API"]["SIGNATURE_COOKIE"] }, data=json.dumps(request_payload), impersonate="chrome120", stream=True, **proxy_options ) if response.status_code == 200: logger.info("请求成功", "Server") logger.info(f"当前{model}剩余可用令牌数: {token_manager.get_token_count_for_model(model)}", "Server") try: if stream: return Response( stream_with_context(handle_stream_response(response, model)), content_type='text/event-stream' ) else: content = handle_non_stream_response(response, model) return jsonify(MessageProcessor.create_chat_response(content, model)) except Exception as error: logger.error(str(error), "Server") if CONFIG["API"]["IS_CUSTOM_SSO"]: raise ValueError(f"自定义SSO令牌当前模型{model}的请求次数已失效") token_manager.remove_token_from_model(model, CONFIG["API"]["SIGNATURE_COOKIE"]) if token_manager.get_token_count_for_model(model) == 0: raise ValueError(f"{model} 次数已达上限,请切换其他模型或者重新对话") elif response.status_code == 429: if CONFIG["API"]["IS_CUSTOM_SSO"]: raise ValueError(f"自定义SSO令牌当前模型{model}的请求次数已失效") token_manager.remove_token_from_model(model, CONFIG["API"]["SIGNATURE_COOKIE"]) if token_manager.get_token_count_for_model(model) == 0: raise ValueError(f"{model} 次数已达上限,请切换其他模型或者重新对话") else: if CONFIG["API"]["IS_CUSTOM_SSO"]: raise ValueError(f"自定义SSO令牌当前模型{model}的请求次数已失效") logger.error(f"令牌异常错误状态!status: {response.status_code}", "Server") token_manager.remove_token_from_model(model, CONFIG["API"]["SIGNATURE_COOKIE"]) logger.info(f"当前{model}剩余可用令牌数: {token_manager.get_token_count_for_model(model)}", "Server") except Exception as e: logger.error(f"请求处理异常: {str(e)}", "Server") if CONFIG["API"]["IS_CUSTOM_SSO"]: raise continue raise ValueError('当前模型所有令牌都已耗尽') except Exception as error: logger.error(str(error), "ChatAPI") return jsonify({ "error": { "message": str(error), "type": "server_error" } }), 500 @app.route('/', defaults={'path': ''}) @app.route('/') def catch_all(path): return 'api运行正常', 200 if __name__ == '__main__': token_manager = AuthTokenManager() initialization() app.run( host='0.0.0.0', port=CONFIG["SERVER"]["PORT"], debug=False )