|
import dotenv |
|
import evalica |
|
import io |
|
import json |
|
import os |
|
import random |
|
import threading |
|
|
|
import aisuite as ai |
|
import gradio as gr |
|
import pandas as pd |
|
|
|
from huggingface_hub import upload_file, hf_hub_download, HfFolder, HfApi |
|
from datetime import datetime |
|
from gradio_leaderboard import Leaderboard |
|
|
|
|
|
# Load variables from a local .env file (if present) into the process environment.
dotenv.load_dotenv()

# Raw Google Cloud credentials JSON, supplied through the environment.
gcp_credentials = os.environ.get("GCP_CREDENTIALS")

# File the credentials are written to; GOOGLE_APPLICATION_CREDENTIALS is
# pointed at this path below.
credentials_path = "/tmp/gcp_credentials.json"

if gcp_credentials:
    # Request owner-only permissions when the file is created: it holds a
    # secret and lives in a directory shared with other users.
    with open(
        os.open(credentials_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600),
        "w",
    ) as f:
        f.write(gcp_credentials)

    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
# When GCP_CREDENTIALS is not set there is nothing to write; previously this
# path crashed with a TypeError from f.write(None) at import time.
|
|
|
|
|
# Maximum time, in seconds, to wait for a model reply; used as the default
# `timeout` of chat_with_models.
TIMEOUT = 60

# aisuite client through which chat completion requests are issued.
client = ai.Client()

# When SHOW_HINT_STRING is true, HINT_STRING is appended to the sign-in prompt
# shown in the Arena tab.
SHOW_HINT_STRING = True
HINT_STRING = "Once signed in, your votes will be recorded securely."

# Mapping of model name -> context length, read from a path relative to the
# current working directory. truncate_prompt looks model names up in it.
with open("context_window.json", "r") as file:
    context_window = json.load(file)

# Every key of context_window.json is a candidate for a comparison; two
# distinct models are sampled per round, hence the minimum of two.
available_models = list(context_window.keys())
if len(available_models) < 2:
    raise ValueError(
        "Insufficient models in context_window.json. At least two are required."
    )

# NOTE(review): these two names are rebound to gr.State objects inside the
# `with gr.Blocks() as app:` section later in this file, so these plain dicts
# appear to be unused placeholders — confirm before removing.
models_state = {}
conversation_state = {}
|
|
|
|
|
|
|
|
|
|
|
def truncate_prompt(prompt, model_alias, models):
    """
    Shorten a prompt until its JSON-encoded user message fits the model's limit.

    The limit is looked up in ``context_window`` by model name (4096 when the
    model is not listed) and is compared against the *character* length of the
    JSON-serialised message, not a token count.

    Args:
        prompt (str): User prompt to shorten.
        model_alias (str): Key into ``models`` (e.g. "Model A").
        models (dict): Mapping of alias -> model name.

    Returns:
        str: The prompt, trimmed from the end in steps of 10 characters until
        it fits. If even a prompt of at most one character does not fit, that
        minimal prompt is returned.

    Raises:
        KeyError: If ``model_alias`` is not present in ``models``.
    """
    model_name = models[model_alias]
    context_length = context_window.get(model_name, 4096)

    while len(json.dumps({"role": "user", "content": prompt})) > context_length:
        shortened = prompt[:-10] if len(prompt) > 10 else prompt[:1]
        if shortened == prompt:
            # Nothing left to trim: the limit is smaller than the JSON
            # envelope itself. Stop instead of looping forever.
            break
        prompt = shortened
    return prompt
