"""FastAPI service that relays prompts to hosted chat demos through Selenium.

Every endpoint starts a headless Chrome session, opens a hosted chat demo,
types the caller's prompt into the page's textarea, waits for the page to stop
showing its "generating" marker and returns the text of the bot message.
"""
import asyncio
import io
import json
import os
import random
import shutil
import string
import subprocess
import threading
import time
from typing import Annotated

import requests
import selenium
from fastapi import (
    BackgroundTasks,
    Depends,
    FastAPI,
    File,
    Form,
    Request,
    Response,
    UploadFile,
)
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from pymongo.mongo_client import MongoClient
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys


class Query(BaseModel):
    """Request model with a prompt and a host (not used by the handlers below)."""

    text: str
    host: str


app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)


@app.on_event("startup")
async def startup_event():
    """Ping the remote ``/proxy`` URL once at startup and log its reply.

    The ping is best-effort: a timeout or connection error is printed and the
    application still starts.
    """
    try:
        x = requests.get('https://open-ai-ping-eight.vercel.app/proxy', timeout=10)
        print("response ", x.text)
    except requests.RequestException as e:
        print(e)
    print("on startup")


proxy = None
# Unix timestamp of the most recent failure inside do_ML_LLAMA70b (0 = none yet).
llama_last_error = 0
chrome_driver_path = '/usr/local/bin/chromedriver-linux64/chromedriver'


def _start_driver(url: str):
    """Launch headless Chrome and open *url*; the browser is quit if loading fails."""
    options = ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('-headless')
    options.add_argument("start-maximized")
    service = Service(executable_path=chrome_driver_path)
    driver = webdriver.Chrome(options=options, service=service)
    try:
        driver.get(url)
    except Exception:
        # Do not leave an orphaned Chrome process behind when navigation fails.
        _stop_driver(driver)
        raise
    return driver


def _stop_driver(driver) -> None:
    """Best-effort teardown: clear cookies, then quit even if clearing failed."""
    try:
        driver.delete_all_cookies()
    except Exception as e:
        print(e)
    try:
        driver.quit()
    except Exception as e:
        print(e)


def _type_lines(box, text: str, soft_breaks: bool) -> None:
    """Type *text* into *box* line by line.

    With ``soft_breaks`` a Shift+Enter is sent after every line so the page
    inserts a newline without submitting; without it the lines are typed back
    to back.
    """
    for line in text.split('\n'):
        box.send_keys(line)
        if soft_breaks:
            box.send_keys(Keys.SHIFT + Keys.ENTER)


async def _read_prompt(request: Request) -> str:
    """Return the ``text`` field of the JSON request body.

    Raises:
        HTTPException: 400 when the body is not a JSON object with a string
            ``text`` field.
    """
    try:
        data = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")
    if not isinstance(data, dict) or not isinstance(data.get('text'), str):
        raise HTTPException(status_code=400, detail="JSON body must contain a string 'text' field")
    return data['text']


# ---------------------------------------------------------------- LLAMA

@app.post("/llama")
async def get_answer_llama(request: Request):
    """Handle ``POST /llama``: reply with ``{"LLAMA": <answer>}``."""
    text = await _read_prompt(request)
    # Selenium is blocking; run it in a worker thread so the event loop stays free.
    res = await asyncio.to_thread(do_ML_LLAMA70b, text, 0)
    return JSONResponse({"LLAMA": res})


def do_ML_LLAMA70b(text: str, trycount: int):
    """Send *text* to the Llama-2 chat demo and return the bot's reply.

    Note that the page opened is the ``llama-2-7b-chat`` space despite the
    function name.

    Args:
        text: Prompt; embedded newlines are typed as Shift+Enter.
        trycount: Accepted for signature parity with the other helpers; unused.

    Returns:
        The reply text, ``"Requested Could not be proceed"`` if the textbox is
        not usable within 30 s, ``""`` if no answer is ready within 170 s, or
        ``"Error Occurred "`` on any other failure (which also updates
        ``llama_last_error``).
    """
    global llama_last_error
    starttime = time.time()
    driver = _start_driver("https://huggingface-projects-llama-2-7b-chat.hf.space")
    try:
        # Phase 1: keep trying to type and submit the prompt until it works.
        while True:
            if time.time() > starttime + 30:
                return "Requested Could not be proceed"
            try:
                textarea_element = driver.find_element(By.XPATH, '//textarea[@data-testid="textbox"]')
                _type_lines(textarea_element, text, soft_breaks=True)
                textarea_element.send_keys('\n')
                break
            except Exception as e:
                print(e)

        time.sleep(2)
        # Phase 2: poll until the "generating" marker disappears and text is present.
        element_selector = '.wrap.default.minimal.svelte-119qaqt.translucent.generating'
        while True:
            time.sleep(0.5)
            if time.time() > starttime + 170:
                return ""
            try:
                driver.find_element(By.CSS_SELECTOR, element_selector)
                print("generating")
                continue
            except Exception:
                print("Element is not present.")
            messages = driver.find_elements(By.CLASS_NAME, 'message')
            if len(messages) < 2:
                # The code reads index 1, so wait until a second message exists.
                continue
            element_text = messages[1].text
            if element_text == "" or element_text == 'Loading content ':
                print("empty text exception")
                continue
            return element_text
    except Exception as e:
        print(e)
        llama_last_error = time.time()
        return "Error Occurred "
    finally:
        _stop_driver(driver)


