"""Gradio front end for a small web scraper.

The module offers two workflows:

* "URL Scrape/Screenshot" -- fetch one or more pages and/or capture
  screenshots of them with headless Chrome (see ``process_urls``).
* "Chat-Based Scrape" -- interpret a short natural-language instruction
  ("first 5 external links as json") and return the matching images, links
  or text blocks of a page (see ``SmartWebScraper``).
"""

import asyncio
import base64
import io
import json
import logging
import os
import re
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from urllib.parse import urljoin, urlparse

import gradio as gr
import nltk
import plotly.graph_objects as go
import requests
import yaml
from bs4 import BeautifulSoup
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from tqdm import tqdm

# Configure detailed logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('webscraper.log'),
        logging.StreamHandler()
    ]
)

# Download necessary NLTK data
try:
    # 'punkt_tab' is requested in addition to 'punkt' because newer NLTK
    # releases load the tokenizer model from that resource.
    for _resource in ('punkt', 'punkt_tab', 'stopwords', 'wordnet',
                      'averaged_perceptron_tagger'):
        nltk.download(_resource, quiet=True)
except Exception as e:
    logging.error(f"Error downloading NLTK data: {str(e)}")


# Configuration and logging setup
class Config:
    """Static configuration shared by the scraper components."""

    DATA_DIR = Path('scraped_data')
    LOGS_DIR = Path('logs')
    MAX_RETRIES = 3
    TIMEOUT = 30

    @classmethod
    def initialize(cls):
        """Create the working directories and attach the ``app.log`` handler.

        Returns:
            The module-level logger.
        """
        cls.DATA_DIR.mkdir(exist_ok=True)
        cls.LOGS_DIR.mkdir(exist_ok=True)

        # The root logger is already configured at import time, so another
        # logging.basicConfig() call would silently do nothing and leave an
        # opened-but-unused FileHandler behind. Attach the handler explicitly
        # instead, and only once so repeated calls do not duplicate output.
        log_path = os.path.abspath(cls.LOGS_DIR / 'app.log')
        root_logger = logging.getLogger()
        already_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == log_path
            for handler in root_logger.handlers
        )
        if not already_attached:
            file_handler = logging.FileHandler(log_path)
            file_handler.setLevel(logging.INFO)
            file_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            root_logger.addHandler(file_handler)

        return logging.getLogger(__name__)


logger = Config.initialize()


class WebDriverManager:
    """Manage WebDriver instances"""

    @staticmethod
    def get_driver() -> webdriver.Chrome:
        """Return a new headless Chrome driver with a 1920x1080 window.

        The caller owns the driver and must call ``quit()`` on it.
        """
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--window-size=1920,1080')
        # ChromeDriverManager was referenced here but never imported; build
        # the driver the same way the rest of this file does.
        return webdriver.Chrome(options=options)


