# File: prompts.py DOCUMENT_OUTLINE_PROMPT_SYSTEM = """You are a document generator. Provide the outline of the document requested in in JSON format. Include sections and subsections if required. Use the "Content" field to provide a specific prompt or instruction for generating content for that particular section or subsection. make sure the Sections follow a logical flow and each prompt's content does not overlap with other sections. OUTPUT IN FOLLOWING JSON FORMAT enclosed in tags { "Document": { "Title": "Document Title", "Author": "Author Name", "Date": "YYYY-MM-DD", "Version": "1.0", "Sections": [ { "SectionNumber": "1", "Title": "Section Title", "Content": "Specific prompt or instruction for generating content for this section", "Subsections": [ { "SectionNumber": "1.1", "Title": "Subsection Title", "Content": "Specific prompt or instruction for generating content for this subsection" } ] } ] } } """ DOCUMENT_OUTLINE_PROMPT_USER = """{query}""" DOCUMENT_SECTION_PROMPT_SYSTEM = """You are a document generator, You need to output only the content requested in the section in the prompt. FORMAT YOUR OUTPUT AS MARKDOWN ENCLOSED IN tags {overall_objective} {document_layout}""" DOCUMENT_SECTION_PROMPT_USER = """Output the content for the section "{section_or_subsection_title}" formatted as markdown. Follow this instruction: {content_instruction}""" # File: app.py import os import json import re import asyncio import time from typing import List, Dict, Optional, Any, Callable from openai import OpenAI import logging import functools from fastapi import APIRouter, HTTPException, Request from fastapi.responses import StreamingResponse from pydantic import BaseModel from fastapi_cache.decorator import cache import psycopg2 from datetime import datetime logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def log_execution(func: Callable) -> Callable: @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: logger.info(f"Executing {func.__name__}") try: result = func(*args, **kwargs) logger.info(f"{func.__name__} completed successfully") return result except Exception as e: logger.error(f"Error in {func.__name__}: {e}") raise return wrapper class DatabaseManager: """Manages database operations.""" def __init__(self): self.db_params = { "dbname": "postgres", "user": os.environ['SUPABASE_USER'], "password": os.environ['SUPABASE_PASSWORD'], "host": "aws-0-us-west-1.pooler.supabase.com", "port": "5432" } @log_execution def update_database(self, user_id: str, user_query: str, response: str) -> None: with psycopg2.connect(**self.db_params) as conn: with conn.cursor() as cur: insert_query = """ INSERT INTO ai_document_generator (user_id, user_query, response) VALUES (%s, %s, %s); """ cur.execute(insert_query, (user_id, user_query, response)) class AIClient: def __init__(self): self.client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key="sk-or-v1-"+os.environ['OPENROUTER_API_KEY'] ) @log_execution def generate_response( self, messages: List[Dict[str, str]], model: str = "openai/gpt-4o-mini", max_tokens: int = 32000 ) -> Optional[str]: if not messages: return None response = self.client.chat.completions.create( model=model, messages=messages, max_tokens=max_tokens, stream=False ) return response.choices[0].message.content class DocumentGenerator: def __init__(self, ai_client: AIClient): self.ai_client = ai_client self.document_outline = None self.content_messages = [] @staticmethod def extract_between_tags(text: str, tag: str) -> str: pattern = f"<{tag}>(.*?)" match = re.search(pattern, text, re.DOTALL) return match.group(1).strip() if match else "" @staticmethod def remove_duplicate_title(content: str, title: str, section_number: str) -> str: patterns = [ rf"^#+\s*{re.escape(section_number)}(?:\s+|\s*:\s*|\.\s*){re.escape(title)}", rf"^#+\s*{re.escape(title)}", rf"^{re.escape(section_number)}(?:\s+|\s*:\s*|\.\s*){re.escape(title)}", rf"^{re.escape(title)}", ] for pattern in patterns: content = re.sub(pattern, "", content, flags=re.MULTILINE | re.IGNORECASE) return content.lstrip() @log_execution def generate_document_outline(self, query: str, max_retries: int = 3) -> Optional[Dict]: messages = [ {"role": "system", "content": DOCUMENT_OUTLINE_PROMPT_SYSTEM}, {"role": "user", "content": DOCUMENT_OUTLINE_PROMPT_USER.format(query=query)} ] for attempt in range(max_retries): outline_response = self.ai_client.generate_response(messages, model="openai/gpt-4o") outline_json_text = self.extract_between_tags(outline_response, "output") try: self.document_outline = json.loads(outline_json_text) return self.document_outline except json.JSONDecodeError as e: if attempt < max_retries - 1: logger.warning(f"Failed to parse JSON (attempt {attempt + 1}): {e}") logger.info("Retrying...") else: logger.error(f"Failed to parse JSON after {max_retries} attempts: {e}") return None @log_execution def generate_content(self, title: str, content_instruction: str, section_number: str) -> str: self.content_messages.append({ "role": "user", "content": DOCUMENT_SECTION_PROMPT_USER.format( section_or_subsection_title=title, content_instruction=content_instruction ) }) section_response = self.ai_client.generate_response(self.content_messages) content = self.extract_between_tags(section_response, "response") content = self.remove_duplicate_title(content, title, section_number) self.content_messages.append({ "role": "assistant", "content": section_response }) return content class MarkdownConverter: @staticmethod def slugify(text: str) -> str: return re.sub(r'\W+', '-', text.lower()) @classmethod def generate_toc(cls, sections: List[Dict]) -> str: toc = "
\n\n" toc += "