# ---------------------------------------------------------------- MPT

@app.post("/mpt")
async def get_answer_mpt(request: Request):
    """Handle ``POST /mpt``: reply with ``{"MPT": <answer>}``."""
    text = await _read_prompt(request)
    print("recived ", text)
    res = await asyncio.to_thread(do_ML_MPT, text, 0)
    return JSONResponse({"MPT": res})


def _run_mpt_chat(driver, text: str, starttime: float):
    """Drive one MPT chat attempt on an already opened page and return the reply."""
    textarea_xpath = "//textarea[@data-testid='textbox' and @class='scroll-hide svelte-1pie7s6' and @placeholder='Chat Message Box']"
    # Phase 1: type the prompt (not submitted yet).
    while True:
        if time.time() > starttime + 20:
            return "Requested Could not be proceed"
        try:
            textarea_element = driver.find_element(By.XPATH, textarea_xpath)
            # Shift+Enter adds a newline without triggering submit.
            _type_lines(textarea_element, text, soft_breaks=True)
            break
        except Exception:
            continue

    # Phase 2: click the button with id 'component-9' to submit.
    while True:
        if time.time() > starttime + 20:
            return "Requested Could not be proceed"
        try:
            driver.find_element(By.ID, 'component-9').click()
            break
        except Exception as e:
            print(e)
        time.sleep(0.2)

    # Phase 3: poll for a non-empty bot message.
    while True:
        time.sleep(0.5)
        if time.time() > starttime + 120:
            return "Requested Could not be proceed"
        try:
            element = driver.find_element(By.XPATH, '//div[@data-testid="bot" and contains(@class, "message bot")]')
            x = element.text
            print("From text ", x)
        except Exception as e:
            print(e)
            continue
        if x == "":
            print(" k")
            continue
        return x


def do_ML_MPT(text: str, trycount: int):
    """Send *text* to the MPT-30B chat demo and return the bot's reply.

    A failed attempt is retried with a fresh browser while ``trycount`` is at
    most 1 (so three attempts when called with ``trycount=0``).

    Args:
        text: Prompt; embedded newlines are typed as Shift+Enter.
        trycount: Number of attempts already made.

    Returns:
        The reply text, ``"Requested Could not be proceed"`` on a timeout, or
        ``"Error"`` once the retries are exhausted.
    """
    while True:
        starttime = time.time()
        driver = _start_driver("https://mosaicml-mpt-30b-chat.hf.space")
        try:
            return _run_mpt_chat(driver, text, starttime)
        except Exception:
            print("Error")
            if trycount > 1:
                return "Error"
            trycount += 1
        finally:
            # Always release this attempt's browser before returning or retrying.
            _stop_driver(driver)


# ---------------------------------------------------------------- StarChat

@app.post("/starcode")
async def get_answer_falcon(request: Request):
    """Handle ``POST /starcode``: reply with ``{"RESULT": <answer>}``."""
    text = await _read_prompt(request)
    print("recived ", text)
    res = await asyncio.to_thread(do_ML_STARCODE, text, 0)
    return JSONResponse({"RESULT": res})


