# NOTE: captured from a hosted Space page; its status banner at capture time read "Runtime error".
# Welcome to Team Tonic's MultiMed
import base64
import json
import os
import struct
import zlib

import dotenv
import gradio as gr
import numpy as np
import requests
import torch
import torchaudio
from transformers import AutoProcessor, SeamlessM4TModel

from lang_list import (
    LANGUAGE_NAME_TO_CODE,
    S2ST_TARGET_LANGUAGE_NAMES,
    S2TT_TARGET_LANGUAGE_NAMES,
    T2TT_TARGET_LANGUAGE_NAMES,
    TEXT_SOURCE_LANGUAGE_NAMES,
    LANG_TO_SPKR_ID,
)
# Load OPENAI_API_KEY / CUSTOMER_ID / CORPUS_ID / API_KEY from a local .env file, if present.
dotenv.load_dotenv()

# Language pre-selected as the translation target.
DEFAULT_TARGET_LANGUAGE = "English"
# Sample rate (Hz) audio is resampled to before being handed to the processor.
# Kept as an int: it is used as a frame rate and returned to Gradio as the
# first element of an ``(sample_rate, waveform)`` audio tuple.
AUDIO_SAMPLE_RATE = 16000
MAX_INPUT_AUDIO_LENGTH = 60  # in seconds
def predict(
    task_name: str,
    audio_source: str,
    input_audio_mic: str | None,
    input_audio_file: str | None,
    input_text: str | None,
    source_language: str | None,
    target_language: str,
) -> tuple[tuple[int, np.ndarray] | None, str]:
    """Run one speech/text translation task and return its outputs.

    Args:
        task_name: Task label; only its first whitespace-separated word is
            used (``S2ST``, ``S2TT``, ``ASR``, or a text-input task such as
            ``T2ST`` / ``T2TT``).
        audio_source: ``"microphone"`` selects ``input_audio_mic``; any other
            value selects ``input_audio_file``.
        input_audio_mic: Path of the recorded audio, if any.
        input_audio_file: Path of the uploaded audio, if any.
        input_text: Source text for the text-input tasks.
        source_language: Source language name (a key of
            ``LANGUAGE_NAME_TO_CODE``), or ``None``.
        target_language: Target language name (a key of
            ``LANGUAGE_NAME_TO_CODE``).

    Returns:
        ``((sample_rate, waveform), text)`` for ``S2ST`` / ``T2ST`` and
        ``(None, text)`` for every other task.

    Raises:
        gr.Error: If the input required by the task (audio or text) is missing.
        KeyError: If a language name is not in ``LANGUAGE_NAME_TO_CODE``.

    NOTE(review): ``processor``, ``model`` and ``device`` are read as module
    globals but are not defined in the visible source — confirm they are
    created before this function is called.
    """
    task_name = task_name.split()[0]
    source_language_code = LANGUAGE_NAME_TO_CODE[source_language] if source_language else None
    target_language_code = LANGUAGE_NAME_TO_CODE[target_language]
    # Always work with an integer rate, whatever type the module constant has.
    sample_rate = int(AUDIO_SAMPLE_RATE)

    if task_name in ("S2ST", "S2TT", "ASR"):
        audio_path = input_audio_mic if audio_source == "microphone" else input_audio_file
        if not audio_path:
            # Fail with a readable message instead of crashing inside torchaudio.load.
            raise gr.Error("No input audio was provided.")
        arr, org_sr = torchaudio.load(audio_path)
        new_arr = torchaudio.functional.resample(arr, orig_freq=org_sr, new_freq=sample_rate)
        max_length = MAX_INPUT_AUDIO_LENGTH * sample_rate
        if new_arr.shape[1] > max_length:
            new_arr = new_arr[:, :max_length]
            gr.Warning(f"Input audio is too long. Only the first {MAX_INPUT_AUDIO_LENGTH} seconds is used.")
        input_data = processor(audios=new_arr, sampling_rate=sample_rate, return_tensors="pt").to(device)
    else:
        if not input_text:
            raise gr.Error("No input text was provided.")
        input_data = processor(text=input_text, src_lang=source_language_code, return_tensors="pt").to(device)

    if task_name in ("S2TT", "T2TT"):
        # Text-only output: skip speech synthesis entirely.
        tokens_ids = (
            model.generate(
                **input_data,
                generate_speech=False,
                tgt_lang=target_language_code,
                num_beams=5,
                do_sample=True,
            )[0]
            .cpu()
            .squeeze()
            .detach()
            .tolist()
        )
    else:
        output = model.generate(
            **input_data,
            return_intermediate_token_ids=True,
            tgt_lang=target_language_code,
            num_beams=5,
            do_sample=True,
            spkr_id=LANG_TO_SPKR_ID[target_language_code][0],
        )
        waveform = output.waveform.cpu().squeeze().detach().numpy()
        tokens_ids = output.sequences.cpu().squeeze().detach().tolist()

    text_out = processor.decode(tokens_ids, skip_special_tokens=True)

    if task_name in ("S2ST", "T2ST"):
        return (sample_rate, waveform), text_out
    return None, text_out
