# Hugging Face Spaces app: smart web scraper with change detection.
# (Captured while the Space showed "Runtime error" — see datetime fix in process_urls.)
# Standard library
import io
import json
import logging
import os
import re
import tempfile
import zipfile
from datetime import datetime  # Keep this line
from urllib.parse import urljoin, urlparse

# Third-party
import gradio as gr
import nltk
import requests
from bs4 import BeautifulSoup
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
# Configure logging FIRST: the except-branch below calls logging.error, and
# logging.error() auto-configures the root logger with default settings if it
# runs before basicConfig — which would silently turn this explicit
# configuration into a no-op whenever an NLTK download fails.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Best-effort download of the NLTK corpora/models; failures are logged, not fatal.
try:
    nltk.download('punkt')
    nltk.download('stopwords')
    nltk.download('wordnet')
    nltk.download('averaged_perceptron_tagger')
except Exception as e:
    logging.error(f"Error downloading NLTK data: {str(e)}")
def sanitize_filename(filename):
    """Collapse runs of filesystem-unsafe characters (and newlines) into one underscore."""
    unsafe = r'[<>:"/\\|?*\n]+'
    return re.sub(unsafe, '_', filename)
def validate_url(url):
    """Return True if *url* parses with both a scheme and a network location.

    Fixes: the original used a bare ``except:`` (which would even swallow
    KeyboardInterrupt); catch only what ``urlparse`` can actually raise —
    ValueError for malformed IPv6 literals, AttributeError/TypeError for
    non-string input.
    """
    try:
        parsed = urlparse(url)
    except (ValueError, AttributeError, TypeError):
        return False
    return bool(parsed.scheme and parsed.netloc)
def get_latest_data(url):
    """Fetch the current HTML for *url*; return None on any failure.

    Fixes: the original returned the body of HTTP error responses (404/500
    pages), which would then be stored and diffed as if it were real content,
    producing false "HTML content has changed" alerts. raise_for_status()
    routes those through the error path instead.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.text
    except Exception as e:
        logging.error(f"Error fetching latest data from {url}: {str(e)}")
        return None
def compare_html(old_html, new_html):
    """Return True only when both snapshots are present and differ after stripping."""
    if old_html and new_html:
        return old_html.strip() != new_html.strip()
    # Missing either side (None/empty) counts as "no change detected".
    return False
def compare_screenshot(old_screenshot, new_screenshot):
    """Return True when both PNG byte strings decode and their images differ.

    Any decode/comparison failure is logged and treated as "no change".
    """
    try:
        if not old_screenshot or not new_screenshot:
            return False
        previous = Image.open(io.BytesIO(old_screenshot))
        current = Image.open(io.BytesIO(new_screenshot))
        return not (previous == current)
    except Exception as e:
        logging.error(f"Error comparing screenshots: {str(e)}")
        return False
def alert_changes(url, change_type):
    """Log a detected change at WARNING level and return a timestamped summary line."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logging.warning(f"[{stamp}] Changes detected at {url}: {change_type}")
    return f"[{stamp}] {change_type}"
def extract_links_from_page(url):
    """Return every href attribute found on the page at *url*; [] on failure."""
    try:
        markup = requests.get(url, timeout=10).text
        anchors = BeautifulSoup(markup, 'html.parser').find_all('a', href=True)
        return [anchor['href'] for anchor in anchors]
    except Exception as e:
        logging.error(f"Error extracting links from {url}: {str(e)}")
        return []
def take_screenshot(url):
    """Capture *url* in headless Chrome and return a downscaled PNG as bytes.

    The image is thumbnailed to fit 1024x1024 (aspect preserved).
    Returns None on any failure.

    Fixes: driver.quit() was only reached on success, so an exception in
    driver.get()/get_screenshot_as_png() leaked a headless Chrome process.
    It now runs in ``finally``.
    """
    driver = None
    try:
        chrome_options = Options()
        for flag in ("--headless", "--no-sandbox",
                     "--disable-dev-shm-usage", "--window-size=1920,1080"):
            chrome_options.add_argument(flag)
        driver = webdriver.Chrome(options=chrome_options)
        driver.get(url)
        screenshot = driver.get_screenshot_as_png()

        # Downscale to bound ZIP/display size.
        image = Image.open(io.BytesIO(screenshot))
        image.thumbnail((1024, 1024), Image.LANCZOS)
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        return buffer.getvalue()
    except Exception as e:
        logging.error(f"Screenshot error for {url}: {str(e)}")
        return None
    finally:
        if driver is not None:
            driver.quit()
