rairo committed on
Commit
7bda769
·
verified ·
1 Parent(s): 1460b64

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +173 -58
main.py CHANGED
@@ -101,32 +101,135 @@ GREETING_PATTERN = re.compile(r'^\s*(hi|hello|hola|hey|greetings|sawubona)\b.*$'
101
  # --- Duplicate Message Handling ---
102
  PROCESSED_MESSAGES_TTL_HOURS = 24 # Store processed message IDs for 24 hours
103
 
104
- def check_and_mark_processed(message_id: str) -> bool:
105
- """Improved duplicate checking with atomic transactions"""
106
- if not db or not message_id:
107
- logger.warning("Firestore client or message ID missing")
108
- return False
109
 
110
- doc_ref = db.collection("processed_messages").document(message_id)
 
 
 
 
 
111
 
112
- try:
113
- # Use transaction to prevent race conditions
114
- @firestore.transactional
115
- def check_and_mark_transaction(transaction):
116
- doc = transaction.get(doc_ref)
117
- if doc.exists:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
  return True
119
- transaction.set(doc_ref, {
120
- "processed_at": firestore.SERVER_TIMESTAMP,
121
- "expires_at": datetime.now() + timedelta(hours=PROCESSED_MESSAGES_TTL_HOURS)
122
- })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
  return False
124
-
125
- return check_and_mark_transaction(db.transaction())
 
 
126
 
127
- except Exception as e:
128
- logger.error(f"Firestore error: {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
129
  return False
 
 
130
 
131
 
132
  # --- Message Sending Wrappers (using whatsapp_client) ---
@@ -347,7 +450,7 @@ def process_audio_message(audio_id: str, mobile: str) -> None:
347
  # --- Webhook Endpoint ---
348
  @app.route("/", methods=["GET", "POST"])
349
  def webhook_handler():
350
- """Handles incoming WhatsApp webhooks."""
351
  if request.method == "GET":
352
  # Webhook Verification
353
  mode = request.args.get("hub.mode")
@@ -362,77 +465,89 @@ def webhook_handler():
362
 
363
  # --- POST Request Handling ---
364
  if request.method == "POST":
365
- data = request.get_json()
366
- if not data:
367
- logger.warning("Received empty POST request body.")
368
- return make_response("Bad Request: No data", 400)
 
369
 
370
- # Log the raw incoming payload for debugging
371
- logger.debug(f"Incoming Webhook Payload: {json.dumps(data, indent=2)}")
372
 
373
- # Use the helper function from whatsapp_client to parse the payload
374
- msg_details = wa_client.get_message_details(data)
375
 
376
- if not msg_details:
377
- # Not a message we handle (e.g., status update, unsupported type) or parsing failed
378
- logger.info("Webhook payload did not contain a processable message.")
379
- return make_response("ok", 200) # Acknowledge receipt even if not processed
380
 
381
- message_id = msg_details.get("id")
382
- mobile = msg_details.get("from")
383
- message_type = msg_details.get("type")
384
 
385
- # --- Duplicate Check ---
386
- if message_id and check_and_mark_processed(message_id):
387
- return make_response("ok - duplicate", 200) # Acknowledge receipt, but don't process
 
388
 
389
- # --- Route based on message type ---
390
- try:
 
 
 
 
391
  if message_type == "text":
392
  message_text = msg_details.get("text")
393
- if message_text and mobile:
394
  process_text_message(message_text, mobile)
395
  else:
396
- logger.warning(f"Missing text content or sender for text message: {msg_details}")
397
 
398
  elif message_type == "audio":
399
  audio_id = msg_details.get("audio_id")
400
- if audio_id and mobile:
401
  process_audio_message(audio_id, mobile)
402
  else:
403
- logger.warning(f"Missing audio ID or sender for audio message: {msg_details}")
404
 
405
  elif message_type == "interactive":
406
  interactive_type = msg_details.get("interactive_type")
407
  if interactive_type == "button_reply":
408
  button_id = msg_details.get("button_reply_id")
409
- if button_id and mobile:
410
  handle_interactive_response(mobile, button_id)
411
  else:
412
- logger.warning(f"Missing button ID or sender for button reply: {msg_details}")
413
- # Add elif for list_reply etc. if needed
414
  else:
415
- logger.info(f"Received unhandled interactive type: {interactive_type}")
416
-
417
  else:
418
- # Handle other message types if needed (image, document, location etc.)
419
  logger.info(f"Received unhandled message type: {message_type} from {mobile}")
420
- # Optionally send a message back
421
- # wa_client.send_text_message(mobile, f"Sorry, I cannot handle '{message_type}' messages yet.")
422
 
423
  except Exception as e:
424
  # Generic error handler for safety
425
  logger.error(f"Unhandled exception during message processing: {e}", exc_info=True)
426
- if mobile: # Try to notify the user if we know who they are
427
- wa_client.send_text_message(mobile, "Sorry, an unexpected error occurred while processing your message.")
428
-
429
- # Acknowledge receipt of the webhook event
 
 
 
 
 
 
 
 
 
 
 
430
  return make_response("ok", 200)
431
 
432
  # Method Not Allowed
433
  return make_response("Method Not Allowed", 405)
434
 
435
-
436
  if __name__ == '__main__':
437
  port = int(os.environ.get("PORT", 7860)) # Use PORT env var if available (for deployment)
438
  debug_mode = os.environ.get("FLASK_DEBUG", "False").lower() == "true"
 
101
  # --- Duplicate Message Handling ---
102
  PROCESSED_MESSAGES_TTL_HOURS = 24 # Store processed message IDs for 24 hours
103
 
104
+ # In-memory message-ID deduplication (LRU cache with TTL), optionally backed by Firestore
 
 
 
 
105
 
106
+ import threading
107
+ from collections import OrderedDict
108
+ from time import time
109
+
110
class MessageDeduplicator:
    """Thread-safe message-ID deduplicator with a TTL and optional Firestore backing.

    Recently seen message IDs are kept in an in-memory ``OrderedDict`` bounded
    by ``max_cache_size`` (least recently seen entries are evicted first).
    When a ``db_client`` is supplied, IDs are also looked up in and written to
    the ``processed_messages`` collection. A daemon thread periodically drops
    in-memory entries older than the TTL.

    NOTE(review): the in-memory check is atomic per process only; the database
    lookup and write are two separate calls, so two processes receiving the
    same ID at the same moment can both treat it as new.
    """

    # Seconds between cleanup passes, and the shorter back-off used after a
    # cleanup pass raised an exception.
    _CLEANUP_INTERVAL_SECONDS = 3600
    _CLEANUP_RETRY_SECONDS = 300

    def __init__(self, ttl_hours=24, max_cache_size=10000, db_client=None):
        """
        Initialize the deduplicator and start its background cleanup thread.

        Args:
            ttl_hours: Hours to keep message IDs in memory/DB
            max_cache_size: Maximum number of message IDs to keep in memory
            db_client: Firestore client for persistent storage (optional)
        """
        self.ttl_seconds = ttl_hours * 3600
        self.max_cache_size = max_cache_size
        self.db_client = db_client
        self.cache = OrderedDict()  # {message_id: time() when it was recorded}
        self.lock = threading.RLock()
        # One long-lived event doubles as an interruptible sleep and a
        # shutdown signal for the cleanup thread.
        self._stop_event = threading.Event()

        self._cleanup_thread = threading.Thread(
            target=self._periodic_cleanup,
            daemon=True,
        )
        self._cleanup_thread.start()

    def stop(self):
        """Ask the background cleanup thread to exit. Safe to call repeatedly."""
        self._stop_event.set()

    def is_duplicate(self, message_id):
        """
        Check whether message_id was seen before; if not, record it.

        Args:
            message_id: Unique identifier for the message

        Returns:
            bool: True if the ID is already known (in memory or in the
            database), False otherwise. Falsy IDs are never duplicates and
            are not recorded. A database lookup error is logged and the
            message is then treated as new.
        """
        if not message_id:
            return False

        # NOTE(review): the lock is held across the database round-trips
        # below, which serializes concurrent callers while a lookup or write
        # is in flight.
        with self.lock:
            # Check in-memory cache first (fastest)
            if message_id in self.cache:
                # Mark as most recently seen for LRU eviction. The stored
                # timestamp is intentionally left unchanged.
                self.cache.move_to_end(message_id)
                return True

            # If not in memory, check database if available
            if self.db_client:
                try:
                    doc_ref = self.db_client.collection("processed_messages").document(message_id)
                    if doc_ref.get().exists:
                        # Cache it so the next lookup avoids the database.
                        self._remember(message_id)
                        return True
                except Exception as e:
                    logger.error(f"Database error in is_duplicate: {e}", exc_info=True)

            # Not a duplicate, mark as processed
            self._mark_processed(message_id)
            return False

    def _remember(self, message_id):
        """Record message_id in memory and evict the oldest entry if over capacity."""
        with self.lock:
            self.cache[message_id] = time()
            if len(self.cache) > self.max_cache_size:
                self.cache.popitem(last=False)  # Remove oldest item

    def _mark_processed(self, message_id):
        """Mark message as processed in both cache and database."""
        # Imported locally so this method does not depend on which datetime
        # names the module header happens to import.
        from datetime import datetime, timedelta, timezone

        with self.lock:
            self._remember(message_id)

            # Add to database if available
            if self.db_client:
                try:
                    # Use an aware UTC timestamp: a naive local time would be
                    # stored as if it were UTC, skewing the expiry by the
                    # server's UTC offset.
                    expiry = datetime.now(timezone.utc) + timedelta(seconds=self.ttl_seconds)
                    doc_ref = self.db_client.collection("processed_messages").document(message_id)
                    doc_ref.set({
                        "processed_at": firestore.SERVER_TIMESTAMP,
                        "expires_at": expiry
                    })
                except Exception as e:
                    logger.error(f"Database error in _mark_processed: {e}", exc_info=True)

    def _purge_expired(self):
        """Drop in-memory entries whose age exceeds the TTL."""
        with self.lock:
            current_time = time()
            expired_ids = [
                msg_id for msg_id, timestamp in self.cache.items()
                if current_time - timestamp > self.ttl_seconds
            ]
            for msg_id in expired_ids:
                self.cache.pop(msg_id, None)

    def _periodic_cleanup(self):
        """Background loop that removes expired entries until stop() is called."""
        while not self._stop_event.is_set():
            delay = self._CLEANUP_INTERVAL_SECONDS
            try:
                self._purge_expired()
            except Exception as e:
                logger.error(f"Error in cleanup thread: {e}", exc_info=True)
                # Retry sooner after a failed pass
                delay = self._CLEANUP_RETRY_SECONDS
            # Returns early as soon as stop() sets the event.
            self._stop_event.wait(delay)
218
+
219
+ # Initialize the deduplicator at the module level
220
+ message_deduplicator = MessageDeduplicator(
221
+ ttl_hours=PROCESSED_MESSAGES_TTL_HOURS,
222
+ db_client=db
223
+ )
224
+
225
+ # Duplicate check called by the webhook handler; delegates to the module-level deduplicator
226
def check_and_mark_processed(message_id: str) -> bool:
    """Report whether message_id was already handled, recording it if it was not.

    Delegates to the module-level ``message_deduplicator``. A falsy ID is
    logged as a warning and reported as not a duplicate.
    """
    if message_id:
        return message_deduplicator.is_duplicate(message_id)

    logger.warning("Empty message ID provided to check_and_mark_processed")
    return False
233
 
234
 
235
  # --- Message Sending Wrappers (using whatsapp_client) ---
 
450
  # --- Webhook Endpoint ---
451
  @app.route("/", methods=["GET", "POST"])
452
  def webhook_handler():
453
+ """Handles incoming WhatsApp webhooks with improved duplicate detection."""
454
  if request.method == "GET":
455
  # Webhook Verification
456
  mode = request.args.get("hub.mode")
 
465
 
466
  # --- POST Request Handling ---
467
  if request.method == "POST":
468
+ try:
469
+ data = request.get_json()
470
+ if not data:
471
+ logger.warning("Received empty POST request body.")
472
+ return make_response("Bad Request: No data", 400)
473
 
474
+ # Log the raw incoming payload for debugging (consider sampling for high traffic)
475
+ # logger.debug(f"Incoming Webhook Payload: {json.dumps(data, indent=2)}")
476
 
477
+ # Use the helper function from whatsapp_client to parse the payload
478
+ msg_details = wa_client.get_message_details(data)
479
 
480
+ if not msg_details:
481
+ # Not a message we handle (e.g., status update, unsupported type) or parsing failed
482
+ logger.info("Webhook payload did not contain a processable message.")
483
+ return make_response("ok", 200) # Acknowledge receipt even if not processed
484
 
485
+ message_id = msg_details.get("id")
486
+ mobile = msg_details.get("from")
487
+ message_type = msg_details.get("type")
488
 
489
+ # Early return if no message ID or phone number
490
+ if not message_id or not mobile:
491
+ logger.warning("Missing message ID or sender phone number in payload")
492
+ return make_response("ok - invalid message", 200)
493
 
494
+ # --- Improved Duplicate Check ---
495
+ if check_and_mark_processed(message_id):
496
+ logger.info(f"Ignoring duplicate message with ID {message_id} from {mobile}")
497
+ return make_response("ok - duplicate", 200) # Acknowledge but don't process
498
+
499
+ # --- Route based on message type ---
500
  if message_type == "text":
501
  message_text = msg_details.get("text")
502
+ if message_text:
503
  process_text_message(message_text, mobile)
504
  else:
505
+ logger.warning(f"Missing text content for text message: {msg_details}")
506
 
507
  elif message_type == "audio":
508
  audio_id = msg_details.get("audio_id")
509
+ if audio_id:
510
  process_audio_message(audio_id, mobile)
511
  else:
512
+ logger.warning(f"Missing audio ID for audio message: {msg_details}")
513
 
514
  elif message_type == "interactive":
515
  interactive_type = msg_details.get("interactive_type")
516
  if interactive_type == "button_reply":
517
  button_id = msg_details.get("button_reply_id")
518
+ if button_id:
519
  handle_interactive_response(mobile, button_id)
520
  else:
521
+ logger.warning(f"Missing button ID for button reply: {msg_details}")
 
522
  else:
523
+ logger.info(f"Received unhandled interactive type: {interactive_type}")
 
524
  else:
525
+ # Handle other message types if needed
526
  logger.info(f"Received unhandled message type: {message_type} from {mobile}")
 
 
527
 
528
  except Exception as e:
529
  # Generic error handler for safety
530
  logger.error(f"Unhandled exception during message processing: {e}", exc_info=True)
531
+ # Only try to notify if we can extract the mobile number
532
+ mobile = None
533
+ try:
534
+ msg_details = wa_client.get_message_details(data)
535
+ mobile = msg_details.get("from") if msg_details else None
536
+ except:
537
+ pass
538
+
539
+ if mobile:
540
+ wa_client.send_text_message(
541
+ mobile,
542
+ "Sorry, an unexpected error occurred while processing your message."
543
+ )
544
+
545
+ # Always acknowledge receipt of the webhook event
546
  return make_response("ok", 200)
547
 
548
  # Method Not Allowed
549
  return make_response("Method Not Allowed", 405)
550
 
 
551
  if __name__ == '__main__':
552
  port = int(os.environ.get("PORT", 7860)) # Use PORT env var if available (for deployment)
553
  debug_mode = os.environ.get("FLASK_DEBUG", "False").lower() == "true"