Spaces:
Sleeping
Sleeping
import base64 | |
import itertools | |
import json | |
import os | |
import time | |
import cv2 | |
import numpy as np | |
import pandas_datareader as pdr | |
import pytesseract | |
import speech_recognition as sr | |
from CaesarDetectEntity import CaesarDetectEntity | |
from CaesarHotelBooking.caesarhotelbooking import CaesarHotelBooking | |
from CaesarObjectDetection.CaesarYolo import CaesarYolo | |
from CaesarTranslate import CaesarLangTranslate | |
from CaesarVoice import CaesarVoice | |
from csv_to_db import ImportCSV | |
from flask import Flask, request, send_file | |
from flask_cors import cross_origin | |
from flask_jwt_extended import get_current_user, jwt_required | |
from tqdm import tqdm | |
from transformers import pipeline | |
app = Flask(__name__) | |
app.config['SECRET_KEY'] = 'secret!' | |
importcsv = ImportCSV("CaesarAI") | |
caesaryolo = CaesarYolo() | |
def caesaraihome(): | |
return "Welcome to CaesarAI's API's and CaesarAINL." | |
def caesarobjectdetect(): | |
frames = request.get_json() | |
image = caesaryolo.caesar_object_detect(np.frombuffer(base64.b64decode(frames["frame"]),dtype="uint8").reshape(480,640,3))#base64.b64decode(frames["frame"])) | |
return {'frame': base64.b64encode(image).decode()} | |
def createcaesaraipi(): | |
caesar_api_post = request.get_json() | |
caesarapi_db_exists = list(importcsv.db.caesarapis.find( { "caesarapis" : { "$exists" : "true" } } )) | |
#print(caesarapi_db) | |
if len(caesarapi_db_exists) == 0: | |
importcsv.db.caesarapis.insert_one(caesar_api_post) | |
return {"message":"caesarapi created."} | |
elif len(caesarapi_db_exists) > 0: | |
caesarapi_db = caesarapi_db_exists[0] | |
#print(caesarapi_db) | |
for apis in caesar_api_post["caesarapis"]: | |
if apis not in caesarapi_db["caesarapis"]: | |
caesarapi_db["caesarapis"].append(apis) | |
elif apis in caesarapi_db["caesarapis"]: | |
continue | |
importcsv.db.caesarapis.replace_one({ "caesarapis" : { "$exists" : "true" } },caesarapi_db) | |
return {"message":"caesarapi stored."} | |
def getcaesaraipi(): | |
try: | |
caesarapi_db_exists = list(importcsv.db.caesarapis.find( { "caesarapis" : { "$exists" : "true" } } ))[0] | |
del caesarapi_db_exists["_id"] | |
return caesarapi_db_exists | |
except KeyError as kex: | |
return {"error":f"Api doesn't exist"} | |
def triggerapi(): | |
user_trigger = request.get_json()["user_trigger"] | |
try: | |
caesarapi_db_exists = list(importcsv.db.caesarapis.find( { "caesarapis" : { "$exists" : "true" } } ))[0] | |
except KeyError as kex: | |
return {"error":"Api doesn't exist"} | |
try: | |
triggered_apis = [i for i in caesarapi_db_exists["caesarapis"] if i["triggerwords"] in user_trigger] | |
triggered_api = triggered_apis[0] | |
return triggered_api | |
except (IndexError,KeyError) as kex: | |
return {"message":"sorry couldn't understand what api you want."} | |
def caesaraihotelbookings(): | |
""" | |
params = { | |
"city":city, | |
"checkin_date":"2023-8-01", | |
"checkout_date":"2023-8-08", | |
"purpose":"work", | |
"num_of_adults":10, | |
"num_of_rooms":5, | |
"num_of_children":0, | |
"page_num":i | |
} | |
""" | |
def get_price_range(bookings_json,city,range,): | |
def condition(dic): | |
''' Define your own condition here''' | |
try: | |
price = dic['assumed_final_price'] | |
return price <= range | |
except KeyError as kex: | |
return False | |
#print(bookings_json) | |
bookings = bookings_json[f"{city.lower()}_bookings"] | |
filtered = [d for d in bookings if condition(d)] | |
return filtered | |
try: | |
overall_booking_info = [] | |
hotelbookings_json = request.get_json() | |
try: | |
exclude_whole = hotelbookings_json["exclude_whole"] | |
except KeyError as kex: | |
exclude_whole = None | |
city = hotelbookings_json["city"] | |
price_range = hotelbookings_json["price_range"] | |
print(f"Extracting flight data for {city}...") | |
for i in tqdm(range(1,hotelbookings_json["num_of_pages"]+1)): | |
params = { | |
"city":city, | |
"checkin_date":hotelbookings_json["checkin_date"], | |
"checkout_date":hotelbookings_json["checkout_date"], | |
"purpose":hotelbookings_json["purpose"], | |
"num_of_adults":hotelbookings_json["num_of_adults"], | |
"num_of_rooms":hotelbookings_json["num_of_rooms"], | |
"num_of_children":hotelbookings_json["num_of_children"], | |
"page_num":i | |
} | |
url = CaesarHotelBooking.create_url(**params) | |
bookinginfo = CaesarHotelBooking.caesar_get_hotel_info(url) | |
overall_booking_info.append(bookinginfo) | |
full_bookings = list(itertools.chain(*overall_booking_info)) | |
price_range_bookings = get_price_range({f"{city.lower()}_bookings":full_bookings},city,price_range) | |
if exclude_whole == "true": | |
return {"caesaroutput":{"caesarbookings":price_range_bookings}} | |
#return {f"{city.lower()}_bookings_lower_than_{price_range}":price_range_bookings} | |
return {"caesaroutput":{"caesarbookings":full_bookings}} | |
#return {"caesaroutput":full_bookings} | |
#return {f"{city.lower()}_bookings":full_bookings,f"{city.lower()}_bookings_lower_than_{price_range}":price_range_bookings} | |
except Exception as ex: | |
return {"error":f"{type(ex)}{ex}"} | |
def caesarlangtranslate(): | |
try:# hello | |
if request.method == "POST": | |
translate_json = request.get_json() | |
text = translate_json["caesartranslate"] | |
language = "en" | |
try: | |
responsejson = translate_json["response"] | |
language = translate_json["language"] | |
try: | |
triggerword = translate_json["triggerword"] | |
caesarlang = CaesarDetectEntity() | |
text,language = caesarlang.run(triggerword,text,caesarlang.main_entities[triggerword]) | |
except KeyError as kex: | |
pass | |
if responsejson == "true": | |
response = True | |
elif responsejson == "false": | |
response = False | |
else: | |
response = False | |
except KeyError as kex: | |
response = False | |
caesarlangtranslate = CaesarLangTranslate() | |
original,translation,original_language,destination_language = caesarlangtranslate.translate(text,lang=language,response=response) | |
return {"caesaroutput":translation,"caesartranslation":{"original":original,"translation":translation,"original_language":original_language,"destination_language":destination_language}} | |
elif request.method == "GET": | |
text = request.args.get("text") | |
triggerword = request.args.get("triggerword") | |
responsejson = request.args.get("response") | |
caesarlang = CaesarDetectEntity() | |
try: | |
text,language = caesarlang.run(triggerword,text,caesarlang.main_entities[triggerword]) | |
if responsejson == "true": | |
response = True | |
elif responsejson == "false": | |
response = False | |
else: | |
response = False | |
except KeyError as kex: | |
response = False | |
caesarlangtranslate = CaesarLangTranslate() | |
original,translation,original_language,destination_language = caesarlangtranslate.translate(text,lang=language,response=response) | |
return {"caesaroutput":translation,"caesartranslation":{"original":original,"translation":translation,"original_language":original_language,"destination_language":destination_language}} | |
except Exception as ex: | |
return {"error":f"{type(ex)}-{ex}"} | |
def caesarsr(): | |
if request.method == "POST": | |
transcript = "" | |
try: | |
#print("FORM DATA RECEIVED") | |
file = request.files.get("audio_data") | |
print(file) | |
if file: | |
recognizer = sr.Recognizer() | |
audioFile = sr.AudioFile(file) | |
with audioFile as source: | |
data = recognizer.record(source) | |
transcript = recognizer.recognize_google(data, key=None) | |
return {"message":transcript} | |
elif not file: | |
# TODO Make it show that the .3pg extension is adaptable | |
audiobase64json = request.get_json() | |
decode_bytes = base64.b64decode(audiobase64json["audio_data"].replace("data:video/3gpp;base64,","")) | |
with open("temp.3pg", "wb") as wav_file: | |
wav_file.write(decode_bytes) | |
os.system('ffmpeg -y -i temp.3pg temp.wav') | |
recognizer = sr.Recognizer() | |
audioFile = sr.AudioFile("temp.wav") | |
with audioFile as source: | |
data = recognizer.record(source) | |
transcript = recognizer.recognize_google(data, key=None) | |
return {"message":transcript} | |
except Exception as ex: | |
return {"error":f"{type(ex)}-{ex}"} | |
def caesarvoice(): | |
if request.method == "POST": | |
try: | |
voice_input = request.get_json() | |
try: | |
filename = voice_input["filename"] | |
lang = voice_input["language"] | |
except KeyError as kex: | |
filename = "temp.wav" | |
lang = "en" | |
CaesarVoice.synthesise(voice_input["text"],filename,lang=lang) | |
#if request.method == "POST": | |
# return {"message":transcript} | |
return {"message":"voice syntheized"}# send_file(filename,"audio/x-wav") | |
except Exception as ex: | |
return {"error":f"{type(ex)}-{ex}"} | |
def caesarsummarize(): | |
if request.method == "POST": | |
try: | |
json_input = request.get_json() | |
original_text = json_input["text"] | |
summarization = pipeline("summarization") | |
summary_text = summarization(original_text)[0]['summary_text'] | |
return {"caesaroutput":summary_text}# send_file(filename,"audio/x-wav") | |
except Exception as ex: | |
return {"error":f"{type(ex)}-{ex}"} | |
def caesarstockinfo(): | |
if request.method == "POST": | |
try: | |
json_input = request.get_json() | |
# import AAPL stock price | |
stock_tick = json_input["stock"] | |
start_date = json_input["start_date"] | |
end_date = json_input["end_date"] | |
df = pdr.get_data_yahoo(stock_tick, start=start_date,end=end_date) | |
print(df) | |
return {"caesaroutput":str(df.to_csv(index=False))}# send_file(filename,"audio/x-wav") | |
except Exception as ex: | |
return {"error":f"{type(ex)}-{ex}"} | |
def caesarocr(): | |
def data_uri_to_cv2_img(uri): | |
nparr = np.fromstring(uri, np.uint8) | |
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
return img | |
if request.method == "POST": | |
try: | |
# read the image using OpenCV | |
# from the command line first argument | |
imagebase64json = request.get_json() | |
#.replace("data:image/jpeg;base64,","").replace("data:image/png;base64,","") | |
data_uri = base64.b64decode(imagebase64json["ocr_data"]) #.replace("'","").replace("b","") | |
#print(data_uri) | |
image = data_uri_to_cv2_img(data_uri) | |
# or you can use Pillow | |
# image = Image.open(sys.argv[1]) | |
# get the string | |
string = pytesseract.image_to_string(image) | |
# print it | |
print(string) | |
# get all data | |
# data = pytesseract.image_to_data(image) | |
# print(data) | |
return {"caesaroutput":string}# send_file(filename,"audio/x-wav") | |
except Exception as ex: | |
return {"error":f"{type(ex)}-{ex}"} | |
def caesarvoiceget(): | |
if request.method == "GET": | |
try: | |
filename = "temp.wav" | |
return send_file(filename,"audio/x-wav") | |
except Exception as ex: | |
return {"error":f"{type(ex)}-{ex}"} | |
if __name__ == "__main__": | |
#port = int(os.environ.get('PORT', 5000)) # 80 | |
app.run(debug=True,host="0.0.0.0",port=7860) | |
#socketio.run(app,debug=True,host="0.0.0.0",port=5000) | |