|
|
|
|
|
def chat_with_models(user_input, model_alias, models, conversation_state, timeout=TIMEOUT):
    """
    Send one user message to a model and return its reply as a fenced block.

    The request runs on a worker thread so the caller can give up after
    ``timeout`` seconds. The untruncated user message is recorded in
    ``conversation_state`` before the request; the assistant reply is recorded
    only when the request succeeds.

    NOTE(review): only the current (truncated) message is sent to the model;
    earlier turns stored in ``conversation_state`` are not included in the
    request — confirm whether multi-turn context is intended.

    Args:
        user_input (str): Message typed by the user.
        model_alias (str): Key into ``models`` (e.g. "Model A").
        models (dict): Mapping of alias -> model name.
        conversation_state (dict): Mapping of model name -> list of message
            dicts; mutated in place.
        timeout (float): Seconds to wait for the reply.

    Returns:
        str: The model's reply wrapped in a Markdown code fence.

    Raises:
        TimeoutError: If the model does not answer within ``timeout`` seconds.
        Exception: If the request itself failed; the message names the model
            and carries the underlying error text.
    """
    model_name = models[model_alias]
    truncated_input = truncate_prompt(user_input, model_alias, models)
    conversation_state.setdefault(model_name, []).append(
        {"role": "user", "content": user_input}
    )

    response_event = threading.Event()
    model_response = {"content": None, "error": None}

    def request_model_response():
        """Perform the request and publish its outcome through model_response."""
        try:
            response = client.chat.completions.create(
                model=model_name,
                messages=[{"role": "user", "content": truncated_input}],
            )
            model_response["content"] = response.choices[0].message.content
        except Exception as e:
            model_response["error"] = f"{model_name} model is not available. Error: {e}"
        finally:
            # Always wake the waiter, whether the call succeeded or failed.
            response_event.set()

    # Daemon thread: a request that outlives the timeout is abandoned by the
    # caller and must not keep the interpreter alive at shutdown.
    response_thread = threading.Thread(target=request_model_response, daemon=True)
    response_thread.start()

    if not response_event.wait(timeout):
        raise TimeoutError(
            f"The {model_alias} model did not respond within {timeout} seconds."
        )
    if model_response["error"]:
        raise Exception(model_response["error"])

    conversation_state[model_name].append(
        {"role": "assistant", "content": model_response["content"]}
    )
    return f"```\n{model_response['content']}\n```"
|
|
|
|
|
def save_content_to_hf(content, repo_name):
    """
    Save feedback content to Hugging Face repository organized by month and year.

    The content is serialised as indented JSON and uploaded to
    ``<YYYY_MM>/<DD_HHMMSS>.json`` in the dataset repository, using the
    current local time.

    Args:
        content (dict): Feedback data to be saved.
        repo_name (str): Hugging Face repository name.

    Raises:
        ValueError: If no Hugging Face token is stored locally.
    """
    token = HfFolder.get_token()
    if token is None:
        raise ValueError("Please log in to Hugging Face using `huggingface-cli login`.")

    json_content = json.dumps(content, indent=4).encode("utf-8")
    file_like_object = io.BytesIO(json_content)

    # Read the clock once so the folder and the file name always describe the
    # same instant, even when the call straddles a month boundary.
    # NOTE(review): file names have one-second resolution, so two uploads in
    # the same second target the same path — confirm whether that matters.
    filename = datetime.now().strftime("%Y_%m/%d_%H%M%S.json")

    upload_file(
        path_or_fileobj=file_like_object,
        path_in_repo=filename,
        repo_id=repo_name,
        repo_type="dataset",
        token=token,
    )
