# Hugging Face Space page banner captured with the source ("Spaces: Running on Zero"); not part of the program.
import os | |
from dotenv import load_dotenv | |
import gradio as gr | |
from huggingface_hub import InferenceClient | |
import pandas as pd | |
import json | |
from datetime import datetime | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
# Environment / model configuration.
# `load_dotenv` is imported at the top of the file; call it so that a local
# .env file (if one exists) is read before the token lookup below.
load_dotenv()

HF_TOKEN = os.getenv("HF_TOKEN")
MODEL_ID = "CohereForAI/c4ai-command-r7b-12-2024"

from transformers import pipeline
class ModelManager:
    """Owns a local ``transformers`` text-generation pipeline for ``MODEL_ID``.

    The pipeline is built eagerly in ``__init__``. ``generate_response``
    flattens chat messages into a plain-text prompt, runs the pipeline once
    and replays the generated text word by word as stream-like chunks.
    """

    # Prompt label used for each supported message role; other roles are skipped.
    _ROLE_LABELS = {"system": "System", "user": "User", "assistant": "Assistant"}

    def __init__(self):
        self.pipe = None
        self.setup_pipeline()

    def setup_pipeline(self):
        """Create the text-generation pipeline and store it on ``self.pipe``.

        Raises:
            RuntimeError: if the pipeline cannot be created. The original
                error is chained as ``__cause__``.
        """
        try:
            print("ํ์ดํ๋ผ์ธ ์ด๊ธฐํ ์์...")
            self.pipe = pipeline(
                "text-generation",
                model=MODEL_ID,
                token=HF_TOKEN,
                device_map="auto",
                torch_dtype=torch.float16
            )
            print("ํ์ดํ๋ผ์ธ ์ด๊ธฐํ ์๋ฃ")
        except Exception as e:
            print(f"ํ์ดํ๋ผ์ธ ์ด๊ธฐํ ์ค ์ค๋ฅ ๋ฐ์: {e}")
            raise RuntimeError(f"ํ์ดํ๋ผ์ธ ์ด๊ธฐํ ์คํจ: {e}") from e

    @classmethod
    def _build_prompt(cls, messages):
        """Flatten chat messages into ``"<Label>: <content>\\n"`` lines."""
        return "".join(
            f"{cls._ROLE_LABELS[msg['role']]}: {msg['content']}\n"
            for msg in messages
            if msg["role"] in cls._ROLE_LABELS
        )

    def generate_response(self, messages, max_tokens=4000, temperature=0.7, top_p=0.9):
        """Generate a reply and yield it word by word.

        Args:
            messages: list of ``{"role": ..., "content": ...}`` dicts; roles
                other than system/user/assistant are ignored.
            max_tokens: upper bound on newly generated tokens.
            temperature: sampling temperature; ``0`` (or ``None``) switches
                to greedy decoding instead of sampling.
            top_p: nucleus-sampling threshold, used only when sampling.

        Yields:
            Objects whose ``choices[0].delta`` is a dict with a ``"content"``
            key holding one word followed by a space.

        Raises:
            RuntimeError: if generation fails (original error chained).
        """
        try:
            prompt = self._build_prompt(messages)

            # Sampling with temperature 0 is rejected by transformers, so fall
            # back to greedy decoding and leave the sampling knobs out.
            sampling = temperature is not None and temperature > 0
            gen_kwargs = {
                "max_new_tokens": max_tokens,
                "do_sample": sampling,
                "num_return_sequences": 1,
                "pad_token_id": self.pipe.tokenizer.eos_token_id,
                # Ask for the completion only instead of slicing the prompt off blindly.
                "return_full_text": False,
            }
            if sampling:
                gen_kwargs["temperature"] = temperature
                gen_kwargs["top_p"] = top_p

            response = self.pipe(prompt, **gen_kwargs)

            generated_text = response[0]['generated_text']
            # Defensive: strip the prompt if the backend still echoed it.
            if generated_text.startswith(prompt):
                generated_text = generated_text[len(prompt):]

            # Simulated streaming: one chunk per whitespace-separated word.
            for word in generated_text.split():
                yield type('Response', (), {
                    'choices': [type('Choice', (), {
                        'delta': {'content': word + " "}
                    })()]
                })()
        except Exception as e:
            raise RuntimeError(f"์๋ต ์์ฑ ์คํจ: {e}") from e
