\n\n"
for section in document['Sections']:
markdown += "
\n\n"
section_number = section['SectionNumber']
section_title = section['Title']
markdown += f"
{section_number}. {section_title}
\n\n"
markdown += f"
\n\n{section['Content']}\n\n
\n\n"
for subsection in section.get('Subsections', []):
subsection_number = subsection['SectionNumber']
subsection_title = subsection['Title']
markdown += f"
{subsection_number} {subsection_title}
\n\n"
markdown += f"
\n\n{subsection['Content']}\n\n
\n\n"
markdown += "
"
return markdown
router = APIRouter()
class JsonDocumentResponse(BaseModel):
json_document: Dict
class JsonDocumentRequest(BaseModel):
query: str
template: bool = False
images: Optional[List[UploadFile]] = None
documents: Optional[List[UploadFile]] = None
conversation_id: str = ""
class Config:
arbitrary_types_allowed = True
class MarkdownDocumentRequest(BaseModel):
json_document: Dict
query: str
conversation_id: str = ""
MESSAGE_DELIMITER = b"\n---DELIMITER---\n"
def yield_message(message):
message_json = json.dumps(message, ensure_ascii=False).encode('utf-8')
return message_json + MESSAGE_DELIMITER
async def generate_document_stream(document_generator: DocumentGenerator, document_outline: Dict, query: str, template: bool = False):
document_generator.document_outline = document_outline
db_manager = DatabaseManager()
overall_objective = query
document_layout = json.dumps(document_generator.document_outline, indent=2)
SECTION_PROMPT_SYSTEM = DOCUMENT_SECTION_PROMPT_SYSTEM if not template else DOCUMENT_TEMPLATE_SECTION_PROMPT_SYSTEM
document_generator.content_messages = [
{
"role": "system",
"content": SECTION_PROMPT_SYSTEM.format(
overall_objective=overall_objective,
document_layout=document_layout
)
}
]
for section in document_generator.document_outline["Document"].get("Sections", []):
section_title = section.get("Title", "")
section_number = section.get("SectionNumber", "")
content_instruction = section.get("Content", "")
logging.info(f"Generating content for section: {section_title}")
content = document_generator.generate_content(section_title, content_instruction, section_number, template)
section["Content"] = content
yield yield_message({
"type": "document_section",
"content": {
"section_number": section_number,
"section_title": section_title,
"content": content
}
})
for subsection in section.get("Subsections", []):
subsection_title = subsection.get("Title", "")
subsection_number = subsection.get("SectionNumber", "")
subsection_content_instruction = subsection.get("Content", "")
logging.info(f"Generating content for subsection: {subsection_title}")
content = document_generator.generate_content(subsection_title, subsection_content_instruction, subsection_number, template)
subsection["Content"] = content
yield yield_message({
"type": "document_section",
"content": {
"section_number": subsection_number,
"section_title": subsection_title,
"content": content
}
})
markdown_document = MarkdownConverter.convert_to_markdown(document_generator.document_outline["Document"])
yield yield_message({
"type": "complete_document",
"content": {
"markdown": markdown_document,
"json": document_generator.document_outline
},
});
db_manager.update_database("elevatics", query, markdown_document)
@router.post("/generate-document/markdown-stream")
async def generate_markdown_document_stream_endpoint(request: MarkdownDocumentRequest):
ai_client = AIClient()
document_generator = DocumentGenerator(ai_client)
async def stream_generator():
try:
async for chunk in generate_document_stream(document_generator, request.json_document, request.query, request.template):
yield chunk
except Exception as e:
yield yield_message({
"type": "error",
"content": str(e)
})
return StreamingResponse(stream_generator(), media_type="application/octet-stream")
@cache(expire=600*24*7)
@router.post("/generate-document/json", response_model=JsonDocumentResponse)
async def generate_document_outline_endpoint(
query: str,
template: bool = False,
images: Optional[List[UploadFile]] = File(None),
documents: Optional[List[UploadFile]] = File(None),
conversation_id: str = ""
):
request = JsonDocumentRequest(
query=query,
template=template,
images=images,
documents=documents,
conversation_id=conversation_id
)
ai_client = AIClient()
document_generator = DocumentGenerator(ai_client)
vision_tools = VisionTools(ai_client)
try:
image_context = ""
if request.images:
image_context = await vision_tools.extract_images_info(request.images)
json_document = document_generator.generate_document_outline(
request.query,
request.template,
image_context=image_context
)
if json_document is None:
raise HTTPException(status_code=500, detail="Failed to generate a valid document outline")
return JsonDocumentResponse(json_document=json_document)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
## OBSERVABILITY
from uuid import uuid4
import csv
from io import StringIO
class ObservationResponse(BaseModel):
observations: List[Dict]
def create_csv_response(observations: List[Dict]) -> StreamingResponse:
def iter_csv(data):
output = StringIO()
writer = csv.DictWriter(output, fieldnames=data[0].keys() if data else [])
writer.writeheader()
for row in data:
writer.writerow(row)
output.seek(0)
yield output.read()
headers = {
'Content-Disposition': 'attachment; filename="observations.csv"'
}
return StreamingResponse(iter_csv(observations), media_type="text/csv", headers=headers)
@router.get("/last-observations/{limit}")
async def get_last_observations(limit: int = 10, format: str = "json"):
observability_manager = LLMObservabilityManager()
try:
# Get all observations, sorted by created_at in descending order
all_observations = observability_manager.get_observations()
all_observations.sort(key=lambda x: x['created_at'], reverse=True)
# Get the last conversation_id
if all_observations:
last_conversation_id = all_observations[0]['conversation_id']
# Filter observations for the last conversation
last_conversation_observations = [
obs for obs in all_observations
if obs['conversation_id'] == last_conversation_id
][:limit]
if format.lower() == "csv":
return create_csv_response(last_conversation_observations)
else:
return ObservationResponse(observations=last_conversation_observations)
else:
if format.lower() == "csv":
return create_csv_response([])
else:
return ObservationResponse(observations=[])
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to retrieve observations: {str(e)}")
###########################################
class MarkdownDocumentResponse(BaseModel):
markdown_document: str
@router.post("/generate-document-test", response_model=MarkdownDocumentResponse)
async def test_generate_document_endpoint(request: MarkdownDocumentRequest):
try:
# Load JSON document from file
json_path = os.path.join("output/document_generator", "ai-chatbot-prd.json")
with open(json_path, "r") as json_file:
json_document = json.load(json_file)
# Load Markdown document from file
md_path = os.path.join("output/document_generator", "ai-chatbot-prd.md")
with open(md_path, "r") as md_file:
markdown_document = md_file.read()
return MarkdownDocumentResponse(markdown_document=markdown_document)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Test files not found")
except json.JSONDecodeError:
raise HTTPException(status_code=500, detail="Error parsing JSON file")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
class CacheTestResponse(BaseModel):
result: str
execution_time: float
@router.get("/test-cache/{test_id}", response_model=CacheTestResponse)
@cache(expire=60) # Cache for 1 minute
async def test_cache(test_id: int):
start_time = time.time()
# Simulate some time-consuming operation
await asyncio.sleep(2)
result = f"Test result for ID: {test_id}"
end_time = time.time()
execution_time = end_time - start_time
return CacheTestResponse(
result=result,
execution_time=execution_time
)