cyberandy committed
Commit 1de7c37
1 Parent(s): dd2349f
Files changed (1)
  1. app.py +154 -131
app.py CHANGED
@@ -4,9 +4,12 @@ from bs4 import BeautifulSoup
 import re
 from urllib.parse import urljoin, urlparse
 import asyncio
+import aiohttp
 from collections import defaultdict
 import unicodedata
 import logging
+import ssl
+import brotli  # Add this import

 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -18,9 +21,54 @@ class WebsiteCrawler:
         self.max_pages = max_pages
         self.visited_urls = set()
         self.url_metadata = defaultdict(dict)
+        self.homepage_metadata = None
         self.headers = {
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
+            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
+            "Accept-Language": "en-US,en;q=0.5",
+            "Accept-Encoding": "gzip, deflate, br",
+            "DNT": "1",
+            "Connection": "keep-alive",
+            "Upgrade-Insecure-Requests": "1",
         }
+        self.session = None
+
+    async def get_session(self):
+        if self.session is None:
+            ssl_context = ssl.create_default_context()
+            ssl_context.check_hostname = False
+            ssl_context.verify_mode = ssl.CERT_NONE
+
+            # Configure client with brotli support
+            connector = aiohttp.TCPConnector(ssl=ssl_context)
+            self.session = aiohttp.ClientSession(
+                connector=connector, timeout=aiohttp.ClientTimeout(total=30)
+            )
+        return self.session
+
+    async def decode_response(self, response):
+        """Handle various content encodings including brotli"""
+        content_encoding = response.headers.get("Content-Encoding", "").lower()
+        content = await response.read()
+
+        if content_encoding == "br":
+            try:
+                decoded = brotli.decompress(content)
+                return decoded.decode("utf-8", errors="ignore")
+            except Exception as e:
+                logger.error(f"Error decoding brotli content: {str(e)}")
+                return content.decode("utf-8", errors="ignore")
+        elif content_encoding == "gzip":
+            import gzip
+
+            try:
+                decoded = gzip.decompress(content)
+                return decoded.decode("utf-8", errors="ignore")
+            except Exception as e:
+                logger.error(f"Error decoding gzip content: {str(e)}")
+                return content.decode("utf-8", errors="ignore")
+        else:
+            return content.decode("utf-8", errors="ignore")

     def clean_text(self, text, is_title=False):
         """Clean and normalize text"""
@@ -38,131 +86,97 @@ class WebsiteCrawler:

         return " ".join(text.split()).strip()

-    async def crawl_page(self, url, depth, base_domain):
-        """Crawl a single page and extract information"""
-        if (
-            depth > self.max_depth
-            or url in self.visited_urls
-            or len(self.visited_urls) >= self.max_pages
-        ):
-            return []
-
+    async def process_homepage(self, url):
+        """Specifically process the homepage to extract key metadata"""
         try:
