# GitBot / app.py
# acecalisto3's picture
# Update app.py
# 41a01ba verified
# raw
# history blame
# 19.1 kB
import sys
import signal
import shutil
import logging
import time
import os
from datetime import datetime
from typing import List, Dict, Any
import requests
import gradio as gr
import atexit
import subprocess
from urllib.parse import urlparse, quote
import webbrowser
import spaces
# Target device name. Nothing in the visible source reads this variable
# (stream_chat moves its tensors to `model.device` instead).
device = "cuda"
@spaces.GPU()
def stream_chat(
    message: str,
    history: list,
    system_prompt: str,
    temperature: float = 0.5,
    max_new_tokens: int = 16000,
    top_p: float = 1.0,
    top_k: int = 15,
    penalty: float = 0.9,
):
    """Build the chat prompt and the generation settings for one chat turn.

    The conversation is assembled as: system prompt, then every
    ``(prompt, answer)`` pair from ``history`` as user/assistant messages,
    then ``message`` as the final user turn.

    NOTE(review): ``tokenizer``, ``model`` and ``TextIteratorStreamer`` are
    not defined or imported anywhere in the visible source, so calling this
    function is expected to fail with ``NameError`` unless they are supplied
    elsewhere. The function also stops after building ``generate_kwargs``:
    it never starts generation, never reads the streamer and returns
    ``None``. ``penalty`` is accepted but not used.

    Args:
        message: The new user message.
        history: Iterable of ``(user_prompt, assistant_answer)`` pairs.
        system_prompt: Content of the leading system message.
        temperature: Sampling temperature; ``0`` disables sampling.
        max_new_tokens: Upper bound on generated tokens.
        top_p: Nucleus-sampling value placed in the generation settings.
        top_k: Top-k value placed in the generation settings.
        penalty: Unused.
    """
    print(f'message: {message}')
    print(f'history: {history}')

    # System message first, then the alternating user/assistant history.
    conversation = [{"role": "system", "content": system_prompt}]
    for prompt, answer in history:
        conversation.append({"role": "user", "content": prompt})
        conversation.append({"role": "assistant", "content": answer})
    conversation.append({"role": "user", "content": message})

    templated = tokenizer.apply_chat_template(
        conversation,
        add_generation_prompt=True,
        return_tensors="pt",
    )
    input_ids = templated.to(model.device)
    streamer = TextIteratorStreamer(
        tokenizer,
        timeout=60.0,
        skip_prompt=True,
        skip_special_tokens=True,
    )

    # Sampling is switched off only when the temperature is exactly zero.
    generate_kwargs = {
        "input_ids": input_ids,
        "max_new_tokens": max_new_tokens,
        "do_sample": not (temperature == 0),
        "top_p": top_p,
        "top_k": top_k,
        "temperature": temperature,
        "eos_token_id": [128001, 128008, 128009],
        "streamer": streamer,
    }
