Spaces:
Runtime error
Runtime error
import sys | |
import signal | |
import shutil | |
import logging | |
import time | |
import os | |
from datetime import datetime | |
from typing import List, Dict, Any | |
import requests | |
import gradio as gr | |
import atexit | |
import subprocess | |
from urllib.parse import urlparse, quote | |
import webbrowser | |
import spaces | |
device = "cuda" | |
def stream_chat( | |
message: str, | |
history: list, | |
system_prompt: str, | |
temperature: float = 0.5, | |
max_new_tokens: int = 16000, | |
top_p: float = 1.0, | |
top_k: int = 15, | |
penalty: float = 0.9, | |
): | |
print(f'message: {message}') | |
print(f'history: {history}') | |
conversation = [ | |
{"role": "system", "content": system_prompt} | |
] | |
for prompt, answer in history: | |
conversation.extend([ | |
{"role": "user", "content": prompt}, | |
{"role": "assistant", "content": answer}, | |
]) | |
conversation.append({"role": "user", "content": message}) | |
input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt").to(model.device) | |
streamer = TextIteratorStreamer(tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True) | |
generate_kwargs = dict( | |
input_ids=input_ids, | |
max_new_tokens = max_new_tokens, | |
do_sample = False if temperature == 0 else True, | |
top_p = top_p, | |
top_k = top_k, | |
temperature = temperature, | |
eos_token_id=[128001,128008,128009], | |
streamer=streamer, | |
) | |
# Constants | |
INPUT_DIRECTORY = 'input' | |
OUTPUT_DIRECTORY = 'output' | |
LOGS_DIRECTORY = 'logs' | |
RESOLUTIONS_DIRECTORY = 'resolutions' | |
REPOS_DIRECTORY = 'repos' | |
# Set up logging | |
def initialize_logger() -> logging.Logger: | |
log_file = f"{LOGS_DIRECTORY}/github_bot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler(log_file), | |
logging.StreamHandler() | |
] | |
) | |
return logging.getLogger(__name__) | |
# Initialize environment and logger | |
def initialize_environment(): | |
directories = [LOGS_DIRECTORY, RESOLUTIONS_DIRECTORY, REPOS_DIRECTORY, INPUT_DIRECTORY, OUTPUT_DIRECTORY] | |
for directory in directories: | |
os.makedirs(directory, exist_ok=True) | |
initialize_environment() | |
logger = initialize_logger() # Initialize logger globally | |
# GitHub API handler | |
class GitHubAPI: | |
def __init__(self, token: str): | |
self.token = token | |
self.headers = { | |
'Authorization': f'token {token}', | |
'Accept': 'application/vnd.github.v3+json' | |
} | |
self.base_url = "https://api.github.com" | |
def _check_rate_limit(self) -> bool: | |
try: | |
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) | |
response.raise_for_status() | |
limits = response.json() | |
remaining = limits['resources']['core']['remaining'] | |
reset_time = limits['resources']['core']['reset'] | |
if remaining < 10: | |
wait_time = max(0, reset_time - int(time.time())) | |
if wait_time > 0: | |
logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds before retrying...") | |
time.sleep(wait_time) | |
return False | |
return True | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error checking rate limit: {str(e)}. Retrying...") | |
return True | |
def get_repository(self, owner: str, repo: str) -> Dict: | |
try: | |
response = requests.get(f"{self.base_url}/repos/{owner}/{repo}", headers=self.headers) | |
response.raise_for_status() | |
return response.json() | |
except requests.HTTPError as e: | |
logger.error(f"HTTP error getting repository info for {owner}/{repo}: {str(e)}. Please check the repository details.") | |
raise | |
except Exception as e: | |
logger.error(f"Error getting repository info: {str(e)}") | |
raise | |
def get_issues(self, owner: str, repo: str, state: str = 'open') -> List[Dict]: | |
if not self._check_rate_limit(): | |
return [] | |
try: | |
response = requests.get(f"{self.base_url}/repos/{owner}/{repo}/issues", headers=self.headers, params={'state': state}) | |
response.raise_for_status() | |
issues = response.json() | |
return [issue for issue in issues if 'pull_request' not in issue] | |
except Exception as e: | |
logger.error(f"Error fetching issues for repository {owner}/{repo}: {str(e)}. Please verify the repository and token.") | |
return [] | |
# GitHub Bot | |
class GitHubBot: | |
def __init__(self): | |
self.github_api = None | |
def initialize_api(self, token: str): | |
self.github_api = GitHubAPI(token) | |
def fetch_issues(self, token: str, owner: str, repo: str) -> List[Dict]: | |
try: | |
self.initialize_api(token) | |
return self.github_api.get_issues(owner, repo) | |
except Exception as e: | |
logger.error(f"Error fetching issues for repository {owner}/{repo}: {str(e)}") | |
return [] | |
def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str: | |
try: | |
self.initialize_api(token) | |
self.github_api.get_repository(owner, repo) | |
# Create resolution file | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
resolution_file = f"{RESOLUTIONS_DIRECTORY}/resolution_{issue_number}_{timestamp}.md" | |
with open(resolution_file, "w") as f: | |
f.write(f"# Resolution for Issue #{issue_number}\n\n{resolution}") | |
# Clone the forked repo | |
subprocess.run(['git', 'clone', forked_repo, '/tmp/' + forked_repo.split('/')[-1]], check=True) | |
# Change to the cloned directory | |
os.chdir('/tmp/' + forked_repo.split('/')[-1]) | |
# Assuming manual intervention now | |
input("Apply the fix manually and stage the changes (press ENTER)? ") | |
# Commit and push the modifications | |
subprocess.run(['git', 'add', '.'], check=True) | |
subprocess.run(['git', 'commit', '-m', f"Resolved issue #{issue_number} ({quote(resolution)})"], check=True) | |
subprocess.run(['git', 'push', 'origin', 'HEAD'], check=True) | |
# Open Pull Request page | |
webbrowser.open(f'https://github.com/{forked_repo.split("/")[-1]}/compare/master...{owner}:{forked_repo.split("/")[-1]}_resolved_issue_{issue_number}') | |
return f"Resolution saved: {resolution_file}" | |
except Exception as e: | |
error_msg = f"Error resolving issue #{issue_number} in repository {owner}/{repo}: {str(e)}" | |
logger.error(error_msg) | |
return error_msg | |
def handle_issue_selection(token, owner, repo, issue_number, resolution, forked_repo): | |
bot = GitHubBot() | |
result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo) | |
return result | |
def extract_info_from_url(url: str) -> Dict[str, Any]: | |
info = {} | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
info['status_code'] = response.status_code | |
info['headers'] = dict(response.headers) | |
info['content'] = response.text[:500] # Limit content to first 500 characters for brevity | |
parsed_url = urlparse(url) | |
if 'github.com' in parsed_url.netloc: | |
parts = parsed_url.path.split('/') | |
if len(parts) > 2: | |
owner = parts[1] | |
repo = parts[2] | |
issues = bot.fetch_issues(github_token, owner, repo) | |
info['issues'] = issues | |
elif 'huggingface.co' in parsed_url.netloc: | |
# Add Hugging Face specific handling if needed | |
pass | |
except requests.HTTPError as e: | |
info['error'] = f"HTTP error: {str(e)}" | |
except Exception as e: | |
info['error'] = f"Error: {str(e)}" | |
return info | |
# Initialize GitHubBot globally | |
bot = GitHubBot() | |
# Define missing functions with validation | |
def fetch_issues(token, repo_url): | |
try: | |
parts = repo_url.split('/') | |
if len(parts) < 2: | |
raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.") | |
owner, repo = parts[-2], parts[-1] | |
issues = bot.fetch_issues(token, owner, repo) | |
return issues | |
except Exception as e: | |
return str(e) | |
def resolve_issue(token, repo_url, issue_number, resolution, forked_repo_url): | |
try: | |
parts = repo_url.split('/') | |
if len(parts) < 2: | |
raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.") | |
owner, repo = parts[-2], parts[-1] | |
result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo_url) | |
return result | |
except Exception as e: | |
return str(e) | |
def extract_info(url): | |
try: | |
info = extract_info_from_url(url) | |
return info | |
except Exception as e: | |
return str(e) | |
# HTML and CSS integration | |
custom_html = """ | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>GitHub Issue Manager</title> | |
<meta charset="UTF-8"> | |
<meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
<link href="https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css" rel="stylesheet"> | |
<link href="https://cdn.jsdelivr.net/npm/daisyui@3.1.0/dist/full.css" rel="stylesheet" type="text/css" /> | |
<style> | |
body { | |
background: linear-gradient(to bottom right, #121212, #303030) !important; | |
color: #e0e0e0; | |
font-family: 'Roboto', sans-serif; | |
overflow-x: hidden; /* Prevent horizontal scrollbar */ | |
} | |
.card { | |
border-radius: 1.5rem; | |
box-shadow: 0 15px 25px rgba(0, 0, 0, 0.2); | |
margin-bottom: 20px; | |
} | |
.input, .select, .textarea { | |
border: 2px solid #4a4a4a; | |
border-radius: 0.75rem; | |
padding-inline: 1.5rem; | |
padding-block: 0.75rem; | |
} | |
h1, h2, h3 { | |
color: #e0e0e0; | |
} | |
.output-area { | |
padding: 1.5rem; | |
border-radius: 0.75rem; | |
} | |
.btn { | |
border-radius: 0.75rem; | |
padding-block: 0.75rem; | |
padding-inline: 1.5rem; | |
} | |
.btn-primary { | |
background-color: #7928CA; | |
color: white; | |
border: 2px solid #7928CA; | |
} | |
.btn-primary:hover { | |
background-color: #5e22a1; | |
} | |
.btn-success { | |
background-color: #22c55e; | |
color: white; | |
border: 2px solid #22c55e; | |
} | |
.btn-success:hover { | |
background-color: #1a9a46; | |
} | |
</style> | |
</head> | |
<body class="bg-gray-900"> | |
<div class="container mx-auto p-4"> | |
<h1 class="text-4xl md:text-5xl font-extrabold text-center mb-8">GitHub Issue Manager</h1> | |
<!-- GitHub Token & Repo URL --> | |
<div class="card bg-gray-800 p-8"> | |
<div class="form-control"> | |
<label class="label"> | |
<span class="label-text">GitHub Token</span> | |
</label> | |
<input type="password" placeholder="Enter your GitHub Personal Access Token" class="input input-bordered input-primary" id="github-token"> | |
</div> | |
<div class="form-control"> | |
<label class="label"> | |
<span class="label-text">Repository URL</span> | |
</label> | |
<input type="text" placeholder="Enter the full GitHub repository URL" class="input input-bordered input-primary" id="repo-url"> | |
</div> | |
</div> | |
<!-- Fetch & Resolve Section --> | |
<div class="card bg-gray-800 p-8"> | |
<div class="flex justify-between gap-4"> | |
<button class="btn btn-primary" id="fetch-issues">Fetch Issues</button> | |
<select class="select select-primary w-full max-w-xs" id="issue-dropdown"> | |
<option disabled selected>Select Issue</option> | |
</select> | |
</div> | |
<div class="form-control mt-4"> | |
<label class="label"> | |
<span class="label-text">Resolution</span> | |
</label> | |
<textarea class="textarea textarea-primary" placeholder="Enter the resolution details" id="resolution-textarea"></textarea> | |
</div> | |
<div class="form-control mt-4"> | |
<label class="label"> | |
<span class="label-text">Forked Repository URL</span> | |
</label> | |
<input type="text" placeholder="URL to your forked repository (Optional)" class="input input-bordered input-primary" id="forked-repo-url"> | |
</div> | |
<div class="form-control mt-4"> | |
<button class="btn btn-success" id="resolve-issue">Resolve Issue</button> | |
</div> | |
</div> | |
<!-- Output Area --> | |
<div class="card bg-gray-800 p-8 mt-4"> | |
<label class="label"> | |
<span class="label-text">Output</span> | |
</label> | |
<textarea class="textarea textarea-primary h-48" id="output-textarea" readonly></textarea> | |
</div> | |
<!-- URL to Extract --> | |
<div class="card bg-gray-800 p-8 mt-4"> | |
<div class="form-control"> | |
<label class="label"> | |
<span class="label-text">URL</span> | |
</label> | |
<input type="text" placeholder="Enter a URL to extract information" class="input input-bordered input-primary" id="url-textbox"> | |
</div> | |
<div class="form-control"> | |
<button class="btn btn-primary" id="extract-info">Extract Info</button> | |
</div> | |
</div> | |
</div> | |
<script> | |
const githubTokenInput = document.getElementById(' github-token'); | |
const repoUrlInput = document.getElementById('repo-url'); | |
const fetchIssuesButton = document.getElementById('fetch-issues'); | |
const issueDropdown = document.getElementById('issue-dropdown'); | |
const resolutionTextarea = document.getElementById('resolution-textarea'); | |
const forkedRepoUrlInput = document.getElementById('forked-repo-url'); | |
const resolveIssueButton = document.getElementById('resolve-issue'); | |
const outputTextarea = document.getElementById('output-textarea'); | |
const urlTextbox = document.getElementById('url-textbox'); | |
const extractInfoButton = document.getElementById('extract-info'); | |
fetchIssuesButton.addEventListener('click', async () => { | |
const token = githubTokenInput.value; | |
const repoUrl = repoUrlInput.value; | |
if (!token || !repoUrl) { | |
outputTextarea.value = "Please provide both GitHub Token and Repository URL."; | |
return; | |
} | |
try { | |
const response = await fetch('/fetch-issues', { | |
method: 'POST', | |
headers: { | |
'Content-Type': 'application/json', | |
}, | |
body: JSON.stringify({ githubToken: token, repoUrl: repoUrl }), | |
}); | |
if (!response.ok) { | |
throw new Error(`HTTP error! status: ${response.status}`); | |
} | |
const data = await response.json(); | |
if (data.error) { | |
outputTextarea.value = data.error; | |
} else { | |
issueDropdown.innerHTML = ''; | |
data.issues.forEach(issue => { | |
const option = document.createElement('option'); | |
option.value = issue.number; | |
option.text = `${issue.number}: ${issue.title}`; | |
issueDropdown.add(option); | |
}); | |
} | |
} catch (error) { | |
outputTextarea.value = `Error fetching issues: ${error.message}`; | |
} | |
}); | |
resolveIssueButton.addEventListener('click', async () => { | |
const token = githubTokenInput.value; | |
const repoUrl = repoUrlInput.value; | |
const issueNumber = issueDropdown.value; | |
const resolution = resolutionTextarea.value; | |
const forkedRepoUrl = forkedRepoUrlInput.value; | |
if (!token || !repoUrl || !issueNumber || !resolution) { | |
outputTextarea.value = "Please provide all required fields."; | |
return; | |
} | |
try { | |
const response = await fetch('/resolve-issue', { | |
method: 'POST', | |
headers: { | |
'Content-Type': 'application/json', | |
}, | |
body: JSON.stringify({ githubToken: token, repoUrl: repoUrl, issueNumber: issueNumber, resolution: resolution, forkedRepoUrl: forkedRepoUrl }), | |
}); | |
if (!response.ok) { | |
throw new Error(`HTTP error! status: ${response.status}`); | |
} | |
const data = await response.json(); | |
if (data.error) { | |
outputTextarea.value = data.error; | |
} else { | |
outputTextarea.value = data.output; | |
} | |
} catch (error) { | |
outputTextarea.value = `Error resolving issue: ${error.message}`; | |
} | |
}); | |
extractInfoButton.addEventListener('click', async () => { | |
const url = urlTextbox.value; | |
if (!url) { | |
outputTextarea.value = "Please provide a URL."; | |
return; | |
} | |
try { | |
const response = await fetch('/extract-info', { | |
method: 'POST', | |
headers: { | |
'Content-Type': 'application/json', | |
}, | |
body: JSON.stringify({ url: url }), | |
}); | |
if (!response.ok) { | |
throw new Error(`HTTP error! status: ${response.status}`); | |
} | |
const data = await response.json(); | |
if (data.error) { | |
outputTextarea.value = data.error; | |
} else { | |
outputTextarea.value = JSON.stringify(data, null, 2); | |
} | |
} catch (error) { | |
outputTextarea.value = `Error extracting info: ${error.message}`; | |
} | |
}); | |
</script> | |
</body> | |
</html> | |
""" | |
def create_gradio_interface(): | |
with gr.Blocks(css=None, theme=None) as demo: | |
gr.HTML(custom_html) | |
return demo | |
# Cleanup function | |
def cleanup(): | |
try: | |
temp_dirs = [REPOS_DIRECTORY] | |
for dir_name in temp_dirs: | |
if os.path.exists(dir_name): | |
shutil.rmtree(dir_name) | |
logging.shutdown() | |
except Exception as e: | |
print(f"Error during cleanup: {str(e)}") | |
# Signal handler | |
def signal_handler(signum, frame): | |
logger.info(f"Received signal {signum}") | |
cleanup() | |
sys.exit(0) | |
if __name__ == "__main__": | |
# Register cleanup handlers | |
atexit.register(cleanup) | |
signal.signal(signal.SIGINT, signal_handler) | |
signal.signal(signal.SIGTERM, signal_handler) | |
# Create Gradio interface | |
demo = create_gradio_interface() | |
demo.launch() |