Spaces:
Running
Running
Update app.py
Browse files
app.py
CHANGED
@@ -9,6 +9,7 @@ from collections import defaultdict
|
|
9 |
import unicodedata
|
10 |
import logging
|
11 |
import ssl
|
|
|
12 |
|
13 |
logging.basicConfig(level=logging.INFO)
|
14 |
logger = logging.getLogger(__name__)
|
@@ -28,377 +29,164 @@ class WebsiteCrawler:
|
|
28 |
"Accept-Encoding": "gzip, deflate, br",
|
29 |
"DNT": "1",
|
30 |
"Connection": "keep-alive",
|
31 |
-
"Upgrade-Insecure-Requests": "1"
|
32 |
}
|
|
|
33 |
|
34 |
-
def
|
35 |
-
|
36 |
-
|
37 |
-
|
38 |
-
|
39 |
-
|
40 |
-
|
41 |
-
|
42 |
-
|
43 |
-
|
44 |
-
|
45 |
-
|
46 |
-
|
47 |
-
|
48 |
-
|
49 |
-
|
50 |
-
|
51 |
-
|
52 |
-
|
53 |
-
|
54 |
-
|
55 |
-
|
56 |
-
|
57 |
-
|
58 |
-
|
59 |
-
elif
|
60 |
-
|
61 |
-
|
62 |
-
|
63 |
-
|
64 |
-
|
65 |
-
|
66 |
-
|
67 |
-
|
68 |
-
|
69 |
-
|
70 |
-
return "Tools", 6
|
71 |
-
|
72 |
-
# Check if URL path contains non-ASCII or percent-encoded characters
|
73 |
-
if bool(re.search(r'[^\x00-\x7F]', path)) or bool(re.search(r'%[0-9A-F]{2}', path)):
|
74 |
-
return "Optional", 0
|
75 |
-
|
76 |
-
return "Optional", 1
|
77 |
-
|
78 |
-
def is_duplicate_content(self, desc, title, url):
|
79 |
-
"""Improved duplicate/translation detection"""
|
80 |
-
if not desc or not title:
|
81 |
-
return False
|
82 |
-
|
83 |
-
# Skip non-latin character URLs or URLs with percent-encoded non-ASCII
|
84 |
-
if bool(re.search(r'[^\x00-\x7F]', url)) or bool(re.search(r'%[0-9A-F]{2}', url)):
|
85 |
-
return True
|
86 |
-
|
87 |
-
|
88 |
-
# Skip common translation paths
|
89 |
-
translation_indicators = [
|
90 |
-
'/welcome', '/bienvenue', '/willkommen', '/benvenuto',
|
91 |
-
'/tervetuloa', '/bienvenido', '/velkommen', '/welkom',
|
92 |
-
'translate.com/', '/translate/', '/translation/'
|
93 |
-
]
|
94 |
-
if any(indicator in url.lower() for indicator in translation_indicators):
|
95 |
-
url_path = urlparse(url).path.lower()
|
96 |
-
if url_path != '/': # Don't skip homepage
|
97 |
-
return True
|
98 |
-
|
99 |
-
# Check for similar content length and patterns
|
100 |
-
for existing_metadata in self.url_metadata.values():
|
101 |
-
existing_desc = existing_metadata.get("description", "")
|
102 |
-
existing_title = existing_metadata.get("title", "")
|
103 |
-
if not existing_desc or not existing_title:
|
104 |
-
continue
|
105 |
-
|
106 |
-
# If descriptions are very similar in length, likely a translation
|
107 |
-
if (abs(len(desc) - len(existing_desc)) < 20 and
|
108 |
-
len(desc) > 50 and
|
109 |
-
desc != existing_desc): # Allow exact duplicates for main page
|
110 |
-
return True
|
111 |
-
|
112 |
-
return False
|
113 |
|
114 |
def clean_text(self, text, is_title=False):
|
115 |
-
"""
|
116 |
-
if not text
|
117 |
return ""
|
118 |
-
|
119 |
# Normalize unicode characters
|
120 |
text = unicodedata.normalize("NFKD", text)
|
121 |
text = re.sub(r"[^\x00-\x7F]+", "", text)
|
122 |
-
|
123 |
-
# Remove any template variables/placeholders
|
124 |
-
text = re.sub(r'\{\{.*?\}\}', '', text)
|
125 |
-
text = re.sub(r'\{\%.*?\%\}', '', text)
|
126 |
-
text = re.sub(r'\${.*?\}', '', text)
|
127 |
-
|
128 |
if is_title:
|
129 |
# Remove common suffixes and fragments for titles
|
130 |
-
text = re.sub(r
|
131 |
-
text = re.sub(r
|
132 |
-
text =
|
133 |
-
|
134 |
-
|
135 |
-
if text.lower() in ['features', 'home', 'homepage', 'welcome']:
|
136 |
-
return ""
|
137 |
-
|
138 |
-
# Only return if we have meaningful text
|
139 |
-
cleaned = " ".join(text.split()).strip()
|
140 |
-
if len(cleaned.split()) < 2 and not is_title: # Allow single-word titles
|
141 |
-
return ""
|
142 |
-
|
143 |
-
return cleaned
|
144 |
|
145 |
-
|
146 |
-
def clean_description(self, desc):
|
147 |
-
"""Clean description text"""
|
148 |
-
if not desc:
|
149 |
-
return ""
|
150 |
-
# Remove leading dashes, hyphens, or colons
|
151 |
-
desc = re.sub(r"^[-:\s]+", "", desc)
|
152 |
-
# Remove any strings that are just "Editors", "APIs", etc.
|
153 |
-
if len(desc.split()) <= 1:
|
154 |
-
return ""
|
155 |
-
return desc.strip()
|
156 |
-
|
157 |
-
|
158 |
-
def extract_homepage_description(self, soup):
|
159 |
-
"""Extract description from homepage with multiple fallbacks"""
|
160 |
-
# Try meta description first
|
161 |
-
meta_desc = soup.find("meta", {"name": "description"})
|
162 |
-
if meta_desc and meta_desc.get("content"):
|
163 |
-
desc = meta_desc["content"]
|
164 |
-
if desc and len(desc.strip()) > 20:
|
165 |
-
return self.clean_text(desc)
|
166 |
-
|
167 |
-
# Try OpenGraph description
|
168 |
-
og_desc = soup.find("meta", property="og:description")
|
169 |
-
if og_desc and og_desc.get("content"):
|
170 |
-
desc = og_desc["content"]
|
171 |
-
if desc and len(desc.strip()) > 20:
|
172 |
-
return self.clean_text(desc)
|
173 |
-
|
174 |
-
# Try first significant paragraph
|
175 |
-
for p in soup.find_all("p"):
|
176 |
-
text = p.get_text().strip()
|
177 |
-
if len(text) > 50 and not any(x in text.lower() for x in ["cookie", "accept", "privacy"]):
|
178 |
-
return self.clean_text(text)
|
179 |
-
|
180 |
-
# Try main content area if exists
|
181 |
-
main = soup.find("main")
|
182 |
-
if main:
|
183 |
-
first_p = main.find("p")
|
184 |
-
if first_p:
|
185 |
-
text = first_p.get_text().strip()
|
186 |
-
if len(text) > 50:
|
187 |
-
return self.clean_text(text)
|
188 |
-
|
189 |
-
return None
|
190 |
-
|
191 |
-
async def crawl_page(self, url, depth, base_domain):
|
192 |
-
"""Crawl a single page and extract information"""
|
193 |
-
if (
|
194 |
-
depth > self.max_depth
|
195 |
-
or url in self.visited_urls
|
196 |
-
or len(self.visited_urls) >= self.max_pages
|
197 |
-
):
|
198 |
-
return []
|
199 |
-
|
200 |
-
try:
|
201 |
-
await asyncio.sleep(1) # Be polite to servers
|
202 |
-
async with aiohttp.ClientSession() as session:
|
203 |
-
async with session.get(url, headers=self.headers, allow_redirects=True) as response:
|
204 |
-
if response.status == 403:
|
205 |
-
# Try with alternative headers
|
206 |
-
alt_headers = {
|
207 |
-
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
|
208 |
-
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
209 |
-
}
|
210 |
-
async with session.get(url, headers=alt_headers, allow_redirects=True) as retry_response:
|
211 |
-
if retry_response.status != 200:
|
212 |
-
return []
|
213 |
-
text = await retry_response.text()
|
214 |
-
elif response.status != 200:
|
215 |
-
return []
|
216 |
-
else:
|
217 |
-
text = await response.text()
|
218 |
-
|
219 |
-
self.visited_urls.add(url)
|
220 |
-
soup = BeautifulSoup(text, "html.parser")
|
221 |
-
|
222 |
-
# Extract title with fallbacks
|
223 |
-
title = None
|
224 |
-
meta_title = soup.find("meta", property="og:title")
|
225 |
-
if meta_title and meta_title.get("content"):
|
226 |
-
title = meta_title["content"]
|
227 |
-
if not title:
|
228 |
-
title_tag = soup.find("title")
|
229 |
-
if title_tag:
|
230 |
-
title = title_tag.text
|
231 |
-
if not title:
|
232 |
-
h1_tag = soup.find("h1")
|
233 |
-
if h1_tag:
|
234 |
-
title = h1_tag.text
|
235 |
-
if not title:
|
236 |
-
title = url.split("/")[-1]
|
237 |
-
|
238 |
-
title = self.clean_text(title, is_title=True)
|
239 |
-
|
240 |
-
# Extract description with fallbacks
|
241 |
-
desc = None
|
242 |
-
meta_desc = soup.find("meta", {"name": "description"})
|
243 |
-
if meta_desc and meta_desc.get("content"):
|
244 |
-
desc = meta_desc["content"]
|
245 |
-
if not desc:
|
246 |
-
og_desc = soup.find("meta", property="og:description")
|
247 |
-
if og_desc and og_desc.get("content"):
|
248 |
-
desc = og_desc["content"]
|
249 |
-
if not desc:
|
250 |
-
first_p = soup.find("p")
|
251 |
-
if first_p:
|
252 |
-
desc = first_p.text
|
253 |
-
|
254 |
-
desc = self.clean_text(desc) if desc else ""
|
255 |
-
|
256 |
-
# Skip if it's duplicate content
|
257 |
-
if self.is_duplicate_content(desc, title, url):
|
258 |
-
return []
|
259 |
-
|
260 |
-
# Determine category and importance
|
261 |
-
category, importance = self.determine_category_importance(url, title, desc)
|
262 |
-
|
263 |
-
# Store metadata
|
264 |
-
clean_url = re.sub(r"#.*", "", url).rstrip("/")
|
265 |
-
if title and len(title.strip()) > 0: # Only store if we have a valid title
|
266 |
-
logger.info(f"Storing metadata for {clean_url}: {title[:30]}...")
|
267 |
-
self.url_metadata[clean_url] = {
|
268 |
-
"title": title,
|
269 |
-
"description": desc,
|
270 |
-
"category": category,
|
271 |
-
"importance": importance,
|
272 |
-
}
|
273 |
-
|
274 |
-
# Find links
|
275 |
-
links = []
|
276 |
-
for a in soup.find_all("a", href=True):
|
277 |
-
href = a["href"]
|
278 |
-
if not any(
|
279 |
-
x in href.lower()
|
280 |
-
for x in ["javascript:", "mailto:", ".pdf", ".jpg", ".png", ".gif"]
|
281 |
-
):
|
282 |
-
next_url = urljoin(url, href)
|
283 |
-
if urlparse(next_url).netloc == base_domain:
|
284 |
-
links.append(next_url)
|
285 |
-
return links
|
286 |
-
|
287 |
-
except Exception as e:
|
288 |
-
logger.error(f"Error crawling {url}: {str(e)}")
|
289 |
-
return []
|
290 |
-
|
291 |
async def process_homepage(self, url):
|
292 |
"""Specifically process the homepage to extract key metadata"""
|
293 |
try:
|
294 |
-
|
295 |
-
|
296 |
-
|
297 |
-
|
298 |
-
|
299 |
-
|
300 |
-
|
301 |
-
|
302 |
-
|
303 |
-
|
304 |
-
|
305 |
-
|
306 |
-
|
307 |
-
|
308 |
-
|
309 |
-
|
310 |
-
|
311 |
-
|
312 |
-
|
313 |
-
|
314 |
-
|
315 |
-
|
316 |
-
|
317 |
-
|
318 |
-
|
319 |
-
|
320 |
-
|
321 |
-
|
322 |
-
|
323 |
-
|
324 |
-
|
325 |
-
|
326 |
-
|
327 |
-
|
328 |
-
|
329 |
-
|
330 |
-
|
331 |
-
|
332 |
-
|
333 |
-
|
334 |
-
|
335 |
-
|
336 |
-
|
337 |
-
|
338 |
-
|
339 |
-
|
340 |
-
|
341 |
-
|
342 |
-
|
343 |
-
|
344 |
-
# Get homepage description
|
345 |
-
description = self.extract_homepage_description(soup)
|
346 |
-
|
347 |
-
self.homepage_metadata = {
|
348 |
-
"site_name": self.clean_text(site_name, is_title=True),
|
349 |
-
"description": description
|
350 |
-
}
|
351 |
-
|
352 |
except Exception as e:
|
353 |
logger.error(f"Error processing homepage {url}: {str(e)}")
|
354 |
self.homepage_metadata = {
|
355 |
-
"site_name": urlparse(url).netloc.split(
|
356 |
-
"description": None
|
357 |
}
|
358 |
-
|
359 |
async def crawl_website(self, start_url):
|
360 |
"""Crawl website starting from the given URL"""
|
361 |
try:
|
362 |
# First process the homepage
|
363 |
logger.info(f"Processing homepage: {start_url}")
|
364 |
await self.process_homepage(start_url)
|
365 |
-
|
366 |
base_domain = urlparse(start_url).netloc
|
367 |
queue = [(start_url, 0)]
|
368 |
seen = {start_url}
|
369 |
-
|
370 |
while queue and len(self.visited_urls) < self.max_pages:
|
371 |
current_url, depth = queue.pop(0)
|
372 |
if depth > self.max_depth:
|
373 |
continue
|
374 |
-
|
375 |
logger.info(f"Crawling page: {current_url} (depth: {depth})")
|
376 |
links = await self.crawl_page(current_url, depth, base_domain)
|
377 |
logger.info(f"Found {len(links)} links on {current_url}")
|
378 |
-
|
379 |
for link in links:
|
380 |
if link not in seen and urlparse(link).netloc == base_domain:
|
381 |
seen.add(link)
|
382 |
queue.append((link, depth + 1))
|
383 |
-
|
384 |
logger.info(f"Crawl completed. Visited {len(self.visited_urls)} pages")
|
385 |
-
|
386 |
except Exception as e:
|
387 |
logger.error(f"Error during crawl: {str(e)}")
|
388 |
raise
|
|
|
|
|
389 |
|
390 |
def generate_llms_txt(self):
|
391 |
"""Generate llms.txt content"""
|
392 |
-
logger.info(f"Starting generate_llms_txt with {len(self.url_metadata)} URLs")
|
393 |
-
|
394 |
if not self.url_metadata:
|
395 |
-
logger.error("No URL metadata found")
|
396 |
return "No content was found to generate llms.txt"
|
397 |
-
|
398 |
# Sort URLs by importance and remove duplicates
|
399 |
sorted_urls = []
|
400 |
seen_titles = set()
|
401 |
-
|
402 |
for url, metadata in sorted(
|
403 |
self.url_metadata.items(),
|
404 |
key=lambda x: (x[1]["importance"], x[0]),
|
@@ -407,95 +195,55 @@ class WebsiteCrawler:
|
|
407 |
if metadata["title"] not in seen_titles:
|
408 |
sorted_urls.append((url, metadata))
|
409 |
seen_titles.add(metadata["title"])
|
410 |
-
|
411 |
-
logger.info(f"Found {len(sorted_urls)} unique URLs after deduplication")
|
412 |
-
|
413 |
if not sorted_urls:
|
414 |
-
logger.error("No valid URLs found after sorting")
|
415 |
return "No valid content was found"
|
416 |
-
|
417 |
# Generate content
|
418 |
content = []
|
419 |
-
|
420 |
# Use homepage metadata for main title and description
|
421 |
main_title = self.homepage_metadata.get("site_name", "Welcome")
|
422 |
homepage_description = self.homepage_metadata.get("description")
|
423 |
-
|
424 |
-
logger.info(f"Homepage title: {main_title}")
|
425 |
-
logger.info(f"Homepage description: {homepage_description}")
|
426 |
-
|
427 |
content.append(f"# {main_title}")
|
428 |
if homepage_description:
|
429 |
content.append(f"\n> {homepage_description}")
|
430 |
-
|
431 |
-
# Fallback to first good description from content
|
432 |
for _, metadata in sorted_urls:
|
433 |
desc = self.clean_description(metadata["description"])
|
434 |
if desc and len(desc) > 20 and "null" not in desc.lower():
|
435 |
content.append(f"\n> {desc}")
|
436 |
break
|
437 |
-
|
438 |
# Group by category
|
439 |
categories = defaultdict(list)
|
440 |
for url, metadata in sorted_urls:
|
441 |
if metadata["title"] and url:
|
442 |
categories[metadata["category"]].append((url, metadata))
|
443 |
-
|
444 |
-
|
445 |
-
|
446 |
-
|
447 |
-
category_order = [
|
448 |
-
"Main",
|
449 |
-
"Documentation",
|
450 |
-
"API",
|
451 |
-
"Tools",
|
452 |
-
"About",
|
453 |
-
"News",
|
454 |
-
"Optional"
|
455 |
-
]
|
456 |
-
|
457 |
-
# Only show Main section if it has content different from the homepage description
|
458 |
-
if "Main" in categories:
|
459 |
-
main_content = categories["Main"]
|
460 |
-
if len(main_content) == 1 and main_content[0][1]["description"] == homepage_description:
|
461 |
-
logger.info("Removing duplicate Main content")
|
462 |
-
del categories["Main"]
|
463 |
-
|
464 |
-
for category in category_order:
|
465 |
-
if category in categories and categories[category]:
|
466 |
-
logger.info(f"Processing category {category} with {len(categories[category])} items")
|
467 |
content.append(f"\n## {category}")
|
468 |
-
|
469 |
-
#
|
470 |
-
category_links = sorted(
|
471 |
-
categories[category],
|
472 |
-
key=lambda x: (-len(x[1]["description"] or ""), x[1]["title"])
|
473 |
-
)
|
474 |
-
|
475 |
links = []
|
476 |
-
|
477 |
-
for url, metadata in category_links:
|
478 |
title = metadata["title"].strip()
|
479 |
desc = self.clean_description(metadata["description"])
|
480 |
-
|
481 |
-
# Skip if description is duplicate within category
|
482 |
-
if desc in seen_desc:
|
483 |
-
continue
|
484 |
-
seen_desc.add(desc)
|
485 |
-
|
486 |
if desc:
|
487 |
links.append(f"- [{title}]({url}): {desc}")
|
488 |
else:
|
489 |
links.append(f"- [{title}]({url})")
|
490 |
-
|
491 |
content.append("\n".join(links))
|
492 |
-
|
493 |
-
|
494 |
-
logger.info(f"Generated content length: {len(final_content)}")
|
495 |
-
return final_content
|
496 |
-
|
497 |
|
498 |
|
|
|
499 |
async def process_url(url, max_depth, max_pages):
|
500 |
"""Process URL and generate llms.txt"""
|
501 |
try:
|
@@ -509,11 +257,11 @@ async def process_url(url, max_depth, max_pages):
|
|
509 |
return "", "Invalid URL format. Please enter a valid URL."
|
510 |
|
511 |
logger.info(f"Starting crawl of {url}")
|
512 |
-
|
513 |
# Process website
|
514 |
crawler = WebsiteCrawler(max_depth=int(max_depth), max_pages=int(max_pages))
|
515 |
await crawler.crawl_website(url)
|
516 |
-
|
517 |
logger.info("Generating llms.txt content")
|
518 |
content = crawler.generate_llms_txt()
|
519 |
|
@@ -570,6 +318,8 @@ with gr.Blocks(
|
|
570 |
}
|
571 |
""",
|
572 |
) as iface:
|
|
|
|
|
573 |
|
574 |
with gr.Row():
|
575 |
url_input = gr.Textbox(
|
@@ -606,4 +356,4 @@ with gr.Blocks(
|
|
606 |
)
|
607 |
|
608 |
if __name__ == "__main__":
|
609 |
-
iface.launch()
|
|
|
9 |
import unicodedata
|
10 |
import logging
|
11 |
import ssl
|
12 |
+
import brotli # Add this import
|
13 |
|
14 |
logging.basicConfig(level=logging.INFO)
|
15 |
logger = logging.getLogger(__name__)
|
|
|
29 |
"Accept-Encoding": "gzip, deflate, br",
|
30 |
"DNT": "1",
|
31 |
"Connection": "keep-alive",
|
32 |
+
"Upgrade-Insecure-Requests": "1",
|
33 |
}
|
34 |
+
self.session = None
|
35 |
|
36 |
+
async def get_session(self):
|
37 |
+
if self.session is None:
|
38 |
+
ssl_context = ssl.create_default_context()
|
39 |
+
ssl_context.check_hostname = False
|
40 |
+
ssl_context.verify_mode = ssl.CERT_NONE
|
41 |
+
|
42 |
+
# Configure client with brotli support
|
43 |
+
connector = aiohttp.TCPConnector(ssl=ssl_context)
|
44 |
+
self.session = aiohttp.ClientSession(
|
45 |
+
connector=connector, timeout=aiohttp.ClientTimeout(total=30)
|
46 |
+
)
|
47 |
+
return self.session
|
48 |
+
|
49 |
+
async def decode_response(self, response):
|
50 |
+
"""Handle various content encodings including brotli"""
|
51 |
+
content_encoding = response.headers.get("Content-Encoding", "").lower()
|
52 |
+
content = await response.read()
|
53 |
+
|
54 |
+
if content_encoding == "br":
|
55 |
+
try:
|
56 |
+
decoded = brotli.decompress(content)
|
57 |
+
return decoded.decode("utf-8", errors="ignore")
|
58 |
+
except Exception as e:
|
59 |
+
logger.error(f"Error decoding brotli content: {str(e)}")
|
60 |
+
return content.decode("utf-8", errors="ignore")
|
61 |
+
elif content_encoding == "gzip":
|
62 |
+
import gzip
|
63 |
+
|
64 |
+
try:
|
65 |
+
decoded = gzip.decompress(content)
|
66 |
+
return decoded.decode("utf-8", errors="ignore")
|
67 |
+
except Exception as e:
|
68 |
+
logger.error(f"Error decoding gzip content: {str(e)}")
|
69 |
+
return content.decode("utf-8", errors="ignore")
|
70 |
+
else:
|
71 |
+
return content.decode("utf-8", errors="ignore")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
72 |
|
73 |
def clean_text(self, text, is_title=False):
|
74 |
+
"""Clean and normalize text"""
|
75 |
+
if not text:
|
76 |
return ""
|
|
|
77 |
# Normalize unicode characters
|
78 |
text = unicodedata.normalize("NFKD", text)
|
79 |
text = re.sub(r"[^\x00-\x7F]+", "", text)
|
80 |
+
|
|
|
|
|
|
|
|
|
|
|
81 |
if is_title:
|
82 |
# Remove common suffixes and fragments for titles
|
83 |
+
text = re.sub(r"\s*[\|\-#:•].*", "", text)
|
84 |
+
text = re.sub(r"^\s*Welcome to\s+", "", text)
|
85 |
+
text = text.replace("docusaurus_skipToContent_fallback", "")
|
86 |
+
|
87 |
+
return " ".join(text.split()).strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
88 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
89 |
async def process_homepage(self, url):
|
90 |
"""Specifically process the homepage to extract key metadata"""
|
91 |
try:
|
92 |
+
session = await self.get_session()
|
93 |
+
async with session.get(
|
94 |
+
url, headers=self.headers, allow_redirects=True
|
95 |
+
) as response:
|
96 |
+
if response.status != 200:
|
97 |
+
raise Exception(
|
98 |
+
f"Failed to fetch homepage: status {response.status}"
|
99 |
+
)
|
100 |
+
|
101 |
+
text = await self.decode_response(response)
|
102 |
+
soup = BeautifulSoup(text, "html.parser")
|
103 |
+
|
104 |
+
# Extract site name
|
105 |
+
site_name = None
|
106 |
+
site_meta = soup.find("meta", property="og:site_name")
|
107 |
+
if site_meta and site_meta.get("content"):
|
108 |
+
site_name = site_meta["content"]
|
109 |
+
|
110 |
+
if not site_name:
|
111 |
+
title_tag = soup.find("title")
|
112 |
+
if title_tag:
|
113 |
+
site_name = title_tag.text.split("|")[0].strip()
|
114 |
+
|
115 |
+
if not site_name:
|
116 |
+
site_name = urlparse(url).netloc.split(".")[0].capitalize()
|
117 |
+
|
118 |
+
# Get homepage description
|
119 |
+
description = None
|
120 |
+
meta_desc = soup.find("meta", {"name": "description"})
|
121 |
+
if meta_desc and meta_desc.get("content"):
|
122 |
+
description = meta_desc["content"]
|
123 |
+
|
124 |
+
if not description:
|
125 |
+
og_desc = soup.find("meta", property="og:description")
|
126 |
+
if og_desc and og_desc.get("content"):
|
127 |
+
description = og_desc["content"]
|
128 |
+
|
129 |
+
if not description:
|
130 |
+
first_p = soup.find("p")
|
131 |
+
if first_p:
|
132 |
+
description = first_p.text
|
133 |
+
|
134 |
+
self.homepage_metadata = {
|
135 |
+
"site_name": self.clean_text(site_name, is_title=True),
|
136 |
+
"description": (
|
137 |
+
self.clean_text(description) if description else None
|
138 |
+
),
|
139 |
+
}
|
140 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
141 |
except Exception as e:
|
142 |
logger.error(f"Error processing homepage {url}: {str(e)}")
|
143 |
self.homepage_metadata = {
|
144 |
+
"site_name": urlparse(url).netloc.split(".")[0].capitalize(),
|
145 |
+
"description": None,
|
146 |
}
|
147 |
+
|
148 |
async def crawl_website(self, start_url):
|
149 |
"""Crawl website starting from the given URL"""
|
150 |
try:
|
151 |
# First process the homepage
|
152 |
logger.info(f"Processing homepage: {start_url}")
|
153 |
await self.process_homepage(start_url)
|
154 |
+
|
155 |
base_domain = urlparse(start_url).netloc
|
156 |
queue = [(start_url, 0)]
|
157 |
seen = {start_url}
|
158 |
+
|
159 |
while queue and len(self.visited_urls) < self.max_pages:
|
160 |
current_url, depth = queue.pop(0)
|
161 |
if depth > self.max_depth:
|
162 |
continue
|
163 |
+
|
164 |
logger.info(f"Crawling page: {current_url} (depth: {depth})")
|
165 |
links = await self.crawl_page(current_url, depth, base_domain)
|
166 |
logger.info(f"Found {len(links)} links on {current_url}")
|
167 |
+
|
168 |
for link in links:
|
169 |
if link not in seen and urlparse(link).netloc == base_domain:
|
170 |
seen.add(link)
|
171 |
queue.append((link, depth + 1))
|
172 |
+
|
173 |
logger.info(f"Crawl completed. Visited {len(self.visited_urls)} pages")
|
174 |
+
|
175 |
except Exception as e:
|
176 |
logger.error(f"Error during crawl: {str(e)}")
|
177 |
raise
|
178 |
+
finally:
|
179 |
+
await self.cleanup()
|
180 |
|
181 |
def generate_llms_txt(self):
|
182 |
"""Generate llms.txt content"""
|
|
|
|
|
183 |
if not self.url_metadata:
|
|
|
184 |
return "No content was found to generate llms.txt"
|
185 |
+
|
186 |
# Sort URLs by importance and remove duplicates
|
187 |
sorted_urls = []
|
188 |
seen_titles = set()
|
189 |
+
|
190 |
for url, metadata in sorted(
|
191 |
self.url_metadata.items(),
|
192 |
key=lambda x: (x[1]["importance"], x[0]),
|
|
|
195 |
if metadata["title"] not in seen_titles:
|
196 |
sorted_urls.append((url, metadata))
|
197 |
seen_titles.add(metadata["title"])
|
198 |
+
|
|
|
|
|
199 |
if not sorted_urls:
|
|
|
200 |
return "No valid content was found"
|
201 |
+
|
202 |
# Generate content
|
203 |
content = []
|
204 |
+
|
205 |
# Use homepage metadata for main title and description
|
206 |
main_title = self.homepage_metadata.get("site_name", "Welcome")
|
207 |
homepage_description = self.homepage_metadata.get("description")
|
208 |
+
|
|
|
|
|
|
|
209 |
content.append(f"# {main_title}")
|
210 |
if homepage_description:
|
211 |
content.append(f"\n> {homepage_description}")
|
212 |
+
else:
|
213 |
+
# Fallback to first good description from content
|
214 |
for _, metadata in sorted_urls:
|
215 |
desc = self.clean_description(metadata["description"])
|
216 |
if desc and len(desc) > 20 and "null" not in desc.lower():
|
217 |
content.append(f"\n> {desc}")
|
218 |
break
|
219 |
+
|
220 |
# Group by category
|
221 |
categories = defaultdict(list)
|
222 |
for url, metadata in sorted_urls:
|
223 |
if metadata["title"] and url:
|
224 |
categories[metadata["category"]].append((url, metadata))
|
225 |
+
|
226 |
+
# Add sections
|
227 |
+
for category in ["Docs", "API", "Guides", "Examples", "Blog", "Optional"]:
|
228 |
+
if category in categories:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
229 |
content.append(f"\n## {category}")
|
230 |
+
|
231 |
+
# Add links without extra newlines
|
|
|
|
|
|
|
|
|
|
|
232 |
links = []
|
233 |
+
for url, metadata in categories[category]:
|
|
|
234 |
title = metadata["title"].strip()
|
235 |
desc = self.clean_description(metadata["description"])
|
|
|
|
|
|
|
|
|
|
|
|
|
236 |
if desc:
|
237 |
links.append(f"- [{title}]({url}): {desc}")
|
238 |
else:
|
239 |
links.append(f"- [{title}]({url})")
|
240 |
+
|
241 |
content.append("\n".join(links))
|
242 |
+
|
243 |
+
return "\n".join(content)
|
|
|
|
|
|
|
244 |
|
245 |
|
246 |
+
# Process URL function (outside the class)
|
247 |
async def process_url(url, max_depth, max_pages):
|
248 |
"""Process URL and generate llms.txt"""
|
249 |
try:
|
|
|
257 |
return "", "Invalid URL format. Please enter a valid URL."
|
258 |
|
259 |
logger.info(f"Starting crawl of {url}")
|
260 |
+
|
261 |
# Process website
|
262 |
crawler = WebsiteCrawler(max_depth=int(max_depth), max_pages=int(max_pages))
|
263 |
await crawler.crawl_website(url)
|
264 |
+
|
265 |
logger.info("Generating llms.txt content")
|
266 |
content = crawler.generate_llms_txt()
|
267 |
|
|
|
318 |
}
|
319 |
""",
|
320 |
) as iface:
|
321 |
+
gr.Markdown("# llms.txt Generator")
|
322 |
+
gr.Markdown("Generate an llms.txt file from a website following the specification.")
|
323 |
|
324 |
with gr.Row():
|
325 |
url_input = gr.Textbox(
|
|
|
356 |
)
|
357 |
|
358 |
if __name__ == "__main__":
|
359 |
+
iface.launch()
|