import gradio as gr
import requests
import re
import logging
import json
import time
from typing import Tuple, List, Dict, Union, Optional
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from datetime import datetime
import io
import zipfile
import os
import tempfile
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
# webdriver-manager resolves a chromedriver binary matching the installed Chrome
from webdriver_manager.chrome import ChromeDriverManager
from PIL import Image
import base64
import asyncio
import yaml
from pathlib import Path
from tqdm import tqdm
import plotly.graph_objects as go
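
# A minimal sketch of the third-party dependencies this script assumes (the
# usual PyPI package names; exact versions are left to the deployer):
#   pip install gradio requests beautifulsoup4 nltk selenium webdriver-manager \
#       pillow pyyaml tqdm plotly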

# Bootstrap logging early so NLTK download problems are captured;
# Config.initialize below reconfigures logging with the final handlers.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('webscraper.log'),
        logging.StreamHandler()
    ]
)

# Download the NLTK data used by QueryAnalyzer
import nltk
try:
    nltk.download('punkt', quiet=True)
    nltk.download('punkt_tab', quiet=True)  # required by newer NLTK releases
    nltk.download('stopwords', quiet=True)
    nltk.download('wordnet', quiet=True)
    nltk.download('averaged_perceptron_tagger', quiet=True)
except Exception as e:
    logging.error(f"Error downloading NLTK data: {str(e)}")

# Configuration and logging setup
class Config:
    DATA_DIR = Path('scraped_data')
    LOGS_DIR = Path('logs')
    MAX_RETRIES = 3
    TIMEOUT = 30

    @classmethod
    def initialize(cls):
        """Initialize necessary directories and configurations"""
        cls.DATA_DIR.mkdir(exist_ok=True)
        cls.LOGS_DIR.mkdir(exist_ok=True)
        # Reconfigure logging now that the logs directory exists; force=True
        # replaces the bootstrap handlers installed above
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(cls.LOGS_DIR / 'app.log'),
                logging.StreamHandler()
            ],
            force=True
        )
        return logging.getLogger(__name__)

logger = Config.initialize()

class WebDriverManager:
    """Manage WebDriver instances"""

    @staticmethod
    def get_driver() -> webdriver.Chrome:
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--window-size=1920,1080')
        return webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

class DataExtractor:
    """Extract and process webpage content"""

    def __init__(self):
        self.soup = None
        self.url = None
        self.logger = logging.getLogger(__name__)

    def set_page(self, html: str, url: str):
        """Set the page content for extraction"""
        self.soup = BeautifulSoup(html, 'html.parser')
        self.url = url

    def extract_images(self) -> List[Dict]:
        """Extract image information from the page"""
        images = []
        try:
            for img in self.soup.find_all('img'):
                image_info = {
                    'src': urljoin(self.url, img.get('src', '')),
                    'alt': img.get('alt', ''),
                    'title': img.get('title', ''),
                    'dimensions': self._get_image_dimensions(img),
                    'file_type': self._get_file_type(img.get('src', ''))
                }
                images.append(image_info)
        except Exception as e:
            self.logger.error(f"Error extracting images: {str(e)}")
        return images

    def extract_links(self) -> List[Dict]:
        """Extract link information from the page"""
        links = []
        try:
            base_netloc = urlparse(self.url).netloc
            for a in self.soup.find_all('a', href=True):
                absolute_url = urljoin(self.url, a.get('href', ''))
                link_info = {
                    'href': absolute_url,
                    'text': a.get_text(strip=True),
                    'title': a.get('title', ''),
                    # Compare hostnames rather than substrings, so that e.g.
                    # https://example.com.evil.net is not classified as internal
                    'type': 'internal' if urlparse(absolute_url).netloc == base_netloc else 'external',
                    'has_image': bool(a.find('img'))
                }
                links.append(link_info)
        except Exception as e:
            self.logger.error(f"Error extracting links: {str(e)}")
        return links

    def extract_text(self) -> List[Dict]:
        """Extract text content from the page"""
        texts = []
        try:
            for text_element in self.soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
                text_info = {
                    'content': text_element.get_text(strip=True),
                    'source': text_element.name
                }
                if text_info['content']:  # Only add non-empty text blocks
                    texts.append(text_info)
        except Exception as e:
            self.logger.error(f"Error extracting text: {str(e)}")
        return texts

    def _get_image_dimensions(self, img_tag) -> str:
        """Get image dimensions from tag attributes"""
        width = img_tag.get('width', '')
        height = img_tag.get('height', '')
        if width and height:
            return f"{width}x{height}"
        return "unknown"

    def _get_file_type(self, src: str) -> str:
        """Determine image file type from URL"""
        if not src:
            return "unknown"
        ext = src.split('.')[-1].lower()
        return ext if ext in ['jpg', 'jpeg', 'png', 'gif', 'webp'] else "unknown"
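
# Illustrative usage (the HTML snippet is a made-up example, not fetched):
#   extractor = DataExtractor()
#   extractor.set_page('<img src="/logo.png" alt="Logo">', 'https://example.com')
#   extractor.extract_images()
#   -> [{'src': 'https://example.com/logo.png', 'alt': 'Logo', 'title': '',
#        'dimensions': 'unknown', 'file_type': 'png'}]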

class QueryAnalyzer:
    """Analyze natural language queries"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.stop_words = set(stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()
        self.logger.info("QueryAnalyzer initialized")

    def parse_query(self, query: str) -> Dict[str, Union[str, int, Dict]]:
        try:
            self.logger.info(f"Parsing query: {query}")
            tokens = word_tokenize(query.lower())
            filtered_tokens = [self.lemmatizer.lemmatize(token) for token in tokens
                               if token.isalnum() and token not in self.stop_words]
            return {
                'target': self._identify_target(filtered_tokens),
                'limit': self._identify_limit(filtered_tokens),
                'filters': self._identify_filters(filtered_tokens),
                'output': 'JSON' if 'json' in query.lower() else 'Formatted Text'
            }
        except Exception as e:
            self.logger.error(f"Error parsing query: {str(e)}")
            return {'target': 'unknown', 'limit': 0, 'filters': {}, 'output': 'Formatted Text'}

    def _identify_target(self, tokens: List[str]) -> str:
        target_map = {
            'image': 'image',
            'images': 'image',
            'picture': 'image',
            'link': 'link',
            'links': 'link',
            'text': 'text',
            'content': 'text'
        }
        for token in tokens:
            if token in target_map:
                return target_map[token]
        return 'unknown'

    def _identify_limit(self, tokens: List[str]) -> int:
        for token in tokens:
            if token.isdigit():
                return int(token)
        return 0

    def _identify_filters(self, tokens: List[str]) -> Dict[str, str]:
        filters = {}
        if 'external' in tokens:
            filters['link_type'] = 'external'
        elif 'internal' in tokens:
            filters['link_type'] = 'internal'
        if 'png' in tokens:
            filters['file_type'] = 'png'
        elif 'jpg' in tokens or 'jpeg' in tokens:
            filters['file_type'] = 'jpg'
        return filters
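
# Example (illustrative): parse_query("show 5 external links as json") returns
#   {'target': 'link', 'limit': 5, 'filters': {'link_type': 'external'},
#    'output': 'JSON'}
# because 'links' lemmatizes to 'link', the digit token '5' is read as the
# limit, and the literal token 'json' switches the output format.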

class ResponseFormatter:
    """Format scraped data based on user preferences"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def format_data(self, data: List[Dict], query_info: Dict) -> str:
        try:
            if not data:
                return "No data found for the specified query."
            # Apply filters
            filtered_data = self._apply_filters(data, query_info.get('filters', {}))
            # Apply limit
            if query_info.get('limit', 0) > 0:
                filtered_data = filtered_data[:query_info['limit']]
            if query_info.get('output') == "JSON":
                return json.dumps({
                    "metadata": {
                        "query": query_info,
                        "timestamp": datetime.now().isoformat(),
                        "results_count": len(filtered_data)
                    },
                    "results": filtered_data
                }, indent=2)
            return self._format_human_readable(filtered_data, query_info.get('target', 'unknown'))
        except Exception as e:
            self.logger.error(f"Formatting error: {str(e)}")
            return f"Error formatting results: {str(e)}"

    def _apply_filters(self, data: List[Dict], filters: Dict) -> List[Dict]:
        filtered_data = data
        if 'link_type' in filters:
            filtered_data = [item for item in filtered_data
                             if item.get('type', '') == filters['link_type']]
        if 'file_type' in filters:
            filtered_data = [item for item in filtered_data
                             if item.get('file_type', '').lower() == filters['file_type']]
        return filtered_data

    def _format_human_readable(self, data: List[Dict], target: str) -> str:
        formats = {
            'image': self._format_images,
            'link': self._format_links,
            'text': self._format_texts
        }
        return formats.get(target, lambda x: "Unknown data type")(data)

    def _format_images(self, images: List[Dict]) -> str:
        return "\n\n".join(
            f"Image {idx+1}:\n"
            f"Source: {img['src']}\n"
            f"Alt Text: {img['alt']}\n"
            f"Dimensions: {img['dimensions']}\n"
            f"Type: {img['file_type']}"
            for idx, img in enumerate(images)
        )

    def _format_links(self, links: List[Dict]) -> str:
        return "\n\n".join(
            f"Link {idx+1}:\n"
            f"URL: {link['href']}\n"
            f"Text: {link['text']}\n"
            f"Type: {link['type']}\n"
            f"Contains Image: {'Yes' if link['has_image'] else 'No'}"
            for idx, link in enumerate(links)
        )

    def _format_texts(self, texts: List[Dict]) -> str:
        return "\n\n".join(
            f"Text Block {idx+1} ({text['source'].upper()}):\n"
            f"{text['content']}"
            for idx, text in enumerate(texts)
        )
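
# Illustrative output of format_data for a single image result in
# "Formatted Text" mode (values are placeholders):
#   Image 1:
#   Source: https://example.com/logo.png
#   Alt Text: Logo
#   Dimensions: unknown
#   Type: png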

class Scraper:
    """Core scraping functionality with improved error handling"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    async def fetch_page(self, url: str) -> Optional[str]:
        """Fetch page content with retry mechanism"""
        for attempt in range(Config.MAX_RETRIES):
            try:
                response = self.session.get(url, timeout=Config.TIMEOUT)
                response.raise_for_status()
                return response.text
            except Exception as e:
                self.logger.error(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
                if attempt == Config.MAX_RETRIES - 1:
                    return None

    async def take_screenshot(self, url: str) -> Optional[bytes]:
        """Take a screenshot of a webpage with improved error handling."""
        driver = None
        try:
            options = Options()
            options.add_argument("--headless")
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")
            options.add_argument("--window-size=1920,1080")
            driver = webdriver.Chrome(options=options)
            driver.get(url)
            # Crude wait for page load; an explicit WebDriverWait would be more robust
            time.sleep(2)
            # Take screenshot
            screenshot = driver.get_screenshot_as_png()
            # Process image
            img = Image.open(io.BytesIO(screenshot))
            img = img.convert('RGB')  # Convert to RGB to ensure compatibility
            # Save to bytes
            img_byte_arr = io.BytesIO()
            img.save(img_byte_arr, format='PNG', optimize=True)
            return img_byte_arr.getvalue()
        except Exception as e:
            self.logger.error(f"Screenshot error for {url}: {str(e)}")
            return None
        finally:
            if driver:
                driver.quit()
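
# Both Scraper methods are coroutines even though their bodies are synchronous,
# so they must be awaited. A minimal sketch of driving one from plain
# synchronous code (assuming no event loop is already running):
#   html = asyncio.run(Scraper().fetch_page("https://example.com"))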

class SmartWebScraper:
    """Smart web scraping with natural language processing capabilities"""

    def __init__(self):
        self.query_analyzer = QueryAnalyzer()
        self.data_extractor = DataExtractor()
        self.response_formatter = ResponseFormatter()
        self.logger = logging.getLogger(__name__)
        self.scraped_data = {}

    def chat_based_scrape(self, instruction: str, url: str, output_format: str = "Formatted Text") -> str:
        """Process natural language instructions for web scraping"""
        try:
            if not instruction or not url:
                return "Please provide both an instruction and a URL."
            # process_url handles fetching, extraction, query analysis and
            # formatting; the explicitly chosen output format overrides any
            # format hint detected in the instruction text itself
            return self.process_url(url, instruction, output_format)
        except Exception as e:
            error_msg = f"Error processing chat-based scrape: {str(e)}"
            self.logger.error(error_msg)
            return error_msg
    def process_url(self, url: str, query: str, output_format: Optional[str] = None) -> str:
        """Process URL based on query"""
        try:
            # Validate URL
            if not self._validate_url(url):
                return "Please provide a valid URL (including http:// or https://)."
            # Fetch page
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            # Set page content and cache the extracted data
            self.data_extractor.set_page(response.text, url)
            self.logger.info(f"Scraping data from URL: {url}")
            self.scraped_data[url] = {
                'images': self.data_extractor.extract_images(),
                'links': self.data_extractor.extract_links(),
                'texts': self.data_extractor.extract_text()
            }
            # Analyze the query and format the matching slice of data
            query_info = self.query_analyzer.parse_query(query)
            if output_format:
                query_info['output'] = output_format
            data = self._get_data_for_target(query_info['target'], url)
            return self.response_formatter.format_data(data, query_info)
        except requests.exceptions.RequestException as e:
            error_msg = f"Error fetching the webpage: {str(e)}"
            self.logger.error(error_msg)
            return error_msg
        except Exception as e:
            error_msg = f"An error occurred: {str(e)}"
            self.logger.error(error_msg)
            return error_msg
    def _validate_url(self, url: str) -> bool:
        """Validate URL format"""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except Exception as e:
            self.logger.error(f"URL validation error: {str(e)}")
            return False

    def _get_data_for_target(self, target: str, url: str) -> List[Dict]:
        """Get specific data based on target type"""
        if url not in self.scraped_data:
            self.logger.warning(f"No data found for URL: {url}")
            return []
        if target == 'image':
            return self.scraped_data[url]['images']
        elif target == 'link':
            return self.scraped_data[url]['links']
        elif target == 'text':
            return self.scraped_data[url]['texts']
        else:
            self.logger.warning(f"Unknown target type: {target}")
            return []
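
# Illustrative usage (example.com is just a placeholder target):
#   scraper = SmartWebScraper()
#   print(scraper.chat_based_scrape("Extract all images", "https://example.com"))
#   print(scraper.chat_based_scrape("Get 3 external links", "https://example.com", "JSON"))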

def sanitize_filename(filename):
    """Sanitizes a filename by removing invalid characters."""
    return re.sub(r'[<>:"/\\|?*\n]+', '_', filename)

def validate_url(url):
    """Validate if the URL is properly formatted."""
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except Exception:
        return False

def get_latest_data(url):
    """Get the latest HTML content of a webpage."""
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise an exception for bad status codes
        return response.text
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching latest data from {url}: {str(e)}")
        return None

def take_screenshot(url):
    """Take a screenshot of a webpage."""
    driver = None
    try:
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--window-size=1920,1080")
        driver = webdriver.Chrome(options=chrome_options)
        driver.get(url)
        screenshot = driver.get_screenshot_as_png()
        image = Image.open(io.BytesIO(screenshot))
        # Downscale large captures so the gallery preview stays lightweight
        max_size = (1024, 1024)
        image.thumbnail(max_size, Image.LANCZOS)
        img_byte_arr = io.BytesIO()
        image.save(img_byte_arr, format='PNG')
        return img_byte_arr.getvalue()
    except Exception as e:
        logging.error(f"Screenshot error for {url}: {str(e)}")
        return None
    finally:
        if driver:
            driver.quit()  # Always release the browser, even on failure

def process_urls(url_input, bulk_toggle, action_radio, max_urls, crawl_depth):
    """Process URLs: capture screenshots and/or scrape page data."""
    try:
        urls = re.split(r'[,\n]+', url_input.strip()) if bulk_toggle else [url_input]
        urls = [url.strip() for url in urls if url.strip()]
        urls = urls[:int(max_urls)]
        # Normalize missing schemes before validating, so bare domains pass
        urls = [url if url.startswith(('http://', 'https://')) else f'https://{url}'
                for url in urls]
        # Validate URLs
        invalid_urls = [url for url in urls if not validate_url(url)]
        if invalid_urls:
            return None, None, json.dumps({"error": f"Invalid URLs detected: {', '.join(invalid_urls)}"}, indent=2)
        scraped_data = []
        screenshots = []
        # NOTE: crawl_depth is accepted from the UI slider, but recursive
        # crawling is not implemented yet; only the listed URLs are processed
        # Create temporary directory for screenshots
        temp_dir = Path("temp_screenshots")
        temp_dir.mkdir(exist_ok=True)
        # Process each URL
        for url in urls:
            sanitized_url = sanitize_filename(url)
            # Take screenshot
            if action_radio in ['Capture image', 'Both']:
                screenshot = take_screenshot(url)
                if screenshot:
                    screenshot_path = temp_dir / f"{sanitized_url}.png"
                    with open(screenshot_path, 'wb') as f:
                        f.write(screenshot)
                    # (image, caption) order, as gr.Gallery expects
                    screenshots.append((str(screenshot_path), url))
                    logger.info(f"Screenshot saved: {screenshot_path}")
            # Scrape data
            if action_radio in ['Scrape data', 'Both']:
                html_content = get_latest_data(url)
                if html_content:
                    scraped_data.append({
                        'url': url,
                        'content_length': len(html_content),
                        'timestamp': datetime.now().isoformat()
                    })
        # Create a ZIP file for the screenshots
        zip_file_path = temp_dir / "screenshots.zip"
        with zipfile.ZipFile(zip_file_path, 'w') as zipf:
            for path, _caption in screenshots:
                zipf.write(path, arcname=Path(path).name)
        # Return (zip path, gallery items, summary), matching the three Gradio
        # outputs wired up in create_interface below
        return str(zip_file_path), screenshots, scraped_data
    except Exception as e:
        logging.error(f"Error in process_urls: {str(e)}")
        return None, None, json.dumps({"error": str(e)}, indent=2)

def create_interface():
    """Create the Gradio interface."""
    scraper = SmartWebScraper()
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # 🌐 Enhanced Web Scraper with Change Detection and Chat
            Monitor and capture changes in web content automatically. Use the chat interface to interact with scraped data.
            """
        )
        with gr.Tabs():
            with gr.Tab("URL Scrape/Screenshot"):
                url_input = gr.Textbox(
                    label="Enter URL(s)",
                    placeholder="Enter single URL or multiple URLs separated by commas"
                )
                with gr.Row():
                    bulk_toggle = gr.Checkbox(label="Bulk URLs", value=False)
                    action_radio = gr.Radio(
                        ["Scrape data", "Capture image", "Both"],
                        label="Select Action",
                        value="Both"
                    )
                with gr.Row():
                    max_urls = gr.Slider(
                        minimum=1,
                        maximum=20,
                        value=5,
                        step=1,
                        label="Max URLs to process"
                    )
                    crawl_depth = gr.Slider(
                        minimum=0,
                        maximum=3,
                        value=1,
                        step=1,
                        label="Crawl Depth (0 for no recursion)"
                    )
                process_button = gr.Button("Process URLs", variant="primary")
                with gr.Column():
                    # Gallery for screenshot previews
                    gallery = gr.Gallery(
                        label="Screenshots Preview",
                        show_label=True,
                        elem_id="gallery",
                        columns=[3],
                        rows=[2],
                        height="auto",
                        object_fit="contain"  # Scale images without cropping
                    )
                    # Download button and results
                    download_file = gr.File(label="Download Results (ZIP)")
                    scraped_data_output = gr.JSON(label="Results Summary")
                process_button.click(
                    fn=process_urls,
                    inputs=[
                        url_input,
                        bulk_toggle,
                        action_radio,
                        max_urls,
                        crawl_depth
                    ],
                    outputs=[
                        download_file,
                        gallery,
                        scraped_data_output
                    ],
                    show_progress=True
                )
            with gr.Tab("Chat-Based Scrape"):
                instruction = gr.Textbox(
                    label="Enter Instruction",
                    placeholder="e.g., 'Scrape all links' or 'Extract all images'"
                )
                chat_url_input = gr.Textbox(
                    label="Enter URL",
                    value="https://example.com",
                    placeholder="Enter the target URL"
                )
                output_format = gr.Radio(
                    ["Formatted Text", "JSON"],
                    label="Output Format",
                    value="Formatted Text"
                )
                chat_output = gr.Textbox(label="Output")
                chat_button = gr.Button("Execute Instruction", variant="primary")
                chat_button.click(
                    fn=scraper.chat_based_scrape,
                    inputs=[instruction, chat_url_input, output_format],
                    outputs=chat_output
                )
        gr.Markdown(
            """
            ### Features
            - Bulk URL processing
            - Screenshot capture
            - Content change detection (planned)
            - Recursive crawling (planned; the depth slider is not wired up yet)
            - Chat-based instructions for interacting with scraped data
            """
        )
    return demo

if __name__ == "__main__":
    demo = create_interface()
    demo.launch(debug=True)