from flask import Flask, request, jsonify, Response import cloudscraper # 替换requests库,专门用于绕过Cloudflare保护 import io import json import re import uuid import random import time import os app = Flask(__name__) TARGET_URL = "https://grok.com/rest/app-chat/conversations/new" MODELS = ["grok-2", "grok-3", "grok-3-thinking"] COOKIE_NUM = 0 COOKIE_LIST = [] LAST_COOKIE_INDEX = {} TEMPORARY_MODE = False USER_AGENTS = [ # Windows - Chrome "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36", # Windows - Firefox "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0", # Windows - Edge "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 Edg/123.0.2420.81", # Windows - Opera "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 OPR/109.0.0.0", # macOS - Chrome "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36", # macOS - Safari "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.0.1 Safari/605.1.15", # macOS - Firefox "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.7; rv:132.0) Gecko/20100101 Firefox/132.0", # macOS - Opera "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 OPR/109.0.0.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.4; rv:124.0) Gecko/20100101 Firefox/124.0", # Linux - Chrome "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", # Linux - Firefox "Mozilla/5.0 (X11; Linux i686; rv:124.0) Gecko/20100101 Firefox/124.0", ] def create_cf_scraper(cookie_string): """创建一个配置好的cloudscraper实例""" # 只使用cloudscraper支持的浏览器 browser = random.choice(['chrome', 'firefox']) # 修复:移除不支持的'edge' platform = random.choice(['windows', 'darwin', 'linux']) # 创建cloudscraper会话 scraper = cloudscraper.create_scraper( browser={ 'browser': browser, 'platform': platform, 'desktop': True }, delay=random.uniform(5, 10), # 等待Cloudflare检查 interpreter='js2py', # 使用js2py解释JavaScript挑战 ) # 设置自定义用户代理 selected_ua = random.choice(USER_AGENTS) # 设置基本头信息 scraper.headers.update({ "user-agent": selected_ua, "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "accept-language": "en-US,en;q=0.5", "accept-encoding": "gzip, deflate, br", "dnt": "1", "sec-fetch-dest": "document", "sec-fetch-mode": "navigate", "sec-fetch-site": "none", "sec-fetch-user": "?1", "upgrade-insecure-requests": "1", "cookie": cookie_string }) return scraper def resolve_config(): global COOKIE_NUM, COOKIE_LIST, LAST_COOKIE_INDEX, TEMPORARY_MODE COOKIE_LIST = [] cookie_index = 1 while True: cookie_env_name = f"GROK_COOKIE_{cookie_index}" cookie_string = os.environ.get(cookie_env_name) if cookie_string: try: print(f"创建Cookie {cookie_index} 的CloudScraper实例...") scraper = create_cf_scraper(cookie_string) COOKIE_LIST.append(scraper) cookie_index += 1 except Exception as e: print(f"为Cookie {cookie_index} 创建CloudScraper失败: {e}") cookie_index += 1 else: break COOKIE_NUM = len(COOKIE_LIST) if COOKIE_NUM == 0: raise ValueError("未提供Grok cookies,请通过环境变量设置 (GROK_COOKIE_1, GROK_COOKIE_2, ...)") temporary_mode_str = os.environ.get("GROK_TEMPORARY_MODE", "false").lower() TEMPORARY_MODE = temporary_mode_str == "true" or temporary_mode_str == "1" LAST_COOKIE_INDEX = {model: 0 for model in MODELS} print(f"已从环境变量加载 {COOKIE_NUM} 个Grok cookies。") print(f"临时模式: {TEMPORARY_MODE}") @app.route("/", methods=["GET"]) def root(): return "Grok Proxy is running (Cloudflare Protected)", 200, {'Content-Type': 'text/plain'} @app.route("/health", methods=["GET"]) def health_check(): return "OK", 200, {'Content-Type': 'text/plain'} @app.route("/v1/models", methods=["GET"]) def get_models(): model_list = [] for model in MODELS: model_list.append( { "id": model, "object": "model", "created": int(time.time()), "owned_by": "Elbert", "name": model, } ) return jsonify({"object": "list", "data": model_list}) @app.route("/v1/chat/completions", methods=["POST"]) def chat_completions(): print("Received request") openai_request = request.get_json() print(openai_request) stream = openai_request.get("stream", False) messages = openai_request.get("messages") model = openai_request.get("model") if model not in MODELS: return jsonify({"error": "Model not available"}), 500 if messages is None: return jsonify({"error": "Messages is required"}), 400 disable_search, force_concise, messages = magic(messages) message = format_message(messages) is_reasoning = len(model) > 6 model = model[0:6] return ( send_message(message, model, disable_search, force_concise, is_reasoning) if stream else send_message_non_stream( message, model, disable_search, force_concise, is_reasoning) ) def get_next_account(model): current = (LAST_COOKIE_INDEX[model] + 1) % COOKIE_NUM LAST_COOKIE_INDEX[model] = current print(f"Using account {current+1}/{COOKIE_NUM} for {model}") return COOKIE_LIST[current] def send_message(message, model, disable_search, force_concise, is_reasoning): headers = { "authority": "grok.com", "accept": "*/*", "accept-language": "zh-CN,zh;q=0.9,en;q=0.8", "cache-control": "no-cache", "content-type": "application/json", "origin": "https://grok.com", "pragma": "no-cache", "referer": "https://grok.com/", "sec-ch-ua": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", } payload = { "temporary": TEMPORARY_MODE, "modelName": "grok-3", "message": message, "fileAttachments": [], "imageAttachments": [], "disableSearch": disable_search, "enableImageGeneration": False, "returnImageBytes": False, "returnRawGrokInXaiRequest": False, "enableImageStreaming": True, "imageGenerationCount": 2, "forceConcise": force_concise, "toolOverrides": {}, "enableSideBySide": True, "isPreset": False, "sendFinalMetadata": True, "customInstructions": "", "deepsearchPreset": "", "isReasoning": is_reasoning, } try: scraper = get_next_account(model) # 预热Cloudflare print("预热Cloudflare权限...") scraper.get("https://grok.com/", timeout=15) time.sleep(random.uniform(1.0, 2.0)) # 随机等待以模拟人类 print("发送消息请求...") response = scraper.post(TARGET_URL, headers=headers, json=payload, stream=True) response.raise_for_status() def generate(): try: print("---------- Response ----------") cnt = 2 thinking = 2 for line in response.iter_lines(): if line: if cnt != 0: cnt -= 1 else: decoded_line = line.decode("utf-8") data = json.loads(decoded_line) token = data["result"]["response"]["token"] content = "" if is_reasoning: if thinking == 2: thinking = 1 content = f"\n{token}" print(f"{content}", end="") elif thinking & ( not data["result"]["response"]["isThinking"] ): thinking = 0 content = f"\n\n{token}" print(f"{content}", end="") else: content = token print(content, end="") else: content = token print(content, end="") openai_chunk = { "id": "chatcmpl-" + str(uuid.uuid4()), "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "delta": {"content": content}, "finish_reason": None, } ], } yield f"data: {json.dumps(openai_chunk)}\n\n" if data["result"]["response"]["isSoftStop"]: openai_chunk = { "id": "chatcmpl-" + str(uuid.uuid4()), "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "delta": {"content": content}, "finish_reason": "completed", } ], } yield f"data: {json.dumps(openai_chunk)}\n\n" break print("\n---------- Response End ----------") yield f"data: [DONE]\n\n" except Exception as e: print(f"Failed to send message: {e}") yield f'data: {{"error": "{e}"}}\n\n' return Response(generate(), content_type="text/event-stream") except Exception as e: print(f"Failed to send message: {e}") return jsonify({"error": f"Failed to send message: {e}"}), 500 def send_message_non_stream( message, model, disable_search, force_concise, is_reasoning ): headers = { "authority": "grok.com", "accept": "*/*", "accept-language": "zh-CN,zh;q=0.9,en;q=0.8", "cache-control": "no-cache", "content-type": "application/json", "origin": "https://grok.com", "pragma": "no-cache", "referer": "https://grok.com/", "sec-ch-ua": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", } payload = { "temporary": TEMPORARY_MODE, "modelName": "grok-3", "message": message, "fileAttachments": [], "imageAttachments": [], "disableSearch": disable_search, "enableImageGeneration": False, "returnImageBytes": False, "returnRawGrokInXaiRequest": False, "enableImageStreaming": True, "imageGenerationCount": 2, "forceConcise": force_concise, "toolOverrides": {}, "enableSideBySide": True, "isPreset": False, "sendFinalMetadata": True, "customInstructions": "", "deepsearchPreset": "", "isReasoning": is_reasoning, } thinking = 2 try: scraper = get_next_account(model) # 预热Cloudflare print("预热Cloudflare权限...") scraper.get("https://grok.com/", timeout=15) time.sleep(random.uniform(1.0, 2.0)) # 随机等待以模拟人类 print("发送非流式消息请求...") response = scraper.post(TARGET_URL, headers=headers, json=payload, stream=True) response.raise_for_status() cnt = 2 try: print("---------- Response ----------") buffer = io.StringIO() for line in response.iter_lines(): if line: if cnt != 0: cnt -= 1 else: decoded_line = line.decode("utf-8") data = json.loads(decoded_line) token = data["result"]["response"]["token"] content = "" if is_reasoning: if thinking == 2: thinking = 1 content = f"\n{token}" print(f"{content}", end="") buffer.write(content) elif thinking & ( not data["result"]["response"]["isThinking"] ): thinking = 0 content = f"\n\n{token}" print(f"{content}", end="") buffer.write(content) else: content = token print(content, end="") buffer.write(content) else: content = token print(content, end="") buffer.write(content) if data["result"]["response"]["isSoftStop"]: break print("\n---------- Response End ----------") openai_response = { "id": "chatcmpl-" + str(uuid.uuid4()), "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "message": {"role": "assistant", "content": buffer.getvalue()}, "finish_reason": "completed", } ], } return jsonify(openai_response) except Exception as e: print(f"Failed to process response: {e}") return jsonify({"error": f"Failed to process response: {e}"}), 500 except Exception as e: print(f"Failed to send message: {e}") return jsonify({"error": f"Failed to send message: {e}"}), 500 def format_message(messages): buffer = io.StringIO() role_map, prefix, messages = extract_role(messages) for message in messages: role = message.get("role") role = "\b" + role_map[role] if prefix else role_map[role] content = message.get("content").replace("\\n", "\n") pattern = re.compile(r"<\|removeRole\|>\n") if pattern.match(content): content = pattern.sub("", content) buffer.write(f"{content}\n") else: buffer.write(f"{role}: {content}\n") return buffer.getvalue() def extract_role(messages): role_map = {"user": "Human", "assistant": "Assistant", "system": "System"} prefix = False first_message = messages[0]["content"] pattern = re.compile( r""" \s* user:\s*(?P[^\n]*)\s* assistant:\s*(?P[^\n]*)\s* system:\s*(?P[^\n]*)\s* prefix:\s*(?P[^\n]*)\s* \n """, re.VERBOSE, ) match = pattern.search(first_message) if match: role_map = { "user": match.group("user"), "assistant": match.group("assistant"), "system": match.group("system"), } prefix = match.group("prefix") == "1" messages[0]["content"] = pattern.sub("", first_message) print(f"Extracted role map:") print( f"User: {role_map['user']}, {role_map['assistant']}, System: {role_map['system']}" ) print(f"Using prefix: {prefix}") return (role_map, prefix, messages) def magic(messages): first_message = messages[0]["content"] disable_search = False if re.search(r"<\|disableSearch\|>", first_message): disable_search = True print("Disable search") first_message = re.sub(r"<\|disableSearch\|>", "", first_message) force_concise = False if re.search(r"<\|forceConcise\|>", first_message): force_concise = True print("Force concise") first_message = re.sub(r"<\|forceConcise\|>", "", first_message) messages[0]["content"] = first_message return (disable_search, force_concise, messages) # 初始化配置 resolve_config() if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)