|
from crewai import Agent, Task, Crew |
|
import gradio as gr |
|
import asyncio |
|
from typing import List, Generator, Any, Dict, Union |
|
from langchain_openai import ChatOpenAI |
|
import queue |
|
import threading |
|
import os |
|
|
|
class AgentMessageQueue: |
|
def __init__(self): |
|
self.message_queue = queue.Queue() |
|
self.final_output = None |
|
|
|
def add_message(self, message: Dict): |
|
self.message_queue.put(message) |
|
|
|
def get_messages(self) -> List[Dict]: |
|
messages = [] |
|
while not self.message_queue.empty(): |
|
messages.append(self.message_queue.get()) |
|
return messages |
|
|
|
def set_final_output(self, output: str): |
|
self.final_output = output |
|
|
|
def get_final_output(self) -> str: |
|
return self.final_output |
|
|
|
class ArticleCrew: |
|
def __init__(self, api_key: str = None): |
|
self.api_key = api_key |
|
self.message_queue = AgentMessageQueue() |
|
self.planner = None |
|
self.writer = None |
|
self.editor = None |
|
|
|
def initialize_agents(self, topic: str): |
|
if not self.api_key: |
|
raise ValueError("OpenAI API key is required") |
|
|
|
os.environ["OPENAI_API_KEY"] = self.api_key |
|
|
|
llm = ChatOpenAI( |
|
temperature=0.7, |
|
model="gpt-4" |
|
) |
|
|
|
self.planner = Agent( |
|
role="Content Planner", |
|
goal=f"Plan engaging and factually accurate content on {topic}", |
|
backstory=f"You're working on planning a blog article about the topic: {topic}. " |
|
"You collect information that helps the audience learn something " |
|
"and make informed decisions.", |
|
allow_delegation=False, |
|
verbose=True, |
|
llm=llm |
|
) |
|
|
|
self.writer = Agent( |
|
role="Content Writer", |
|
goal=f"Write insightful and factually accurate opinion piece about the topic: {topic}", |
|
backstory=f"You're working on writing a new opinion piece about the topic: {topic}. " |
|
"You base your writing on the work of the Content Planner.", |
|
allow_delegation=False, |
|
verbose=True, |
|
llm=llm |
|
) |
|
|
|
self.editor = Agent( |
|
role="Editor", |
|
goal="Edit a given blog post to align with the writing style", |
|
backstory="You are an editor who receives a blog post from the Content Writer.", |
|
allow_delegation=False, |
|
verbose=True, |
|
llm=llm |
|
) |
|
|
|
def create_tasks(self, topic: str): |
|
if not self.planner or not self.writer or not self.editor: |
|
self.initialize_agents(topic) |
|
|
|
plan_task = Task( |
|
description=( |
|
f"1. Prioritize the latest trends, key players, and noteworthy news on {topic}.\n" |
|
f"2. Identify the target audience, considering their interests and pain points.\n" |
|
f"3. Develop a detailed content outline including introduction, key points, and call to action.\n" |
|
f"4. Include SEO keywords and relevant data or sources." |
|
), |
|
expected_output="A comprehensive content plan document with an outline, audience analysis, SEO keywords, and resources.", |
|
agent=self.planner |
|
) |
|
|
|
write_task = Task( |
|
description=( |
|
"1. Use the content plan to craft a compelling blog post.\n" |
|
"2. Incorporate SEO keywords naturally.\n" |
|
"3. Sections/Subtitles are properly named in an engaging manner.\n" |
|
"4. Ensure proper structure with introduction, body, and conclusion.\n" |
|
"5. Proofread for grammatical errors." |
|
), |
|
expected_output="A well-written blog post in markdown format, ready for publication.", |
|
agent=self.writer |
|
) |
|
|
|
edit_task = Task( |
|
description="Proofread the given blog post for grammatical errors and alignment with the brand's voice.", |
|
expected_output="A well-written blog post in markdown format, ready for publication.", |
|
agent=self.editor |
|
) |
|
|
|
return [plan_task, write_task, edit_task] |
|
|
|
async def process_article(self, topic: str) -> Generator[List[Dict], None, None]: |
|
def step_callback(output: Any) -> None: |
|
try: |
|
output_str = str(output).strip() |
|
|
|
|
|
if "# Agent:" in output_str: |
|
agent_name = output_str.split("# Agent:")[1].split("\n")[0].strip() |
|
else: |
|
agent_name = "Agent" |
|
|
|
|
|
if "## Task:" in output_str: |
|
content = output_str.split("## Task:")[1].split("\n#")[0].strip() |
|
self.message_queue.add_message({ |
|
"role": "assistant", |
|
"content": content, |
|
"metadata": {"title": f"π {agent_name}'s Task"} |
|
}) |
|
elif "## Final Answer:" in output_str: |
|
content = output_str.split("## Final Answer:")[1].strip() |
|
if agent_name == "Editor": |
|
|
|
self.message_queue.set_final_output(content) |
|
self.message_queue.add_message({ |
|
"role": "assistant", |
|
"content": content, |
|
"metadata": {"title": f"β
{agent_name}'s Output"} |
|
}) |
|
else: |
|
self.message_queue.add_message({ |
|
"role": "assistant", |
|
"content": output_str, |
|
"metadata": {"title": f"π {agent_name} thinking"} |
|
}) |
|
|
|
except Exception as e: |
|
print(f"Error in step_callback: {str(e)}") |
|
|
|
def task_callback(output: Any) -> None: |
|
try: |
|
content = str(output) |
|
if hasattr(output, 'agent'): |
|
agent_name = str(output.agent) |
|
else: |
|
agent_name = "Agent" |
|
|
|
self.message_queue.add_message({ |
|
"role": "assistant", |
|
"content": content.strip(), |
|
"metadata": {"title": f"β
Task completed by {agent_name}"} |
|
}) |
|
|
|
|
|
if agent_name == "Editor": |
|
final_content = self.message_queue.get_final_output() |
|
if final_content: |
|
self.message_queue.add_message({ |
|
"role": "assistant", |
|
"content": "Here's your completed article:", |
|
"metadata": {"title": "π Final Article"} |
|
}) |
|
self.message_queue.add_message({ |
|
"role": "assistant", |
|
"content": final_content |
|
}) |
|
self.message_queue.add_message({ |
|
"role": "assistant", |
|
"content": "Article generation completed!", |
|
"metadata": {"title": "β¨ Complete"} |
|
}) |
|
|
|
except Exception as e: |
|
print(f"Error in task_callback: {str(e)}") |
|
|
|
self.initialize_agents(topic) |
|
|
|
crew = Crew( |
|
agents=[self.planner, self.writer, self.editor], |
|
tasks=self.create_tasks(topic), |
|
verbose=True, |
|
step_callback=step_callback, |
|
task_callback=task_callback |
|
) |
|
|
|
|
|
yield [{ |
|
"role": "assistant", |
|
"content": "Starting work on your article...", |
|
"metadata": {"title": "π Process Started"} |
|
}] |
|
|
|
|
|
result_container = [] |
|
def run_crew(): |
|
try: |
|
result = crew.kickoff(inputs={"topic": topic}) |
|
result_container.append(result) |
|
except Exception as e: |
|
result_container.append(e) |
|
print(f"Error occurred: {str(e)}") |
|
|
|
thread = threading.Thread(target=run_crew) |
|
thread.start() |
|
|
|
|
|
while thread.is_alive() or not self.message_queue.message_queue.empty(): |
|
messages = self.message_queue.get_messages() |
|
if messages: |
|
yield messages |
|
await asyncio.sleep(0.1) |
|
|
|
def create_demo(): |
|
article_crew = None |
|
|
|
with gr.Blocks(theme=gr.themes.Soft()) as demo: |
|
gr.Markdown("# π AI Article Writing Crew") |
|
gr.Markdown("Watch as this AI Crew collaborates to create your article! This application utilizes [CrewAI](https://www.crewai.com/) agents: Content Planner, Content Writer, and Content Editor, to write an article on any topic you choose. To get started, enter your OpenAI API Key below and press Enter!") |
|
|
|
openai_api_key = gr.Textbox( |
|
label='OpenAI API Key', |
|
type='password', |
|
placeholder='Type your OpenAI API key and press Enter!', |
|
interactive=True |
|
) |
|
|
|
chatbot = gr.Chatbot( |
|
label="Writing Process", |
|
avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"), |
|
height=700, |
|
type="messages", |
|
show_label=True, |
|
visible=False, |
|
value=[] |
|
) |
|
|
|
with gr.Row(equal_height=True): |
|
topic = gr.Textbox( |
|
label="Article Topic", |
|
placeholder="Enter the topic you want an article about...", |
|
scale=4, |
|
visible=False |
|
) |
|
|
|
async def process_input(topic, history, api_key): |
|
nonlocal article_crew |
|
if not api_key: |
|
history.append({ |
|
"role": "assistant", |
|
"content": "Please provide an OpenAI API key first.", |
|
"metadata": {"title": "β Error"} |
|
}) |
|
yield history |
|
return |
|
|
|
|
|
if article_crew is None: |
|
article_crew = ArticleCrew(api_key=api_key) |
|
else: |
|
article_crew.api_key = api_key |
|
|
|
|
|
history.append({ |
|
"role": "user", |
|
"content": f"Write an article about: {topic}" |
|
}) |
|
yield history |
|
|
|
try: |
|
async for messages in article_crew.process_article(topic): |
|
history.extend(messages) |
|
yield history |
|
except Exception as e: |
|
history.append({ |
|
"role": "assistant", |
|
"content": f"An error occurred: {str(e)}", |
|
"metadata": {"title": "β Error"} |
|
}) |
|
yield history |
|
|
|
btn = gr.Button("Write Article", variant="primary", scale=1, visible=False) |
|
|
|
def show_interface(): |
|
return { |
|
openai_api_key: gr.Textbox(visible=False), |
|
chatbot: gr.Chatbot(visible=True), |
|
topic: gr.Textbox(visible=True), |
|
btn: gr.Button(visible=True) |
|
} |
|
|
|
openai_api_key.submit( |
|
show_interface, |
|
None, |
|
[openai_api_key, chatbot, topic, btn] |
|
) |
|
|
|
btn.click( |
|
process_input, |
|
inputs=[topic, chatbot, openai_api_key], |
|
outputs=[chatbot] |
|
) |
|
|
|
return demo |
|
|
|
if __name__ == "__main__": |
|
demo = create_demo() |
|
demo.queue() |
|
demo.launch(debug=True) |