# PNG colour-type code for each supported channel count
# (1 = greyscale, 2 = greyscale + alpha, 3 = RGB, 4 = RGBA).
_PNG_COLOR_TYPES = {1: 0, 2: 4, 3: 2, 4: 6}


def _png_chunk(tag, payload):
    """Return one PNG chunk: big-endian length, 4-byte tag, payload, CRC-32."""
    checksum = zlib.crc32(tag + payload) & 0xFFFFFFFF
    return struct.pack(">I", len(payload)) + tag + payload + struct.pack(">I", checksum)


def _encode_png(pixels):
    """Serialise a uint8 array of shape (H, W) or (H, W, C) as PNG file bytes."""
    if pixels.dtype != np.uint8:
        raise ValueError(f"Expected a uint8 image array, got dtype {pixels.dtype}")
    if pixels.ndim == 2:
        pixels = pixels[:, :, np.newaxis]
    if pixels.ndim != 3 or pixels.shape[2] not in _PNG_COLOR_TYPES or 0 in pixels.shape:
        raise ValueError(f"Unsupported image shape {pixels.shape}")
    height, width, channels = pixels.shape
    # Every scanline is prefixed with a filter-type byte; 0 means "no filter".
    scanlines = np.zeros((height, 1 + width * channels), dtype=np.uint8)
    scanlines[:, 1:] = pixels.reshape(height, width * channels)
    # IHDR: width, height, bit depth 8, colour type, then compression,
    # filter and interlace methods all 0.
    header = struct.pack(">IIBBBBB", width, height, 8, _PNG_COLOR_TYPES[channels], 0, 0, 0)
    return b"".join(
        (
            b"\x89PNG\r\n\x1a\n",
            _png_chunk(b"IHDR", header),
            _png_chunk(b"IDAT", zlib.compress(scanlines.tobytes())),
            _png_chunk(b"IEND", b""),
        )
    )


def convert_image_to_required_format(image):
    """Convert a numpy image to a base64-encoded PNG string.

    The array is encoded as a real PNG file before base64 encoding; encoding
    the array's raw memory would lose the shape and not be a decodable image.

    Args:
        image: ``numpy.ndarray`` of dtype uint8 with shape ``(H, W)`` or
            ``(H, W, C)`` where ``C`` is 1, 2, 3 or 4.

    Returns:
        The PNG bytes as a base64 ASCII string.

    Raises:
        TypeError: If ``image`` is not a numpy array.
        ValueError: If the dtype or shape is not supported.
    """
    if not isinstance(image, np.ndarray):
        raise TypeError(f"Expected a numpy array image, got {type(image).__name__}")
    return base64.b64encode(_encode_png(image)).decode('utf-8')


def process_image_with_openai(
    image,
    prompt="Describe the content of this image.",
    model="gpt-4-vision-preview",
):
    """Send an image to the OpenAI chat completions API and return the reply text.

    Args:
        image: uint8 numpy image, see ``convert_image_to_required_format``.
        prompt: Text instruction sent together with the image.
        model: Name of the vision-capable model to query.

    Returns:
        The ``content`` of the first choice in the API response.

    Raises:
        RuntimeError: If ``OPENAI_API_KEY`` is not set or the API does not
            answer with HTTP 200.
        TypeError, ValueError: If the image cannot be encoded.
    """
    image_data = convert_image_to_required_format(image)
    openai_api_key = os.getenv('OPENAI_API_KEY')  # Make sure to have this in your .env file
    if not openai_api_key:
        raise RuntimeError("OpenAI Error: OPENAI_API_KEY is not set")

    # Images must be sent as an ``image_url`` content part (here an inline
    # data URL); a bare base64 string would be treated as prompt text.
    data_payload = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_data}"},
                    },
                ],
            }
        ],
        "max_tokens": 300,
    }
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        },
        json=data_payload,
        timeout=60,  # never hang the UI on a stalled connection
    )
    if response.status_code != 200:
        raise RuntimeError(f"OpenAI Error: {response.status_code} {response.text}")
    return response.json()['choices'][0]['message']['content']