def do_ML_STARCODE(text: str, trycount: int):
    """Send *text* to the StarChat playground and return the bot's reply.

    Args:
        text: Prompt; embedded newlines are typed as Shift+Enter.
        trycount: Accepted for signature parity; unused.

    Returns:
        The reply text, ``"Requested Could not be proceed"`` on a timeout
        (20 s to submit, 170 s overall), or ``"Error Occureed"`` on any other
        failure.
    """
    starttime = time.time()
    driver = _start_driver("https://huggingfaceh4-starchat-playground.hf.space/")
    try:
        while True:
            if time.time() > starttime + 20:
                return "Requested Could not be proceed"
            try:
                textarea_element = driver.find_element(By.CSS_SELECTOR, 'textarea[placeholder="Enter your message here"]')
                # Shift+Enter adds a newline without triggering submit.
                _type_lines(textarea_element, text, soft_breaks=True)
                time.sleep(0.5)
                textarea_element.send_keys('\n')
                break
            except Exception:
                continue

        while True:
            time.sleep(0.5)
            if time.time() > starttime + 170:
                return "Requested Could not be proceed"
            try:
                driver.find_element(By.CLASS_NAME, 'svelte-j1gjts.generating')
                continue
            except Exception:
                pass
            try:
                div_element = driver.find_element(By.CSS_SELECTOR, 'div[data-testid="bot"]')
                text_content = div_element.text
                print(text_content)
                return text_content
            except Exception as e:
                print(e)
                continue
    except Exception:
        print("Error")
        return "Error Occureed"
    finally:
        _stop_driver(driver)


# ---------------------------------------------------------------- Falcon 180B

@app.post("/falcon180b")
async def get_answer_falcon180(request: Request, background_tasks: BackgroundTasks):
    """Handle ``POST /falcon180b``: stream the answer as ``text/plain``."""
    text = await _read_prompt(request)
    print("recievoed , ", text)
    return StreamingResponse(do_falcon180(text), media_type="text/plain")


async def do_falcon180(text):
    """Stream the Falcon-180B demo's reply to *text* as UTF-8 encoded increments.

    Each yielded chunk is the part of the bot message that appeared since the
    previous poll. Streaming stops when the page no longer shows an element
    with class ``generating`` or 180 s after the start.

    NOTE(review): the Selenium calls here are blocking and run on the event
    loop; other requests are delayed while a poll is in flight.
    """
    starttime = time.time()
    # Other mirrors of the same demo that were used before:
    #   https://tiiuae-falcon-180b-demo.hf.space
    #   https://ysharma-falcon-180b-demo.hf.space/
    driver = _start_driver("https://lunarflu-falcon-180b-demo-duplicate.hf.space")
    try:
        # Phase 1: open "Additional Inputs", set the second number input, submit.
        while True:
            try:
                if time.time() - starttime > 20:
                    break
                toggle = driver.find_element(By.XPATH, "//span[@class='svelte-s1r2yt' and contains(text(), 'Additional Inputs')]")
                toggle.click()
                number_inputs = driver.find_elements(By.CSS_SELECTOR, 'input[data-testid="number-input"]')
                number_inputs[1].clear()
                # presumably the max-new-tokens setting - TODO confirm against the page
                number_inputs[1].send_keys('1900')
                textarea_element = driver.find_element(By.XPATH, '//textarea[@placeholder="Type a message..."]')
                _type_lines(textarea_element, text, soft_breaks=True)
                textarea_element.send_keys('\n')
                break
            except Exception as e:
                print(e)

        # Phase 2: poll the bot message and emit only what is new.
        old_text = ""
        while time.time() - starttime <= 180:
            try:
                spans = driver.find_elements(By.XPATH, "//span[contains(@class, 'chatbot')]")
                current = spans[1].text
            except Exception as e:
                print(e)
                continue
            send_text = current[len(old_text):]
            old_text = current
            if send_text:
                yield send_text.encode()
            try:
                driver.find_element(By.CLASS_NAME, "generating")
            except Exception:
                # No "generating" marker any more: the answer is complete.
                break
    except Exception as e:
        print(e)
    finally:
        # Also runs when the client disconnects and the generator is closed.
        _stop_driver(driver)


# ---------------------------------------------------------------- WizardLM (chat.lmsys.org)

@app.post("/wizlm70b")
async def get_answer_falcon180(request: Request):
    """Handle ``POST /wizlm70b``: reply with the answer as a JSON string.

    This intentionally keeps the original handler name, which rebinds the
    module-level name used by the ``/falcon180b`` handler; both routes stay
    registered because FastAPI records the function at decoration time.
    """
    text = await _read_prompt(request)
    print("recievoed , ", text)
    return await asyncio.to_thread(do_wizard70b, text)


