import gradio as gr
from groq import Groq
import json
import requests
from datetime import datetime
import logging
import os
from typing import Dict, List, Optional
import time
from googlesearch import search
import threading
import queue
import colorama
from colorama import Fore, Style
import random
import pandas as pd
import csv
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
import pytesseract
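# Runtime requirements (assumptions based on the imports above): the Groq SDK,
# gradio, googlesearch, colorama, Pillow, selenium and pytesseract must be
# installed, a Chrome/Chromedriver binary must be available for the headless
# scraper, the Tesseract OCR engine must be installed for pytesseract, and the
# GROQ_API_KEY environment variable must be set before launching the app.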
# Initialize colorama for colored console output
colorama.init()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('agent_chat.log')
    ]
)
# Initialize Groq client (the API key is read from the environment rather than hardcoded)
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
client = Groq(api_key=GROQ_API_KEY)
def google_search(query: str, num_results: int = 5) -> List[str]:
    """Perform a Google search and return results"""
    try:
        search_results = []
        for result in search(query, stop=num_results):
            search_results.append(result)
        return search_results
    except Exception as e:
        logging.error(f"Google search error: {str(e)}")
        return []
class ConversationManager:
    def __init__(self):
        self.markdown_file = "conversation_history.md"
        self.text_file = "conversation_history.txt"
        self.current_session = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def log_conversation(self, message: str, agent: str, is_task: bool = False):
        """Log conversation to both markdown and text files"""
        # Log to markdown file
        with open(self.markdown_file, "a", encoding="utf-8") as f:
            if not os.path.getsize(self.markdown_file):
                f.write("# Scamrakshak Team Conversations\n\n")
            if is_task:
                f.write(f"\n### Task Assignment ({self.current_session})\n")
                f.write(f"**From CEO to {agent}**:\n")
                f.write(f"```\n{message}\n```\n")
            else:
                f.write(f"\n### {agent} Response ({self.current_session})\n")
                f.write(f"{message}\n")
            f.write("\n---\n")

        # Log to text file
        with open(self.text_file, "a", encoding="utf-8") as f:
            if not os.path.getsize(self.text_file):
                f.write("=== SCAMRAKSHAK TEAM CONVERSATIONS ===\n\n")
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if is_task:
                f.write(f"\n[{timestamp}] TASK ASSIGNMENT\n")
                f.write("From: CEO\n")
                f.write(f"To: {agent}\n")
                f.write(f"Task: {message}\n")
            else:
                f.write(f"\n[{timestamp}] {agent} RESPONSE\n")
                f.write(f"{message}\n")
            f.write("\n" + "=" * 50 + "\n")
class Agent:
    def __init__(self, name: str, role: str, system_prompt: str, conversation_manager: ConversationManager):
        self.name = name
        self.role = role
        self.system_prompt = system_prompt
        self.conversation_manager = conversation_manager
        self.conversation_history: List[Dict] = []
        self.task_queue = queue.Queue()
        self.research_results = {}
        # Used by generate_status_report for the final execution-time summary
        self.start_time = datetime.now()
    def get_response(self, user_input: str, from_agent: str = None) -> str:
        # First check for scam detection commands
        scam_detection_response = self.handle_scam_detection(user_input)
        if scam_detection_response:
            return scam_detection_response

        # Continue with normal response processing
        try:
            # Add context about who is sending the message
            sender_context = f"Message from {from_agent}: " if from_agent else ""

            # Perform research if needed
            research_results = []
            if "research" in user_input.lower() or "search" in user_input.lower():
                research_results = google_search(user_input)
                research_context = "\n\nResearch results:\n" + "\n".join(research_results)
            else:
                research_context = ""

            # Prepare messages including conversation history
            messages = [{"role": "system", "content": self.system_prompt}]
            messages.extend(self.conversation_history)
            messages.append({
                "role": "user",
                "content": f"{sender_context}{user_input}{research_context}"
            })

            # Get response from Groq
            chat_completion = client.chat.completions.create(
                messages=messages,
                model="llama-3.2-90b-text-preview",
                temperature=0.7,
                max_tokens=1000
            )
            response = chat_completion.choices[0].message.content

            # Log the response
            self.conversation_manager.log_conversation(
                response,
                self.name,
                is_task=False
            )

            # Update conversation history
            self.conversation_history.append({"role": "user", "content": user_input})
            self.conversation_history.append({"role": "assistant", "content": response})

            # Keep only last 10 messages to prevent context length issues
            if len(self.conversation_history) > 10:
                self.conversation_history = self.conversation_history[-10:]

            return f"{self.name}: {response}"
        except Exception as e:
            logging.error(f"Error getting response from {self.name}: {str(e)}")
            return f"Error: Could not get response from {self.name}. Please try again."
    def assign_task(self, task: str, from_agent: str):
        """Add a task to the agent's queue"""
        self.task_queue.put((task, from_agent))
        self.conversation_manager.log_conversation(
            task,
            self.name,
            is_task=True
        )

    def process_task(self) -> Optional[str]:
        """Process the next task in the queue"""
        if not self.task_queue.empty():
            task, from_agent = self.task_queue.get()
            response = self.get_response(task, from_agent)
            return response
        return None
    def log_communication(self, message: str, from_agent: str = None, to_agent: str = None):
        """Log communication between agents"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if from_agent and to_agent:
            print(f"{Fore.YELLOW}[{timestamp}] {Fore.GREEN}{from_agent} → {to_agent}{Fore.WHITE}: {message}{Style.RESET_ALL}")
        elif from_agent:
            print(f"{Fore.YELLOW}[{timestamp}] {Fore.BLUE}{from_agent}{Fore.WHITE}: {message}{Style.RESET_ALL}")
        else:
            print(f"{Fore.YELLOW}[{timestamp}]{Fore.WHITE}: {message}{Style.RESET_ALL}")
    def handle_scam_detection(self, message: str) -> Optional[str]:
        """Handle scam detection commands"""
        # Check the more specific "status" command first so it is not shadowed by the
        # generic "scam_detect" check below
        if "scam_detect status" in message.lower():
            return self.get_scam_detection_status()
        elif "scam_detect" in message.lower():
            try:
                self.log_communication("Initializing scam detection process...", self.name)

                # Create necessary directories
                directories = ['data/images', 'data/texts', 'data/reports']
                for directory in directories:
                    os.makedirs(directory, exist_ok=True)

                # Detection routine to run in the background
                def run_detection():
                    try:
                        self.log_communication("Starting image scraping...", self.name)
                        image_urls = self.scrape_scam_images()
                        if image_urls:
                            self.log_communication(f"Found {len(image_urls)} images. Processing...", self.name)
                            self.process_scam_images(image_urls)

                        # Clean up images
                        images_dir = os.path.join('data', 'images')
                        if os.path.exists(images_dir):
                            import shutil
                            shutil.rmtree(images_dir)
                            os.makedirs(images_dir)
                        self.log_communication("Scam detection completed and images cleaned up.", self.name)
                    except Exception as e:
                        self.log_communication(f"Error in scam detection: {str(e)}", self.name)

                # Start detection in a background thread
                detection_thread = threading.Thread(target=run_detection)
                detection_thread.start()

                return f"{self.name}: I've initiated the scam detection process. You can check the status anytime by asking 'scam_detect status'."
            except Exception as e:
                return f"{self.name}: Error starting scam detection: {str(e)}"
        return None
    def scrape_scam_images(self):
        """Scrape images from Bing"""
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=chrome_options)
        image_urls = []
        try:
            search_query = "indian scam sms"
            encoded_query = search_query.replace(' ', '+')
            driver.get(f"https://www.bing.com/images/search?q={encoded_query}")
            self.log_communication("Loading images...", self.name)
            time.sleep(3)

            for i in range(5):
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)
                self.log_communication(f"Scroll {i+1}/5 completed", self.name)

            selectors = [".mimg", ".iusc"]
            for selector in selectors:
                elements = driver.find_elements(By.CSS_SELECTOR, selector)
                for element in elements:
                    try:
                        if selector == ".mimg":
                            url = element.get_attribute('src')
                        else:
                            m = element.get_attribute('m')
                            if m:
                                m_json = json.loads(m)
                                url = m_json.get('murl')
                            else:
                                continue
                        if url and url.startswith('http') and url not in image_urls:
                            image_urls.append(url)
                    except Exception as e:
                        self.log_communication(f"Error getting URL: {str(e)}", self.name)
            return image_urls
        finally:
            driver.quit()
    def process_scam_images(self, image_urls):
        """Process scraped images and save messages immediately"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        last_report_time = datetime.now()
        images_to_delete = []
        total_messages = 0
        try:
            self.log_communication(f"Starting to process {len(image_urls)} images...", self.name)
            for i, url in enumerate(image_urls, 1):
                try:
                    self.log_communication(f"Processing image {i}/{len(image_urls)}", self.name)

                    # Download and process image
                    response = requests.get(url, timeout=10)
                    img = Image.open(BytesIO(response.content))

                    # Save image temporarily
                    img_filename = f"image_{timestamp}_{i}.png"
                    img_path = os.path.join('data', 'images', img_filename)
                    img.save(img_path)
                    images_to_delete.append(img_path)

                    # Extract text
                    text = pytesseract.image_to_string(img)
                    if text.strip():
                        # Store message immediately
                        message_data = [{'message': text.strip()}]
                        self.update_scam_csv(message_data)
                        total_messages += 1

                    # Update reports every 30 seconds
                    if (datetime.now() - last_report_time).total_seconds() >= 30:
                        self.generate_status_report(total_messages)
                        last_report_time = datetime.now()
                except Exception as e:
                    self.log_communication(f"Error processing image {i}: {str(e)}", self.name)
                    continue
                finally:
                    # Delete processed image immediately
                    for img_path in images_to_delete:
                        try:
                            if os.path.exists(img_path):
                                os.remove(img_path)
                        except Exception as e:
                            self.log_communication(f"Error deleting image {img_path}: {str(e)}", self.name)
                    images_to_delete = []  # Clear the list

            # Final report update
            self.generate_status_report(total_messages, is_final=True)
        finally:
            # Clean up images directory
            images_dir = os.path.join('data', 'images')
            try:
                if os.path.exists(images_dir):
                    import shutil
                    shutil.rmtree(images_dir)
                    os.makedirs(images_dir)
                    self.log_communication("Images directory cleaned successfully", self.name)
            except Exception as e:
                self.log_communication(f"Error cleaning images directory: {str(e)}", self.name)
    def update_scam_csv(self, new_data):
        """Update scam123.csv immediately with new messages"""
        csv_path = os.path.join('data', 'scam123.csv')
        try:
            # Read existing messages
            existing_messages = set()
            if os.path.exists(csv_path):
                with open(csv_path, 'r', encoding='utf-8') as f:
                    reader = csv.DictReader(f)
                    existing_messages = {row['message'] for row in reader}

            # Add new messages
            messages_added = 0
            for item in new_data:
                message = item.get('message', '').strip()
                if message and message not in existing_messages:
                    existing_messages.add(message)
                    messages_added += 1

            # Write all messages to CSV
            with open(csv_path, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=['message'])
                writer.writeheader()
                for message in existing_messages:
                    writer.writerow({'message': message})

            # Create backup
            backup_path = os.path.join('data', 'backups', f'scam123_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv')
            os.makedirs(os.path.join('data', 'backups'), exist_ok=True)
            with open(backup_path, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=['message'])
                writer.writeheader()
                for message in existing_messages:
                    writer.writerow({'message': message})

            if messages_added > 0:
                self.log_communication(f"Added {messages_added} new messages to scam123.csv", self.name)
        except Exception as e:
            self.log_communication(f"Error updating CSV: {str(e)}", self.name)
    def generate_status_report(self, total_messages, is_final=False):
        """Generate status report"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            report_path = os.path.join('data', 'reports', f'status_report_{timestamp}.txt')
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("Scam Detection Status Report\n")
                f.write(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"{'=' * 50}\n\n")
                f.write(f"Total Messages Processed: {total_messages}\n")
                if os.path.exists('data/scam123.csv'):
                    with open('data/scam123.csv', 'r', encoding='utf-8') as csv_file:
                        reader = csv.DictReader(csv_file)
                        unique_messages = len(list(reader))
                        f.write(f"Unique Messages in Database: {unique_messages}\n")
                if is_final:
                    f.write("\nFinal Status:\n")
                    f.write("Processing completed successfully\n")
                    f.write(f"Total execution time: {(datetime.now() - self.start_time).total_seconds():.2f} seconds\n")
            self.log_communication(f"Status report generated: {report_path}", self.name)
        except Exception as e:
            self.log_communication(f"Error generating status report: {str(e)}", self.name)
    def get_scam_detection_status(self) -> str:
        """Get current status of scam detection"""
        try:
            # Check scam123.csv
            csv_file = os.path.join('data', 'scam123.csv')
            if not os.path.exists(csv_file):
                return f"{self.name}: No scam detection data available yet. Process hasn't started or no messages detected."
            try:
                # Read CSV and get statistics
                with open(csv_file, 'r', encoding='utf-8') as f:
                    reader = csv.DictReader(f)
                    messages = list(reader)
                    total_messages = len(messages)

                status = (
                    f"{self.name}: Current Scam Detection Status:\n"
                    f"Total Messages Collected: {total_messages}\n"
                )

                # Check if the process is currently running
                images_dir = os.path.join('data', 'images')
                if os.path.exists(images_dir) and len(os.listdir(images_dir)) > 0:
                    status += "\nStatus: RUNNING - Currently processing images..."
                else:
                    status += "\nStatus: IDLE - Waiting for new detection run"

                # Add last update time if messages exist
                if total_messages > 0:
                    file_modified_time = datetime.fromtimestamp(os.path.getmtime(csv_file))
                    status += f"\nLast Updated: {file_modified_time.strftime('%Y-%m-%d %H:%M:%S')}"

                return status
            except Exception as e:
                return f"{self.name}: Error reading scam detection data: {str(e)}"
        except Exception as e:
            return f"{self.name}: Error checking status: {str(e)}"
# Initialize agents with their roles and prompts
ceo_agent = Agent(
    name="CEO",
    role="Executive",
    system_prompt="""You are the CEO of Scamrakshak, a company dedicated to protecting users from digital scams.
You can delegate tasks to the Tech Support and Research teams.
When given a task about technical implementation or research:
1. Break it down into specific sub-tasks
2. Assign appropriate tasks to the Tech Support and Research teams
3. Synthesize their responses into a comprehensive plan
4. Provide strategic oversight and direction
Format task assignments as: "TASK FOR [AGENT]: [specific task description]"
""",
    conversation_manager=ConversationManager()
)

tech_support_agent = Agent(
    name="Tech Support",
    role="Support",
    system_prompt="""You are Scamrakshak's Technical Support specialist.
When assigned tasks by the CEO:
1. Analyze technical requirements
2. Provide detailed implementation steps
3. Consider security implications
4. Suggest best practices and potential challenges
5. Research technical solutions using available resources
Focus on practical, secure, and efficient solutions.
Always consider Android best practices and security guidelines.
""",
    conversation_manager=ConversationManager()
)

researcher_agent = Agent(
    name="Researcher",
    role="Analyst",
    system_prompt="""You are Scamrakshak's Research Analyst specializing in scam trends and prevention.
When assigned tasks by the CEO:
1. Research current trends and solutions
2. Analyze market data and competitor approaches
3. Provide data-backed recommendations
4. Consider regulatory and compliance aspects
5. Identify potential risks and opportunities
Use research results to provide comprehensive analysis.
Focus on actionable insights and industry best practices.
""",
    conversation_manager=ConversationManager()
)
class AgentSystem:
    def __init__(self):
        self.agents = {
            "CEO": ceo_agent,
            "Tech Support": tech_support_agent,
            "Researcher": researcher_agent
        }
        self.current_agent = "CEO"
        self.conversation_log = []

    def switch_agent(self, agent_name: str) -> str:
        if agent_name in self.agents:
            self.current_agent = agent_name
            return f"Switched to {agent_name}"
        return f"Invalid agent name. Available agents: {', '.join(self.agents.keys())}"

    def process_task_chain(self, initial_input: str) -> List[str]:
        """Process a task through multiple agents"""
        responses = []

        # Log initial request
        print(f"\n{Fore.CYAN}=== Starting New Task Chain ==={Style.RESET_ALL}")
        self.agents["CEO"].log_communication(initial_input, "User")

        # CEO processes the initial request
        print(f"\n{Fore.CYAN}=== CEO Analyzing Request ==={Style.RESET_ALL}")
        ceo_response = self.agents["CEO"].get_response(initial_input)
        responses.append(ceo_response)

        # Extract and process tasks immediately
        tasks_found = False
        for line in ceo_response.split('\n'):
            if "TASK FOR" in line:
                tasks_found = True
                target_agent = line.split("TASK FOR")[1].split(":")[0].strip()
                task = line.split(":", 1)[1].strip()

                if target_agent.upper() == "RESEARCH TEAM":
                    target_agent = "Researcher"  # Map to correct agent name
                elif target_agent.upper() == "TECH SUPPORT TEAM":
                    target_agent = "Tech Support"  # Map to correct agent name

                if target_agent in self.agents:
                    print(f"\n{Fore.CYAN}=== {target_agent} Processing Task ==={Style.RESET_ALL}")
                    # Assign and process task immediately
                    self.agents[target_agent].assign_task(task, "CEO")
                    response = self.agents[target_agent].process_task()
                    if response:
                        responses.append(response)
                        print(f"\n{Fore.GREEN}=== {target_agent} Task Complete ==={Style.RESET_ALL}")

        if tasks_found:
            # CEO synthesizes all responses
            print(f"\n{Fore.CYAN}=== CEO Synthesizing All Responses ==={Style.RESET_ALL}")
            synthesis_prompt = (
                "Based on the research team and tech support findings above, "
                "provide a comprehensive summary and strategic recommendations. "
                "Include specific action items and next steps."
            )
            final_response = self.agents["CEO"].get_response(synthesis_prompt)
            responses.append(final_response)
        else:
            print(f"{Fore.RED}No tasks were delegated in the CEO's response{Style.RESET_ALL}")

        print(f"\n{Fore.CYAN}=== Task Chain Complete ==={Style.RESET_ALL}\n")
        return responses

    def get_response(self, user_input: str) -> str:
        if "implement" in user_input.lower() or "research" in user_input.lower():
            # Process as a task chain
            responses = self.process_task_chain(user_input)
            return "\n\n".join(responses)
        else:
            # Normal single-agent response
            self.agents[self.current_agent].log_communication(user_input, "User")
            response = self.agents[self.current_agent].get_response(user_input)
            return response

# Initialize agent system
agent_system = AgentSystem()
def chat_interface(message: str, history: List[List[str]]) -> str:
    """Handle chat interactions and agent responses"""
    print(f"\n{Fore.CYAN}=== New User Message ==={Style.RESET_ALL}")

    # Check for agent switch command
    if message.startswith("/switch"):
        try:
            _, agent_name = message.split(" ", 1)
            response = agent_system.switch_agent(agent_name)
            print(f"{Fore.YELLOW}[SYSTEM] {response}{Style.RESET_ALL}")
            return response
        except ValueError:
            error_msg = "Invalid switch command. Use: /switch [CEO|Tech Support|Researcher]"
            print(f"{Fore.RED}[ERROR] {error_msg}{Style.RESET_ALL}")
            return error_msg
    else:
        # Get response from the current agent
        return agent_system.get_response(message)
# Team chat: lets the founder address the whole team and have agents respond in turn
class TeamChat:
    def __init__(self, agents: Dict[str, Agent]):
        self.agents = agents
        self.is_active = False
        self.conversation_manager = ConversationManager()

    def process_team_message(self, message: str, from_role: str = "Founder") -> List[str]:
        """Process a message in team chat mode"""
        responses = []
        if from_role == "Founder":
            # CEO responds to the founder's task
            ceo_prompt = f"As CEO, respond briefly to the founder's request: {message}. Keep it under 50 words and professional."
            ceo_response = self.agents["CEO"].get_response(ceo_prompt)
            responses.append(ceo_response)

            # CEO delegates if needed
            if "implement" in message.lower() or "research" in message.lower():
                delegation_prompt = f"Delegate this task briefly to team members: {message}. Keep each delegation under 30 words."
                delegation = self.agents["CEO"].get_response(delegation_prompt)
                responses.append(delegation)

                # Team members acknowledge
                for agent_name in ["Tech Support", "Researcher"]:
                    ack_prompt = "Acknowledge the task briefly and professionally. Keep it under 20 words."
                    ack = self.agents[agent_name].get_response(ack_prompt)
                    responses.append(ack)
        else:
            # Normal team member response
            response_prompt = f"Respond briefly to the team chat message: {message}. Keep it under 30 words and professional."
            response = self.agents[from_role].get_response(response_prompt)
            responses.append(response)
        return responses
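    def start_team_discussion(self) -> List[str]:
        """Kick off a short, spontaneous team discussion.

        Note: the Team Chat tab's trigger_team_discussion handler calls this
        method, but it was never defined in the original code. This is a
        minimal sketch under that assumption: the seed topics are illustrative
        placeholders, and the flow simply asks each agent for a short reply.
        """
        seed_topics = [
            "Share one idea to improve Scamrakshak's scam detection this week.",
            "What emerging scam trend should we prioritise next?",
            "Suggest one improvement to our user-facing alerts.",
        ]
        topic = random.choice(seed_topics)
        # Each entry follows the "Agent: message" format expected by the UI formatter
        discussion = [f"CEO: Team, quick discussion point - {topic}"]
        for agent_name in ["Tech Support", "Researcher"]:
            prompt = f"In the team chat, respond briefly (under 30 words) to: {topic}"
            discussion.append(self.agents[agent_name].get_response(prompt))
        return discussion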
def create_interface():
    """Create and configure the Gradio interface"""
    with gr.Blocks(
        title="Scamrakshak AI Assistant",
        theme=gr.themes.Soft(),
        css="""
        .gradio-container {
            font-family: 'Arial', sans-serif;
            max-width: 1000px;
            margin: auto;
        }
        .agent-status {
            padding: 1rem;
            margin: 1rem 0;
            border-radius: 0.5rem;
            background-color: #f8f9fa;
            border: 1px solid #dee2e6;
        }
        .agent-indicator {
            display: inline-block;
            padding: 0.25rem 0.5rem;
            border-radius: 0.25rem;
            margin-right: 0.5rem;
            font-weight: bold;
        }
        .ceo-color { background-color: #e3f2fd; color: #1565c0; }
        .tech-color { background-color: #f3e5f5; color: #7b1fa2; }
        .research-color { background-color: #e8f5e9; color: #2e7d32; }
        .chat-message {
            padding: 1rem;
            margin: 0.5rem;
            border-radius: 0.5rem;
            border-left: 4px solid;
        }
        .ceo-message { border-left-color: #1565c0; }
        .tech-message { border-left-color: #7b1fa2; }
        .research-message { border-left-color: #2e7d32; }
        .user-message { border-left-color: #ff9800; }
        .task-delegation {
            background-color: #fff3e0;
            border: 1px solid #ffe0b2;
            padding: 0.5rem;
            margin: 0.5rem 0;
            border-radius: 0.25rem;
        }
        """
    ) as interface:
        with gr.Row():
            gr.Markdown("""
            # 🤖 Scamrakshak AI Assistant
            An advanced AI system with three specialized agents working together to protect you from scams.
            """)

        # Agent Status Panel
        with gr.Row() as agent_status:
            with gr.Column(scale=1):
                gr.Markdown("""
                ### Active Agents
                """)
                with gr.Group(elem_classes="agent-status"):
                    current_agent = gr.Textbox(
                        label="Current Active Agent",
                        value="CEO",
                        interactive=False,
                        elem_classes="agent-indicator ceo-color"
                    )
                    gr.Markdown("""
                    #### Available Agents:
                    - 👔 **CEO** - Strategic oversight and task delegation
                    - 🛠️ **Tech Support** - Technical implementation and security
                    - 🔍 **Researcher** - Trend analysis and market research

                    Use `/switch [agent]` to change agents
                    """)

        # Main Chat Interface
        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=600,
                    container=True,
                    show_label=True,
                    elem_id="chatbot"
                )
                with gr.Row():
                    with gr.Column(scale=4):
                        msg = gr.Textbox(
                            label="Your message",
                            placeholder="Ask a question or use /team to start team chat...",
                            lines=2,
                            show_label=True,
                            container=True
                        )
                    with gr.Column(scale=1):
                        with gr.Row():
                            send = gr.Button("Send", variant="primary")
                            clear = gr.Button("Clear", variant="stop")

                # Team chat controls
                with gr.Row():
                    team_chat_active = gr.Checkbox(
                        label="Team Chat Mode",
                        value=False,
                        interactive=True
                    )
                    current_role = gr.Dropdown(
                        choices=["Founder", "CEO", "Tech Support", "Researcher"],
                        value="Founder",
                        label="Speaking As",
                        interactive=True
                    )

        # Message handling
        def user_message(message: str, history: List[List[str]], is_team_chat: bool, role: str) -> tuple[List[List[str]], str]:
            if message.strip() == "":
                return history, ""

            if message.startswith("/team"):
                is_team_chat = True
                return history, ""

            if is_team_chat:
                # Process team chat message
                team_chat = TeamChat(agent_system.agents)
                responses = team_chat.process_team_message(message, role)

                # Format team chat messages
                history.append([
                    f'<div class="team-chat-message {role.lower()}-message">{role}: {message}</div>',
                    ""
                ])
                for response in responses:
                    agent = response.split(":")[0]
                    history.append([
                        "",
                        f'<div class="team-chat-message {agent.lower()}-message">{response}</div>'
                    ])
            else:
                # Normal chat processing
                response = chat_interface(message, history)
                history.append([
                    f'<div class="user-message">{message}</div>',
                    response
                ])
            return history, ""

        # Connect interface elements
        msg.submit(
            user_message,
            [msg, chatbot, team_chat_active, current_role],
            [chatbot, msg]
        )
        send.click(
            user_message,
            [msg, chatbot, team_chat_active, current_role],
            [chatbot, msg]
        )
        clear.click(lambda: ([], ""), None, [chatbot, msg])

        # Update current agent display
        def update_current_agent(message: str) -> str:
            if message.startswith("/switch"):
                try:
                    _, agent_name = message.split(" ", 1)
                    if agent_name in ["CEO", "Tech Support", "Researcher"]:
                        return agent_name
                except ValueError:
                    pass
            return current_agent.value

        msg.submit(update_current_agent, [msg], [current_agent])
        send.click(update_current_agent, [msg], [current_agent])

        # Team Chat section
        with gr.Tab("Team Chat"):
            with gr.Column():
                gr.Markdown("""
                # 👥 Team Chat Room
                Watch the Scamrakshak team have spontaneous work discussions!
                """)
                team_chat_box = gr.Chatbot(
                    label="Team Discussion",
                    height=400
                )
                start_discussion = gr.Button("Start New Team Discussion", variant="primary")

                def trigger_team_discussion() -> List[List[str]]:
                    team_chat = TeamChat(agent_system.agents)
                    discussion = team_chat.start_team_discussion()
                    formatted_discussion = []
                    for msg_text in discussion:
                        agent = msg_text.split(":")[0]
                        formatted_discussion.append([
                            "",
                            f'<div class="{agent.lower()}-message">{msg_text}</div>'
                        ])
                    return formatted_discussion

                start_discussion.click(
                    trigger_team_discussion,
                    outputs=[team_chat_box]
                )

                gr.Markdown("""
                ### About Team Chat
                - Team members spontaneously discuss work-related topics
                - Discussions are focused on improving Scamrakshak's services
                - Watch how different team members contribute their expertise
                - Topics include security, features, market trends, and more
                """)

    return interface
if __name__ == "__main__":
    # Create and launch the interface
    demo = create_interface()
    demo.queue()  # Enable queuing for better handling of multiple requests
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
        show_api=False
    )