\n\n"
for section in document['Sections']:
markdown += "
\n\n"
section_number = section['SectionNumber']
section_title = section['Title']
markdown += f"
{section_number}. {section_title}
\n\n"
markdown += f"
\n\n{section['Content']}\n\n
\n\n"
for subsection in section.get('Subsections', []):
subsection_number = subsection['SectionNumber']
subsection_title = subsection['Title']
markdown += f"
{subsection_number} {subsection_title}
\n\n"
markdown += f"
\n\n{subsection['Content']}\n\n
\n\n"
markdown += "
"
return markdown
router = APIRouter()
class DocumentRequest(BaseModel):
query: str
class JsonDocumentResponse(BaseModel):
json_document: Dict
class MarkdownDocumentRequest(BaseModel):
json_document: Dict
query: str
async def generate_document_stream(document_generator: DocumentGenerator, document_outline: Dict, query: str):
document_generator.document_outline = document_outline
db_manager = DatabaseManager()
overall_objective = query
document_layout = json.dumps(document_generator.document_outline, indent=2)
document_generator.content_messages = [
{
"role": "system",
"content": DOCUMENT_SECTION_PROMPT_SYSTEM.format(
overall_objective=overall_objective,
document_layout=document_layout
)
}
]
for section in document_generator.document_outline["Document"].get("Sections", []):
section_title = section.get("Title", "")
section_number = section.get("SectionNumber", "")
content_instruction = section.get("Content", "")
logging.info(f"Generating content for section: {section_title}")
content = document_generator.generate_content(section_title, content_instruction, section_number)
section["Content"] = content
yield json.dumps({
"type": "document_section",
"content": {
"section_number": section_number,
"section_title": section_title,
"content": content
}
}, ensure_ascii=False) + "\n"
for subsection in section.get("Subsections", []):
subsection_title = subsection.get("Title", "")
subsection_number = subsection.get("SectionNumber", "")
subsection_content_instruction = subsection.get("Content", "")
logging.info(f"Generating content for subsection: {subsection_title}")
content = document_generator.generate_content(subsection_title, subsection_content_instruction, subsection_number)
subsection["Content"] = content
yield json.dumps({
"type": "document_section",
"content": {
"section_number": subsection_number,
"section_title": subsection_title,
"content": content
}
}, ensure_ascii=False) + "\n"
markdown_document = MarkdownConverter.convert_to_markdown(document_generator.document_outline["Document"])
yield json.dumps({
"type": "complete_document",
"content": markdown_document,
}, ensure_ascii=False) + "\n"
yield json.dumps({
"type": "complete_document",
"json": document_generator.document_outline
}, ensure_ascii=False) + "\n"
db_manager.update_database("elevatics", query, markdown_document)
@cache(expire=600*24*7)
@router.post("/generate-document/json", response_model=JsonDocumentResponse)
async def generate_document_outline_endpoint(request: DocumentRequest):
ai_client = AIClient()
document_generator = DocumentGenerator(ai_client)
try:
json_document = document_generator.generate_document_outline(request.query)
if json_document is None:
raise HTTPException(status_code=500, detail="Failed to generate a valid document outline")
return JsonDocumentResponse(json_document=json_document)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/generate-document/markdown-stream")
async def generate_markdown_document_stream_endpoint(request: MarkdownDocumentRequest):
ai_client = AIClient()
document_generator = DocumentGenerator(ai_client)
async def stream_generator():
try:
async for chunk in generate_document_stream(document_generator, request.json_document, request.query):
yield chunk
except Exception as e:
yield json.dumps({
"type": "error",
"content": str(e)
}) + "\n"
return StreamingResponse(stream_generator(), media_type="text/plain")
###########################################
class MarkdownDocumentResponse(BaseModel):
markdown_document: str
@router.post("/generate-document-test", response_model=MarkdownDocumentResponse)
async def test_generate_document_endpoint(request: DocumentRequest):
try:
# Load JSON document from file
json_path = os.path.join("output/document_generator", "ai-chatbot-prd.json")
with open(json_path, "r") as json_file:
json_document = json.load(json_file)
# Load Markdown document from file
md_path = os.path.join("output/document_generator", "ai-chatbot-prd.md")
with open(md_path, "r") as md_file:
markdown_document = md_file.read()
return MarkdownDocumentResponse(markdown_document=markdown_document)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Test files not found")
except json.JSONDecodeError:
raise HTTPException(status_code=500, detail="Error parsing JSON file")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
class CacheTestResponse(BaseModel):
result: str
execution_time: float
@router.get("/test-cache/{test_id}", response_model=CacheTestResponse)
@cache(expire=60) # Cache for 1 minute
async def test_cache(test_id: int):
start_time = time.time()
# Simulate some time-consuming operation
await asyncio.sleep(2)
result = f"Test result for ID: {test_id}"
end_time = time.time()
execution_time = end_time - start_time
return CacheTestResponse(
result=result,
execution_time=execution_time
)