from haystack.components.generators import OpenAIGenerator from haystack.utils import Secret from haystack.components.builders.prompt_builder import PromptBuilder from haystack.components.routers import ConditionalRouter from haystack import Pipeline # from haystack.components.writers import DocumentWriter from haystack.components.embedders import SentenceTransformersTextEmbedder #, SentenceTransformersDocumentEmbedder # from haystack.components.preprocessors import DocumentSplitter # from haystack.components.converters.txt import TextFileToDocument # from haystack.components.preprocessors import DocumentCleaner from haystack_integrations.document_stores.chroma import ChromaDocumentStore from haystack_integrations.components.retrievers.chroma import ChromaEmbeddingRetriever # from haystack.document_stores.in_memory import InMemoryDocumentStore # from haystack.components.retrievers import InMemoryEmbeddingRetriever import gradio as gr embedding_model = "Alibaba-NLP/gte-multilingual-base" ######################## ####### Indexing ####### ######################## # Skipped: now using Chroma # In memory version for now # document_store = InMemoryDocumentStore(embedding_similarity_function="cosine") # converter = TextFileToDocument() # cleaner = DocumentCleaner() # splitter = DocumentSplitter(split_by="word", split_length=200, split_overlap=100) # embedder = SentenceTransformersDocumentEmbedder(model=embedding_model, # trust_remote_code=True) # writer = DocumentWriter(document_store=document_store) # indexing = Pipeline() # indexing.add_component("converter", converter) # indexing.add_component("cleaner", cleaner) # indexing.add_component("splitter", splitter) # indexing.add_component("embedder", embedder) # indexing.add_component("writer", writer) # indexing.connect("converter", "cleaner") # indexing.connect("cleaner", "splitter") # indexing.connect("splitter", "embedder") # indexing.connect("embedder", "writer") # indexing.run({"sources": ["knowledge-plain.txt"]}) # Chroma version (no support for overlaps in documents) # document_store = ChromaDocumentStore(persist_path="vstore_4012") document_store = ChromaDocumentStore( persist_path="vstore_4012" ) ################################## ####### Answering pipeline ####### ################################## no_answer_message = ( "I'm not allowed to answer this question. Please ask something related to " "APIs access in accordance DSA’s transparency and data-sharing provisions. " "Is there anything else I can do for you? " ) relevance_prompt_template = """ Classify whether this user is asking for something related to social media APIs, the Digital Services Act (DSA), or any topic related to online platforms’ compliance with legal and data-sharing frameworks. Relevant topics include: - Social media API access - Data transparency - Compliance with DSA provisions - Online platform regulations Here is their message: {{query}} Here are the two previous messages. ONLY refer to these if the above message refers previous ones. {% for message in user_history[-2:] %} * {{message["content"]}} {% endfor %} Instructions: - Respond with “YES” if the query pertains to any of the relevant topics listed above and not mixed with off-topic content. - Respond with “NO” if the query is off-topic and does not relate to the topics listed above. Examples: - Query: "How does the DSA affect API usage?" - Response: "YES" - Query: "How to make a pancake with APIs?" - Response: "NO" """ routes = [ { "condition": "{{'YES' in replies[0]}}", "output": "{{query}}", "output_name": "query", "output_type": str, }, { "condition": "{{'NO' in replies[0]}}", "output": no_answer_message, "output_name": "no_answer", "output_type": str, } ] query_prompt_template = """ Conversation history: {{conv_history}} Here is what the user has requested: {{query}} Instructions: - Craft a concise, short informative answer to the user's request using the information provided below. - Synthesize the key points into a seamless response that appears as your own expert knowledge. - Avoid direct quotes or explicit references to the documents. - You are directly answering the user's query. Relevant Information: {% for document in documents %} - {{ document.content }} {% endfor %} """ def setup_generator(model_name, api_key_env_var="GROQ_API_KEY", max_tokens=8192): return OpenAIGenerator( api_key=Secret.from_env_var(api_key_env_var), api_base_url="https://api.groq.com/openai/v1", model=model_name, generation_kwargs={"max_tokens": max_tokens} ) llm = setup_generator("llama3-8b-8192", max_tokens=30) llm2 = setup_generator("llama3-8b-8192") embedder = SentenceTransformersTextEmbedder(model=embedding_model, trust_remote_code=True) retriever = ChromaEmbeddingRetriever(document_store) router = ConditionalRouter(routes=routes) prompt_builder = PromptBuilder(template=relevance_prompt_template) prompt_builder2 = PromptBuilder(template=query_prompt_template) answer_query = Pipeline() answer_query.add_component("prompt_builder", prompt_builder) answer_query.add_component("llm", llm) answer_query.add_component("router", router) answer_query.add_component("embedder", embedder) answer_query.add_component("retriever", retriever) answer_query.add_component("prompt_builder2", prompt_builder2) answer_query.add_component("llm2", llm2) answer_query.connect("prompt_builder", "llm") answer_query.connect("llm", "router") answer_query.connect("router.query", "embedder") answer_query.connect("embedder", "retriever") answer_query.connect("retriever", "prompt_builder2") answer_query.connect("prompt_builder2", "llm2") answer_query.warm_up() ########################## ####### Gradio app ####### ########################## def chat(message, history): """ Chat function for Gradio. Uses the pipeline to produce next answer. """ conv_history = "\n\n".join([f"{message["role"]}: {message["content"]}" for message in history[-2:]]) user_history = [message for message in history if message["role"] == "user"] results = answer_query.run({"user_history": user_history, "query": message, "conv_history": conv_history, "top_k":3}, include_outputs_from=["retriever"]) print(results["retriever"]["documents"]) if "llm2" in results: answer = results["llm2"]["replies"][0] elif "router" in results and "no_answer" in results["router"]: answer = results["router"]["no_answer"] else: answer = "Sorry, a mistake occured" return answer if __name__ == "__main__": print("length of document store: ", document_store.count_documents()) gr.ChatInterface(chat, type="messages").launch()