oceansweep's picture
Update App_Function_Libraries/RAG/RAG_Libary_2.py
3655951 verified
raw
history blame
9.33 kB
# RAG_Library_2.py
# Description: This script contains the main RAG pipeline function and related functions for the RAG pipeline.
#
# Import necessary modules and functions
import configparser
import logging
import os
from typing import Dict, Any, List, Optional
# Local Imports
#from App_Function_Libraries.RAG.ChromaDB_Library import process_and_store_content, vector_search, chroma_client
from App_Function_Libraries.Article_Extractor_Lib import scrape_article
from App_Function_Libraries.DB.DB_Manager import add_media_to_database, search_db, get_unprocessed_media, \
fetch_keywords_for_media
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
# 3rd-Party Imports
import openai
#
########################################################################################################################
#
# Functions:
# Initialize OpenAI client (adjust this based on your API key management)
openai.api_key = "your-openai-api-key"
# Get the directory of the current script
current_dir = os.path.dirname(os.path.abspath(__file__))
# Construct the path to the config file
config_path = os.path.join(current_dir, 'Config_Files', 'config.txt')
# Read the config file
config = configparser.ConfigParser()
# Read the configuration file
config.read('config.txt')
# RAG Search with keyword filtering
def enhanced_rag_pipeline(query: str, api_choice: str, keywords: str = None) -> Dict[str, Any]:
try:
# Load embedding provider from config, or fallback to 'openai'
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
# Log the provider used
logging.debug(f"Using embedding provider: {embedding_provider}")
# Process keywords if provided
keyword_list = [k.strip().lower() for k in keywords.split(',')] if keywords else []
logging.debug(f"enhanced_rag_pipeline - Keywords: {keyword_list}")
# Fetch relevant media IDs based on keywords if keywords are provided
relevant_media_ids = fetch_relevant_media_ids(keyword_list) if keyword_list else None
logging.debug(f"enhanced_rag_pipeline - relevant media IDs: {relevant_media_ids}")
# Perform vector search
vector_results = perform_vector_search(query, relevant_media_ids)
logging.debug(f"enhanced_rag_pipeline - Vector search results: {vector_results}")
# Perform full-text search
fts_results = perform_full_text_search(query, relevant_media_ids)
logging.debug(f"enhanced_rag_pipeline - Full-text search results: {fts_results}")
# Combine results
all_results = vector_results + fts_results
# FIXME
if not all_results:
logging.info(f"No results found. Query: {query}, Keywords: {keywords}")
return {
"answer": "I couldn't find any relevant information based on your query and keywords.",
"context": ""
}
# FIXME - Apply Re-Ranking of results here
apply_re_ranking = False
if apply_re_ranking:
# Implement re-ranking logic here
pass
# Extract content from results
context = "\n".join([result['content'] for result in all_results[:10]]) # Limit to top 10 results
logging.debug(f"Context length: {len(context)}")
logging.debug(f"Context: {context[:200]}")
# Generate answer using the selected API
answer = generate_answer(api_choice, context, query)
return {
"answer": answer,
"context": context
}
except Exception as e:
logging.error(f"Error in enhanced_rag_pipeline: {str(e)}")
return {
"answer": "An error occurred while processing your request.",
"context": ""
}
def generate_answer(api_choice: str, context: str, query: str) -> str:
logging.debug("Entering generate_answer function")
config = load_comprehensive_config()
logging.debug(f"Config sections: {config.sections()}")
prompt = f"Context: {context}\n\nQuestion: {query}"
if api_choice == "OpenAI":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_openai
return summarize_with_openai(config['API']['openai_api_key'], prompt, "")
elif api_choice == "Anthropic":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_anthropic
return summarize_with_anthropic(config['API']['anthropic_api_key'], prompt, "")
elif api_choice == "Cohere":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_cohere
return summarize_with_cohere(config['API']['cohere_api_key'], prompt, "")
elif api_choice == "Groq":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_groq
return summarize_with_groq(config['API']['groq_api_key'], prompt, "")
elif api_choice == "OpenRouter":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_openrouter
return summarize_with_openrouter(config['API']['openrouter_api_key'], prompt, "")
elif api_choice == "HuggingFace":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_huggingface
return summarize_with_huggingface(config['API']['huggingface_api_key'], prompt, "")
elif api_choice == "DeepSeek":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_deepseek
return summarize_with_deepseek(config['API']['deepseek_api_key'], prompt, "")
elif api_choice == "Mistral":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_mistral
return summarize_with_mistral(config['API']['mistral_api_key'], prompt, "")
elif api_choice == "Local-LLM":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_local_llm
return summarize_with_local_llm(config['API']['local_llm_path'], prompt, "")
elif api_choice == "Llama.cpp":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_llama
return summarize_with_llama(config['API']['llama_api_key'], prompt, "")
elif api_choice == "Kobold":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_kobold
return summarize_with_kobold(config['API']['kobold_api_key'], prompt, "")
elif api_choice == "Ooba":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_oobabooga
return summarize_with_oobabooga(config['API']['ooba_api_key'], prompt, "")
elif api_choice == "TabbyAPI":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_tabbyapi
return summarize_with_tabbyapi(config['API']['tabby_api_key'], prompt, "")
elif api_choice == "vLLM":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_vllm
return summarize_with_vllm(config['API']['vllm_api_key'], prompt, "")
elif api_choice == "ollama":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_ollama
return summarize_with_ollama(config['API']['ollama_api_key'], prompt, "")
else:
raise ValueError(f"Unsupported API choice: {api_choice}")
def perform_full_text_search(query: str, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
fts_results = search_db(query, ["content"], "", page=1, results_per_page=5)
filtered_fts_results = [
{
"content": result['content'],
"metadata": {"media_id": result['id']}
}
for result in fts_results
if relevant_media_ids is None or result['id'] in relevant_media_ids
]
return filtered_fts_results
def fetch_relevant_media_ids(keywords: List[str]) -> List[int]:
relevant_ids = set()
try:
for keyword in keywords:
media_ids = fetch_keywords_for_media(keyword)
relevant_ids.update(media_ids)
except Exception as e:
logging.error(f"Error fetching relevant media IDs: {str(e)}")
return list(relevant_ids)
# Example usage:
# 1. Initialize the system:
# create_tables(db) # Ensure FTS tables are set up
#
# 2. Create ChromaDB
# chroma_client = ChromaDBClient()
#
# 3. Create Embeddings
# Store embeddings in ChromaDB
# preprocess_all_content() or create_embeddings()
#
# 4. Perform RAG search across all content:
# result = rag_search("What are the key points about climate change?")
# print(result['answer'])
#
# (Extra)5. Perform RAG on a specific URL:
# result = rag_pipeline("https://example.com/article", "What is the main topic of this article?")
# print(result['answer'])
#
########################################################################################################################
############################################################################################################
#
# ElasticSearch Retriever
# https://github.com/langchain-ai/langchain/tree/44e3e2391c48bfd0a8e6a20adde0b6567f4f43c3/templates/rag-elasticsearch
#
# https://github.com/langchain-ai/langchain/tree/44e3e2391c48bfd0a8e6a20adde0b6567f4f43c3/templates/rag-self-query
#
# End of RAG_Library_2.py
############################################################################################################