import os
import re
import asyncio
import logging
from typing import Any, List, cast

from anthropic import Anthropic, AsyncAnthropic
from langchain_core.messages import HumanMessage

from _utils.gerar_relatorio_modelo_usuario.utils import (
    validate_many_chunks_in_one_request,
)
from _utils.gerar_relatorio_modelo_usuario.llm_calls import (
    aclaude_answer,
    agemini_answer,
    agpt_answer,
)
from _utils.gerar_relatorio_modelo_usuario.prompts import contextual_prompt
from _utils.models.gerar_relatorio import (
    ContextualizedChunk,
    DocumentChunk,
    RetrievalConfig,
)

# Module-level counter used only to log how many 20-chunk batches have started.
lista_contador = []


class ContextualRetriever:
    def __init__(self, config: RetrievalConfig, claude_context_model: str):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.bm25 = None
        self.claude_context_model = claude_context_model
        self.claude_api_key = os.environ.get("CLAUDE_API_KEY", "")
        self.claude_client = AsyncAnthropic(api_key=self.claude_api_key)
        # self.claude_client = Anthropic(api_key=claude_api_key)

    def getAllDocumentsIds(self, lista_com_20_chunks: List[DocumentChunk]):
        """Concatenate the chunks into a single prompt body and extract the
        document id ("Num. <digits>") embedded in each chunk (0 when absent)."""
        contador = 1
        all_chunks_contents = ""
        all_document_ids = []
        for chunk in lista_com_20_chunks:
            all_chunks_contents += f"\n\nCHUNK {contador}:\n"
            all_chunks_contents += chunk.content
            match = re.search(r"Num\. (\d+)", chunk.content)
            number = match.group(1) if match else 0  # Extract the number
            all_document_ids.append(int(number))
            contador += 1
        return all_chunks_contents, all_document_ids

    def get_info_from_validated_chunks(self, matches):
        # Each match is a (doc_id, title, content) tuple.
        result = [
            [int(doc_id), title.strip(), content.strip()]
            for doc_id, title, content in matches
        ]
        return result

    async def llm_call_uma_lista_de_chunks(
        self, lista_com_20_chunks: List[DocumentChunk], resumo_auxiliar
    ) -> List[List[Any]]:
        """Generate contextual descriptions for a batch of chunks via an LLM
        call (currently Gemini), retrying up to 4 times when the model's
        output fails format validation."""
        all_chunks_contents, all_document_ids = self.getAllDocumentsIds(
            lista_com_20_chunks
        )
        try:
            print("\n\nCOMEÇOU A REQUISIÇÃO")
            prompt = contextual_prompt(
                resumo_auxiliar, all_chunks_contents, len(lista_com_20_chunks)
            )
            for attempt in range(4):
                if attempt != 0:
                    print("------------- FORMATAÇÃO DO CONTEXTUAL INCORRETA - TENTANDO NOVAMENTE -------------")
                    print(f"TENTATIVA FORMATAÇÃO CHUNKS NÚMERO {attempt + 1}")
                print("COMEÇANDO UMA REQUISIÇÃO DO CONTEXTUAL")
                # raw_response = await agpt_answer(prompt)
                # raw_response = await agemini_answer(prompt, "gemini-2.0-flash-lite-preview-02-05")
                raw_response = await agemini_answer(prompt, "gemini-2.0-flash-lite")
                print("TERMINOU UMA REQUISIÇÃO DO CONTEXTUAL")
                response = cast(str, raw_response)
                # response = await llms.deepseek().ainvoke([HumanMessage(content=prompt)])
                # return cast(str, response.content)
                matches = validate_many_chunks_in_one_request(
                    response, all_document_ids
                )
                if matches:
                    return self.get_info_from_validated_chunks(matches)
            raise ValueError(f"FORMATAÇÃO DOS CHUNKS FOI INVÁLIDA: {response}")
        except Exception as e:
            self.logger.error(f"Context generation failed for chunks: {str(e)}")
            # Sentinel row; the caller catches the resulting IndexError.
            return [[""]]
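
    # Shape of the rows returned by llm_call_uma_lista_de_chunks above,
    # inferred from how get_info_from_validated_chunks unpacks each match:
    # one [doc_id, title, content] triple per chunk sent in the request, e.g.
    #   [[12345, "Petição Inicial", "Trecho situando o chunk no processo..."],
    #    [12346, "Contestação", "..."]]
    # The ids and titles here are illustrative, not taken from the source.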
: {str(e)}") return [[""]] async def contextualize_uma_lista_de_chunks( self, lista_com_20_chunks: List[DocumentChunk], response_auxiliar_summary ): lista_contador.append(0) print("contador: ", len(lista_contador)) result = await self.llm_call_uma_lista_de_chunks( lista_com_20_chunks, response_auxiliar_summary ) lista_chunks: List[ContextualizedChunk] = [] try: for index, chunk in enumerate(lista_com_20_chunks): lista_chunks.append( ContextualizedChunk( contextual_summary=result[index][2], content=chunk.content, page_number=chunk.page_number, id_do_processo=int(result[index][0]), chunk_id=chunk.chunk_id, start_char=chunk.start_char, end_char=chunk.end_char, context=result[index][1], ) ) except BaseException as e : print(e) print("\nERRO DO CONTEXTUAL") print('\n\nresult', result) return lista_chunks async def contextualize_all_chunks( self, all_PDFs_chunks: List[DocumentChunk], response_auxiliar_summary, ) -> List[ContextualizedChunk]: """Add context to all chunks""" lista_de_listas_cada_com_20_chunks = [ all_PDFs_chunks[i : i + 20] for i in range(0, len(all_PDFs_chunks), 20) ] async with asyncio.TaskGroup() as tg: tasks = [ tg.create_task( self.contextualize_uma_lista_de_chunks( lista_com_20_chunks, response_auxiliar_summary, ) ) for lista_com_20_chunks in lista_de_listas_cada_com_20_chunks ] # contextualized_chunks = [task.result() for task in tasks] contextualized_chunks = [] for task in tasks: contextualized_chunks = contextualized_chunks + task.result() return contextualized_chunks # Código comentado abaixo é para ler as páginas ao redor da página atual do chunk # page_content = "" # for i in range( # max(0, chunk.page_number - 1), # min(len(single_page_text), chunk.page_number + 2), # ): # page_content += single_page_text[i].page_content if single_page_text[i] else ""