# Maps Vectara metadata field names to the keys used in the returned sources.
_VECTARA_METADATA_KEYS = {
    'title': 'title',
    'author': 'author',
    'pageNumber': 'page number',
}


def _extract_vectara_sources(response_set, limit=5):
    """Collect title/author/page metadata from the first ``limit`` results of one response set."""
    sources = []
    for source in (response_set.get('response') or [])[:limit]:
        source_info = {}
        for metadata in source.get('metadata') or []:
            key = _VECTARA_METADATA_KEYS.get(metadata.get('name', ''))
            if key is not None:
                source_info[key] = metadata.get('value', '')
        if source_info:
            sources.append(source_info)
    return sources


def query_vectara(text):
    """Query the Vectara corpus and return a summary with its sources.

    Credentials are read from the ``CUSTOMER_ID``, ``CORPUS_ID`` and
    ``API_KEY`` environment variables.

    Args:
        text: The user query.

    Returns:
        On success, a JSON string of the form
        ``{"summary": str, "sources": [{"title", "author", "page number"}, ...]}``.
        Otherwise a plain-text status: ``"Error: <status code>"`` for a
        non-200 reply, or ``"No data found in the response."`` when the
        reply is empty or contains no response set.
    """
    user_message = text
    # Read authentication parameters from the .env file
    customer_id = os.getenv('CUSTOMER_ID')
    corpus_id = os.getenv('CORPUS_ID')
    api_key = os.getenv('API_KEY')

    # Define the headers
    api_key_header = {
        "customer-id": customer_id,
        "x-api-key": api_key,
    }

    # Define the request body in the structure provided in the example
    request_body = {
        "query": [
            {
                "query": user_message,
                "queryContext": "",
                "start": 1,
                "numResults": 50,
                "contextConfig": {
                    "charsBefore": 0,
                    "charsAfter": 0,
                    "sentencesBefore": 2,
                    "sentencesAfter": 2,
                    "startTag": "%START_SNIPPET%",
                    "endTag": "%END_SNIPPET%",
                },
                "rerankingConfig": {
                    "rerankerId": 272725718,
                    "mmrConfig": {
                        "diversityBias": 0.35,
                    },
                },
                "corpusKey": [
                    {
                        "customerId": customer_id,
                        "corpusId": corpus_id,
                        "semantics": 0,
                        "metadataFilter": "",
                        "lexicalInterpolationConfig": {
                            "lambda": 0,
                        },
                        "dim": [],
                    }
                ],
                "summary": [
                    {
                        "maxSummarizedResults": 5,
                        "responseLang": "auto",
                        "summarizerPromptName": "vectara-summary-ext-v1.2.0",
                    }
                ],
            }
        ]
    }

    # Make the API request with `requests`
    response = requests.post(
        "https://api.vectara.io/v1/query",
        json=request_body,  # Use json to automatically serialize the request body
        verify=True,
        headers=api_key_header,
        timeout=60,  # never hang the UI on a stalled connection
    )

    if response.status_code != 200:
        return f"Error: {response.status_code}"

    query_data = response.json()
    response_sets = query_data.get('responseSet') if query_data else None
    if not response_sets:
        return "No data found in the response."

    # The summary lives in the first response set; it may be absent when
    # nothing matched, so do not index into it blindly.
    summaries = response_sets[0].get('summary') or [{}]
    summary = summaries[0].get('text', 'No summary available')

    # Gather the top 5 sources of every response set.
    sources_info = []
    for response_set in response_sets:
        sources_info.extend(_extract_vectara_sources(response_set))

    result = {"summary": summary, "sources": sources_info}
    return json.dumps(result, indent=2)
