import asyncio
import logging
import os
import re
from typing import Any, List, cast

from anthropic import Anthropic, AsyncAnthropic

from _utils.gerar_relatorio_modelo_usuario.llm_calls import agemini_answer, agpt_answer
from _utils.gerar_relatorio_modelo_usuario.prompts import contextual_prompt
from _utils.gerar_relatorio_modelo_usuario.utils import (
    validate_many_chunks_in_one_request,
)
from _utils.models.gerar_relatorio import (
    ContextualizedChunk,
    DocumentChunk,
    RetrievalConfig,
)

# Global list used as a simple progress counter across concurrent tasks.
lista_contador = []


class ContextualRetriever:
    """Adds LLM-generated contextual descriptions to document chunks."""

    def __init__(self, config: RetrievalConfig, claude_context_model: str):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.bm25 = None
        self.claude_context_model = claude_context_model
        self.claude_api_key = os.environ.get("CLAUDE_API_KEY", "")
        self.claude_client = AsyncAnthropic(api_key=self.claude_api_key)
        # self.claude_client = Anthropic(api_key=claude_api_key)
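        # Note: the Anthropic client above is currently unused by the live code
        # path, which calls Gemini via agemini_answer (see
        # llm_call_uma_lista_de_chunks); it backs the commented-out alternatives.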

    def getAllDocumentsIds(self, lista_com_20_chunks: List[DocumentChunk]):
        """Concatenate chunk contents and collect the document id ("Num. <id>")
        found in each chunk, falling back to 0 when no id is present."""
        all_chunks_contents = ""
        all_document_ids = []
        pattern = r"Num\. (\d+)"
        for contador, chunk in enumerate(lista_com_20_chunks, start=1):
            all_chunks_contents += f"\n\nCHUNK {contador}:\n"
            all_chunks_contents += chunk.content
            match = re.search(pattern, chunk.content)
            # Fall back to 0 when the chunk carries no "Num. <id>" marker.
            number = match.group(1) if match else "0"
            all_document_ids.append(int(number))
        return all_chunks_contents, all_document_ids
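
    # A minimal illustration of the id extraction above (hypothetical contents):
    #   "Num. 70890 - some page text..."   -> document id 70890
    #   "text without the Num. marker"     -> document id 0 (fallback)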

    def get_info_from_validated_chunks(self, matches):
        """Normalize validated (doc_id, title, content) tuples into lists."""
        return [
            [int(doc_id), title.strip(), content.strip()]
            for doc_id, title, content in matches
        ]
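    # Hypothetical example: [("123", " Titulo ", " Conteudo ")] becomes
    # [[123, "Titulo", "Conteudo"]].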

    async def llm_call_uma_lista_de_chunks(
        self, lista_com_20_chunks: List[DocumentChunk], resumo_auxiliar
    ) -> List[List[Any]]:
        """Generate contextual descriptions for a batch of chunks using Gemini."""
        all_chunks_contents, all_document_ids = self.getAllDocumentsIds(
            lista_com_20_chunks
        )
        try:
            print("\n\nSTARTED THE REQUEST")
            prompt = contextual_prompt(
                resumo_auxiliar, all_chunks_contents, len(lista_com_20_chunks)
            )
            response = ""
            for attempt in range(4):
                if attempt != 0:
                    print("------------- CONTEXTUAL FORMATTING WAS INVALID - TRYING AGAIN -------------")
                    print(f"CHUNK FORMATTING ATTEMPT NUMBER {attempt + 1}")
                print("STARTING A CONTEXTUAL REQUEST")
                # raw_response = await agpt_answer(prompt)
                # raw_response = await agemini_answer(prompt, "gemini-2.0-flash-lite-preview-02-05")
                raw_response = await agemini_answer(prompt, "gemini-2.0-flash-lite")
                print("FINISHED A CONTEXTUAL REQUEST")
                response = cast(str, raw_response)
                matches = validate_many_chunks_in_one_request(
                    response, all_document_ids
                )
                if matches:
                    return self.get_info_from_validated_chunks(matches)
            # Raise only after every attempt produced badly formatted output, so
            # the loop actually retries instead of aborting on the first failure.
            raise ValueError(f"CHUNK FORMATTING WAS INVALID: {response}")
        except Exception as e:
            self.logger.error(f"Context generation failed for chunks .... : {str(e)}")
            return [[""]]

    async def contextualize_uma_lista_de_chunks(
        self, lista_com_20_chunks: List[DocumentChunk], response_auxiliar_summary
    ):
        lista_contador.append(0)
        print("counter: ", len(lista_contador))
        result = await self.llm_call_uma_lista_de_chunks(
            lista_com_20_chunks, response_auxiliar_summary
        )
        lista_chunks: List[ContextualizedChunk] = []
        try:
            for index, chunk in enumerate(lista_com_20_chunks):
                lista_chunks.append(
                    ContextualizedChunk(
                        contextual_summary=result[index][2],
                        content=chunk.content,
                        page_number=chunk.page_number,
                        id_do_processo=int(result[index][0]),
                        chunk_id=chunk.chunk_id,
                        start_char=chunk.start_char,
                        end_char=chunk.end_char,
                        context=result[index][1],
                    )
                )
        except Exception as e:
            print(e)
            print("\nCONTEXTUAL ERROR")
            print("\n\nresult", result)
        return lista_chunks

    async def contextualize_all_chunks(
        self,
        all_PDFs_chunks: List[DocumentChunk],
        response_auxiliar_summary,
    ) -> List[ContextualizedChunk]:
        """Add context to all chunks, processing them in batches of 20."""
        lista_de_listas_cada_com_20_chunks = [
            all_PDFs_chunks[i : i + 20] for i in range(0, len(all_PDFs_chunks), 20)
        ]
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(
                    self.contextualize_uma_lista_de_chunks(
                        lista_com_20_chunks,
                        response_auxiliar_summary,
                    )
                )
                for lista_com_20_chunks in lista_de_listas_cada_com_20_chunks
            ]
        # asyncio.TaskGroup (Python 3.11+) awaits every task before the
        # `async with` block exits, so task.result() is safe to call here.
        contextualized_chunks: List[ContextualizedChunk] = []
        for task in tasks:
            contextualized_chunks.extend(task.result())
        return contextualized_chunks


# The commented-out code below is for reading the pages around the chunk's
# current page:
# page_content = ""
# for i in range(
#     max(0, chunk.page_number - 1),
#     min(len(single_page_text), chunk.page_number + 2),
# ):
#     page_content += single_page_text[i].page_content if single_page_text[i] else ""
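

# A minimal usage sketch, kept commented out. It assumes RetrievalConfig can be
# built with defaults and that DocumentChunk takes the fields below -- both are
# assumptions; check the real definitions in _utils.models.gerar_relatorio
# before running:
#
# async def _exemplo():
#     retriever = ContextualRetriever(RetrievalConfig(), "claude-3-5-haiku-latest")
#     chunks = [
#         DocumentChunk(
#             content="Num. 70890 - document text...",
#             page_number=1,
#             chunk_id=1,
#             start_char=0,
#             end_char=100,
#         )
#     ]
#     resumo = "auxiliary summary produced earlier in the pipeline"
#     return await retriever.contextualize_all_chunks(chunks, resumo)
#
# asyncio.run(_exemplo())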