"""Gradio front-end for a two-agent CrewAI customer-support workflow.

A "Senior Support Representative" agent drafts an answer to the user's
inquiry using a website-scraping tool, and a "Support Quality Assurance
Specialist" agent reviews it. Progress messages are pushed onto a queue by
the crew's task callback and streamed to a ``gr.Chatbot``.
"""

# imports
import asyncio
import queue
import threading
from typing import AsyncGenerator, Dict, Generator, List

import gradio as gr
from crewai import Agent, Task, Crew, LLM
from crewai_tools import ScrapeWebsiteTool


# Message Queue System to manage flow of message
class SupportMessageQueue:
    """Thin wrapper around ``queue.Queue`` holding chat messages (dicts)."""

    def __init__(self):
        self.message_queue = queue.Queue()
        self.last_agent = None

    def add_message(self, message: Dict):
        """Log *message* to stdout and enqueue it."""
        print(f"Adding message to queue: {message}")
        self.message_queue.put(message)

    def get_messages(self) -> List[Dict]:
        """Drain the queue and return everything that was pending, in order.

        Uses ``get_nowait`` instead of ``empty()`` + ``get()`` so the call can
        never block if another consumer empties the queue between the check
        and the read.
        """
        messages = []
        while True:
            try:
                messages.append(self.message_queue.get_nowait())
            except queue.Empty:
                return messages


