# Spaces: Sleeping -- hosting-page status text picked up during extraction; not part of the program.
import json
import logging
import os
import subprocess
import sys
import time
from datetime import datetime
from io import BytesIO
from urllib.parse import quote_plus

import pandas as pd
import pytesseract
import requests
from google.colab import drive, auth
from groq import Groq
from PIL import Image
from selenium import webdriver
from selenium.webdriver.common.by import By
from twilio.rest import Client
# Logging setup: records at DEBUG and above are written both to the file
# 'sms_debug.log' and to a console stream handler, sharing one format.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
    handlers=[logging.FileHandler('sms_debug.log'), logging.StreamHandler()],
)

# Module-level logger used by every function and class in this file.
logger = logging.getLogger(__name__)
def install_dependencies():
    """Install the system and Python packages this script relies on.

    Runs ``apt-get update``, installs the ``chromium-chromedriver`` and
    ``tesseract-ocr`` system packages, then pip-installs each Python package
    with the interpreter that is running this script. Every command is run
    with ``check=True``, so the first command that exits non-zero raises
    ``subprocess.CalledProcessError`` and the remaining commands are skipped.
    """
    system_packages = ('chromium-chromedriver', 'tesseract-ocr')
    python_packages = (
        'selenium',
        'Pillow',
        'pytesseract',
        'pandas',
        'requests',
        'groq',
        'twilio',
    )

    # Order matters: refresh the apt index, then system packages, then pip.
    commands = [['apt-get', 'update']]
    commands.extend(['apt-get', 'install', '-y', name] for name in system_packages)
    commands.extend(
        [sys.executable, '-m', 'pip', 'install', name] for name in python_packages
    )

    for command in commands:
        subprocess.run(command, check=True)
class SMSSender:
    """Send SMS notifications through the Twilio REST API.

    Credentials are taken from the constructor arguments or, when those are
    omitted, from the ``TWILIO_ACCOUNT_SID`` / ``TWILIO_AUTH_TOKEN``
    environment variables. They are deliberately NOT embedded in the source:
    secrets committed to a file are exposed to anyone who can read it.

    If the client cannot be created, ``self.client`` is ``None`` and every
    ``send_sms`` call logs an error and returns ``False`` instead of raising.
    """

    # Sender number used when neither the argument nor TWILIO_FROM_NUMBER is set.
    DEFAULT_FROM_NUMBER = "+17322534518"
    # Prefix added to recipient numbers that do not start with '+'.
    DEFAULT_COUNTRY_CODE = "+91"
    # Twilio documents a 1600-character limit for a message body.
    MAX_BODY_LENGTH = 1600

    def __init__(self, account_sid=None, auth_token=None, from_number=None):
        """Create the Twilio client.

        Args:
            account_sid: Twilio account SID; falls back to the
                ``TWILIO_ACCOUNT_SID`` environment variable.
            auth_token: Twilio auth token; falls back to the
                ``TWILIO_AUTH_TOKEN`` environment variable.
            from_number: Sender number; falls back to ``TWILIO_FROM_NUMBER``
                and then to ``DEFAULT_FROM_NUMBER``.
        """
        self.account_sid = account_sid or os.environ.get("TWILIO_ACCOUNT_SID")
        self.auth_token = auth_token or os.environ.get("TWILIO_AUTH_TOKEN")
        self.from_number = (
            from_number
            or os.environ.get("TWILIO_FROM_NUMBER")
            or self.DEFAULT_FROM_NUMBER
        )
        self.client = None

        if not (self.account_sid and self.auth_token):
            logger.error(
                "Twilio credentials not configured (set TWILIO_ACCOUNT_SID and "
                "TWILIO_AUTH_TOKEN). SMS sending is disabled."
            )
            return

        try:
            self.client = Client(self.account_sid, self.auth_token)
            logger.info("Twilio client initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Twilio client: {str(e)}")
            # Degrade gracefully: callers check the return value of send_sms.
            self.client = None

    def send_sms(self, to_number, message):
        """Send ``message`` to ``to_number``.

        Args:
            to_number: Recipient number. A value without a leading '+' gets
                ``DEFAULT_COUNTRY_CODE`` prepended.
            message: Text to send; truncated to ``MAX_BODY_LENGTH`` characters.

        Returns:
            ``True`` if the Twilio API call succeeded, ``False`` if the client
            is unavailable or the call raised.
        """
        if not self.client:
            logger.error("Twilio client not initialized. SMS will not be sent.")
            return False

        try:
            recipient = str(to_number).strip()
            logger.info(f"Attempting to send SMS to: {recipient}")
            if not recipient.startswith('+'):
                recipient = f"{self.DEFAULT_COUNTRY_CODE}{recipient}"

            body = str(message)[:self.MAX_BODY_LENGTH]
            sent = self.client.messages.create(
                body=body,
                from_=self.from_number,
                to=recipient,
            )
            logger.info(f"SMS sent successfully! Message SID: {sent.sid}")
            return True
        except Exception as e:
            logger.error(f"Failed to send SMS: {str(e)}", exc_info=True)
            return False
