from flask import Flask, request, jsonify, Response
import cloudscraper  # drop-in replacement for requests, used to get past Cloudflare's protection
import io
import json
import re
import uuid
import random
import time
import os

app = Flask(__name__)
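
# Global configuration, populated by resolve_config() from environment variables:
# GROK_COOKIE_1, GROK_COOKIE_2, ... hold Grok session cookie strings (one cloudscraper
# session is built per cookie), and GROK_TEMPORARY_MODE toggles temporary conversations.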
TARGET_URL = "https://grok.com/rest/app-chat/conversations/new"
MODELS = ["grok-2", "grok-3", "grok-3-thinking"]
COOKIE_NUM = 0
COOKIE_LIST = []
LAST_COOKIE_INDEX = {}
TEMPORARY_MODE = False
USER_AGENTS = [
    # Windows - Chrome
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
    # Windows - Firefox
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0",
    # Windows - Edge
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 Edg/123.0.2420.81",
    # Windows - Opera
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 OPR/109.0.0.0",
    # macOS - Chrome
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
    # macOS - Safari
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.0.1 Safari/605.1.15",
    # macOS - Firefox
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.7; rv:132.0) Gecko/20100101 Firefox/132.0",
    # macOS - Opera
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 OPR/109.0.0.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.4; rv:124.0) Gecko/20100101 Firefox/124.0",
    # Linux - Chrome
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    # Linux - Firefox
    "Mozilla/5.0 (X11; Linux i686; rv:124.0) Gecko/20100101 Firefox/124.0",
]


def create_cf_scraper(cookie_string):
    """Create a configured cloudscraper instance."""
    # Only use browsers cloudscraper actually supports (the unsupported 'edge' option was removed)
    browser = random.choice(['chrome', 'firefox'])
    platform = random.choice(['windows', 'darwin', 'linux'])
    # Create the cloudscraper session
    scraper = cloudscraper.create_scraper(
        browser={
            'browser': browser,
            'platform': platform,
            'desktop': True
        },
        delay=random.uniform(5, 10),  # wait for the Cloudflare check to complete
        interpreter='js2py',  # use js2py to solve the JavaScript challenge
    )
    # Pick a user agent at random
    selected_ua = random.choice(USER_AGENTS)
    # Set baseline request headers
    scraper.headers.update({
        "user-agent": selected_ua,
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "accept-language": "en-US,en;q=0.5",
        "accept-encoding": "gzip, deflate, br",
        "dnt": "1",
        "sec-fetch-dest": "document",
        "sec-fetch-mode": "navigate",
        "sec-fetch-site": "none",
        "sec-fetch-user": "?1",
        "upgrade-insecure-requests": "1",
        "cookie": cookie_string
    })
    return scraper


def resolve_config():
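    """Load Grok cookies and settings from environment variables.

    Scans GROK_COOKIE_1, GROK_COOKIE_2, ... in order, building one cloudscraper
    session per cookie, and stops at the first index that is not defined.
    """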
    global COOKIE_NUM, COOKIE_LIST, LAST_COOKIE_INDEX, TEMPORARY_MODE
    COOKIE_LIST = []
    cookie_index = 1
    while True:
        cookie_env_name = f"GROK_COOKIE_{cookie_index}"
        cookie_string = os.environ.get(cookie_env_name)
        if cookie_string:
            try:
                print(f"Creating cloudscraper session for cookie {cookie_index}...")
                scraper = create_cf_scraper(cookie_string)
                COOKIE_LIST.append(scraper)
                cookie_index += 1
            except Exception as e:
                print(f"Failed to create cloudscraper session for cookie {cookie_index}: {e}")
                cookie_index += 1
        else:
            break
    COOKIE_NUM = len(COOKIE_LIST)
    if COOKIE_NUM == 0:
        raise ValueError("No Grok cookies provided; set them via environment variables (GROK_COOKIE_1, GROK_COOKIE_2, ...)")
    temporary_mode_str = os.environ.get("GROK_TEMPORARY_MODE", "false").lower()
    TEMPORARY_MODE = temporary_mode_str in ("true", "1")
    LAST_COOKIE_INDEX = {model: 0 for model in MODELS}
    print(f"Loaded {COOKIE_NUM} Grok cookie(s) from environment variables.")
    print(f"Temporary mode: {TEMPORARY_MODE}")


# NOTE: the route paths below are an assumption; they follow the OpenAI-compatible
# convention that these handlers imply.
@app.route("/")
def root():
    return "Grok Proxy is running (Cloudflare Protected)", 200, {'Content-Type': 'text/plain'}


@app.route("/health")
def health_check():
    return "OK", 200, {'Content-Type': 'text/plain'}


@app.route("/v1/models", methods=["GET"])
def get_models():
    model_list = []
    for model in MODELS:
        model_list.append(
            {
                "id": model,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "Elbert",
                "name": model,
            }
        )
    return jsonify({"object": "list", "data": model_list})


@app.route("/v1/chat/completions", methods=["POST"])
def chat_completions():
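    """OpenAI-style chat completions endpoint.

    Validates the request, flattens the message list into a single prompt,
    and dispatches to the streaming or non-streaming sender.
    """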
    print("Received request")
    openai_request = request.get_json()
    print(openai_request)
    stream = openai_request.get("stream", False)
    messages = openai_request.get("messages")
    model = openai_request.get("model")
    if model not in MODELS:
        return jsonify({"error": "Model not available"}), 500
    if messages is None:
        return jsonify({"error": "Messages is required"}), 400
    disable_search, force_concise, messages = magic(messages)
    message = format_message(messages)
    # "grok-3-thinking" is longer than 6 characters, so it selects reasoning mode;
    # the name is then truncated to the base model ("grok-2" / "grok-3").
    is_reasoning = len(model) > 6
    model = model[0:6]
    return (
        send_message(message, model, disable_search, force_concise, is_reasoning)
        if stream
        else send_message_non_stream(
            message, model, disable_search, force_concise, is_reasoning)
    )


def get_next_account(model):
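    """Rotate round-robin through the configured cookie sessions for the given model."""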
    current = (LAST_COOKIE_INDEX[model] + 1) % COOKIE_NUM
    LAST_COOKIE_INDEX[model] = current
    print(f"Using account {current+1}/{COOKIE_NUM} for {model}")
    return COOKIE_LIST[current]


def send_message(message, model, disable_search, force_concise, is_reasoning):
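    """Stream a Grok reply and re-emit it as OpenAI chat.completion.chunk SSE events."""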
    headers = {
        "authority": "grok.com",
        "accept": "*/*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
        "cache-control": "no-cache",
        "content-type": "application/json",
        "origin": "https://grok.com",
        "pragma": "no-cache",
        "referer": "https://grok.com/",
        "sec-ch-ua": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
    }
    payload = {
        "temporary": TEMPORARY_MODE,
        "modelName": "grok-3",
        "message": message,
        "fileAttachments": [],
        "imageAttachments": [],
        "disableSearch": disable_search,
        "enableImageGeneration": False,
        "returnImageBytes": False,
        "returnRawGrokInXaiRequest": False,
        "enableImageStreaming": True,
        "imageGenerationCount": 2,
        "forceConcise": force_concise,
        "toolOverrides": {},
        "enableSideBySide": True,
        "isPreset": False,
        "sendFinalMetadata": True,
        "customInstructions": "",
        "deepsearchPreset": "",
        "isReasoning": is_reasoning,
    }
    try:
        scraper = get_next_account(model)
        # Warm up the Cloudflare clearance before the real request
        print("Warming up Cloudflare clearance...")
        scraper.get("https://grok.com/", timeout=15)
        time.sleep(random.uniform(1.0, 2.0))  # random pause to look more human
        print("Sending message request...")
        response = scraper.post(TARGET_URL, headers=headers, json=payload, stream=True)
        response.raise_for_status()
        def generate():
            try:
                print("---------- Response ----------")
                cnt = 2  # skip the first two non-empty stream lines before parsing tokens
                thinking = 2  # 2 = thinking block not opened, 1 = inside <Thinking>, 0 = closed
                for line in response.iter_lines():
                    if line:
                        if cnt != 0:
                            cnt -= 1
                        else:
                            decoded_line = line.decode("utf-8")
                            data = json.loads(decoded_line)
                            token = data["result"]["response"]["token"]
                            content = ""
                            if is_reasoning:
                                if thinking == 2:
                                    thinking = 1
                                    content = f"<Thinking>\n{token}"
                                    print(content, end="")
                                elif thinking and not data["result"]["response"]["isThinking"]:
                                    thinking = 0
                                    content = f"\n</Thinking>\n{token}"
                                    print(content, end="")
                                else:
                                    content = token
                                    print(content, end="")
                            else:
                                content = token
                                print(content, end="")
                            openai_chunk = {
                                "id": "chatcmpl-" + str(uuid.uuid4()),
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": model,
                                "choices": [
                                    {
                                        "index": 0,
                                        "delta": {"content": content},
                                        "finish_reason": None,
                                    }
                                ],
                            }
                            yield f"data: {json.dumps(openai_chunk)}\n\n"
                            if data["result"]["response"]["isSoftStop"]:
                                openai_chunk = {
                                    "id": "chatcmpl-" + str(uuid.uuid4()),
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": model,
                                    "choices": [
                                        {
                                            "index": 0,
                                            "delta": {"content": content},
                                            "finish_reason": "stop",
                                        }
                                    ],
                                }
                                yield f"data: {json.dumps(openai_chunk)}\n\n"
                                break
                print("\n---------- Response End ----------")
                yield "data: [DONE]\n\n"
            except Exception as e:
                print(f"Failed to send message: {e}")
                yield f"data: {json.dumps({'error': str(e)})}\n\n"

        return Response(generate(), content_type="text/event-stream")
    except Exception as e:
        print(f"Failed to send message: {e}")
        return jsonify({"error": f"Failed to send message: {e}"}), 500


def send_message_non_stream(
    message, model, disable_search, force_concise, is_reasoning
):
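    """Collect the full Grok reply and return it as a single OpenAI chat.completion object."""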
    headers = {
        "authority": "grok.com",
        "accept": "*/*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
        "cache-control": "no-cache",
        "content-type": "application/json",
        "origin": "https://grok.com",
        "pragma": "no-cache",
        "referer": "https://grok.com/",
        "sec-ch-ua": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
    }
    payload = {
        "temporary": TEMPORARY_MODE,
        "modelName": "grok-3",
        "message": message,
        "fileAttachments": [],
        "imageAttachments": [],
        "disableSearch": disable_search,
        "enableImageGeneration": False,
        "returnImageBytes": False,
        "returnRawGrokInXaiRequest": False,
        "enableImageStreaming": True,
        "imageGenerationCount": 2,
        "forceConcise": force_concise,
        "toolOverrides": {},
        "enableSideBySide": True,
        "isPreset": False,
        "sendFinalMetadata": True,
        "customInstructions": "",
        "deepsearchPreset": "",
        "isReasoning": is_reasoning,
    }
    thinking = 2
    try:
        scraper = get_next_account(model)
        # Warm up the Cloudflare clearance before the real request
        print("Warming up Cloudflare clearance...")
        scraper.get("https://grok.com/", timeout=15)
        time.sleep(random.uniform(1.0, 2.0))  # random pause to look more human
        print("Sending non-streaming message request...")
        response = scraper.post(TARGET_URL, headers=headers, json=payload, stream=True)
        response.raise_for_status()
        cnt = 2  # skip the first two non-empty stream lines before parsing tokens
        try:
            print("---------- Response ----------")
            buffer = io.StringIO()
            for line in response.iter_lines():
                if line:
                    if cnt != 0:
                        cnt -= 1
                    else:
                        decoded_line = line.decode("utf-8")
                        data = json.loads(decoded_line)
                        token = data["result"]["response"]["token"]
                        content = ""
                        if is_reasoning:
                            if thinking == 2:
                                thinking = 1
                                content = f"<Thinking>\n{token}"
                                print(content, end="")
                                buffer.write(content)
                            elif thinking and not data["result"]["response"]["isThinking"]:
                                thinking = 0
                                content = f"\n</Thinking>\n{token}"
                                print(content, end="")
                                buffer.write(content)
                            else:
                                content = token
                                print(content, end="")
                                buffer.write(content)
                        else:
                            content = token
                            print(content, end="")
                            buffer.write(content)
                        if data["result"]["response"]["isSoftStop"]:
                            break
            print("\n---------- Response End ----------")
            openai_response = {
                "id": "chatcmpl-" + str(uuid.uuid4()),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": buffer.getvalue()},
                        "finish_reason": "stop",
                    }
                ],
            }
            return jsonify(openai_response)
        except Exception as e:
            print(f"Failed to process response: {e}")
            return jsonify({"error": f"Failed to process response: {e}"}), 500
    except Exception as e:
        print(f"Failed to send message: {e}")
        return jsonify({"error": f"Failed to send message: {e}"}), 500


def format_message(messages):
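    """Flatten an OpenAI-style message list into a single role-prefixed prompt string."""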
    buffer = io.StringIO()
    role_map, prefix, messages = extract_role(messages)
    for message in messages:
        role = message.get("role")
        role = "\b" + role_map[role] if prefix else role_map[role]
        content = message.get("content").replace("\\n", "\n")
        pattern = re.compile(r"<\|removeRole\|>\n")
        if pattern.match(content):
            # A leading <|removeRole|> tag drops the role prefix for this message
            content = pattern.sub("", content)
            buffer.write(f"{content}\n")
        else:
            buffer.write(f"{role}: {content}\n")
    return buffer.getvalue()


def extract_role(messages):
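    """Parse an optional <roleInfo> block in the first message to override role labels.

    Expected shape:
        <roleInfo>
        user: <label>
        assistant: <label>
        system: <label>
        prefix: 0 or 1
        </roleInfo>
    """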
    role_map = {"user": "Human", "assistant": "Assistant", "system": "System"}
    prefix = False
    first_message = messages[0]["content"]
    pattern = re.compile(
        r"""
        <roleInfo>\s*
        user:\s*(?P<user>[^\n]*)\s*
        assistant:\s*(?P<assistant>[^\n]*)\s*
        system:\s*(?P<system>[^\n]*)\s*
        prefix:\s*(?P<prefix>[^\n]*)\s*
        </roleInfo>\n
        """,
        re.VERBOSE,
    )
    match = pattern.search(first_message)
    if match:
        role_map = {
            "user": match.group("user"),
            "assistant": match.group("assistant"),
            "system": match.group("system"),
        }
        prefix = match.group("prefix") == "1"
        messages[0]["content"] = pattern.sub("", first_message)
        print("Extracted role map:")
        print(
            f"User: {role_map['user']}, Assistant: {role_map['assistant']}, System: {role_map['system']}"
        )
        print(f"Using prefix: {prefix}")
    return (role_map, prefix, messages)


def magic(messages):
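    """Strip the <|disableSearch|> and <|forceConcise|> control tags from the first message."""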
    first_message = messages[0]["content"]
    disable_search = False
    if re.search(r"<\|disableSearch\|>", first_message):
        disable_search = True
        print("Disable search")
        first_message = re.sub(r"<\|disableSearch\|>", "", first_message)
    force_concise = False
    if re.search(r"<\|forceConcise\|>", first_message):
        force_concise = True
        print("Force concise")
        first_message = re.sub(r"<\|forceConcise\|>", "", first_message)
    messages[0]["content"] = first_message
    return (disable_search, force_concise, messages)


# Initialize configuration on module import
resolve_config()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)
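

# Example usage (a sketch, assuming the OpenAI-compatible routes registered above
# and at least one GROK_COOKIE_* variable set before the app starts):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/v1/chat/completions",
#       json={"model": "grok-3",
#             "messages": [{"role": "user", "content": "Hello"}],
#             "stream": False},
#   )
#   print(resp.json()["choices"][0]["message"]["content"])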