|
|
|
|
|
def load_content_from_hf(repo_name="SE-Arena/votes"):
    """
    Read feedback content from a Hugging Face repository based on the current month and year.

    Every file in the dataset repository whose path contains the current
    ``YYYY_MM`` string is downloaded and parsed as JSON. A file holding a list
    contributes all of its items; a file holding a dict contributes one entry.

    Args:
        repo_name (str): Hugging Face repository name.

    Returns:
        list: Aggregated feedback data read from the repository.

    Raises:
        Exception: If listing, downloading or parsing fails, or if no file
            exists for the current month. The original error is chained as
            the cause.
    """
    year_month = datetime.now().strftime("%Y_%m")
    feedback_data = []

    try:
        api = HfApi()

        # List the repository that was asked for, not a hard-coded one, so the
        # listing and the downloads below always refer to the same repo.
        repo_files = api.list_repo_files(repo_id=repo_name, repo_type="dataset")

        feedback_files = [file for file in repo_files if year_month in file]
        if not feedback_files:
            raise FileNotFoundError(
                f"No feedback files found for {year_month} in {repo_name}."
            )

        for file in feedback_files:
            local_path = hf_hub_download(
                repo_id=repo_name, filename=file, repo_type="dataset"
            )
            with open(local_path, "r") as f:
                data = json.load(f)
            if isinstance(data, list):
                feedback_data.extend(data)
            elif isinstance(data, dict):
                feedback_data.append(data)

        return feedback_data

    except Exception as e:
        # Same exception type and message as before; the cause is kept for
        # debugging, and KeyboardInterrupt/SystemExit are no longer swallowed.
        raise Exception(
            "Error loading feedback data from Hugging Face repository."
        ) from e
|
|
|
|
|
def get_leaderboard_data():
    """
    Build the leaderboard table from the votes stored on Hugging Face.

    Votes are loaded with ``load_content_from_hf`` and scored with six evalica
    rankers. Models are ranked by Elo score (rank 1 is the highest; ties share
    the smallest rank) and all scores are rounded to two decimals.

    Returns:
        pandas.DataFrame: One row per model with the columns "Rank", "Model",
        "Elo Score", "Average Win Rate", "Bradley-Terry Coefficient",
        "Eigenvector Centrality Value", "PageRank Score" and
        "Newman Modularity Score", sorted by rank. An empty frame with those
        columns is returned when the votes cannot be loaded.
    """
    columns = [
        "Rank",
        "Model",
        "Elo Score",
        "Average Win Rate",
        "Bradley-Terry Coefficient",
        "Eigenvector Centrality Value",
        "PageRank Score",
        "Newman Modularity Score",
    ]
    score_columns = columns[2:]

    try:
        feedback_df = pd.DataFrame(load_content_from_hf())
    except Exception:
        # No votes available (or the hub is unreachable): show an empty table
        # rather than failing the whole UI.
        return pd.DataFrame(columns=columns)

    # Translate the stored vote labels into evalica's winner enum.
    feedback_df["winner"] = feedback_df["winner"].map(
        {
            "left": evalica.Winner.X,
            "right": evalica.Winner.Y,
            "tie": evalica.Winner.Draw,
        }
    )

    # Every ranker consumes the same (left, right, winner) columns.
    matches = (feedback_df["left"], feedback_df["right"], feedback_df["winner"])
    avr_result = evalica.average_win_rate(*matches)
    bt_result = evalica.bradley_terry(*matches)
    newman_result = evalica.newman(*matches)
    eigen_result = evalica.eigen(*matches)
    elo_result = evalica.elo(*matches)
    pagerank_result = evalica.pagerank(*matches)

    # Combine the score Series by their model index rather than by position:
    # the individual results are not guaranteed to list models in the same
    # order, and positional pairing would then attach scores to wrong models.
    scores = pd.DataFrame(
        {
            "Elo Score": elo_result.scores,
            "Average Win Rate": avr_result.scores * 100,
            "Bradley-Terry Coefficient": bt_result.scores,
            "Eigenvector Centrality Value": eigen_result.scores,
            "PageRank Score": pagerank_result.scores,
            "Newman Modularity Score": newman_result.scores,
        }
    )
    ranking_df = scores.rename_axis("Model").reset_index()

    ranking_df["Rank"] = (
        ranking_df["Elo Score"].rank(ascending=False, method="min").astype(int)
    )
    ranking_df = ranking_df.round({column: 2 for column in score_columns})
    ranking_df = ranking_df.sort_values(by="Rank").reset_index(drop=True)

    return ranking_df[columns]