class ScamDetector:
    """Scrape SMS screenshots, OCR them, classify the text and record results.

    Pipeline: ``scrape_images`` collects image URLs from a Bing image search,
    ``download_and_extract_text`` OCRs each image and cleans the text through
    Groq, and ``process_and_save`` sends the text to a prediction API, writes
    the results under ``base_path`` and sends a summary SMS.
    """

    # Number that receives status and error notifications.
    ALERT_NUMBER = "8140030507"
    # Endpoint that classifies a message.
    PREDICT_URL = "https://varun324242-sssssss.hf.space/predict"
    # Seconds to wait for the prediction API before giving up on one image.
    PREDICT_TIMEOUT = 30
    GROQ_MODEL = "llama3-8b-8192"

    def __init__(self, groq_api_key, sms_sender):
        """Create the Groq client and mount Google Drive.

        Args:
            groq_api_key: API key passed to ``Groq``.
            sms_sender: Object with ``send_sms(to_number=..., message=...)``,
                or ``None`` to disable notifications.
        """
        self.groq_client = Groq(api_key=groq_api_key)
        self.sms_sender = sms_sender
        self.setup_drive()

    def setup_drive(self):
        """Authenticate the Colab user and mount Drive at ``/content/drive``."""
        auth.authenticate_user()
        drive.mount('/content/drive')

    def _notify(self, message):
        """Send ``message`` to ``ALERT_NUMBER`` if an SMS sender is configured."""
        if not self.sms_sender:
            logger.warning("SMS sender unavailable; notification not sent.")
            return False
        return self.sms_sender.send_sms(to_number=self.ALERT_NUMBER, message=message)

    def process_text_with_groq(self, text):
        """Ask Groq to tidy OCR output and return the cleaned text.

        Any failure (API exception or a response without choices) is treated
        as fatal: it is logged, a notification is attempted, and the process
        exits via ``sys.exit(1)``. ``SystemExit`` is not an ``Exception``
        subclass, so the ``except Exception`` handlers in callers do not
        swallow it.
        """
        prompt = (
            "\n"
            "Format the following extracted text from an SMS image.\n"
            "Keep the original content intact but improve the formatting "
            "and remove any OCR artifacts:\n"
            f"{text}\n"
        )
        try:
            completion = self.groq_client.chat.completions.create(
                model=self.GROQ_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.8,
                max_tokens=1024,
                top_p=1,
                stream=False,
                stop=None,
            )
            if completion and getattr(completion, 'choices', None):
                return completion.choices[0].message.content.strip()
            error_msg = "Invalid response from Groq API"
        except Exception as e:
            error_msg = f"Critical error in Groq processing: {str(e)}"

        logger.error(error_msg)
        self._notify(f"CRITICAL: Groq API Error - {error_msg}. Stopping process.")
        sys.exit(1)

    def download_and_extract_text(self, url):
        """Download the image at ``url``, OCR it and return the cleaned text.

        Returns:
            The Groq-formatted text, or ``None`` when the download/OCR fails
            or the image contains no text.
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            # Context manager releases the decoded image promptly.
            with Image.open(BytesIO(response.content)) as img:
                text = pytesseract.image_to_string(img).strip()
        except Exception as e:
            logger.error(f"Error processing image from {url}: {str(e)}")
            return None

        if not text:
            return None
        return self.process_text_with_groq(text)

    def scrape_images(self, search_query="indian scam sms", scroll_count=5):
        """Collect unique image URLs from a Bing image search.

        Args:
            search_query: Search terms; URL-encoded before use.
            scroll_count: How many times to scroll to the bottom of the page
                to load more results.

        Returns:
            List of unique ``http``-prefixed URLs in discovery order: thumbnail
            ``src`` values (``.mimg``) first, then the ``murl`` entries from
            the ``m`` JSON attribute of ``.iusc`` elements.
        """
        chrome_options = webdriver.ChromeOptions()
        for flag in ('--headless', '--no-sandbox', '--disable-dev-shm-usage'):
            chrome_options.add_argument(flag)
        driver = webdriver.Chrome(options=chrome_options)
        try:
            driver.get(f"https://www.bing.com/images/search?q={quote_plus(search_query)}")
            logger.info("Loading images...")
            time.sleep(3)
            for i in range(scroll_count):
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)
                logger.info(f"Scroll {i+1}/{scroll_count} completed")

            image_urls = []
            seen = set()  # O(1) duplicate check; the list keeps the order
            for selector in (".mimg", ".iusc"):
                for element in driver.find_elements(By.CSS_SELECTOR, selector):
                    try:
                        if selector == ".mimg":
                            url = element.get_attribute('src')
                        else:
                            m = element.get_attribute('m')
                            if not m:
                                continue
                            url = json.loads(m).get('murl')
                        if url and url.startswith('http') and url not in seen:
                            seen.add(url)
                            image_urls.append(url)
                    except Exception as e:
                        logger.error(f"Error getting URL from {selector}: {str(e)}")
            return image_urls
        finally:
            driver.quit()

    @staticmethod
    def _sum_report_field(lines, label):
        """Sum the integers on report lines that START with ``label``.

        Matching on the line start is essential: the summary section contains
        lines such as ``Total Scams Detected: N`` that merely contain the
        per-run label and must not be counted again.
        """
        total = 0
        for line in lines:
            if not line.startswith(label):
                continue
            try:
                total += int(line[len(label):].strip())
            except ValueError:
                logger.warning(f"Ignoring malformed status report line: {line.strip()!r}")
        return total

    def update_status_report(self, timestamp, total_images, processed_images, scam_count, ham_count, folder_path, base_path='/content/drive/MyDrive'):
        """Append this run and refreshed overall totals to the status report.

        The report is ``{base_path}/scam_detector_status_report.txt``. Each
        call appends a per-run section followed by an "OVERALL STATISTICS"
        section; earlier sections are left in place.

        Returns:
            ``(total_runs, total_processed, total_scams, total_ham)`` including
            this run, or ``None`` if the report could not be updated.
        """
        report_path = f"{base_path}/scam_detector_status_report.txt"
        try:
            existing_runs = []
            if os.path.exists(report_path):
                with open(report_path, 'r', encoding='utf-8') as f:
                    existing_runs = f.readlines()

            new_run = (
                f"\n=== Scan Run: {timestamp} ===\n"
                f"Total Images Found: {total_images}\n"
                f"Successfully Processed: {processed_images}\n"
                f"Scams Detected: {scam_count}\n"
                f"Legitimate Messages: {ham_count}\n"
                f"Results Location: {folder_path}\n"
                f"{'=' * 50}\n"
            )
            with open(report_path, 'a', encoding='utf-8') as f:
                f.write(new_run)

            # Totals = previous per-run lines (read before appending) + this run.
            total_runs = sum(1 for line in existing_runs if line.startswith("=== Scan Run:")) + 1
            total_processed = self._sum_report_field(existing_runs, "Successfully Processed:") + processed_images
            total_scams = self._sum_report_field(existing_runs, "Scams Detected:") + scam_count
            total_ham = self._sum_report_field(existing_runs, "Legitimate Messages:") + ham_count

            summary = (
                f"\n=== OVERALL STATISTICS ===\n"
                f"Total Runs: {total_runs}\n"
                f"Total Images Processed: {total_processed}\n"
                f"Total Scams Detected: {total_scams}\n"
                f"Total Legitimate Messages: {total_ham}\n"
                f"Last Updated: {timestamp}\n"
                f"{'=' * 50}\n"
            )
            with open(report_path, 'a', encoding='utf-8') as f:
                f.write(summary)

            logger.info(f"Status report updated at: {report_path}")
            return total_runs, total_processed, total_scams, total_ham
        except Exception as e:
            logger.error(f"Error updating status report: {str(e)}")
            return None

    def _load_scam_dataset(self, scam_file_path):
        """Return the running ``v1``/``v2`` dataset, creating the CSV if absent."""
        if not os.path.exists(scam_file_path):
            pd.DataFrame(columns=['v1', 'v2']).to_csv(scam_file_path, index=False)
        try:
            return pd.read_csv(scam_file_path)
        except Exception as e:
            logger.error(f"Error reading existing CSV: {str(e)}")
            return pd.DataFrame(columns=['v1', 'v2'])

    def process_and_save(self, image_urls, base_path='/content/drive/MyDrive'):
        """OCR and classify every image, persist the results and notify.

        Args:
            image_urls: URLs to process.
            base_path: Directory that holds ``scam123.csv``, the status report
                and the per-run ``scam_detector_<timestamp>`` folder.

        Returns:
            ``(url_path, text_path, csv_path)`` for the files written this run.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        logger.info("Starting to process images one by one...")
        image_texts = []
        scam_file_path = f"{base_path}/scam123.csv"
        existing_df = self._load_scam_dataset(scam_file_path)

        total_images = len(image_urls)
        for i, url in enumerate(image_urls, 1):
            try:
                logger.info(f"\n--- Processing image {i}/{total_images} ---")
                logger.info(f"URL: {url}")

                # Step 1: download and extract text
                logger.info("Extracting text from image...")
                text = self.download_and_extract_text(url)
                if not text:
                    logger.warning(f"No text extracted from image {i}, skipping...")
                    continue

                # Step 2: send to the prediction API
                logger.info("Sending text to prediction API...")
                try:
                    response = requests.post(
                        self.PREDICT_URL,
                        json={"message": text},
                        timeout=self.PREDICT_TIMEOUT,  # never hang the whole run
                    )
                    response.raise_for_status()
                    prediction = response.json().get("predicted_result", "unknown")
                except requests.exceptions.RequestException as e:
                    logger.error(f"API error for image {i}: {str(e)}")
                    continue

                image_texts.append({
                    'URL': url,
                    'Text': text,
                    'Prediction': prediction,
                })

                # NOTE(review): a "ham" verdict is stored with label 'scam'.
                # Presumably every scraped image is assumed to be a scam, so a
                # "ham" verdict is recorded as a missed scam -- confirm intent.
                if prediction == "ham":
                    new_row = pd.DataFrame([{'v1': 'scam', 'v2': text}])
                    existing_df = pd.concat([existing_df, new_row], ignore_index=True)
                    existing_df.to_csv(scam_file_path, index=False)
                    logger.info("Added ham message to scam123.csv")

                logger.info(f"Successfully processed image {i}/{total_images}")
                logger.info(f"Prediction: {prediction}")
            except Exception as e:
                logger.error(f"Error processing image {i}: {str(e)}")
                continue

        # Save final results
        folder_path = f"{base_path}/scam_detector_{timestamp}"
        os.makedirs(folder_path, exist_ok=True)
        url_path = f'{folder_path}/scam_image_urls.txt'
        text_path = f'{folder_path}/scam_image_texts.txt'
        csv_path = f'{folder_path}/scam_messages.csv'

        with open(url_path, 'w', encoding='utf-8') as f:
            f.writelines(f"{url}\n" for url in image_urls)

        with open(text_path, 'w', encoding='utf-8') as f:
            for item in image_texts:
                f.write(f"URL: {item['URL']}\n")
                f.write(f"Text: {item['Text']}\n")
                f.write(f"Prediction: {item['Prediction']}\n")
                f.write("-" * 80 + "\n")

        pd.DataFrame(image_texts).to_csv(csv_path, index=False)

        ham_count = sum(1 for item in image_texts if item['Prediction'] == 'ham')
        scam_count = sum(1 for item in image_texts if item['Prediction'] == 'scam')

        report_totals = self.update_status_report(
            timestamp=timestamp,
            total_images=len(image_urls),
            processed_images=len(image_texts),
            scam_count=scam_count,
            ham_count=ham_count,
            folder_path=folder_path,
            base_path=base_path,
        )
        if report_totals is None:
            # The report update failed (already logged); still send this run.
            overall_section = "\nOverall Statistics: unavailable\n"
        else:
            total_runs, total_processed, total_scams, total_ham = report_totals
            overall_section = (
                f"\nOverall Statistics:\n"
                f"- Total Runs: {total_runs}\n"
                f"- Total Processed: {total_processed}\n"
                f"- Total Scams: {total_scams}\n"
                f"- Total Ham: {total_ham}\n"
            )

        final_message = (
            f"Scan Complete!\n"
            f"This Run:\n"
            f"- Images Found: {len(image_urls)}\n"
            f"- Processed: {len(image_texts)}\n"
            f"- Scams: {scam_count}\n"
            f"- Legitimate: {ham_count}\n"
            f"{overall_section}"
            f"\nResults saved to: {folder_path}"
        )
        self._notify(final_message)
        return url_path, text_path, csv_path