def is_webpage(url):
    """Return True when the URL serves HTML (Content-Type contains 'text/html').

    Fixes: requests.head() does NOT follow redirects by default, so any URL
    answering with 301/302 reported the redirect's headers (often without a
    useful Content-Type) and was wrongly skipped. allow_redirects=True checks
    the final destination instead.
    """
    try:
        response = requests.head(url, timeout=10, allow_redirects=True)
        content_type = response.headers.get('Content-Type', '').lower()
        return 'text/html' in content_type
    except Exception as e:
        logging.error(f"Error checking content type for {url}: {str(e)}")
        return False
def crawl_url(url, depth, max_depth, visited=None):
    """Recursively screenshot *url* and the pages it links to, up to *max_depth*.

    Args:
        url: Page to capture.
        depth: Current recursion level (callers start at 1).
        max_depth: Inclusive depth limit.
        visited: Set of already-crawled URLs (shared across recursion).

    Returns:
        List of (url, png_bytes) tuples.

    Fixes: relative links were "absolutized" with f"https://{link}", turning
    "/about" into the invalid "https:///about" and "page.html" into
    "https://page.html". urljoin() resolves them against the current page.
    """
    if visited is None:
        visited = set()
    if depth > max_depth or url in visited:
        return []
    visited.add(url)

    screenshots = []
    if is_webpage(url):
        links = extract_links_from_page(url)
        shot = take_screenshot(url)
        if shot:
            screenshots.append((url, shot))
        if depth < max_depth:
            for link in links:
                absolute = urljoin(url, link)
                # Skip non-HTTP schemes (mailto:, javascript:, ...).
                if absolute.startswith(('http://', 'https://')):
                    screenshots.extend(crawl_url(absolute, depth + 1, max_depth, visited))
    else:
        logging.info(f"Skipping non-webpage content: {url}")
    return screenshots
