S2S-Arena / app.py
KurtDu's picture
Upload 4 files
dff2993 verified
raw
history blame
6.16 kB
import os
import json
import random
import uuid
from flask import Flask, request, jsonify, session, render_template
from flask_cors import CORS
from datetime import datetime
from elo_rank import EloRank
# Flask application object; CORS is enabled on it via flask_cors.
app = Flask(__name__)
CORS(app)
# Flask signs the session cookie with this key. Supply a private value through
# the FLASK_SECRET_KEY environment variable; the literal fallback only keeps
# existing local setups working and should not be relied on in production.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'supersecretkey')
# Directory holding the per-task JSON files read by load_test_data.
DATA_DIR = './data'
# Directory that save_result appends the per-session JSONL files to.
RESULTS_DIR = './results'
# Instantiate the EloRank system
elo_rank_system = EloRank()
# Models registered with the Elo ranking. Each name is also the key under
# which that model's audio path is stored in a test item.
models = ['output_path_4o', 'output_path_miniomni', 'output_path_speechgpt', 'output_path_funaudio', 'output_path_4o_cascade', 'output_path_4o_llama_omni']
for model in models:
    elo_rank_system.add_model(model)
def load_test_data(task):
    """Read the JSON file for ``task`` and rewrite its audio paths.

    The file ``<DATA_DIR>/<task>.json`` is parsed and every item in it has
    its input path and per-model output paths prefixed with
    ``/static/audio``.

    Args:
        task: Base name (without ``.json``) of the file inside ``DATA_DIR``.

    Returns:
        The parsed items, modified in place.

    Raises:
        KeyError: If an item lacks one of the expected path keys.
    """
    json_path = os.path.join(DATA_DIR, f"{task}.json")
    with open(json_path, "r", encoding='utf-8') as handle:
        entries = json.load(handle)

    # Re-point the audio paths at the Flask static folder
    audio_keys = (
        'input_path',
        'output_path_4o',
        'output_path_miniomni',
        'output_path_speechgpt',
        'output_path_funaudio',
        'output_path_4o_cascade',
        'output_path_4o_llama_omni',
    )
    for entry in entries:
        for key in audio_keys:
            entry[key] = f"/static/audio{entry[key]}"
    return entries
def save_result(task, username, result_data, session_id):
    """Append one result record to the per-session JSONL file.

    The record goes to ``RESULTS_DIR`` in a file named
    ``{task}_{username}_{session_id}.jsonl``. Before it is written, the
    value of ``elo_rank_system.get_rating`` for every model and an ISO-format
    timestamp are added to ``result_data`` (the dict is modified in place).

    Args:
        task: Task name, used as the first part of the file name.
        username: User name, used as the second part of the file name.
        result_data: Dict describing the comparison; must be JSON-serializable.
        session_id: Identifier that keeps files of different sessions apart.
    """
    # The __main__ guard is the only other place that creates this directory,
    # so ensure it exists when the app is started some other way.
    os.makedirs(RESULTS_DIR, exist_ok=True)

    # task and username originate from the client request; neutralise path
    # separators so the file always lands directly inside RESULTS_DIR.
    file_name = f"{task}_{username}_{session_id}.jsonl"
    for separator in (os.sep, os.altsep):
        if separator:
            file_name = file_name.replace(separator, "_")
    file_path = os.path.join(RESULTS_DIR, file_name)

    # Attach the Elo scores of all models and a timestamp to the record
    result_data['elo_scores'] = {
        name: elo_rank_system.get_rating(name) for name in models
    }
    result_data['timestamp'] = datetime.now().isoformat()

    with open(file_path, "a", encoding='utf-8') as f:
        f.write(json.dumps(result_data) + "\n")
