Spaces:
Runtime error
Runtime error
from gradio_client import Client | |
import numpy as np | |
import gradio as gr | |
import requests | |
import json | |
import dotenv | |
import soundfile as sf | |
import time | |
import textwrap | |
from PIL import Image | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
import torch | |
import os | |
import uuid | |
import optimum | |
welcome_message = """ | |
# 👋🏻Welcome to ⚕🗣️😷TruEra - MultiMed ⚕🗣️😷 | |
🗣️📝 This is an accessible and multimodal tool optimized using TruEra! We evaluated several configurations, prompts, and models to optimize this application. | |
### How To Use ⚕🗣️😷TruEra - MultiMed⚕: | |
🗣️📝Interact with ⚕🗣️😷TruEra - MultiMed⚕ in any language using image, audio or text. ⚕🗣️😷TruEra - MultiMed is an accessible application 📚🌟💼 that uses [Qwen/Qwen-1_8B-Chat](https://huggingface.co/Qwen/Qwen-1_8B-Chat) and [Tonic1/Official-Qwen-VL-Chat](https://huggingface.co/Qwen/Qwen-VL-Chat) with [Vectara](https://huggingface.co/vectara) embeddings + retrieval w/ [facebook/seamless-m4t-v2-large](https://huggingface.co/facebook/hf-seamless-m4t-large) for audio translation & accessibility. | |
do [get in touch](https://discord.gg/GWpVpekp). You can also use 😷TruEra MultiMed⚕️ on your own data & in your own way by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co/spaces/TeamTonic/MultiMed?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=&logoWidth=14" alt="Duplicate Space"></a></h3> | |
### Join us : | |
🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community on 👻Discord: [Discord](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Polytonic](https://github.com/tonic-ai) & contribute to 🌟 [PolyGPT](https://github.com/tonic-ai/polygpt-alpha)" | |
""" | |
languages = { | |
"English": "eng", | |
"Modern Standard Arabic": "arb", | |
"Bengali": "ben", | |
"Catalan": "cat", | |
"Czech": "ces", | |
"Mandarin Chinese": "cmn", | |
"Welsh": "cym", | |
"Danish": "dan", | |
"German": "deu", | |
"Estonian": "est", | |
"Finnish": "fin", | |
"French": "fra", | |
"Hindi": "hin", | |
"Indonesian": "ind", | |
"Italian": "ita", | |
"Japanese": "jpn", | |
"Korean": "kor", | |
"Maltese": "mlt", | |
"Dutch": "nld", | |
"Western Persian": "pes", | |
"Polish": "pol", | |
"Portuguese": "por", | |
"Romanian": "ron", | |
"Russian": "rus", | |
"Slovak": "slk", | |
"Spanish": "spa", | |
"Swedish": "swe", | |
"Swahili": "swh", | |
"Telugu": "tel", | |
"Tagalog": "tgl", | |
"Thai": "tha", | |
"Turkish": "tur", | |
"Ukrainian": "ukr", | |
"Urdu": "urd", | |
"Northern Uzbek": "uzn", | |
"Vietnamese": "vie" | |
} | |
# Global variables to hold component references | |
components = {} | |
dotenv.load_dotenv() | |
seamless_client = Client("https://facebook-seamless-m4t.hf.space/--replicas/7sv2b/") #TruEra | |
HuggingFace_Token = os.getenv("HuggingFace_Token") | |
hf_token = os.getenv("HuggingFace_Token") | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
image_description = "" | |
# audio_output = "" | |
# global markdown_output | |
# global audio_output | |
def check_hallucination(assertion, citation): | |
print("Entering check_hallucination function") | |
api_url = "https://api-inference.huggingface.co/models/vectara/hallucination_evaluation_model" | |
header = {"Authorization": f"Bearer {hf_token}"} | |
payload = {"inputs": f"{assertion} [SEP] {citation}"} | |
response = requests.post(api_url, headers=header, json=payload, timeout=120) | |
output = response.json() | |
output = output[0][0]["score"] | |
print(f"check_hallucination output: {output}") | |
return f"**hallucination score:** {output}" | |
# Define the API parameters | |
vapi_url = "https://api-inference.huggingface.co/models/vectara/hallucination_evaluation_model" | |
headers = {"Authorization": f"Bearer {hf_token}"} | |
# Function to query the API | |
def query(payload): | |
print("Entering query function") | |
response = requests.post(vapi_url, headers=headers, json=payload) | |
print(f"API response: {response.json()}") | |
return response.json() | |
# Function to evaluate hallucination | |
def evaluate_hallucination(input1, input2): | |
print("Entering evaluate_hallucination function") | |
combined_input = f"{input1}[SEP]{input2}" | |
output = query({"inputs": combined_input}) | |
score = output[0][0]['score'] | |
if score < 0.5: | |
label = f"🔴 High risk. Score: {score:.2f}" | |
else: | |
label = f"🟢 Low risk. Score: {score:.2f}" | |
print(f"evaluate_hallucination label: {label}") | |
return label | |
def save_audio(audio_input, output_dir="saved_audio"): | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
# Extract sample rate and audio data | |
sample_rate, audio_data = audio_input | |
# Generate a unique file name | |
file_name = f"audio_{int(time.time())}.wav" | |
file_path = os.path.join(output_dir, file_name) | |
# Save the audio file | |
sf.write(file_path, audio_data, sample_rate) | |
return file_path | |
def save_image(image_input, output_dir="saved_images"): | |
print("Entering save_image function") | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
if isinstance(image_input, np.ndarray): | |
image = Image.fromarray(image_input) | |
file_name = f"image_{int(time.time())}.png" | |
file_path = os.path.join(output_dir, file_name) | |
image.save(file_path) | |
print(f"Image saved at: {file_path}") | |
return file_path | |
else: | |
raise ValueError("Invalid image input type") | |
def process_image(image_file_path): | |
print("Entering process_image function") | |
client = Client("https://tonic1-official-qwen-vl-chat.hf.space/--replicas/8w8ws/") # TruEra | |
try: | |
result = client.predict( | |
"Describe this image in detail, identify every detail in this image. Describe the image the best you can.", | |
image_file_path, | |
fn_index=0 | |
) | |
print(f"Image processing result: {result}") | |
return result | |
except Exception as e: | |
print(f"Error in process_image: {e}") | |
return f"Error occurred during image processing: {e}" | |
def process_speech(audio_input, source_language, target_language="English"): | |
print("Entering process_speech function") | |
if audio_input is None: | |
return "No audio input provided." | |
try: | |
result = seamless_client.predict( | |
audio_input, | |
source_language, | |
target_language, | |
api_name="/s2tt" | |
) | |
print(f"Speech processing result: {result}") | |
return result | |
except Exception as e: | |
print(f"Error in process_speech: {str(e)}") | |
return f"Error in speech processing: {str(e)}" | |
def convert_text_to_speech(input_text, source_language, target_language): | |
print("Entering convert_text_to_speech function") | |
try: | |
result = seamless_client.predict( | |
input_text, | |
source_language, | |
target_language, | |
api_name="/t2st" | |
) | |
audio_file_path = result[0] if result else None | |
translated_text = result[1] if result else "" | |
print(f"Text-to-speech conversion result: Audio file path: {audio_file_path}, Translated text: {translated_text}") | |
return audio_file_path, translated_text | |
except Exception as e: | |
print(f"Error in convert_text_to_speech: {str(e)}") | |
return None, f"Error in text-to-speech conversion: {str(e)}" | |
def query_vectara(text): | |
user_message = text | |
customer_id = os.getenv('CUSTOMER_ID') | |
corpus_id = os.getenv('CORPUS_ID') | |
api_key = os.getenv('API_KEY') | |
# Define the headers | |
api_key_header = { | |
"customer-id": customer_id, | |
"x-api-key": api_key | |
} | |
# Define the request body in the structure provided in the example | |
request_body = { | |
"query": [ | |
{ | |
"query": user_message, | |
"queryContext": "", | |
"start": 1, | |
"numResults": 25, | |
"contextConfig": { | |
"charsBefore": 0, | |
"charsAfter": 0, | |
"sentencesBefore": 2, | |
"sentencesAfter": 2, | |
"startTag": "%START_SNIPPET%", | |
"endTag": "%END_SNIPPET%", | |
}, | |
"rerankingConfig": { | |
"rerankerId": 272725718, | |
"mmrConfig": { | |
"diversityBias": 0.35 | |
} | |
}, | |
"corpusKey": [ | |
{ | |
"customerId": customer_id, | |
"corpusId": corpus_id, | |
"semantics": 0, | |
"metadataFilter": "", | |
"lexicalInterpolationConfig": { | |
"lambda": 0 | |
}, | |
"dim": [] | |
} | |
], | |
"summary": [ | |
{ | |
"maxSummarizedResults": 5, | |
"responseLang": "auto", | |
"summarizerPromptName": "vectara-summary-ext-v1.2.0" | |
} | |
] | |
} | |
] | |
} | |
# Make the API request using Gradio | |
response = requests.post( | |
"https://api.vectara.io/v1/query", | |
json=request_body, # Use json to automatically serialize the request body | |
verify=True, | |
headers=api_key_header | |
) | |
if response.status_code == 200: | |
query_data = response.json() | |
if query_data: | |
sources_info = [] | |
# Extract the summary. | |
summary = query_data['responseSet'][0]['summary'][0]['text'] | |
# Iterate over all response sets | |
for response_set in query_data.get('responseSet', []): | |
# Extract sources | |
# Limit to top 5 sources. | |
for source in response_set.get('response', [])[:5]: | |
source_metadata = source.get('metadata', []) | |
source_info = {} | |
for metadata in source_metadata: | |
metadata_name = metadata.get('name', '') | |
metadata_value = metadata.get('value', '') | |
if metadata_name == 'title': | |
source_info['title'] = metadata_value | |
elif metadata_name == 'author': | |
source_info['author'] = metadata_value | |
elif metadata_name == 'pageNumber': | |
source_info['page number'] = metadata_value | |
if source_info: | |
sources_info.append(source_info) | |
result = {"summary": summary, "sources": sources_info} | |
return f"{json.dumps(result, indent=2)}" | |
else: | |
return "No data found in the response." | |
else: | |
return f"Error: {response.status_code}" | |
def wrap_text(text, width=90): | |
print("Wrapping text...") | |
lines = text.split('\n') | |
wrapped_lines = [textwrap.fill(line, width=width) for line in lines] | |
wrapped_text = '\n'.join(wrapped_lines) | |
return wrapped_text | |
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen-1_8B-Chat", trust_remote_code=True) #TruEra | |
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen-1_8B-Chat", device_map="auto", trust_remote_code=True).eval() | |
class ChatBot: | |
def __init__(self): | |
self.history = None | |
def predict(self, user_input, system_prompt=""): | |
print("Generating prediction...") | |
response, self.history = model.chat(tokenizer, user_input, history=self.history, system=system_prompt) | |
return response | |
bot = ChatBot() | |
def multimodal_prompt(user_input, system_prompt="You are an expert medical analyst:"): | |
print("Processing multimodal prompt...") | |
return bot.predict(user_input, system_prompt) | |
def process_summary_with_qwen(summary): | |
print("Processing summary with Qwen...") | |
system_prompt = "You are a medical instructor. Assess and describe the proper options to your students in minute detail. Propose a course of action for them to base their recommendations on based on your description." | |
response_text = bot.predict(summary, system_prompt) | |
return response_text | |
def process_and_query(input_language=None, audio_input=None, image_input=None, text_input=None): | |
try: | |
print("Processing and querying...") | |
combined_text = "" | |
markdown_output = "" | |
image_text = "" | |
print(f"Image Input Type: {type(image_input)}, Audio Input Type: {type(audio_input)}") | |
if image_input is not None: | |
print("Processing image input...") | |
image_file_path = save_image(image_input) | |
image_text = process_image(image_file_path) | |
combined_text += "\n\n**Image Input:**\n" + image_text | |
elif audio_input is not None: | |
print("Processing audio input...") | |
sample_rate, audio_data = audio_input | |
audio_file_path = save_audio(audio_input) | |
audio_text = process_speech(audio_file_path, input_language, "English") | |
combined_text += "\n\n**Audio Input:**\n" + audio_text | |
elif text_input is not None and text_input.strip(): | |
print("Processing text input...") | |
combined_text += "The user asks the query above to his health adviser: " + text_input | |
else: | |
return "Error: Please provide some input (text, audio, or image)." | |
if image_text: | |
markdown_output += "\n### Original Image Description\n" | |
markdown_output += image_text + "\n" | |
print("Querying Vectara...") | |
vectara_response_json = query_vectara(combined_text) | |
vectara_response = json.loads(vectara_response_json) | |
summary = vectara_response.get('summary', 'No summary available') | |
sources_info = vectara_response.get('sources', []) | |
markdown_output = "### Vectara Response Summary\n" | |
markdown_output += f"* **Summary**: {summary}\n" | |
markdown_output += "### Sources Information\n" | |
for source in sources_info: | |
markdown_output += f"* {source}\n" | |
final_response = process_summary_with_qwen(summary) | |
print("Converting text to speech...") | |
target_language = "English" | |
audio_output, translated_text = convert_text_to_speech(final_response, target_language, input_language) | |
print("Evaluating hallucination...") | |
try: | |
hallucination_label = evaluate_hallucination(final_response, summary) | |
except Exception as e: | |
print(f"Error in hallucination evaluation: {e}") | |
hallucination_label = "Evaluation skipped due to the model loading. For evaluation results, please try again in 29 minutes." | |
markdown_output += "\n### Processed Summary with Qwen\n" | |
markdown_output += final_response + "\n" | |
markdown_output += "\n### Hallucination Evaluation\n" | |
markdown_output += f"* **Label**: {hallucination_label}\n" | |
markdown_output += "\n### Translated Text\n" | |
markdown_output += translated_text + "\n" | |
return markdown_output, audio_output | |
except Exception as e: | |
print(f"Error occurred: {e}") | |
return f"Error occurred during processing: {e}.", None | |
def clear(): | |
return "English", None, None, "", None | |
def create_interface(): | |
with gr.Blocks(theme='ParityError/Anime') as interface: | |
# Display the welcome message | |
gr.Markdown(welcome_message) | |
# Extract the full names of the languages | |
language_names = list(languages.keys()) | |
# Add a 'None' or similar option to represent no selection | |
input_language_options = ["None"] + language_names | |
# Create a dropdown for language selection | |
input_language = gr.Dropdown(input_language_options, label="Select the language", value="English", interactive=True) | |
with gr.Accordion("Use Voice", open=False) as voice_accordion: | |
audio_input = gr.Audio(label="Speak") | |
audio_output = gr.Markdown(label="Output text") # Markdown component for audio | |
gr.Examples([["audio1.wav"], ["audio2.wav"], ], inputs=[audio_input]) | |
with gr.Accordion("Use a Picture", open=False) as picture_accordion: | |
image_input = gr.Image(label="Upload image") | |
image_output = gr.Markdown(label="Output text") # Markdown component for image | |
gr.Examples([["image1.png"], ["image2.jpeg"], ["image3.jpeg"], ], inputs=[image_input]) | |
with gr.Accordion("MultiMed", open=False) as multimend_accordion: | |
text_input = gr.Textbox(label="Use Text", lines=3, placeholder="I have had a sore throat and phlegm for a few days and now my cough has gotten worse!") | |
gr.Examples([ | |
["What is the proper treatment for buccal herpes?"], | |
["I have had a sore throat and hoarse voice for several days and now a strong cough recently "], | |
["How does cellular metabolism work TCA cycle"], | |
["What special care must be provided to children with chicken pox?"], | |
["When and how often should I wash my hands?"], | |
["بکل ہرپس کا صحیح علاج کیا ہے؟"], | |
["구강 헤르페스의 적절한 치료법은 무엇입니까?"], | |
["Je, ni matibabu gani sahihi kwa herpes ya buccal?"], | |
], inputs=[text_input]) | |
text_output = gr.Markdown(label="MultiMed") | |
audio_output = gr.Audio(label="Audio Out", type="filepath") | |
text_button = gr.Button("Use MultiMed") | |
text_button.click(process_and_query, inputs=[input_language, audio_input, image_input, text_input], outputs=[text_output, audio_output]) | |
clear_button = gr.Button("Clear") | |
clear_button.click(clear, inputs=[], outputs=[input_language, audio_input, image_input, text_output, audio_output]) | |
return interface | |
app = create_interface() | |
app.launch(show_error=True, debug=True) |