|
|
|
import json
import logging
import os
from typing import Any, Dict, List, Optional

import requests
|
|
|
# Module-level logger; handler and level configuration is left to the application.
logger = logging.getLogger(__name__)

# Graph API version segment of every URL; falls back to "v19.0" when the
# WHATSAPP_API_VERSION environment variable is not set.
WHATSAPP_API_VERSION = os.environ.get("WHATSAPP_API_VERSION", "v19.0")

# Required settings: indexing os.environ raises KeyError at import time when
# either variable is missing. The variable names are lowercase.
WHATSAPP_TOKEN = os.environ["whatsapp_token"]
PHONE_NUMBER_ID = os.environ["phone_number_id"]

# Root URL for requests scoped to the configured phone number.
BASE_URL = "/".join(
    ("https://graph.facebook.com", WHATSAPP_API_VERSION, PHONE_NUMBER_ID)
)

# Default headers for JSON requests authenticated with the bearer token.
HEADERS = {
    "Authorization": "Bearer " + WHATSAPP_TOKEN,
    "Content-Type": "application/json",
}
|
|
|
def send_message(
    recipient_id: str,
    message_data: Dict[str, Any],
    *,
    timeout: float = 30.0,
) -> bool:
    """Send a message through the WhatsApp Cloud API.

    Args:
        recipient_id: Value placed in the payload's ``"to"`` field.
        message_data: Message-specific fields (for example ``"type"`` plus
            the matching content object). They are merged last, so a key
            here overrides ``"messaging_product"`` or ``"to"``.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        True when the API answered with a non-error status, False on any
        request failure (the failure is logged).
    """
    url = f"{BASE_URL}/messages"
    payload = {
        "messaging_product": "whatsapp",
        "to": recipient_id,
        **message_data,
    }
    try:
        response = requests.post(
            url, headers=HEADERS, json=payload, timeout=timeout
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error sending message to {recipient_id}: {e}")
        if e.response is not None:
            logger.error(f"Response status: {e.response.status_code}")
            logger.error(f"Response body: {e.response.text}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error sending message: {e}")
        return False

    # The body is decoded only for logging: a non-JSON body on a successful
    # response must not turn a delivered message into a reported failure.
    try:
        response_body = response.json()
    except ValueError:
        response_body = response.text
    logger.info(f"Message sent to {recipient_id}. Response: {response_body}")
    return True
|
|
|
def send_text_message(recipient_id: str, text: str) -> bool:
    """Send a plain text message with URL previews turned off.

    Args:
        recipient_id: Recipient passed through to ``send_message``.
        text: Message body.

    Returns:
        The boolean result of ``send_message``.
    """
    text_content = {"preview_url": False, "body": text}
    return send_message(recipient_id, {"type": "text", "text": text_content})
|
|
|
def send_image_message(
    recipient_id: str,
    image_url: Optional[str] = None,
    image_id: Optional[str] = None,
) -> bool:
    """Send an image referenced by an uploaded media ID or by a link.

    When both references are supplied, ``image_id`` takes precedence.

    Args:
        recipient_id: Recipient passed through to ``send_message``.
        image_url: Link to the image.
        image_id: ID of previously uploaded media.

    Returns:
        False when neither reference is given; otherwise the result of
        ``send_message``.
    """
    if image_id:
        image_ref: Dict[str, Any] = {"id": image_id}
    elif image_url:
        image_ref = {"link": image_url}
    else:
        logger.error("Either image_url or image_id must be provided.")
        return False

    return send_message(recipient_id, {"type": "image", "image": image_ref})
|
|
|
|
|
def send_reply_buttons(
    recipient_id: str,
    body_text: str,
    button_data: List[Dict[str, Any]],
) -> bool:
    """Send an interactive message with up to three reply buttons.

    Each entry of ``button_data`` is expected to look like
    ``{"reply": {"id": ..., "title": ...}}``. Entries that are not shaped
    that way, or that lack an id or title, are skipped with a warning.
    The first three *valid* buttons are used, so an invalid entry early in
    the list no longer uses up one of the three slots.

    IDs are truncated to 256 characters, titles to 20 and the body text to
    1024 (presumably the Cloud API limits; verify against the API docs).

    Args:
        recipient_id: Recipient passed through to ``send_message``.
        body_text: Text shown above the buttons.
        button_data: Candidate buttons; ``None`` is treated as empty.

    Returns:
        False when no valid button remains; otherwise the result of
        ``send_message``.
    """
    max_buttons = 3
    valid_buttons: List[Dict[str, Any]] = []

    for btn in button_data or []:
        if len(valid_buttons) == max_buttons:
            logger.warning(
                f"Ignoring buttons beyond the first {max_buttons} valid ones."
            )
            break

        # Guard the shape first so malformed input is skipped, not raised.
        reply = btn.get("reply") if isinstance(btn, dict) else None
        if not isinstance(reply, dict):
            logger.warning(f"Skipping invalid button: {btn}")
            continue

        raw_id = reply.get("id")
        raw_title = reply.get("title")
        # None counts as missing; str(None) would otherwise become "None".
        btn_id = "" if raw_id is None else str(raw_id)[:256]
        btn_title = "" if raw_title is None else str(raw_title)[:20]
        if not (btn_id and btn_title):
            logger.warning(f"Skipping invalid button: {btn}")
            continue

        valid_buttons.append(
            {"type": "reply", "reply": {"id": btn_id, "title": btn_title}}
        )

    if not valid_buttons:
        logger.error("No valid buttons provided for interactive message.")
        return False

    message_data = {
        "type": "interactive",
        "interactive": {
            "type": "button",
            "body": {"text": str(body_text)[:1024]},
            "action": {"buttons": valid_buttons},
        },
    }
    return send_message(recipient_id, message_data)
|
|
|
|
|
def get_media_url(media_id: str, *, timeout: float = 30.0) -> Optional[str]:
    """Look up the download URL of a media item.

    Args:
        media_id: ID of the media item; it forms the last path segment of
            the Graph API request.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        The ``"url"`` field of the JSON response, or None when the ID is
        empty, the request fails, or the response has no such field.
    """
    if not media_id:
        # An empty ID would silently query the API root instead of a media item.
        logger.error("Cannot get media URL: media_id is empty.")
        return None

    url = f"https://graph.facebook.com/{WHATSAPP_API_VERSION}/{media_id}"
    headers = {"Authorization": f"Bearer {WHATSAPP_TOKEN}"}
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        media_url = response.json().get("url")
        logger.info(f"Retrieved media URL for {media_id}: {media_url}")
        return media_url
    except requests.exceptions.RequestException as e:
        logger.error(f"Error getting media URL for {media_id}: {e}")
        if e.response is not None:
            logger.error(f"Response status: {e.response.status_code}")
            logger.error(f"Response body: {e.response.text}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error getting media URL: {e}")
        return None
|
|
|
|
|
def download_media(
    media_url: str,
    save_path: str,
    *,
    timeout: float = 30.0,
) -> Optional[str]:
    """Download media from a URL obtained via ``get_media_url``.

    The response is streamed to ``save_path`` in 8 KiB chunks. Missing
    parent directories are created first.

    Args:
        media_url: URL to download. The bearer token is sent to this URL.
            NOTE(review): callers should only pass URLs returned by the
            Graph API, otherwise the token is sent to a third party.
        save_path: Destination file path; a bare filename is accepted.
        timeout: Seconds to wait on the connection before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        ``save_path`` on success, None on any failure (the failure is logged).
    """
    headers = {"Authorization": f"Bearer {WHATSAPP_TOKEN}"}
    response = None
    try:
        response = requests.get(
            media_url, headers=headers, stream=True, timeout=timeout
        )
        response.raise_for_status()

        # dirname is "" for a bare filename, and os.makedirs("") raises.
        target_dir = os.path.dirname(save_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        with open(save_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        logger.info(f"Media downloaded successfully to {save_path}")
        return save_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Error downloading media from {media_url}: {e}")
        if e.response is not None:
            logger.error(f"Response status: {e.response.status_code}")
            logger.error(f"Response body: {e.response.text}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error downloading media: {e}")
        return None
    finally:
        # A streamed response holds its connection until closed. Closing
        # here, after the handlers ran, keeps the error body readable above.
        if response is not None:
            response.close()
|
|
|
|
|
|
|
def get_message_details(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Extract key details from an incoming webhook payload.

    Only the first message of the first change of the first entry is read.

    Args:
        data: Decoded webhook payload.

    Returns:
        A dict with ``"type"``, ``"from"``, ``"id"`` and ``"timestamp"``,
        plus per-type fields:

        * ``"text"`` for text messages,
        * ``"audio_id"`` for audio messages,
        * ``"interactive_type"`` for interactive messages, and also
          ``"button_reply_id"`` / ``"button_reply_title"`` for button replies.

        None when the payload carries no message, is not a ``"messages"``
        change, lacks a type, sender or id, or is malformed.
    """
    try:
        if "entry" not in data or not data["entry"]:
            return None

        # "or [{}]" also covers an explicitly empty list, which is simply
        # "no message" rather than a parse error.
        changes = data["entry"][0].get("changes") or [{}]
        change = changes[0]
        if change.get("field") != "messages":
            return None

        messages = (change.get("value") or {}).get("messages") or [{}]
        message_data = messages[0]
        if not message_data:
            return None

        message_type = message_data.get("type")
        details = {
            "type": message_type,
            "from": message_data.get("from"),
            "id": message_data.get("id"),
            "timestamp": message_data.get("timestamp"),
        }

        if message_type == "text":
            details["text"] = (message_data.get("text") or {}).get("body")
        elif message_type == "audio":
            details["audio_id"] = (message_data.get("audio") or {}).get("id")
        elif message_type == "interactive":
            interactive_data = message_data.get("interactive") or {}
            interactive_type = interactive_data.get("type")
            details["interactive_type"] = interactive_type
            if interactive_type == "button_reply":
                button_reply = interactive_data.get("button_reply") or {}
                details["button_reply_id"] = button_reply.get("id")
                details["button_reply_title"] = button_reply.get("title")

        if not all((details["type"], details["from"], details["id"])):
            logger.warning(f"Incomplete message data extracted: {details}")
            return None

        return details

    except (KeyError, IndexError, TypeError, AttributeError) as e:
        # AttributeError covers non-dict values where a dict was expected.
        # default=str keeps the dump from raising on non-JSON values.
        logger.error(
            f"Error parsing webhook data: {e}\nData: {json.dumps(data, default=str)}"
        )
        return None