narendrasinghd's picture
Upload app.py
44a05d4 verified
import os, json, random, requests
from livekit import api
from fastapi.responses import JSONResponse
from twilio.twiml.messaging_response import MessagingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Request
from fastapi.responses import Response
from pydantic import BaseModel, Field
from dotenv import load_dotenv
# Populate the process environment via python-dotenv before the lookups below.
load_dotenv()

# LiveKit credentials and server URL, one triple per deployment/environment.
# os.getenv returns None for an unset variable; the token endpoints in this
# file check the key and secret before use, but not the URL.
# ("WEBSCOKET" is misspelled, but the spelling is used consistently by the
# handlers that read these constants.)
LIVEKIT_API_KEY_LOCAL = os.getenv('LIVEKIT_API_KEY_LOCAL')
LIVEKIT_API_SECRET_LOCAL = os.getenv('LIVEKIT_API_SECRET_LOCAL')
LIVEKIT_WEBSCOKET_URL_LOCAL = os.getenv('LIVEKIT_WEBSCOKET_URL_LOCAL')
LIVEKIT_API_KEY_STAGE = os.getenv('LIVEKIT_API_KEY_STAGE')
LIVEKIT_API_SECRET_STAGE = os.getenv('LIVEKIT_API_SECRET_STAGE')
LIVEKIT_WEBSCOKET_URL_STAGE = os.getenv('LIVEKIT_WEBSCOKET_URL_STAGE')
LIVEKIT_API_KEY_PROD = os.getenv('LIVEKIT_API_KEY_PROD')
LIVEKIT_API_SECRET_PROD = os.getenv('LIVEKIT_API_SECRET_PROD')
LIVEKIT_WEBSCOKET_URL_PROD = os.getenv('LIVEKIT_WEBSCOKET_URL_PROD')
LIVEKIT_API_KEY_SUWARNA = os.getenv('LIVEKIT_API_KEY_SUWARNA')
LIVEKIT_API_SECRET_SUWARNA = os.getenv('LIVEKIT_API_SECRET_SUWARNA')
# NOTE(review): this reads LIVEKIT_URL_SUWARNA, whereas every other
# environment reads LIVEKIT_WEBSCOKET_URL_<ENV> -- confirm the variable name.
LIVEKIT_WEBSCOKET_URL_SUWARNA = os.getenv('LIVEKIT_URL_SUWARNA')
LIVEKIT_API_SECRET_NUNNA = os.getenv('LIVEKIT_API_SECRET_NUNNA')
LIVEKIT_API_KEY_NUNNA = os.getenv('LIVEKIT_API_KEY_NUNNA')
LIVEKIT_WEBSCOKET_URL_NUNNA = os.getenv('LIVEKIT_WEBSCOKET_URL_NUNNA')
# The ASGI application object; every route in this file is registered on it.
app = FastAPI()
# CORS is fully open: any origin, method and header is accepted, and
# credentials are allowed as well.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get('/generate_token_local')
async def generate_token(request: Request):
    """Issue a LiveKit access token for a newly named ``local-`` room.

    The optional ``agent_token`` and ``web_uuid`` query parameters are
    JSON-encoded and attached to the token as participant metadata. The room
    name is ``local-`` followed by five random lowercase letters/digits.

    Returns:
        JSONResponse with ``accessToken`` (the result of ``to_jwt()``) and
        ``url`` (the value of ``LIVEKIT_WEBSCOKET_URL_LOCAL``).

    Raises:
        ValueError: if the local LiveKit API key or secret is unset/empty.
    """
    params = request.query_params
    agent_token = params.get("agent_token")
    web_uuid = params.get("web_uuid")

    suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=5))
    room_name = f"local-{suffix}"

    if not (LIVEKIT_API_KEY_LOCAL and LIVEKIT_API_SECRET_LOCAL):
        raise ValueError("LIVEKIT_API_KEY_LOCAL and LIVEKIT_API_SECRET_LOCAL must be set")

    participant_metadata = json.dumps({"agent_token": agent_token, "web_uuid": web_uuid})

    # The grant is limited to the generated room.
    grants = api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    )
    jwt = (
        api.AccessToken(LIVEKIT_API_KEY_LOCAL, LIVEKIT_API_SECRET_LOCAL)
        .with_identity("human_local")
        .with_name("kickcall_local_name")
        .with_metadata(participant_metadata)
        .with_grants(grants)
        .to_jwt()
    )
    return JSONResponse(content={"accessToken": jwt, "url": LIVEKIT_WEBSCOKET_URL_LOCAL})
