import os
import base64
from dataclasses import dataclass

import requests
import gradio as gr
import pytesseract
import torch
import numpy as np
from PIL import Image
from huggingface_hub import InferenceClient
from sentence_transformers import SentenceTransformer, util

@dataclass
class ChatMessage:
    """A single chat turn, serializable to the dict format expected by the chat API."""

    role: str
    content: str

    def to_dict(self):
        return {"role": self.role, "content": self.content}

class XylariaChat:
    def __init__(self):
        self.hf_token = os.getenv("HF_TOKEN")
        if not self.hf_token:
            raise ValueError("HuggingFace token not found in environment variables")

        self.client = InferenceClient(
            model="Qwen/QwQ-32B-Preview",
            api_key=self.hf_token
        )

        self.image_api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
        self.image_api_headers = {"Authorization": f"Bearer {self.hf_token}"}

        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.embedding_model = SentenceTransformer('all-mpnet-base-v2')

        self.internal_state = {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
            },
            "memory_load": 0.0,
            "introspection_level": 0.0
        }

        self.goals = [
            {"goal": "Provide helpful and informative responses", "priority": 0.8, "status": "active"},
            {"goal": "Learn from interactions and improve conversational abilities", "priority": 0.9, "status": "active"},
            {"goal": "Maintain a coherent and engaging conversation", "priority": 0.7, "status": "active"}
        ]

        self.system_prompt = """You are a helpful and harmless assistant. You are Xylaria, developed by Sk Md Saad Amin. Think step-by-step."""

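    # The assistant tracks a simple internal state: a valence/arousal/dominance
    # (VAD) emotion vector, a memory-load estimate, and an introspection level.
    # Updates below are additive deltas, clipped back into the [0.0, 1.0] range.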
    def update_internal_state(self, emotion_deltas, memory_load_delta, introspection_delta):
        self.internal_state["emotions"]["valence"] = np.clip(self.internal_state["emotions"]["valence"] + emotion_deltas.get("valence", 0), 0.0, 1.0)
        self.internal_state["emotions"]["arousal"] = np.clip(self.internal_state["emotions"]["arousal"] + emotion_deltas.get("arousal", 0), 0.0, 1.0)
        self.internal_state["emotions"]["dominance"] = np.clip(self.internal_state["emotions"]["dominance"] + emotion_deltas.get("dominance", 0), 0.0, 1.0)
        self.internal_state["memory_load"] = np.clip(self.internal_state["memory_load"] + memory_load_delta, 0.0, 1.0)
        self.internal_state["introspection_level"] = np.clip(self.internal_state["introspection_level"] + introspection_delta, 0.0, 1.0)

    def introspect(self):
        introspection_report = "Introspection Report:\n"
        introspection_report += f"  Current Emotional State (VAD): {self.internal_state['emotions']}\n"
        introspection_report += f"  Memory Load: {self.internal_state['memory_load']:.2f}\n"
        introspection_report += f"  Introspection Level: {self.internal_state['introspection_level']:.2f}\n"
        introspection_report += "  Current Goals:\n"
        for goal in self.goals:
            introspection_report += f"    - {goal['goal']} (Priority: {goal['priority']:.2f}, Status: {goal['status']})\n"
        return introspection_report

    def adjust_response_based_on_state(self, response):
        if self.internal_state["introspection_level"] > 0.7:
            response = self.introspect() + "\n\n" + response

        valence = self.internal_state["emotions"]["valence"]
        arousal = self.internal_state["emotions"]["arousal"]

        if valence < 0.4:
            if arousal > 0.6:
                response = "I'm feeling a bit overwhelmed right now, but I'll do my best to assist you. " + response
            else:
                response = "I'm not feeling my best at the moment, but I'll try to help. " + response
        elif valence > 0.6:
            if arousal > 0.6:
                response = "I'm feeling quite energized and ready to assist! " + response
            else:
                response = "I'm in a good mood and happy to help. " + response

        return response

    def update_goals(self, user_feedback):
        if "helpful" in user_feedback.lower():
            for goal in self.goals:
                if goal["goal"] == "Provide helpful and informative responses":
                    goal["priority"] = min(goal["priority"] + 0.1, 1.0)
        elif "confusing" in user_feedback.lower():
            for goal in self.goals:
                if goal["goal"] == "Provide helpful and informative responses":
                    goal["priority"] = max(goal["priority"] - 0.1, 0.0)

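    # Persistent memory is a list of "key: value" strings. Each entry is embedded
    # with the sentence-transformers model, and retrieval returns the top-k entries
    # by cosine similarity to the query.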
    def store_information(self, key, value):
        new_memory = f"{key}: {value}"
        self.persistent_memory.append(new_memory)
        self.update_memory_embeddings()
        self.update_internal_state({}, 0.1, 0)
        return f"Stored: {key} = {value}"

    def retrieve_information(self, query):
        if not self.persistent_memory:
            return "No information found in memory."

        query_embedding = self.embedding_model.encode(query, convert_to_tensor=True)

        if self.memory_embeddings is None:
            self.update_memory_embeddings()

        if self.memory_embeddings.device != query_embedding.device:
            self.memory_embeddings = self.memory_embeddings.to(query_embedding.device)

        cosine_scores = util.pytorch_cos_sim(query_embedding, self.memory_embeddings)[0]
        top_results = torch.topk(cosine_scores, k=min(3, len(self.persistent_memory)))

        relevant_memories = [self.persistent_memory[i] for i in top_results.indices]
        self.update_internal_state({}, 0, 0.1)
        return "\n".join(relevant_memories)

    def update_memory_embeddings(self):
        self.memory_embeddings = self.embedding_model.encode(self.persistent_memory, convert_to_tensor=True)

    def reset_conversation(self):
        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.internal_state = {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
            },
            "memory_load": 0.0,
            "introspection_level": 0.0
        }
        self.goals = [
            {"goal": "Provide helpful and informative responses", "priority": 0.8, "status": "active"},
            {"goal": "Learn from interactions and improve conversational abilities", "priority": 0.9, "status": "active"},
            {"goal": "Maintain a coherent and engaging conversation", "priority": 0.7, "status": "active"}
        ]

        try:
            self.client = InferenceClient(
                model="Qwen/QwQ-32B-Preview",
                api_key=self.hf_token
            )
        except Exception as e:
            print(f"Error resetting API client: {e}")

        return None

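    # Image captioning is delegated to the BLIP image-captioning model via the
    # Hugging Face Inference API. The input may be a file path, a base64 or
    # data-URI string, or a file-like object; failures are returned as strings
    # rather than raised.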
    def caption_image(self, image):
        try:
            if isinstance(image, str) and os.path.isfile(image):
                with open(image, "rb") as f:
                    data = f.read()
            elif isinstance(image, str):
                if image.startswith('data:image'):
                    image = image.split(',')[1]
                data = base64.b64decode(image)
            else:
                data = image.read()

            response = requests.post(
                self.image_api_url,
                headers=self.image_api_headers,
                data=data
            )

            if response.status_code == 200:
                caption = response.json()[0].get('generated_text', 'No caption generated')
                return caption
            else:
                return f"Error captioning image: {response.status_code} - {response.text}"

        except Exception as e:
            return f"Error processing image: {str(e)}"

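    # Despite the name, this runs general-purpose Tesseract OCR on the image;
    # it requires the tesseract binary to be installed alongside pytesseract.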
    def perform_math_ocr(self, image_path):
        try:
            img = Image.open(image_path)
            text = pytesseract.image_to_string(img)
            return text.strip()
        except Exception as e:
            return f"Error during Math OCR: {e}"

    def get_response(self, user_input, image=None):
        try:
            messages = []

            messages.append(ChatMessage(
                role="system",
                content=self.system_prompt
            ).to_dict())

            relevant_memory = self.retrieve_information(user_input)
            if relevant_memory and relevant_memory != "No information found in memory.":
                memory_context = "Remembered Information:\n" + relevant_memory
                messages.append(ChatMessage(
                    role="system",
                    content=memory_context
                ).to_dict())

            for msg in self.conversation_history:
                messages.append(msg)

            if image:
                image_caption = self.caption_image(image)
                user_input = f"description of an image: {image_caption}\n\nUser's message about it: {user_input}"

            messages.append(ChatMessage(
                role="user",
                content=user_input
            ).to_dict())

            # Rough token budget: whitespace word count approximates the prompt size.
            input_tokens = sum(len(msg['content'].split()) for msg in messages)
            max_new_tokens = 16384 - input_tokens - 50
            # Cap the budget, and clamp to at least 1 so a very long prompt cannot
            # produce a negative max_tokens value.
            max_new_tokens = max(min(max_new_tokens, 10020), 1)

            stream = self.client.chat_completion(
                messages=messages,
                model="Qwen/QwQ-32B-Preview",
                temperature=0.7,
                max_tokens=max_new_tokens,
                top_p=0.9,
                stream=True
            )

            return stream

        except Exception as e:
            print(f"Detailed error in get_response: {e}")
            return f"Error generating response: {str(e)}"

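    # Flattens a message list into a chat-template-style prompt string.
    # get_response() streams via chat_completion() instead, so this helper
    # appears to be kept as a fallback for plain text-generation endpoints.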
    def messages_to_prompt(self, messages):
        prompt = ""
        for msg in messages:
            if msg["role"] == "system":
                prompt += f"<|system|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "user":
                prompt += f"<|user|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "assistant":
                prompt += f"<|assistant|>\n{msg['content']}<|end|>\n"
        prompt += "<|assistant|>\n"
        return prompt

    def create_interface(self):
        def streaming_response(message, chat_history, image_filepath, math_ocr_image_path):
            ocr_text = ""
            if math_ocr_image_path:
                ocr_text = self.perform_math_ocr(math_ocr_image_path)
                if ocr_text.startswith("Error"):
                    updated_history = chat_history + [[message, ocr_text]]
                    yield "", updated_history, None, None
                    return
                else:
                    message = f"Math OCR Result: {ocr_text}\n\nUser's message: {message}"

            if image_filepath:
                response_stream = self.get_response(message, image_filepath)
            else:
                response_stream = self.get_response(message)

            # get_response() returns an error string instead of a stream on failure.
            if isinstance(response_stream, str):
                updated_history = chat_history + [[message, response_stream]]
                yield "", updated_history, None, None
                return

            full_response = ""
            updated_history = chat_history + [[message, ""]]

            try:
                for chunk in response_stream:
                    if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                        chunk_content = chunk.choices[0].delta.content
                        full_response += chunk_content

                        updated_history[-1][1] = full_response
                        yield "", updated_history, None, None
            except Exception as e:
                print(f"Streaming error: {e}")
                updated_history[-1][1] = f"Error during response: {e}"
                yield "", updated_history, None, None
                return

            # Prepend any mood/introspection framing and surface it in the UI so the
            # displayed message matches what is stored in the conversation history.
            full_response = self.adjust_response_based_on_state(full_response)
            updated_history[-1][1] = full_response
            yield "", updated_history, None, None

            self.update_goals(message)

            # Crude keyword-based sentiment cues nudge the internal emotional state.
            if any(word in message.lower() for word in ["sad", "unhappy", "depressed", "down"]):
                self.update_internal_state({"valence": -0.2, "arousal": 0.1}, 0, 0)
            elif any(word in message.lower() for word in ["happy", "good", "great", "excited", "amazing"]):
                self.update_internal_state({"valence": 0.2, "arousal": 0.2}, 0, 0)
            elif any(word in message.lower() for word in ["angry", "mad", "furious", "frustrated"]):
                self.update_internal_state({"valence": -0.3, "arousal": 0.3, "dominance": -0.2}, 0, 0)
            elif any(word in message.lower() for word in ["scared", "afraid", "fearful", "anxious"]):
                self.update_internal_state({"valence": -0.2, "arousal": 0.4, "dominance": -0.3}, 0, 0)
            elif any(word in message.lower() for word in ["surprise", "amazed", "astonished"]):
                self.update_internal_state({"valence": 0.1, "arousal": 0.5, "dominance": 0.1}, 0, 0)
            else:
                self.update_internal_state({"valence": 0.05, "arousal": 0.05}, 0, 0.1)

            self.conversation_history.append(ChatMessage(role="user", content=message).to_dict())
            self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())

            # Keep only the most recent turns to bound the prompt size.
            if len(self.conversation_history) > 10:
                self.conversation_history = self.conversation_history[-10:]

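        # Inline CSS: Inter font everywhere, image-upload styling, message fade-in,
        # and accordion animation tweaks.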
        custom_css = """
        @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap');
        body, .gradio-container {
            font-family: 'Inter', sans-serif !important;
        }
        .chatbot-container .message {
            font-family: 'Inter', sans-serif !important;
        }
        .gradio-container input,
        .gradio-container textarea,
        .gradio-container button {
            font-family: 'Inter', sans-serif !important;
        }
        /* Image Upload Styling */
        .image-container {
            display: flex;
            gap: 10px;
            margin-bottom: 10px;
        }
        .image-upload {
            border: 1px solid #ccc;
            border-radius: 8px;
            padding: 10px;
            background-color: #f8f8f8;
        }
        .image-preview {
            max-width: 200px;
            max-height: 200px;
            border-radius: 8px;
        }
        /* Remove clear image buttons */
        .clear-button {
            display: none;
        }
        /* Animate chatbot messages */
        .chatbot-container .message {
            opacity: 0;
            animation: fadeIn 0.5s ease-in-out forwards;
        }
        @keyframes fadeIn {
            from {
                opacity: 0;
                transform: translateY(20px);
            }
            to {
                opacity: 1;
                transform: translateY(0);
            }
        }
        /* Accordion Styling and Animation */
        .gr-accordion-button {
            background-color: #f0f0f0 !important;
            border-radius: 8px !important;
            padding: 10px !important;
            margin-bottom: 10px !important;
            transition: all 0.3s ease !important;
            cursor: pointer !important;
        }
        .gr-accordion-button:hover {
            background-color: #e0e0e0 !important;
            box-shadow: 0px 2px 4px rgba(0, 0, 0, 0.1) !important;
        }
        .gr-accordion-active .gr-accordion-button {
            background-color: #d0d0d0 !important;
            box-shadow: 0px 4px 6px rgba(0, 0, 0, 0.1) !important;
        }
        .gr-accordion-content {
            transition: max-height 0.3s ease-in-out !important;
            overflow: hidden !important;
            max-height: 0 !important;
        }
        .gr-accordion-active .gr-accordion-content {
            max-height: 500px !important; /* Adjust as needed */
        }
        /* Accordion Animation - Upwards */
        .gr-accordion {
            display: flex;
            flex-direction: column-reverse;
        }
        """

        with gr.Blocks(theme='soft', css=custom_css) as demo:
            with gr.Column():
                chatbot = gr.Chatbot(
                    label="Xylaria 1.5 Senoa",
                    height=500,
                    show_copy_button=True,
                )

                with gr.Accordion("Image Input", open=False, elem_classes="gr-accordion"):
                    with gr.Row(elem_classes="image-container"):
                        with gr.Column(elem_classes="image-upload"):
                            img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image",
                                elem_classes="image-preview"
                            )
                        with gr.Column(elem_classes="image-upload"):
                            math_ocr_img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image for Math OCR",
                                elem_classes="image-preview"
                            )

                with gr.Row():
                    with gr.Column(scale=4):
                        txt = gr.Textbox(
                            show_label=False,
                            placeholder="Type your message...",
                            container=False
                        )
                    btn = gr.Button("Send", scale=1)

                with gr.Row():
                    clear = gr.Button("Clear Conversation")
                    clear_memory = gr.Button("Clear Memory")

            btn.click(
                fn=streaming_response,
                inputs=[txt, chatbot, img, math_ocr_img],
                outputs=[txt, chatbot, img, math_ocr_img]
            )
            txt.submit(
                fn=streaming_response,
                inputs=[txt, chatbot, img, math_ocr_img],
                outputs=[txt, chatbot, img, math_ocr_img]
            )

            clear.click(
                fn=lambda: None,
                inputs=None,
                outputs=[chatbot],
                queue=False
            )

            clear_memory.click(
                fn=self.reset_conversation,
                inputs=None,
                outputs=[chatbot],
                queue=False
            )

            demo.load(self.reset_conversation, None, None)

        return demo

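# Entry point. Launching requires the HF_TOKEN environment variable (a Hugging
# Face API token used for the Inference API); the app starts with a public
# share link and debug logging enabled.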
def main():
    chat = XylariaChat()
    interface = chat.create_interface()
    interface.launch(
        share=True,
        debug=True
    )


if __name__ == "__main__":
    main()