""" Simple Chatbot @author: Nigel Gebodh @email: nigel.gebodh@gmail.com """ import streamlit as st import os import requests import json entire_assistant_response = "" def get_streamed_response(message, history, model): all_message = [] if not history: # If history is empty all_message.append({"role": "user", "content": ""}) for human, assistant in history: all_message.append({"role": "user", "content": human}) all_message.append({"role": "assistant", "content": assistant}) global entire_assistant_response entire_assistant_response = "" # Reset the entire assistant response all_message.append({"role": "user", "content": message}) url = "https://api.together.xyz/v1/chat/completions" payload = { "model": model, "temperature": 1.05, "top_p": 0.9, "top_k": 50, "repetition_penalty": 1, "n": 1, "messages": all_message, "stream_tokens": True, } TOGETHER_API_KEY = os.getenv('TOGETHER_API_KEY') headers = { "accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {TOGETHER_API_KEY}", } response = requests.post(url, json=payload, headers=headers, stream=True) response.raise_for_status() # Ensure HTTP request was successful for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') # Check for the completion signal if decoded_line == "data: [DONE]": yield entire_assistant_response # Yield the entire response at the end break try: # Decode and strip any SSE format specific prefix ("data: ") if decoded_line.startswith("data: "): decoded_line = decoded_line.replace("data: ", "") chunk_data = json.loads(decoded_line) content = chunk_data['choices'][0]['delta']['content'] entire_assistant_response += content # Aggregate content yield entire_assistant_response except json.JSONDecodeError: print(f"Invalid JSON received: {decoded_line}") continue except KeyError as e: print(f"KeyError encountered: {e}") continue print(entire_assistant_response) all_message.append({"role": "assistant", "content": entire_assistant_response}) # Initialize Streamlit app st.title("Simple Chatbot") # Initialize session state if not present if "messages" not in st.session_state: st.session_state.messages = [] # Define available models models = { "Mistral": "mistralai/Mistral-7B-Instruct-v0.2", "Gemma-7B": "google/gemma-7b-it", "Gemma-2B": "google/gemma-2b-it", "Zephyr-7B-β": "HuggingFaceH4/zephyr-7b-beta", "BibleLearnerAI": "NousResearch/Nous-Hermes-2-Yi-34B" } # Allow user to select a model selected_model = st.sidebar.selectbox("Select Model", list(models.keys())) # Create model description st.sidebar.write(f"You're now chatting with **{selected_model}**") st.sidebar.markdown("*Generated content may be inaccurate or false.*") st.sidebar.markdown("\nLearn how to build this chatbot [here](https://ngebodh.github.io/projects/2024-03-05/).") st.sidebar.markdown("\nRun into issues? Try the [back-up](https://huggingface.co/spaces/ngebodh/SimpleChatbot-Backup).") if "prev_option" not in st.session_state: st.session_state.prev_option = selected_model if st.session_state.prev_option != selected_model: st.session_state.messages = [] st.session_state.prev_option = selected_model #Pull in the model we want to use repo_id = models[selected_model] st.subheader(f'AI - {selected_model}') # Accept user input if prompt := st.text_input(f"Hi I'm {selected_model}, ask me a question"): # Display user message with st.spinner("AI is typing..."): st.session_state.messages.append({"role": "user", "content": prompt}) # Call selected model to get response response_stream = get_streamed_response(prompt, [(m["content"] for m in st.session_state.messages[:-1])], repo_id) for response in response_stream: st.session_state.messages.append({"role": "assistant", "content": response}) # Display chat history for message in st.session_state.messages: if message["role"] == "user": st.text_input("You:", value=message["content"], disabled=True) else: st.text_input(f"{selected_model}:", value=message["content"], disabled=True)