from urllib.parse import urljoin, urlparse import requests from bs4 import BeautifulSoup from autogpt.config import Config from autogpt.llm_utils import create_chat_completion from autogpt.memory import get_memory cfg = Config() memory = get_memory(cfg) session = requests.Session() session.headers.update({"User-Agent": cfg.user_agent}) # Function to check if the URL is valid def is_valid_url(url): try: result = urlparse(url) return all([result.scheme, result.netloc]) except ValueError: return False # Function to sanitize the URL def sanitize_url(url): return urljoin(url, urlparse(url).path) # Define and check for local file address prefixes def check_local_file_access(url): local_prefixes = [ "file:///", "file://localhost", "http://localhost", "https://localhost", ] return any(url.startswith(prefix) for prefix in local_prefixes) def get_response(url, timeout=10): try: # Restrict access to local files if check_local_file_access(url): raise ValueError("Access to local files is restricted") # Most basic check if the URL is valid: if not url.startswith("http://") and not url.startswith("https://"): raise ValueError("Invalid URL format") sanitized_url = sanitize_url(url) response = session.get(sanitized_url, timeout=timeout) # Check if the response contains an HTTP error if response.status_code >= 400: return None, "Error: HTTP " + str(response.status_code) + " error" return response, None except ValueError as ve: # Handle invalid URL format return None, "Error: " + str(ve) except requests.exceptions.RequestException as re: # Handle exceptions related to the HTTP request # (e.g., connection errors, timeouts, etc.) return None, "Error: " + str(re) def scrape_text(url): """Scrape text from a webpage""" response, error_message = get_response(url) if error_message: return error_message if not response: return "Error: Could not get response" soup = BeautifulSoup(response.text, "html.parser") for script in soup(["script", "style"]): script.extract() text = soup.get_text() lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = "\n".join(chunk for chunk in chunks if chunk) return text def extract_hyperlinks(soup): """Extract hyperlinks from a BeautifulSoup object""" hyperlinks = [] for link in soup.find_all("a", href=True): hyperlinks.append((link.text, link["href"])) return hyperlinks def format_hyperlinks(hyperlinks): """Format hyperlinks into a list of strings""" formatted_links = [] for link_text, link_url in hyperlinks: formatted_links.append(f"{link_text} ({link_url})") return formatted_links def scrape_links(url): """Scrape links from a webpage""" response, error_message = get_response(url) if error_message: return error_message if not response: return "Error: Could not get response" soup = BeautifulSoup(response.text, "html.parser") for script in soup(["script", "style"]): script.extract() hyperlinks = extract_hyperlinks(soup) return format_hyperlinks(hyperlinks) def split_text(text, max_length=cfg.browse_chunk_max_length): """Split text into chunks of a maximum length""" paragraphs = text.split("\n") current_length = 0 current_chunk = [] for paragraph in paragraphs: if current_length + len(paragraph) + 1 <= max_length: current_chunk.append(paragraph) current_length += len(paragraph) + 1 else: yield "\n".join(current_chunk) current_chunk = [paragraph] current_length = len(paragraph) + 1 if current_chunk: yield "\n".join(current_chunk) def create_message(chunk, question): """Create a message for the user to summarize a chunk of text""" return { "role": "user", "content": f'"""{chunk}""" Using the above text, please answer the following' f' question: "{question}" -- if the question cannot be answered using the' " text, please summarize the text.", } def summarize_text(url, text, question): """Summarize text using the LLM model""" if not text: return "Error: No text to summarize" text_length = len(text) print(f"Text length: {text_length} characters") summaries = [] chunks = list(split_text(text)) for i, chunk in enumerate(chunks): print(f"Adding chunk {i + 1} / {len(chunks)} to memory") memory_to_add = f"Source: {url}\n" f"Raw content part#{i + 1}: {chunk}" memory.add(memory_to_add) print(f"Summarizing chunk {i + 1} / {len(chunks)}") messages = [create_message(chunk, question)] summary = create_chat_completion( model=cfg.fast_llm_model, messages=messages, max_tokens=cfg.browse_summary_max_token, ) summaries.append(summary) print(f"Added chunk {i + 1} summary to memory") memory_to_add = f"Source: {url}\n" f"Content summary part#{i + 1}: {summary}" memory.add(memory_to_add) print(f"Summarized {len(chunks)} chunks.") combined_summary = "\n".join(summaries) messages = [create_message(combined_summary, question)] final_summary = create_chat_completion( model=cfg.fast_llm_model, messages=messages, max_tokens=cfg.browse_summary_max_token, ) return final_summary