|
|
|
|
|
|
|
def toggle_submit_button(text):
    """Return a Gradio update that is interactive only for non-blank text.

    Args:
        text (str | None): Current content of the associated textbox.

    Returns:
        A ``gr.update`` with ``interactive=True`` when ``text`` contains at
        least one non-whitespace character, ``interactive=False`` otherwise.
    """
    has_content = bool(text) and text.strip() != ""
    return gr.update(interactive=has_content)
|
|
|
|
|
|
|
# Root of the Gradio UI. The original indentation of this section was lost;
# the nesting below is reconstructed from the statement order.
with gr.Blocks() as app:
    # gr.State holders handed to the event handlers as inputs/outputs.
    # user_authenticated is not referenced anywhere else in the visible code.
    user_authenticated = gr.State(False)
    # alias ("Model A"/"Model B") -> model name for the current comparison.
    models_state = gr.State({})
    # model name -> list of message dicts for the current comparison.
    conversation_state = gr.State({})

    with gr.Tab("🏆Leaderboard"):
        leaderboard_intro = gr.Markdown(
            """
            # 🏆 Software Engineering Arena Leaderboard: Community-Driven Evaluation of Top SE Chatbots

            The Software Engineering (SE) Arena is an open-source platform designed to evaluate language models through human preference, fostering transparency and collaboration. Developed by researchers at [Software Analysis and Intelligence Lab (SAIL)](https://sail.cs.queensu.ca), the platform empowers the community to assess and compare the performance of leading foundation models in SE tasks. For technical details, check out our [paper](https://arxiv.org/abs/your-paper-link).
            """,
            elem_classes="leaderboard-intro",
        )

        # The initial table is computed once, while the UI is being built;
        # submit_feedback later returns a fresh table for this component.
        leaderboard_component = Leaderboard(
            value=get_leaderboard_data(),
            select_columns=[
                "Rank",
                "Model",
                "Elo Score",
                "Average Win Rate",
            ],
            search_columns=["Model"],
            filter_columns=[
                "Elo Score",
                "Average Win Rate",
                "Bradley-Terry Coefficient",
                "Eigenvector Centrality Value",
                "PageRank Score",
                "Newman Modularity Score",
            ],
        )
|
# Arena tab: introduction, sign-in row, first-round prompt box, response
# panes and the timeout notice.
# NOTE(review): the original indentation was lost; which components sit inside
# which Row/Column is reconstructed here and should be confirmed.
with gr.Tab("⚔️Arena"):
    arena_intro = gr.Markdown(
        """
        # ⚔️ Software Engineering (SE) Arena: Explore and Test the Best SE Chatbots with Long-Context Interactions

        ## 📜How It Works
        - **Blind Comparison**: Submit any software engineering-related query to two anonymous chatbots, including top models like ChatGPT, Gemini, Claude, Llama, and others.
        - **Interactive Voting**: Engage in multi-turn dialogues and compare responses. Continue the conversation until you're confident in choosing the better model.
        - **Fair Play Rules**: Votes are valid only when chatbot identities remain anonymous—revealed identities disqualify the session.

        **Note:** Due to budget constraints, responses that take longer than one minute to generate will be discarded.
        """,
        elem_classes="arena-intro",
    )

    with gr.Row():
        # Sign-in prompt, optionally followed by the configured hint text.
        markdown_text = "## Please sign in using the button on the right to vote!"
        if SHOW_HINT_STRING:
            markdown_text += f"\n{HINT_STRING}"
        hint_markdown = gr.Markdown(markdown_text, elem_classes="markdown-text")
        login_button = gr.Button(
            "Sign in with Hugging Face", elem_id="oauth-button"
        )

    # First-round prompt; created non-interactive and enabled by handle_login.
    shared_input = gr.Textbox(
        label="Enter your prompt for both models",
        lines=2,
        interactive=False,
    )
    send_first = gr.Button(
        "Submit", visible=True, interactive=False
    )

    # Keep the Submit button enabled only while the prompt box has content.
    shared_input.change(
        fn=toggle_submit_button, inputs=shared_input, outputs=send_first
    )

    # Echo of the submitted prompt; hidden until a round has started.
    user_prompt_md = gr.Markdown(value="", visible=False)

    with gr.Column():
        # Bare expression statements: they only evaluate the names of
        # components that were already created above.
        shared_input
        user_prompt_md

    with gr.Row():
        response_a_title = gr.Markdown(value="", visible=False)
        response_b_title = gr.Markdown(value="", visible=False)

    with gr.Row():
        response_a = gr.Markdown(label="Response from Model A")
        response_b = gr.Markdown(label="Response from Model B")

    # Notice shown when a model exceeds the timeout; hidden by default and
    # dismissed with the "Okay" button.
    with gr.Row(visible=False) as timeout_popup:
        timeout_message = gr.Markdown(
            "### Timeout\n\nOne of the models did not respond within 1 minute. Please try again."
        )
        close_popup_btn = gr.Button("Okay")
