MESReport / utils /completion_reward.py
ChenyuRabbitLove's picture
feat: rename ntu img to ruby
882ffef
import logging
import json
import time
import io
import os
import re
import requests
import textwrap
import random
import hashlib
from datetime import datetime
from PIL import Image, ImageDraw, ImageFilter, ImageFont
import anthropic_bedrock
import gradio as gr
from opencc import OpenCC
from openai import OpenAI
from anthropic_bedrock import AnthropicBedrock, HUMAN_PROMPT, AI_PROMPT
from google.auth.transport.requests import Request
from google.oauth2.service_account import Credentials
from google import auth
from google.cloud import bigquery
from google.cloud import storage
SERVICE_ACCOUNT_INFO = os.getenv("GBQ_TOKEN")
SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
service_account_info_dict = json.loads(SERVICE_ACCOUNT_INFO)
creds = Credentials.from_service_account_info(service_account_info_dict, scopes=SCOPES)
gbq_client = bigquery.Client(
credentials=creds, project=service_account_info_dict["project_id"]
)
gcs_client = storage.Client(
credentials=creds, project=service_account_info_dict["project_id"]
)
class CompletionReward:
def __init__(self):
self.player_backend_user_id = None
self.player_name = None
self.background_url = None
self.player_selected_character = None
self.player_selected_model = None
self.player_selected_paragraph = None
self.paragraph_openai = None
self.paragraph_aws = None
self.paragraph_google = None
self.paragraph_mtk = None
self.paragraph_ntu = None
self.player_certificate_url = None
self.openai_agent = OpenAIAgent()
self.aws_agent = AWSAgent()
self.google_agent = GoogleAgent()
self.mtk_agent = MTKAgent()
self.ntu_agent = NTUAgent()
self.agents_responses = {}
self.agent_list = [
self.openai_agent,
self.aws_agent,
self.google_agent,
self.mtk_agent,
self.ntu_agent,
]
self.shuffled_response_order = {}
self.pop_response_order = []
self.response_time_map = {}
def get_llm_response_once(self, player_logs):
if self.agent_list:
# Randomly select and remove an agent from the list
agent = self.agent_list.pop(random.randint(0, len(self.agent_list) - 1))
else:
return "No agents left", None
story, response_time = agent.get_story(player_logs)
self.agents_responses[agent.name] = story
self.pop_response_order.append(agent.name)
self.response_time_map[agent.name] = response_time
if len(self.pop_response_order) == 5:
self.shuffled_response_order = {
str(index): agent for index, agent in enumerate(self.pop_response_order)
}
self.paragraph_openai = self.agents_responses["openai"]
self.paragraph_aws = self.agents_responses["aws"]
self.paragraph_google = self.agents_responses["google"]
self.paragraph_mtk = self.agents_responses["mtk"]
self.paragraph_ntu = self.agents_responses["ntu"]
return [(None, story)]
def set_player_name(self, player_name, player_backend_user_id):
self.player_backend_user_id = player_backend_user_id
self.player_name = player_name
def set_background_url(self, background_url):
self.background_url = background_url
def set_player_backend_user_id(self, player_backend_user_id):
self.player_backend_user_id = player_backend_user_id
def set_player_selected_character(self, player_selected_character):
character_map = {
"露米娜": "0",
"索拉拉": "1",
"薇丹特": "2",
"蔚藍": "3",
"紅寶石": "4",
}
self.player_selected_character = player_selected_character
self.player_selected_model = self.shuffled_response_order[
character_map[player_selected_character]
]
self.player_selected_paragraph = self.get_paragraph_by_model(
self.player_selected_model
)
def get_paragraph_by_model(self, model):
return getattr(self, f"paragraph_{model}", None)
def create_certificate(self):
image_url = self.openai_agent.get_background()
self.set_background_url(image_url)
source_file = ImageProcessor.generate_reward(
image_url,
self.player_name,
self.player_selected_paragraph,
self.player_backend_user_id,
)
public_url = self.upload_blob_and_get_public_url(
"mes_completion_rewards", source_file, f"2023_mes/{source_file}"
)
self.player_certificate_url = public_url
return gr.Image(public_url, visible=True, elem_id="certificate")
def to_dict(self):
return {
"player_backend_user_id": self.player_backend_user_id,
"player_name": self.player_name,
"background_url": self.background_url,
"player_selected_model": self.player_selected_model,
"player_selected_paragraph": self.player_selected_paragraph,
"paragraph_openai": self.paragraph_openai,
"paragraph_aws": self.paragraph_aws,
"paragraph_google": self.paragraph_google,
"paragraph_mtk": self.paragraph_mtk,
"paragraph_ntu": self.paragraph_ntu,
"response_time_openai": self.response_time_map["openai"],
"response_time_aws": self.response_time_map["aws"],
"response_time_google": self.response_time_map["google"],
"response_time_mtk": self.response_time_map["mtk"],
"response_time_ntu": self.response_time_map["ntu"],
"player_certificate_url": self.player_certificate_url,
"created_at": datetime.now(),
}
def insert_data_into_bigquery(self, client, dataset_id, table_id, rows_to_insert):
table_ref = client.dataset(dataset_id).table(table_id)
table = client.get_table(table_ref)
errors = client.insert_rows(table, rows_to_insert)
if errors:
logging.info("Errors occurred while inserting rows:")
for error in errors:
print(error)
else:
logging.info(f"Inserted {len(rows_to_insert)} rows successfully.")
def complete_reward(
self,
):
insert_row = self.to_dict()
self.insert_data_into_bigquery(
gbq_client, "streaming_log", "log_mes_completion_rewards", [insert_row]
)
logging.info(
f"Player {insert_row['player_backend_user_id']} rendered successfully."
)
with open("./data/completion_reward_issue_status.json") as f:
completion_reward_issue_status_dict = json.load(f)
completion_reward_issue_status_dict[
insert_row["player_backend_user_id"]
] = self.player_certificate_url
with open("./data/completion_reward_issue_status.json", "w") as f:
json.dump(completion_reward_issue_status_dict, f)
def upload_blob_and_get_public_url(
self, bucket_name, source_file_name, destination_blob_name
):
"""Uploads a file to the bucket and makes it publicly accessible."""
# Initialize a storage client
bucket = gcs_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
# Upload the file
blob.upload_from_filename(source_file_name)
# The public URL can be used to directly access the uploaded file via HTTP
public_url = blob.public_url
logging.info(f"File {source_file_name} uploaded to {destination_blob_name}.")
return public_url
class OpenAIAgent:
def __init__(self):
self.name = "openai"
self.temperature = 0.8
self.frequency_penalty = 0
self.presence_penalty = 0
self.max_tokens = 2048
def get_story(self, user_log):
system_prompt = """
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請
- 以「你」稱呼學生
- 可以裁減內容以將內容限制在 500 字內
- 試著合併故事記錄成一段連貫、有吸引力的故事
- 請勿突然中斷故事,請讓故事有一個完整的結局
- 請使用 zh_TW
- 請直接回覆故事內容,不需要回覆任何訊息
"""
user_log = f"""
```{user_log}
```
"""
messages = [
{
"role": "system",
"content": f"{system_prompt}",
},
{
"role": "user",
"content": f"{user_log}",
},
]
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
response = None
retry_attempts = 0
while retry_attempts < 5:
start_time = time.time()
try:
response = client.chat.completions.create(
model="gpt-4-1106-preview",
messages=messages,
temperature=self.temperature,
max_tokens=self.max_tokens,
frequency_penalty=self.frequency_penalty,
presence_penalty=self.presence_penalty,
)
chinese_converter = OpenCC("s2tw")
self.openai_response_time = time.time() - start_time
return chinese_converter.convert(response.choices[0].message.content), self.openai_response_time
except Exception as e:
retry_attempts += 1
logging.error(f"OpenAI Attempt {retry_attempts}: {e}")
time.sleep(1 * retry_attempts)
self.openai_response_time = time.time() - start_time
return '星際夥伴短時間內寫了太多故事,需要休息一下,請稍後再試,或是選擇其他星際夥伴的故事。', self.openai_response_time
def get_background(self):
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
image_url = None
retry_attempts = 0
while retry_attempts < 5:
try:
logging.info("Generating image...")
response = client.images.generate(
model="dall-e-3",
prompt="Create an image in a retro Ghibli style, with a focus on a universe theme. The artwork should maintain the traditional hand-drawn animation look characteristic of Ghibli and with vibrant color. Imagine a scene set in outer space or a fantastical cosmic environment, rich with vibrant and varied color palettes to capture the mystery and majesty of the universe. The background should be detailed, showcasing stars, planets, and nebulae, blending the Ghibli style's nostalgia and emotional depth with the awe-inspiring aspects of space. The overall feel should be timeless, merging the natural wonder of the cosmos with the storytelling and emotional resonance typical of the retro Ghibli aesthetic. Soft lighting and gentle shading should be used to enhance the dreamlike, otherworldly quality of the scene.",
size="1024x1024",
quality="standard",
n=1,
)
image_url = response.data[0].url
return image_url
except Exception as e:
retry_attempts += 1
logging.error(f"DALLE Attempt {retry_attempts}: {e}")
time.sleep(1 * retry_attempts) # exponential backoff
class AWSAgent:
def __init__(self):
self.name = "aws"
def get_story(self, user_log):
system_prompt = """
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請
- 以「你」稱呼學生
- 可以裁減內容以將內容限制在 500 字內
- 試著合併故事記錄成一段連貫、有吸引力的故事
- 請勿突然中斷故事,請讓故事有一個完整的結局
- 請使用 zh_TW
- 請直接回覆故事內容,不需要回覆任何訊息
"""
user_log = f"""
```{user_log}
```
"""
client = AnthropicBedrock(
aws_access_key=os.getenv("AWS_ACCESS_KEY"),
aws_secret_key=os.getenv("AWS_SECRET_KEY"),
aws_region="us-west-2",
)
retry_attempts = 0
while retry_attempts < 5:
try:
start_time = time.time()
completion = client.completions.create(
model="anthropic.claude-v2",
max_tokens_to_sample=2048,
prompt=f"{anthropic_bedrock.HUMAN_PROMPT}{system_prompt},以下是我的故事紀錄```{user_log}``` {anthropic_bedrock.AI_PROMPT}",
)
chinese_converter = OpenCC("s2tw")
self.aws_response_time = time.time() - start_time
return chinese_converter.convert(completion.completion), self.aws_response_time
except Exception as e:
retry_attempts += 1
logging.error(f"AWS Attempt {retry_attempts}: {e}")
time.sleep(1 * retry_attempts)
self.aws_response_time = time.time() - start_time
return '星際夥伴短時間內寫了太多故事,需要休息一下,請稍後再試,或是選擇其他星際夥伴的故事。', self.aws_response_time
class GoogleAgent:
from google.cloud import aiplatform
from vertexai.preview.generative_models import GenerativeModel
SERVICE_ACCOUNT_INFO = os.getenv("GBQ_TOKEN")
service_account_info_dict = json.loads(SERVICE_ACCOUNT_INFO)
SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
creds = Credentials.from_service_account_info(
service_account_info_dict, scopes=SCOPES
)
aiplatform.init(
project="junyiacademy",
service_account=service_account_info_dict,
credentials=creds,
)
gemini_pro_model = GenerativeModel("gemini-pro")
def __init__(self):
self.name = "google"
def get_story(self, user_log):
system_prompt = """
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請
- 以「你」稱呼學生
- 可以裁減內容以將內容限制在 500 字內
- 試著合併故事記錄成一段連貫、有吸引力的故事
- 請勿突然中斷故事,請讓故事有一個完整的結局
- 請使用 zh_TW
- 請直接回覆故事內容,不需要回覆任何訊息
"""
user_log = f"""
```{user_log}
```
"""
retry_attempts = 0
while retry_attempts < 5:
try:
start_time = time.time()
logging.info("Google Generating response...")
model_response = self.gemini_pro_model.generate_content(
f"{system_prompt}, 以下是我的冒險故事 ```{user_log}```"
)
chinese_converter = OpenCC("s2tw")
self.google_response_time = time.time() - start_time
return chinese_converter.convert(
model_response.candidates[0].content.parts[0].text
), self.google_response_time
except Exception as e:
retry_attempts += 1
logging.error(f"Google Attempt {retry_attempts}: {e}")
time.sleep(1 * retry_attempts)
self.google_response_time = time.time() - start_time
return '星際夥伴短時間內寫了太多故事,需要休息一下,請稍後再試,或是選擇其他星際夥伴的故事。', self.google_response_time
class MTKAgent:
def __init__(self):
self.name = "mtk"
def get_story(self, user_log):
system_prompt = """
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請
- 以「你」稱呼學生
- 可以裁減內容以將內容限制在 500 字內
- 試著合併故事記錄成一段連貫、有吸引力的故事
- 請勿突然中斷故事,請讓故事有一個完整的結局
- 請使用 zh_TW
- 請直接回覆故事內容,不需要回覆任何訊息
"""
user_log = f"""
```{user_log}
```
"""
BASE_URL = "http://35.229.245.251:8008/v1"
TOKEN = os.getenv("MTK_TOKEN")
MODEL_NAME = "model7-c-chat"
TEMPERATURE = 1
MAX_TOKENS = 1024
TOP_P = 0
PRESENCE_PENALTY = 0
FREQUENCY_PENALTY = 0
message = f"{system_prompt}, 以下是我的冒險故事 ```{user_log}```"
url = os.path.join(BASE_URL, "chat/completions")
headers = {
"accept": "application/json",
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json",
}
data = {
"model": MODEL_NAME,
"messages": str(message),
"temperature": TEMPERATURE,
"n": 1,
"max_tokens": MAX_TOKENS,
"stop": "",
"top_p": TOP_P,
"logprobs": 0,
"echo": False,
"presence_penalty": PRESENCE_PENALTY,
"frequency_penalty": FREQUENCY_PENALTY,
}
retry_attempts = 0
while retry_attempts < 5:
try:
start_time = time.time()
response = requests.post(
url, headers=headers, data=json.dumps(data)
).json()
response_text = response["choices"][0]["message"]["content"]
matched_contents = re.findall("```(.*?)```", response_text, re.DOTALL)
# Concatenate all extracted contents
extracted_content = "\n".join(matched_contents).strip()
chinese_converter = OpenCC("s2tw")
self.mtk_response_time = time.time() - start_time
if extracted_content:
return chinese_converter.convert(extracted_content), self.mtk_response_time
else:
return chinese_converter.convert(response_text), self.mtk_response_time
except Exception as e:
retry_attempts += 1
logging.error(f"MTK Attempt {retry_attempts}: {e}")
time.sleep(1 * retry_attempts)
self.mtk_response_time = time.time() - start_time
return '星際夥伴短時間內寫了太多故事,需要休息一下,請稍後再試,或是選擇其他星際夥伴的故事。', self.mtk_response_time
class NTUAgent:
def __init__(self):
self.name = "ntu"
def get_story(self, user_log):
system_prompt = """
我正在舉辦一個學習型的活動,我為學生設計了一個獨特的故事機制,每天每個學生都會收到屬於自己獨特的冒險紀錄,現在我需要你協助我將這些冒險紀錄,製作成一段冒險故事,請
- 以「你」稱呼學生
- 可以裁減內容以將內容限制在 500 字內
- 試著合併故事記錄成一段連貫、有吸引力的故事
- 請勿突然中斷故事,請讓故事有一個完整的結局
- 請使用 zh_TW
- 請直接回覆故事內容,不需要回覆任何訊息
"""
user_log = f"""
```{user_log}
```
"""
messages = [
{
"role": "system",
"content": f"{system_prompt}",
},
{
"role": "user",
"content": f"{user_log}",
},
]
url = 'http://api.twllm.com:20002/v1/chat/completions'
data = {
"model": "yentinglin/Taiwan-LLM-13B-v2.0-chat",
"messages": messages,
"temperature": 0.7,
"top_p": 1,
"n": 1,
"max_tokens": 2048,
"stop": ["string"],
"stream": False,
"presence_penalty": 0,
"frequency_penalty": 0,
"user": "string",
"best_of": 1,
"top_k": -1,
"ignore_eos": False,
"use_beam_search": False,
"stop_token_ids": [0],
"skip_special_tokens": True,
"spaces_between_special_tokens": True,
"add_generation_prompt": True,
"echo": False,
"repetition_penalty": 1,
"min_p": 0
}
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
retry_attempts = 0
while retry_attempts < 5:
try:
start_time = time.time()
response = requests.post(url, headers=headers, data=json.dumps(data)).json()
response_text = response["choices"][0]["message"]["content"]
matched_contents = re.findall("```(.*?)```", response_text, re.DOTALL)
# Concatenate all extracted contents
extracted_content = "\n".join(matched_contents).strip()
chinese_converter = OpenCC("s2tw")
self.ntu_response_time = time.time() - start_time
logging.warning(f"NTU response time: {self.ntu_response_time}")
if extracted_content:
return chinese_converter.convert(extracted_content), self.ntu_response_time
else:
return chinese_converter.convert(response_text), self.ntu_response_time
except Exception as e:
retry_attempts += 1
logging.error(f"NTU Attempt {retry_attempts}: {e}")
time.sleep(1 * retry_attempts)
self.ntu_response_time = time.time() - start_time
return '星際夥伴短時間內寫了太多故事,需要休息一下,請稍後再試,或是選擇其他星際夥伴的故事。', self.ntu_response_time
class ImageProcessor:
@staticmethod
def draw_shadow(
image, box, radius, offset=(10, 10), shadow_color=(0, 0, 0, 128), blur_radius=5
):
shadow_image = Image.new("RGBA", image.size, (0, 0, 0, 0))
shadow_draw = ImageDraw.Draw(shadow_image)
shadow_box = [
box[0] + offset[0],
box[1] + offset[1],
box[2] + offset[0],
box[3] + offset[1],
]
shadow_draw.rounded_rectangle(shadow_box, fill=shadow_color, radius=radius)
shadow_image = shadow_image.filter(ImageFilter.GaussianBlur(blur_radius))
image.paste(shadow_image, (0, 0), shadow_image)
@staticmethod
def generate_reward(url, player_name, paragraph, player_backend_user_id):
retry_attempts = 0
while retry_attempts < 5:
try:
response = requests.get(url)
break
except requests.RequestException as e:
retry_attempts += 1
logging.error(f"Attempt {retry_attempts}: {e}")
time.sleep(1 * retry_attempts) # exponential backoff
image_bytes = io.BytesIO(response.content)
img = Image.open(image_bytes)
tmp_img = Image.new("RGBA", img.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(tmp_img)
# Draw the text
title_font = ImageFont.truetype("NotoSansTC-Bold.ttf", 34)
body_font = ImageFont.truetype("NotoSansTC-Light.ttf", 14)
# Calculate space required by the paragraph
paragraph_height = 0
for line in paragraph.split("\n"):
wrapped_lines = textwrap.wrap(line, width=63)
for wrapped_line in wrapped_lines:
_, _, _, line_height = draw.textbbox(
(0, 0), wrapped_line, font=body_font
)
paragraph_height += line_height + 10
# Draw the box
padding = 40
left, right = 50, img.width - 50
box_height = min(800, paragraph_height + padding)
top = (img.height - box_height) // 2
bottom = (img.height + box_height) // 2
border_radius = 20
# Draw the rounded rectangle
fill_color = (255, 255, 255, 200)
draw.rounded_rectangle(
[left, top, right, bottom],
fill=fill_color,
outline=None,
radius=border_radius,
)
img.paste(Image.alpha_composite(img.convert("RGBA"), tmp_img), (0, 0), tmp_img)
draw = ImageDraw.Draw(img)
# Title text
title = f"光束守護者 - {player_name} 的冒險故事"
title_x, title_y = left + 20, top + 20 # Adjust padding as needed
draw.text((title_x, title_y), title, font=title_font, fill="black")
# Paragraph text with newlines
body_x, body_y = left + 20, title_y + 60 # Adjust position as needed
for line in paragraph.split("\n"):
wrapped_lines = textwrap.wrap(line, width=63)
for wrapped_line in wrapped_lines:
draw.text((body_x, body_y), wrapped_line, font=body_font, fill="black")
body_y += 25
# Save the image with the text
def get_md5_hash(text):
return hashlib.md5(text.encode("utf-8")).hexdigest()
updated_image_path = f"certificate_{get_md5_hash(player_backend_user_id)}.png"
img.save(updated_image_path)
return updated_image_path