@app.post('/sms_interactions/reply_message')
async def sms_interactions(request: Request):
    """Log the JSON body of an SMS-interaction callback and acknowledge it.

    The raw request body is decoded as UTF-8 and parsed as JSON; the parsed
    value is only printed. A body that is not valid UTF-8 JSON raises from
    the decode/parse step.

    Returns:
        A fixed acknowledgement dictionary.
    """
    raw_bytes = await request.body()
    parsed = json.loads(raw_bytes.decode("utf-8"))
    print(f"body_data: {parsed}")
    acknowledgement = {"body": "DOne"}
    return acknowledgement
@app.get('/generate_token_stage')
async def generate_token(request: Request):
    """Issue a LiveKit access token for a newly named ``stage-`` room.

    The optional ``agent_token`` and ``web_uuid`` query parameters are
    JSON-encoded and attached to the token as participant metadata. The room
    name is ``stage-`` followed by seven random lowercase letters/digits.

    Returns:
        JSONResponse with ``accessToken`` (the result of ``to_jwt()``) and
        ``url`` (the value of ``LIVEKIT_WEBSCOKET_URL_STAGE``).

    Raises:
        ValueError: if the stage LiveKit API key or secret is unset/empty.
    """
    params = request.query_params
    agent_token = params.get("agent_token")
    web_uuid = params.get("web_uuid")

    suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))
    room_name = f"stage-{suffix}"

    if not (LIVEKIT_API_KEY_STAGE and LIVEKIT_API_SECRET_STAGE):
        raise ValueError("LIVEKIT_API_KEY_STAGE and LIVEKIT_API_SECRET_STAGE must be set")

    participant_metadata = json.dumps({"agent_token": agent_token, "web_uuid": web_uuid})

    # The grant is limited to the generated room.
    grants = api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    )
    jwt = (
        api.AccessToken(LIVEKIT_API_KEY_STAGE, LIVEKIT_API_SECRET_STAGE)
        .with_identity("human_stage")
        .with_name("kickcall_stage_name")
        .with_metadata(participant_metadata)
        .with_grants(grants)
        .to_jwt()
    )
    return JSONResponse(content={"accessToken": jwt, "url": LIVEKIT_WEBSCOKET_URL_STAGE})
@app.get('/generate_token_prod')
async def generate_token(request: Request):
    """Issue a LiveKit access token for a newly named production room.

    The optional ``agent_token`` and ``web_uuid`` query parameters are
    JSON-encoded and attached to the token as participant metadata. The room
    name is seven random lowercase letters/digits with no prefix.

    Returns:
        JSONResponse with ``accessToken`` (the result of ``to_jwt()``) and
        ``url`` (the value of ``LIVEKIT_WEBSCOKET_URL_PROD``).

    Raises:
        ValueError: if the prod LiveKit API key or secret is unset/empty.
    """
    params = request.query_params
    agent_token = params.get("agent_token")
    web_uuid = params.get("web_uuid")

    alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
    room_name = ''.join(random.choices(alphabet, k=7))

    if not (LIVEKIT_API_KEY_PROD and LIVEKIT_API_SECRET_PROD):
        raise ValueError("LIVEKIT_API_KEY_PROD and LIVEKIT_API_SECRET_PROD must be set")

    participant_metadata = json.dumps({"agent_token": agent_token, "web_uuid": web_uuid})

    # The grant is limited to the generated room.
    grants = api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    )
    jwt = (
        api.AccessToken(LIVEKIT_API_KEY_PROD, LIVEKIT_API_SECRET_PROD)
        .with_identity("human_prod")
        .with_name("kickcall_prod_name")
        .with_metadata(participant_metadata)
        .with_grants(grants)
        .to_jwt()
    )
    return JSONResponse(content={"accessToken": jwt, "url": LIVEKIT_WEBSCOKET_URL_PROD})