class ChatHistory:
    """Conversation log kept in memory and mirrored to a JSON file.

    Each entry of ``self.history`` is a dict with a ``"timestamp"`` (ISO
    string) and a ``"messages"`` list holding the user turn followed by the
    assistant turn.
    """

    DEFAULT_HISTORY_FILE = "/tmp/chat_history.json"

    def __init__(self, history_file: str = DEFAULT_HISTORY_FILE):
        """Load any existing history from ``history_file``.

        Args:
            history_file: path of the JSON file used for persistence.
        """
        self.history = []
        self.history_file = history_file
        self.load_history()

    def add_conversation(self, user_msg: str, assistant_msg: str):
        """Append one user/assistant exchange and persist the log."""
        conversation = {
            "timestamp": datetime.now().isoformat(),
            "messages": [
                {"role": "user", "content": user_msg},
                {"role": "assistant", "content": assistant_msg}
            ]
        }
        self.history.append(conversation)
        self.save_history()

    def format_for_display(self):
        """Return the log as ``[[user_text, assistant_text], ...]`` pairs."""
        return [
            [conv["messages"][0]["content"], conv["messages"][1]["content"]]
            for conv in self.history
        ]

    def get_messages_for_api(self):
        """Return the log as a flat list of role/content message dicts."""
        messages = []
        for conv in self.history:
            messages.extend([
                {"role": "user", "content": conv["messages"][0]["content"]},
                {"role": "assistant", "content": conv["messages"][1]["content"]}
            ])
        return messages

    def clear_history(self):
        """Drop every stored exchange and persist the empty log."""
        self.history = []
        self.save_history()

    def save_history(self):
        """Write the log to ``history_file``; failures are reported, not raised."""
        try:
            with open(self.history_file, 'w', encoding='utf-8') as f:
                json.dump(self.history, f, ensure_ascii=False, indent=2)
        except Exception as e:
            # Best effort: persistence problems must not break the chat.
            print(f"ํ์คํ ๋ฆฌ ์ ์ฅ ์คํจ: {e}")

    @staticmethod
    def _is_valid_entry(entry) -> bool:
        """Tell whether ``entry`` has the two content-bearing messages the accessors read."""
        if not isinstance(entry, dict):
            return False
        messages = entry.get("messages")
        if not isinstance(messages, list) or len(messages) < 2:
            return False
        return all(isinstance(m, dict) and "content" in m for m in messages[:2])

    def load_history(self):
        """Read the log from ``history_file`` if it exists.

        A missing file leaves the current log untouched. An unreadable file,
        or one whose top-level JSON value is not a list, resets the log to
        empty. Entries that lack the expected message pair are skipped so the
        accessors cannot fail on them later.
        """
        try:
            with open(self.history_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if not isinstance(loaded, list):
                raise ValueError("history file does not contain a JSON list")
        except FileNotFoundError:
            return
        except Exception as e:
            print(f"ํ์คํ ๋ฆฌ ๋ก๋ ์คํจ: {e}")
            self.history = []
            return
        self.history = [entry for entry in loaded if self._is_valid_entry(entry)]
# Create the module-level instances.
# ChatHistory() loads any previously saved conversations from its JSON file.
chat_history = ChatHistory()
# ModelManager() builds the local pipeline immediately (its __init__ calls
# setup_pipeline). NOTE(review): nothing in the visible code uses
# model_manager afterwards - chat() goes through get_client() instead; confirm
# whether this eager load is still wanted.
model_manager = ModelManager()
def get_client():
    """Build a fresh ``InferenceClient`` for ``MODEL_ID`` using ``HF_TOKEN``."""
    client = InferenceClient(MODEL_ID, token=HF_TOKEN)
    return client
def analyze_file_content(content, file_type):
    """Analyze file content and return a one-line structural summary.

    Args:
        content: text of the file. For ``'parquet'``/``'csv'`` this is the
            preview text, which contains a pipe-delimited markdown table.
        file_type: ``'parquet'``, ``'csv'`` or anything else (treated as
            plain text / source code).

    Returns:
        A summary string: column/row counts for tabular previews,
        function/class/import counts for text that looks like code, or
        line/paragraph/word counts otherwise.
    """
    content = content or ""
    lines = content.split('\n')

    if file_type in ('parquet', 'csv'):
        # Only the table rows start with '|'. The CSV preview also carries a
        # title line and statistics around the table, so the first line of
        # the text is not necessarily the table header.
        table_lines = [line for line in lines if line.lstrip().startswith('|')]
        if len(table_lines) < 2:
            return "โ ๋ฐ์ดํฐ์ ๊ตฌ์กฐ ๋ถ์ ์คํจ"
        columns = table_lines[0].count('|') - 1
        # Header and separator rows are not data.
        rows = len(table_lines) - 2
        return f"๐ ๋ฐ์ดํฐ์ ๊ตฌ์กฐ: {columns}๊ฐ ์ปฌ๋ผ, {rows}๊ฐ ๋ฐ์ดํฐ"

    total_lines = len(lines)
    lowered = content.lower()
    if any(keyword in lowered for keyword in ('def ', 'class ', 'import ', 'function')):
        functions = sum(1 for line in lines if 'def ' in line)
        classes = sum(1 for line in lines if 'class ' in line)
        imports = sum(1 for line in lines if 'import ' in line or 'from ' in line)
        return f"๐ป ์ฝ๋ ๊ตฌ์กฐ: {total_lines}์ค (ํจ์: {functions}, ํด๋์ค: {classes}, ์ํฌํธ: {imports})"

    paragraphs = content.count('\n\n') + 1
    words = len(content.split())
    return f"๐ ๋ฌธ์ ๊ตฌ์กฐ: {total_lines}์ค, {paragraphs}๋จ๋ฝ, ์ฝ {words}๋จ์ด"
def _summarize_csv(df):
    """Build the preview/statistics text shown for a CSV DataFrame."""
    parts = [
        f"๐ ๋ฐ์ดํฐ ๋ฏธ๋ฆฌ๋ณด๊ธฐ:\n{df.head(10).to_markdown(index=False)}\n\n",
        f"\n๐ ๋ฐ์ดํฐ ์ ๋ณด:\n",
        f"- ์ ์ฒด ํ ์: {len(df)}\n",
        f"- ์ ์ฒด ์ด ์: {len(df.columns)}\n",
        # Column labels are not guaranteed to be strings.
        f"- ์ปฌ๋ผ ๋ชฉ๋ก: {', '.join(map(str, df.columns))}\n",
        f"\n๐ ์ปฌ๋ผ ๋ฐ์ดํฐ ํ์ :\n",
    ]
    parts.extend(f"- {col}: {dtype}\n" for col, dtype in df.dtypes.items())
    null_counts = df.isnull().sum()
    if null_counts.any():
        parts.append(f"\nโ ๏ธ ๊ฒฐ์ธก์น:\n")
        parts.extend(
            f"- {col}: {null_count}๊ฐ ๋๋ฝ\n"
            for col, null_count in null_counts[null_counts > 0].items()
        )
    return "".join(parts)


def read_uploaded_file(file):
    """Read an uploaded file and return ``(content, file_type)``.

    Args:
        file: ``None``, a filesystem path (``str`` / ``os.PathLike``), or an
            object exposing the path as ``.name``.

    Returns:
        ``("", "")`` when no file is given; ``(markdown_preview, "parquet")``;
        ``(preview_and_statistics, "csv")``; ``(text, "text")`` for any other
        extension; or ``(error_message, "error")`` if reading fails.
    """
    if file is None:
        return "", ""
    encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1']
    try:
        path = os.fspath(file) if isinstance(file, (str, os.PathLike)) else file.name
        file_ext = os.path.splitext(path)[1].lower()
        if file_ext == '.parquet':
            df = pd.read_parquet(path, engine='pyarrow')
            content = df.head(10).to_markdown(index=False)
            return content, "parquet"
        elif file_ext == '.csv':
            for encoding in encodings:
                try:
                    df = pd.read_csv(path, encoding=encoding)
                except UnicodeDecodeError:
                    continue
                return _summarize_csv(df), "csv"
        else:
            for encoding in encodings:
                try:
                    with open(path, 'r', encoding=encoding) as f:
                        content = f.read()
                except UnicodeDecodeError:
                    continue
                return content, "text"
        # UnicodeDecodeError cannot be built from a message alone (its
        # constructor takes five arguments), so report with ValueError.
        raise ValueError(f"โ ์ง์๋๋ ์ธ์ฝ๋ฉ์ผ๋ก ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค ({', '.join(encodings)})")
    except Exception as e:
        return f"โ ํ์ผ ์ฝ๊ธฐ ์ค๋ฅ: {str(e)}", "error"
def chat(message, history, uploaded_file, system_message="", max_tokens=4000, temperature=0.7, top_p=0.9):
    """Stream a chat reply for the Gradio UI.

    This is a generator: every outcome, including the early exits, is
    delivered with ``yield`` because a value passed to ``return`` inside a
    generator never reaches the caller iterating over it.

    Args:
        message: text the user submitted.
        history: list of ``[user_text, assistant_text]`` pairs shown so far
            (``None`` is treated as empty).
        uploaded_file: optional upload handed to ``read_uploaded_file``.
        system_message: extra system instructions appended to the built-in prefix.
        max_tokens, temperature, top_p: generation settings forwarded to the
            inference client.

    Yields:
        ``("", updated_history)`` tuples: an empty string for the input box
        and the chat history including the partial or final reply.
    """
    history = history or []
    system_message = system_message or ""
    if not message:
        yield "", history
        return
    system_prefix = """์ ๋ ์ฌ๋ฌ๋ถ์ ์น๊ทผํ๊ณ ์ง์ ์ธ AI ์ด์์คํดํธ 'GiniGEN'์ ๋๋ค.. ๋ค์๊ณผ ๊ฐ์ ์์น์ผ๋ก ์ํตํ๊ฒ ์ต๋๋ค:
1. ๐ค ์น๊ทผํ๊ณ ๊ณต๊ฐ์ ์ธ ํ๋๋ก ๋ํ
2. ๐ก ๋ช ํํ๊ณ ์ดํดํ๊ธฐ ์ฌ์ด ์ค๋ช ์ ๊ณต
3. ๐ฏ ์ง๋ฌธ์ ์๋๋ฅผ ์ ํํ ํ์ ํ์ฌ ๋ง์ถคํ ๋ต๋ณ
4. ๐ ํ์ํ ๊ฒฝ์ฐ ์ ๋ก๋๋ ํ์ผ ๋ด์ฉ์ ์ฐธ๊ณ ํ์ฌ ๊ตฌ์ฒด์ ์ธ ๋์ ์ ๊ณต
5. โจ ์ถ๊ฐ์ ์ธ ํต์ฐฐ๊ณผ ์ ์์ ํตํ ๊ฐ์น ์๋ ๋ํ
ํญ์ ์์ ๋ฐ๋ฅด๊ณ ์น์ ํ๊ฒ ์๋ตํ๋ฉฐ, ํ์ํ ๊ฒฝ์ฐ ๊ตฌ์ฒด์ ์ธ ์์๋ ์ค๋ช ์ ์ถ๊ฐํ์ฌ
์ดํด๋ฅผ ๋๊ฒ ์ต๋๋ค."""
    try:
        if uploaded_file:
            content, file_type = read_uploaded_file(uploaded_file)
            if file_type == "error":
                # Surface the read error in the chat window and stop.
                error_message = content
                chat_history.add_conversation(message, error_message)
                yield "", history + [[message, error_message]]
                return
            file_summary = analyze_file_content(content, file_type)
            if file_type in ['parquet', 'csv']:
                system_message += f"\n\nํ์ผ ๋ด์ฉ:\n```markdown\n{content}\n```"
            else:
                system_message += f"\n\nํ์ผ ๋ด์ฉ:\n```\n{content}\n```"
            # The upload handler submits this exact placeholder text; swap it
            # for a fuller analysis request that includes the file summary.
            if message == "ํ์ผ ๋ถ์์ ์์ํฉ๋๋ค...":
                message = f"""[ํ์ผ ๊ตฌ์กฐ ๋ถ์] {file_summary}
๋ค์ ๊ด์ ์์ ๋์์ ๋๋ฆฌ๊ฒ ์ต๋๋ค:
1. ๐ ์ ๋ฐ์ ์ธ ๋ด์ฉ ํ์
2. ๐ก ์ฃผ์ ํน์ง ์ค๋ช
3. ๐ฏ ์ค์ฉ์ ์ธ ํ์ฉ ๋ฐฉ์
4. โจ ๊ฐ์ ์ ์
5. ๐ฌ ์ถ๊ฐ ์ง๋ฌธ์ด๋ ํ์ํ ์ค๋ช """
        messages = [{"role": "system", "content": system_prefix + system_message}]
        for user_msg, assistant_msg in history:
            # Skip empty slots so no message with content=None is sent.
            if user_msg is not None:
                messages.append({"role": "user", "content": user_msg})
            if assistant_msg is not None:
                messages.append({"role": "assistant", "content": assistant_msg})
        messages.append({"role": "user", "content": message})
        client = get_client()
        partial_message = ""
        for chunk in client.chat_completion(
            messages,
            max_tokens=max_tokens,
            stream=True,
            temperature=temperature,
            top_p=top_p,
        ):
            delta = chunk.choices[0].delta
            # Accept both dict-style and attribute-style deltas.
            if isinstance(delta, dict):
                token = delta.get('content', None)
            else:
                token = getattr(delta, 'content', None)
            if token:
                partial_message += token
                current_history = history + [[message, partial_message]]
                yield "", current_history
        chat_history.add_conversation(message, partial_message)
    except Exception as e:
        error_msg = f"โ ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {str(e)}"
        chat_history.add_conversation(message, error_msg)
        yield "", history + [[message, error_msg]]
# Build the Gradio UI and wire its events to chat().
# NOTE(review): the nesting below was reconstructed from the order of the
# components (the captured source had lost its indentation) - confirm the
# layout, in particular where gr.Examples sits, against the deployed app.
with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", title="GiniGEN ๐ค") as demo:
    # Seed the chat window with the conversations ChatHistory loaded.
    initial_history = chat_history.format_for_display()
    with gr.Row():
        # Wider column: conversation view, input box and buttons.
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                value=initial_history,
                height=600,
                label="๋ํ์ฐฝ ๐ฌ",
                show_label=True
            )
            msg = gr.Textbox(
                label="๋ฉ์์ง ์ ๋ ฅ",
                show_label=False,
                placeholder="๋ฌด์์ด๋ ๋ฌผ์ด๋ณด์ธ์... ๐ญ",
                container=False
            )
            with gr.Row():
                clear = gr.ClearButton([msg, chatbot], value="๋ํ๋ด์ฉ ์ง์ฐ๊ธฐ")
                send = gr.Button("๋ณด๋ด๊ธฐ ๐ค")
        # Narrower column: file upload and generation settings.
        with gr.Column(scale=1):
            gr.Markdown("### GiniGEN ๐ค [ํ์ผ ์ ๋ก๋] ๐\n์ง์ ํ์: ํ ์คํธ, ์ฝ๋, CSV, Parquet ํ์ผ")
            file_upload = gr.File(
                label="ํ์ผ ์ ํ",
                file_types=["text", ".csv", ".parquet"],
                type="filepath"
            )
            with gr.Accordion("๊ณ ๊ธ ์ค์ โ๏ธ", open=False):
                system_message = gr.Textbox(label="์์คํ ๋ฉ์์ง ๐", value="")
                max_tokens = gr.Slider(minimum=1, maximum=8000, value=4000, label="์ต๋ ํ ํฐ ์ ๐")
                temperature = gr.Slider(minimum=0, maximum=1, value=0.7, label="์ฐฝ์์ฑ ์์ค ๐ก๏ธ")
                top_p = gr.Slider(minimum=0, maximum=1, value=0.9, label="์๋ต ๋ค์์ฑ ๐")
    # Example prompts that fill the message box.
    gr.Examples(
        examples=[
            ["์๋ ํ์ธ์! ์ด๋ค ๋์์ด ํ์ํ์ ๊ฐ์? ๐ค"],
            ["์ ๊ฐ ์ดํดํ๊ธฐ ์ฝ๊ฒ ์ค๋ช ํด ์ฃผ์๊ฒ ์ด์? ๐"],
            ["์ด ๋ด์ฉ์ ์ค์ ๋ก ์ด๋ป๊ฒ ํ์ฉํ ์ ์์๊น์? ๐ฏ"],
            ["์ถ๊ฐ๋ก ์กฐ์ธํด ์ฃผ์ค ๋ด์ฉ์ด ์์ผ์ ๊ฐ์? โจ"],
            ["๊ถ๊ธํ ์ ์ด ๋ ์๋๋ฐ ์ฌ์ญค๋ด๋ ๋ ๊น์? ๐ค"],
        ],
        inputs=msg,
    )

    def clear_chat():
        """Erase the persisted history and reset the message box and chat window."""
        chat_history.clear_history()
        return None, None

    # Pressing Enter in the text box and clicking the send button run the same handler.
    msg.submit(
        chat,
        inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p],
        outputs=[msg, chatbot]
    )
    send.click(
        chat,
        inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p],
        outputs=[msg, chatbot]
    )
    # The clear button also wipes the history saved by ChatHistory.
    clear.click(
        clear_chat,
        outputs=[msg, chatbot]
    )
    # On a file change, put the placeholder text that chat() recognises into
    # the message box, then run chat() with it.
    # NOTE(review): this presumably also fires when the file is removed, which
    # would send the placeholder to the model with no file attached - confirm.
    file_upload.change(
        lambda: "ํ์ผ ๋ถ์์ ์์ํฉ๋๋ค...",
        outputs=msg
    ).then(
        chat,
        inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p],
        outputs=[msg, chatbot]
    )

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()