# Spaces: Running
import asyncio
import logging
import re
import unicodedata
from collections import defaultdict, deque
from urllib.parse import urldefrag, urljoin, urlparse

import aiohttp
import gradio as gr
import requests
from bs4 import BeautifulSoup
# Configure root logging once at import time (INFO and above), then create
# the module-level logger used by the crawler and the request handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class WebsiteCrawler:
    """Crawl same-host pages from a start URL and render an llms.txt summary.

    Pages are visited breadth-first up to ``max_depth`` link hops and
    ``max_pages`` successfully fetched pages. For each page the title, meta
    description and a URL-derived category are recorded in ``url_metadata``.
    """

    # Total per-request timeout, in seconds, applied to every HTTP request.
    _REQUEST_TIMEOUT_SECONDS = 20

    # (category, importance, path markers), checked in order; first match wins.
    _CATEGORY_RULES = (
        ("Documentation", 8, ("/docs", "/faq", "/help")),
        ("API", 8, ("/api", "/developer")),
        ("About", 7, ("/about", "/company", "/contact")),
        ("News", 5, ("/news", "/blog", "/events")),
        ("Tools", 6, ("/tools", "/pricing")),
    )

    # Order in which categories are emitted by generate_llms_txt().
    _CATEGORY_ORDER = (
        "Main",
        "Documentation",
        "API",
        "About",
        "News",
        "Tools",
        "Optional",
    )

    def __init__(self, max_depth=3, max_pages=50):
        """Create a crawler limited to *max_depth* hops and *max_pages* pages."""
        self.max_depth = max_depth
        self.max_pages = max_pages
        # URLs that were fetched successfully (HTTP 200).
        self.visited_urls = set()
        # url -> {"title", "description", "category", "importance"}
        self.url_metadata = defaultdict(dict)
        # {"site_name", "description"} once the homepage was processed, else None.
        self.homepage_metadata = None
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        }

    def determine_category_importance(self, url, title, desc):
        """Return ``(category, importance)`` for *url* based on its path.

        Only the URL path is inspected, so a host name such as
        ``api.example.com`` does not influence the category. *title* and
        *desc* are accepted for interface compatibility and are not used.
        """
        path = urlparse(url).path.lower()
        if path in ("", "/"):
            return "Main", 10
        for category, importance, markers in self._CATEGORY_RULES:
            if any(marker in path for marker in markers):
                return category, importance
        return "Optional", 1

    def clean_text(self, text, is_title=False):
        """Normalize *text* to single-spaced ASCII.

        The text is NFKD-normalized, non-ASCII characters are dropped and
        whitespace runs are collapsed. With ``is_title=True`` a leading
        "Welcome to " is removed as well. Falsy input yields ``""``.
        """
        if not text:
            return ""
        text = unicodedata.normalize("NFKD", text)
        text = re.sub(r"[^\x00-\x7F]+", "", text)
        text = " ".join(text.split()).strip()
        if is_title:
            text = re.sub(r"^\s*Welcome to\s+", "", text)
        return text

    async def _fetch_html(self, url, session=None):
        """Return the body of *url* as text, or ``None`` if the status is not 200.

        When *session* is ``None`` a temporary client session is opened for
        this single request and closed afterwards.
        """
        if session is None:
            timeout = aiohttp.ClientTimeout(total=self._REQUEST_TIMEOUT_SECONDS)
            async with aiohttp.ClientSession(timeout=timeout) as own_session:
                return await self._fetch_html(url, own_session)
        async with session.get(
            url, headers=self.headers, allow_redirects=True
        ) as response:
            if response.status != 200:
                return None
            return await response.text()

    async def crawl_page(self, url, depth, base_domain, session=None):
        """Fetch *url*, record its metadata and return its same-host links.

        Returns ``[]`` without fetching when *depth* exceeds ``max_depth``,
        the URL was already visited, or the page budget is used up. Also
        returns ``[]`` on a non-200 response or on any error (which is
        logged). Returned links have their ``#fragment`` removed so anchors
        into the same page are not treated as distinct pages.

        *session* is an optional shared ``aiohttp.ClientSession``; when
        omitted a temporary one is created for this request.
        """
        if (
            depth > self.max_depth
            or url in self.visited_urls
            or len(self.visited_urls) >= self.max_pages
        ):
            return []
        try:
            text = await self._fetch_html(url, session)
            if text is None:
                return []
            self.visited_urls.add(url)
            soup = BeautifulSoup(text, "html.parser")

            title_tag = soup.find("title")
            title = (
                self.clean_text(title_tag.text)
                if title_tag
                else url.split("/")[-1]
            )
            desc_tag = soup.find("meta", {"name": "description"})
            desc = (
                self.clean_text(desc_tag["content"])
                if desc_tag and desc_tag.get("content")
                else ""
            )
            category, importance = self.determine_category_importance(
                url, title, desc
            )
            self.url_metadata[url] = {
                "title": title,
                "description": desc,
                "category": category,
                "importance": importance,
            }

            links = []
            for anchor in soup.find_all("a", href=True):
                # Resolve relative hrefs and drop the fragment part.
                next_url, _fragment = urldefrag(urljoin(url, anchor["href"]))
                if urlparse(next_url).netloc == base_domain:
                    links.append(next_url)
            return links
        except Exception as e:
            # Best effort: one broken page must not abort the whole crawl.
            logger.error("Error crawling %s: %s", url, e)
            return []

    async def process_homepage(self, url, session=None):
        """Fetch *url* and store site name and description in ``homepage_metadata``.

        The site name is the part of ``<title>`` before the first ``|``, or
        the URL's network location when there is no title. On a non-200
        response or on any error (which is logged) ``homepage_metadata`` is
        left unchanged.

        *session* is an optional shared ``aiohttp.ClientSession``.
        """
        try:
            text = await self._fetch_html(url, session)
            if text is None:
                return
            soup = BeautifulSoup(text, "html.parser")

            title_tag = soup.find("title")
            site_name = (
                title_tag.text.split("|")[0].strip()
                if title_tag
                else urlparse(url).netloc
            )
            desc_tag = soup.find("meta", {"name": "description"})
            description = (
                desc_tag["content"].strip()
                if desc_tag and desc_tag.get("content")
                else None
            )
            self.homepage_metadata = {
                "site_name": self.clean_text(site_name, is_title=True),
                "description": (
                    self.clean_text(description) if description else None
                ),
            }
        except Exception as e:
            logger.error("Error processing homepage %s: %s", url, e)

    async def crawl_website(self, start_url):
        """Crawl breadth-first from *start_url*, staying on its network location.

        One HTTP session is shared by all requests of the crawl. Unexpected
        errors are logged and re-raised.
        """
        try:
            timeout = aiohttp.ClientTimeout(total=self._REQUEST_TIMEOUT_SECONDS)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                await self.process_homepage(start_url, session)
                base_domain = urlparse(start_url).netloc
                queue = deque([(start_url, 0)])
                seen = {start_url}
                while queue and len(self.visited_urls) < self.max_pages:
                    current_url, depth = queue.popleft()
                    if depth > self.max_depth:
                        continue
                    links = await self.crawl_page(
                        current_url, depth, base_domain, session
                    )
                    for link in links:
                        if link not in seen:
                            seen.add(link)
                            queue.append((link, depth + 1))
        except Exception as e:
            logger.error("Error during crawl: %s", e)
            raise

    def generate_llms_txt(self):
        """Render the collected metadata as llms.txt-style markdown.

        Returns ``"No content available."`` when no page was recorded. If
        the homepage could not be processed, or it has no usable name or
        description, the placeholders "Website" and
        "No description available." are used.
        """
        if not self.url_metadata:
            return "No content available."

        # homepage_metadata stays None when the homepage fetch failed.
        homepage = self.homepage_metadata or {}
        homepage_title = homepage.get("site_name") or "Website"
        homepage_description = (
            homepage.get("description") or "No description available."
        )
        content = [f"# {homepage_title}\n\n> {homepage_description}\n"]

        categories = defaultdict(list)
        for url, metadata in self.url_metadata.items():
            categories[metadata["category"]].append((url, metadata))

        for category in self._CATEGORY_ORDER:
            if category in categories:
                content.append(f"## {category}")
                for url, metadata in categories[category]:
                    content.append(
                        f"- [{metadata['title']}]({url}): {metadata['description']}"
                    )
        return "\n".join(content)