@app.get('/generate_token_suwarna')
async def generate_token(request: Request):
    """Issue a LiveKit access token using the SUWARNA credentials.

    The optional ``agent_token`` and ``web_uuid`` query parameters are
    JSON-encoded and attached to the token as participant metadata. The room
    name is seven random lowercase letters/digits.

    Returns:
        JSONResponse with ``accessToken`` (the result of ``to_jwt()``) and
        ``url`` (the value of ``LIVEKIT_WEBSCOKET_URL_SUWARNA``).

    Raises:
        ValueError: if ``LIVEKIT_API_KEY_SUWARNA`` or
            ``LIVEKIT_API_SECRET_SUWARNA`` is unset/empty.
    """
    agent_token = request.query_params.get("agent_token")
    web_uuid = request.query_params.get("web_uuid")
    room_name = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))
    if not LIVEKIT_API_KEY_SUWARNA or not LIVEKIT_API_SECRET_SUWARNA:
        # Name the variables that are actually checked; the previous message
        # pointed at generic names that this handler never reads.
        raise ValueError("LIVEKIT_API_KEY_SUWARNA and LIVEKIT_API_SECRET_SUWARNA must be set")
    metadata = json.dumps({
        "agent_token": agent_token,
        "web_uuid": web_uuid,
    })
    identity = "human_suwarna"
    name = "suwarna"
    at = (
        api.AccessToken(LIVEKIT_API_KEY_SUWARNA, LIVEKIT_API_SECRET_SUWARNA)
        .with_identity(identity)
        .with_name(name)
        .with_metadata(metadata)
    )
    # The grant is limited to the generated room.
    at.with_grants(api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    ))
    access_token = at.to_jwt()
    return JSONResponse(content={
        "accessToken": access_token,
        "url": LIVEKIT_WEBSCOKET_URL_SUWARNA,
    })
@app.get('/generate_token_nunna')
async def generate_token(request: Request):
    """Issue a LiveKit access token using the NUNNA credentials.

    The optional ``agent_token`` and ``web_uuid`` query parameters are
    JSON-encoded and attached to the token as participant metadata. The room
    name is seven random lowercase letters/digits.

    Returns:
        JSONResponse with ``accessToken`` (the result of ``to_jwt()``) and
        ``url`` (the value of ``LIVEKIT_WEBSCOKET_URL_NUNNA``).

    Raises:
        ValueError: if ``LIVEKIT_API_KEY_NUNNA`` or
            ``LIVEKIT_API_SECRET_NUNNA`` is unset/empty.
    """
    agent_token = request.query_params.get("agent_token")
    web_uuid = request.query_params.get("web_uuid")
    room_name = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=7))
    if not LIVEKIT_API_KEY_NUNNA or not LIVEKIT_API_SECRET_NUNNA:
        # Name the variables that are actually checked; the previous message
        # pointed at generic names that this handler never reads.
        raise ValueError("LIVEKIT_API_KEY_NUNNA and LIVEKIT_API_SECRET_NUNNA must be set")
    metadata = json.dumps({
        "agent_token": agent_token,
        "web_uuid": web_uuid,
    })
    identity = "human_lakshmi"
    name = "laksmi"
    at = (
        api.AccessToken(LIVEKIT_API_KEY_NUNNA, LIVEKIT_API_SECRET_NUNNA)
        .with_identity(identity)
        .with_name(name)
        .with_metadata(metadata)
    )
    # The grant is limited to the generated room.
    at.with_grants(api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
        can_update_own_metadata=True,
    ))
    access_token = at.to_jwt()
    return JSONResponse(content={
        "accessToken": access_token,
        "url": LIVEKIT_WEBSCOKET_URL_NUNNA,
    })