def convert_to_markdown(vectara_response_json):
    """Render the string returned by ``query_vectara`` as Markdown.

    Args:
        vectara_response_json: Either a JSON object string with ``summary``
            and ``sources`` keys, or a plain-text status message such as
            ``"Error: 500"``.

    Returns:
        A Markdown string with the summary followed by a numbered source
        list. A plain-text status is returned unchanged so the user sees the
        real problem rather than a JSON parsing error. Empty or non-object
        input yields ``"No data found in the response."``.
    """
    try:
        vectara_response = json.loads(vectara_response_json)
    except (TypeError, ValueError):
        # Not JSON: this is one of the plain-text statuses; pass it through.
        if isinstance(vectara_response_json, str) and vectara_response_json.strip():
            return vectara_response_json
        return "No data found in the response."

    if not isinstance(vectara_response, dict) or not vectara_response:
        return "No data found in the response."

    summary = vectara_response.get('summary', 'No summary available')
    sources_info = vectara_response.get('sources', [])

    # Format the summary as Markdown
    markdown_summary = f'**Summary:** {summary}\n\n'

    # Format the sources as a numbered list
    markdown_sources = "".join(
        "{}. {} by {}, Page {}\n".format(
            position,
            source_info.get('title', 'Unknown title'),
            source_info.get('author', 'Unknown author'),
            source_info.get('page number', 'Unknown page number'),
        )
        for position, source_info in enumerate(sources_info, start=1)
    )

    return f"{markdown_summary}**Sources:**\n{markdown_sources}"
# Main function to handle the Gradio interface logic
def process_and_query(text, image, audio):
    """Answer a user request and return the result as Markdown.

    Args:
        text: Query typed by the user; may be empty.
        image: Uploaded image or ``None``. When given, its OpenAI
            description replaces ``text`` as the query.
        audio: Value of the audio input. NOTE(review): currently not used
            by this function.

    Returns:
        The Markdown answer, a prompt asking for input when there is nothing
        to search for, or the message of any exception raised on the way
        (so the UI always shows text instead of failing).
    """
    try:
        # If an image is provided, process it with OpenAI and use the response as the text query for Vectara
        if image is not None:
            text = process_image_with_openai(image)

        # Do not spend an API call on an empty query.
        if not text or not str(text).strip():
            return "Please provide a text query or an image."

        # Now, use the text (either provided by the user or obtained from OpenAI) to query Vectara
        vectara_response_json = query_vectara(text)
        return convert_to_markdown(vectara_response_json)
    except Exception as e:
        # UI boundary: report the failure as output text.
        return str(e)
# Define the Gradio interface: three inputs (text, image, microphone audio)
# feeding process_and_query, one Markdown output.
iface = gr.Interface(
    fn=process_and_query,
    inputs=[
        gr.Textbox(label="Input Text"),
        gr.Image(label="Upload Image"),
        gr.Audio(sources="microphone"),
    ],
    outputs=[gr.Markdown(label="Output Text")],
    title="👋🏻Welcome to ⚕🗣️😷MultiMed - Access Chat ⚕🗣️😷",
    description='''
### How To Use ⚕🗣️😷MultiMed⚕:
#### 🗣️📝Interact with ⚕🗣️😷MultiMed⚕ in any language using audio or text!
#### 🗣️📝 This is an educational and accessible conversational tool to improve wellness and sanitation in support of public health.
#### 📚🌟💼 The knowledge base is composed of publicly available medical and health sources in multiple languages. We also used [Kelvalya/MedAware](https://huggingface.co/datasets/keivalya/MedQuad-MedicalQnADataset) that we processed and converted to HTML. The quality of the answers depends on the quality of the dataset, so if you want to see some data represented here, do [get in touch](https://discord.gg/GWpVpekp). You can also use 😷MultiMed⚕️ on your own data & in your own way by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co/spaces/TeamTonic/MultiMed?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=&logoWidth=14" alt="Duplicate Space"></a>
#### Join us : 🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community on 👻Discord: [Discord](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Polytonic](https://github.com/tonic-ai) & contribute to 🌟 [PolyGPT](https://github.com/tonic-ai/polygpt-alpha)
''',
    theme='ParityError/Anime',
    # Each example supplies a value for the text input only.
    examples=[
        ["What is the proper treatment for buccal herpes?"],
        ["Male, 40 presenting with swollen glands and a rash"],
        ["How does cellular metabolism work TCA cycle"],
        ["What special care must be provided to children with chicken pox?"],
        ["When and how often should I wash my hands ?"],
        ["بکل ہرپس کا صحیح علاج کیا ہے؟"],
        ["구강 헤르페스의 적절한 치료법은 무엇입니까?"],
        ["Je, ni matibabu gani sahihi kwa herpes ya buccal?"],
    ],
)

iface.launch()