|
def close_timeout_popup(shared_text=None, model_a_text=None, model_b_text=None):
    """
    Dismiss the timeout notice and re-enable the prompt inputs.

    Each Send/Submit button is enabled only when its textbox currently holds
    non-blank text. The live textbox contents are received as arguments:
    reading ``component.value`` inside a handler would yield the value the
    component was constructed with, not what the user has typed, which left
    the buttons disabled after the notice was closed.

    Args:
        shared_text (str | None): Current text of the first-round prompt box.
        model_a_text (str | None): Current text of the Model A follow-up box.
        model_b_text (str | None): Current text of the Model B follow-up box.

    Returns:
        tuple: Updates for, in order: the timeout popup, the shared prompt box
        and its Submit button, the Model A box and its Send button, and the
        Model B box and its Send button.
    """
    return (
        gr.update(visible=False),
        gr.update(interactive=True),
        toggle_submit_button(shared_text),
        gr.update(interactive=True),
        toggle_submit_button(model_a_text),
        gr.update(interactive=True),
        toggle_submit_button(model_b_text),
    )


# Follow-up inputs, one textbox/button pair per model; hidden until the first
# round has produced responses.
with gr.Row(visible=False) as multi_round_inputs:
    model_a_input = gr.Textbox(label="Model A Input", lines=1)
    model_a_send = gr.Button(
        "Send to Model A", interactive=False
    )

    model_b_input = gr.Textbox(label="Model B Input", lines=1)
    model_b_send = gr.Button(
        "Send to Model B", interactive=False
    )

# Each Send button is enabled only while its textbox has content.
model_a_input.change(
    fn=toggle_submit_button, inputs=model_a_input, outputs=model_a_send
)

model_b_input.change(
    fn=toggle_submit_button, inputs=model_b_input, outputs=model_b_send
)