class MessageRequestValidate(BaseModel):
    """Request body for ``/realtor/validate``: the phone number to check."""

    # Falls back to a fixed number when the caller omits the field.
    from_phone: str = "+917389058485"
@app.post('/realtor/validate')
async def realtor_validate_user(request: MessageRequestValidate):
    """Report whether the caller's phone number is the one known number.

    The request is printed, then ``from_phone`` is compared against a single
    hard-coded number; no other lookup is performed.

    Returns:
        JSONResponse whose content is a plain status string.
    """
    print(request)
    is_known_number = request.from_phone == "+917389058485"
    verdict = "User is verified" if is_known_number else "user is not verified"
    return JSONResponse(content=verdict)
class MessageRequestSignUP(BaseModel):
    """Request body for the realtor sign-up endpoint.

    Every field is optional and falls back to the placeholder value below.
    """

    from_phone: str = "+917389058485"
    agent_id: str = "agent_id1"
    full_name: str = "agent_full_name"
    broker_id: str = "broker_id1"
@app.post('/realtor/singup')
async def realtor_sign_up(request: MessageRequestSignUP):
    """Print the sign-up request and reply with a fixed confirmation.

    Nothing from the request is stored or forwarded by this handler.

    Returns:
        JSONResponse whose content is the confirmation string.
    """
    print(request)
    confirmation = "user is signed up sucessfully now you can contine to ask propertry related questions"
    return JSONResponse(content=confirmation)
# Upper bound for the upstream workflow call made by the chatbot webhook.
# NOTE(review): chosen on the assumption that Twilio abandons a webhook after
# roughly 15 seconds, so waiting longer cannot produce a delivered reply --
# confirm against the Twilio account settings.
_CHATBOT_UPSTREAM_TIMEOUT_SECONDS = 15


def _twiml_reply(text):
    """Return *text* as a single-message TwiML document in an XML response."""
    twiml = MessagingResponse()
    twiml.message().body(text)
    return Response(content=str(twiml), media_type="application/xml")


@app.post('/chatbot/')
async def whatsapp_webhook(request: Request):
    """Answer an inbound messaging webhook with the workflow's reply as TwiML.

    Reads the form fields ``NumMedia``, ``From``, ``ProfileName`` and ``Body``,
    forwards the sender and message text to the workflow endpoint, and wraps
    the returned text in a TwiML message. Any failure (missing sender, non-200
    status, unexpected response shape, network error, timeout) produces an
    apology message instead of an HTTP error, so the sender always gets a
    reply.

    Returns:
        Response with ``application/xml`` TwiML content.
    """
    # Local import: keeps this handler self-contained without touching the
    # file-level import block.
    from fastapi.concurrency import run_in_threadpool

    form_data = await request.form()

    # NumMedia is only logged, so a malformed value must not fail the request.
    try:
        num_media = int(form_data.get('NumMedia', 0))
    except (TypeError, ValueError):
        num_media = 0

    raw_sender = form_data.get("From")
    if not raw_sender:
        # Previously a missing "From" raised TypeError on the slice (HTTP 500).
        print("Error: webhook request has no 'From' field")
        return _twiml_reply("I'm sorry, I couldn't process your request at the moment. Please try again later.")

    # The original code dropped the first 9 characters unconditionally, which
    # is the length of "whatsapp:"; strip it only when it is actually present
    # so senders without the prefix are not truncated.
    channel_prefix = "whatsapp:"
    if raw_sender.startswith(channel_prefix):
        sender_number = raw_sender[len(channel_prefix):]
    else:
        sender_number = raw_sender

    profile_name = form_data.get("ProfileName")
    user_query = form_data.get("Body")
    print(f"num_media: {num_media}, sender_number: {sender_number}, ProfileName: {profile_name}, user_query: {user_query}")

    payload = {
        "input_value": json.dumps({"from_phone": sender_number, "msg": user_query}),
        "session_id": sender_number
    }
    print(f"Payload: {payload}")
    headers = {
        'Content-Type': 'application/json'
    }
    url = "https://workflows.kickcall.ai/api/v1/run/c94b14f5-d7a7-4fc8-89d8-eb672716c778?stream=false&user_id=1"

    # Broad catch is deliberate: this is the webhook boundary and the sender
    # should receive a TwiML reply whatever goes wrong upstream.
    try:
        # requests is synchronous; run it in the threadpool so the event loop
        # keeps serving other requests, and bound the wait with a timeout.
        response = await run_in_threadpool(
            requests.post,
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=_CHATBOT_UPSTREAM_TIMEOUT_SECONDS,
        )
        print(f"[Langflow][run] Response status code received: {response.status_code}")
        if response.status_code != 200:
            print(f"Error: Received status code {response.status_code}")
            return _twiml_reply("I'm sorry, I couldn't process your request at the moment. Please try again later.")

        response_json = response.json()
        print(f"[Langflow][run] Response: {response_json}")
        output_text = response_json["outputs"][0]["outputs"][0]["results"]["message"]["data"]["text"]
        return _twiml_reply(output_text)
    except Exception as e:
        print(f"Error in processing user query: {e}")
        return _twiml_reply("I'm sorry, an unexpected error occurred. Please try again later.")