# Constants: working directories, given as paths relative to the current
# working directory. All five are created by initialize_environment().
INPUT_DIRECTORY = 'input'
OUTPUT_DIRECTORY = 'output'
LOGS_DIRECTORY = 'logs'  # initialize_logger() creates its log file here
RESOLUTIONS_DIRECTORY = 'resolutions'  # GitHubBot.resolve_issue() writes resolution .md files here
REPOS_DIRECTORY = 'repos'  # deleted by cleanup()
# Set up logging
def initialize_logger() -> logging.Logger:
    """Configure logging to a timestamped file plus the console.

    The log file is ``github_bot_<YYYYmmdd_HHMMSS>.log`` inside
    ``LOGS_DIRECTORY``; that directory must already exist because
    ``logging.FileHandler`` opens the file immediately.

    Returns:
        The logger named after this module.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = f"{LOGS_DIRECTORY}/github_bot_{stamp}.log"

    file_handler = logging.FileHandler(log_file)
    console_handler = logging.StreamHandler()

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )
    return logging.getLogger(__name__)
# Initialize environment and logger
def initialize_environment():
    """Create every working directory the app uses; existing ones are kept."""
    required = (
        LOGS_DIRECTORY,
        RESOLUTIONS_DIRECTORY,
        REPOS_DIRECTORY,
        INPUT_DIRECTORY,
        OUTPUT_DIRECTORY,
    )
    for path in required:
        os.makedirs(path, exist_ok=True)
# Order matters: the directories are created first because
# initialize_logger() opens its log file inside LOGS_DIRECTORY.
initialize_environment()
logger = initialize_logger() # Initialize logger globally
# GitHub API handler
class GitHubAPI:
    """Small GitHub REST client authenticated with a personal access token.

    Args:
        token: GitHub token sent in the ``Authorization`` header.
        timeout: Seconds to wait for each HTTP request before giving up.
            Without a timeout a stalled connection would block forever.
    """

    # Page size requested from list endpoints (GitHub's documented maximum).
    _PER_PAGE = 100
    # Safety cap on the number of pages fetched by one get_issues() call.
    _MAX_PAGES = 10

    def __init__(self, token: str, timeout: float = 10.0):
        self.token = token
        self.timeout = timeout
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = "https://api.github.com"

    def _check_rate_limit(self) -> bool:
        """Block until the core rate limit leaves room for more requests.

        When fewer than 10 core requests remain, sleeps until the reset time
        reported by the API. If the rate-limit endpoint itself cannot be
        reached, the error is logged and the caller is allowed to proceed.

        Returns:
            True once it is reasonable to issue the next request.
        """
        try:
            response = requests.get(
                f"{self.base_url}/rate_limit",
                headers=self.headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
            limits = response.json()
            remaining = limits['resources']['core']['remaining']
            reset_time = limits['resources']['core']['reset']
            if remaining < 10:
                wait_time = max(0, reset_time - int(time.time()))
                if wait_time > 0:
                    logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds before retrying...")
                    time.sleep(wait_time)
                # The quota has reset by now, so the caller may go ahead.
                # (Previously this returned False, which made get_issues()
                # wait and then throw the request away.)
                return True
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"Error checking rate limit: {str(e)}. Retrying...")
            return True

    def get_repository(self, owner: str, repo: str) -> Dict:
        """Return the repository metadata for ``owner/repo``.

        Raises:
            requests.HTTPError: If GitHub answers with an error status.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            response = requests.get(
                f"{self.base_url}/repos/{owner}/{repo}",
                headers=self.headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            logger.error(f"HTTP error getting repository info for {owner}/{repo}: {str(e)}. Please check the repository details.")
            raise
        except Exception as e:
            logger.error(f"Error getting repository info: {str(e)}")
            raise

    def get_issues(self, owner: str, repo: str, state: str = 'open') -> List[Dict]:
        """Return the issues of ``owner/repo``, excluding pull requests.

        Results are fetched page by page (up to ``_MAX_PAGES`` pages of
        ``_PER_PAGE`` items) instead of only the API's default first page.

        Args:
            owner: Repository owner.
            repo: Repository name.
            state: Issue state filter passed to the API.

        Returns:
            A list of issue dicts, or an empty list if any request fails.
        """
        if not self._check_rate_limit():
            return []
        issues: List[Dict] = []
        try:
            for page in range(1, self._MAX_PAGES + 1):
                response = requests.get(
                    f"{self.base_url}/repos/{owner}/{repo}/issues",
                    headers=self.headers,
                    params={'state': state, 'per_page': self._PER_PAGE, 'page': page},
                    timeout=self.timeout,
                )
                response.raise_for_status()
                batch = response.json()
                # The issues endpoint also lists pull requests; those carry
                # a 'pull_request' key and are skipped.
                issues.extend(item for item in batch if 'pull_request' not in item)
                if len(batch) < self._PER_PAGE:
                    break  # short page: nothing left to fetch
            else:
                logger.warning(f"Stopped after {self._MAX_PAGES} pages of issues for {owner}/{repo}; the list may be incomplete.")
            return issues
        except Exception as e:
            logger.error(f"Error fetching issues for repository {owner}/{repo}: {str(e)}. Please verify the repository and token.")
            return []
# GitHub Bot
class GitHubBot:
    """High-level issue workflow built on top of ``GitHubAPI``."""

    def __init__(self):
        # Created lazily by initialize_api() once a token is known.
        self.github_api = None

    def initialize_api(self, token: str):
        """(Re)create the API client for ``token``."""
        self.github_api = GitHubAPI(token)

    def fetch_issues(self, token: str, owner: str, repo: str) -> List[Dict]:
        """Return the open issues of ``owner/repo``; ``[]`` on any failure."""
        try:
            self.initialize_api(token)
            return self.github_api.get_issues(owner, repo)
        except Exception as e:
            logger.error(f"Error fetching issues for repository {owner}/{repo}: {str(e)}")
            return []

    @staticmethod
    def _repo_dir_name(forked_repo: str) -> str:
        """Return the last path segment of a repo URL, without '/' or '.git'."""
        name = forked_repo.rstrip('/').split('/')[-1]
        if name.endswith('.git'):
            name = name[:-len('.git')]
        return name

    def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str:
        """Record a resolution for an issue and push it through a fork.

        Steps: verify the repository exists, write the resolution to a
        markdown file in ``RESOLUTIONS_DIRECTORY``, then (if a fork URL was
        given) clone the fork under ``/tmp``, wait for the operator to apply
        the fix, commit and push, and open the compare page in a browser.

        Args:
            token: GitHub token used to look up the repository.
            owner: Owner of the upstream repository.
            repo: Name of the upstream repository.
            issue_number: Number of the issue being resolved.
            resolution: Free-text description of the resolution.
            forked_repo: URL of the fork to clone; when empty the git steps
                are skipped (the web form marks this field as optional).

        Returns:
            ``"Resolution saved: <file>"`` on success, otherwise an error
            message (failures are logged, never raised).
        """
        try:
            self.initialize_api(token)
            self.github_api.get_repository(owner, repo)
            # Create resolution file
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            resolution_file = f"{RESOLUTIONS_DIRECTORY}/resolution_{issue_number}_{timestamp}.md"
            with open(resolution_file, "w", encoding="utf-8") as f:
                f.write(f"# Resolution for Issue #{issue_number}\n\n{resolution}")
            if not forked_repo:
                # No fork supplied: only the resolution file is produced.
                return f"Resolution saved: {resolution_file}"
            repo_name = self._repo_dir_name(forked_repo)
            if repo_name in ('', '.', '..'):
                raise ValueError(f"Cannot derive a repository name from {forked_repo!r}")
            clone_dir = '/tmp/' + repo_name
            # Clone the forked repo. '--' stops git from treating a URL that
            # starts with '-' as a command-line option.
            subprocess.run(['git', 'clone', '--', forked_repo, clone_dir], check=True)
            # Assuming manual intervention now
            input("Apply the fix manually and stage the changes (press ENTER)? ")
            # Commit and push the modifications. The git commands run with
            # cwd=clone_dir rather than os.chdir(), which would change the
            # working directory of the whole process and break the relative
            # 'resolutions/' and 'logs/' paths on every later call.
            subprocess.run(['git', 'add', '.'], check=True, cwd=clone_dir)
            # NOTE(review): quote() percent-encodes the resolution text in the
            # commit message; kept as-is, confirm this is intended.
            subprocess.run(['git', 'commit', '-m', f"Resolved issue #{issue_number} ({quote(resolution)})"], check=True, cwd=clone_dir)
            subprocess.run(['git', 'push', 'origin', 'HEAD'], check=True, cwd=clone_dir)
            # Open Pull Request page
            # NOTE(review): the URL has no owner segment before the repository
            # name and names a branch this method never creates; verify.
            webbrowser.open(f'https://github.com/{repo_name}/compare/master...{owner}:{repo_name}_resolved_issue_{issue_number}')
            return f"Resolution saved: {resolution_file}"
        except Exception as e:
            error_msg = f"Error resolving issue #{issue_number} in repository {owner}/{repo}: {str(e)}"
            logger.error(error_msg)
            return error_msg
def handle_issue_selection(token, owner, repo, issue_number, resolution, forked_repo):
    """Resolve one issue with a freshly created ``GitHubBot``.

    Returns whatever ``GitHubBot.resolve_issue`` returns for the same
    arguments.
    """
    return GitHubBot().resolve_issue(
        token, owner, repo, issue_number, resolution, forked_repo
    )
def extract_info_from_url(url: str, github_token: str = "") -> Dict[str, Any]:
    """Fetch ``url`` and summarize the response.

    For URLs on github.com (or a subdomain) whose path starts with
    ``/<owner>/<repo>``, the repository's issues are added as well.

    Args:
        url: An ``http`` or ``https`` URL.
        github_token: Token used for the issue lookup. When empty, the
            ``GITHUB_TOKEN`` environment variable is used instead. (The
            original code read an undefined global here, so every GitHub URL
            ended in a NameError.)

    Returns:
        A dict with ``status_code``, ``headers`` and ``content`` (first 500
        characters of the body); ``issues`` for GitHub repository URLs
        (plus ``warning`` when no token was available); or ``error`` with a
        message if the URL is unusable or the request fails.
    """
    info: Dict[str, Any] = {}

    # Validate before touching the network: only http(s) can be fetched.
    try:
        parsed_url = urlparse(url)
    except (ValueError, TypeError, AttributeError) as e:
        info['error'] = f"Error: {str(e)}"
        return info
    if parsed_url.scheme not in ('http', 'https'):
        info['error'] = f"Error: Invalid URL {url!r}: expected an http or https URL."
        return info

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        info['status_code'] = response.status_code
        info['headers'] = dict(response.headers)
        info['content'] = response.text[:500]  # Limit content to first 500 characters for brevity
        # Compare the exact host name; a substring test would also match
        # look-alike hosts such as "notgithub.com".
        host = (parsed_url.hostname or '').lower()
        if host == 'github.com' or host.endswith('.github.com'):
            # Drop empty segments so "/owner/" is not read as owner + "".
            segments = [part for part in parsed_url.path.split('/') if part]
            if len(segments) >= 2:
                owner, repo = segments[0], segments[1]
                token = github_token or os.environ.get('GITHUB_TOKEN', '')
                if token:
                    info['issues'] = bot.fetch_issues(token, owner, repo)
                else:
                    info['issues'] = []
                    info['warning'] = "No GitHub token available; issues were not fetched."
        elif host == 'huggingface.co' or host.endswith('.huggingface.co'):
            # Add Hugging Face specific handling if needed
            pass
    except requests.HTTPError as e:
        info['error'] = f"HTTP error: {str(e)}"
    except Exception as e:
        info['error'] = f"Error: {str(e)}"
    return info
# Initialize GitHubBot globally. The module-level helpers fetch_issues(),
# resolve_issue() and extract_info_from_url() all use this shared instance.
bot = GitHubBot()
# Define missing functions with validation
def _split_owner_repo(repo_url):
    """Return ``(owner, repo)`` parsed from a repository reference.

    Accepts the ``owner/repo`` shorthand as well as full URLs such as
    ``https://github.com/owner/repo``; a trailing slash or ``.git`` suffix
    is ignored.

    Raises:
        ValueError: If no non-empty owner and repository name can be found.
    """
    error = "Repository URL is not in the correct format. Expected format: 'owner/repo'."
    cleaned = (repo_url or '').strip()
    if '://' in cleaned:
        # Full URL: owner and repo are the first two path segments, so deeper
        # links (".../owner/repo/issues/1") still resolve to the repository.
        segments = [part for part in urlparse(cleaned).path.split('/') if part]
        pair = segments[:2]
    else:
        # Shorthand ("owner/repo" or "github.com/owner/repo"): last two.
        segments = [part for part in cleaned.split('/') if part]
        pair = segments[-2:]
    if len(pair) < 2:
        raise ValueError(error)
    owner, repo = pair
    if repo.endswith('.git'):
        repo = repo[:-len('.git')]
    if not repo:
        raise ValueError(error)
    return owner, repo


def fetch_issues(token, repo_url):
    """Return the open issues for ``repo_url`` via the shared bot.

    Returns:
        The list of issues, or an error message string if ``repo_url``
        cannot be parsed or the lookup raises.
    """
    try:
        owner, repo = _split_owner_repo(repo_url)
        return bot.fetch_issues(token, owner, repo)
    except Exception as e:
        return str(e)


def resolve_issue(token, repo_url, issue_number, resolution, forked_repo_url):
    """Resolve ``issue_number`` of ``repo_url`` via the shared bot.

    Returns:
        The status message from ``GitHubBot.resolve_issue``, or an error
        message string if ``repo_url`` cannot be parsed or the call raises.
    """
    try:
        owner, repo = _split_owner_repo(repo_url)
        return bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo_url)
    except Exception as e:
        return str(e)
