# background_tasks.py
import threading
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import hashlib
import sqlite3
import csv
import os
import logging
import traceback

DB_PATH = '/home/user/app/scraped_data/culver/culvers_changes.db'


def create_database():
    try:
        # Ensure the parent directory exists before SQLite tries to create the file.
        os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
        conn = sqlite3.connect(DB_PATH)
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS changes
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      date TEXT,
                      time TEXT,
                      url TEXT,
                      change TEXT)''')
        conn.commit()
        conn.close()
        logging.info("Database created or already exists")
    except Exception as e:
        logging.error(f"Error creating database: {e}")
        traceback.print_exc()

def insert_change(date, time, url, change):
    try:
        conn = sqlite3.connect(DB_PATH)
        c = conn.cursor()
        c.execute("INSERT INTO changes (date, time, url, change) VALUES (?, ?, ?, ?)",
                  (date, time, url, change))
        conn.commit()
        conn.close()
        logging.info(f"Change inserted: {date} {time} {url}")
    except Exception as e:
        logging.error(f"Error inserting change: {e}")
        traceback.print_exc()
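
# A minimal sketch of how the `changes` table could be read back, e.g. for a
# status report. `get_recent_changes` is a hypothetical helper, not part of the
# original module; it assumes the schema created by create_database() above.
def get_recent_changes(limit=20):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute("SELECT date, time, url, change FROM changes ORDER BY id DESC LIMIT ?",
              (limit,))
    rows = c.fetchall()
    conn.close()
    return rows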

def continuous_monitoring(storage_location, urls, scrape_interval, content_type):
    create_database()
    # Guard against a bare filename, where os.path.dirname() returns ''.
    storage_dir = os.path.dirname(storage_location)
    if storage_dir:
        os.makedirs(storage_dir, exist_ok=True)
    previous_hashes = {url: "" for url in urls}
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    service = Service('/usr/bin/chromedriver')
    logging.info(f"Starting continuous monitoring for URLs: {urls}")
    try:
        with webdriver.Chrome(service=service, options=options) as driver:
            while True:
                for url in urls:
                    try:
                        logging.info(f"Accessing URL: {url}")
                        driver.get(url)
                        time.sleep(2)  # Wait for the page to load
                        if content_type == "media":
                            # Hash the image sources rather than the WebElement objects
                            # themselves, whose reprs change on every page load and would
                            # make the hash differ even when the content is identical.
                            srcs = [img.get_attribute("src")
                                    for img in driver.find_elements(By.TAG_NAME, "img")]
                            current_content = "".join(s for s in srcs if s)
                        else:  # "text" and any other value fall back to the full page source
                            current_content = driver.page_source
                        current_hash = hashlib.md5(current_content.encode('utf-8')).hexdigest()
                        if current_hash != previous_hashes[url]:
                            previous_hashes[url] = current_hash
                            date_time_str = time.strftime("%Y-%m-%d %H:%M:%S")
                            date, time_str = date_time_str.split()
                            change = "Content changed"
                            # Write a header row the first time the CSV file is created.
                            write_header = not os.path.exists(storage_location)
                            with open(storage_location, "a", newline='') as csvfile:
                                csv_toolkit = csv.DictWriter(csvfile, fieldnames=["date", "time", "url", "change"])
                                if write_header:
                                    csv_toolkit.writeheader()
                                csv_toolkit.writerow({"date": date, "time": time_str, "url": url, "change": change})
                            insert_change(date, time_str, url, change)
                            logging.info(f"Change detected at {url} on {date_time_str}")
                        else:
                            logging.info(f"No change detected at {url}")
                    except Exception as e:
                        logging.error(f"Error accessing {url}: {e}")
                        traceback.print_exc()
                logging.info(f"Sleeping for {scrape_interval} minutes")
                time.sleep(scrape_interval * 60)  # Check every scrape_interval minutes
    except Exception as e:
        logging.error(f"Error in continuous monitoring: {e}")
        traceback.print_exc()

def start_background_monitoring(storage_location, urls, scrape_interval, content_type):
    thread = threading.Thread(target=continuous_monitoring,
                              args=(storage_location, urls, scrape_interval, content_type),
                              daemon=True)
    thread.start()
    logging.info("Background monitoring started")
    # Return the handle so callers can join or inspect the monitoring thread.
    return thread