def main():
    """Run scam-detection scans forever, restarting after critical errors.

    The Groq API key is read from the ``GROQ_API_KEY`` environment variable
    rather than being embedded in the source. A missing key is a
    configuration error that retrying cannot fix, so it raises immediately.

    Each scan logs to its own ``scam_run_<timestamp>.log`` file. A failed scan
    waits 5 minutes before the next one; a successful scan waits 30 seconds.
    A failure outside the scan loop (setup) is reported by SMS when possible
    and the whole setup is retried after 1 minute.

    Raises:
        RuntimeError: If ``GROQ_API_KEY`` is not set.
    """
    alert_number = "8140030507"
    log_format = '%(asctime)s - %(levelname)s - %(message)s'

    groq_api_key = os.environ.get("GROQ_API_KEY")
    if not groq_api_key:
        raise RuntimeError("GROQ_API_KEY environment variable is not set")

    # A loop instead of a recursive main() call: restarts do not grow the stack.
    while True:
        # Bound before the try so the except handler can always read it.
        sms_sender = None
        try:
            logger.info("Starting the continuous scam detection process...")
            install_dependencies()

            try:
                sms_sender = SMSSender()
            except Exception as e:
                logger.error(f"Failed to initialize SMS sender: {str(e)}")
                sms_sender = None

            detector = ScamDetector(groq_api_key=groq_api_key, sms_sender=sms_sender)

            while True:  # Continuous loop
                run_log_handler = None
                try:
                    run_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    run_log_handler = logging.FileHandler(f'scam_run_{run_timestamp}.log')
                    run_log_handler.setFormatter(logging.Formatter(log_format))
                    logger.addHandler(run_log_handler)

                    start_message = f"Starting new scam detection scan at {run_timestamp}"
                    logger.info(start_message)
                    if sms_sender:
                        sms_sender.send_sms(to_number=alert_number, message=start_message)

                    logger.info("Starting image scraping...")
                    image_urls = detector.scrape_images()
                    logger.info(f"\nFound {len(image_urls)} unique images")
                    detector.process_and_save(image_urls)
                    logger.info("\nResults saved!")
                except Exception as e:
                    error_msg = f"Error in detection run: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    wait_seconds = 300  # Wait 5 minutes before retrying
                else:
                    wait_seconds = 30
                finally:
                    # Always detach the per-run handler, including after a
                    # failure; otherwise handlers pile up across runs.
                    if run_log_handler is not None:
                        logger.removeHandler(run_log_handler)
                        run_log_handler.close()

                if wait_seconds == 30:
                    logger.info("Waiting 30 seconds before next scan...")
                time.sleep(wait_seconds)
        except Exception as e:
            critical_error = f"Critical error occurred: {str(e)}"
            logger.error(critical_error, exc_info=True)
            if sms_sender:
                sms_sender.send_sms(
                    to_number=alert_number,
                    message=f"CRITICAL ERROR: {str(e)[:100]}... System will restart in 1 minute.",
                )
            time.sleep(60)  # Wait 1 minute, then the outer loop restarts setup


if __name__ == "__main__":
    main()