import os

import gradio as gr
import torch
from elasticsearch import Elasticsearch
from openai import OpenAI
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    MaxTimeCriteria,
    PreTrainedTokenizer,
    StoppingCriteria,
    StoppingCriteriaList,
    pipeline,
)


class MultiTokenEOSCriteria(StoppingCriteria):
    """Stop generation once every sequence in the batch contains a stop string.

    The stop string is matched on decoded text rather than on token ids, so
    it is found regardless of how the tokenizer splits it in context.
    """

    def __init__(
        self,
        sequence: str,
        tokenizer: PreTrainedTokenizer,
        initial_decoder_input_length: int,
        batch_size: int = 1,
    ) -> None:
        """Set up the criterion.

        Args:
            sequence: The stop string to look for in the generated text.
            tokenizer: Tokenizer used to encode the stop string and to decode
                the generated ids.
            initial_decoder_input_length: Number of prompt tokens; ids before
                this offset are never searched for the stop string.
            batch_size: Number of sequences generated in parallel.
        """
        self.initial_decoder_input_length = initial_decoder_input_length
        self.done_tracker = [False] * batch_size
        self.sequence = sequence
        self.sequence_ids = tokenizer.encode(sequence, add_special_tokens=False)
        # The look-back window is two tokens wider than the encoded stop
        # string. NOTE(review): presumably slack for tokenizations that differ
        # between standalone and in-context encoding -- confirm.
        self.sequence_id_len = len(self.sequence_ids) + 2
        self.tokenizer = tokenizer

    def __call__(self, input_ids, scores, **kwargs) -> bool:
        """Return True when all sequences in the batch have hit the stop string."""
        # For efficiency, only the last `sequence_id_len` generated tokens are
        # decoded and searched, never the prompt.
        lookback_ids_batch = input_ids[:, self.initial_decoder_input_length:]
        lookback_ids_batch = lookback_ids_batch[:, -self.sequence_id_len:]
        lookback_tokens_batch = self.tokenizer.batch_decode(lookback_ids_batch)

        for i, done in enumerate(self.done_tracker):
            if not done:
                self.done_tracker[i] = self.sequence in lookback_tokens_batch[i]
        return False not in self.done_tracker


# Connect to Elasticsearch.
# NOTE(review): verify_certs=False disables TLS certificate verification for
# this connection; kept as in the original deployment -- confirm it is intended.
es = Elasticsearch(
    hosts=["https://data.neuralnoise.com:9200"],
    basic_auth=('elastic', os.environ['ES_PASSWORD']),
    verify_certs=False,
    ssl_show_warn=False,
)


def search(query, index="pubmed", num_docs=3):
    """Search the Elasticsearch index for the most relevant documents.

    Args:
        query: Free-text query matched against the ``content`` field.
        index: Name of the Elasticsearch index to query.
        num_docs: Maximum number of documents to return; when it is not
            positive no request is sent.

    Returns:
        A list with the ``content`` string of each hit (possibly empty).
    """
    docs = []
    if num_docs > 0:
        print(f'Running query: {query}')
        es_request_body = {
            "query": {
                "match": {
                    "content": query  # Assuming documents have a 'content' field
                }
            },
            "size": num_docs,
        }
        response = es.options(request_timeout=60).search(index=index, body=es_request_body)
        # Extract the document texts from the hits
        docs = [hit["_source"]["content"] for hit in response['hits']['hits']]
        print(f'Received {len(docs)} documents from index {index}')
    return docs


# Static part of the system message; the retrieved documents are appended to
# it for every request.
_SYSTEM_PROMPT = (
    "You are an advanced medical support assistant, designed to help "
    "clinicians by providing quick access to medical information, "
    "guidelines, and evidence-based recommendations. Alongside your "
    "built-in knowledge, you have access to a curated set of documents "
    "retrieved from trustworthy sources such as Wikipedia and PubMed. "
    "These documents include up-to-date medical guidelines, research "
    "summaries, and clinical practice information. You should use these "
    "documents as a primary source of information to ensure your responses "
    "are based on the most current and credible evidence available. Your "
    "responses should be accurate, concise, and in full compliance with "
    "medical ethics. You must always remind users that your guidance does "
    "not substitute for professional medical advice, diagnosis, or "
    "treatment. Your tone should be professional, supportive, and "
    "respectful, recognizing the complexity of healthcare decisions and the "
    "importance of personalized patient care. While you can offer "
    "information and suggestions based on the documents provided and "
    "current medical knowledge, you must emphasize the importance of "
    "clinicians' expertise and judgment in making clinical decisions. "
    "Please append a newline only when you have finished answering."
)


