import os from threading import Thread from typing import Iterator import json from datetime import datetime from pathlib import Path from uuid import uuid4 import gradio as gr import spaces import torch from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, AutoModel from pathlib import Path from pinecone.grpc import PineconeGRPC as Pinecone import torch from huggingface_hub import CommitScheduler HF_UPLOAD = os.environ.get("HF_UPLOAD") JSON_DATASET_DIR = Path("json_dataset") JSON_DATASET_DIR.mkdir(parents=True, exist_ok=True) JSON_DATASET_PATH = JSON_DATASET_DIR / f"train-{uuid4()}.json" scheduler = CommitScheduler( repo_id="psyche/llama3-mrc-chat-log", repo_type="dataset", folder_path=JSON_DATASET_DIR, path_in_repo="data", token=HF_UPLOAD ) pc = Pinecone(api_key=os.environ.get("PINECONE")) index = pc.Index("commonsense") """ device = torch.device("cuda" if torch.cuda.is_available() else "CPU") retriever_tokenizer = AutoTokenizer.from_pretrained("psyche/dpr-longformer-ko-4096") retriever = AutoModel.from_pretrained("psyche/dpr-longformer-ko-4096") retriever.eval() retriever.to(device) """ def save_json(question: str, answer: str) -> None: with scheduler.lock: with JSON_DATASET_PATH.open("a") as f: json.dump({"question": question, "answer": answer, "datetime": datetime.now().isoformat(), "label":""}, f, ensure_ascii=False) f.write("\n") MAX_MAX_NEW_TOKENS = 8192 DEFAULT_MAX_NEW_TOKENS = 4096 MAX_INPUT_TOKEN_LENGTH = 2048 DESCRIPTION = """\ # Llama-3 8B Korean QA Chatbot \ """ if not torch.cuda.is_available(): DESCRIPTION += "\n
Running on CPU π₯Ά This demo does not work on CPU.
" if torch.cuda.is_available(): model_id = "psyche/llama3-8b-instruct-ko" #model_id = "psyche/meta-llama3-experiments" model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto", load_in_4bit=True, revision="v4.3") tokenizer = AutoTokenizer.from_pretrained(model_id, revision="v4.3") @spaces.GPU def generate( message: str, chat_history: list[tuple[str, str]], system_prompt: str, max_new_tokens: int = 1024, temperature: float = 0.6, top_p: float = 0.9, top_k: int = 50, repetition_penalty: float = 1.2, ) -> Iterator[str]: conversation = [] if system_prompt: conversation.append({"role": "system", "content": system_prompt}) for user, assistant in chat_history: conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}]) """ retriever_inputs = retriever_tokenizer([message], max_length=1024, truncation=True, return_tensors="pt") retriever_inputs = {k:v.to(retriever.device) for k,v in retriever_inputs.items()} with torch.no_grad(): embeddings = retriever(**retriever_inputs).pooler_output embeddings = embeddings.cpu().numpy() results = index.query( vector=embeddings[0], top_k=1, include_values=False, include_metadata=True ) results = [result for result in results["matches"] if result["score"] > 0.6] if len(results) > 0: message = results[0]["metadata"]["text"] + f"\n\nμ λ¬Έλ§₯μ μ°Έκ³ νμ¬ μ§λ¬Έ '{message}'μ λ΅νλ©΄?" """ conversation.append({"role": "user", "content": message }) input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt", add_generation_prompt=True) if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") input_ids = input_ids.to(model.device) streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True) generate_kwargs = dict( {"input_ids": input_ids}, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True, top_p=top_p, top_k=top_k, temperature=temperature, num_beams=1, repetition_penalty=repetition_penalty, ) t = Thread(target=model.generate, kwargs=generate_kwargs) t.start() outputs = [] for text in streamer: outputs.append(text) yield "".join(outputs) save_json(message, "".join(outputs)) chat_interface = gr.ChatInterface( fn=generate, additional_inputs=[ gr.Textbox(label="System prompt", lines=6), gr.Slider( label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS, ), gr.Slider( label="Temperature", minimum=0.01, maximum=4.0, step=0.1, value=0.01, ), gr.Slider( label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9, ), gr.Slider( label="Top-k", minimum=1, maximum=1000, step=1, value=50, ), gr.Slider( label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.15, ), ], stop_btn=None, examples=[ ["μλ ?"], ["λκ° ν μ μλκ² λμΌ?"], ["νμ΄μ¬μ λν΄μ μλ €μ€"], ["λνλ―Όκ΅μ μλλ?"], ["λ λλ μ΄λλλΌ λ μ΄μΌ?"], ], ) with gr.Blocks(css="style.css") as demo: gr.Markdown(DESCRIPTION) gr.DuplicateButton(value="Duplicate Space for private use", elem_id="duplicate-button") chat_interface.render() if __name__ == "__main__": demo.queue(max_size=20).launch()