class MessageRequestRunFlow(BaseModel):
    """Request body for ``/realtor/run_flow``: who is asking and what."""

    # Both fields fall back to placeholder values when omitted.
    from_phone: str = "+917389058485"
    user_query: str = "Hello"
# Upper bound for the upstream workflow call made by /realtor/run_flow.
# NOTE(review): 60 seconds is a guess at a generous ceiling -- tune it to the
# workflow's real latency.
_RUN_FLOW_UPSTREAM_TIMEOUT_SECONDS = 60


@app.post('/realtor/run_flow')
async def realtor_validate_user(request: MessageRequestRunFlow):
    """Forward a realtor's query to the workflow endpoint and return its text.

    The sender's phone number doubles as the workflow session id. Failures
    (non-200 status, unexpected response shape, network error, timeout) are
    reported as an apology string in a normal JSON response rather than as an
    HTTP error.

    NOTE(review): this function shares its name with the ``/realtor/validate``
    handler defined earlier in the file; the name is kept so the module's
    interface does not change.

    Returns:
        JSONResponse whose content is the workflow's reply text or an apology.
    """
    # Local import: keeps this handler self-contained without touching the
    # file-level import block.
    from fastapi.concurrency import run_in_threadpool

    print(f"body: {request}")
    user_query = request.user_query
    print(f"user_query: {user_query}")
    payload = {
        "input_value": json.dumps({"from_phone": request.from_phone, "msg": user_query}),
        "session_id": request.from_phone
    }
    print(f"Payload: {payload}")
    headers = {
        'Content-Type': 'application/json'
    }
    url = "https://workflows.kickcall.ai/api/v1/run/c94b14f5-d7a7-4fc8-89d8-eb672716c778?stream=false&user_id=1"

    # Broad catch is deliberate: callers always get a JSON string back.
    try:
        # requests is synchronous; run it in the threadpool so the event loop
        # keeps serving other requests, and bound the wait with a timeout.
        response = await run_in_threadpool(
            requests.post,
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=_RUN_FLOW_UPSTREAM_TIMEOUT_SECONDS,
        )
        print(f"[Langflow][run] Response status code received: {response.status_code}")
        if response.status_code != 200:
            print(f"Error: Received status code {response.status_code}")
            msg = "I'm sorry, I couldn't process your request at the moment. Please try again later."
            return JSONResponse(content=msg)

        response_json = response.json()
        print(f"[Langflow][run] Response: {response_json}")
        output_text = response_json["outputs"][0]["outputs"][0]["results"]["message"]["data"]["text"]
        return JSONResponse(content=output_text)
    except Exception as e:
        print(f"Error in processing user query: {e}")
        msg = "I'm sorry, an unexpected error occurred. Please try again later."
        return JSONResponse(content=msg)