import os
import csv
import json
import time
import logging
from io import BytesIO
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Dict

import requests
import pandas as pd
import pytesseract
from PIL import Image
from groq import Groq
from twilio.rest import Client
from selenium import webdriver
from selenium.webdriver.common.by import By

# Configure logging; the logs/ directory must exist before FileHandler opens its file
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/sms_debug.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def setup_directories():
    """Create necessary directories"""
    directories = ['logs', 'data', 'data/images', 'data/texts', 'data/reports']
    for directory in directories:
        os.makedirs(directory, exist_ok=True)
    logger.info("Directory structure created")

class SMSSender:
    def __init__(self):
        """Initialize the Twilio client.

        Credentials are read from environment variables so real keys are
        never committed to source control.
        """
        self.account_sid = os.environ["TWILIO_ACCOUNT_SID"]
        self.auth_token = os.environ["TWILIO_AUTH_TOKEN"]
        self.from_number = os.environ.get("TWILIO_FROM_NUMBER", "")
        self.client = Client(self.account_sid, self.auth_token)
    def send_sms(self, to_number, message):
        """Send SMS using Twilio"""
        try:
            logger.info(f"Attempting to send SMS to: {to_number}")
            # Default to India's country code when none is supplied
            if not to_number.startswith('+'):
                to_number = f"+91{to_number}"
            # Use a distinct name so the `message` parameter isn't shadowed
            sms = self.client.messages.create(
                body=message,
                from_=self.from_number,
                to=to_number
            )
            logger.info(f"SMS sent successfully! Message SID: {sms.sid}")
            return True
        except Exception as e:
            logger.error(f"Failed to send SMS: {str(e)}", exc_info=True)
            return False
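
# Minimal usage sketch (assumes the TWILIO_* environment variables are set;
# the number below is a placeholder, not a real recipient):
#
#     sender = SMSSender()
#     ok = sender.send_sms("9876543210", "Test message")  # sent to +919876543210
#     print("sent" if ok else "failed")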

# Enhanced logging configuration
def setup_logging():
    """Setup enhanced logging with custom formatting"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_dir = Path('logs')
    log_dir.mkdir(exist_ok=True)
    # Create different log files for different purposes
    log_files = {
        'main': log_dir / f'scam_detector_{timestamp}.log',
        'api': log_dir / f'api_calls_{timestamp}.log',
        'image': log_dir / f'image_processing_{timestamp}.log',
        'csv': log_dir / f'csv_operations_{timestamp}.log'
    }
    # Configure logging with custom formatter
    formatter = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)-12s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    # Create handlers for each log file
    handlers = {}
    for name, log_file in log_files.items():
        handler = logging.FileHandler(log_file)
        handler.setFormatter(formatter)
        handlers[name] = handler
    # Configure the root logger. force=True is required because basicConfig
    # was already called at import time and would otherwise silently do
    # nothing. Note that every handler hangs off the root logger, so each
    # file receives all records, not only those for its named purpose.
    logging.basicConfig(
        level=logging.INFO,
        handlers=[*handlers.values(), logging.StreamHandler()],
        force=True
    )
    return handlers
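
# If true per-file separation is wanted, one sketch (an assumption, not the
# original design) is to bind each handler to its named logger instead:
#
#     for name, handler in handlers.items():
#         scoped = logging.getLogger(name)
#         scoped.addHandler(handler)
#         scoped.propagate = False  # keep these records out of the root log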

class ScamDetector:
    def __init__(self, groq_api_key, sms_sender):
        self.groq_client = Groq(api_key=groq_api_key)
        self.sms_sender = sms_sender
        self.base_path = Path(os.getcwd()) / 'data'
        self.request_count = 0
        self.last_csv_update = datetime.now()
        setup_directories()
        self.handlers = setup_logging()
        self.logger = logging.getLogger('ScamDetector')
    def process_text_with_groq(self, text):
        """Ask the LLM to clean up OCR output while preserving the content."""
        try:
            prompt = f"""
            Format the following extracted text from an SMS image.
            Keep the original content intact but improve the formatting and remove any OCR artifacts:

            {text}
            """
            completion = self.groq_client.chat.completions.create(
                model="llama3-8b-8192",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=1024,
                top_p=1,
                stream=False,
                stop=None
            )
            return completion.choices[0].message.content.strip()
        except Exception as e:
            logger.error(f"Error in Groq processing: {str(e)}")
            # Fall back to the raw OCR text if the API call fails
            return text
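
    # Illustrative before/after (actual model output will vary):
    #     raw OCR:  "Dear Custmer, y0ur SBI acc0unt has been bl0cked ..."
    #     cleaned:  "Dear Customer, your SBI account has been blocked ..."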
    def download_and_extract_text(self, url, save_image=True):
        """Download an image, optionally save it, and OCR it with Tesseract."""
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
            # Save image if requested
            if save_image:
                img_filename = f"image_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
                img_path = self.base_path / 'images' / img_filename
                img.save(img_path)
                logger.info(f"Image saved: {img_path}")
            text = pytesseract.image_to_string(img).strip()
            if text:
                return self.process_text_with_groq(text)
        except Exception as e:
            logger.error(f"Error processing image from {url}: {str(e)}")
        # Empty OCR output and errors both fall through to None
        return None
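
    # Usage sketch (pytesseract requires a local Tesseract install; the URL
    # below is a placeholder, not a real endpoint):
    #     text = detector.download_and_extract_text("https://example.com/sms.png")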
    def scrape_images(self):
        """Scrape images from Bing"""
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=chrome_options)
        try:
            search_query = "indian scam sms"
            encoded_query = search_query.replace(' ', '+')
            driver.get(f"https://www.bing.com/images/search?q={encoded_query}")
            logger.info("Loading images...")
            time.sleep(3)
            # Scroll to load more images
            for i in range(5):
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)
                logger.info(f"Scroll {i+1}/5 completed")
            image_urls = []
            # Get both standard thumbnails (.mimg) and high-res metadata tiles (.iusc)
            selectors = [".mimg", ".iusc"]
            for selector in selectors:
                elements = driver.find_elements(By.CSS_SELECTOR, selector)
                for element in elements:
                    try:
                        if selector == ".mimg":
                            url = element.get_attribute('src')
                        else:
                            m = element.get_attribute('m')
                            if not m:
                                continue
                            url = json.loads(m).get('murl')
                        if url and url.startswith('http') and url not in image_urls:
                            image_urls.append(url)
                    except Exception as e:
                        logger.error(f"Error getting URL from {selector}: {str(e)}")
            return image_urls
        finally:
            driver.quit()
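
    # The .iusc tiles carry a JSON blob in their `m` attribute; a trimmed
    # example of its shape (only `murl`, the full-resolution URL, is used;
    # the other fields shown are illustrative):
    #     {"murl": "https://example.com/full.jpg", "turl": "https://.../thumb.jpg", ...}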
    def update_scam123_csv(self, new_data: List[Dict]):
        """Update scam123.csv with new data"""
        csv_logger = logging.getLogger('CSVOperations')
        csv_path = self.base_path / 'scam123.csv'
        try:
            # Read existing data
            existing_data = []
            if csv_path.exists():
                csv_logger.info(f"Reading existing data from {csv_path}")
                with open(csv_path, 'r', encoding='utf-8') as f:
                    reader = csv.DictReader(f)
                    existing_data = list(reader)
            # Add new data
            updated_data = existing_data + new_data
            # Remove duplicates based on text content
            seen = set()
            unique_data = []
            for item in updated_data:
                if item['text'] not in seen:
                    seen.add(item['text'])
                    unique_data.append(item)
            # Write back to CSV
            csv_logger.info(f"Writing {len(unique_data)} entries to {csv_path}")
            with open(csv_path, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=['text', 'prediction', 'timestamp'])
                writer.writeheader()
                writer.writerows(unique_data)
            csv_logger.info(f"Successfully updated {csv_path}")
            return True
        except Exception as e:
            csv_logger.error(f"Error updating CSV: {str(e)}", exc_info=True)
            return False
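
    # Rows in data/scam123.csv are shaped like (values illustrative):
    #     text,prediction,timestamp
    #     "Your KYC will expire today, click ...",scam,2024-01-01 12:00:00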
    def process_and_save(self, image_urls):
        """Process images and save results with enhanced logging"""
        api_logger = logging.getLogger('APIOperations')
        image_logger = logging.getLogger('ImageProcessing')
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        image_texts = []
        batch_data = []
        for i, url in enumerate(image_urls, 1):
            try:
                image_logger.info(f"Processing image {i}/{len(image_urls)}")
                image_logger.debug(f"URL: {url}")
                text = self.download_and_extract_text(url)
                if not text:
                    continue
                # Send to prediction API
                api_logger.info(f"Sending text to prediction API for image {i}")
                try:
                    response = requests.post(
                        "https://varun324242-sssssss.hf.space/predict",
                        json={"message": text}
                    )
                    response.raise_for_status()
                    prediction = response.json().get("predicted_result", "unknown")
                    # Store results
                    result = {
                        'text': text,
                        'prediction': prediction,
                        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    }
                    batch_data.append(result)
                    image_texts.append({
                        'url': url,
                        'text': text,
                        'prediction': prediction
                    })
                    # Update request counter
                    self.request_count += 1
                    # Flush the batch every 20 requests or every 30 minutes
                    time_since_last_update = datetime.now() - self.last_csv_update
                    if self.request_count >= 20 or time_since_last_update > timedelta(minutes=30):
                        api_logger.info("Triggering CSV update...")
                        if self.update_scam123_csv(batch_data):
                            self.request_count = 0
                            self.last_csv_update = datetime.now()
                            batch_data = []
                            api_logger.info("CSV update successful")
                except Exception as e:
                    api_logger.error(f"API error for image {i}: {str(e)}")
                    continue
            except Exception as e:
                image_logger.error(f"Error processing image {i}: {str(e)}")
                continue
        # Save remaining batch data if any
        if batch_data:
            api_logger.info("Processing final batch update to CSV...")
            self.update_scam123_csv(batch_data)
        # Save regular files
        self.save_results(image_urls, image_texts, timestamp)
        return self.generate_report(image_urls, image_texts, timestamp)
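
    # The prediction endpoint's contract, as used above:
    #     POST https://varun324242-sssssss.hf.space/predict
    #     request body:  {"message": "<sms text>"}
    #     response body: {"predicted_result": "scam" | "ham"}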
    def save_results(self, image_urls, image_texts, timestamp):
        """Save results to various file formats"""
        try:
            # Save URLs
            url_path = self.base_path / 'texts' / f'scam_urls_{timestamp}.txt'
            with open(url_path, 'w') as f:
                for url in image_urls:
                    f.write(f"{url}\n")
            # Save detailed text results
            text_path = self.base_path / 'texts' / f'scam_texts_{timestamp}.txt'
            with open(text_path, 'w', encoding='utf-8') as f:
                for item in image_texts:
                    f.write(f"URL: {item['url']}\n")
                    f.write(f"Text: {item['text']}\n")
                    f.write(f"Prediction: {item['prediction']}\n")
                    f.write("-" * 80 + "\n")
            # Save CSV report
            csv_path = self.base_path / 'reports' / f'scam_report_{timestamp}.csv'
            df = pd.DataFrame(image_texts)
            df.to_csv(csv_path, index=False)
            self.logger.info(f"All results saved successfully for timestamp: {timestamp}")
            return True
        except Exception as e:
            self.logger.error(f"Error saving results: {str(e)}")
            return False
    def generate_report(self, image_urls, image_texts, timestamp):
        """Generate the final report, SMS it out, and return the output paths."""
        url_path = self.base_path / 'texts' / f'scam_urls_{timestamp}.txt'
        text_path = self.base_path / 'texts' / f'scam_texts_{timestamp}.txt'
        csv_path = self.base_path / 'reports' / f'scam_report_{timestamp}.csv'
        # Calculate statistics
        stats = {
            'total_urls': len(image_urls),
            'processed': len(image_texts),
            'scam_count': sum(1 for item in image_texts if item['prediction'] == 'scam'),
            'ham_count': sum(1 for item in image_texts if item['prediction'] == 'ham')
        }
        # Send SMS report
        message = (
            f"Scam Detector Run Report\n"
            f"Time: {timestamp}\n"
            f"Total URLs: {stats['total_urls']}\n"
            f"Processed: {stats['processed']}\n"
            f"Scams: {stats['scam_count']}\n"
            f"Ham: {stats['ham_count']}\n"
            f"Files saved locally"
        )
        # The recipient comes from the environment rather than a hard-coded
        # personal number
        self.sms_sender.send_sms(
            to_number=os.environ.get("REPORT_PHONE_NUMBER", ""),
            message=message
        )
        return url_path, text_path, csv_path

def main():
    try:
        logger.info("Starting the scam detection process...")
        # Create timestamp for this run
        run_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Setup run-specific logging
        log_path = os.path.join('logs', f'scam_run_{run_timestamp}.log')
        run_log_handler = logging.FileHandler(log_path)
        run_log_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logger.addHandler(run_log_handler)
        logger.info(f"Starting new detection run at {run_timestamp}")
        # Read the API key from the environment instead of committing it
        groq_api_key = os.environ["GROQ_API_KEY"]
        sms_sender = SMSSender()
        detector = ScamDetector(groq_api_key=groq_api_key, sms_sender=sms_sender)
        logger.info("Starting image scraping...")
        image_urls = detector.scrape_images()
        logger.info(f"Found {len(image_urls)} unique images")
        url_path, text_path, csv_path = detector.process_and_save(image_urls)
        logger.info(f"Results saved to {url_path}, {text_path}, {csv_path}; SMS sent!")
        logger.info("Detection run completed")
        print("Detection run completed successfully.")
    except Exception as e:
        # exc_info=True already records the full traceback in the log handlers
        logger.error(f"An error occurred: {str(e)}", exc_info=True)


if __name__ == "__main__":
    main()