@app.route('/start_test', methods=['POST'])
def start_test():
    """Start a test run for a user on the selected task.

    Expects a JSON body with ``task`` and ``username``. The task's items are
    loaded, shuffled and stored in the session together with the user name,
    a fresh session id and a position counter starting at 0.

    Returns:
        JSON with ``message``, ``total_tests`` and ``task_description`` on
        success; a 400 response when the body is missing or incomplete; a
        404 response when no data file exists for the task.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so malformed requests get a clean 400 rather than an unhandled error.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    task = payload.get('task')
    username = payload.get('username')
    if not task or not username:
        return jsonify({"message": "Both 'task' and 'username' are required"}), 400

    # Load the test data
    try:
        test_data = load_test_data(task)
    except FileNotFoundError:
        return jsonify({"message": "Unknown task"}), 404

    # Shuffle test data for the user
    random.shuffle(test_data)

    # Generate a unique session ID
    session_id = str(uuid.uuid4())

    # Store in session
    # NOTE(review): if Flask's default cookie-backed session is in use, a
    # large task file may exceed the browser cookie size limit — confirm.
    session['task'] = task
    session['username'] = username
    session['test_data'] = test_data
    session['current_index'] = 0
    session['session_id'] = session_id

    # An empty task file has no first item to take the description from.
    task_description = test_data[0].get('task_description', '') if test_data else ''
    return jsonify({"message": "Test started", "total_tests": len(test_data), "task_description": task_description})
@app.route('/next_test', methods=['GET'])
def next_test():
    """Return the next test item together with the two models to compare.

    Responds with 400 when the session holds no test, and with a
    "Test completed" message once every item has been served. Otherwise the
    chosen pair is remembered in the session, the position counter is
    advanced, and the item's text, input audio and the two models' audio
    paths are returned.
    """
    has_active_test = 'current_index' in session and 'test_data' in session
    if not has_active_test:
        return jsonify({"message": "No active test found"}), 400

    position = session['current_index']
    items = session['test_data']

    # Every item has been served already
    if position >= len(items):
        return jsonify({"message": "Test completed"}), 200

    # Let EloRank's sample_next_match pick the two models
    pairing = elo_rank_system.sample_next_match()
    item = items[position]

    # Remember the pairing and advance before building the response
    session['selected_models'] = pairing
    session['current_index'] = position + 1

    text = item["text"]
    input_path = item["input_path"]
    model_a = pairing[0]
    model_b = pairing[1]
    audio_a = item[model_a]
    audio_b = item[model_b]
    return jsonify({
        "text": text,
        "input_path": input_path,
        "model_a": model_a,
        "model_b": model_b,
        "audio_a": audio_a,
        "audio_b": audio_b,
    })
@app.route('/submit_result', methods=['POST'])
def submit_result():
    """Record the user's choice for the most recently served test item.

    Expects a JSON body with ``chosen_model`` set to ``'A'`` or ``'B'``. The
    choice is merged with the served item, written out through
    ``save_result`` and reported to the Elo ranking as a win for the chosen
    model.

    Returns:
        JSON echoing ``model_a``, ``model_b`` and ``chosen_model`` on
        success; a 400 response when the session holds no served item or
        when ``chosen_model`` is not ``'A'`` or ``'B'``.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    chosen_model = payload.get('chosen_model')

    username = session.get('username')
    task = session.get('task')
    session_id = session.get('session_id')
    # next_test increments the counter after serving, so the served item is
    # one behind. Defaulting to 0 makes a missing session produce -1, which
    # the guard below rejects, instead of raising TypeError on None - 1.
    current_index = session.get('current_index', 0) - 1
    selected_models = session.get('selected_models')
    test_items = session.get('test_data')

    if (not username or not task or current_index < 0
            or not selected_models or not test_items):
        return jsonify({"message": "No active test found"}), 400

    # Anything other than 'A'/'B' used to be scored as a win for model B in
    # the Elo ranking while being logged as a loss for both models.
    if chosen_model not in ('A', 'B'):
        return jsonify({"message": "chosen_model must be 'A' or 'B'"}), 400

    # Retrieve the selected models
    model_a = selected_models[0]
    model_b = selected_models[1]

    result = {
        "name": username,
        "chosen_model": chosen_model,
        "model_a": model_a,
        "model_b": model_b,
        "result": {
            model_a: 1 if chosen_model == 'A' else 0,
            model_b: 1 if chosen_model == 'B' else 0
        }
    }

    # Save the result for the current test; session_id keeps file names of
    # different sessions from colliding
    test_data = test_items[current_index]
    result_data = {**test_data, **result}
    save_result(task, username, result_data, session_id)

    # Update the Elo ranking: the first argument is the chosen model
    if chosen_model == 'A':
        elo_rank_system.record_match(model_a, model_b)
    else:
        elo_rank_system.record_match(model_b, model_a)

    return jsonify({"message": "Result submitted", "model_a": model_a, "model_b": model_b, "chosen_model": chosen_model})
@app.route('/end_test', methods=['GET'])
def end_test():
    """Clear everything stored in the session and confirm completion."""
    session.clear()
    payload = {"message": "Test completed"}
    return jsonify(payload)
# Render the index.html page
@app.route('/')
def index():
    """Return the rendered ``index.html`` template."""
    page = render_template('index.html')
    return page
if __name__ == '__main__':
    # exist_ok=True creates the directory only when missing, without a
    # separate existence check that could race with another process.
    os.makedirs(RESULTS_DIR, exist_ok=True)
    # The server binds to all interfaces so machines on the local network can
    # reach it. Debug mode enables the interactive debugger, which lets
    # anyone who can reach the server execute code, so it is opt-in through
    # FLASK_DEBUG (1/true) instead of being always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_enabled, host="0.0.0.0", port=6002)