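# Spaces: Sleeping, Sleeping
# (appears to be hosting-page status text captured along with the code; not part of the program)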
import os, json, random, requests | |
from livekit import api | |
from fastapi.responses import JSONResponse | |
from twilio.twiml.messaging_response import MessagingResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi import FastAPI, Request | |
from fastapi.responses import Response | |
from pydantic import BaseModel, Field | |
from dotenv import load_dotenv | |
# Populate the process environment from a .env file before the lookups below.
load_dotenv()

# LiveKit credentials and websocket URLs, one triple per deployment target.
# Every value is None when its environment variable is not set.
# The "WEBSCOKET" spelling is part of both the module-level names and the
# environment variable names, so it is kept exactly as-is.

# --- local ---
LIVEKIT_API_KEY_LOCAL = os.environ.get('LIVEKIT_API_KEY_LOCAL')
LIVEKIT_API_SECRET_LOCAL = os.environ.get('LIVEKIT_API_SECRET_LOCAL')
LIVEKIT_WEBSCOKET_URL_LOCAL = os.environ.get('LIVEKIT_WEBSCOKET_URL_LOCAL')

# --- stage ---
LIVEKIT_API_KEY_STAGE = os.environ.get('LIVEKIT_API_KEY_STAGE')
LIVEKIT_API_SECRET_STAGE = os.environ.get('LIVEKIT_API_SECRET_STAGE')
LIVEKIT_WEBSCOKET_URL_STAGE = os.environ.get('LIVEKIT_WEBSCOKET_URL_STAGE')

# --- prod ---
LIVEKIT_API_KEY_PROD = os.environ.get('LIVEKIT_API_KEY_PROD')
LIVEKIT_API_SECRET_PROD = os.environ.get('LIVEKIT_API_SECRET_PROD')
LIVEKIT_WEBSCOKET_URL_PROD = os.environ.get('LIVEKIT_WEBSCOKET_URL_PROD')

# --- suwarna ---
LIVEKIT_API_KEY_SUWARNA = os.environ.get('LIVEKIT_API_KEY_SUWARNA')
LIVEKIT_API_SECRET_SUWARNA = os.environ.get('LIVEKIT_API_SECRET_SUWARNA')
# NOTE(review): this reads 'LIVEKIT_URL_SUWARNA', whereas the other targets
# read 'LIVEKIT_WEBSCOKET_URL_<TARGET>' -- confirm the variable name is intended.
LIVEKIT_WEBSCOKET_URL_SUWARNA = os.environ.get('LIVEKIT_URL_SUWARNA')

# --- nunna ---
LIVEKIT_API_SECRET_NUNNA = os.environ.get('LIVEKIT_API_SECRET_NUNNA')
LIVEKIT_API_KEY_NUNNA = os.environ.get('LIVEKIT_API_KEY_NUNNA')
LIVEKIT_WEBSCOKET_URL_NUNNA = os.environ.get('LIVEKIT_WEBSCOKET_URL_NUNNA')
# The FastAPI application instance for this module.
app = FastAPI()

# CORS policy: any origin, any method and any header, with credentials allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive policy -- confirm it is intended for every deployment target.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
async def generate_token(request: Request):
    """Issue a LiveKit access token for a new, randomly named ``local-`` room.

    The ``agent_token`` and ``web_uuid`` query parameters (``None`` when
    absent) are JSON-encoded into the token's metadata. The token grants
    joining the generated room, publishing (media and data), subscribing and
    updating the participant's own metadata.

    NOTE(review): no route decorator is visible on this function, and other
    functions named ``generate_token`` are defined later in this file; a later
    definition rebinds the module-level name.

    Returns:
        JSONResponse with ``accessToken`` (the signed JWT) and ``url``
        (the value of ``LIVEKIT_WEBSCOKET_URL_LOCAL``).

    Raises:
        ValueError: if the LOCAL API key or secret is not configured.
    """
    query = request.query_params
    agent_token = query.get("agent_token")
    web_uuid = query.get("web_uuid")

    # Room name: fixed "local-" prefix plus 5 random lowercase/digit characters.
    suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=5))
    room_name = 'local-' + suffix

    if not (LIVEKIT_API_KEY_LOCAL and LIVEKIT_API_SECRET_LOCAL):
        raise ValueError("LIVEKIT_API_KEY_LOCAL and LIVEKIT_API_SECRET_LOCAL must be set")

    metadata = json.dumps({"agent_token": agent_token, "web_uuid": web_uuid})

    token = api.AccessToken(LIVEKIT_API_KEY_LOCAL, LIVEKIT_API_SECRET_LOCAL)
    token = token.with_identity("human_local")
    token = token.with_name("kickcall_local_name")
    token = token.with_metadata(metadata)

    grants = api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    )
    # The return value is intentionally ignored: the grants are applied to `token` in place.
    token.with_grants(grants)

    payload = {
        "accessToken": token.to_jwt(),
        "url": LIVEKIT_WEBSCOKET_URL_LOCAL,
    }
    return JSONResponse(content=payload)
async def sms_interactions(request: Request):
    """Log the JSON body of an incoming request and acknowledge it.

    The raw body is decoded as UTF-8 and parsed as JSON; the parsed value is
    only printed. A body that is not valid UTF-8 JSON makes the decode/parse
    step raise, since no error handling is done here.

    Returns:
        A fixed acknowledgement dict, ``{"body": "DOne"}``.
    """
    raw_body = await request.body()
    decoded = raw_body.decode("utf-8")
    body_data = json.loads(decoded)
    print(f"body_data: {body_data}")
    acknowledgement = {"body": "DOne"}
    return acknowledgement
async def generate_token(request: Request):
    """Issue a LiveKit access token for a new, randomly named ``stage-`` room.

    The ``agent_token`` and ``web_uuid`` query parameters (``None`` when
    absent) are JSON-encoded into the token's metadata. The token grants
    joining the generated room, publishing (media and data), subscribing and
    updating the participant's own metadata.

    NOTE(review): no route decorator is visible on this function, and several
    functions in this file share the name ``generate_token``; each later
    definition rebinds the module-level name.

    Returns:
        JSONResponse with ``accessToken`` (the signed JWT) and ``url``
        (the value of ``LIVEKIT_WEBSCOKET_URL_STAGE``).

    Raises:
        ValueError: if the STAGE API key or secret is not configured.
    """
    query = request.query_params
    agent_token = query.get("agent_token")
    web_uuid = query.get("web_uuid")

    # Room name: fixed "stage-" prefix plus 7 random lowercase/digit characters.
    suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))
    room_name = 'stage-' + suffix

    if not (LIVEKIT_API_KEY_STAGE and LIVEKIT_API_SECRET_STAGE):
        raise ValueError("LIVEKIT_API_KEY_STAGE and LIVEKIT_API_SECRET_STAGE must be set")

    metadata = json.dumps({"agent_token": agent_token, "web_uuid": web_uuid})

    token = api.AccessToken(LIVEKIT_API_KEY_STAGE, LIVEKIT_API_SECRET_STAGE)
    token = token.with_identity("human_stage")
    token = token.with_name("kickcall_stage_name")
    token = token.with_metadata(metadata)

    grants = api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    )
    # The return value is intentionally ignored: the grants are applied to `token` in place.
    token.with_grants(grants)

    payload = {
        "accessToken": token.to_jwt(),
        "url": LIVEKIT_WEBSCOKET_URL_STAGE,
    }
    return JSONResponse(content=payload)
async def generate_token(request: Request):
    """Issue a LiveKit access token for a new, randomly named production room.

    The ``agent_token`` and ``web_uuid`` query parameters (``None`` when
    absent) are JSON-encoded into the token's metadata. The token grants
    joining the generated room, publishing (media and data), subscribing and
    updating the participant's own metadata.

    NOTE(review): no route decorator is visible on this function, and several
    functions in this file share the name ``generate_token``; each later
    definition rebinds the module-level name.

    Returns:
        JSONResponse with ``accessToken`` (the signed JWT) and ``url``
        (the value of ``LIVEKIT_WEBSCOKET_URL_PROD``).

    Raises:
        ValueError: if the PROD API key or secret is not configured.
    """
    query = request.query_params
    agent_token = query.get("agent_token")
    web_uuid = query.get("web_uuid")

    # Room name: 7 random lowercase/digit characters, with no prefix.
    room_name = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))

    if not (LIVEKIT_API_KEY_PROD and LIVEKIT_API_SECRET_PROD):
        raise ValueError("LIVEKIT_API_KEY_PROD and LIVEKIT_API_SECRET_PROD must be set")

    metadata = json.dumps({"agent_token": agent_token, "web_uuid": web_uuid})

    token = api.AccessToken(LIVEKIT_API_KEY_PROD, LIVEKIT_API_SECRET_PROD)
    token = token.with_identity("human_prod")
    token = token.with_name("kickcall_prod_name")
    token = token.with_metadata(metadata)

    grants = api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    )
    # The return value is intentionally ignored: the grants are applied to `token` in place.
    token.with_grants(grants)

    payload = {
        "accessToken": token.to_jwt(),
        "url": LIVEKIT_WEBSCOKET_URL_PROD,
    }
    return JSONResponse(content=payload)
async def generate_token(request: Request):
    """Issue a LiveKit access token for a new, randomly named room (SUWARNA credentials).

    The ``agent_token`` and ``web_uuid`` query parameters (``None`` when
    absent) are JSON-encoded into the token's metadata. The token grants
    joining the generated room, publishing (media and data), subscribing and
    updating the participant's own metadata.

    NOTE(review): no route decorator is visible on this function, and several
    functions in this file share the name ``generate_token``; each later
    definition rebinds the module-level name.

    Returns:
        JSONResponse with ``accessToken`` (the signed JWT) and ``url``
        (the value of ``LIVEKIT_WEBSCOKET_URL_SUWARNA``).

    Raises:
        ValueError: if the SUWARNA API key or secret is not configured.
    """
    agent_token = request.query_params.get("agent_token")
    web_uuid = request.query_params.get("web_uuid")

    # Room name: 7 random lowercase/digit characters, with no prefix.
    room_name = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))

    if not LIVEKIT_API_KEY_SUWARNA or not LIVEKIT_API_SECRET_SUWARNA:
        # Name the variables this handler actually depends on, so the error
        # points at the right configuration (as the local/stage/prod handlers do).
        raise ValueError("LIVEKIT_API_KEY_SUWARNA and LIVEKIT_API_SECRET_SUWARNA must be set")

    metadata = json.dumps({
        "agent_token": agent_token,
        "web_uuid": web_uuid,
    })
    identity = "human_suwarna"
    name = "suwarna"

    at = (
        api.AccessToken(LIVEKIT_API_KEY_SUWARNA, LIVEKIT_API_SECRET_SUWARNA)
        .with_identity(identity)
        .with_name(name)
        .with_metadata(metadata)
    )
    # The return value is intentionally ignored: the grants are applied to `at` in place.
    at.with_grants(api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    ))
    access_token = at.to_jwt()

    return JSONResponse(content={
        "accessToken": access_token,
        "url": LIVEKIT_WEBSCOKET_URL_SUWARNA,
    })
async def generate_token(request: Request):
    """Issue a LiveKit access token for a new, randomly named room (NUNNA credentials).

    The ``agent_token`` and ``web_uuid`` query parameters (``None`` when
    absent) are JSON-encoded into the token's metadata. The token grants
    joining the generated room, publishing (media and data), subscribing and
    updating the participant's own metadata.

    NOTE(review): no route decorator is visible on this function, and several
    earlier functions in this file share the name ``generate_token``; as the
    last definition, this one is what the module-level name ends up bound to.

    Returns:
        JSONResponse with ``accessToken`` (the signed JWT) and ``url``
        (the value of ``LIVEKIT_WEBSCOKET_URL_NUNNA``).

    Raises:
        ValueError: if the NUNNA API key or secret is not configured.
    """
    agent_token = request.query_params.get("agent_token")
    web_uuid = request.query_params.get("web_uuid")

    # Room name: 7 random lowercase/digit characters, with no prefix.
    room_name = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))

    if not LIVEKIT_API_KEY_NUNNA or not LIVEKIT_API_SECRET_NUNNA:
        # Name the variables this handler actually depends on, so the error
        # points at the right configuration (as the local/stage/prod handlers do).
        raise ValueError("LIVEKIT_API_KEY_NUNNA and LIVEKIT_API_SECRET_NUNNA must be set")

    metadata = json.dumps({
        "agent_token": agent_token,
        "web_uuid": web_uuid,
    })
    identity = "human_lakshmi"
    name = "laksmi"

    at = (
        api.AccessToken(LIVEKIT_API_KEY_NUNNA, LIVEKIT_API_SECRET_NUNNA)
        .with_identity(identity)
        .with_name(name)
        .with_metadata(metadata)
    )
    # The return value is intentionally ignored: the grants are applied to `at` in place.
    at.with_grants(api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    ))
    access_token = at.to_jwt()

    return JSONResponse(content={
        "accessToken": access_token,
        "url": LIVEKIT_WEBSCOKET_URL_NUNNA,
    })
class MessageRequestValidate(BaseModel):
    """Request body carrying the phone number to validate."""

    # Phone number to check; falls back to the hard-coded number when omitted.
    # NOTE(review): the fallback equals the only number realtor_validate_user
    # treats as verified in this file -- confirm that is intended.
    from_phone: str = Field("+917389058485")
async def realtor_validate_user(request: MessageRequestValidate):
    """Report whether the caller's phone number is the one hard-coded as verified.

    The request model is printed, then ``from_phone`` is compared against a
    single fixed number.

    NOTE(review): another function named ``realtor_validate_user`` is defined
    later in this file and rebinds the module-level name; no route decorator
    is visible on either.

    Returns:
        JSONResponse whose content is ``"User is verified"`` on a match and
        ``"user is not verified"`` otherwise.
    """
    print(request)
    is_verified = request.from_phone == "+917389058485"
    verdict = "User is verified" if is_verified else "user is not verified"
    return JSONResponse(content=verdict)
class MessageRequestSignUP(BaseModel):
    """Request body for signing up a realtor; every field has a placeholder default."""

    # Phone number of the user signing up.
    from_phone: str = Field("+917389058485")
    # Identifier of the agent.
    agent_id: str = Field("agent_id1")
    # Full name of the agent.
    full_name: str = Field("agent_full_name")
    # Identifier of the broker.
    broker_id: str = Field("broker_id1")
async def realtor_sign_up(request: MessageRequestSignUP):
    """Acknowledge a sign-up request.

    The request model is only printed; nothing is stored or validated here.

    Returns:
        JSONResponse containing a fixed confirmation message.
    """
    print(request)
    confirmation = "user is signed up sucessfully now you can contine to ask propertry related questions"
    return JSONResponse(content=confirmation)
def _twiml_reply(text):
    """Return *text* as a single-message TwiML document wrapped in an XML ``Response``."""
    twiml = MessagingResponse()
    twiml.message().body(text)
    return Response(content=str(twiml), media_type="application/xml")


async def whatsapp_webhook(request: Request):
    """Forward an incoming WhatsApp message to the workflow service and reply via TwiML.

    Reads the form fields ``NumMedia``, ``From``, ``ProfileName`` and ``Body``,
    posts the sender number and message text to the workflow run endpoint, and
    returns the workflow's text answer as a TwiML message.

    Failure handling (all produce a TwiML apology rather than an HTTP error):
      * ``From`` missing or empty -> generic "unexpected error" reply.
      * workflow answers with a non-200 status -> "couldn't process" reply.
      * any exception while calling the workflow or reading its answer
        (including a timeout) -> generic "unexpected error" reply.

    NOTE(review): no route decorator is visible on this function.
    NOTE(review): ``requests.post`` is a blocking call inside an async handler,
    so the event loop is held while waiting for the workflow service.

    Returns:
        Response with ``application/xml`` content holding the TwiML reply.
    """
    form_data = await request.form()
    num_media = int(form_data.get('NumMedia', 0))
    profile_name = form_data.get("ProfileName")
    user_query = form_data.get("Body")

    raw_sender = form_data.get("From")
    if not raw_sender:
        # Slicing a missing value would raise TypeError before any error
        # handling; answer with the generic apology instead.
        print("Error in processing user query: 'From' field is missing")
        return _twiml_reply("I'm sorry, an unexpected error occurred. Please try again later.")

    # Strip the 9-character "whatsapp:" channel prefix when present; a value
    # without the prefix is used unchanged instead of being truncated.
    channel_prefix = "whatsapp:"
    if raw_sender.startswith(channel_prefix):
        sender_number = raw_sender[len(channel_prefix):]
    else:
        sender_number = raw_sender

    print(f"num_media: {num_media}, sender_number: {sender_number}, ProfileName: {profile_name}, user_query: {user_query}")

    payload = {
        "input_value": json.dumps({"from_phone": sender_number, "msg": user_query}),
        "session_id": sender_number,
    }
    print(f"Payload: {payload}")
    headers = {
        'Content-Type': 'application/json',
    }
    url = "https://workflows.kickcall.ai/api/v1/run/c94b14f5-d7a7-4fc8-89d8-eb672716c778?stream=false&user_id=1"
    # requests waits indefinitely when no timeout is given; bound the wait so a
    # stalled workflow service cannot hang this handler forever.
    timeout_seconds = 60

    try:
        response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=timeout_seconds)
        print(f"[Langflow][run] Response status code received: {response.status_code}")
        if response.status_code != 200:
            print(f"Error: Received status code {response.status_code}")
            return _twiml_reply("I'm sorry, I couldn't process your request at the moment. Please try again later.")

        response_json = response.json()
        print(f"[Langflow][run] Response: {response_json}")
        output_text = response_json["outputs"][0]["outputs"][0]["results"]["message"]["data"]["text"]
        return _twiml_reply(output_text)
    except Exception as e:
        # Deliberate catch-all at the webhook boundary: the sender always gets
        # a reply, even on timeouts or an unexpected response shape.
        print(f"Error in processing user query: {e}")
        return _twiml_reply("I'm sorry, an unexpected error occurred. Please try again later.")
class MessageRequestRunFlow(BaseModel):
    """Request body for running the workflow with a user's message."""

    # Phone number of the sender; also used as the workflow session id by the handler.
    from_phone: str = Field("+917389058485")
    # Text of the user's message.
    user_query: str = Field("Hello")
async def realtor_validate_user(request: MessageRequestRunFlow):
    """Send a user's message to the workflow service and return its text answer as JSON.

    Posts ``from_phone`` and ``user_query`` to the workflow run endpoint, using
    ``from_phone`` as the session id, and returns the answer text.

    Failure handling (all produce an apology string rather than an HTTP error):
      * workflow answers with a non-200 status -> "couldn't process" message.
      * any exception while calling the workflow or reading its answer
        (including a timeout) -> generic "unexpected error" message.

    NOTE(review): an earlier function in this file has the same name
    ``realtor_validate_user``; this definition rebinds the module-level name.
    No route decorator is visible on either.
    NOTE(review): ``requests.post`` is a blocking call inside an async handler,
    so the event loop is held while waiting for the workflow service.

    Returns:
        JSONResponse whose content is the workflow's answer text, or one of
        the apology messages described above.
    """
    print(f"body: {request}")
    user_query = request.user_query
    print(f"user_query: {user_query}")

    payload = {
        "input_value": json.dumps({"from_phone": request.from_phone, "msg": user_query}),
        "session_id": request.from_phone,
    }
    print(f"Payload: {payload}")
    headers = {
        'Content-Type': 'application/json',
    }
    url = "https://workflows.kickcall.ai/api/v1/run/c94b14f5-d7a7-4fc8-89d8-eb672716c778?stream=false&user_id=1"
    # requests waits indefinitely when no timeout is given; bound the wait so a
    # stalled workflow service cannot hang this handler forever.
    timeout_seconds = 60

    try:
        response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=timeout_seconds)
        print(f"[Langflow][run] Response status code received: {response.status_code}")
        if response.status_code != 200:
            print(f"Error: Received status code {response.status_code}")
            msg = "I'm sorry, I couldn't process your request at the moment. Please try again later."
            return JSONResponse(content=msg)

        response_json = response.json()
        print(f"[Langflow][run] Response: {response_json}")
        output_text = response_json["outputs"][0]["outputs"][0]["results"]["message"]["data"]["text"]
        return JSONResponse(content=output_text)
    except Exception as e:
        # Deliberate catch-all at the endpoint boundary: the caller always gets
        # a JSON reply, even on timeouts or an unexpected response shape.
        print(f"Error in processing user query: {e}")
        msg = "I'm sorry, an unexpected error occurred. Please try again later."
        return JSONResponse(content=msg)