def rag_pipeline(prompt, index="pubmed", num_docs=3, model_name="HuggingFaceH4/zephyr-7b-beta"):
    """Run a simple RAG pipeline: retrieve documents, then answer with an LLM.

    Args:
        prompt: The user question; also used as the retrieval query.
        index: Elasticsearch index to retrieve documents from.
        num_docs: Number of documents to retrieve (converted with ``int``).
        model_name: Either ``openai/<model>`` to use the OpenAI chat API or a
            Hugging Face model id to generate locally.

    Returns:
        A ``(response, joined_docs)`` tuple: the generated answer and the
        retrieved documents joined by blank lines.
    """
    num_docs = int(num_docs)

    # Retrieve documents
    docs = search(prompt, index=index, num_docs=num_docs)
    joined_docs = '\n\n'.join(docs)

    messages = [
        {
            "role": "system",
            "content": f"{_SYSTEM_PROMPT}\n\nRetrieved documents from {index}:\n\n{joined_docs}",
        },
        {
            "role": "user",
            "content": prompt,
        },
    ]

    for message in messages:
        print('MSG', message)

    max_new_tokens = 1024

    if model_name.startswith('openai/'):
        # Split only on the first slash so model ids that themselves contain
        # a slash are passed through intact.
        openai_model_name = model_name.split('/', 1)[1]
        client = OpenAI()
        openai_res = client.chat.completions.create(
            model=openai_model_name,
            messages=messages,
            max_tokens=max_new_tokens,
            temperature=0,
        )
        print('OAI_RESPONSE', openai_res)
        # The message content may be None; fall back to an empty answer
        # instead of failing on .strip().
        response = (openai_res.choices[0].message.content or "").strip()
    else:
        # Load the language model and tokenizer from HuggingFace Transformers
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            low_cpu_mem_usage=True,
        )
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        generator = pipeline("text-generation", model=model, tokenizer=tokenizer)

        # Measure the prompt exactly as the pipeline will build it, i.e.
        # including the generation prompt; otherwise the stopping criterion
        # would also scan the assistant header for the stop string.
        tokenized_prompt = tokenizer.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=True,
        )

        # Stop at the first newline the model generates (the system prompt
        # asks for a newline only at the end of the answer). A
        # MaxTimeCriteria(32) could be added here to bound wall-clock time.
        stopping_criteria = StoppingCriteriaList([
            MultiTokenEOSCriteria("\n", tokenizer, len(tokenized_prompt)),
        ])

        # Extra keyword arguments of the pipeline call are forwarded to
        # model.generate(), so the stopping criteria are passed at top level.
        generation_kwargs = {
            "max_new_tokens": max_new_tokens,
            "stopping_criteria": stopping_criteria,
            "return_full_text": False,
        }

        # Generate response using the HF LLM
        hf_response = generator(messages, **generation_kwargs)
        print('HF_RESPONSE', hf_response)
        response = hf_response[0]['generated_text']

    # Return the generated text and the documents
    return response, joined_docs


# Create the Gradio interface
iface = gr.Interface(
    fn=rag_pipeline,
    inputs=[
        gr.Textbox(
            label="Input Prompt",
            value="Are group 2 innate lymphoid cells (ILC2s) increased in chronic rhinosinusitis with nasal polyps or eosinophilia?",
        ),
        gr.Dropdown(label="Index", choices=["pubmed", "wikipedia", "textbooks"], value="pubmed"),
        gr.Number(label="Number of Documents", value=3, step=1, minimum=0, maximum=10),
        gr.Dropdown(
            label="Model",
            choices=[
                "HuggingFaceH4/zephyr-7b-beta",
                "meta-llama/Llama-2-7b-chat-hf",
                "meta-llama/Llama-2-13b-chat-hf",
                "meta-llama/Llama-2-70b-chat-hf",
                "openai/gpt-3.5-turbo",
            ],
            value="HuggingFaceH4/zephyr-7b-beta",
        ),
    ],
    outputs=[
        gr.Textbox(label="Generated Text"),
        gr.Textbox(label="Retrieved Documents"),
    ],
    description="Retrieval-Augmented Generation Pipeline",
)

# Launch the interface
iface.launch()