# J.A.R.V.I.S / scam_detector_local.py
# Uploaded by varun324242 using huggingface_hub (revision fe2a0f2, verified).
import subprocess
import sys
import os
import requests
from PIL import Image
import pytesseract
from io import BytesIO
import pandas as pd
import json
from groq import Groq
from twilio.rest import Client
import logging
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
from pathlib import Path
import csv
from datetime import datetime, timedelta
import time
from typing import List, Dict
# Configure logging
# logging.FileHandler opens its target file as soon as it is constructed, so
# the 'logs' directory has to exist before basicConfig is called; the
# directory helper defined further down only runs later.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/sms_debug.log'),
        logging.StreamHandler()
    ]
)
# Module-wide logger shared by the functions and classes below.
logger = logging.getLogger(__name__)
def setup_directories():
    """Make sure the log and data folders used by this script exist.

    All paths are relative to the current working directory. Folders that
    are already present are left untouched.
    """
    required_folders = (
        'logs',
        'data',
        'data/images',
        'data/texts',
        'data/reports',
    )
    for folder in required_folders:
        os.makedirs(folder, exist_ok=True)
    logger.info("Directory structure created")
class SMSSender:
    """Send SMS messages through the Twilio REST client.

    Each credential is resolved in this order: explicit constructor
    argument, environment variable (``TWILIO_ACCOUNT_SID``,
    ``TWILIO_AUTH_TOKEN``, ``TWILIO_FROM_NUMBER``), legacy built-in value.
    """

    # SECURITY: the values below were committed to source control and must be
    # treated as compromised. They are kept only so that ``SMSSender()`` keeps
    # working for existing callers; rotate them and supply the replacements
    # through the environment or the constructor.
    _LEGACY_ACCOUNT_SID = "AC68e68b700bfe8ede9080e426042e6ccf"
    _LEGACY_AUTH_TOKEN = "27814cd39d313e35713c81e7b36da11f"
    _LEGACY_FROM_NUMBER = "+17322534518"

    # Prefix added to destination numbers that do not start with '+'.
    DEFAULT_COUNTRY_CODE = "+91"

    def __init__(self, account_sid=None, auth_token=None, from_number=None):
        """Initialize Twilio client with credentials

        Args:
            account_sid: Twilio account SID; optional override.
            auth_token: Twilio auth token; optional override.
            from_number: Sender phone number; optional override.
        """
        self.account_sid = self._resolve_setting(
            account_sid, "TWILIO_ACCOUNT_SID", self._LEGACY_ACCOUNT_SID
        )
        self.auth_token = self._resolve_setting(
            auth_token, "TWILIO_AUTH_TOKEN", self._LEGACY_AUTH_TOKEN
        )
        self.from_number = self._resolve_setting(
            from_number, "TWILIO_FROM_NUMBER", self._LEGACY_FROM_NUMBER
        )
        self.client = Client(self.account_sid, self.auth_token)

    @staticmethod
    def _resolve_setting(explicit, env_var, fallback):
        """Return the first non-empty of: explicit value, env var, fallback."""
        return explicit or os.environ.get(env_var) or fallback

    def send_sms(self, to_number, message):
        """Send SMS using Twilio

        Args:
            to_number: Destination number as a string. Numbers that do not
                start with '+' get ``DEFAULT_COUNTRY_CODE`` prepended.
            message: Text body to send.

        Returns:
            True when the Twilio call completed without raising, False
            otherwise (the failure is logged with its traceback).
        """
        try:
            logger.info(f"Attempting to send SMS to: {to_number}")
            if not to_number.startswith('+'):
                to_number = f"{self.DEFAULT_COUNTRY_CODE}{to_number}"
            # Keep the API response under its own name instead of rebinding
            # the ``message`` parameter.
            sent = self.client.messages.create(
                body=message,
                from_=self.from_number,
                to=to_number
            )
            logger.info(f"SMS sent successfully! Message SID: {sent.sid}")
            return True
        except Exception as e:
            logger.error(f"Failed to send SMS: {str(e)}", exc_info=True)
            return False
