Spaces:
Sleeping
Sleeping
from flask import Flask,request,make_response | |
import os | |
import logging | |
from dotenv import load_dotenv | |
from heyoo import WhatsApp | |
import random | |
import shutil | |
from tempfile import NamedTemporaryFile | |
import assemblyai as aai | |
from pandasai.llm import GoogleGemini | |
from pandasai import SmartDataframe | |
from pandasai import Agent | |
from pandasai.responses.response_parser import ResponseParser | |
from langchain_experimental.agents import create_pandas_dataframe_agent | |
import pandas as pd | |
from langchain_google_genai import GoogleGenerativeAI | |
from langchain_google_genai.chat_models import ChatGoogleGenerativeAI | |
import requests | |
import base64 | |
from pandasai.helpers import path | |
import uuid | |
import pandasai as pai | |
# Load environment data (python-dotenv) before any os.environ lookups below.
load_dotenv()

# NOTE: the helper below is disabled. It used to be parked inside a bare
# triple-quoted string, which is a live expression statement rather than a
# comment; it is kept here as a real comment for reference only.
#
# def modify_file_path(file_path):
#     guid = uuid.uuid4()
#     print(guid)
#     guid = str(uuid.uuid4())
#     new_filename = f"{guid}temp_chart.png"
#     new_file_path = os.path.join("/home/runner/Flasktestrairo/plots/", new_filename)
#     # Copy the file
#     shutil.copy2(file_path, new_file_path)
#     return new_file_path
# One random identifier is drawn at import time; the chart directory handed to
# the PandasAI agent is "/plots/<that identifier>" for the life of the process.
guid = uuid.uuid4()
new_filename = str(guid)
user_defined_path = os.path.join("/plots/", new_filename)
class FlaskResponse(ResponseParser):
    """Response parser that converts PandasAI results into webhook-friendly values.

    Each ``format_*`` method receives a ``result`` mapping and reads its
    ``"value"`` entry.
    """

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Return ``result["value"]`` rendered through its ``to_html()`` method."""
        return result["value"].to_html()

    def format_plot(self, result):
        """Return ``result["value"]`` unchanged after echoing it to stdout.

        The value is presumably the path of the chart image (the webhook
        handler tests the stringified answer with ``os.path.isfile``).
        """
        # A key lookup cannot raise ValueError (a missing key raises KeyError),
        # so the former ``except ValueError`` fallback was dead code.
        img_path = result["value"]
        print("response_class_path:", img_path)
        return img_path

    def format_other(self, result):
        """Return ``result["value"]`` converted to ``str``."""
        return str(result["value"])
# heyoo WhatsApp client; credentials come from the environment and a missing
# variable raises KeyError at import time.
messenger = WhatsApp(
    os.environ["whatsapp_token"],
    phone_number_id=os.environ["phone_number_id"],
)

# AssemblyAI transcriber used for incoming audio messages.
aai.settings.api_key = os.environ["aai_key"]
transcriber = aai.Transcriber()

# Google API key shared by the Gemini clients below and in generateResponse.
g_api_key = os.environ["google_api_key"]

app = Flask(__name__)

# Token compared against "hub.verify_token" during webhook verification.
# NOTE(review): this secret is hard-coded in source; consider moving it to the
# environment like the other credentials.
VERIFY_TOKEN = "30cca545-3838-48b2-80a7-9e43b1ae8ce4"

# Dataset the agent answers questions about (an earlier revision read
# "craig.xlsx" via pd.read_excel instead).
df = pd.read_csv("small_business_data2.csv")

# NOTE(review): generateResponse builds its own GoogleGemini instance under a
# local name `llm`, so this module-level chat model is not used by it.
llm = ChatGoogleGenerativeAI(
    model="gemini-pro",
    google_api_key=g_api_key,
    temperature=0.2,
)
def generateResponse(prompt):
    """Ask a freshly built PandasAI agent about ``df`` and return its answer.

    A new GoogleGemini client and a new Agent are created on every call.

    Args:
        prompt: The question passed to ``Agent.chat``.

    Returns:
        Whatever ``Agent.chat`` returns, as formatted by FlaskResponse.
    """
    gemini = GoogleGemini(api_key=g_api_key)
    agent_config = {
        "llm": gemini,
        "save_charts_path": user_defined_path,
        "save_charts": False,
        "response_parser": FlaskResponse,
        "enable_cache": False,
        "conversational": True,
    }
    return Agent(df, config=agent_config).chat(prompt)
# Imgur upload settings: application credentials, the image-upload endpoint,
# and the Authorization header sent with each upload request.
# NOTE(review): credentials are hard-coded; img_secret is not referenced by
# any code visible in this file.
img_ID = "344744a88ad1098"
img_secret = "3c542a40c215327045d7155bddfd8b8bc84aebbf"
url = "https://api.imgur.com/3/image"
headers = {"Authorization": "Client-ID " + img_ID}
def respond(query_str:str):
    """Return the fixed placeholder reply; ``query_str`` is ignored."""
    return "hello, I don't have a brain yet"
def _upload_image(image_path):
    """Upload the file at ``image_path`` to Imgur and return its public link.

    Raises:
        OSError: If the file cannot be read.
        requests.RequestException: On timeout, connection failure, or a
            non-2xx reply from the upload endpoint.
    """
    with open(image_path, "rb") as image_file:
        encoded = base64.b64encode(image_file.read())
    # A timeout keeps a stalled upload from blocking the webhook worker.
    upload = requests.post(
        url, headers=headers, data={"image": encoded}, timeout=30
    )
    upload.raise_for_status()
    return upload.json()["data"]["link"]


def _send_answer(answer, mobile):
    """Send ``answer`` to ``mobile``: as an image if it names a file, else as text."""
    if os.path.isfile(answer):
        print("My image path:", answer)
        link = _upload_image(answer)
        print(link)
        messenger.send_image(image=link, recipient_id=mobile)
    else:
        messenger.send_message(message=answer, recipient_id=mobile)


# Without a route decorator Flask never dispatches requests to this handler.
# NOTE(review): the path "/" is assumed -- confirm it matches the callback URL
# registered for the WhatsApp webhook.
@app.route("/", methods=["GET", "POST"])
def hook():
    """Handle the WhatsApp webhook: verification on GET, message updates otherwise.

    GET requests are answered with the ``hub.challenge`` value when
    ``hub.verify_token`` equals VERIFY_TOKEN. For other requests the JSON
    payload is inspected; text messages get a canned greeting or a generated
    answer, audio messages are transcribed and answered, and any other
    message type gets a hint about the supported types.

    Returns:
        A Flask response for a successful verification, the string
        "Invalid verification token" for a failed one, otherwise "ok".
    """
    if request.method == "GET":
        if request.args.get("hub.verify_token") == VERIFY_TOKEN:
            logging.info("Verified webhook")
            response = make_response(request.args.get("hub.challenge"), 200)
            response.mimetype = "text/plain"
            return response
        logging.error("Webhook Verification failed")
        return "Invalid verification token"

    # get message update..
    data = request.get_json()
    if messenger.changed_field(data) != "messages":
        return "ok"

    # The sender's number doubles as the "is there a new message" check.
    mobile = messenger.get_mobile(data)
    if not mobile:
        return "ok"

    message_type = messenger.get_message_type(data)
    if message_type == "text":
        message = messenger.get_message(data)
        # Handle greetings
        if message.lower() in ("hi", "hello", "help", "how are you"):
            response = "Hi there! My name is BuzyHelper. How can I help you today?"
            messenger.send_message(message=response, recipient_id=mobile)
        else:
            response = str(generateResponse(message))
            print("Response:", response)
            logging.info(f"\nAnswer: {response}\n")
            _send_answer(response, mobile)
    elif message_type == "audio":
        audio = messenger.get_audio(data)
        audio_id, mime_type = audio["id"], audio["mime_type"]
        audio_url = messenger.query_media_url(audio_id)
        audio_filename = messenger.download_media(audio_url, mime_type)
        transcript = transcriber.transcribe(audio_filename)
        print(audio_filename)
        print(transcript.text)
        logging.info(f"\nAudio: {audio}\n")
        response = str(generateResponse(transcript.text))
        # Previously the always-true isinstance(response, str) test came first,
        # so a chart answer could never be sent as an image from this branch.
        _send_answer(response, mobile)
    else:
        messenger.send_message(
            message="Please send me text or audio messages",
            recipient_id=mobile,
        )
    return "ok"
if __name__ == '__main__':
    # The server binds to every interface, so the interactive debugger must
    # not be on by default; set FLASK_DEBUG=1 (or "true") to opt in locally.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)