# File: prompts.py DOCUMENT_OUTLINE_PROMPT_SYSTEM = """You are a document generator. Provide the outline of the document requested in in JSON format. Include sections and subsections if required. Use the "Content" field to provide a specific prompt or instruction for generating content for that particular section or subsection. OUTPUT IN FOLLOWING JSON FORMAT enclosed in tags { "Document": { "Title": "Document Title", "Author": "Author Name", "Date": "YYYY-MM-DD", "Version": "1.0", "Sections": [ { "SectionNumber": "1", "Title": "Section Title", "Content": "Specific prompt or instruction for generating content for this section", "Subsections": [ { "SectionNumber": "1.1", "Title": "Subsection Title", "Content": "Specific prompt or instruction for generating content for this subsection" } ] } ] } } """ DOCUMENT_OUTLINE_PROMPT_USER = """{query}""" DOCUMENT_SECTION_PROMPT_SYSTEM = """You are a document generator, You need to output only the content requested in the section in the prompt. FORMAT YOUR OUTPUT AS MARKDOWN ENCLOSED IN tags {overall_objective} {document_layout}""" DOCUMENT_SECTION_PROMPT_USER = """Output the content for the section "{section_or_subsection_title}" formatted as markdown. Follow this instruction: {content_instruction}""" # File: app.py import os import json import re import time import asyncio from typing import List, Dict, Optional, Any, Callable from openai import OpenAI import logging import functools from fastapi import APIRouter, HTTPException from pydantic import BaseModel from fastapi_cache.decorator import cache #from prompts import * logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def log_execution(func: Callable) -> Callable: @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: logger.info(f"Executing {func.__name__}") try: result = func(*args, **kwargs) logger.info(f"{func.__name__} completed successfully") return result except Exception as e: logger.error(f"Error in {func.__name__}: {e}") raise return wrapper class AIClient: def __init__(self): self.client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key="sk-or-v1-"+os.environ['OPENROUTER_API_KEY'] ) @log_execution def generate_response( self, messages: List[Dict[str, str]], model: str = "openai/gpt-4o-mini", max_tokens: int = 32000 ) -> Optional[str]: if not messages: return None response = self.client.chat.completions.create( model=model, messages=messages, max_tokens=max_tokens, stream=False ) return response.choices[0].message.content class DocumentGenerator: def __init__(self, ai_client: AIClient): self.ai_client = ai_client self.document_outline = None self.content_messages = [] @staticmethod def extract_between_tags(text: str, tag: str) -> str: pattern = f"<{tag}>(.*?)" match = re.search(pattern, text, re.DOTALL) return match.group(1).strip() if match else "" @staticmethod def remove_duplicate_title(content: str, title: str, section_number: str) -> str: patterns = [ rf"^#+\s*{re.escape(section_number)}(?:\s+|\s*:\s*|\.\s*){re.escape(title)}", rf"^#+\s*{re.escape(title)}", rf"^{re.escape(section_number)}(?:\s+|\s*:\s*|\.\s*){re.escape(title)}", rf"^{re.escape(title)}", ] for pattern in patterns: content = re.sub(pattern, "", content, flags=re.MULTILINE | re.IGNORECASE) return content.lstrip() @log_execution def generate_document_outline(self, query: str, max_retries: int = 3) -> Optional[Dict]: messages = [ {"role": "system", "content": DOCUMENT_OUTLINE_PROMPT_SYSTEM}, {"role": "user", "content": DOCUMENT_OUTLINE_PROMPT_USER.format(query=query)} ] for attempt in range(max_retries): outline_response = self.ai_client.generate_response(messages, model="openai/gpt-4o") outline_json_text = self.extract_between_tags(outline_response, "output") try: self.document_outline = json.loads(outline_json_text) return self.document_outline except json.JSONDecodeError as e: if attempt < max_retries - 1: logger.warning(f"Failed to parse JSON (attempt {attempt + 1}): {e}") logger.info("Retrying...") else: logger.error(f"Failed to parse JSON after {max_retries} attempts: {e}") return None @log_execution def generate_content(self, title: str, content_instruction: str, section_number: str) -> str: self.content_messages.append({ "role": "user", "content": DOCUMENT_SECTION_PROMPT_USER.format( section_or_subsection_title=title, content_instruction=content_instruction ) }) section_response = self.ai_client.generate_response(self.content_messages) content = self.extract_between_tags(section_response, "response") content = self.remove_duplicate_title(content, title, section_number) self.content_messages.append({ "role": "assistant", "content": section_response }) return content @log_execution def generate_document(self, query: str) -> Dict: self.generate_document_outline(query) if self.document_outline is None: raise ValueError("Failed to generate a valid document outline") overall_objective = query document_layout = json.dumps(self.document_outline, indent=2) self.content_messages = [ { "role": "system", "content": DOCUMENT_SECTION_PROMPT_SYSTEM.format( overall_objective=overall_objective, document_layout=document_layout ) } ] for section in self.document_outline["Document"].get("Sections", []): section_title = section.get("Title", "") section_number = section.get("SectionNumber", "") content_instruction = section.get("Content", "") logger.info(f"Generating content for section: {section_title}") section["Content"] = self.generate_content(section_title, content_instruction, section_number) for subsection in section.get("Subsections", []): subsection_title = subsection.get("Title", "") subsection_number = subsection.get("SectionNumber", "") subsection_content_instruction = subsection.get("Content", "") logger.info(f"Generating content for subsection: {subsection_title}") subsection["Content"] = self.generate_content(subsection_title, subsection_content_instruction, subsection_number) return self.document_outline class MarkdownConverter: @staticmethod def slugify(text: str) -> str: return re.sub(r'\W+', '-', text.lower()) @classmethod def generate_toc(cls, sections: List[Dict]) -> str: toc = "
\n\n" toc += "