def process_urls(url_input, bulk_toggle, action_radio, max_urls, crawl_depth, mode='standard', progress=gr.Progress()):
    """Scrape/screenshot URLs and detect changes against previous runs.

    Args:
        url_input: One URL, or comma/newline separated URLs when bulk_toggle.
        bulk_toggle: Treat url_input as a list of URLs.
        action_radio: 'Scrape data', 'Capture image', or 'Both'.
        max_urls: Cap on how many URLs are processed.
        crawl_depth: Recursion depth passed to crawl_url for screenshots.
        mode: 'chat' returns the plain-text changes log; anything else returns
            (zip_file_path, display_json) for the Gradio UI.
        progress: Gradio progress tracker (injected by Gradio at call time).

    Returns:
        str in chat mode; otherwise a (zip_path_or_None, json_summary) tuple.
    """
    # Split and trim the input into candidate URLs.
    if bulk_toggle:
        urls = [u.strip() for u in re.split(r'[,\n]+', url_input.strip()) if u.strip()]
    else:
        urls = [url_input.strip()]
    urls = urls[:int(max_urls)]

    # FIX: normalize the scheme BEFORE validating. Previously bare hostnames
    # ("example.com") were rejected as invalid even though the processing loop
    # would have prefixed them with https:// anyway.
    urls = [u if u.startswith(('http://', 'https://')) else f'https://{u}' for u in urls]

    invalid_urls = [u for u in urls if not validate_url(u)]
    if invalid_urls:
        message = f"Invalid URLs detected: {', '.join(invalid_urls)}"
        if mode == 'chat':
            return message
        return None, json.dumps({"error": message}, indent=2)

    scraped_data = []
    screenshots = []
    changes_log = []

    total_urls = len(urls)
    progress(0)

    # Snapshots from previous runs live here; they drive change detection.
    data_dir = 'scraped_data'
    os.makedirs(data_dir, exist_ok=True)

    for idx, url in enumerate(urls):
        sanitized_url = sanitize_filename(url)
        old_html_path = os.path.join(data_dir, f"{sanitized_url}_html.txt")
        old_screenshot_path = os.path.join(data_dir, f"{sanitized_url}_screenshot.png")

        # Fetch the latest snapshot of the page.
        latest_html = get_latest_data(url)
        latest_screenshot = take_screenshot(url)

        # Compare against stored snapshots, when they exist.
        if os.path.exists(old_html_path):
            with open(old_html_path, 'r', encoding='utf-8') as f:
                old_html = f.read()
            if compare_html(old_html, latest_html):
                changes_log.append(alert_changes(url, "HTML content has changed"))

        if os.path.exists(old_screenshot_path):
            with open(old_screenshot_path, 'rb') as f:
                old_screenshot = f.read()
            if latest_screenshot and compare_screenshot(old_screenshot, latest_screenshot):
                changes_log.append(alert_changes(url, "Visual content has changed"))

        # Persist the latest snapshot for the next run.
        if latest_html:
            with open(old_html_path, 'w', encoding='utf-8') as f:
                f.write(latest_html)
        if latest_screenshot:
            with open(old_screenshot_path, 'wb') as f:
                f.write(latest_screenshot)

        if action_radio in ['Scrape data', 'Both']:
            scraped_data.append({
                'url': url,
                'content': latest_html,  # full HTML content
                'timestamp': datetime.now().isoformat(),
                'changes_detected': changes_log
            })

        if action_radio in ['Capture image', 'Both']:
            crawled_screenshots = crawl_url(url, depth=1, max_depth=int(crawl_depth))
            screenshots.extend(crawled_screenshots)

        progress((idx + 1) / total_urls)

    if mode == 'chat':
        return "\n".join(changes_log)

    # Bundle screenshots and scraped data into a downloadable ZIP.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
        with zipfile.ZipFile(tmp_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for screenshot_url, screenshot_data in screenshots:
                zipf.writestr(f"{sanitize_filename(screenshot_url)}.png", screenshot_data)
            if scraped_data:
                data_to_save = {
                    'scraped_data': scraped_data,
                    'changes_log': changes_log,
                    # BUG FIX: was datetime.datetime.now(), which raises
                    # AttributeError under `from datetime import datetime`.
                    'timestamp': datetime.now().isoformat()
                }
                zipf.writestr('data.json', json.dumps(data_to_save, indent=2))
        zip_file_path = tmp_file.name

    display_data = {
        'total_scraped_urls': len(scraped_data),
        'total_screenshots_taken': len(screenshots),
        'changes_detected': changes_log,
        'scraped_data': scraped_data  # full scraped data for the JSON panel
    }
    return zip_file_path, json.dumps(display_data, indent=2)
def recognize_intent(instruction: str) -> str:
    """Map a free-text instruction onto one of the known intent labels."""
    text = instruction.lower()
    # Ordered (pattern, label) pairs; first match wins.
    rules = (
        (r'\b(find|extract|scrape)\s+(links|images|videos|texts|prices|product names|reviews)\b', 'extract_data'),
        (r'\b(count)\s+(links|images|videos|products)\b', 'count_data'),
        (r'\b(what is|get|fetch)\s+(channel name|subscriber count|viewers)\b', 'fetch_specific_data'),
        (r'\b(monitor)\s+changes\b', 'monitor_changes'),
    )
    for pattern, label in rules:
        if re.search(pattern, text):
            return label
    return "unknown"
def extract_data_type(instruction: str) -> str:
    """Return the data-type keyword mentioned in *instruction*, or "unknown".

    Fixes: the original dict's first pattern listed every data word
    (links|images|videos|...) but mapped them ALL to 'links', so
    "extract images" returned 'links' and the 'images' entry was unreachable;
    likewise 'subscriber count'/'viewers' collapsed to 'channel name'.
    Now the matched word itself is returned.
    """
    known = (r'\b(links|images|videos|texts|prices|product names|reviews|'
             r'products|channel name|subscriber count|viewers)\b')
    match = re.search(known, instruction.lower())
    return match.group(0) if match else "unknown"
def format_output(data, output_format):
    """Serialize *data* for display in the requested output format."""
    if output_format in ("JSON", "Cleaned JSON"):
        # "Cleaned JSON" has no dedicated cleaning step yet; both paths
        # pretty-print with a two-space indent.
        return json.dumps(data, indent=2)
    return str(data)
def generate_command(intent: str, url_input: str, data_type: str, output_format: str) -> str:
    """Dispatch *intent* to its handler and return the user-facing result string."""
    if intent == "extract_data":
        return format_output(extract_data(url_input, data_type), output_format)
    if intent == "count_data":
        return f"The number of {data_type} is {count_data(url_input, data_type)}."
    if intent == "fetch_specific_data":
        return fetch_specific_data(url_input, data_type)
    if intent == "monitor_changes":
        return monitor_changes(url_input)
    return "Instruction not recognized. Please try again."
def extract_data(url, data_type):
    """Fetch *url* and return the requested attribute values (hrefs or srcs).

    Returns an error string on failure, [] for unsupported data types.
    """
    try:
        page = BeautifulSoup(requests.get(url).text, 'html.parser')
        if data_type == "links":
            return [anchor['href'] for anchor in page.find_all('a', href=True)]
        if data_type == "images":
            return [image['src'] for image in page.find_all('img', src=True)]
        # Add more data types as needed
        return []
    except Exception as e:
        return f"Error extracting {data_type}: {str(e)}"
def count_data(url, data_type):
    """Fetch *url* and return how many elements of *data_type* it contains.

    Returns an error string on failure, 0 for unsupported data types.
    """
    try:
        page = BeautifulSoup(requests.get(url).text, 'html.parser')
        if data_type == "links":
            return len(page.find_all('a', href=True))
        if data_type == "images":
            return len(page.find_all('img', src=True))
        # Add more data types as needed
        return 0
    except Exception as e:
        return f"Error counting {data_type}: {str(e)}"
def fetch_specific_data(url, data_type):
    """Placeholder handler: reports what would be fetched.

    Real fetching logic is not implemented yet; kept as a stub so the
    chat-based flow is end-to-end exercisable.
    """
    try:
        return f"Fetched {data_type} from {url}"
    except Exception as e:
        return f"Error fetching {data_type}: {str(e)}"
def monitor_changes(url_input):
    """Placeholder handler: reports that monitoring would run.

    Real monitoring lives in process_urls; this stub keeps the chat-based
    flow responsive.
    """
    try:
        return f"Changes monitored for {url_input}"
    except Exception as e:
        return f"Error monitoring changes: {str(e)}"
def chat_based_scrape(instruction, url_input, output_format):
    """Interpret a natural-language instruction and execute it against a URL."""
    # Classify the instruction, then hand off to the intent dispatcher.
    return generate_command(
        recognize_intent(instruction),
        url_input,
        extract_data_type(instruction),
        output_format,
    )
def create_interface():
    """Create the Gradio interface.

    Builds a two-tab Blocks UI:
      1. "URL Scrape/Screenshot" — bulk scraping/screenshots with change
         detection, wired to process_urls.
      2. "Chat-Based Scrape" — free-text instructions, wired to
         chat_based_scrape.
    Returns the Blocks object (caller launches it).
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # Smart Web Scraper with Change Detection
            Monitor and capture changes in web content automatically.
            """
        )
        with gr.Tabs():
            with gr.Tab("URL Scrape/Screenshot"):
                # One textbox accepts a single URL or a comma/newline list.
                url_input = gr.Textbox(label="Enter URL(s)", value="https://example.com", placeholder="Enter single URL or multiple URLs separated by commas")
                with gr.Row():
                    bulk_toggle = gr.Checkbox(label="Bulk URLs", value=False)
                    action_radio = gr.Radio(["Scrape data", "Capture image", "Both"], label="Select Action", value="Both")
                with gr.Row():
                    max_urls = gr.Slider(minimum=1, maximum=1000, value=5, step=1, label="Max URLs to process")
                    crawl_depth = gr.Slider(minimum=1, maximum=5, value=1, step=1, label="Crawl Depth")
                process_button = gr.Button("Process URLs", variant="primary")
                with gr.Column():
                    # Outputs: a ZIP download plus a JSON results summary.
                    screenshot_zip = gr.File(label="Download Results")
                    scraped_data_output = gr.JSON(label="Results Summary")
                # process_urls also takes mode/progress; both use defaults here.
                process_button.click(fn=process_urls, inputs=[url_input, bulk_toggle, action_radio, max_urls, crawl_depth], outputs=[screenshot_zip, scraped_data_output], show_progress=True)
            with gr.Tab("Chat-Based Scrape"):
                instruction = gr.Textbox(label="Enter Instruction", placeholder="e.g., 'Scrape all links' or 'Extract all images'")
                # NOTE(review): reuses the name url_input from the first tab;
                # it is a distinct component, so this is only a readability nit.
                url_input = gr.Textbox(label="Enter URL", value="https://example.com", placeholder="Enter the target URL")
                output_format = gr.Radio(["JSON", "Cleaned JSON", "Raw Data"], label="Output Format", value="JSON")
                output = gr.Textbox(label="Output")
                chat_button = gr.Button("Execute Instruction", variant="primary")
                chat_button.click(fn=chat_based_scrape, inputs=[instruction, url_input, output_format], outputs=output)
        gr.Markdown(
            """
            ### Features
            - Bulk URL processing
            - Screenshot capture
            - Content change detection
            - Recursive crawling
            - Chat-based instructions
            """
        )
    return demo
if __name__ == "__main__":
    # Build the UI and start the Gradio server.
    create_interface().launch()