def extract_info(url):
    """Return the info dict for ``url``, or the error text if extraction raises."""
    try:
        return extract_info_from_url(url)
    except Exception as exc:
        return str(exc)
# HTML and CSS integration
# Complete HTML page (styles, form markup and client-side script) that
# create_gradio_interface() embeds through gr.HTML.
# Fix: the token field is looked up as 'github-token'. The previous lookup
# used ' github-token' (leading space), which matches no element, so both the
# "Fetch Issues" and "Resolve Issue" handlers failed when reading `.value`.
# NOTE(review): the script POSTs to /fetch-issues, /resolve-issue and
# /extract-info; no handlers for these routes appear in the visible source.
custom_html = """
<!DOCTYPE html>
<html>
<head>
<title>GitHub Issue Manager</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link href="https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css" rel="stylesheet">
<link href="https://cdn.jsdelivr.net/npm/daisyui@3.1.0/dist/full.css" rel="stylesheet" type="text/css" />
<style>
body {
background: linear-gradient(to bottom right, #121212, #303030) !important;
color: #e0e0e0;
font-family: 'Roboto', sans-serif;
overflow-x: hidden; /* Prevent horizontal scrollbar */
}
.card {
border-radius: 1.5rem;
box-shadow: 0 15px 25px rgba(0, 0, 0, 0.2);
margin-bottom: 20px;
}
.input, .select, .textarea {
border: 2px solid #4a4a4a;
border-radius: 0.75rem;
padding-inline: 1.5rem;
padding-block: 0.75rem;
}
h1, h2, h3 {
color: #e0e0e0;
}
.output-area {
padding: 1.5rem;
border-radius: 0.75rem;
}
.btn {
border-radius: 0.75rem;
padding-block: 0.75rem;
padding-inline: 1.5rem;
}
.btn-primary {
background-color: #7928CA;
color: white;
border: 2px solid #7928CA;
}
.btn-primary:hover {
background-color: #5e22a1;
}
.btn-success {
background-color: #22c55e;
color: white;
border: 2px solid #22c55e;
}
.btn-success:hover {
background-color: #1a9a46;
}
</style>
</head>
<body class="bg-gray-900">
<div class="container mx-auto p-4">
<h1 class="text-4xl md:text-5xl font-extrabold text-center mb-8">GitHub Issue Manager</h1>
<!-- GitHub Token & Repo URL -->
<div class="card bg-gray-800 p-8">
<div class="form-control">
<label class="label">
<span class="label-text">GitHub Token</span>
</label>
<input type="password" placeholder="Enter your GitHub Personal Access Token" class="input input-bordered input-primary" id="github-token">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Repository URL</span>
</label>
<input type="text" placeholder="Enter the full GitHub repository URL" class="input input-bordered input-primary" id="repo-url">
</div>
</div>
<!-- Fetch & Resolve Section -->
<div class="card bg-gray-800 p-8">
<div class="flex justify-between gap-4">
<button class="btn btn-primary" id="fetch-issues">Fetch Issues</button>
<select class="select select-primary w-full max-w-xs" id="issue-dropdown">
<option disabled selected>Select Issue</option>
</select>
</div>
<div class="form-control mt-4">
<label class="label">
<span class="label-text">Resolution</span>
</label>
<textarea class="textarea textarea-primary" placeholder="Enter the resolution details" id="resolution-textarea"></textarea>
</div>
<div class="form-control mt-4">
<label class="label">
<span class="label-text">Forked Repository URL</span>
</label>
<input type="text" placeholder="URL to your forked repository (Optional)" class="input input-bordered input-primary" id="forked-repo-url">
</div>
<div class="form-control mt-4">
<button class="btn btn-success" id="resolve-issue">Resolve Issue</button>
</div>
</div>
<!-- Output Area -->
<div class="card bg-gray-800 p-8 mt-4">
<label class="label">
<span class="label-text">Output</span>
</label>
<textarea class="textarea textarea-primary h-48" id="output-textarea" readonly></textarea>
</div>
<!-- URL to Extract -->
<div class="card bg-gray-800 p-8 mt-4">
<div class="form-control">
<label class="label">
<span class="label-text">URL</span>
</label>
<input type="text" placeholder="Enter a URL to extract information" class="input input-bordered input-primary" id="url-textbox">
</div>
<div class="form-control">
<button class="btn btn-primary" id="extract-info">Extract Info</button>
</div>
</div>
</div>
<script>
const githubTokenInput = document.getElementById('github-token');
const repoUrlInput = document.getElementById('repo-url');
const fetchIssuesButton = document.getElementById('fetch-issues');
const issueDropdown = document.getElementById('issue-dropdown');
const resolutionTextarea = document.getElementById('resolution-textarea');
const forkedRepoUrlInput = document.getElementById('forked-repo-url');
const resolveIssueButton = document.getElementById('resolve-issue');
const outputTextarea = document.getElementById('output-textarea');
const urlTextbox = document.getElementById('url-textbox');
const extractInfoButton = document.getElementById('extract-info');
fetchIssuesButton.addEventListener('click', async () => {
const token = githubTokenInput.value;
const repoUrl = repoUrlInput.value;
if (!token || !repoUrl) {
outputTextarea.value = "Please provide both GitHub Token and Repository URL.";
return;
}
try {
const response = await fetch('/fetch-issues', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ githubToken: token, repoUrl: repoUrl }),
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
if (data.error) {
outputTextarea.value = data.error;
} else {
issueDropdown.innerHTML = '';
data.issues.forEach(issue => {
const option = document.createElement('option');
option.value = issue.number;
option.text = `${issue.number}: ${issue.title}`;
issueDropdown.add(option);
});
}
} catch (error) {
outputTextarea.value = `Error fetching issues: ${error.message}`;
}
});
resolveIssueButton.addEventListener('click', async () => {
const token = githubTokenInput.value;
const repoUrl = repoUrlInput.value;
const issueNumber = issueDropdown.value;
const resolution = resolutionTextarea.value;
const forkedRepoUrl = forkedRepoUrlInput.value;
if (!token || !repoUrl || !issueNumber || !resolution) {
outputTextarea.value = "Please provide all required fields.";
return;
}
try {
const response = await fetch('/resolve-issue', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ githubToken: token, repoUrl: repoUrl, issueNumber: issueNumber, resolution: resolution, forkedRepoUrl: forkedRepoUrl }),
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
if (data.error) {
outputTextarea.value = data.error;
} else {
outputTextarea.value = data.output;
}
} catch (error) {
outputTextarea.value = `Error resolving issue: ${error.message}`;
}
});
extractInfoButton.addEventListener('click', async () => {
const url = urlTextbox.value;
if (!url) {
outputTextarea.value = "Please provide a URL.";
return;
}
try {
const response = await fetch('/extract-info', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ url: url }),
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
if (data.error) {
outputTextarea.value = data.error;
} else {
outputTextarea.value = JSON.stringify(data, null, 2);
}
} catch (error) {
outputTextarea.value = `Error extracting info: ${error.message}`;
}
});
</script>
</body>
</html>
"""
def create_gradio_interface():
    """Return a Gradio Blocks app whose only component is ``custom_html``."""
    demo = gr.Blocks(css=None, theme=None)
    with demo:
        gr.HTML(custom_html)
    return demo
# Cleanup function
def cleanup():
    """Delete ``REPOS_DIRECTORY`` (if present) and shut logging down.

    Best effort: any exception is reported on stdout and not re-raised, so
    the function is safe to use as an exit hook.
    """
    try:
        if os.path.exists(REPOS_DIRECTORY):
            shutil.rmtree(REPOS_DIRECTORY)
        logging.shutdown()
    except Exception as exc:
        print(f"Error during cleanup: {str(exc)}")
# Signal handler
def signal_handler(signum, frame):
    """Log the received signal, run ``cleanup()`` and exit with status 0.

    Args:
        signum: Number of the signal that was delivered.
        frame: Current stack frame (unused).
    """
    logger.info(f"Received signal {signum}")
    cleanup()
    raise SystemExit(0)
if __name__ == "__main__":
    # Run cleanup() on normal interpreter exit ...
    atexit.register(cleanup)
    # ... and on Ctrl-C / termination requests via signal_handler.
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, signal_handler)
    # Build the Gradio app and start serving it.
    demo = create_gradio_interface()
    demo.launch()