Spaces:
Running
Running
''' | |
Guidelines for Creating and Utilizing Tools in tools.py: | |
1. Initial Assessment: | |
Review Existing Tools: | |
Before adding new functions, thoroughly read through tools.py to understand the existing tools and their functionalities. | |
Determine if an existing tool can be adapted or extended to meet the current needs, avoiding redundancy. | |
2. Tool Creation and Function Design: | |
Create Within tools.py: | |
Add new tools as functions within tools.py. If tools.py doesn't exist, create it. | |
Ensure each function is self-contained and focused on a single task for modularity. | |
Design for Importing: | |
Design tools to be imported and executed via terminal commands. Do not include execution code that runs when tools.py is imported. | |
Follow Best Practices: | |
Use clear and descriptive function names that reflect their general purpose. | |
Include docstrings for each function, detailing the purpose, parameters, and expected outputs. | |
Adhere to PEP 8 style guidelines for readable and maintainable code. | |
3. Generalization: | |
Broad Input Handling: | |
Design functions to handle a wide range of inputs, enhancing reusability for future tasks. | |
Accept parameters that allow the function to be applicable in various scenarios (e.g., any stock ticker, URL, or data file). | |
Flexible Functionality: | |
Ensure functions can process different data types and structures when applicable. | |
Avoid hardcoding values; use parameters and defaults where necessary. | |
Modularity: | |
If a task involves multiple distinct operations, split them into separate functions. | |
This approach enhances clarity and allows for individual functions to be reused independently. | |
4. Execution and Script Management: | |
Import and Run via Terminal: | |
Do not execute tools.py directly. Instead, import the necessary functions and run them using the terminal. | |
Use the command: | |
bash | |
Copy code | |
python -c "from tools import function_name; function_name(args)" | |
Replace function_name and args with the appropriate function and arguments. | |
Avoid Additional Scripts: | |
Do not create extra .py files or scripts for execution purposes. | |
Keep all tool functions within tools.py and execute them using the import method shown above. | |
5. Output: | |
Console Printing: | |
Ensure that all tools print their output directly to the console. | |
Format the output for readability, using clear messages and organizing data in a logical manner. | |
No Return Statements for Output: | |
While functions can return values for internal use, the primary results should be displayed using print() statements. | |
6. Error Handling: | |
Input Validation: | |
Validate all input parameters to catch errors before execution. | |
Provide informative error messages to guide correct usage. | |
Exception Management: | |
Use try-except blocks to handle potential exceptions without crashing the program. | |
Log errors where appropriate, and ensure they don't expose sensitive information. | |
Debugging and Testing: | |
Test functions with various inputs, including edge cases, to ensure robustness. | |
If errors are found, revise and debug the functions promptly. | |
7. Post-Creation: | |
Execute to Fulfill Requests: | |
After creating or updating tools, execute them as needed to fulfill user requests unless the request was solely for tool creation. | |
Documentation: | |
Update any relevant documentation or comments within tools.py to reflect new additions or changes. | |
Consider maintaining a usage example within the docstring for complex functions. | |
8. Maintenance and Refactoring: | |
Regular Review: | |
Periodically review tools.py to identify opportunities for optimization and improvement. | |
Remove or update deprecated functions that are no longer effective or necessary. | |
Enhance Generalization: | |
Refactor functions to improve their general applicability as new requirements emerge. | |
Stay vigilant for patterns that can be abstracted into more general solutions. | |
9. Compliance and Security: | |
Data Protection: | |
Ensure that tools handle data securely, especially when dealing with sensitive information. | |
Avoid hardcoding credentials or exposing private data through outputs. | |
Licensing and Dependencies: | |
Verify that any third-party libraries used are properly licensed and documented. | |
Include installation instructions for dependencies if they are not part of the standard library. | |
''' | |
# Your tools will be defined below this line | |
import os | |
from datetime import datetime, timedelta | |
import PyPDF2 | |
from pathlib import Path | |
import json | |
import re | |
import requests | |
from typing import List, Dict, Optional, Union | |
import subprocess | |
import sys | |
from git import Repo | |
def get_current_month_folder() -> str: | |
"""Returns the path to the current month's folder.""" | |
base_path = r"C:\Users\admin\Dropbox\Current\2024" | |
current_month = datetime.now().strftime("%B") # Full month name | |
return os.path.join(base_path, current_month) | |
def get_pdfs_for_date(target_date: datetime = None) -> List[str]: | |
""" | |
Finds all PDFs saved on a specific date in the current month's folder structure. | |
Args: | |
target_date: datetime object for the target date. If None, uses today's date. | |
Returns a list of full paths to PDF files. | |
""" | |
if target_date is None: | |
target_date = datetime.now() | |
target_date_str = target_date.strftime("%Y-%m-%d") | |
month_folder = get_current_month_folder() | |
pdf_files = [] | |
# Walk through all subdirectories | |
for root, _, files in os.walk(month_folder): | |
for file in files: | |
if file.lower().endswith('.pdf'): | |
file_path = os.path.join(root, file) | |
# Get file's modification time | |
mod_time = datetime.fromtimestamp(os.path.getmtime(file_path)) | |
if mod_time.strftime("%Y-%m-%d") == target_date_str: | |
pdf_files.append(file_path) | |
return pdf_files | |
def extract_text_from_pdf(pdf_path: str) -> str: | |
"""Extract text content from a PDF file.""" | |
try: | |
with open(pdf_path, 'rb') as file: | |
reader = PyPDF2.PdfReader(file) | |
text = "" | |
for page in reader.pages: | |
text += page.extract_text() + "\n" | |
return text | |
except Exception as e: | |
print(f"Error processing {pdf_path}: {str(e)}") | |
return "" | |
def summarize_pdfs_for_date(target_date: datetime = None): | |
""" | |
Main function to process and summarize PDFs for a specific date. | |
Args: | |
target_date: datetime object for the target date. If None, uses today's date. | |
Prints summary to console and saves to a JSON file. | |
""" | |
if target_date is None: | |
target_date = datetime.now() | |
pdfs = get_pdfs_for_date(target_date) | |
if not pdfs: | |
print(f"No PDFs found for {target_date.strftime('%Y-%m-%d')}") | |
return | |
summaries = {} | |
for pdf_path in pdfs: | |
print(f"Processing: {pdf_path}") | |
text = extract_text_from_pdf(pdf_path) | |
# Basic summary: first 500 characters of text | |
summary = text[:500] + "..." if len(text) > 500 else text | |
# Store in dictionary with filename as key | |
filename = os.path.basename(pdf_path) | |
summaries[filename] = { | |
"path": pdf_path, | |
"summary": summary, | |
"processed_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
} | |
# Save summaries to JSON file in the current directory | |
output_dir = "summaries" | |
os.makedirs(output_dir, exist_ok=True) | |
output_file = os.path.join(output_dir, f"summaries_{target_date.strftime('%Y-%m-%d')}.json") | |
with open(output_file, 'w', encoding='utf-8') as f: | |
json.dump(summaries, f, indent=4, ensure_ascii=False) | |
print(f"\nProcessed {len(pdfs)} PDFs") | |
print(f"Summaries saved to: {output_file}") | |
# Print summaries to console | |
for filename, data in summaries.items(): | |
print(f"\n{'='*80}\n{filename}") | |
print(f"Path: {data['path']}") | |
print(f"\nSummary:\n{data['summary'][:200]}...") | |
class WebSearchTool: | |
""" | |
A tool for performing web searches using the Perplexity API. | |
Designed to be the default web search mechanism for context-requiring queries. | |
""" | |
def __init__(self, api_key: Optional[str] = None): | |
""" | |
Initialize the WebSearchTool. | |
Args: | |
api_key: Perplexity API key. If None, will try to get from environment variable. | |
""" | |
self.api_key = api_key or os.getenv("PERPLEXITY_API_KEY") | |
if not self.api_key: | |
raise ValueError("Perplexity API key must be provided or set in PERPLEXITY_API_KEY environment variable") | |
self.headers = { | |
"Authorization": f"Bearer {self.api_key}", | |
"Content-Type": "application/json" | |
} | |
def search(self, query: str, max_results: int = 5) -> Dict: | |
""" | |
Perform a web search using Perplexity API. | |
Args: | |
query: The search query | |
max_results: Maximum number of results to return | |
Returns: | |
Dictionary containing search results and metadata | |
""" | |
try: | |
# Make the API request | |
response = requests.post( | |
"https://api.perplexity.ai/chat/completions", | |
headers=self.headers, | |
json={ | |
"model": "llama-3.1-sonar-huge-128k-online", | |
"messages": [ | |
{ | |
"role": "system", | |
"content": "You are a helpful assistant that provides accurate and concise answers based on web search results." | |
}, | |
{ | |
"role": "user", | |
"content": query | |
} | |
] | |
} | |
) | |
response.raise_for_status() | |
data = response.json() | |
# Extract answer from the response | |
answer = data.get("choices", [{}])[0].get("message", {}).get("content", "No answer found") | |
# Process and format the results | |
results = { | |
"query": query, | |
"timestamp": datetime.now().isoformat(), | |
"answer": answer, | |
"references": [], # References not available in this API version | |
"metadata": { | |
"source": "Perplexity API", | |
"model": "llama-3.1-sonar-huge-128k-online" | |
} | |
} | |
return results | |
except Exception as e: | |
print(f"Error performing search: {str(e)}") | |
return { | |
"query": query, | |
"timestamp": datetime.now().isoformat(), | |
"error": str(e), | |
"metadata": { | |
"source": "Perplexity API", | |
"status": "error" | |
} | |
} | |
def format_results(self, results: Dict, format: str = "text") -> str: | |
""" | |
Format search results in the specified format. | |
Args: | |
results: Search results dictionary | |
format: Output format ("text" or "markdown") | |
Returns: | |
Formatted string of results | |
""" | |
if "error" in results: | |
return f"Error: {results['error']}" | |
if format == "markdown": | |
output = f"# Search Results for: {results['query']}\n\n" | |
output += f"## Answer\n{results['answer']}\n\n" | |
if results['references']: | |
output += "## References\n" | |
for i, ref in enumerate(results['references'], 1): | |
output += f"{i}. {ref['title']} - {ref['url']}\n" | |
return output | |
else: | |
output = f"Search Results for: {results['query']}\n\n" | |
output += f"Answer:\n{results['answer']}\n\n" | |
if results['references']: | |
output += "References:\n" | |
for i, ref in enumerate(results['references'], 1): | |
output += f"{i}. {ref['title']} - {ref['url']}\n" | |
return output | |
class MCPServerManager: | |
""" | |
A tool for installing and managing Model Context Protocol (MCP) servers. | |
Integrates with Claude desktop and manages server configurations. | |
""" | |
DEFAULT_CONFIG_LOCATIONS = [ | |
"mcp.json", | |
".mcp/config.json", | |
"config/mcp.json", | |
"mcp_config.json", | |
".config/mcp/servers.json" | |
] | |
def __init__(self, base_dir: Optional[str] = None): | |
""" | |
Initialize the MCP Server Manager. | |
Args: | |
base_dir: Base directory for installing servers. If None, uses current directory. | |
""" | |
self.base_dir = Path(base_dir) if base_dir else Path.cwd() / "mcp_servers" | |
self.base_dir.mkdir(parents=True, exist_ok=True) | |
self.servers_repo_url = "https://github.com/modelcontextprotocol/servers.git" | |
self.installed_servers = {} | |
self.load_installed_servers() | |
def load_installed_servers(self): | |
"""Load information about installed servers from the config file.""" | |
config_file = self.base_dir / "config.json" | |
if config_file.exists(): | |
with open(config_file, "r") as f: | |
self.installed_servers = json.load(f) | |
def save_installed_servers(self): | |
"""Save information about installed servers to the config file.""" | |
config_file = self.base_dir / "config.json" | |
with open(config_file, "w") as f: | |
json.dump(self.installed_servers, f, indent=4) | |
def get_featured_servers(self) -> List[Dict]: | |
""" | |
Get list of featured servers from the MCP GitHub repository. | |
Returns: | |
List of server information dictionaries | |
""" | |
try: | |
# Clone or update the servers repository | |
repo_dir = self.base_dir / "servers_repo" | |
if repo_dir.exists(): | |
repo = Repo(repo_dir) | |
repo.remotes.origin.pull() | |
else: | |
repo = Repo.clone_from(self.servers_repo_url, repo_dir) | |
# First try to read from featured.json | |
featured_file = repo_dir / "featured.json" | |
if featured_file.exists(): | |
with open(featured_file, "r", encoding="utf-8") as f: | |
return json.load(f) | |
# If featured.json doesn't exist, parse README.md | |
readme_file = repo_dir / "README.md" | |
if readme_file.exists(): | |
servers = [] | |
with open(readme_file, "r", encoding="utf-8", errors="ignore") as f: | |
content = f.read() | |
# Look for server repository links | |
repo_links = re.findall(r"\[([^\]]+)\]\((https://github.com/[^)]+)\)", content) | |
for name, url in repo_links: | |
if "/modelcontextprotocol/" in url: | |
servers.append({ | |
"name": name, | |
"repository": url, | |
"config": {} | |
}) | |
return servers | |
return [] | |
except Exception as e: | |
print(f"Error getting featured servers: {str(e)}") | |
return [] | |
def find_server_config(self, server_dir: Path) -> Optional[Dict]: | |
""" | |
Search for MCP server configuration in common locations. | |
Args: | |
server_dir: Directory to search in | |
Returns: | |
Server configuration dictionary if found, None otherwise | |
""" | |
# First check common config file locations | |
for config_path in self.DEFAULT_CONFIG_LOCATIONS: | |
config_file = server_dir / config_path | |
if config_file.exists(): | |
try: | |
with open(config_file, "r", encoding="utf-8") as f: | |
config = json.load(f) | |
if "mcpServers" in config: | |
return config["mcpServers"] | |
except Exception as e: | |
print(f"Error reading config from {config_file}: {str(e)}") | |
# Check package.json for Node.js projects | |
package_json = server_dir / "package.json" | |
if package_json.exists(): | |
try: | |
with open(package_json, "r", encoding="utf-8") as f: | |
config = json.load(f) | |
if "mcpServers" in config: | |
return config["mcpServers"] | |
except Exception as e: | |
print(f"Error reading config from package.json: {str(e)}") | |
# Check pyproject.toml for Python projects | |
pyproject_toml = server_dir / "pyproject.toml" | |
if pyproject_toml.exists(): | |
try: | |
import tomli | |
with open(pyproject_toml, "rb") as f: | |
config = tomli.load(f) | |
if "tool" in config and "mcp" in config["tool"]: | |
return {"python": config["tool"]["mcp"]} | |
except ImportError: | |
print("tomli package not found, skipping pyproject.toml parsing") | |
except Exception as e: | |
print(f"Error reading config from pyproject.toml: {str(e)}") | |
return None | |
def install_server(self, server_name: str, custom_config: Optional[Dict] = None) -> bool: | |
""" | |
Install a specific MCP server. | |
Args: | |
server_name: Name of the server to install | |
custom_config: Optional custom configuration for the server | |
Returns: | |
True if installation was successful, False otherwise | |
""" | |
try: | |
# Get server information from featured servers | |
featured_servers = self.get_featured_servers() | |
server_info = next((s for s in featured_servers if s["name"] == server_name), None) | |
if not server_info: | |
print(f"Server '{server_name}' not found in featured servers") | |
return False | |
# Create server directory | |
server_dir = self.base_dir / server_name | |
server_dir.mkdir(exist_ok=True) | |
# Clone server repository | |
repo = Repo.clone_from(server_info["repository"], server_dir) | |
# Install dependencies | |
if (server_dir / "requirements.txt").exists(): | |
subprocess.run([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"], cwd=server_dir) | |
elif (server_dir / "package.json").exists(): | |
subprocess.run(["npm", "install"], cwd=server_dir) | |
# Find or use custom server configuration | |
config = custom_config or self.find_server_config(server_dir) or {} | |
# Store server information | |
self.installed_servers[server_name] = { | |
"path": str(server_dir), | |
"version": repo.head.commit.hexsha[:8], | |
"install_date": datetime.now().isoformat(), | |
"config": config | |
} | |
self.save_installed_servers() | |
print(f"Successfully installed {server_name}") | |
if config: | |
print(f"Found server configuration: {json.dumps(config, indent=2)}") | |
else: | |
print("No server configuration found. You may need to configure it manually.") | |
return True | |
except Exception as e: | |
print(f"Error installing server '{server_name}': {str(e)}") | |
return False | |
def uninstall_server(self, server_name: str) -> bool: | |
""" | |
Uninstall a specific MCP server. | |
Args: | |
server_name: Name of the server to uninstall | |
Returns: | |
True if uninstallation was successful, False otherwise | |
""" | |
try: | |
if server_name not in self.installed_servers: | |
print(f"Server '{server_name}' is not installed") | |
return False | |
# Remove server directory | |
server_dir = Path(self.installed_servers[server_name]["path"]) | |
shutil.rmtree(server_dir) | |
# Remove from installed servers | |
del self.installed_servers[server_name] | |
self.save_installed_servers() | |
print(f"Successfully uninstalled {server_name}") | |
return True | |
except Exception as e: | |
print(f"Error uninstalling server '{server_name}': {str(e)}") | |
return False | |
def list_installed_servers(self) -> Dict[str, Dict]: | |
""" | |
Get information about installed servers. | |
Returns: | |
Dictionary of installed server information | |
""" | |
return self.installed_servers | |
def update_server(self, server_name: str) -> bool: | |
""" | |
Update a specific MCP server to the latest version. | |
Args: | |
server_name: Name of the server to update | |
Returns: | |
True if update was successful, False otherwise | |
""" | |
try: | |
if server_name not in self.installed_servers: | |
print(f"Server '{server_name}' is not installed") | |
return False | |
server_dir = Path(self.installed_servers[server_name]["path"]) | |
repo = Repo(server_dir) | |
# Get current version | |
old_version = repo.head.commit.hexsha[:8] | |
# Pull latest changes | |
repo.remotes.origin.pull() | |
# Get new version | |
new_version = repo.head.commit.hexsha[:8] | |
# Update dependencies if needed | |
if (server_dir / "requirements.txt").exists(): | |
subprocess.run([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"], cwd=server_dir) | |
# Update stored information | |
self.installed_servers[server_name]["version"] = new_version | |
self.save_installed_servers() | |
print(f"Updated {server_name} from {old_version} to {new_version}") | |
return True | |
except Exception as e: | |
print(f"Error updating server '{server_name}': {str(e)}") | |
return False | |
def configure_server(self, server_name: str, config: Dict) -> bool: | |
""" | |
Configure a specific MCP server. | |
Args: | |
server_name: Name of the server to configure | |
config: Configuration dictionary in the format: | |
{ | |
"command": str, # Command to run the server | |
"args": List[str], # Arguments for the command | |
"env": Dict[str, str], # Optional environment variables | |
"cwd": str, # Optional working directory | |
} | |
Returns: | |
True if configuration was successful, False otherwise | |
""" | |
try: | |
if server_name not in self.installed_servers: | |
print(f"Server '{server_name}' is not installed") | |
return False | |
server_dir = Path(self.installed_servers[server_name]["path"]) | |
# Validate configuration | |
if "command" not in config: | |
print("Error: Server configuration must include 'command'") | |
return False | |
# Update configuration | |
self.installed_servers[server_name]["config"] = config | |
self.save_installed_servers() | |
# Try to write configuration to a standard location | |
config_dir = server_dir / ".mcp" | |
config_dir.mkdir(exist_ok=True) | |
config_file = config_dir / "config.json" | |
with open(config_file, "w", encoding="utf-8") as f: | |
json.dump({"mcpServers": {server_name: config}}, f, indent=2) | |
print(f"Successfully configured {server_name}") | |
print(f"Configuration saved to {config_file}") | |
return True | |
except Exception as e: | |
print(f"Error configuring server '{server_name}': {str(e)}") | |
return False | |
def start_server(self, server_name: str) -> bool: | |
""" | |
Start a specific MCP server. | |
Args: | |
server_name: Name of the server to start | |
Returns: | |
True if server was started successfully, False otherwise | |
""" | |
try: | |
if server_name not in self.installed_servers: | |
print(f"Server '{server_name}' is not installed") | |
return False | |
server_info = self.installed_servers[server_name] | |
config = server_info.get("config", {}) | |
if not config: | |
print(f"Server '{server_name}' is not configured") | |
return False | |
# Prepare command and arguments | |
command = config.get("command") | |
args = config.get("args", []) | |
env = {**os.environ, **(config.get("env", {}))} | |
cwd = config.get("cwd") or server_info["path"] | |
# Start the server process | |
process = subprocess.Popen( | |
[command, *args], | |
env=env, | |
cwd=cwd, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
text=True | |
) | |
# Store process information | |
self.installed_servers[server_name]["process"] = process | |
self.save_installed_servers() | |
print(f"Started server '{server_name}' (PID: {process.pid})") | |
return True | |
except Exception as e: | |
print(f"Error starting server '{server_name}': {str(e)}") | |
return False | |
def stop_server(self, server_name: str) -> bool: | |
""" | |
Stop a specific MCP server. | |
Args: | |
server_name: Name of the server to stop | |
Returns: | |
True if server was stopped successfully, False otherwise | |
""" | |
try: | |
if server_name not in self.installed_servers: | |
print(f"Server '{server_name}' is not installed") | |
return False | |
server_info = self.installed_servers[server_name] | |
process = server_info.get("process") | |
if not process: | |
print(f"Server '{server_name}' is not running") | |
return False | |
# Try to stop the process gracefully | |
process.terminate() | |
try: | |
process.wait(timeout=5) | |
except subprocess.TimeoutExpired: | |
process.kill() | |
# Remove process information | |
del self.installed_servers[server_name]["process"] | |
self.save_installed_servers() | |
print(f"Stopped server '{server_name}'") | |
return True | |
except Exception as e: | |
print(f"Error stopping server '{server_name}': {str(e)}") | |
return False | |
async def generate_pyflowchart(repo_path: str, output_dir: Optional[str] = None) -> None: | |
""" | |
Generate flowcharts for all Python files in a repository using pyflowchart. | |
Creates HTML flowcharts that can be viewed in a browser. | |
Args: | |
repo_path: Path to the repository | |
output_dir: Optional directory to save flowcharts. If None, creates a 'flowcharts' directory in the repo. | |
""" | |
try: | |
# Ensure pyflowchart is installed | |
subprocess.run([sys.executable, "-m", "pip", "install", "pyflowchart"], check=True) | |
# Set up output directory | |
if output_dir is None: | |
output_dir = os.path.join(repo_path, 'flowcharts') | |
os.makedirs(output_dir, exist_ok=True) | |
# Generate HTML flowcharts | |
for root, _, files in os.walk(repo_path): | |
for file in files: | |
if file.endswith('.py'): | |
py_file = os.path.join(root, file) | |
# Skip empty files | |
if os.path.getsize(py_file) == 0: | |
print(f"Skipping empty file: {py_file}") | |
continue | |
# Check if file has actual Python code | |
with open(py_file, 'r', encoding='utf-8') as f: | |
content = f.read().strip() | |
if not content: | |
print(f"Skipping empty file: {py_file}") | |
continue | |
html_file = os.path.join(output_dir, f"{os.path.splitext(file)[0]}_flowchart.html") | |
# Generate flowchart HTML | |
print(f"Generating flowchart for {py_file}") | |
try: | |
subprocess.run([ | |
sys.executable, "-m", "pyflowchart", py_file, | |
"--output", html_file | |
], check=True) | |
print(f"Saved HTML to {html_file}") | |
except subprocess.CalledProcessError as e: | |
print(f"Error generating flowchart for {py_file}: {str(e)}") | |
continue | |
print(f"\nFlowcharts generated in: {output_dir}") | |
except subprocess.CalledProcessError as e: | |
print(f"Error running pyflowchart: {str(e)}") | |
except Exception as e: | |
print(f"Error generating flowcharts: {str(e)}") | |
def extract_repo_context(repo_url: str, output_file: Optional[str] = None) -> None: | |
""" | |
Extract repository context using gitingest.com. | |
Args: | |
repo_url: GitHub repository URL | |
output_file: Optional file to save the context. If None, uses repo name with .txt extension. | |
""" | |
try: | |
# Convert github.com URL to gitingest.com | |
if "github.com" not in repo_url: | |
raise ValueError("Only GitHub repositories are supported") | |
ingest_url = repo_url.replace("github.com", "gitingest.com") | |
print(f"Fetching repository context from: {ingest_url}") | |
# Make request to gitingest.com | |
response = requests.get(ingest_url) | |
response.raise_for_status() | |
# Extract content | |
content = response.text | |
# Save to file | |
if output_file is None: | |
repo_name = repo_url.split('/')[-1].replace('.git', '') | |
output_file = f"{repo_name}_context.txt" | |
with open(output_file, 'w', encoding='utf-8') as f: | |
f.write(content) | |
print(f"Repository context saved to: {output_file}") | |
except requests.RequestException as e: | |
print(f"Error fetching repository context: {str(e)}") | |
except Exception as e: | |
print(f"Error extracting repository context: {str(e)}") | |
async def post_clone_actions(repo_url: str) -> None: | |
""" | |
Perform post-clone actions after a git clone operation: | |
1. Generate flowcharts for all Python files using pyflowchart | |
2. Extract repository context using gitingest.com | |
Args: | |
repo_url: URL of the repository that was just cloned | |
""" | |
try: | |
# Get the repository name from the URL | |
repo_name = repo_url.split('/')[-1].replace('.git', '') | |
repo_path = os.path.join(os.getcwd(), repo_name) | |
# Generate flowcharts | |
print("\nGenerating flowcharts...") | |
await generate_pyflowchart(repo_path) | |
# Extract repository context | |
print("\nExtracting repository context...") | |
extract_repo_context(repo_url) | |
except Exception as e: | |
print(f"Error in post-clone actions: {str(e)}") | |
# Example usage: | |
if __name__ == "__main__": | |
# Initialize the search tool | |
search_tool = WebSearchTool() | |
# Perform a simple search | |
results = search_tool.search("Latest developments in AI technology") | |
# Print formatted results | |
print(search_tool.format_results(results, "markdown")) | |