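"""Enhanced web scraper with change detection and a chat-based interface.

Scrapes one or more URLs, captures screenshots with headless Chrome, detects
HTML and visual changes between runs, optionally crawls linked pages, and
exposes everything through a Gradio UI: a bulk scrape/screenshot tab and a
chat-style tab for extracting images, links, and text blocks.
"""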
import gradio as gr
import requests
import re
import logging
import json
from typing import List, Dict, Union
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from datetime import datetime
import io
import zipfile
import os
import tempfile
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from PIL import Image
# Configure detailed logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('webscraper.log'),
logging.StreamHandler()
]
)
# Download necessary NLTK data
import nltk
try:
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
except Exception as e:
logging.error(f"Error downloading NLTK data: {str(e)}")
def sanitize_filename(filename):
"""Sanitizes a filename by removing invalid characters."""
return re.sub(r'[<>:"/\\|?*\n]+', '_', filename)
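# e.g. sanitize_filename("https://example.com/page") -> "https_example.com_page"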
def validate_url(url):
"""Validate if the URL is properly formatted."""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
def get_latest_data(url):
"""Get the latest HTML content of a webpage."""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # Raise an exception for bad status codes
return response.text
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching latest data from {url}: {str(e)}")
return None
def compare_html(old_html, new_html):
"""Compare two HTML contents to detect changes."""
if not old_html or not new_html:
return False
return old_html.strip() != new_html.strip()
def compare_screenshot(old_screenshot, new_screenshot):
"""Compare two screenshots to detect changes."""
try:
if not old_screenshot or not new_screenshot:
return False
old_img = Image.open(io.BytesIO(old_screenshot))
new_img = Image.open(io.BytesIO(new_screenshot))
return old_img.tobytes() != new_img.tobytes()
except Exception as e:
logging.error(f"Error comparing screenshots: {str(e)}")
return False
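# Note: this is an exact byte-for-byte comparison, so any pixel-level difference
# (including dynamic ads or animations) is reported as a visual change.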
def alert_changes(url, change_type):
"""Log detected changes."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logging.warning(f"[{timestamp}] Changes detected at {url}: {change_type}")
return f"[{timestamp}] {change_type}"
def extract_links_from_page(url):
"""Extract all links from a webpage."""
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
links = [a.get('href') for a in soup.find_all('a', href=True)]
return links
except requests.exceptions.RequestException as e:
logging.error(f"Error extracting links from {url}: {str(e)}")
return []
def take_screenshot(url):
"""Take a screenshot of a webpage."""
try:
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=chrome_options)
try:
    driver.get(url)
    screenshot = driver.get_screenshot_as_png()
finally:
    # Always release the browser, even if navigation or capture fails
    driver.quit()
image = Image.open(io.BytesIO(screenshot))
max_size = (1024, 1024)
image.thumbnail(max_size, Image.LANCZOS)
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
return img_byte_arr.getvalue()
except Exception as e:
logging.error(f"Screenshot error for {url}: {str(e)}")
return None
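# Note: take_screenshot assumes a Chrome/Chromium binary is available; with
# Selenium 4.6+ the bundled Selenium Manager can usually resolve a matching
# chromedriver automatically, otherwise chromedriver must be on PATH.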
def is_webpage(url):
"""Check if the URL points to a webpage (HTML)."""
try:
response = requests.head(url, timeout=10)
response.raise_for_status()
content_type = response.headers.get('Content-Type', '').lower()
return 'text/html' in content_type
except requests.exceptions.RequestException as e:
logging.error(f"Error checking content type for {url}: {str(e)}")
return False
def crawl_url(url, depth, max_depth, visited=None):
"""Recursively crawl a URL up to a specified depth."""
if visited is None:
visited = set()
if depth > max_depth or url in visited or not validate_url(url):
return []
visited.add(url)
screenshots = []
if is_webpage(url):
links = extract_links_from_page(url)
screenshot = take_screenshot(url)
if screenshot:
screenshots.append((url, screenshot))
if depth < max_depth:
for link in links:
absolute_link = urljoin(url, link)
screenshots.extend(crawl_url(absolute_link, depth + 1, max_depth, visited))
else:
logging.info(f"Skipping non-webpage content: {url}")
return screenshots
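# Rough usage sketch (mirrors how process_urls calls this below):
#   shots = crawl_url("https://example.com", depth=0, max_depth=1)
#   for page_url, png_bytes in shots:
#       ...  # each entry is a (url, PNG bytes) pair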
def process_urls(url_input, bulk_toggle, action_radio, max_urls, crawl_depth, mode='standard', progress=gr.Progress()):
"""Process URLs with crawl depth and change detection."""
urls = re.split(r'[,\n]+', url_input.strip()) if bulk_toggle else [url_input]
urls = [url.strip() for url in urls if url.strip()]
# Normalize scheme-less URLs before validation, otherwise they would be rejected below
urls = [url if url.startswith(('http://', 'https://')) else f'https://{url}' for url in urls]
urls = urls[:int(max_urls)]
# Validate all URLs
invalid_urls = [url for url in urls if not validate_url(url)]
if invalid_urls:
if mode == 'chat':
return f"Invalid URLs detected: {', '.join(invalid_urls)}"
else:
return None, json.dumps({"error": f"Invalid URLs detected: {', '.join(invalid_urls)}"}, indent=2)
scraped_data = []
screenshots = []
changes_log = []
# Initialize progress tracking
total_urls = len(urls)
progress(0, desc="Starting...")
# Directory to store scraped data
data_dir = 'scraped_data'
os.makedirs(data_dir, exist_ok=True)
# Process each URL
for idx, url in enumerate(urls):
progress((idx + 1) / total_urls, desc=f"Processing: {url}")
# Sanitize URL for file naming
sanitized_url = sanitize_filename(url)
# Check for changes
old_html_path = os.path.join(data_dir, f"{sanitized_url}_html.txt")
old_screenshot_path = os.path.join(data_dir, f"{sanitized_url}_screenshot.png")
# Fetch latest data
latest_html = get_latest_data(url)
latest_screenshot = take_screenshot(url)
# Compare with previous data if available
if os.path.exists(old_html_path):
with open(old_html_path, 'r', encoding='utf-8') as f:
old_html = f.read()
if compare_html(old_html, latest_html):
changes_log.append(alert_changes(url, "HTML content has changed"))
if os.path.exists(old_screenshot_path):
with open(old_screenshot_path, 'rb') as f:
old_screenshot = f.read()
if latest_screenshot and compare_screenshot(old_screenshot, latest_screenshot):
changes_log.append(alert_changes(url, "Visual content has changed"))
# Store latest data
if latest_html:
with open(old_html_path, 'w', encoding='utf-8') as f:
f.write(latest_html)
if latest_screenshot:
with open(old_screenshot_path, 'wb') as f:
f.write(latest_screenshot)
# Prepare output data
if action_radio in ['Scrape data', 'Both']:
scraped_data.append({
'url': url,
'content': latest_html,
'timestamp': datetime.now().isoformat(),
'changes_detected': changes_log
})
if action_radio in ['Capture image', 'Both']:
crawled_screenshots = crawl_url(url, depth=0, max_depth=int(crawl_depth))
screenshots.extend(crawled_screenshots)
if mode == 'chat':
return "\n".join(changes_log)
else:
# Create a temporary file to store the ZIP
with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
with zipfile.ZipFile(tmp_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Add screenshots to ZIP
for screenshot_url, screenshot_data in screenshots:
sanitized_screenshot_url = sanitize_filename(screenshot_url)
filename = f"{sanitized_screenshot_url}.png"
zipf.writestr(filename, screenshot_data)
# Add scraped data and changes log to ZIP
if scraped_data:
data_to_save = {
'scraped_data': scraped_data,
'changes_log': changes_log,
'timestamp': datetime.now().isoformat()
}
zipf.writestr('data.json', json.dumps(data_to_save, indent=2))
# Get the path to the temporary file
zip_file_path = tmp_file.name
# Prepare display data
display_data = {
'total_scraped_urls': len(scraped_data),
'total_screenshots_taken': len(screenshots),
'changes_detected': changes_log,
'scraped_data': scraped_data
}
# Return the path to the temporary ZIP file and display data
return zip_file_path, json.dumps(display_data, indent=2)
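# In standard mode process_urls returns (zip_file_path, display_json); in 'chat'
# mode it returns a newline-joined change-log string instead.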
class DataExtractor:
def __init__(self):
self.soup = None
self.base_url = None
self.logger = logging.getLogger(__name__)
def set_page(self, html_content: str, url: str):
self.soup = BeautifulSoup(html_content, 'html.parser')
self.base_url = url
self.logger.info(f"Page parsed. Base URL set to: {self.base_url}")
def extract_images(self) -> List[Dict]:
if not self.soup:
self.logger.error("BeautifulSoup object not initialized")
return []
images = []
all_imgs = self.soup.find_all('img')
self.logger.info(f"Found {len(all_imgs)} raw image tags")
for img in all_imgs:
try:
src = img.get('src', '')
if src:
# Handle relative URLs
src = urljoin(self.base_url, src)
image_data = {
'src': src,
'alt': img.get('alt', 'No description'),
'title': img.get('title', 'No title'),
'dimensions': f"{img.get('width', 'unknown')}x{img.get('height', 'unknown')}",
'file_type': self._get_file_type(src)
}
images.append(image_data)
self.logger.debug(f"Processed image: {src[:100]}...")
except Exception as e:
self.logger.error(f"Error processing image: {str(e)}")
continue
self.logger.info(f"Successfully extracted {len(images)} valid images")
return images
def extract_links(self) -> List[Dict]:
if not self.soup:
self.logger.error("BeautifulSoup object not initialized")
return []
links = []
all_links = self.soup.find_all('a')
self.logger.info(f"Found {len(all_links)} raw link tags")
for a in all_links:
try:
href = a.get('href', '')
if href and not href.startswith(('#', 'javascript:', 'mailto:')):
# Handle relative URLs
href = urljoin(self.base_url, href)
links.append({
'href': href,
'text': a.get_text(strip=True) or '[No Text]',
'title': a.get('title', 'No title'),
'type': 'internal' if self._is_internal_link(href) else 'external',
'has_image': bool(a.find('img'))
})
self.logger.debug(f"Processed link: {href[:100]}...")
except Exception as e:
self.logger.error(f"Error processing link: {str(e)}")
continue
self.logger.info(f"Successfully extracted {len(links)} valid links")
return links
def extract_text(self) -> List[Dict]:
if not self.soup:
self.logger.error("BeautifulSoup object not initialized")
return []
texts = []
all_paragraphs = self.soup.find_all('p') # Extracting all paragraph tags
self.logger.info(f"Found {len(all_paragraphs)} raw paragraph tags")
for p in all_paragraphs:
try:
text_content = p.get_text(strip=True)
if text_content: # Only add non-empty paragraphs
texts.append({
'content': text_content,
'source': self.base_url
})
self.logger.debug(f"Processed text block: {text_content[:100]}...")
except Exception as e:
self.logger.error(f"Error processing text block: {str(e)}")
continue
self.logger.info(f"Successfully extracted {len(texts)} valid text blocks")
return texts
def _get_file_type(self, url: str) -> str:
try:
ext = url.split('.')[-1].lower()
return ext if ext in ['jpg', 'jpeg', 'png', 'gif', 'webp', 'svg'] else 'unknown'
except Exception:
return 'unknown'
def _is_internal_link(self, href: str) -> bool:
try:
return bool(self.base_url and (href.startswith('/') or self.base_url in href))
except Exception:
return False
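# Illustrative use of DataExtractor (hypothetical `html` and `page_url` values):
#   extractor = DataExtractor()
#   extractor.set_page(html, page_url)
#   images = extractor.extract_images()
#   links = extractor.extract_links()
#   texts = extractor.extract_text()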
class QueryAnalyzer:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.stop_words = set(stopwords.words('english'))
self.lemmatizer = WordNetLemmatizer()
self.logger.info("QueryAnalyzer initialized")
def parse_query(self, query: str) -> Dict[str, Union[str, int]]:
try:
self.logger.info(f"Parsing query: {query}")
tokens = word_tokenize(query.lower())
filtered_tokens = [self.lemmatizer.lemmatize(token) for token in tokens if token.isalnum() and token not in self.stop_words]
query_info = {
'target': self._identify_target(filtered_tokens),
'limit': self._identify_limit(filtered_tokens),
'filters': self._identify_filters(filtered_tokens)
}
self.logger.info(f"Query parsed: {query_info}")
return query_info
except Exception as e:
self.logger.error(f"Error parsing query: {str(e)}")
return {'target': 'unknown', 'limit': 0, 'filters': {}}
def _identify_target(self, tokens: List[str]) -> str:
if 'image' in tokens:
return 'image'
elif 'link' in tokens:
return 'link'
elif 'text' in tokens:
return 'text'
else:
return 'unknown'
def _identify_limit(self, tokens: List[str]) -> int:
for token in tokens:
if token.isdigit():
return int(token)
return 0
def _identify_filters(self, tokens: List[str]) -> Dict[str, str]:
filters = {}
if 'external' in tokens:
filters['link_type'] = 'external'
elif 'internal' in tokens:
filters['link_type'] = 'internal'
return filters
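# Example of the parse_query output for a typical instruction (actual tokens
# depend on NLTK's tokenizer and lemmatizer):
#   QueryAnalyzer().parse_query("find 5 external links")
#   -> {'target': 'link', 'limit': 5, 'filters': {'link_type': 'external'}}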
class ResponseFormatter:
def __init__(self):
self.logger = logging.getLogger(__name__)
def format_data(self, data: List[Dict], query_info: Dict) -> str:
try:
if not data:
return "No data found for the specified query."
target = query_info['target']
limit = query_info['limit']
if limit > 0:
data = data[:limit]
if target == 'image':
return self._format_images(data)
elif target == 'link':
return self._format_links(data)
elif target == 'text':
return self._format_texts(data)
else:
return "Unknown target for formatting."
except Exception as e:
self.logger.error(f"Error formatting response: {str(e)}")
return f"An error occurred while formatting the response: {str(e)}"
def _format_images(self, images: List[Dict]) -> str:
if not images:
return "No images found."
formatted_images = []
for idx, img in enumerate(images, start=1):
formatted = f"Image {idx}:\n"
formatted += f" Source: {img['src']}\n"
formatted += f" Alt Text: {img['alt']}\n"
formatted += f" Title: {img['title']}\n"
formatted += f" Dimensions: {img['dimensions']}\n"
formatted += f" File Type: {img['file_type']}\n\n"
formatted_images.append(formatted)
return ''.join(formatted_images)
def _format_links(self, links: List[Dict]) -> str:
if not links:
return "No links found."
formatted_links = []
for idx, link in enumerate(links, start=1):
formatted = f"Link {idx}:\n"
formatted += f" URL: {link['href']}\n"
formatted += f" Text: {link['text']}\n"
formatted += f" Title: {link['title']}\n"
formatted += f" Type: {link['type']}\n"
formatted += f" Has Image: {'Yes' if link['has_image'] else 'No'}\n\n"
formatted_links.append(formatted)
return ''.join(formatted_links)
def _format_texts(self, texts: List[Dict]) -> str:
if not texts:
return "No text blocks found."
formatted_texts = []
for idx, text in enumerate(texts, start=1):
formatted = f"Text Block {idx}:\n"
formatted += f" Content: {text['content']}\n"
formatted += f" Source: {text['source']}\n\n"
formatted_texts.append(formatted)
return ''.join(formatted_texts)
class SmartWebScraper:
def __init__(self):
self.query_analyzer = QueryAnalyzer()
self.data_extractor = DataExtractor()
self.response_formatter = ResponseFormatter()
self.logger = logging.getLogger(__name__)
self.scraped_data = {} # Temporarily store scraped data
def process_url(self, url: str, query: str) -> str:
try:
# Validate URL
if not self._validate_url(url):
return "Please provide a valid URL (including http:// or https://)."
# Fetch page
self.logger.info(f"Fetching URL: {url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
self.logger.info(f"Successfully fetched page. Status code: {response.status_code}")
# Set page content and store in scraped_data
self.data_extractor.set_page(response.text, url)
self.scraped_data[url] = {
'images': self.data_extractor.extract_images(),
'links': self.data_extractor.extract_links(),
'texts': self.data_extractor.extract_text()
}
# Analyze query
query_info = self.query_analyzer.parse_query(query)
self.logger.info(f"Query analysis: {query_info}")
# Extract requested data
data = self._get_data_for_target(query_info['target'], url)
self.logger.info(f"Extracted {len(data)} items for target: {query_info['target']}")
# Format response
formatted_response = self.response_formatter.format_data(data, query_info)
self.logger.info("Response formatted successfully")
return formatted_response
except requests.exceptions.RequestException as e:
error_msg = f"Error fetching the webpage: {str(e)}"
self.logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"An error occurred: {str(e)}"
self.logger.error(error_msg)
return error_msg
def _validate_url(self, url: str) -> bool:
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception as e:
self.logger.error(f"URL validation error: {str(e)}")
return False
def _get_data_for_target(self, target: str, url: str) -> List[Dict]:
if url not in self.scraped_data:
self.logger.warning(f"No data found for URL: {url}")
return []
if target == 'image':
return self.scraped_data[url]['images']
elif target == 'link':
return self.scraped_data[url]['links']
elif target == 'text':
return self.scraped_data[url]['texts']
else:
self.logger.warning(f"No extractor found for target: {target}")
return []
def recognize_intent(self, instruction: str) -> str:
"""Recognizes the intent of an instruction."""
instruction = instruction.lower()
# General patterns for actions and data types
action_patterns = {
r'\b(find|extract|scrape)\s+(links|images|texts)\b': 'extract_data',
r'\b(count)\s+(links|images|texts)\b': 'count_data',
}
for pattern, intent in action_patterns.items():
if re.search(pattern, instruction):
return intent
return "unknown"
def extract_data_type(self, instruction: str) -> str:
"""Extracts the data type from an instruction."""
instruction = instruction.lower()
data_types = {
r'\b(links)\b': 'link',
r'\b(images)\b': 'image',
r'\b(texts)\b': 'text',
}
for pattern, data_type in data_types.items():
if re.search(pattern, instruction):
return data_type
return "unknown"
def chat_based_scrape(self, instruction, url_input, output_format):
"""Handles chat-based instructions for web scraping."""
if not validate_url(url_input):
return "Invalid URL. Please enter a valid URL."
if url_input not in self.scraped_data:
self.process_url(url_input, "") # Fetch and store data if not already present
# Recognize intent and extract data type if applicable
intent = self.recognize_intent(instruction)
data_type = self.extract_data_type(instruction)
if intent == "unknown" or data_type == "unknown":
return "Instruction not recognized. Please try again."
# Extract data based on intent and data type
if intent == "extract_data":
data = self._get_data_for_target(data_type, url_input)
if output_format == "JSON":
return json.dumps(data, indent=2)
else:
query_info = {'target': data_type, 'limit': 0, 'filters': {}}
return self.response_formatter.format_data(data, query_info)
elif intent == "count_data":
data = self._get_data_for_target(data_type, url_input)
return f"The number of {data_type}s is {len(data)}."
else:
return "Instruction not recognized. Please try again."
def create_interface():
"""Create the Gradio interface."""
scraper = SmartWebScraper()
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown(
"""
# 🌐 Enhanced Web Scraper with Change Detection and Chat
Monitor and capture changes in web content automatically. Use the chat interface to interact with scraped data.
"""
)
with gr.Tabs():
with gr.Tab("URL Scrape/Screenshot"):
url_input = gr.Textbox(
label="Enter URL(s)",
value="https://example.com",
placeholder="Enter single URL or multiple URLs separated by commas"
)
with gr.Row():
bulk_toggle = gr.Checkbox(label="Bulk URLs", value=False)
action_radio = gr.Radio(
["Scrape data", "Capture image", "Both"],
label="Select Action",
value="Both"
)
with gr.Row():
max_urls = gr.Slider(
minimum=1,
maximum=20,
value=5,
step=1,
label="Max URLs to process"
)
crawl_depth = gr.Slider(
minimum=0,
maximum=3,
value=1,
step=1,
label="Crawl Depth (0 for no recursion)"
)
process_button = gr.Button("Process URLs", variant="primary")
with gr.Column():
screenshot_zip = gr.File(label="Download Results")
scraped_data_output = gr.JSON(label="Results Summary")
process_button.click(
fn=process_urls,
inputs=[
url_input,
bulk_toggle,
action_radio,
max_urls,
crawl_depth
],
outputs=[
screenshot_zip,
scraped_data_output
],
show_progress=True
)
with gr.Tab("Chat-Based Scrape"):
instruction = gr.Textbox(
label="Enter Instruction",
placeholder="e.g., 'Scrape all links' or 'Extract all images'"
)
chat_url_input = gr.Textbox(
label="Enter URL",
value="https://example.com",
placeholder="Enter the target URL"
)
output_format = gr.Radio(
["Formatted Text", "JSON"],
label="Output Format",
value="Formatted Text"
)
chat_output = gr.Textbox(label="Output")
chat_button = gr.Button("Execute Instruction", variant="primary")
chat_button.click(
fn=scraper.chat_based_scrape,
inputs=[instruction, chat_url_input, output_format],
outputs=chat_output
)
gr.Markdown(
"""
### Features
- Bulk URL processing
- Screenshot capture
- Content change detection
- Recursive crawling
- Chat-based instructions for interacting with scraped data
"""
)
return demo
if __name__ == "__main__":
demo = create_interface()
demo.launch(debug=True)