def do_wizard70b(text):
    """Send *text* through the "Single Model" tab of chat.lmsys.org.

    The model is whatever the page selects by default; choosing
    ``wizardlm-70b`` and adjusting parameters through the UI is not performed.

    NOTE(review): lines of *text* are typed without separators, so embedded
    newlines are dropped - confirm this is intended.

    Returns:
        The reply text, ``"Error-- 1"`` if the tab cannot be opened within
        10 s, ``"Error-- 3"`` if the prompt cannot be submitted within 60 s,
        or ``"Error-- 3 -- timeout"`` if no answer is read within 178 s.
    """
    starttime = time.time()
    driver = _start_driver("https://chat.lmsys.org/")
    try:
        while True:
            if time.time() - starttime > 10:
                return "Error-- 1"
            try:
                button = driver.find_element(By.XPATH, "//button[contains(@class, 'svelte-kqij2n') and text()='Single Model']")
                button.click()
                break
            except Exception as e:
                print(e)
                time.sleep(0.5)

        time.sleep(1)
        while True:
            if time.time() - starttime > 60:
                return "Error-- 3"
            try:
                textarea = driver.find_elements(By.XPATH, '//textarea[@data-testid="textbox"]')
                # The code targets the third textbox on the page.
                textarea[2].clear()
                _type_lines(textarea[2], text, soft_breaks=False)
                textarea[2].send_keys('\n')
                # Submitted once: leave the loop instead of retyping until the deadline.
                break
            except Exception as e:
                print(e)
                time.sleep(0.5)

        time.sleep(2)
        while True:
            if time.time() - starttime > 178:
                return "Error-- 3 -- timeout"
            try:
                driver.find_element(By.CSS_SELECTOR, '.generating')
                print("geberating")
                time.sleep(0.5)
                continue
            except Exception:
                print("Element with 'generating' keyword in class is not present.")
            try:
                div_element = driver.find_element(By.XPATH, '//div[@data-testid="bot"]')
                return div_element.text
            except Exception as e:
                # Bot message not rendered yet: keep polling until the deadline.
                print(e)
                time.sleep(0.5)
    finally:
        _stop_driver(driver)


# ---------------------------------------------------------------- PaLM 2

@app.post("/palm2")
async def get_answer_llama(request: Request):
    """Handle ``POST /palm2``: reply with ``{"BOT": <answer>}``.

    This intentionally keeps the original handler name, which rebinds the
    module-level name used by the ``/llama`` handler; both routes stay
    registered because FastAPI records the function at decoration time.
    """
    text = await _read_prompt(request)
    res = await asyncio.to_thread(do_ML_PALM, text, 0)
    return JSONResponse({"BOT": res})


def do_ML_PALM(text: str, trycount: int):
    """Send *text* to the PaLM Gradio chat demo and return the bot's reply.

    NOTE(review): lines of *text* are typed without separators, so embedded
    newlines are dropped - confirm this is intended.

    Args:
        text: Prompt to submit.
        trycount: Accepted for signature parity; unused.

    Returns:
        The reply text, ``'Error -1'`` / ``'Error -2'`` when submitting or
        reading exceeds 178 s from the start, or ``"Error 3"`` on any other
        failure.
    """
    starttime = time.time()
    driver = _start_driver('https://chansung-palm-with-gradio-chat.hf.space')
    try:
        while True:
            if time.time() - starttime > 178:
                return 'Error -1'
            try:
                textarea = driver.find_element(By.XPATH, "//textarea[@data-testid='textbox' and @placeholder='Ask anything']")
                textarea.clear()
                _type_lines(textarea, text, soft_breaks=False)
                textarea.send_keys("\n")
                break
            except Exception:
                continue

        time.sleep(1.5)
        while True:
            # Re-read the clock on every pass; a stale value would never time out.
            if time.time() - starttime > 178:
                return 'Error -2'
            try:
                driver.find_element(By.CSS_SELECTOR, 'div.wrap.default.full.svelte-zlszon.generating')
                continue
            except Exception:
                print('fail')
            try:
                candidates = driver.find_elements(By.CLASS_NAME, 'md.svelte-9tftx4.chatbot')
                div_text = candidates[1].text
            except Exception:
                time.sleep(0.5)
                continue
            if div_text == "":
                continue
            return div_text
    except Exception as e:
        print(e)
        return "Error 3"
    finally:
        _stop_driver(driver)