# main class
class SupportCrew:
    """Builds the agents/tasks for one inquiry and streams their progress."""

    def __init__(self, api_key: str | None = None):
        self.api_key = api_key
        self.message_queue = SupportMessageQueue()
        self.support_agent = None
        self.qa_agent = None
        # Role name of the agent whose task is currently running; read by the
        # task callback to decide how to label the output.
        self.current_agent = None
        self.scrape_tool = None

    # agent initialization with role, goal, and backstory
    def initialize_agents(self, website_url: str):
        """Create the LLM, the scrape tool and both agents.

        Args:
            website_url: URL handed to ``ScrapeWebsiteTool``.

        Raises:
            ValueError: If no API key was supplied to the constructor.
        """
        if not self.api_key:
            # The model below is served through Hugging Face, so the key that
            # is missing is a Hugging Face token (the message used to say
            # "OpenAI", which was misleading).
            raise ValueError("Hugging Face API key is required")

        self.llm = LLM(
            model="huggingface/meta-llama/Llama-3.3-70B-Instruct",
            api_key=self.api_key,
        )
        self.scrape_tool = ScrapeWebsiteTool(website_url=website_url)

        self.support_agent = Agent(
            role="Senior Support Representative",
            goal="Be the most friendly and helpful support representative in your team",
            backstory=(
                "You work at crewAI and are now working on providing support to customers. "
                "You need to make sure that you provide the best support! "
                "Make sure to provide full complete answers, and make no assumptions."
            ),
            llm=self.llm,
            allow_delegation=False,
            verbose=True,
        )

        self.qa_agent = Agent(
            role="Support Quality Assurance Specialist",
            goal="Get recognition for providing the best support quality assurance in your team",
            backstory=(
                "You work at crewAI and are now working with your team on customer requests "
                "ensuring that the support representative is providing the best support possible. "
                "You need to make sure that the support representative is providing full "
                "complete answers, and make no assumptions."
            ),
            llm=self.llm,
            verbose=True,
        )

    # task creation with description and expected output format and tools
    def create_tasks(self, inquiry: str) -> List[Task]:
        """Return the resolution task followed by the QA review task.

        ``initialize_agents`` must have been called first so that the agents
        and the scrape tool referenced here exist.
        """
        inquiry_resolution = Task(
            description=(
                f"A customer just reached out with a super important ask:\n{inquiry}\n\n"
                "Make sure to use everything you know to provide the best support possible. "
                "You must strive to provide a complete and accurate response to the customer's inquiry."
            ),
            expected_output=(
                "A detailed, informative response to the customer's inquiry that addresses "
                "all aspects of their question.\n"
                "The response should include references to everything you used to find the answer, "
                "including external data or solutions. Ensure the answer is complete, "
                "leaving no questions unanswered, and maintain a helpful and friendly tone throughout."
            ),
            tools=[self.scrape_tool],
            agent=self.support_agent,
        )

        quality_assurance_review = Task(
            description=(
                "Review the response drafted by the Senior Support Representative for the customer's inquiry. "
                "Ensure that the answer is comprehensive, accurate, and adheres to the "
                "high-quality standards expected for customer support.\n"
                "Verify that all parts of the customer's inquiry have been addressed "
                "thoroughly, with a helpful and friendly tone.\n"
                "Check for references and sources used to find the information, "
                "ensuring the response is well-supported and leaves no questions unanswered."
            ),
            expected_output=(
                "A final, detailed, and informative response ready to be sent to the customer.\n"
                "This response should fully address the customer's inquiry, incorporating all "
                "relevant feedback and improvements.\n"
                "Don't be too formal, maintain a professional and friendly tone throughout."
            ),
            agent=self.qa_agent,
        )

        return [inquiry_resolution, quality_assurance_review]

    # main processing function
    async def process_support(
        self, inquiry: str, website_url: str
    ) -> AsyncGenerator[List[Dict], None]:
        """Run the crew for *inquiry* and yield batches of chat messages.

        The crew runs in a worker thread; its task callback pushes messages
        onto ``self.message_queue`` and this coroutine polls that queue,
        yielding whatever has accumulated. Any exception is reported as an
        "❌ Error" chat message rather than propagated.

        Args:
            inquiry: The customer's question.
            website_url: Documentation URL passed to the scrape tool.

        Yields:
            Lists of message dicts with ``role``/``content`` and optional
            ``metadata`` keys.
        """

        def add_agent_messages(agent_name: str, tasks: str, emoji: str = "🤖"):
            # Announce the agent, then the task it is about to work on.
            self.message_queue.add_message(
                {
                    "role": "assistant",
                    "content": agent_name,
                    "metadata": {"title": f"{emoji} {agent_name}"},
                }
            )
            self.message_queue.add_message(
                {
                    "role": "assistant",
                    "content": tasks,
                    "metadata": {"title": f"📋 Task for {agent_name}"},
                }
            )

        # Manages transition between agents
        def setup_next_agent(current_agent: str) -> None:
            if current_agent == "Senior Support Representative":
                self.current_agent = "Support Quality Assurance Specialist"
                add_agent_messages(
                    "Support Quality Assurance Specialist",
                    "Review and improve the support representative's response",
                )

        def task_callback(task_output) -> None:
            print(f"Task callback received: {task_output}")

            raw_output = task_output.raw
            # Keep only what follows the "## Final Answer:" marker if present.
            if "## Final Answer:" in raw_output:
                content = raw_output.split("## Final Answer:")[1].strip()
            else:
                content = raw_output.strip()

            if self.current_agent == "Support Quality Assurance Specialist":
                self.message_queue.add_message(
                    {
                        "role": "assistant",
                        "content": "Final response is ready!",
                        "metadata": {"title": "✅ Final Response"},
                    }
                )

                # Insert blank lines before headings/list items so the
                # markdown renders as separate blocks, then collapse the
                # triple newlines that may introduce. Order matters.
                formatted_content = content
                for old, new in (
                    ("\n#", "\n\n#"),
                    ("\n-", "\n\n-"),
                    ("\n*", "\n\n*"),
                    ("\n1.", "\n\n1."),
                    ("\n\n\n", "\n\n"),
                ):
                    formatted_content = formatted_content.replace(old, new)

                self.message_queue.add_message(
                    {"role": "assistant", "content": formatted_content}
                )
            else:
                self.message_queue.add_message(
                    {
                        "role": "assistant",
                        "content": content,
                        "metadata": {"title": f"✨ Output from {self.current_agent}"},
                    }
                )
                setup_next_agent(self.current_agent)

        try:
            self.initialize_agents(website_url)
            self.current_agent = "Senior Support Representative"

            yield [
                {
                    "role": "assistant",
                    "content": "Starting to process your inquiry...",
                    "metadata": {"title": "🚀 Process Started"},
                }
            ]

            add_agent_messages(
                "Senior Support Representative",
                "Analyze customer inquiry and provide comprehensive support",
            )

            crew = Crew(
                agents=[self.support_agent, self.qa_agent],
                tasks=self.create_tasks(inquiry),
                verbose=True,
                task_callback=task_callback,
            )

            def run_crew():
                try:
                    crew.kickoff()
                except Exception as e:
                    # Surface worker-thread failures to the chat instead of
                    # losing them with the thread.
                    print(f"Error in crew execution: {str(e)}")
                    self.message_queue.add_message(
                        {
                            "role": "assistant",
                            "content": f"An error occurred: {str(e)}",
                            "metadata": {"title": "❌ Error"},
                        }
                    )

            thread = threading.Thread(target=run_crew)
            thread.start()

            # Keep polling until the worker has finished AND everything it
            # queued has been delivered.
            while thread.is_alive() or not self.message_queue.message_queue.empty():
                messages = self.message_queue.get_messages()
                if messages:
                    print(f"Yielding messages: {messages}")
                    yield messages
                await asyncio.sleep(0.1)

        except Exception as e:
            print(f"Error in process_support: {str(e)}")
            yield [
                {
                    "role": "assistant",
                    "content": f"An error occurred: {str(e)}",
                    "metadata": {"title": "❌ Error"},
                }
            ]


