Spaces:
Runtime error
Runtime error
import os | |
import json | |
import keras | |
from datasets import load_dataset | |
import tensorflow as tf | |
from huggingface_hub import login | |
import torch | |
from transformers import ( AutoTokenizer, AutoModelForCausalLM) | |
from sentence_transformers import SentenceTransformer | |
from typing import List, Dict, Union, Tuple | |
import faiss | |
import numpy as np | |
from datasets import Dataset | |
import torch.nn.functional as F | |
from torch.cuda.amp import autocast | |
import gc | |
from peft import ( LoraConfig, get_peft_model, prepare_model_for_kbit_training, TaskType, PeftModel) | |
from tqdm.auto import tqdm | |
from torch.utils.data import DataLoader | |
import logging | |
import wandb | |
from pathlib import Path | |
from typing import List, Dict, Union, Optional, Any | |
import torch.nn as nn | |
from dataclasses import dataclass, field | |
import time | |
import asyncio | |
import pytest | |
from unittest.mock import Mock, patch | |
from sklearn.metrics import classification_report, confusion_matrix | |
import gradio as gr | |
import matplotlib.pyplot as plt | |
from datetime import datetime | |
import requests | |
import pandas as pd | |
import seaborn as sns | |
import traceback | |
from matplotlib.gridspec import GridSpec | |
from google.colab import userdata | |
# Retrieve secrets securely from environment variables | |
kaggle_username = os.getenv("KAGGLE_USERNAME") | |
kaggle_key = os.getenv("KAGGLE_KEY") | |
hf_token = os.getenv("HF_TOKEN") | |
wandb_key = os.getenv("WANDB_API_KEY") | |
# Log in to Hugging Face | |
login(token=hf_token) | |
# Set up WandB if needed | |
wandb.login(key=wandb_key) | |
# Setup logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
class MedicalConfig: | |
"""Enhanced configuration for medical chatbot""" | |
# LoRA parameters | |
LORA_WEIGHTS_PATH: str = "medical_lora_weights" | |
LORA_R: int = 16 | |
LORA_ALPHA: int = 32 | |
LORA_DROPOUT: float = 0.1 | |
LORA_TARGET_MODULES: List[str] = field(default_factory=lambda: ["q_proj", "v_proj", "k_proj", "o_proj"]) | |
# Training parameters | |
TRAINING_BATCH_SIZE: int = 4 | |
LEARNING_RATE: float = 2e-5 | |
NUM_EPOCHS: int = 3 | |
MAX_LENGTH: int = 2048 | |
INDEX_BATCH_SIZE: int = 32 | |
# Medical specific parameters | |
EMERGENCY_KEYWORDS: List[str] = field(default_factory=lambda: [ | |
'chest pain', 'breathing difficulty', 'stroke', 'heart attack', 'unconscious', | |
'severe bleeding', 'seizure', 'anaphylaxis', 'severe burn', 'choking', | |
'severe head injury', 'spinal injury', 'drowning', 'electric shock', | |
'severe allergic reaction', 'poisoning', 'overdose', 'self-harm', | |
'suicidal thoughts', 'severe trauma' | |
]) | |
URGENT_KEYWORDS: List[str] = field(default_factory=lambda: [ | |
'infection', 'high fever', 'severe pain', 'vomiting', 'dehydration', | |
'anxiety attack', 'panic attack', 'mental health crisis', 'broken bone', | |
'deep cut', 'asthma attack', 'migraine', 'severe rash', 'eye injury', | |
'dental emergency', 'pregnancy complications', 'severe back pain', | |
'severe abdominal pain', 'concussion', 'severe allergies' | |
]) | |
# UK Healthcare specific | |
EMERGENCY_NUMBERS: List[str] = field(default_factory=lambda: ["999", "112", "111"]) | |
GP_SERVICES: Dict[str, Dict[str, str]] = field(default_factory=lambda: { | |
"EMERGENCY": { | |
"name": "A&E", | |
"wait_time": "4 hours target", | |
"when_to_use": "Life-threatening emergencies" | |
}, | |
"URGENT": { | |
"name": "Urgent Care Center", | |
"wait_time": "2-4 hours typically", | |
"when_to_use": "Urgent but not life-threatening conditions" | |
}, | |
"NON_URGENT": { | |
"name": "GP Practice", | |
"wait_time": "Same day to 2 weeks", | |
"when_to_use": "Routine medical care" | |
} | |
}) | |
# Cultural considerations | |
CULTURAL_CONTEXTS: List[Dict[str, str]] = field(default_factory=lambda: [ | |
{ | |
"group": "South Asian", | |
"considerations": [ | |
"Different presentation of skin conditions", | |
"Higher diabetes risk", | |
"Cultural dietary practices", | |
"Language preferences" | |
] | |
}, | |
{ | |
"group": "African/Caribbean", | |
"considerations": [ | |
"Different presentation of skin conditions", | |
"Higher hypertension risk", | |
"Specific hair/scalp conditions", | |
"Cultural health beliefs" | |
] | |
}, | |
{ | |
"group": "Middle Eastern", | |
"considerations": [ | |
"Cultural modesty requirements", | |
"Ramadan considerations", | |
"Gender preferences for healthcare providers", | |
"Traditional medicine practices" | |
] | |
} | |
]) | |
class GPUOptimizedRAG: | |
def __init__( | |
self, | |
model_path: str = "google/gemma-7b", | |
embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2", | |
config: MedicalConfig = MedicalConfig(), | |
use_cpu_fallback: bool = False | |
): | |
self.conversation_memory = { | |
'name': 'Pearly', | |
'role': 'GP Medical Assistant', | |
'style': 'professional, empathetic, and clear', | |
'system_prompt': None, | |
'past_interactions': [] | |
} | |
"""Initialize RAG with enhanced configuration, LoRA support, and memory optimization""" | |
self.config = config | |
self.use_cpu_fallback = use_cpu_fallback | |
# Determine device with memory check | |
if torch.cuda.is_available() and not use_cpu_fallback: | |
try: | |
# Check available GPU memory | |
gpu_memory = torch.cuda.get_device_properties(0).total_memory | |
if gpu_memory < 8e9: # Less than 8GB | |
logger.warning("Limited GPU memory, falling back to CPU") | |
self.device = torch.device("cpu") | |
else: | |
self.device = torch.device("cuda") | |
except Exception as e: | |
logger.warning(f"GPU initialization error: {e}, falling back to CPU") | |
self.device = torch.device("cpu") | |
else: | |
self.device = torch.device("cpu") | |
# Add conversation memory for consistent persona | |
self.conversation_memory = { | |
'name': 'Pearly', | |
'role': 'GP Medical Assistant', | |
'style': 'professional, empathetic, and clear', | |
'past_interactions': [] | |
} | |
# Add clinical quality metrics | |
self.clinical_metrics = { | |
'terminology_accuracy': 0.0, | |
'assessment_accuracy': 0.0, | |
'guideline_adherence': 0.0, | |
'symptom_recognition': 0.0 | |
} | |
# Initialize models with memory optimization | |
try: | |
self.tokenizer = AutoTokenizer.from_pretrained(model_path) | |
self.model = AutoModelForCausalLM.from_pretrained( | |
model_path, | |
torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32, | |
device_map="auto" if self.device.type == "cuda" else None, | |
low_cpu_mem_usage=True | |
) | |
# Apply LoRA configuration | |
self.setup_lora() | |
# Initialize embedding model with memory considerations | |
self.embedding_model = SentenceTransformer(embedding_model) | |
if self.device.type == "cuda": | |
self.embedding_model = self.embedding_model.half().to(self.device) | |
else: | |
self.embedding_model = self.embedding_model.to(self.device) | |
# Setup FAISS index with CPU fallback | |
self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension() | |
if self.device.type == "cuda" and not use_cpu_fallback: | |
try: | |
self.index = faiss.IndexFlatIP(self.embedding_dim) | |
res = faiss.StandardGpuResources() | |
# Limit temp memory for GPU index | |
res.setTempMemory(64 * 1024 * 1024) # 64MB temp memory | |
self.index = faiss.index_cpu_to_gpu(res, 0, self.index) | |
except Exception as e: | |
logger.warning(f"GPU FAISS initialization failed: {e}, using CPU index") | |
self.index = faiss.IndexFlatIP(self.embedding_dim) | |
else: | |
self.index = faiss.IndexFlatIP(self.embedding_dim) | |
logger.info(f"RAG system initialized successfully on {self.device}") | |
except Exception as e: | |
logger.error(f"Error initializing RAG system: {e}") | |
raise | |
def setup_lora(self): | |
"""Configure and apply LoRA to the model with memory optimization""" | |
try: | |
lora_config = LoraConfig( | |
r=self.config.LORA_R, | |
lora_alpha=self.config.LORA_ALPHA, | |
target_modules=self.config.LORA_TARGET_MODULES, | |
lora_dropout=self.config.LORA_DROPOUT, | |
bias="none", | |
task_type=TaskType.CAUSAL_LM | |
) | |
self.model = get_peft_model(self.model, lora_config) | |
logger.info("LoRA configuration applied successfully") | |
except Exception as e: | |
logger.error(f"Error setting up LoRA: {e}") | |
raise | |
def evaluate_clinical_quality(self, response: str, expected_elements: List[str]) -> Dict[str, float]: | |
"""Add clinical quality evaluation matching test requirements""" | |
quality_metrics = { | |
'terminology_accuracy': self._evaluate_terminology(response, expected_elements), | |
'assessment_accuracy': self._evaluate_assessment(response), | |
'guideline_adherence': self._evaluate_guidelines(response), | |
'symptom_recognition': self._evaluate_symptoms(response, expected_elements) | |
} | |
return quality_metrics | |
def prepare_documents(self, documents: List[Dict]): | |
"""Enhanced document preparation with improved batching and memory management""" | |
self.documents = documents | |
embeddings = [] | |
try: | |
for i in tqdm(range(0, len(documents), self.config.INDEX_BATCH_SIZE), | |
desc="Processing documents"): | |
batch = documents[i:i + self.config.INDEX_BATCH_SIZE] | |
texts = [doc['text'] for doc in batch] | |
with torch.amp.autocast(device_type='cuda'): | |
batch_embeddings = self.embedding_model.encode( | |
texts, | |
convert_to_tensor=True, | |
show_progress_bar=False, | |
batch_size=8 | |
) | |
embeddings.append(batch_embeddings.cpu().numpy()) | |
all_embeddings = np.vstack(embeddings) | |
self.index.add(all_embeddings) | |
logger.info(f"Indexed {len(documents)} documents successfully") | |
except Exception as e: | |
logger.error(f"Error preparing documents: {e}") | |
raise | |
def assess_urgency(self, symptoms: str) -> Dict[str, Any]: | |
"""Enhanced symptom assessment with detailed analysis""" | |
symptoms_lower = symptoms.lower() | |
# Initialize response | |
assessment = { | |
'level': 'NON-URGENT', | |
'reasons': [], | |
'recommendations': [], | |
'follow_up_needed': False | |
} | |
# Check emergency keywords | |
emergency_matches = [kw for kw in self.config.EMERGENCY_KEYWORDS | |
if kw in symptoms_lower] | |
if emergency_matches: | |
assessment.update({ | |
'level': 'EMERGENCY', | |
'reasons': emergency_matches, | |
'recommendations': [ | |
'Call 999 immediately', | |
'Do not move if spinal injury suspected', | |
'Stay on the line for guidance' | |
], | |
'follow_up_needed': True | |
}) | |
return assessment | |
# Check urgent keywords | |
urgent_matches = [kw for kw in self.config.URGENT_KEYWORDS | |
if kw in symptoms_lower] | |
if urgent_matches: | |
assessment.update({ | |
'level': 'URGENT', | |
'reasons': urgent_matches, | |
'recommendations': [ | |
'Visit urgent care center', | |
'Book emergency GP appointment', | |
'Monitor symptoms closely' | |
], | |
'follow_up_needed': True | |
}) | |
return assessment | |
# Non-urgent default | |
assessment.update({ | |
'recommendations': [ | |
'Book routine GP appointment', | |
'Monitor symptoms', | |
'Try self-care measures' | |
], | |
'follow_up_needed': False | |
}) | |
return assessment | |
def generate_cultural_considerations(self, symptoms: str) -> List[str]: | |
"""Generate culturally-aware medical considerations""" | |
considerations = [] | |
symptoms_lower = symptoms.lower() | |
for context in self.config.CULTURAL_CONTEXTS: | |
relevant_considerations = [ | |
cons for cons in context['considerations'] | |
if any(keyword in symptoms_lower for keyword in cons.lower().split()) | |
] | |
if relevant_considerations: | |
considerations.extend([ | |
f"{context['group']}: {consideration}" | |
for consideration in relevant_considerations | |
]) | |
return considerations if considerations else ["No specific cultural considerations identified"] | |
def get_booking_template(self, urgency_level: str) -> str: | |
"""Get appropriate booking template based on urgency level""" | |
service_info = self.config.GP_SERVICES[urgency_level] | |
templates = { | |
"EMERGENCY": f""" | |
🚨 EMERGENCY SERVICES REQUIRED 🚨 | |
Service: {service_info['name']} | |
Target Wait Time: {service_info['wait_time']} | |
When to Use: {service_info['when_to_use']} | |
IMMEDIATE ACTIONS: | |
1. 🚑 Call 999 (or 112) | |
2. 🏥 Nearest A&E: [Location Placeholder] | |
3. 🚨 Stay on line for guidance | |
Type '999' to initiate emergency call | |
""", | |
"URGENT": f""" | |
⚡ URGENT CARE NEEDED ⚡ | |
Service: {service_info['name']} | |
Expected Wait: {service_info['wait_time']} | |
When to Use: {service_info['when_to_use']} | |
OPTIONS: | |
1. 🏥 Find nearest urgent care | |
2. 📅 Book urgent GP slot | |
3. 🔍 Locate walk-in clinic | |
Reply with option number (1-3) | |
""", | |
"NON_URGENT": f""" | |
📋 ROUTINE CARE BOOKING 📋 | |
Service: {service_info['name']} | |
Typical Wait: {service_info['wait_time']} | |
When to Use: {service_info['when_to_use']} | |
OPTIONS: | |
1. 📅 Schedule GP visit | |
2. 👨⚕️ Find local GP | |
3. ℹ️ Self-care advice | |
Reply with option number (1-3) | |
""" | |
} | |
return templates.get(urgency_level, templates["NON_URGENT"]) | |
def generate_response(self, query: str, chat_history: List[tuple] = None) -> Dict[str, Any]: | |
"""Generate response with enhanced conversational context""" | |
try: | |
# Placeholder for response generation logic | |
urgency_assessment = {'level': 'NORMAL'} # Placeholder urgency assessment | |
response_text = "This is a placeholder response." | |
# Example cultural considerations | |
cultural_considerations = ["Consider patient background"] | |
return { | |
'response': response_text, | |
'urgency_assessment': urgency_assessment, | |
'cultural_considerations': cultural_considerations | |
} | |
except Exception as e: | |
logger.error(f"Error generating response: {e}") | |
return { | |
'response': "I apologize, but I encountered an error. If this is an emergency, please call 999 immediately.", | |
'urgency_assessment': {'level': 'UNKNOWN'}, | |
'cultural_considerations': [] | |
} | |
def prepare_documents(self, documents): | |
"""Prepare and index documents for RAG system""" | |
pass # Placeholder for document preparation logic | |
def retrieve(self, query: str, k: int = 5) -> List[Dict]: | |
"""Retrieve relevant documents""" | |
try: | |
return [] # Placeholder for document retrieval | |
except Exception as e: | |
logger.error(f"Error in retrieval: {e}") | |
return [] | |
def generate_report(self, results: Dict) -> Dict: | |
"""Generate a simple summary report of the test results.""" | |
total_cases = sum(cat['total'] for cat in results.values()) | |
total_correct = sum(cat['correct'] for cat in results.values()) | |
return { | |
'timestamp': datetime.now().isoformat(), | |
'triage_performance': { | |
'emergency_accuracy': results['emergency']['correct'] / results['emergency']['total'], | |
'urgent_accuracy': results['urgent']['correct'] / results['urgent']['total'], | |
'non_urgent_accuracy': results['non_urgent']['correct'] / results['non_urgent']['total'], | |
'overall_accuracy': total_correct / total_cases | |
} | |
} | |
def enhance_response_generation(self): | |
"""Add test-aligned response enhancement""" | |
self.response_enhancers = { | |
'demographic_sensitivity': self._enhance_demographic_sensitivity, | |
'cultural_competency': self._enhance_cultural_competency, | |
'clinical_quality': self._enhance_clinical_quality, | |
'follow_up_generation': self._enhance_follow_up | |
} | |
def _enhance_demographic_sensitivity(self, response: str, demographic: str) -> str: | |
"""Add demographic-specific enhancements matching test requirements""" | |
demographic_patterns = { | |
'pediatric': ['age-appropriate', 'child-friendly', 'developmental'], | |
'elderly': ['mobility', 'cognitive', 'fall risk'], | |
'pregnant': ['trimester', 'fetal', 'pregnancy-safe'], | |
'chronic_condition': ['management', 'monitoring', 'ongoing care'] | |
} | |
return response # Placeholder implementation | |
def process_appointment_booking(message, patient_info): | |
"""Process appointment booking queries""" | |
return "I can help you book an appointment. Please provide further details." | |
def create_gradio_interface(rag_system: GPUOptimizedRAG): | |
"""Create enhanced Gradio interface with appointment booking capabilities""" | |
SYSTEM_MESSAGE = "You are Pearly, a friendly medical triaging Chatbot." | |
def process_chat_response(response_data: Dict[str, Any], message: str, history: List[tuple]) -> str: | |
"""Format chat response based on context, handle appointments, and maintain persona""" | |
try: | |
if not history or message.lower().startswith(("hi", "hello", "hey", "good")): | |
return "Hi! I'm Pearly, your medical triaging assistant. I'm here to help assess your symptoms and provide guidance. How may I assist you today?" | |
urgency_level = response_data['urgency_assessment']['level'] | |
response_text = response_data['response'] | |
if urgency_level == "EMERGENCY": | |
return f"🚨 EMERGENCY ALERT 🚨\n\n{response_text}\n\nWould you like me to help connect you to emergency services?" | |
elif urgency_level == "URGENT": | |
return f"⚠️ URGENT CARE NEEDED ⚠️\n\n{response_text}\n\nWould you like help finding your nearest urgent care center?" | |
else: | |
return f"{response_text}\n\nWould you like help booking a GP appointment or finding more NHS resources?" | |
except Exception as e: | |
logger.error(f"Error processing chat response: {e}") | |
return ( | |
"I'm Pearly, and I apologize for the technical difficulty. For your safety:\n\n" | |
"- Call 999 for emergencies\n" | |
"- Call 111 for urgent medical advice\n" | |
"- Visit NHS 111 online for non-urgent concerns\n\n" | |
"Would you like to try asking your question again?" | |
) | |
def chat(message: str, history: List[tuple]) -> tuple[str, List[tuple]]: | |
"""Enhanced chat function with better error handling and context awareness""" | |
try: | |
response_data = rag_system.generate_response(message, history) | |
response = process_chat_response(response_data, message, history) | |
history.append((message, response)) | |
return history | |
except Exception as e: | |
logger.error(f"Error in chat: {e}") | |
emergency_response = ( | |
"I apologize for the technical difficulty. For your safety:\n\n" | |
"- Call 999 for emergencies\n" | |
"- Call 111 for urgent medical advice\n" | |
"- Visit NHS 111 online for non-urgent concerns\n\n" | |
"Would you like to try asking your question again?" | |
) | |
history.append((message, emergency_response)) | |
return history | |
# Define interface for the chatbot | |
with gr.Blocks() as interface: | |
gr.HTML("<h1>Pearly Medical Assistant</h1><p>Hi! I'm Pearly, your GP medical assistant.</p>") | |
chatbot = gr.Chatbot(value=[(None, "Hi! I'm Pearly, your GP medical assistant. How can I help you today?")]) | |
msg = gr.Textbox(label="Your Message") | |
submit = gr.Button("Send") | |
submit.click(chat, inputs=[msg, chatbot], outputs=chatbot) | |
return interface | |
def prepare_medical_documents(): | |
"""Prepare medical knowledge base documents with enhanced conversation flow""" | |
try: | |
logger.info("Loading medical and persona datasets...") | |
datasets = { | |
"persona": load_dataset("AlekseyKorshuk/persona-chat", split="train[:500]"), | |
"medqa": load_dataset("medalpaca/medical_meadow_medqa", split="train[:500]"), | |
"meddia": load_dataset("wasiqnauman/medical-diagnosis-synthetic", split="train[:500]") | |
} | |
documents = [] | |
# Process Persona dataset for enhanced conversational style | |
logger.info("Processing persona dataset...") | |
for item in datasets["persona"]: | |
if isinstance(item.get('personality'), list): | |
personality = " ".join(item['personality']) | |
documents.append({ | |
'text': f""" | |
Conversation Style Guide: | |
Personality: {personality} | |
Role: Pearly - Medical Assistant | |
Core Traits: Professional, empathetic, clear | |
Key Behaviors: | |
- Always introduce as Pearly | |
- Show empathy for symptoms | |
- Ask relevant follow-up questions | |
- Offer practical assistance | |
- Maintain professional tone while being approachable | |
""", | |
'type': 'persona' | |
}) | |
# Process conversation examples with enhanced structure | |
if isinstance(item.get('utterances'), list): | |
for utterance in item['utterances']: | |
if isinstance(utterance, dict) and 'history' in utterance: | |
conversation = ' '.join(utterance['history']) | |
documents.append({ | |
'text': f""" | |
Medical Consultation Pattern: | |
Conversation: {conversation} | |
Key Elements: | |
- Show understanding of symptoms | |
- Ask clarifying questions | |
- Provide clear guidance | |
- Offer next steps | |
- Check if assistance needed | |
""", | |
'type': 'conversation_pattern' | |
}) | |
# Process MedQA dataset with enhanced medical context | |
logger.info("Processing medical QA dataset...") | |
for item in datasets["medqa"]: | |
if 'input' in item and 'output' in item: | |
input_text = item['input'] | |
if input_text.startswith('Q:'): | |
input_text = input_text[2:] | |
documents.append({ | |
'text': f""" | |
Medical Knowledge Base: | |
Question: {input_text} | |
Answer: {item['output']} | |
Application: | |
- Use information to inform recommendations | |
- Adapt to patient's situation | |
- Maintain clinical accuracy | |
- Explain in clear terms | |
""", | |
'type': 'medical_qa' | |
}) | |
# Process diagnosis dataset with structured guidance | |
logger.info("Processing diagnosis dataset...") | |
for item in datasets["meddia"]: | |
if 'input' in item and 'output' in item: | |
documents.append({ | |
'text': f""" | |
Clinical Assessment Framework: | |
Symptoms: {item['input']} | |
Assessment and Plan: {item['output']} | |
Response Structure: | |
1. Acknowledge symptoms | |
2. Ask about severity and duration | |
3. Inquire about related symptoms | |
4. Provide clear recommendations | |
5. Offer assistance with next steps | |
""", | |
'type': 'diagnosis_guidance' | |
}) | |
# Add enhanced conversation templates | |
conversation_templates = [ | |
{ | |
'text': """ | |
Consultation Framework: | |
1. Initial Response: | |
- Acknowledge the concern | |
- Show empathy | |
- Ask about duration/severity | |
2. Follow-up Questions: | |
- Ask specific, relevant questions | |
- Clarify symptoms | |
- Check for related issues | |
3. Assessment: | |
- Summarize findings | |
- Explain reasoning | |
- State level of concern | |
4. Recommendations: | |
- Provide clear guidance | |
- List specific actions | |
- Offer assistance | |
5. Next Steps: | |
- Suggest appropriate care level | |
- Offer to help with appointments | |
- Provide relevant resources | |
6. Safety Checks: | |
- Verify understanding | |
- Confirm action plan | |
- Ensure patient comfort | |
Response Patterns: | |
Emergency: | |
"I understand you're experiencing [symptom]. This requires immediate attention..." | |
Urgent: | |
"I'm concerned about [symptom]. To better assess the urgency..." | |
Non-urgent: | |
"I hear you about [symptom]. Let's understand this better..." | |
Follow-ups: | |
"Would you like help with [specific assistance]?" | |
"Shall I help you [book/find/connect]?" | |
Guidance Structure: | |
1. Acknowledge | |
2. Assess | |
3. Advise | |
4. Assist | |
5. Arrange follow-up | |
""", | |
'type': 'conversation_framework' | |
} | |
] | |
documents.extend(conversation_templates) | |
# Add your existing GP templates | |
# Define GP templates inside the function | |
GP_templates = [ | |
{ | |
'text': """ | |
Role: GP Medical Assistant | |
Context: Emergency situation - Chest Pain | |
Style: Calm, clear, and authoritative | |
Conversation Pattern: | |
Patient Input: Chest pain symptoms | |
Response Framework: | |
1. Initial acknowledgment | |
2. Immediate triage questions: | |
- Duration of pain | |
- Pain characteristics | |
- Associated symptoms | |
3. Emergency guidance | |
4. Clear action steps | |
Example Flow: | |
Patient: I'm having chest pain | |
Pearly: I understand you're experiencing chest pain. This requires immediate attention. How long have you been experiencing this pain? | |
Patient: About 30 minutes now | |
Pearly: Thank you for letting me know. Is the pain sharp or dull, and where exactly is it located? Also, are you experiencing any other symptoms like shortness of breath or sweating? | |
Patient: It's sharp and in the center of my chest. Yes, I'm sweating a lot | |
Pearly: Given your symptoms, this could be a serious cardiac event requiring immediate emergency care. | |
IMMEDIATE ACTIONS: | |
1. Call 999 immediately - I can help you do this | |
2. Sit down and try to stay calm | |
3. If available and you're not allergic, take one adult aspirin (300mg) | |
4. Loosen any tight clothing | |
5. Keep the line open - I'll stay with you until help arrives | |
Would you like me to connect you to emergency services now? | |
""", | |
'type': 'GP_template' | |
}, | |
{ | |
'text': """ | |
Role: GP Medical Assistant | |
Context: Non-emergency consultation - Persistent Cough | |
Style: Empathetic and thorough | |
Conversation Pattern: | |
Patient Input: Cough symptoms | |
Response Framework: | |
1. Show empathy | |
2. Gather specific details: | |
- Type of cough | |
- Duration | |
- Associated symptoms | |
3. Risk assessment | |
4. Clear recommendations | |
Example Flow: | |
Patient: I've had a cough for 5 days now | |
Pearly: I'm sorry to hear you've been unwell. To better understand your condition, is it a dry cough or are you producing any phlegm? | |
Patient: It's mostly dry but sometimes I cough up clear phlegm | |
Pearly: Thank you for that detail. Have you experienced any fever, and are you having any difficulty breathing? | |
Patient: No fever but I feel a bit short of breath sometimes | |
Pearly: I understand. A few more important questions: | |
1. Have you been exposed to anyone with COVID-19 or other respiratory infections? | |
2. Do you have any underlying health conditions? | |
3. Are your symptoms worse at any particular time of day? | |
Would you like help booking an appointment with your GP? | |
""", | |
'type': 'GP_template' | |
}, | |
{ | |
'text': """ | |
Role: GP Medical Assistant | |
Context: Mental Health Support | |
Style: Empathetic, patient, and supportive | |
Conversation Pattern: | |
Patient Input: Mental health concerns | |
Response Framework: | |
1. Validate feelings | |
2. Gentle exploration: | |
- Duration | |
- Impact on daily life | |
- Support systems | |
3. Clear support options | |
4. Immediate resources | |
Example Flow: | |
Patient: I've been feeling really anxious and can't sleep | |
Pearly: I hear you, and I want you to know it's brave to speak up about this. How long have you been experiencing these feelings? | |
Patient: About a month now, getting worse | |
Pearly: Thank you for sharing that. To better understand your situation: | |
1. Are there any specific triggers you've noticed? | |
2. How is this affecting your daily activities? | |
3. Have you had any previous mental health support? | |
Would you like information about immediate support services or help booking a GP appointment? | |
""", | |
'type': 'GP_template' | |
} | |
] | |
# Add all templates to documents | |
documents.extend(GP_templates) | |
logger.info(f"Prepared {len(documents)} documents including:") | |
logger.info(f"- {len([d for d in documents if d['type'] == 'persona'])} persona guides") | |
logger.info(f"- {len([d for d in documents if d['type'] == 'conversation_pattern'])} conversation patterns") | |
logger.info(f"- {len([d for d in documents if d['type'] == 'medical_qa'])} medical QA pairs") | |
logger.info(f"- {len([d for d in documents if d['type'] == 'diagnosis_guidance'])} diagnosis guidelines") | |
logger.info(f"- {len([d for d in documents if d['type'] == 'conversation_framework'])} conversation frameworks") | |
logger.info(f"- {len([d for d in documents if d['type'] == 'GP_template'])} GP templates") | |
return documents | |
except Exception as e: | |
logger.error(f"Error preparing medical documents: {e}") | |
# Print sample data for debugging | |
for dataset_name, dataset in datasets.items(): | |
try: | |
sample = dataset[0] | |
logger.error(f"\nSample from {dataset_name}:") | |
logger.error(f"Keys: {list(sample.keys())}") | |
logger.error(f"Sample content: {str(sample)[:500]}") | |
except Exception as debug_e: | |
logger.error(f"Error inspecting {dataset_name}: {debug_e}") | |
raise | |
def generate_response(self, query: str, chat_history: List[tuple] = None) -> Dict[str, Any]: | |
"""Generate response with enhanced conversational context and persona""" | |
try: | |
# Update conversation memory | |
if chat_history: | |
self.conversation_memory['past_interactions'] = chat_history[-3:] | |
# Retrieve relevant documents with boosted weights for persona matches | |
retrieved_docs = self.retrieve(query, k=7) | |
# Separate documents by type | |
medical_docs = [doc for doc in retrieved_docs if doc['document']['type'] in ['medical_qa', 'diagnosis']] | |
persona_docs = [doc for doc in retrieved_docs if doc['document']['type'] in ['persona', 'conversation', 'GP_template']] | |
# Build context with weighted emphasis on different document types | |
medical_context = " ".join([doc['document']['text'] for doc in medical_docs]) | |
persona_context = " ".join([doc['document']['text'] for doc in persona_docs]) | |
# Assess urgency and get considerations | |
urgency_assessment = self.assess_urgency(query) | |
cultural_considerations = self.generate_cultural_considerations(query) | |
# Build conversation history context | |
history_context = "" | |
if chat_history: | |
history_context = "\n".join([f"Human: {h}\nPearly: {a}" for h, a in chat_history[-3:]]) | |
# Add persona reminder | |
persona_reminder = f""" | |
I am {self.conversation_memory['name']}, a {self.conversation_memory['role']}. | |
My communication style is {self.conversation_memory['style']}. | |
""" | |
# Create enhanced prompt with persona integration | |
prompt = f"""Context: | |
Medical Information: {medical_context} | |
{persona_reminder} | |
Previous Interactions: | |
{history_context} | |
Current Query: {query} | |
Maintain my identity as {self.conversation_memory['name']}, the {self.conversation_memory['role']}, | |
providing clear, professional guidance following NHS protocols. | |
Urgency Level: {urgency_assessment['level']} | |
Cultural Considerations: {', '.join(cultural_considerations)} | |
Respond in a clear, caring manner, always referring to myself as {self.conversation_memory['name']}. | |
Response:""" | |
# Generate response | |
inputs = self.tokenizer( | |
prompt, | |
return_tensors="pt", | |
max_length=self.config.MAX_LENGTH, | |
truncation=True | |
).to(self.device) | |
with torch.amp.autocast(device_type='cuda'): | |
outputs = self.model.generate( | |
**inputs, | |
max_new_tokens=512, | |
do_sample=True, | |
top_p=0.9, | |
temperature=0.7, | |
num_return_sequences=1, | |
pad_token_id=self.tokenizer.eos_token_id | |
) | |
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) | |
response = response.split("Response:")[-1].strip() | |
# Add booking template for emergency/urgent cases | |
if urgency_assessment['level'] in ["EMERGENCY", "URGENT"]: | |
booking_template = self.get_booking_template(urgency_assessment['level']) | |
response = f"{response}\n\n{booking_template}" | |
return { | |
'response': response, | |
'urgency_assessment': urgency_assessment, | |
'cultural_considerations': cultural_considerations | |
} | |
except Exception as e: | |
logger.error(f"Error generating response: {e}") | |
return { | |
'response': "I apologize, but I encountered an error. If this is an emergency, please call 999 immediately.", | |
'urgency_assessment': {'level': 'UNKNOWN'}, | |
'cultural_considerations': [] | |
} | |
def check_urgency_accuracy(self, predicted: str, expected: str) -> float: | |
"""Check if urgency level matches expected""" | |
return 1.0 if predicted == expected else 0.0 | |
def check_action_accuracy(self, response: str, expected_actions: List[str]) -> float: | |
"""Check if recommended actions match expected""" | |
if not expected_actions: | |
return 1.0 | |
found_actions = sum(1 for action in expected_actions | |
if action.lower() in response.lower()) | |
return found_actions / len(expected_actions) | |
def assess_conversation_quality(self, response: str) -> float: | |
"""Assess conversation quality metrics""" | |
metrics = { | |
'empathy': any(word in response.lower() | |
for word in ['understand', 'hear you', 'sorry']), | |
'clarity': len(response.split('.')) <= 5, # Check for concise sentences | |
'follow_up': '?' in response, # Check for follow-up questions | |
'structure': any(word in response.lower() | |
for word in ['first', 'then', 'next', 'finally']) | |
} | |
return sum(metrics.values()) / len(metrics) | |
def check_cultural_sensitivity(self, response_data: Dict, context: str) -> float: | |
"""Check cultural sensitivity of response""" | |
if not context: | |
return 1.0 | |
cultural_considerations = response_data.get('cultural_considerations', []) | |
return 1.0 if any(context.lower() in cons.lower() | |
for cons in cultural_considerations) else 0.0 | |
def setup_wandb(config: MedicalConfig): | |
"""Setup Weights & Biases tracking""" | |
try: | |
wandb.init( | |
project="medical-chatbot", | |
config={ | |
"learning_rate": config.LEARNING_RATE, | |
"epochs": config.NUM_EPOCHS, | |
"batch_size": config.TRAINING_BATCH_SIZE, | |
"lora_r": config.LORA_R, | |
"lora_alpha": config.LORA_ALPHA | |
} | |
) | |
logger.info("Weights & Biases initialized successfully") | |
except Exception as e: | |
logger.warning(f"Failed to initialize Weights & Biases: {e}") | |
logger.warning("Continuing without wandb tracking") | |
if __name__ == "__main__": | |
try: | |
# Initialize configuration | |
config = MedicalConfig() | |
# Setup wandb tracking | |
setup_wandb(config) | |
# Initialize RAG system | |
logger.info("Initializing RAG system...") | |
rag_system = GPUOptimizedRAG(config=config) | |
# Prepare and index documents | |
logger.info("Preparing medical knowledge base...") | |
medical_documents = prepare_medical_documents() | |
rag_system.prepare_documents(medical_documents) | |
# Create and launch Gradio interface | |
interface = create_gradio_interface(rag_system) | |
if interface is None: | |
raise ValueError("Failed to create Gradio interface - interface object is None") | |
# Launch the interface | |
interface.launch(server_name="0.0.0.0", server_port=7860, share=True) | |
except Exception as e: | |
logger.error(f"Application startup error: {e}") | |
raise | |