# Pass the live textbox contents so the handler can decide which buttons to
# re-enable.
close_popup_btn.click(
    close_timeout_popup,
    inputs=[shared_input, model_a_input, model_b_input],
    outputs=[
        timeout_popup,
        shared_input,
        send_first,
        model_a_input,
        model_a_send,
        model_b_input,
        model_b_send,
    ],
)
|
|
|
|
|
def update_model_titles_and_responses(user_input, models_state, conversation_state):
    """
    Start a new comparison: pick two models and fetch their first responses.

    Two distinct models are sampled from ``available_models`` and stored in
    ``models_state`` under the aliases "Model A" and "Model B";
    ``conversation_state`` is reset to an empty history per model. Both state
    dicts are mutated in place and also returned.

    Args:
        user_input (str): Prompt entered in the shared prompt box.
        models_state (dict): Alias -> model name mapping for this session.
        conversation_state (dict): Model name -> message list for this session.

    Returns:
        tuple: 15 values, in the order of the ``send_first.click`` outputs:
        shared_input, user_prompt_md, response_a_title, response_b_title,
        response_a, response_b, multi_round_inputs, vote_panel, send_first,
        feedback, models_state, conversation_state, timeout_popup,
        model_a_send, model_b_send.

    Raises:
        ValueError: If fewer than two models are available.
        gr.Error: If a model request fails for a reason other than a timeout.
    """
    if len(available_models) < 2:
        raise ValueError(
            "Insufficient models in context_window.json. At least two are required."
        )
    selected_models = random.sample(available_models, 2)
    models = {"Model A": selected_models[0], "Model B": selected_models[1]}

    # Mutate the existing dicts so the session keeps the same state objects.
    models_state.clear()
    models_state.update(models)
    conversation_state.clear()
    conversation_state.update({name: [] for name in models.values()})

    try:
        response_a = chat_with_models(
            user_input, "Model A", models_state, conversation_state
        )
        response_b = chat_with_models(
            user_input, "Model B", models_state, conversation_state
        )
    except TimeoutError:
        # Reset the round and show the timeout notice. This branch must yield
        # exactly as many values as the success branch; it previously returned
        # 13 and omitted the two follow-up Send buttons.
        return (
            gr.update(value="", interactive=False, visible=True),  # shared_input
            gr.update(value="", visible=False),  # user_prompt_md
            gr.update(value="", visible=False),  # response_a_title
            gr.update(value="", visible=False),  # response_b_title
            gr.update(value=""),  # response_a
            gr.update(value=""),  # response_b
            gr.update(visible=False),  # multi_round_inputs
            gr.update(visible=False),  # vote_panel
            gr.update(visible=True, interactive=False),  # send_first
            gr.update(interactive=False),  # feedback
            models_state,
            conversation_state,
            gr.update(visible=True),  # timeout_popup
            gr.update(interactive=False),  # model_a_send
            gr.update(interactive=False),  # model_b_send
        )
    except Exception as e:
        raise gr.Error(str(e)) from e

    # The follow-up boxes start empty, so their Send buttons start disabled.
    model_a_send_state = toggle_submit_button("")
    model_b_send_state = toggle_submit_button("")

    return (
        gr.update(visible=False),  # shared_input
        gr.update(
            value=f"**Your Prompt:**\n\n{user_input}", visible=True
        ),  # user_prompt_md
        gr.update(value="### Model A:", visible=True),  # response_a_title
        gr.update(value="### Model B:", visible=True),  # response_b_title
        gr.update(value=response_a),  # response_a
        gr.update(value=response_b),  # response_b
        gr.update(visible=True),  # multi_round_inputs
        gr.update(visible=True),  # vote_panel
        gr.update(visible=False),  # send_first
        gr.update(interactive=True),  # feedback
        models_state,
        conversation_state,
        gr.update(visible=False),  # timeout_popup
        model_a_send_state,  # model_a_send
        model_b_send_state,  # model_b_send
    )
|
|
|
|
|
# Voting controls; the row is hidden until a round has produced responses.
# Both controls are created non-interactive and are enabled by handle_login.
with gr.Row(visible=False) as vote_panel:
    feedback = gr.Radio(
        choices=["Model A", "Model B", "Can't Decide"],
        label="Which model do you prefer?",
        value="Can't Decide",
        interactive=False,
    )
    submit_feedback_btn = gr.Button("Submit Feedback", interactive=False)