class DataExtractor:
    """Extract and process webpage content"""

    # File extensions reported by _get_file_type; anything else is "unknown".
    IMAGE_EXTENSIONS = ('jpg', 'jpeg', 'png', 'gif', 'webp')

    def __init__(self):
        self.soup = None
        self.url = None
        self.logger = logging.getLogger(__name__)

    def set_page(self, html: str, url: str):
        """Set the page content for extraction"""
        self.soup = BeautifulSoup(html, 'html.parser')
        self.url = url

    def extract_images(self) -> List[Dict]:
        """Extract image information from the page.

        Returns:
            One dict per ``<img>`` tag with keys ``src`` (absolute URL),
            ``alt``, ``title``, ``dimensions`` and ``file_type``. Errors are
            logged and whatever was collected so far is returned.
        """
        images = []
        try:
            for img in self.soup.find_all('img'):
                src = img.get('src', '')
                images.append({
                    'src': urljoin(self.url, src),
                    'alt': img.get('alt', ''),
                    'title': img.get('title', ''),
                    'dimensions': self._get_image_dimensions(img),
                    'file_type': self._get_file_type(src)
                })
        except Exception as e:
            self.logger.error(f"Error extracting images: {str(e)}")
        return images

    def extract_links(self) -> List[Dict]:
        """Extract link information from the page.

        Returns:
            One dict per ``<a href>`` tag with keys ``href`` (absolute URL),
            ``text``, ``title``, ``type`` (``'internal'`` or ``'external'``)
            and ``has_image``. Errors are logged and whatever was collected
            so far is returned.
        """
        links = []
        try:
            page_host = urlparse(self.url).netloc
            for a in self.soup.find_all('a', href=True):
                absolute_url = urljoin(self.url, a.get('href', ''))
                # Compare hosts rather than testing whether the full page
                # URL is a substring of the link: the substring test marked
                # every other page of the same site as external.
                is_internal = urlparse(absolute_url).netloc == page_host
                links.append({
                    'href': absolute_url,
                    'text': a.get_text(strip=True),
                    'title': a.get('title', ''),
                    'type': 'internal' if is_internal else 'external',
                    'has_image': bool(a.find('img'))
                })
        except Exception as e:
            self.logger.error(f"Error extracting links: {str(e)}")
        return links

    def extract_text(self) -> List[Dict]:
        """Extract text content from the page.

        Returns:
            One dict per non-empty paragraph or heading with keys
            ``content`` and ``source`` (the tag name).
        """
        texts = []
        try:
            for text_element in self.soup.find_all(
                    ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
                content = text_element.get_text(strip=True)
                if content:  # Only add non-empty text blocks
                    texts.append({
                        'content': content,
                        'source': text_element.name
                    })
        except Exception as e:
            self.logger.error(f"Error extracting text: {str(e)}")
        return texts

    def _get_image_dimensions(self, img_tag) -> str:
        """Get image dimensions from tag attributes"""
        width = img_tag.get('width', '')
        height = img_tag.get('height', '')
        if width and height:
            return f"{width}x{height}"
        return "unknown"

    def _get_file_type(self, src: str) -> str:
        """Determine image file type from URL.

        Only the path component is inspected, so a query string or fragment
        (``photo.png?v=2``) does not hide the extension.
        """
        if not src:
            return "unknown"
        path = urlparse(src).path
        if '.' not in path:
            return "unknown"
        ext = path.rsplit('.', 1)[-1].lower()
        return ext if ext in self.IMAGE_EXTENSIONS else "unknown"


class QueryAnalyzer:
    """Analyze natural language queries"""

    # Lemmatized instruction words mapped to the data category they select.
    TARGET_MAP = {
        'image': 'image',
        'images': 'image',
        'picture': 'image',
        'link': 'link',
        'links': 'link',
        'text': 'text',
        'content': 'text'
    }

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.stop_words = set(stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()
        self.logger.info("QueryAnalyzer initialized")

    def parse_query(self, query: str) -> Dict[str, Union[str, int, dict]]:
        """Parse an instruction into target, limit, filters and output format.

        Returns:
            A dict with keys ``target`` (``'image'``, ``'link'``, ``'text'``
            or ``'unknown'``), ``limit`` (0 means no limit), ``filters`` and
            ``output`` (``'JSON'`` or ``'Formatted Text'``). On any parsing
            error a neutral description with the same keys is returned.
        """
        try:
            self.logger.info(f"Parsing query: {query}")
            tokens = word_tokenize(query.lower())
            filtered_tokens = [
                self.lemmatizer.lemmatize(token)
                for token in tokens
                if token.isalnum() and token not in self.stop_words
            ]
            return {
                'target': self._identify_target(filtered_tokens),
                'limit': self._identify_limit(filtered_tokens),
                'filters': self._identify_filters(filtered_tokens),
                'output': 'JSON' if 'json' in query.lower() else 'Formatted Text'
            }
        except Exception as e:
            self.logger.error(f"Error parsing query: {str(e)}")
            # Keep the same keys as the success path; consumers read 'output'.
            return {
                'target': 'unknown',
                'limit': 0,
                'filters': {},
                'output': 'Formatted Text'
            }

    def _identify_target(self, tokens: List[str]) -> str:
        """Return the category of the first recognised token, else 'unknown'."""
        for token in tokens:
            if token in self.TARGET_MAP:
                return self.TARGET_MAP[token]
        return 'unknown'

    def _identify_limit(self, tokens: List[str]) -> int:
        """Return the first all-digit token as an int, or 0 for no limit."""
        for token in tokens:
            if token.isdigit():
                return int(token)
        return 0

    def _identify_filters(self, tokens: List[str]) -> Dict[str, str]:
        """Derive ``link_type`` and ``file_type`` filters from the tokens."""
        filters = {}
        if 'external' in tokens:
            filters['link_type'] = 'external'
        elif 'internal' in tokens:
            filters['link_type'] = 'internal'
        if 'png' in tokens:
            filters['file_type'] = 'png'
        elif 'jpg' in tokens or 'jpeg' in tokens:
            filters['file_type'] = 'jpg'
        return filters


class ResponseFormatter:
    """Format scraped data based on user preferences"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def select_results(self, data: List[Dict], query_info: Dict) -> List[Dict]:
        """Apply the query's filters and limit to ``data``."""
        selected = self._apply_filters(data, query_info.get('filters', {}))
        limit = query_info.get('limit', 0)
        return selected[:limit] if limit > 0 else selected

    def format_data(self, data: List[Dict], query_info: Dict) -> str:
        """Render ``data`` as JSON or as human-readable text.

        Args:
            data: Items produced by ``DataExtractor``.
            query_info: Output of ``QueryAnalyzer.parse_query``; missing keys
                fall back to no filters, no limit and formatted text.

        Returns:
            Always a string, including for empty results and for errors.
        """
        output = query_info.get('output', 'Formatted Text')
        try:
            results = self.select_results(data, query_info) if data else []

            if not results:
                if output == "JSON":
                    return json.dumps(
                        {"status": "success", "data": [],
                         "message": "No data found"},
                        indent=2
                    )
                return "No data found for the specified query."

            if output == "JSON":
                return json.dumps({
                    "metadata": {
                        "query": query_info,
                        "timestamp": datetime.now().isoformat(),
                        "results_count": len(results)
                    },
                    "results": results
                }, indent=2)

            return self._format_human_readable(
                results, query_info.get('target', 'unknown'))
        except Exception as e:
            self.logger.error(f"Formatting error: {str(e)}")
            if output == "JSON":
                return json.dumps(
                    {"status": "error", "message": str(e)}, indent=2)
            return f"Error formatting results: {str(e)}"

    def _apply_filters(self, data: List[Dict], filters: Dict) -> List[Dict]:
        """Keep only the items matching ``link_type`` / ``file_type``."""
        filtered_data = data
        if 'link_type' in filters:
            filtered_data = [
                item for item in filtered_data
                if item.get('type', '') == filters['link_type']
            ]
        if 'file_type' in filters:
            wanted = filters['file_type']
            # 'jpg' and 'jpeg' name the same format; accept either spelling.
            accepted = {'jpg', 'jpeg'} if wanted in ('jpg', 'jpeg') else {wanted}
            filtered_data = [
                item for item in filtered_data
                if item.get('file_type', '').lower() in accepted
            ]
        return filtered_data

    def _format_human_readable(self, data: List[Dict], target: str) -> str:
        """Dispatch to the text formatter for ``target``."""
        formats = {
            'image': self._format_images,
            'link': self._format_links,
            'text': self._format_texts
        }
        return formats.get(target, lambda x: "Unknown data type")(data)

    def _format_text(self, response: dict) -> str:
        """Serialize ``response`` as indented JSON (generic fallback)."""
        return json.dumps(response, indent=2)

    def _format_images(self, images: List[Dict]) -> str:
        return "\n\n".join(
            f"Image {idx+1}:\n"
            f"Source: {img['src']}\n"
            f"Alt Text: {img['alt']}\n"
            f"Dimensions: {img['dimensions']}\n"
            f"Type: {img['file_type']}"
            for idx, img in enumerate(images)
        )

    def _format_links(self, links: List[Dict]) -> str:
        return "\n\n".join(
            f"Link {idx+1}:\n"
            f"URL: {link['href']}\n"
            f"Text: {link['text']}\n"
            f"Type: {link['type']}\n"
            f"Contains Image: {'Yes' if link['has_image'] else 'No'}"
            for idx, link in enumerate(links)
        )

    def _format_texts(self, texts: List[Dict]) -> str:
        return "\n\n".join(
            f"Text Block {idx+1} ({text['source'].upper()}):\n"
            f"{text['content']}"
            for idx, text in enumerate(texts)
        )


class Scraper:
    """Core scraping functionality with improved error handling"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    async def fetch_page(self, url: str) -> Optional[str]:
        """Fetch page content with retry mechanism.

        Returns:
            The response body, or ``None`` once ``Config.MAX_RETRIES``
            attempts have failed.
        """
        for attempt in range(Config.MAX_RETRIES):
            try:
                response = self.session.get(url, timeout=Config.TIMEOUT)
                response.raise_for_status()
                return response.text
            except Exception as e:
                self.logger.error(
                    f"Attempt {attempt + 1} failed for {url}: {str(e)}")
        return None

    async def take_screenshot(self, url: str) -> Optional[bytes]:
        """Take a screenshot of a webpage with improved error handling.

        Returns:
            PNG bytes, or ``None`` if the capture failed.
        """
        driver = None
        try:
            driver = WebDriverManager.get_driver()
            driver.get(url)

            # Wait for page load. asyncio.sleep is used instead of
            # time.sleep: `time` was never imported, and a blocking sleep
            # would stall the event loop.
            await asyncio.sleep(2)

            # Take screenshot
            screenshot = driver.get_screenshot_as_png()

            # Process image
            img = Image.open(io.BytesIO(screenshot))
            img = img.convert('RGB')  # Convert to RGB to ensure compatibility

            # Save to bytes
            img_byte_arr = io.BytesIO()
            img.save(img_byte_arr, format='PNG', optimize=True)
            return img_byte_arr.getvalue()
        except Exception as e:
            self.logger.error(f"Screenshot error for {url}: {str(e)}")
            return None
        finally:
            if driver:
                driver.quit()


class SmartWebScraper:
    """Smart web scraping with natural language processing capabilities"""

    def __init__(self):
        self.query_analyzer = QueryAnalyzer()
        self.data_extractor = DataExtractor()
        self.response_formatter = ResponseFormatter()
        self.logger = logging.getLogger(__name__)
        # Maps URL -> {'images': [...], 'links': [...], 'texts': [...],
        # 'metadata': {...}} for every page scraped by this instance.
        self.scraped_data = {}

    def chat_based_scrape(self, instruction: str, url: str,
                          output_format: str = "Formatted Text") -> str:
        """Process natural language instructions for web scraping.

        Args:
            instruction: Free-text request, e.g. "first 5 external links".
            url: Page to scrape; must include the scheme.
            output_format: ``"JSON"`` or ``"Formatted Text"``. Takes
                precedence over any format mentioned in ``instruction``.

        Returns:
            The formatted result, or a human-readable error message.
        """
        try:
            if not instruction or not url:
                return "Please provide both instruction and URL."
            if not self._validate_url(url):
                return "Please provide a valid URL (including http:// or https://)."

            query_info = self.query_analyzer.parse_query(instruction)
            query_info['output'] = output_format

            self._scrape_page(url)
            raw_data = self._get_data_for_target(query_info['target'], url)

            if output_format == "JSON":
                results = self.response_formatter.select_results(
                    raw_data, query_info)
                return json.dumps({
                    "status": "success",
                    "request": {
                        "url": url,
                        "instruction": instruction,
                        "timestamp": datetime.now().isoformat()
                    },
                    "data": results,
                    "metadata": {
                        "source": url,
                        "elements_found": len(results),
                        "content_type": type(results).__name__
                    }
                }, indent=2)

            return self.response_formatter.format_data(raw_data, query_info)
        except requests.exceptions.RequestException as e:
            error_msg = f"Error fetching the webpage: {str(e)}"
            self.logger.error(error_msg)
            return error_msg
        except Exception as e:
            error_msg = f"Error processing chat-based scrape: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    def process_url(self, url: str, query: str) -> str:
        """Process URL based on query.

        Scrapes ``url`` and returns the data selected by ``query``, formatted
        according to the output format detected in the query text. Failures
        are reported as a message string rather than raised.
        """
        try:
            # Validate URL
            if not self._validate_url(url):
                return "Please provide a valid URL (including http:// or https://)."

            self._scrape_page(url)

            # Analyze query and extract data
            query_info = self.query_analyzer.parse_query(query)
            data = self._get_data_for_target(query_info['target'], url)
            return self.response_formatter.format_data(data, query_info)
        except requests.exceptions.RequestException as e:
            error_msg = f"Error fetching the webpage: {str(e)}"
            self.logger.error(error_msg)
            return error_msg
        except Exception as e:
            error_msg = f"An error occurred: {str(e)}"
            self.logger.error(error_msg)
            return error_msg

    def _scrape_page(self, url: str) -> None:
        """Fetch ``url`` once and cache its extracted content.

        Raises:
            requests.exceptions.RequestException: if the page cannot be
                fetched or the server answers with an error status.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        # Set page content and store in scraped_data
        self.data_extractor.set_page(response.text, url)
        self.logger.info(f"Scraping data from URL: {url}")  # Log the URL being scraped

        self.scraped_data[url] = {
            'images': self.data_extractor.extract_images(),
            'links': self.data_extractor.extract_links(),
            'texts': self.data_extractor.extract_text(),
            'metadata': {
                'content_length': len(response.text),
                'timestamp': datetime.now().isoformat()
            }
        }

    def _validate_url(self, url: str) -> bool:
        """Validate URL format"""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except Exception as e:
            self.logger.error(f"URL validation error: {str(e)}")
            return False

    def _get_data_for_target(self, target: str, url: str) -> List[Dict]:
        """Get specific data based on target type"""
        if url not in self.scraped_data:
            self.logger.warning(f"No data found for URL: {url}")
            return []

        if target == 'image':
            return self.scraped_data[url]['images']
        elif target == 'link':
            return self.scraped_data[url]['links']
        elif target == 'text':
            return self.scraped_data[url]['texts']
        else:
            self.logger.warning(f"Unknown target type: {target}")
            return []


def sanitize_filename(filename):
    """Sanitizes a filename by removing invalid characters."""
    return re.sub(r'[<>:"/\\|?*\n]+', '_', filename)


def validate_url(url):
    """Validate if the URL is properly formatted."""
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except Exception:
        return False


def get_latest_data(url):
    """Get the latest HTML content of a webpage.

    Returns:
        The response body, or ``None`` if the request failed.
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise an exception for bad status codes
        return response.text
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching latest data from {url}: {str(e)}")
        return None


def take_screenshot(url):
    """Take a screenshot of a webpage.

    Returns:
        PNG bytes scaled to fit within 1024x1024, or ``None`` on failure.
    """
    try:
        driver = WebDriverManager.get_driver()
        try:
            driver.get(url)
            screenshot = driver.get_screenshot_as_png()
        finally:
            # Always release the browser, even when the page fails to load.
            driver.quit()

        image = Image.open(io.BytesIO(screenshot))
        max_size = (1024, 1024)
        image.thumbnail(max_size, Image.LANCZOS)

        img_byte_arr = io.BytesIO()
        image.save(img_byte_arr, format='PNG')
        return img_byte_arr.getvalue()
    except Exception as e:
        logging.error(f"Screenshot error for {url}: {str(e)}")
        return None


def process_urls(url_input, bulk_toggle, action_radio, max_urls, crawl_depth):
    """Scrape and/or screenshot the given URL(s).

    Args:
        url_input: A single URL, or several separated by commas/newlines
            when ``bulk_toggle`` is set.
        bulk_toggle: Whether ``url_input`` holds several URLs.
        action_radio: ``'Scrape data'``, ``'Capture image'`` or ``'Both'``.
        max_urls: Maximum number of URLs to process.
        crawl_depth: Accepted for interface compatibility; not used yet.

    Returns:
        A ``(zip_path, gallery_items, summary)`` tuple. ``gallery_items`` is
        a list of ``(image_path, url)`` pairs and ``summary`` a list of
        per-URL dicts. On failure the tuple is ``(None, None, error_json)``.
    """
    try:
        candidates = re.split(r'[,\n]+', url_input.strip()) if bulk_toggle else [url_input]
        urls = [url.strip() for url in candidates if url.strip()]
        urls = urls[:int(max_urls)]

        if not urls:
            return None, None, json.dumps({"error": "No URLs provided."}, indent=2)

        # Add the default scheme BEFORE validating: validate_url() requires a
        # scheme, so bare domains such as "example.com" used to be rejected
        # and the prefixing step further down could never run.
        urls = [url if '://' in url else f'https://{url}' for url in urls]

        # Validate URLs
        invalid_urls = [
            url for url in urls
            if not url.startswith(('http://', 'https://')) or not validate_url(url)
        ]
        if invalid_urls:
            return None, None, json.dumps({"error": f"Invalid URLs detected: {', '.join(invalid_urls)}"}, indent=2)

        scraped_data = []
        screenshots = []

        # Create temporary directory for screenshots
        temp_dir = Path("temp_screenshots")
        temp_dir.mkdir(exist_ok=True)

        for url in urls:
            sanitized_url = sanitize_filename(url)

            # Take screenshot
            if action_radio in ['Capture image', 'Both']:
                screenshot = take_screenshot(url)
                if screenshot:
                    screenshot_path = temp_dir / f"{sanitized_url}.png"
                    with open(screenshot_path, 'wb') as f:
                        f.write(screenshot)
                    # The gallery expects (media, caption) pairs, so the file
                    # path comes first and the URL serves as the caption.
                    screenshots.append((str(screenshot_path), url))
                    logger.info(f"Screenshot saved: {screenshot_path}")  # Log the saved screenshot path

            # Scrape data
            if action_radio in ['Scrape data', 'Both']:
                html_content = get_latest_data(url)
                if html_content:
                    scraped_data.append({
                        'url': url,
                        'content_length': len(html_content),
                        'timestamp': datetime.now().isoformat()
                    })

        # Create a ZIP file for the screenshots
        zip_file_path = temp_dir / "screenshots.zip"
        with zipfile.ZipFile(zip_file_path, 'w') as zipf:
            for image_path, _caption in screenshots:
                zipf.write(image_path, arcname=Path(image_path).name)

        # Return the results
        return str(zip_file_path), screenshots, scraped_data
    except Exception as e:
        logging.error(f"Error in process_urls: {str(e)}")
        return None, None, json.dumps({"error": str(e)}, indent=2)


def create_interface():
    """Create the Gradio interface.

    Returns:
        The ``gr.Blocks`` application with both tabs wired up.
    """
    scraper = SmartWebScraper()

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # 🌐 Enhanced Web Scraper with Change Detection and Chat

            Monitor and capture changes in web content automatically. Use the chat interface to interact with scraped data.
            """
        )

        with gr.Tabs():
            with gr.Tab("URL Scrape/Screenshot"):
                url_input = gr.Textbox(
                    label="Enter URL(s)",
                    placeholder="Enter single URL or multiple URLs separated by commas"
                )

                with gr.Row():
                    bulk_toggle = gr.Checkbox(label="Bulk URLs", value=False)
                    action_radio = gr.Radio(
                        ["Scrape data", "Capture image", "Both"],
                        label="Select Action",
                        value="Both"
                    )

                with gr.Row():
                    max_urls = gr.Slider(
                        minimum=1,
                        maximum=20,
                        value=5,
                        step=1,
                        label="Max URLs to process"
                    )
                    crawl_depth = gr.Slider(
                        minimum=0,
                        maximum=3,
                        value=1,
                        step=1,
                        label="Crawl Depth (0 for no recursion)"
                    )

                process_button = gr.Button("Process URLs", variant="primary")

                with gr.Column():
                    # Add gallery for screenshot preview
                    gallery = gr.Gallery(
                        label="Screenshots Preview",
                        show_label=True,
                        elem_id="gallery",
                        columns=[3],
                        rows=[2],
                        height="auto",
                        object_fit="contain"  # Add proper image scaling
                    )

                    # Download button and results
                    download_file = gr.File(label="Download Results (ZIP)")
                    scraped_data_output = gr.JSON(label="Results Summary")

                process_button.click(
                    fn=process_urls,
                    inputs=[
                        url_input,
                        bulk_toggle,
                        action_radio,
                        max_urls,
                        crawl_depth
                    ],
                    outputs=[
                        download_file,
                        gallery,
                        scraped_data_output
                    ],
                    show_progress=True
                )

            with gr.Tab("Chat-Based Scrape"):
                instruction = gr.Textbox(
                    label="Enter Instruction",
                    placeholder="e.g., 'Scrape all links' or 'Extract all images'"
                )
                chat_url_input = gr.Textbox(
                    label="Enter URL",
                    value="https://example.com",
                    placeholder="Enter the target URL"
                )
                output_format = gr.Radio(
                    ["Formatted Text", "JSON"],
                    label="Output Format",
                    value="Formatted Text"
                )
                chat_output = gr.Textbox(label="Output")

                chat_button = gr.Button("Execute Instruction", variant="primary")
                chat_button.click(
                    fn=scraper.chat_based_scrape,
                    inputs=[instruction, chat_url_input, output_format],
                    outputs=chat_output
                )

        gr.Markdown(
            """
            ### Features
            - Bulk URL processing
            - Screenshot capture
            - Content change detection
            - Recursive crawling
            - Chat-based instructions for interacting with scraped data
            """
        )

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch(debug=True)