Quantx-WA / whatsapp_client.py
rairo's picture
Update whatsapp_client.py
cb05fee verified
# whatsapp_client.py
import requests
import os
import logging
from typing import Optional, Dict, Any
import json
logger = logging.getLogger(__name__)
WHATSAPP_API_VERSION = os.getenv("WHATSAPP_API_VERSION", "v19.0") # Use a recent version
WHATSAPP_TOKEN = os.environ["whatsapp_token"]
PHONE_NUMBER_ID = os.environ["phone_number_id"]
BASE_URL = f"https://graph.facebook.com/{WHATSAPP_API_VERSION}/{PHONE_NUMBER_ID}"
HEADERS = {
"Authorization": f"Bearer {WHATSAPP_TOKEN}",
"Content-Type": "application/json",
}
def send_message(recipient_id: str, message_data: Dict[str, Any]) -> bool:
"""Sends a message using the WhatsApp Cloud API."""
url = f"{BASE_URL}/messages"
payload = {
"messaging_product": "whatsapp",
"to": recipient_id,
**message_data, # Spread the specific message type payload here
}
try:
response = requests.post(url, headers=HEADERS, json=payload)
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
logger.info(f"Message sent to {recipient_id}. Response: {response.json()}")
return True
except requests.exceptions.RequestException as e:
logger.error(f"Error sending message to {recipient_id}: {e}")
if e.response is not None:
logger.error(f"Response status: {e.response.status_code}")
logger.error(f"Response body: {e.response.text}")
return False
except Exception as e:
logger.error(f"Unexpected error sending message: {e}")
return False
def send_text_message(recipient_id: str, text: str) -> bool:
"""Sends a simple text message."""
message_data = {
"type": "text",
"text": {"preview_url": False, "body": text},
}
return send_message(recipient_id, message_data)
def send_image_message(recipient_id: str, image_url: Optional[str] = None, image_id: Optional[str] = None) -> bool:
"""Sends an image message using either a URL or a previously uploaded ID."""
if not image_url and not image_id:
logger.error("Either image_url or image_id must be provided.")
return False
message_data: Dict[str, Any] = {"type": "image"}
if image_id:
message_data["image"] = {"id": image_id}
else:
message_data["image"] = {"link": image_url}
return send_message(recipient_id, message_data)
def send_reply_buttons(recipient_id: str, body_text: str, button_data: list) -> bool:
"""Sends an interactive message with reply buttons."""
# Ensure button IDs are strings and <= 256 chars
# Ensure button titles are strings and <= 20 chars
# Ensure body text is <= 1024 chars
# Max 3 buttons
valid_buttons = []
for btn in button_data[:3]: # Max 3 buttons
reply = btn.get("reply", {})
btn_id = str(reply.get("id", ""))[:256]
btn_title = str(reply.get("title", ""))[:20]
if btn_id and btn_title:
valid_buttons.append({
"type": "reply",
"reply": {"id": btn_id, "title": btn_title}
})
else:
logger.warning(f"Skipping invalid button: {btn}")
if not valid_buttons:
logger.error("No valid buttons provided for interactive message.")
return False
message_data = {
"type": "interactive",
"interactive": {
"type": "button",
"body": {"text": str(body_text)[:1024]},
"action": {"buttons": valid_buttons},
},
}
return send_message(recipient_id, message_data)
def get_media_url(media_id: str) -> Optional[str]:
"""Retrieves the download URL for a media item."""
url = f"https://graph.facebook.com/{WHATSAPP_API_VERSION}/{media_id}"
headers = {"Authorization": f"Bearer {WHATSAPP_TOKEN}"}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
logger.info(f"Retrieved media URL for {media_id}: {data.get('url')}")
return data.get("url")
except requests.exceptions.RequestException as e:
logger.error(f"Error getting media URL for {media_id}: {e}")
if e.response is not None:
logger.error(f"Response status: {e.response.status_code}")
logger.error(f"Response body: {e.response.text}")
return None
except Exception as e:
logger.error(f"Unexpected error getting media URL: {e}")
return None
def download_media(media_url: str, save_path: str) -> Optional[str]:
"""Downloads media from a URL obtained via get_media_url."""
headers = {"Authorization": f"Bearer {WHATSAPP_TOKEN}"}
try:
response = requests.get(media_url, headers=headers, stream=True)
response.raise_for_status()
# Ensure directory exists
os.makedirs(os.path.dirname(save_path), exist_ok=True)
with open(save_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logger.info(f"Media downloaded successfully to {save_path}")
return save_path
except requests.exceptions.RequestException as e:
logger.error(f"Error downloading media from {media_url}: {e}")
if e.response is not None:
logger.error(f"Response status: {e.response.status_code}")
logger.error(f"Response body: {e.response.text}")
return None
except Exception as e:
logger.error(f"Unexpected error downloading media: {e}")
return None
# --- Helper functions for parsing incoming webhook data ---
def get_message_details(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Extracts key details from the incoming webhook payload."""
try:
if "entry" not in data or not data["entry"]:
return None
change = data["entry"][0].get("changes", [{}])[0]
if change.get("field") != "messages":
return None # Not a message change
message_data = change.get("value", {}).get("messages", [{}])[0]
if not message_data:
# Could be a status update, etc. Check for other types if needed.
return None
message_type = message_data.get("type")
from_number = message_data.get("from")
message_id = message_data.get("id")
details = {
"type": message_type,
"from": from_number,
"id": message_id,
"timestamp": message_data.get("timestamp")
}
if message_type == "text":
details["text"] = message_data.get("text", {}).get("body")
elif message_type == "audio":
details["audio_id"] = message_data.get("audio", {}).get("id")
# You might want mime_type too: message_data.get("audio", {}).get("mime_type")
elif message_type == "interactive":
interactive_data = message_data.get("interactive", {})
interactive_type = interactive_data.get("type")
details["interactive_type"] = interactive_type
if interactive_type == "button_reply":
details["button_reply_id"] = interactive_data.get("button_reply", {}).get("id")
details["button_reply_title"] = interactive_data.get("button_reply", {}).get("title")
# Add elif for list_reply if you use lists
# Add elif for other types: image, document, location, contacts, etc.
if not all([details["type"], details["from"], details["id"]]):
logger.warning(f"Incomplete message data extracted: {details}")
return None
return details
except (KeyError, IndexError, TypeError) as e:
logger.error(f"Error parsing webhook data: {e}\nData: {json.dumps(data)}")
return None