|
|
|
|
|
def handle_login():
    """
    Unlock the arena controls when a Hugging Face token is available.

    The handler looks for a locally stored token via ``HfFolder.get_token()``.
    Any failure, including a missing token, is printed and treated as
    "not signed in".

    Returns:
        tuple: Six updates, for the login button, the shared prompt box, the
        Submit button, the vote radio, the Submit Feedback button and the
        hint text. When signed in, the login button and hint are hidden and
        the four controls become interactive; otherwise the opposite.
    """
    try:
        HfApi()
        print(
            "Redirected to Hugging Face for authentication. Please complete the login."
        )
        if not HfFolder.get_token():
            raise Exception("Authentication token not found.")
    except Exception as e:
        print(f"Login failed: {e}")
        signed_in = False
    else:
        signed_in = True

    prompt_visible = not signed_in
    return (
        gr.update(visible=prompt_visible),
        gr.update(interactive=signed_in),
        gr.update(interactive=signed_in),
        gr.update(interactive=signed_in),
        gr.update(interactive=signed_in),
        gr.update(visible=prompt_visible),
    )
|
|
|
|
|
# Sign-in: handle_login returns one update per component listed here, in
# this order.
login_button.click(
    handle_login,
    inputs=[],
    outputs=[
        login_button,
        shared_input,
        send_first,
        feedback,
        submit_feedback_btn,
        hint_markdown,
    ],
)

# First round: the handler receives the prompt plus both state objects and
# must return one value per entry of `outputs` (15), in this order.
send_first.click(
    update_model_titles_and_responses,
    inputs=[shared_input, models_state, conversation_state],
    outputs=[
        shared_input,
        user_prompt_md,
        response_a_title,
        response_b_title,
        response_a,
        response_b,
        multi_round_inputs,
        vote_panel,
        send_first,
        feedback,
        models_state,
        conversation_state,
        timeout_popup,
        model_a_send,
        model_b_send,
    ],
)
|
|
|
|
|
def _send_follow_up(model_alias, user_input, models_state, conversation_state):
    """Send a follow-up message to one model and build the UI updates.

    Shared implementation of handle_model_a_send and handle_model_b_send,
    which differ only in the model alias.

    Returns:
        tuple: Five values: the response pane content, the conversation
        state, the timeout popup update, the follow-up textbox update and
        the Send button update.

    Raises:
        gr.Error: If the request fails for a reason other than a timeout.
    """
    try:
        response = chat_with_models(
            user_input, model_alias, models_state, conversation_state
        )
    except TimeoutError:
        # Clear the pane, show the timeout notice and lock the input until
        # the notice is dismissed.
        return (
            gr.update(value=""),
            conversation_state,
            gr.update(visible=True),
            gr.update(interactive=False),
            gr.update(interactive=False),
        )
    except Exception as e:
        raise gr.Error(str(e)) from e

    # Success: show the reply, empty the textbox and disable Send until the
    # user types again.
    return (
        response,
        conversation_state,
        gr.update(visible=False),
        gr.update(value="", interactive=True),
        gr.update(interactive=False),
    )


def handle_model_a_send(user_input, models_state, conversation_state):
    """Send a follow-up message to Model A; see _send_follow_up for the result."""
    return _send_follow_up("Model A", user_input, models_state, conversation_state)


def handle_model_b_send(user_input, models_state, conversation_state):
    """Send a follow-up message to Model B; see _send_follow_up for the result."""
    return _send_follow_up("Model B", user_input, models_state, conversation_state)