-            response = requests.get(url, headers=self.headers, timeout=10)
-            response.encoding = "utf-8"
-            self.visited_urls.add(url)
-
-            soup = BeautifulSoup(response.text, "html.parser")
-
-            # Extract title with fallbacks
-            title = None
-            meta_title = soup.find("meta", property="og:title")
-            if meta_title and meta_title.get("content"):
-                title = meta_title["content"]
-            if not title:
-                title_tag = soup.find("title")
-                if title_tag:
-                    title = title_tag.text
-            if not title:
-                h1_tag = soup.find("h1")
-                if h1_tag:
-                    title = h1_tag.text
-            if not title:
-                title = url.split("/")[-1]
-
-            title = self.clean_text(title, is_title=True)
-
-            # Extract description with fallbacks
-            desc = None
-            meta_desc = soup.find("meta", {"name": "description"})
-            if meta_desc and meta_desc.get("content"):
-                desc = meta_desc["content"]
-            if not desc:
-                og_desc = soup.find("meta", property="og:description")
-                if og_desc and og_desc.get("content"):
-                    desc = og_desc["content"]
-            if not desc:
-                first_p = soup.find("p")
-                if first_p:
-                    desc = first_p.text
-
-            desc = self.clean_text(desc) if desc else ""
-
-            # Determine category and importance
-            url_lower = url.lower()
-            category = "Optional"
-            importance = 0
-
-            if "docs" in url_lower or "documentation" in url_lower:
-                category = "Docs"
-                importance = 5
-            elif "api" in url_lower:
-                category = "API"
-                importance = 4
-            elif "guide" in url_lower or "tutorial" in url_lower:
-                category = "Guides"
-                importance = 3
-            elif "example" in url_lower:
-                category = "Examples"
-                importance = 2
-            elif "blog" in url_lower:
-                category = "Blog"
-                importance = 1
-
-            # Store metadata
-            clean_url = re.sub(r"#.*", "", url).rstrip("/")
-            if title and len(title.strip()) > 0:  # Only store if we have a valid title
-                self.url_metadata[clean_url] = {
-                    "title": title,
-                    "description": desc,
-                    "category": category,
-                    "importance": importance,
+            session = await self.get_session()
+            async with session.get(
+                url, headers=self.headers, allow_redirects=True
+            ) as response:
+                if response.status != 200:
+                    raise Exception(
+                        f"Failed to fetch homepage: status {response.status}"
+                    )
+
+                text = await self.decode_response(response)
+                soup = BeautifulSoup(text, "html.parser")
+
+                # Extract site name
+                site_name = None
+                site_meta = soup.find("meta", property="og:site_name")
+                if site_meta and site_meta.get("content"):
+                    site_name = site_meta["content"]
+
+                if not site_name:
+                    title_tag = soup.find("title")
+                    if title_tag:
+                        site_name = title_tag.text.split("|")[0].strip()
+
+                if not site_name:
+                    site_name = urlparse(url).netloc.split(".")[0].capitalize()
+
+                # Get homepage description
+                description = None
+                meta_desc = soup.find("meta", {"name": "description"})
+                if meta_desc and meta_desc.get("content"):
+                    description = meta_desc["content"]
+
+                if not description:
+                    og_desc = soup.find("meta", property="og:description")
+                    if og_desc and og_desc.get("content"):
+                        description = og_desc["content"]
+
+                if not description:
+                    first_p = soup.find("p")
+                    if first_p:
+                        description = first_p.text
+
+                self.homepage_metadata = {
+                    "site_name": self.clean_text(site_name, is_title=True),
+                    "description": (
+                        self.clean_text(description) if description else None
+                    ),
                 }

-            # Find links
-            links = []
-            for a in soup.find_all("a", href=True):
-                href = a["href"]
-                if not any(
-                    x in href.lower()
-                    for x in ["javascript:", "mailto:", ".pdf", ".jpg", ".png", ".gif"]
-                ):
-                    next_url = urljoin(url, href)
-                    if urlparse(next_url).netloc == base_domain:
-                        links.append(next_url)
-            return links
-
         except Exception as e:
-            logger.error(f"Error crawling {url}: {str(e)}")
-            return []
+            logger.error(f"Error processing homepage {url}: {str(e)}")
+            self.homepage_metadata = {
+                "site_name": urlparse(url).netloc.split(".")[0].capitalize(),
+                "description": None,
+            }

     async def crawl_website(self, start_url):
         """Crawl website starting from the given URL"""
-        base_domain = urlparse(start_url).netloc
-        queue = [(start_url, 0)]
-        seen = {start_url}
-
-        while queue and len(self.visited_urls) < self.max_pages:
-            current_url, depth = queue.pop(0)
-            if depth > self.max_depth:
-                continue
-
-            links = await self.crawl_page(current_url, depth, base_domain)
-            for link in links:
-                if link not in seen and urlparse(link).netloc == base_domain:
-                    seen.add(link)
-                    queue.append((link, depth + 1))
-
-    def clean_description(self, desc):
-        """Clean description text"""
-        if not desc:
-            return ""
-        # Remove leading dashes, hyphens, or colons
-        desc = re.sub(r"^[-:\s]+", "", desc)
-        # Remove any strings that are just "Editors", "APIs", etc.
-        if len(desc.split()) <= 1:
-            return ""
-        return desc.strip()
+        try:
+            # First process the homepage
+            logger.info(f"Processing homepage: {start_url}")
+            await self.process_homepage(start_url)
+
+            base_domain = urlparse(start_url).netloc
+            queue = [(start_url, 0)]
+            seen = {start_url}
+
+            while queue and len(self.visited_urls) < self.max_pages:
+                current_url, depth = queue.pop(0)
+                if depth > self.max_depth:
+                    continue
+
+                logger.info(f"Crawling page: {current_url} (depth: {depth})")
+                links = await self.crawl_page(current_url, depth, base_domain)
+                logger.info(f"Found {len(links)} links on {current_url}")
+
+                for link in links:
+                    if link not in seen and urlparse(link).netloc == base_domain:
+                        seen.add(link)
+                        queue.append((link, depth + 1))
+
+            logger.info(f"Crawl completed. Visited {len(self.visited_urls)} pages")
+
+        except Exception as e:
+            logger.error(f"Error during crawl: {str(e)}")
+            raise
+        finally:
+            await self.cleanup()

     def generate_llms_txt(self):
         """Generate llms.txt content"""
@@ -188,20 +202,20 @@ class WebsiteCrawler:
         # Generate content
         content = []

-        # Find the best title for the main header (prefer "Welcome" or "Overview")
-        main_title = "Welcome"  # Default to Welcome
-
-        # Find a good description for the blockquote
-        best_description = None
-        for _, metadata in sorted_urls:
-            desc = self.clean_description(metadata["description"])
-            if desc and len(desc) > 20 and "null" not in desc.lower():
-                best_description = desc
-                break
+        # Use homepage metadata for main title and description
+        main_title = self.homepage_metadata.get("site_name", "Welcome")
+        homepage_description = self.homepage_metadata.get("description")

         content.append(f"# {main_title}")
-        if best_description:
-            content.append(f"\n> {best_description}")
+        if homepage_description:
+            content.append(f"\n> {homepage_description}")
+        else:
+            # Fallback to first good description from content
+            for _, metadata in sorted_urls:
+                desc = self.clean_description(metadata["description"])
+                if desc and len(desc) > 20 and "null" not in desc.lower():
+                    content.append(f"\n> {desc}")
+                    break

         # Group by category
         categories = defaultdict(list)
@@ -229,6 +243,7 @@ class WebsiteCrawler:
         return "\n".join(content)


+# Process URL function (outside the class)
 async def process_url(url, max_depth, max_pages):
     """Process URL and generate llms.txt"""
     try:
@@ -241,14 +256,22 @@ async def process_url(url, max_depth, max_pages):
         if not all([result.scheme, result.netloc]):
             return "", "Invalid URL format. Please enter a valid URL."

+        logger.info(f"Starting crawl of {url}")
+
         # Process website
         crawler = WebsiteCrawler(max_depth=int(max_depth), max_pages=int(max_pages))
         await crawler.crawl_website(url)
+
+        logger.info("Generating llms.txt content")
         content = crawler.generate_llms_txt()

+        if not content or content.strip() == "":
+            return "", "No content was generated. Check the logs for details."
+
         return content, f"Successfully crawled {len(crawler.visited_urls)} pages."

     except Exception as e:
+        logger.error(f"Error processing URL {url}: {str(e)}")
         return "", f"Error: {str(e)}"


 