async def process_url(url, max_depth, max_pages):
    """Crawl *url* and return ``(llms_txt_content, status_message)``.

    Surrounding whitespace is removed from *url* and ``https://`` is
    prepended when no http/https scheme is present. Empty or unparsable
    input returns an empty content string with an "Invalid URL" status.
    Any error during the crawl is logged and reported in the status
    message instead of being raised.
    """
    try:
        # An empty textbox may deliver None; stray spaces would otherwise
        # end up inside the host name after the scheme is prepended.
        url = (url or "").strip()
        if not url:
            return "", "Invalid URL format. Please enter a valid URL."
        if not url.startswith(("http://", "https://")):
            url = "https://" + url
        parsed = urlparse(url)
        if not parsed.scheme or not parsed.netloc:
            return "", "Invalid URL format. Please enter a valid URL."

        crawler = WebsiteCrawler(max_depth=int(max_depth), max_pages=int(max_pages))
        await crawler.crawl_website(url)
        content = crawler.generate_llms_txt()
        return content, f"Successfully crawled {len(crawler.visited_urls)} pages."
    except Exception as e:
        logger.error("Error processing URL %s: %s", url, e)
        return "", f"Error: {e}"
# Gradio interface
def process_url_sync_wrapper(url, depth, pages):
    """Run the async ``process_url`` to completion from a synchronous callback."""
    crawl_job = process_url(url, depth, pages)
    return asyncio.run(crawl_job)


theme = gr.themes.Soft(primary_hue="blue", font="Open Sans")

with gr.Blocks(theme=theme) as iface:
    # Heading
    with gr.Row():
        gr.Markdown("## Website Crawler - Generate llms.txt")

    # Target site
    with gr.Row():
        url_input = gr.Textbox(
            label="Website URL",
            placeholder="Enter the website URL (e.g., example.com)",
            info="The URL will be automatically prefixed with https:// if not provided",
            lines=1,
        )

    # Crawl limits
    with gr.Row():
        depth_input = gr.Slider(
            minimum=1,
            maximum=5,
            value=3,
            step=1,
            label="Maximum Crawl Depth",
        )
        pages_input = gr.Slider(
            minimum=10,
            maximum=100,
            value=50,
            step=10,
            label="Maximum Pages",
        )

    with gr.Row():
        generate_btn = gr.Button("Generate llms.txt", variant="primary")

    # Results: generated text first, then a read-only status line
    with gr.Row():
        output = gr.Textbox(
            label="Generated llms.txt Content",
            lines=15,
            show_copy_button=True,
            container=True,
        )
    with gr.Row():
        status = gr.Textbox(label="Status", interactive=False)

    generate_btn.click(
        fn=process_url_sync_wrapper,
        inputs=[url_input, depth_input, pages_input],
        outputs=[output, status],
    )

if __name__ == "__main__":
    iface.launch()