import transformers import re from transformers import AutoConfig, AutoTokenizer, AutoModel, AutoModelForCausalLM import torch import gradio as gr import json import os import shutil import requests import lancedb import pandas as pd # Define the device device = "cuda" if torch.cuda.is_available() else "cpu" model_name = "PleIAs/Pleias-Rag" # Get Hugging Face token from environment variable hf_token = os.environ.get('HF_TOKEN') if not hf_token: raise ValueError("Please set the HF_TOKEN environment variable") # Initialize model and tokenizer tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token) model = AutoModelForCausalLM.from_pretrained(model_name, token=hf_token) model.to(device) # Set tokenizer configuration tokenizer.eos_token = "<|answer_end|>" eos_token_id=tokenizer.eos_token_id tokenizer.pad_token = tokenizer.eos_token tokenizer.pad_token_id = 1 # Define variables temperature = 0.0 max_new_tokens = 1400 top_p = 0.95 repetition_penalty = 1.0 min_new_tokens = 700 early_stopping = False # Connect to the LanceDB database db = lancedb.connect("content19/lancedb_data") table = db.open_table("edunat19") def hybrid_search(text): results = table.search(text, query_type="hybrid").limit(5).to_pandas() document = [] document_html = [] for _, row in results.iterrows(): hash_id = str(row['hash']) title = row['section'] content = row['text'] document.append(f"<|source_start|><|source_id_start|>{hash_id}<|source_id_end|>{title}\n{content}<|source_end|>") document_html.append(f'

{hash_id} : {title}
{content}

') document = "\n".join(document) document_html = '
' + "".join(document_html) + "
" return document, document_html class pleiasBot: def __init__(self, system_prompt="Tu es Appli, un asistant de recherche qui donne des responses sourcées"): self.system_prompt = system_prompt def predict(self, user_message): fiches, fiches_html = hybrid_search(user_message) detailed_prompt = f"""<|query_start|>{user_message}<|query_end|>\n{fiches}\n<|source_analysis_start|>""" # Convert inputs to tensor input_ids = tokenizer.encode(detailed_prompt, return_tensors="pt").to(device) attention_mask = torch.ones_like(input_ids) try: output = model.generate( input_ids, attention_mask=attention_mask, max_new_tokens=max_new_tokens, do_sample=False, early_stopping=early_stopping, min_new_tokens=min_new_tokens, temperature=temperature, repetition_penalty=repetition_penalty, pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id ) # Decode the generated text generated_text = tokenizer.decode(output[0][len(input_ids[0]):]) # Split the text into analysis and answer sections parts = generated_text.split("<|source_analysis_end|>") if len(parts) == 2: analysis = parts[0].strip() answer = parts[1].replace("<|answer_start|>", "").replace("<|answer_end|>", "").strip() # Format with section titles using strong tags for extra emphasis formatted_text = f'
Analyse des sources
\n\n{analysis}\n\n
Réponse
\n\n{answer}' else: formatted_text = generated_text generated_text = '

Réponse

\n
' + format_references(formatted_text) + "
" fiches_html = '

Sources

\n' + fiches_html return generated_text, fiches_html except Exception as e: print(f"Error during generation: {str(e)}") import traceback traceback.print_exc() return None, None def format_references(text): ref_pattern = r'"([^"]+)"' parts = [] current_pos = 0 ref_number = 1 import re for match in re.finditer(ref_pattern, text): # Add text before the reference parts.append(text[current_pos:match.start()]) # Extract reference components ref_id = match.group(1) # The source ID ref_text = match.group(2).strip() # The reference text # Create tooltip HTML with source ID in bold tooltip_html = f'[{ref_number}]{ref_id}: {ref_text}' parts.append(tooltip_html) current_pos = match.end() ref_number += 1 # Add any remaining text parts.append(text[current_pos:]) return ''.join(parts) # Initialize the pleiasBot pleias_bot = pleiasBot() # CSS for styling css = """ .generation { margin-left: 2em; margin-right: 2em; } :target { background-color: #CCF3DF; } .source { float: left; max-width: 17%; margin-left: 2%; } .tooltip { position: relative; display: inline-block; color: #2563eb; cursor: pointer; } .tooltip .tooltiptext { visibility: hidden; background-color: #fff; color: #000; text-align: left; padding: 12px; border-radius: 6px; border: 1px solid #e5e7eb; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); position: absolute; z-index: 1; bottom: 125%; left: 50%; transform: translateX(-50%); min-width: 300px; max-width: 400px; white-space: normal; font-size: 0.9em; line-height: 1.4; } .tooltip:hover .tooltiptext { visibility: visible; } .tooltip .tooltiptext::after { content: ""; position: absolute; top: 100%; left: 50%; margin-left: -5px; border-width: 5px; border-style: solid; border-color: #fff transparent transparent transparent; } .section-title { font-weight: bold; font-size: 15px; margin-bottom: 1em; margin-top: 1em; } """ # Gradio interface def gradio_interface(user_message): response, sources = pleias_bot.predict(user_message) return response, sources # Create Gradio app demo = gr.Blocks(css=css + """ .logo { display: block; margin: 0 auto; width: 200px; height: 200px; margin-bottom: 1em; } """) with demo: # Add logo and title gr.HTML("""

pleias-RAG 1.0

""") with gr.Row(): with gr.Column(scale=2): text_input = gr.Textbox(label="Votre question ou votre instruction", lines=3) text_button = gr.Button("Interroger pleias-RAG") with gr.Column(scale=3): text_output = gr.HTML(label="La réponse du modèle") with gr.Row(): embedding_output = gr.HTML(label="Les sources utilisées") text_button.click(gradio_interface, inputs=text_input, outputs=[text_output, embedding_output]) # Launch the app if __name__ == "__main__": demo.launch()