Table of Contents

\n\n" toc += "\n\n" return toc @classmethod def convert_to_markdown(cls, document: Dict) -> str: markdown = "
\n\n" markdown += f"

{document['Title']}

\n\n" markdown += f"

By {document['Author']}

\n\n" markdown += f"

Version {document['Version']} | {document['Date']}

\n\n" markdown += "
\n\n" markdown += cls.generate_toc(document['Sections']) markdown += "
\n\n" for section in document['Sections']: markdown += "
\n\n" section_number = section['SectionNumber'] section_title = section['Title'] markdown += f"

{section_number}. {section_title}

\n\n" markdown += f"
\n\n{section['Content']}\n\n
\n\n" for subsection in section.get('Subsections', []): subsection_number = subsection['SectionNumber'] subsection_title = subsection['Title'] markdown += f"

{subsection_number} {subsection_title}

\n\n" markdown += f"
\n\n{subsection['Content']}\n\n
\n\n" markdown += "
" return markdown router = APIRouter() class DocumentRequest(BaseModel): query: str class JsonDocumentResponse(BaseModel): json_document: Dict class MarkdownDocumentRequest(BaseModel): json_document: Dict query: str async def generate_document_stream(document_generator: DocumentGenerator, document_outline: Dict, query: str): document_generator.document_outline = document_outline db_manager = DatabaseManager() overall_objective = query document_layout = json.dumps(document_generator.document_outline, indent=2) document_generator.content_messages = [ { "role": "system", "content": DOCUMENT_SECTION_PROMPT_SYSTEM.format( overall_objective=overall_objective, document_layout=document_layout ) } ] for section in document_generator.document_outline["Document"].get("Sections", []): section_title = section.get("Title", "") section_number = section.get("SectionNumber", "") content_instruction = section.get("Content", "") logging.info(f"Generating content for section: {section_title}") content = document_generator.generate_content(section_title, content_instruction, section_number) section["Content"] = content yield json.dumps({ "type": "document_section", "content": { "section_number": section_number, "section_title": section_title, "content": content } }, ensure_ascii=False) + "\n" for subsection in section.get("Subsections", []): subsection_title = subsection.get("Title", "") subsection_number = subsection.get("SectionNumber", "") subsection_content_instruction = subsection.get("Content", "") logging.info(f"Generating content for subsection: {subsection_title}") content = document_generator.generate_content(subsection_title, subsection_content_instruction, subsection_number) subsection["Content"] = content yield json.dumps({ "type": "document_section", "content": { "section_number": subsection_number, "section_title": subsection_title, "content": content } }, ensure_ascii=False) + "\n" markdown_document = MarkdownConverter.convert_to_markdown(document_generator.document_outline["Document"]) yield json.dumps({ "type": "complete_document", "content": markdown_document, }, ensure_ascii=False) + "\n" yield json.dumps({ "type": "complete_document", "json": document_generator.document_outline }, ensure_ascii=False) + "\n" db_manager.update_database("elevatics", query, markdown_document) @cache(expire=600*24*7) @router.post("/generate-document/json", response_model=JsonDocumentResponse) async def generate_document_outline_endpoint(request: DocumentRequest): ai_client = AIClient() document_generator = DocumentGenerator(ai_client) try: json_document = document_generator.generate_document_outline(request.query) if json_document is None: raise HTTPException(status_code=500, detail="Failed to generate a valid document outline") return JsonDocumentResponse(json_document=json_document) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.post("/generate-document/markdown-stream") async def generate_markdown_document_stream_endpoint(request: MarkdownDocumentRequest): ai_client = AIClient() document_generator = DocumentGenerator(ai_client) async def stream_generator(): try: async for chunk in generate_document_stream(document_generator, request.json_document, request.query): yield chunk except Exception as e: yield json.dumps({ "type": "error", "content": str(e) }) + "\n" return StreamingResponse(stream_generator(), media_type="text/plain") ########################################### class MarkdownDocumentResponse(BaseModel): markdown_document: str @router.post("/generate-document-test", response_model=MarkdownDocumentResponse) async def test_generate_document_endpoint(request: DocumentRequest): try: # Load JSON document from file json_path = os.path.join("output/document_generator", "ai-chatbot-prd.json") with open(json_path, "r") as json_file: json_document = json.load(json_file) # Load Markdown document from file md_path = os.path.join("output/document_generator", "ai-chatbot-prd.md") with open(md_path, "r") as md_file: markdown_document = md_file.read() return MarkdownDocumentResponse(markdown_document=markdown_document) except FileNotFoundError: raise HTTPException(status_code=404, detail="Test files not found") except json.JSONDecodeError: raise HTTPException(status_code=500, detail="Error parsing JSON file") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) class CacheTestResponse(BaseModel): result: str execution_time: float @router.get("/test-cache/{test_id}", response_model=CacheTestResponse) @cache(expire=60) # Cache for 1 minute async def test_cache(test_id: int): start_time = time.time() # Simulate some time-consuming operation await asyncio.sleep(2) result = f"Test result for ID: {test_id}" end_time = time.time() execution_time = end_time - start_time return CacheTestResponse( result=result, execution_time=execution_time )