# Enhanced logging configuration
def setup_logging():
    """Setup enhanced logging with custom formatting

    Creates one timestamped log file per channel under ``logs/`` and attaches
    it to the named logger this module uses for that channel:

    * ``main``  -> ``ScamDetector``
    * ``api``   -> ``APIOperations``
    * ``image`` -> ``ImageProcessing``
    * ``csv``   -> ``CSVOperations``

    The handlers are attached directly to those loggers because
    ``logging.basicConfig`` does nothing once the root logger already has
    handlers, which is the case after this module's import-time logging
    configuration. Calling this function again replaces the handlers it
    installed earlier instead of stacking duplicates.

    Returns:
        dict mapping channel name to the ``logging.FileHandler`` created
        for it.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_dir = Path('logs')
    log_dir.mkdir(exist_ok=True)

    # channel name -> (log file prefix, logger that feeds the file)
    channels = {
        'main': ('scam_detector', 'ScamDetector'),
        'api': ('api_calls', 'APIOperations'),
        'image': ('image_processing', 'ImageProcessing'),
        'csv': ('csv_operations', 'CSVOperations'),
    }

    # Configure logging with custom formatter
    formatter = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)-12s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    handlers = {}
    for name, (prefix, logger_name) in channels.items():
        handler = logging.FileHandler(
            log_dir / f'{prefix}_{timestamp}.log', encoding='utf-8'
        )
        handler.setFormatter(formatter)
        handler.setLevel(logging.INFO)
        # Tag the handler so a later call can recognise and replace it.
        handler._scam_detector_channel = name

        target = logging.getLogger(logger_name)
        stale_handlers = [
            h for h in target.handlers
            if getattr(h, '_scam_detector_channel', None) == name
        ]
        for stale in stale_handlers:
            target.removeHandler(stale)
            stale.close()
        target.addHandler(handler)
        handlers[name] = handler

    # Only takes effect when the root logger has no handlers yet; in that
    # case it provides console output for every logger.
    logging.basicConfig(
        level=logging.INFO,
        handlers=[logging.StreamHandler()]
    )
    return handlers
class ScamDetector:
    """Collect SMS screenshots, OCR them, classify the text and report.

    The pipeline is: ``scrape_images`` (Bing image search through headless
    Chrome) -> ``download_and_extract_text`` (Tesseract OCR, cleaned up by a
    Groq chat completion) -> ``process_and_save`` (remote prediction API,
    CSV accumulation, result files, SMS summary).
    """

    # Endpoint that classifies a message; called with {"message": <text>}.
    PREDICT_URL = "https://varun324242-sssssss.hf.space/predict"
    # Seconds to wait for the prediction API before giving up on one image.
    PREDICT_TIMEOUT = 30
    # Seconds to wait when downloading one image.
    DOWNLOAD_TIMEOUT = 10
    # Column order of data/scam123.csv.
    CSV_FIELDS = ['text', 'prediction', 'timestamp']
    # Flush the pending batch to scam123.csv after this many predictions ...
    CSV_FLUSH_EVERY = 20
    # ... or when this much time has passed since the last flush.
    CSV_FLUSH_INTERVAL = timedelta(minutes=30)
    # Recipient of the SMS run report unless overridden in the constructor.
    DEFAULT_REPORT_NUMBER = "8140030507"

    def __init__(self, groq_api_key, sms_sender, report_number=None):
        """Create the Groq client, the data folders and the loggers.

        Args:
            groq_api_key: API key passed to the Groq client.
            sms_sender: Object exposing ``send_sms(to_number, message)``.
            report_number: Phone number that receives the run report;
                defaults to ``DEFAULT_REPORT_NUMBER``.
        """
        self.groq_client = Groq(api_key=groq_api_key)
        self.sms_sender = sms_sender
        self.report_number = report_number or self.DEFAULT_REPORT_NUMBER
        self.base_path = Path(os.getcwd()) / 'data'
        self.request_count = 0
        self.last_csv_update = datetime.now()
        setup_directories()
        self.handlers = setup_logging()
        self.logger = logging.getLogger('ScamDetector')

    def process_text_with_groq(self, text):
        """Ask the Groq chat model to tidy up OCR output.

        Args:
            text: Raw text extracted from an image.

        Returns:
            The model's reply with surrounding whitespace removed, or the
            unchanged ``text`` if the request fails for any reason.
        """
        try:
            prompt = (
                "\n"
                "Format the following extracted text from an SMS image.\n"
                "Keep the original content intact but improve the formatting and remove any OCR artifacts:\n"
                f"{text}\n"
            )
            completion = self.groq_client.chat.completions.create(
                model="llama3-8b-8192",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=1024,
                top_p=1,
                stream=False,
                stop=None
            )
            return completion.choices[0].message.content.strip()
        except Exception as e:
            logger.error(f"Error in Groq processing: {str(e)}")
            return text

    def download_and_extract_text(self, url, save_image=True):
        """Download an image, optionally keep a copy, and OCR it.

        Args:
            url: Address of the image.
            save_image: When true, the image is stored as a PNG under
                ``<base_path>/images``.

        Returns:
            The cleaned-up text, or None when the download/OCR fails or the
            image contains no text.
        """
        try:
            response = requests.get(url, timeout=self.DOWNLOAD_TIMEOUT)
            # Fail fast on HTTP errors instead of handing an error page
            # to the image decoder.
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
            # Save image if requested
            if save_image:
                # Microsecond resolution: several images are normally
                # handled within the same second and must not overwrite
                # one another.
                stamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
                img_filename = f"image_{stamp}.png"
                img_path = os.path.join(self.base_path, 'images', img_filename)
                img.save(img_path)
                logger.info(f"Image saved: {img_path}")
            text = pytesseract.image_to_string(img)
            text = text.strip()
            if text:
                return self.process_text_with_groq(text)
        except Exception as e:
            logger.error(f"Error processing image from {url}: {str(e)}")
        return None

    def scrape_images(self, search_query="indian scam sms", scroll_count=5):
        """Scrape images from Bing

        Args:
            search_query: Search phrase sent to Bing image search.
            scroll_count: How many times the page is scrolled to the bottom
                to trigger loading of further results.

        Returns:
            List of unique http(s) image URLs in the order they were found
            (thumbnail ``src`` values first, then full-size ``murl`` values).
        """
        from urllib.parse import quote_plus

        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=chrome_options)
        try:
            encoded_query = quote_plus(search_query)
            driver.get(f"https://www.bing.com/images/search?q={encoded_query}")
            logger.info("Loading images...")
            time.sleep(3)
            # Scroll to load more images
            for i in range(scroll_count):
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)
                logger.info(f"Scroll {i+1}/{scroll_count} completed")
            image_urls = []
            seen_urls = set()  # constant-time duplicate check
            # Get both standard and high-res images
            for selector in (".mimg", ".iusc"):
                for element in driver.find_elements(By.CSS_SELECTOR, selector):
                    try:
                        if selector == ".mimg":
                            url = element.get_attribute('src')
                        else:
                            # The 'm' attribute carries a JSON blob whose
                            # 'murl' entry is the image address.
                            m = element.get_attribute('m')
                            if not m:
                                continue
                            url = json.loads(m).get('murl')
                        if url and url.startswith('http') and url not in seen_urls:
                            seen_urls.add(url)
                            image_urls.append(url)
                    except Exception as e:
                        logger.error(f"Error getting URL from {selector}: {str(e)}")
            return image_urls
        finally:
            driver.quit()

    def update_scam123_csv(self, new_data: List[Dict]):
        """Update scam123.csv with new data

        Existing rows are kept, ``new_data`` is appended, and rows whose
        ``text`` was already seen are dropped (first occurrence wins). Rows
        without a ``text`` key are skipped and columns other than
        ``CSV_FIELDS`` are ignored, so one malformed row cannot abort the
        update.

        Args:
            new_data: Dicts with 'text', 'prediction' and 'timestamp' keys.

        Returns:
            True on success, False if reading or writing failed (logged).
        """
        csv_logger = logging.getLogger('CSVOperations')
        csv_path = self.base_path / 'scam123.csv'
        try:
            # Read existing data
            existing_data = []
            if csv_path.exists():
                csv_logger.info(f"Reading existing data from {csv_path}")
                # newline='' lets the csv module handle line breaks embedded
                # in quoted message text.
                with open(csv_path, 'r', encoding='utf-8', newline='') as f:
                    existing_data = list(csv.DictReader(f))

            # Add new data and remove duplicates based on text content
            seen = set()
            unique_data = []
            for item in existing_data + list(new_data):
                text = item.get('text')
                if text is None or text in seen:
                    continue
                seen.add(text)
                unique_data.append(item)

            # Write back to CSV via a temporary file so that a crash in the
            # middle of the write cannot truncate the accumulated data.
            csv_logger.info(f"Writing {len(unique_data)} entries to {csv_path}")
            csv_path.parent.mkdir(parents=True, exist_ok=True)
            tmp_path = csv_path.with_name(csv_path.name + '.tmp')
            with open(tmp_path, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(
                    f, fieldnames=self.CSV_FIELDS, extrasaction='ignore'
                )
                writer.writeheader()
                writer.writerows(unique_data)
            os.replace(tmp_path, csv_path)
            csv_logger.info(f"Successfully updated {csv_path}")
            return True
        except Exception as e:
            csv_logger.error(f"Error updating CSV: {str(e)}", exc_info=True)
            return False

    def _predict(self, text):
        """Return the prediction API's label for ``text`` ("unknown" if absent)."""
        response = requests.post(
            self.PREDICT_URL,
            json={"message": text},
            timeout=self.PREDICT_TIMEOUT
        )
        response.raise_for_status()
        return response.json().get("predicted_result", "unknown")

    def process_and_save(self, image_urls):
        """Process images and save results with enhanced logging

        For every URL: OCR the image, classify the text, and queue the
        result for scam123.csv. The queue is flushed every
        ``CSV_FLUSH_EVERY`` predictions or ``CSV_FLUSH_INTERVAL``, and once
        more at the end. Failures on a single image are logged and skipped.

        Args:
            image_urls: Iterable of image URLs (a sized sequence).

        Returns:
            Tuple ``(url_path, text_path, csv_path)`` from
            ``generate_report``.
        """
        api_logger = logging.getLogger('APIOperations')
        image_logger = logging.getLogger('ImageProcessing')
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        image_texts = []
        batch_data = []
        total = len(image_urls)
        for i, url in enumerate(image_urls, 1):
            try:
                image_logger.info(f"Processing image {i}/{total}")
                image_logger.debug(f"URL: {url}")
                text = self.download_and_extract_text(url)
            except Exception as e:
                image_logger.error(f"Error processing image {i}: {str(e)}")
                continue
            if not text:
                continue

            # Send to prediction API
            api_logger.info(f"Sending text to prediction API for image {i}")
            try:
                prediction = self._predict(text)
                # Store results
                batch_data.append({
                    'text': text,
                    'prediction': prediction,
                    'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })
                image_texts.append({
                    'url': url,
                    'text': text,
                    'prediction': prediction
                })
                # Update request counter
                self.request_count += 1
                # Check if we should update CSV
                time_since_last_update = datetime.now() - self.last_csv_update
                flush_due = (
                    self.request_count >= self.CSV_FLUSH_EVERY
                    or time_since_last_update > self.CSV_FLUSH_INTERVAL
                )
                if flush_due:
                    api_logger.info("Triggering CSV update...")
                    # On failure the batch is kept and retried at the next
                    # trigger or in the final flush below.
                    if self.update_scam123_csv(batch_data):
                        self.request_count = 0
                        self.last_csv_update = datetime.now()
                        batch_data = []
                        api_logger.info("CSV update successful")
            except Exception as e:
                api_logger.error(f"API error for image {i}: {str(e)}")

        # Save remaining batch data if any
        if batch_data:
            api_logger.info("Processing final batch update to CSV...")
            self.update_scam123_csv(batch_data)
        # Save regular files
        self.save_results(image_urls, image_texts, timestamp)
        return self.generate_report(image_urls, image_texts, timestamp)

    def save_results(self, image_urls, image_texts, timestamp):
        """Save results to various file formats

        Writes ``texts/scam_urls_<timestamp>.txt``,
        ``texts/scam_texts_<timestamp>.txt`` and
        ``reports/scam_report_<timestamp>.csv`` below ``base_path``,
        creating the folders when they are missing.

        Args:
            image_urls: All URLs that were considered.
            image_texts: Dicts with 'url', 'text' and 'prediction' keys.
            timestamp: String embedded in the output file names.

        Returns:
            True on success, False if any file could not be written.
        """
        try:
            texts_dir = self.base_path / 'texts'
            reports_dir = self.base_path / 'reports'
            texts_dir.mkdir(parents=True, exist_ok=True)
            reports_dir.mkdir(parents=True, exist_ok=True)

            # Save URLs (explicit encoding, matching the text file below)
            url_path = texts_dir / f'scam_urls_{timestamp}.txt'
            with open(url_path, 'w', encoding='utf-8') as f:
                f.writelines(f"{url}\n" for url in image_urls)

            # Save detailed text results
            text_path = texts_dir / f'scam_texts_{timestamp}.txt'
            with open(text_path, 'w', encoding='utf-8') as f:
                for item in image_texts:
                    f.write(f"URL: {item['url']}\n")
                    f.write(f"Text: {item['text']}\n")
                    f.write(f"Prediction: {item['prediction']}\n")
                    f.write("-" * 80 + "\n")

            # Save CSV report
            csv_path = reports_dir / f'scam_report_{timestamp}.csv'
            df = pd.DataFrame(image_texts)
            df.to_csv(csv_path, index=False)
            self.logger.info(f"All results saved successfully for timestamp: {timestamp}")
            return True
        except Exception as e:
            self.logger.error(f"Error saving results: {str(e)}")
            return False

    def generate_report(self, image_urls, image_texts, timestamp):
        """Generate final report and return paths

        Sends an SMS summary (totals plus the number of 'scam' and 'ham'
        predictions) to ``self.report_number``.

        Args:
            image_urls: All URLs that were considered.
            image_texts: Dicts with at least a 'prediction' key.
            timestamp: String embedded in the result file names.

        Returns:
            Tuple ``(url_path, text_path, csv_path)`` with the locations
            ``save_results`` uses for the same timestamp.
        """
        url_path = self.base_path / 'texts' / f'scam_urls_{timestamp}.txt'
        text_path = self.base_path / 'texts' / f'scam_texts_{timestamp}.txt'
        csv_path = self.base_path / 'reports' / f'scam_report_{timestamp}.csv'
        # Calculate statistics
        stats = {
            'total_urls': len(image_urls),
            'processed': len(image_texts),
            'scam_count': sum(1 for item in image_texts if item['prediction'] == 'scam'),
            'ham_count': sum(1 for item in image_texts if item['prediction'] == 'ham')
        }
        # Send SMS report
        message = (
            f"Scam Detector Run Report\n"
            f"Time: {timestamp}\n"
            f"Total URLs: {stats['total_urls']}\n"
            f"Processed: {stats['processed']}\n"
            f"Scams: {stats['scam_count']}\n"
            f"Ham: {stats['ham_count']}\n"
            f"Files saved locally"
        )
        self.sms_sender.send_sms(
            to_number=self.report_number,
            message=message
        )
        return url_path, text_path, csv_path
def main():
    """Run one end-to-end detection pass.

    Adds a run-specific log file, builds the SMS sender and the detector,
    scrapes image URLs, processes them and logs the outcome. Any exception
    is logged with its traceback and printed; it is not re-raised. The
    run-specific log handler is detached and closed when the run ends.
    """
    run_log_handler = None
    try:
        logger.info("Starting the scam detection process...")
        # Create timestamp for this run
        run_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Setup run-specific logging; the handler opens its file right
        # away, so make sure the folder is there first.
        os.makedirs('logs', exist_ok=True)
        log_path = os.path.join('logs', f'scam_run_{run_timestamp}.log')
        run_log_handler = logging.FileHandler(log_path)
        run_log_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logger.addHandler(run_log_handler)
        logger.info(f"Starting new detection run at {run_timestamp}")
        # SECURITY: the literal below was committed to source control and
        # must be treated as compromised. Rotate it and provide the new key
        # through the GROQ_API_KEY environment variable; the literal is kept
        # only as a fallback so existing setups keep running.
        GROQ_API_KEY = (
            os.environ.get("GROQ_API_KEY")
            or "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"
        )
        sms_sender = SMSSender()
        detector = ScamDetector(groq_api_key=GROQ_API_KEY, sms_sender=sms_sender)
        logger.info("Starting image scraping...")
        image_urls = detector.scrape_images()
        logger.info(f"Found {len(image_urls)} unique images")
        url_path, text_path, csv_path = detector.process_and_save(image_urls)
        logger.info("Results saved locally and SMS sent!")
        logger.info("Detection run completed")
        print("Detection run completed successfully.")
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}", exc_info=True)
        import traceback
        traceback.print_exc()
    finally:
        # Detach and close the per-run file so repeated calls in one
        # process neither leak file handles nor write into old run logs.
        if run_log_handler is not None:
            logger.removeHandler(run_log_handler)
            run_log_handler.close()


if __name__ == "__main__":
    main()