\n\n"
for section in document['Sections']:
markdown += "
\n\n"
section_number = section['SectionNumber']
section_title = section['Title']
markdown += f"
{section_number}. {section_title}
\n\n"
markdown += f"
\n\n{section['Content']}\n\n
\n\n"
for subsection in section.get('Subsections', []):
subsection_number = subsection['SectionNumber']
subsection_title = subsection['Title']
markdown += f"
{subsection_number} {subsection_title}
\n\n"
markdown += f"
\n\n{subsection['Content']}\n\n
\n\n"
markdown += "
"
return markdown
router = APIRouter()
class JsonDocumentResponse(BaseModel):
json_document: Dict
# class JsonDocumentRequest(BaseModel):
# query: str
# template: bool = False
# images: Optional[List[UploadFile]] = File(None)
# documents: Optional[List[UploadFile]] = File(None)
# conversation_id: str = ""
class MarkdownDocumentRequest(BaseModel):
json_document: Dict
query: str
template: bool = False
conversation_id: str = None
MESSAGE_DELIMITER = b"\n---DELIMITER---\n"
def yield_message(message):
message_json = json.dumps(message, ensure_ascii=False).encode('utf-8')
return message_json + MESSAGE_DELIMITER
async def generate_document_stream(document_generator: DocumentGenerator, document_outline: Dict, query: str, template: bool = False):
document_generator.document_outline = document_outline
db_manager = DatabaseManager()
overall_objective = query
document_layout = json.dumps(document_generator.document_outline, indent=2)
SECTION_PROMPT_SYSTEM = DOCUMENT_SECTION_PROMPT_SYSTEM if not template else DOCUMENT_TEMPLATE_SECTION_PROMPT_SYSTEM
document_generator.content_messages = [
{
"role": "system",
"content": SECTION_PROMPT_SYSTEM.format(
overall_objective=overall_objective,
document_layout=document_layout
)
}
]
for section in document_generator.document_outline["Document"].get("Sections", []):
section_title = section.get("Title", "")
section_number = section.get("SectionNumber", "")
content_instruction = section.get("Content", "")
logging.info(f"Generating content for section: {section_title}")
content = document_generator.generate_content(section_title, content_instruction, section_number, template)
section["Content"] = content
yield yield_message({
"type": "document_section",
"content": {
"section_number": section_number,
"section_title": section_title,
"content": content
}
})
for subsection in section.get("Subsections", []):
subsection_title = subsection.get("Title", "")
subsection_number = subsection.get("SectionNumber", "")
subsection_content_instruction = subsection.get("Content", "")
logging.info(f"Generating content for subsection: {subsection_title}")
content = document_generator.generate_content(subsection_title, subsection_content_instruction, subsection_number, template)
subsection["Content"] = content
yield yield_message({
"type": "document_section",
"content": {
"section_number": subsection_number,
"section_title": subsection_title,
"content": content
}
})
markdown_document = MarkdownConverter.convert_to_markdown(document_generator.document_outline["Document"])
yield yield_message({
"type": "complete_document",
"content": {
"markdown": markdown_document,
"json": document_generator.document_outline
},
});
db_manager.update_database("elevatics", query, markdown_document)
@router.post("/generate-document/markdown-stream")
async def generate_markdown_document_stream_endpoint(request: MarkdownDocumentRequest):
ai_client = AIClient()
document_generator = DocumentGenerator(ai_client)
async def stream_generator():
try:
async for chunk in generate_document_stream(document_generator, request.json_document, request.query, request.template):
yield chunk
except Exception as e:
yield yield_message({
"type": "error",
"content": str(e)
})
return StreamingResponse(stream_generator(), media_type="application/octet-stream")
@cache(expire=600*24*7)
@router.post("/generate-document/json", response_model=JsonDocumentResponse)
async def generate_document_outline_endpoint(
query: str = Form(...),
template: bool = Form(False),
conversation_id: str = Form(...),
images: Optional[List[UploadFile]] = File(None),
documents: Optional[List[UploadFile]] = File(None)
):
ai_client = AIClient()
document_generator = DocumentGenerator(ai_client)
vision_tools = VisionTools(ai_client)
try:
image_context = ""
if images:
image_context = await vision_tools.extract_images_info(images)
json_document = document_generator.generate_document_outline(
query,
template,
image_context=image_context
)
if json_document is None:
raise HTTPException(status_code=500, detail="Failed to generate a valid document outline")
return JsonDocumentResponse(json_document=json_document)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
## OBSERVABILITY
from uuid import uuid4
import csv
from io import StringIO
class ObservationResponse(BaseModel):
observations: List[Dict]
def create_csv_response(observations: List[Dict]) -> StreamingResponse:
def iter_csv(data):
output = StringIO()
writer = csv.DictWriter(output, fieldnames=data[0].keys() if data else [])
writer.writeheader()
for row in data:
writer.writerow(row)
output.seek(0)
yield output.read()
headers = {
'Content-Disposition': 'attachment; filename="observations.csv"'
}
return StreamingResponse(iter_csv(observations), media_type="text/csv", headers=headers)
@router.get("/last-observations/{limit}")
async def get_last_observations(limit: int = 10, format: str = "json"):
observability_manager = LLMObservabilityManager()
try:
# Get all observations, sorted by created_at in descending order
all_observations = observability_manager.get_observations()
all_observations.sort(key=lambda x: x['created_at'], reverse=True)
# Get the last conversation_id
if all_observations:
last_conversation_id = all_observations[0]['conversation_id']
# Filter observations for the last conversation
last_conversation_observations = [
obs for obs in all_observations
if obs['conversation_id'] == last_conversation_id
][:limit]
if format.lower() == "csv":
return create_csv_response(last_conversation_observations)
else:
return ObservationResponse(observations=last_conversation_observations)
else:
if format.lower() == "csv":
return create_csv_response([])
else:
return ObservationResponse(observations=[])
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to retrieve observations: {str(e)}")
## TEST CACHE
class CacheItem(BaseModel):
key: str
value: str
@router.post("/set-cache")
async def set_cache(item: CacheItem):
try:
# Set the cache with a default expiration of 1 hour (3600 seconds)
await FastAPICache.get_backend().set(item.key, item.value, expire=3600)
return {"message": f"Cache set for key: {item.key}"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to set cache: {str(e)}")
@router.get("/get-cache/{key}")
async def get_cache(key: str):
try:
value = await FastAPICache.get_backend().get(key)
if value is None:
raise HTTPException(status_code=404, detail=f"No cache found for key: {key}")
return {"key": key, "value": value}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get cache: {str(e)}")