import gradio as gr
import requests
import re
import logging
import json
from typing import Tuple, List, Dict, Union, Optional
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from datetime import datetime
import io
import zipfile
import os
import tempfile
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager  # used by WebDriverManager.get_driver()
from PIL import Image
import base64
import asyncio
import yaml
from pathlib import Path
from tqdm import tqdm
import plotly.graph_objects as go
# Configure detailed logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('webscraper.log'),
logging.StreamHandler()
]
)
# Download necessary NLTK data
import nltk
try:
    nltk.download('punkt', quiet=True)
    nltk.download('punkt_tab', quiet=True)  # required by word_tokenize on newer NLTK releases
    nltk.download('stopwords', quiet=True)
    nltk.download('wordnet', quiet=True)
    nltk.download('averaged_perceptron_tagger', quiet=True)
except Exception as e:
logging.error(f"Error downloading NLTK data: {str(e)}")
# Configuration and logging setup
class Config:
DATA_DIR = Path('scraped_data')
LOGS_DIR = Path('logs')
MAX_RETRIES = 3
TIMEOUT = 30
@classmethod
def initialize(cls):
"""Initialize necessary directories and configurations"""
cls.DATA_DIR.mkdir(exist_ok=True)
cls.LOGS_DIR.mkdir(exist_ok=True)
        # Setup logging (force=True so this reconfiguration replaces the
        # module-level basicConfig handlers set up above)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(cls.LOGS_DIR / 'app.log'),
                logging.StreamHandler()
            ],
            force=True
        )
return logging.getLogger(__name__)
logger = Config.initialize()
class WebDriverManager:
"""Manage WebDriver instances"""
@staticmethod
def get_driver() -> webdriver.Chrome:
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1080')
return webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
class DataExtractor:
"""Extract and process webpage content"""
def __init__(self):
self.soup = None
self.url = None
self.logger = logging.getLogger(__name__)
def set_page(self, html: str, url: str):
"""Set the page content for extraction"""
self.soup = BeautifulSoup(html, 'html.parser')
self.url = url
def extract_images(self) -> List[Dict]:
"""Extract image information from the page"""
images = []
try:
for img in self.soup.find_all('img'):
image_info = {
'src': urljoin(self.url, img.get('src', '')),
'alt': img.get('alt', ''),
'title': img.get('title', ''),
'dimensions': self._get_image_dimensions(img),
'file_type': self._get_file_type(img.get('src', ''))
}
images.append(image_info)
except Exception as e:
self.logger.error(f"Error extracting images: {str(e)}")
return images
def extract_links(self) -> List[Dict]:
"""Extract link information from the page"""
links = []
try:
for a in self.soup.find_all('a', href=True):
absolute_url = urljoin(self.url, a.get('href', ''))
link_info = {
'href': absolute_url,
'text': a.get_text(strip=True),
'title': a.get('title', ''),
                    'type': 'internal' if urlparse(absolute_url).netloc == urlparse(self.url).netloc else 'external',
'has_image': bool(a.find('img'))
}
links.append(link_info)
except Exception as e:
self.logger.error(f"Error extracting links: {str(e)}")
return links
def extract_text(self) -> List[Dict]:
"""Extract text content from the page"""
texts = []
try:
for text_element in self.soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
text_info = {
'content': text_element.get_text(strip=True),
'source': text_element.name
}
if text_info['content']: # Only add non-empty text blocks
texts.append(text_info)
except Exception as e:
self.logger.error(f"Error extracting text: {str(e)}")
return texts
def _get_image_dimensions(self, img_tag) -> str:
"""Get image dimensions from tag attributes"""
width = img_tag.get('width', '')
height = img_tag.get('height', '')
if width and height:
return f"{width}x{height}"
return "unknown"
def _get_file_type(self, src: str) -> str:
"""Determine image file type from URL"""
if not src:
return "unknown"
        # Strip any query string or fragment before reading the extension
        ext = src.split('?')[0].split('#')[0].split('.')[-1].lower()
        return ext if ext in ['jpg', 'jpeg', 'png', 'gif', 'webp'] else "unknown"
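# Minimal usage sketch for DataExtractor on its own (illustrative only; the URL
# below is a placeholder and the fetch is a plain requests call):
#
#   extractor = DataExtractor()
#   html = requests.get("https://example.com", timeout=10).text
#   extractor.set_page(html, "https://example.com")
#   images = extractor.extract_images()
#   links = extractor.extract_links()
#   texts = extractor.extract_text()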
class QueryAnalyzer:
"""Analyze natural language queries"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.stop_words = set(stopwords.words('english'))
self.lemmatizer = WordNetLemmatizer()
self.logger.info("QueryAnalyzer initialized")
def parse_query(self, query: str) -> Dict[str, Union[str, int]]:
try:
self.logger.info(f"Parsing query: {query}")
tokens = word_tokenize(query.lower())
filtered_tokens = [self.lemmatizer.lemmatize(token) for token in tokens
if token.isalnum() and token not in self.stop_words]
return {
'target': self._identify_target(filtered_tokens),
'limit': self._identify_limit(filtered_tokens),
'filters': self._identify_filters(filtered_tokens),
'output': 'JSON' if 'json' in query.lower() else 'Formatted Text'
}
except Exception as e:
self.logger.error(f"Error parsing query: {str(e)}")
            return {'target': 'unknown', 'limit': 0, 'filters': {}, 'output': 'Formatted Text'}
def _identify_target(self, tokens: List[str]) -> str:
target_map = {
'image': 'image',
'images': 'image',
'picture': 'image',
'link': 'link',
'links': 'link',
'text': 'text',
'content': 'text'
}
for token in tokens:
if token in target_map:
return target_map[token]
return 'unknown'
def _identify_limit(self, tokens: List[str]) -> int:
for token in tokens:
if token.isdigit():
return int(token)
return 0
def _identify_filters(self, tokens: List[str]) -> Dict[str, str]:
filters = {}
if 'external' in tokens:
filters['link_type'] = 'external'
elif 'internal' in tokens:
filters['link_type'] = 'internal'
if 'png' in tokens:
filters['file_type'] = 'png'
elif 'jpg' in tokens or 'jpeg' in tokens:
filters['file_type'] = 'jpg'
return filters
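# Illustrative expectation (a sketch, not executed): with the rules above,
# QueryAnalyzer().parse_query("show 5 external links as JSON") should yield
# approximately:
#   {'target': 'link', 'limit': 5,
#    'filters': {'link_type': 'external'}, 'output': 'JSON'}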
class ResponseFormatter:
"""Format scraped data based on user preferences"""
def __init__(self):
self.logger = logging.getLogger(__name__)
def format_data(self, data: List[Dict], query_info: Dict) -> str:
try:
if not data:
return "No data found for the specified query."
# Apply filters
filtered_data = self._apply_filters(data, query_info.get('filters', {}))
# Apply limit
if query_info.get('limit', 0) > 0:
filtered_data = filtered_data[:query_info['limit']]
if query_info['output'] == "JSON":
return json.dumps({
"metadata": {
"query": query_info,
"timestamp": datetime.now().isoformat(),
"results_count": len(filtered_data)
},
"results": filtered_data
}, indent=2)
return self._format_human_readable(filtered_data, query_info['target'])
except Exception as e:
self.logger.error(f"Formatting error: {str(e)}")
return f"Error formatting results: {str(e)}"
def _apply_filters(self, data: List[Dict], filters: Dict) -> List[Dict]:
filtered_data = data
if 'link_type' in filters:
filtered_data = [item for item in filtered_data
if item.get('type', '') == filters['link_type']]
if 'file_type' in filters:
filtered_data = [item for item in filtered_data
if item.get('file_type', '').lower() == filters['file_type']]
return filtered_data
def _format_human_readable(self, data: List[Dict], target: str) -> str:
formats = {
'image': self._format_images,
'link': self._format_links,
'text': self._format_texts
}
return formats.get(target, lambda x: "Unknown data type")(data)
def _format_images(self, images: List[Dict]) -> str:
return "\n\n".join(
f"Image {idx+1}:\n"
f"Source: {img['src']}\n"
f"Alt Text: {img['alt']}\n"
f"Dimensions: {img['dimensions']}\n"
f"Type: {img['file_type']}"
for idx, img in enumerate(images)
)
def _format_links(self, links: List[Dict]) -> str:
return "\n\n".join(
f"Link {idx+1}:\n"
f"URL: {link['href']}\n"
f"Text: {link['text']}\n"
f"Type: {link['type']}\n"
f"Contains Image: {'Yes' if link['has_image'] else 'No'}"
for idx, link in enumerate(links)
)
def _format_texts(self, texts: List[Dict]) -> str:
return "\n\n".join(
f"Text Block {idx+1} ({text['source'].upper()}):\n"
f"{text['content']}"
for idx, text in enumerate(texts)
)
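# For reference, _format_links above renders each entry roughly as
# (illustrative values only):
#
#   Link 1:
#   URL: https://example.com/about
#   Text: About
#   Type: internal
#   Contains Image: No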
class Scraper:
"""Core scraping functionality with improved error handling"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
async def fetch_page(self, url: str) -> Optional[str]:
"""Fetch page content with retry mechanism"""
for attempt in range(Config.MAX_RETRIES):
try:
response = self.session.get(url, timeout=Config.TIMEOUT)
response.raise_for_status()
return response.text
except Exception as e:
self.logger.error(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
if attempt == Config.MAX_RETRIES - 1:
return None
async def take_screenshot(self, url: str) -> Optional[bytes]:
"""Take a screenshot of a webpage with improved error handling."""
driver = None
try:
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=options)
driver.get(url)
# Wait for page load
time.sleep(2)
# Take screenshot
screenshot = driver.get_screenshot_as_png()
# Process image
img = Image.open(io.BytesIO(screenshot))
img = img.convert('RGB') # Convert to RGB to ensure compatibility
# Save to bytes
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format='PNG', optimize=True)
return img_byte_arr.getvalue()
except Exception as e:
logging.error(f"Screenshot error for {url}: {str(e)}")
return None
finally:
if driver:
driver.quit()
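# Note: fetch_page and take_screenshot above are declared async but perform
# blocking I/O (requests and Selenium). A caller would need to await them from
# an event loop, or offload them (e.g. via asyncio.to_thread) to keep the loop
# responsive; this Scraper class is currently not wired into the Gradio interface.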
class SmartWebScraper:
"""Smart web scraping with natural language processing capabilities"""
def __init__(self):
self.query_analyzer = QueryAnalyzer()
self.data_extractor = DataExtractor()
self.response_formatter = ResponseFormatter()
self.logger = logging.getLogger(__name__)
self.scraped_data = {}
def chat_based_scrape(self, instruction: str, url: str, output_format: str = "Formatted Text") -> str:
"""Process natural language instructions for web scraping"""
try:
if not instruction or not url:
return "Please provide both instruction and URL."
            # Scrape the page: process_url populates self.scraped_data[url] on
            # success and returns an error message string on failure.
            result = self.process_url(url, instruction)
            if url not in self.scraped_data:
                return result  # propagate the error message
            query_info = self.query_analyzer.parse_query(instruction)
            query_info['output'] = output_format
            raw_data = self._get_data_for_target(query_info['target'], url)
if output_format == "JSON":
return json.dumps({
"status": "success",
"request": {
"url": url,
"instruction": instruction,
"timestamp": datetime.now().isoformat()
},
"data": raw_data,
"metadata": {
"source": url,
"elements_found": len(raw_data),
"content_type": type(raw_data).__name__
}
}, indent=2)
return self.response_formatter.format_data(raw_data, query_info)
except Exception as e:
error_msg = f"Error processing chat-based scrape: {str(e)}"
self.logger.error(error_msg)
return error_msg
def process_url(self, url: str, query: str) -> str:
"""Process URL based on query"""
try:
# Validate URL
if not self._validate_url(url):
return "Please provide a valid URL (including http:// or https://)."
# Fetch page
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
# Set page content and store in scraped_data
self.data_extractor.set_page(response.text, url)
self.logger.info(f"Scraping data from URL: {url}") # Log the URL being scraped
images = self.data_extractor.extract_images()
links = self.data_extractor.extract_links()
texts = self.data_extractor.extract_text()
            self.scraped_data[url] = {
                'images': images,
                'links': links,
                'texts': texts
            }
            # Analyze the query and format only the data matching its target
            query_info = self.query_analyzer.parse_query(query)
            data = self._get_data_for_target(query_info['target'], url)
            self.logger.info(
                f"Extracted {len(images)} images, {len(links)} links and "
                f"{len(texts)} text blocks from {url}"
            )
            return self.response_formatter.format_data(data, query_info)
except requests.exceptions.RequestException as e:
error_msg = f"Error fetching the webpage: {str(e)}"
self.logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"An error occurred: {str(e)}"
self.logger.error(error_msg)
return error_msg
def _validate_url(self, url: str) -> bool:
"""Validate URL format"""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception as e:
self.logger.error(f"URL validation error: {str(e)}")
return False
def _get_data_for_target(self, target: str, url: str) -> List[Dict]:
"""Get specific data based on target type"""
if url not in self.scraped_data:
self.logger.warning(f"No data found for URL: {url}")
return []
if target == 'image':
return self.scraped_data[url]['images']
elif target == 'link':
return self.scraped_data[url]['links']
elif target == 'text':
return self.scraped_data[url]['texts']
else:
self.logger.warning(f"Unknown target type: {target}")
return []
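# NOTE: the simplified QueryAnalyzer and ResponseFormatter below redefine the
# classes of the same names declared earlier in this module. Because Python
# resolves class names at call time, these later definitions are the ones
# SmartWebScraper actually instantiates.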
class QueryAnalyzer:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.stop_words = set(stopwords.words('english'))
self.lemmatizer = WordNetLemmatizer()
def parse_query(self, query: str) -> Dict[str, Union[str, int]]:
try:
tokens = word_tokenize(query.lower())
filtered_tokens = [
self.lemmatizer.lemmatize(token)
for token in tokens
if token.isalnum() and token not in self.stop_words
]
return {
'target': self._identify_target(filtered_tokens),
'limit': self._identify_limit(filtered_tokens),
'filters': self._identify_filters(filtered_tokens),
'output': 'JSON' if 'json' in query.lower() else 'Formatted Text'
}
except Exception as e:
self.logger.error(f"Error parsing query: {str(e)}")
            return {'target': 'unknown', 'limit': 0, 'filters': {}, 'output': 'Formatted Text'}
def _identify_target(self, tokens: List[str]) -> str:
targets = {'image': 'image', 'link': 'link', 'text': 'text'}
for token in tokens:
if token in targets:
return targets[token]
return 'unknown'
def _identify_limit(self, tokens: List[str]) -> int:
for token in tokens:
if token.isdigit():
return int(token)
return 0
def _identify_filters(self, tokens: List[str]) -> Dict[str, str]:
filters = {}
if 'external' in tokens:
filters['link_type'] = 'external'
elif 'internal' in tokens:
filters['link_type'] = 'internal'
return filters
class ResponseFormatter:
def __init__(self):
self.logger = logging.getLogger(__name__)
def format_data(self, data: List[Dict], query_info: Dict) -> Union[str, dict]:
try:
if not data:
return {"status": "success", "data": [], "message": "No data found"} if query_info['output'] == "JSON" else "No data found"
response = {
"metadata": {
"target": query_info['target'],
"limit": query_info['limit'],
"filters": query_info['filters'],
"timestamp": datetime.now().isoformat()
},
"data": data[:query_info['limit']] if query_info['limit'] > 0 else data
}
return json.dumps(response, indent=2) if query_info['output'] == "JSON" else self._format_text(response)
except Exception as e:
error_msg = {"status": "error", "message": str(e)}
return json.dumps(error_msg, indent=2) if query_info['output'] == "JSON" else f"Error: {str(e)}"
    def _format_text(self, response: dict) -> str:
        # Plain-text rendering: pretty-printed JSON of the response dictionary
        return json.dumps(response, indent=2)
def sanitize_filename(filename):
"""Sanitizes a filename by removing invalid characters."""
return re.sub(r'[<>:"/\\|?*\n]+', '_', filename)
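# Example: sanitize_filename("https://example.com/a?b=1") -> "https_example.com_a_b=1"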
def validate_url(url):
"""Validate if the URL is properly formatted."""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
def get_latest_data(url):
"""Get the latest HTML content of a webpage."""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # Raise an exception for bad status codes
return response.text
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching latest data from {url}: {str(e)}")
return None
def take_screenshot(url):
"""Take a screenshot of a webpage."""
try:
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=chrome_options)
driver.get(url)
screenshot = driver.get_screenshot_as_png()
driver.quit()
image = Image.open(io.BytesIO(screenshot))
max_size = (1024, 1024)
image.thumbnail(max_size, Image.LANCZOS)
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
return img_byte_arr.getvalue()
except Exception as e:
logging.error(f"Screenshot error for {url}: {str(e)}")
return None
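# Note: webdriver.Chrome(options=...) assumes a local Chrome/Chromium install.
# With Selenium 4.6+ the bundled Selenium Manager resolves a matching
# chromedriver automatically; on older versions chromedriver must be on PATH
# (or provisioned via webdriver_manager, as in WebDriverManager above).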
def process_urls(url_input, bulk_toggle, action_radio, max_urls, crawl_depth):
"""Process URLs with crawl depth and change detection."""
try:
urls = re.split(r'[,\n]+', url_input.strip()) if bulk_toggle else [url_input]
urls = [url.strip() for url in urls if url.strip()]
urls = urls[:int(max_urls)]
# Validate URLs
invalid_urls = [url for url in urls if not validate_url(url)]
if invalid_urls:
return None, None, json.dumps({"error": f"Invalid URLs detected: {', '.join(invalid_urls)}"}, indent=2)
scraped_data = []
screenshots = []
changes_log = []
# Create temporary directory for screenshots
temp_dir = Path("temp_screenshots")
temp_dir.mkdir(exist_ok=True)
# Process each URL with progress tracking
total_urls = len(urls)
for idx, url in enumerate(urls):
if not url.startswith(('http://', 'https://')):
url = f'https://{url}'
sanitized_url = sanitize_filename(url)
# Take screenshot
if action_radio in ['Capture image', 'Both']:
screenshot = take_screenshot(url)
if screenshot:
screenshot_path = temp_dir / f"{sanitized_url}.png"
with open(screenshot_path, 'wb') as f:
f.write(screenshot)
                    # gr.Gallery expects (image, caption) tuples
                    screenshots.append((str(screenshot_path), url))
                    logger.info(f"Screenshot saved: {screenshot_path}")
# Scrape data
if action_radio in ['Scrape data', 'Both']:
html_content = get_latest_data(url)
if html_content:
scraped_data.append({
'url': url,
'content_length': len(html_content),
'timestamp': datetime.now().isoformat()
})
# Create a ZIP file for the screenshots
zip_file_path = temp_dir / "screenshots.zip"
with zipfile.ZipFile(zip_file_path, 'w') as zipf:
            for image_path, _source_url in screenshots:
                zipf.write(image_path, arcname=Path(image_path).name)
# Return the results
return str(zip_file_path), screenshots, scraped_data # Return structured data for JSON output
except Exception as e:
logging.error(f"Error in process_urls: {str(e)}")
return None, None, json.dumps({"error": str(e)}, indent=2)
def create_interface():
"""Create the Gradio interface."""
scraper = SmartWebScraper()
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown(
"""
# 🌐 Enhanced Web Scraper with Change Detection and Chat
Monitor and capture changes in web content automatically. Use the chat interface to interact with scraped data.
"""
)
with gr.Tabs():
with gr.Tab("URL Scrape/Screenshot"):
url_input = gr.Textbox(
label="Enter URL(s)",
placeholder="Enter single URL or multiple URLs separated by commas"
)
with gr.Row():
bulk_toggle = gr.Checkbox(label="Bulk URLs", value=False)
action_radio = gr.Radio(
["Scrape data", "Capture image", "Both"],
label="Select Action",
value="Both"
)
with gr.Row():
max_urls = gr.Slider(
minimum=1,
maximum=20,
value=5,
step=1,
label="Max URLs to process"
)
crawl_depth = gr.Slider(
minimum=0,
maximum=3,
value=1,
step=1,
label="Crawl Depth (0 for no recursion)"
)
process_button = gr.Button("Process URLs", variant="primary")
with gr.Column():
# Add gallery for screenshot preview
gallery = gr.Gallery(
label="Screenshots Preview",
show_label=True,
elem_id="gallery",
columns=[3],
rows=[2],
height="auto",
object_fit="contain" # Add proper image scaling
)
# Download button and results
download_file = gr.File(label="Download Results (ZIP)")
scraped_data_output = gr.JSON(label="Results Summary")
process_button.click(
fn=process_urls,
inputs=[
url_input,
bulk_toggle,
action_radio,
max_urls,
crawl_depth
],
outputs=[
download_file,
gallery,
scraped_data_output
],
show_progress=True
)
with gr.Tab("Chat-Based Scrape"):
instruction = gr.Textbox(
label="Enter Instruction",
placeholder="e.g., 'Scrape all links' or 'Extract all images'"
)
chat_url_input = gr.Textbox(
label="Enter URL",
value="https://example.com",
placeholder="Enter the target URL"
)
output_format = gr.Radio(
["Formatted Text", "JSON"],
label="Output Format",
value="Formatted Text"
)
chat_output = gr.Textbox(label="Output")
chat_button = gr.Button("Execute Instruction", variant="primary")
                chat_button.click(
fn=scraper.chat_based_scrape,
inputs=[instruction, chat_url_input, output_format],
outputs=chat_output
)
gr.Markdown(
"""
### Features
- Bulk URL processing
- Screenshot capture
- Content change detection
- Recursive crawling
- Chat-based instructions for interacting with scraped data
"""
)
return demo
if __name__ == "__main__":
demo = create_interface()
demo.launch(debug=True)