|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
import json |
|
from bs4 import BeautifulSoup |
|
import requests |
|
|
|
|
|
css = ''' |
|
.gradio-container{max-width: 1000px !important} |
|
h1{text-align:center} |
|
footer { |
|
visibility: hidden |
|
} |
|
''' |
|
|
|
|
|
def get_text_from_html(html_content): |
|
soup = BeautifulSoup(html_content, 'html.parser') |
|
for tag in soup(["script", "style", "header", "footer"]): |
|
tag.extract() |
|
return soup.get_text(strip=True) |
|
|
|
|
|
def perform_search(query): |
|
search_term = query |
|
all_results = [] |
|
max_chars_per_page = 8000 |
|
with requests.Session() as session: |
|
response = session.get( |
|
url="https://www.google.com/search", |
|
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.0.0"}, |
|
params={"q": search_term, "num": 3, "udm": 14}, |
|
timeout=5, |
|
verify=False, |
|
) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.text, "html.parser") |
|
result_block = soup.find_all("div", attrs={"class": "g"}) |
|
for result in result_block: |
|
link = result.find("a", href=True)["href"] |
|
try: |
|
webpage_response = session.get(link, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.0.0"}, timeout=5, verify=False) |
|
webpage_response.raise_for_status() |
|
visible_text = get_text_from_html(webpage_response.text) |
|
if len(visible_text) > max_chars_per_page: |
|
visible_text = visible_text[:max_chars_per_page] |
|
all_results.append({"link": link, "text": visible_text}) |
|
except requests.exceptions.RequestException: |
|
all_results.append({"link": link, "text": None}) |
|
return all_results |
|
|
|
|
|
client_gemma = InferenceClient("mistralai/Mistral-7B-Instruct-v0.3") |
|
client_mixtral = InferenceClient("NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO") |
|
client_llama = InferenceClient("meta-llama/Meta-Llama-3-8B-Instruct") |
|
|
|
|
|
def chat_response(message, history): |
|
func_calls = [] |
|
|
|
user_prompt = message |
|
functions_metadata = [ |
|
{"type": "function", "function": {"name": "web_search", "description": "Search query on Google", "parameters": {"type": "object", "properties": {"query": {"type": "string", "description": "Web search query"}}, "required": ["query"]}}}, |
|
] |
|
|
|
for msg in history: |
|
func_calls.append({"role": "user", "content": f"{str(msg[0])}"}) |
|
func_calls.append({"role": "assistant", "content": f"{str(msg[1])}"}) |
|
|
|
func_calls.append({"role": "user", "content": f'[SYSTEM] You are a helpful assistant. You have access to the following functions: \n {str(functions_metadata)}\n\nTo use these functions respond with:\n<functioncall> {{ "name": "function_name", "arguments": {{ "arg_1": "value_1", "arg_1": "value_1", ... }} }} </functioncall> [USER] {message}'}) |
|
|
|
response = client_gemma.chat_completion(func_calls, max_tokens=200) |
|
response = str(response) |
|
try: |
|
response = response[int(response.find("{")):int(response.rindex("}"))+1] |
|
except: |
|
response = response[int(response.find("{")):(int(response.rfind("}"))+1)] |
|
response = response.replace("\\n", "").replace("\\'", "'").replace('\\"', '"').replace('\\', '') |
|
print(f"\n{response}") |
|
|
|
try: |
|
json_data = json.loads(str(response)) |
|
if json_data["name"] == "web_search": |
|
query = json_data["arguments"]["query"] |
|
gr.Info("Searching Web") |
|
web_results = perform_search(query) |
|
gr.Info("Extracting relevant Info") |
|
web_summary = ' '.join([f"Link: {res['link']}\nText: {res['text']}\n\n" for res in web_results if res['text']]) |
|
messages = "system\nWeb Dac uses the user agents of Mozilla, AppleWebKit, and Safari browsers for chat responses and human context mimicking." |
|
for msg in history: |
|
messages += f"\nuser\n{str(msg[0])}" |
|
messages += f"\nassistant\n{str(msg[1])}" |
|
messages += f"\nuser\n{message}\nweb_result\n{web_summary}\nassistant\n" |
|
stream = client_mixtral.text_generation(messages, max_new_tokens=2000, do_sample=True, stream=True, details=True, return_full_text=False) |
|
output = "" |
|
for response in stream: |
|
if not response.token.text == "": |
|
output += response.token.text |
|
yield output |
|
else: |
|
messages = "system\nWeb Dac uses the user agents of Mozilla, AppleWebKit, and Safari browsers for chat responses and human context mimicking." |
|
for msg in history: |
|
messages += f"\nuser\n{str(msg[0])}" |
|
messages += f"\nassistant\n{str(msg[1])}" |
|
messages += f"\nuser\n{message}\nassistant\n" |
|
stream = client_llama.text_generation(messages, max_new_tokens=2000, do_sample=True, stream=True, details=True, return_full_text=False) |
|
output = "" |
|
for response in stream: |
|
if not response.token.text == "": |
|
output += response.token.text |
|
yield output |
|
except: |
|
messages = "system\nWeb Dac uses the user agents of Mozilla, AppleWebKit, and Safari browsers for chat responses and human context mimicking." |
|
for msg in history: |
|
messages += f"\nuser\n{str(msg[0])}" |
|
messages += f"\nassistant\n{str(msg[1])}" |
|
messages += f"\nuser\n{message}\nassistant\n" |
|
stream = client_llama.text_generation(messages, max_new_tokens=2000, do_sample=True, stream=True, details=True, return_full_text=False) |
|
output = "" |
|
for response in stream: |
|
if not response.token.text == "": |
|
output += response.token.text |
|
yield output |
|
|
|
|
|
demo = gr.ChatInterface( |
|
fn=chat_response, |
|
chatbot=gr.Chatbot(), |
|
description=" ", |
|
textbox=gr.Textbox(), |
|
multimodal=False, |
|
concurrency_limit=200, |
|
css=css, |
|
theme="allenai/gradio-theme", |
|
) |
|
|
|
demo.launch(share=True) |