Table of Contents

\n\n" toc += "\n\n" return toc @classmethod def convert_to_markdown(cls, document: Dict) -> str: # First page with centered content markdown = "
\n\n" markdown += f"

{document['Title']}

\n\n" markdown += f"

By {document['Author']}

\n\n" markdown += f"

Version {document['Version']} | {document['Date']}

\n\n" markdown += "
\n\n" # Table of Contents on the second page markdown += cls.generate_toc(document['Sections']) # Main content markdown += "
\n\n" for section in document['Sections']: markdown += "
\n\n" section_number = section['SectionNumber'] section_title = section['Title'] markdown += f"

{section_number}. {section_title}

\n\n" markdown += f"
\n\n{section['Content']}\n\n
\n\n" for subsection in section.get('Subsections', []): subsection_number = subsection['SectionNumber'] subsection_title = subsection['Title'] markdown += f"

{subsection_number} {subsection_title}

\n\n" markdown += f"
\n\n{subsection['Content']}\n\n
\n\n" markdown += "
" return markdown router = APIRouter() class DocumentRequest(BaseModel): query: str class DocumentResponse(BaseModel): json_document: Dict markdown_document: str @cache(expire=600*24*7) @router.post("/generate-document", response_model=DocumentResponse) async def generate_document_endpoint(request: DocumentRequest): ai_client = AIClient() document_generator = DocumentGenerator(ai_client) try: # Generate the document json_document = document_generator.generate_document(request.query) # Convert to Markdown markdown_document = MarkdownConverter.convert_to_markdown(json_document["Document"]) return DocumentResponse( json_document=json_document, markdown_document=markdown_document ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) class CacheTestResponse(BaseModel): result: str execution_time: float @router.get("/test-cache/{test_id}", response_model=CacheTestResponse) @cache(expire=60) # Cache for 1 minute async def test_cache(test_id: int): start_time = time.time() # Simulate some time-consuming operation await asyncio.sleep(2) result = f"Test result for ID: {test_id}" end_time = time.time() execution_time = end_time - start_time return CacheTestResponse( result=result, execution_time=execution_time )