|
|
|
# Follow-up rounds: each Send button forwards its own textbox to its handler.
# The handlers return five values matching `outputs`: response pane,
# conversation state, timeout popup, textbox and Send button.
model_a_send.click(
    handle_model_a_send,
    inputs=[model_a_input, models_state, conversation_state],
    outputs=[
        response_a,
        conversation_state,
        timeout_popup,
        model_a_input,
        model_a_send,
    ],
)
model_b_send.click(
    handle_model_b_send,
    inputs=[model_b_input, models_state, conversation_state],
    outputs=[
        response_b,
        conversation_state,
        timeout_popup,
        model_b_input,
        model_b_send,
    ],
)
|
|
|
def submit_feedback(vote, models_state, conversation_state):
    """
    Record the user's vote and conversation, then reset the arena.

    The vote is stored in the "SE-Arena/votes" dataset and the conversation
    in "SE-Arena/conversations". Both state dicts are cleared in place
    afterwards and the leaderboard is recomputed.

    Args:
        vote (str): Selected radio choice: "Model A", "Model B" or
            "Can't Decide".
        models_state (dict): Alias -> model name mapping of the comparison.
        conversation_state (dict): Model name -> message list of the comparison.

    Returns:
        tuple: 11 values, in the order of the ``submit_feedback_btn.click``
        outputs: shared_input, user_prompt_md, response_a_title,
        response_b_title, response_a, response_b, multi_round_inputs,
        vote_panel, send_first, feedback, leaderboard_component.

    Raises:
        gr.Error: If the vote is not one of the known choices, or if there is
            no active comparison to vote on.
    """
    winner_by_vote = {
        "Model A": "left",
        "Model B": "right",
        "Can't Decide": "tie",
    }
    # Reject anything unexpected up front; previously an unmatched vote left
    # the winner undefined and failed with UnboundLocalError.
    if vote not in winner_by_vote:
        raise gr.Error("Please choose one of the options before submitting feedback.")
    # The state is cleared after each submission, so a second submit without
    # a new round has no models to attribute the vote to.
    if "Model A" not in models_state or "Model B" not in models_state:
        raise gr.Error(
            "There is no active comparison to vote on. Please submit a prompt first."
        )

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    feedback_entry = {
        "left": models_state["Model A"],
        "right": models_state["Model B"],
        "winner": winner_by_vote[vote],
        "timestamp": timestamp,
    }

    save_content_to_hf(feedback_entry, "SE-Arena/votes")
    save_content_to_hf(conversation_state, "SE-Arena/conversations")

    # Start the next round from a clean slate.
    models_state.clear()
    conversation_state.clear()

    # Recompute so the leaderboard reflects the vote that was just stored.
    leaderboard_data = get_leaderboard_data()

    return (
        gr.update(value="", interactive=True, visible=True),  # shared_input
        gr.update(value="", visible=False),  # user_prompt_md
        gr.update(value="", visible=False),  # response_a_title
        gr.update(value="", visible=False),  # response_b_title
        gr.update(value=""),  # response_a
        gr.update(value=""),  # response_b
        gr.update(visible=False),  # multi_round_inputs
        gr.update(visible=False),  # vote_panel
        gr.update(value="Submit", interactive=True, visible=True),  # send_first
        gr.update(value="Can't Decide", interactive=True),  # feedback
        leaderboard_data,  # leaderboard_component
    )
|
|
|
|
|
# Vote submission: submit_feedback returns one value per entry of `outputs`
# (11), in this order; the last one refreshes the leaderboard table.
# NOTE(review): the original indentation was lost, so which enclosing
# `with` block each statement below belongs to must be confirmed.
submit_feedback_btn.click(
    submit_feedback,
    inputs=[feedback, models_state, conversation_state],
    outputs=[
        shared_input,
        user_prompt_md,
        response_a_title,
        response_b_title,
        response_a,
        response_b,
        multi_round_inputs,
        vote_panel,
        send_first,
        feedback,
        leaderboard_component,
    ],
)

# Static terms-of-service notice.
terms_of_service = gr.Markdown(
    """
    ## Terms of Service

    Users are required to agree to the following terms before using the service:

    - The service is a **research preview**. It only provides limited safety measures and may generate offensive content.
    - It must not be used for any illegal, harmful, violent, racist, or sexual purposes.
    - Please **do not upload any private information**.
    - The service collects user dialogue data, including both text and images, and reserves the right to distribute it under a **Creative Commons Attribution (CC-BY)** or a similar license.
    """
)

# Start the Gradio server for the app defined above.
app.launch()
|
|