def create_demo():
    """Build and return the main ``gr.Blocks`` UI (chatbot, inputs, button)."""
    with gr.Blocks(theme=gr.themes.Ocean()) as demo:
        gr.Markdown("# 🎯 AI Customer Support Crew")
        gr.Markdown(
            "This is a friendly, high-performing multi-agent application built with Gradio and CrewAI. Enter a webpage URL and your questions from that webpage."
        )

        chatbot = gr.Chatbot(
            label="Support Process",
            height=700,
            type="messages",
            show_label=True,
            avatar_images=(
                None,
                "https://avatars.githubusercontent.com/u/170677839?v=4",
            ),
            render_markdown=True,
        )

        with gr.Row(equal_height=True):
            inquiry = gr.Textbox(
                label="Your Inquiry",
                placeholder="Enter your question...",
                scale=4,
            )
            website_url = gr.Textbox(
                label="Documentation URL",
                placeholder="Enter documentation URL to search...",
                scale=4,
            )
            btn = gr.Button("Get Support", variant="primary", scale=1)

        async def process_input(
            inquiry_text, website_url_text, history, oauth_token: gr.OAuthToken | None
        ):
            """Stream the crew's messages into the chat history.

            ``oauth_token`` is ``None`` when the user is not logged in, so it
            must be checked before its ``token`` attribute is read.
            """
            api_key = oauth_token.token if oauth_token is not None else None
            if not api_key:
                history = history or []
                history.append(
                    {
                        "role": "assistant",
                        "content": "Please provide huggingface key.",
                        "metadata": {"title": "❌ Error"},
                    }
                )
                yield history
                return

            # One SupportCrew per request: a shared instance would keep using
            # the first user's token and would mix concurrent requests'
            # messages in a single queue.
            support_crew = SupportCrew(api_key=api_key)

            history = history or []
            history.append(
                {
                    "role": "user",
                    "content": f"Question: {inquiry_text}\nDocumentation: {website_url_text}",
                }
            )
            yield history

            try:
                async for messages in support_crew.process_support(
                    inquiry_text, website_url_text
                ):
                    history.extend(messages)
                    yield history
            except Exception as e:
                history.append(
                    {
                        "role": "assistant",
                        "content": f"An error occurred: {str(e)}",
                        "metadata": {"title": "❌ Error"},
                    }
                )
                yield history

        btn.click(process_input, [inquiry, website_url, chatbot], [chatbot])
        inquiry.submit(process_input, [inquiry, website_url, chatbot], [chatbot])

    return demo


def swap_visibilty(profile: gr.OAuthProfile | None):
    """Return an update selecting the logged-in or logged-out CSS class."""
    return (
        gr.update(elem_classes=["main_ui_logged_in"])
        if profile
        else gr.update(elem_classes=["main_ui_logged_out"])
    )


# Dim and disable the main UI until the user has logged in.
css = """
.main_ui_logged_out{opacity: 0.3; pointer-events: none}
"""

interface = create_demo()

with gr.Blocks(css=css) as demo:
    gr.LoginButton()
    with gr.Column(elem_classes="main_ui_logged_out") as main_ui:
        interface.render()
    demo.load(fn=swap_visibilty, outputs=main_ui)

if __name__ == "__main__":
    demo.queue()
    demo.launch()