import requests
import pandas as pd
from datetime import datetime
import gradio as gr
import pickle
from sentence_transformers import SentenceTransformer, util
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import base64
from io import BytesIO
import json
from openai import OpenAI
from graphviz import Source
import re
from PIL import Image
import os
import uuid
import logging
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
# Set up logging
logging.basicConfig(level=logging.DEBUG)
# Access the secrets (no need to explicitly set them in the code)
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_DEFAULT_REGION', 'us-east-1') # Default region if not set
# Initialize the S3 client with credentials from the environment
s3_client = boto3.client(
's3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=aws_region
)
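# Note: if the env vars above are unset, these keyword arguments are None and
# boto3 falls back to its default credential chain (shared config file,
# instance profile, etc.), so the explicit client below still works.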
BUCKET_NAME = "wt-video-dl" # Replace with your bucket name
IMAGE_DIR = "/tmp"
os.makedirs(IMAGE_DIR, exist_ok=True)
GITHUB_API_URL = "https://api.github.com/search/repositories"
ACCESS_TOKEN = os.getenv("github_pat")
if not ACCESS_TOKEN:
raise ValueError("Missing GitHub Personal Access Token.")
HEADERS = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
OPENAI_API_KEY = os.getenv("openai_key")
if not OPENAI_API_KEY:
raise ValueError("Missing OpenAI API Key. Please set it as a secret in Hugging Face.")
client = OpenAI(api_key=OPENAI_API_KEY)
ALLOWED_EXTENSIONS = [".py", ".js", ".md", ".toml", ".yaml"]
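# Assumed pickle layout: {"topics": list[str], "embeddings": Tensor}, with the
# embeddings precomputed offline by the same 'all-MiniLM-L6-v2' model that
# search_similar_topics() uses below.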
with open("github_topics_embeddings.pkl", "rb") as f:
topic_data = pickle.load(f)
topics = topic_data["topics"]
embeddings = topic_data["embeddings"]
discovered_repos = []
files_data = []  # populated by the Code Analysis tab's file browser
# Function to upload image to S3
def upload_image_to_s3(image_data, filename):
try:
# Upload the image data to S3
s3_client.put_object(
Bucket=BUCKET_NAME,
Key=filename,
Body=image_data,
ContentType='image/png',
)
# Generate the S3 URL
s3_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{filename}"
return s3_url
except (NoCredentialsError, PartialCredentialsError) as e:
return f"Error with AWS credentials: {str(e)}"
except Exception as e:
return f"Error uploading image to S3: {str(e)}"
def search_similar_topics(input_text):
if not input_text.strip():
return "Enter topics to see suggestions."
try:
model = SentenceTransformer('all-MiniLM-L6-v2')
query_embedding = model.encode(input_text, convert_to_tensor=True)
similarities = util.pytorch_cos_sim(query_embedding, embeddings)
top_indices = similarities[0].argsort(descending=True)[:10]
return ", ".join([topics[i] for i in top_indices])
except Exception as e:
return f"Error in generating suggestions: {str(e)}"
def search_repositories(query, sort="stars", order="desc", total_repos=10):
all_repos = []
    total_repos = int(total_repos)  # gr.Number may deliver a float
    per_page = max(1, min(total_repos, 100))  # GitHub caps per_page at 100
    total_pages = -(-total_repos // per_page)  # ceiling division; the Search API returns at most 1000 results
for page in range(1, total_pages + 1):
params = {
"q": query,
"sort": sort,
"order": order,
"per_page": per_page,
"page": page,
}
response = requests.get(GITHUB_API_URL, headers=HEADERS, params=params)
if response.status_code != 200:
raise Exception(f"GitHub API error: {response.status_code} {response.text}")
items = response.json().get("items", [])
if not items:
break
all_repos.extend(items)
if len(all_repos) >= total_repos:
break
return all_repos[:total_repos]
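# Illustrative usage with GitHub's search-qualifier syntax (hypothetical query):
#   repos = search_repositories("topic:machine-learning language:Python stars:100..1000",
#                               sort="stars", total_repos=20)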
def calculate_additional_metrics(repo):
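    """Derive heuristic popularity/health metrics from a GitHub search result.

    Note: in search results "watchers_count" mirrors the star count, so the
    true watcher figure is fetched separately below as "subscribers_count".
    """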
created_date = datetime.strptime(repo["created_at"], "%Y-%m-%dT%H:%M:%SZ")
updated_date = datetime.strptime(repo["updated_at"], "%Y-%m-%dT%H:%M:%SZ")
days_since_creation = (datetime.utcnow() - created_date).days
days_since_update = (datetime.utcnow() - updated_date).days
star_velocity = repo["stargazers_count"] / days_since_creation if days_since_creation > 0 else 0
fork_to_star_ratio = (repo["forks_count"] / repo["stargazers_count"] * 100) if repo["stargazers_count"] > 0 else 0
hidden_gem = "Yes" if repo["stargazers_count"] < 500 and repo["forks_count"] < 50 else "No"
hidden_gem_trend = "Rising" if star_velocity > 1 else "Stable"
rising_score = ((star_velocity * 10) +
(repo["forks_count"] * 0.2) +
(repo.get("watchers_count", 0) * 0.3) +
(1 / (days_since_update + 1) * 20) -
(repo["open_issues_count"] * 0.01))
legacy_score = (repo["stargazers_count"] * 0.6) + \
(repo["forks_count"] * 0.3) + \
(repo.get("watchers_count", 0) * 0.1) - \
(repo["open_issues_count"] * 0.05)
owner, repo_name = repo["owner"]["login"], repo["name"]
repo_details_url = f"https://api.github.com/repos/{owner}/{repo_name}"
response = requests.get(repo_details_url, headers=HEADERS)
if response.status_code == 200:
repo_details = response.json()
actual_watchers = repo_details.get("subscribers_count", 0)
else:
actual_watchers = 0
watcher_to_stars_ratio = (actual_watchers / repo["stargazers_count"]) * 100 if repo["stargazers_count"] > 0 else 0
return {
"Rising Score": round(rising_score, 2),
"Legacy Score": round(legacy_score, 2),
"Star Velocity (Stars/Day)": round(star_velocity, 2),
"Fork-to-Star Ratio (%)": round(fork_to_star_ratio, 2),
"Watchers": actual_watchers,
"Watcher-to-Stars Ratio (%)": round(watcher_to_stars_ratio, 2),
"Language": repo.get("language", "N/A"),
"Topics": ", ".join(repo.get("topics", [])),
"Hidden Gem": hidden_gem,
"Hidden Gem Trend": hidden_gem_trend,
"Open Issues": repo["open_issues_count"],
"Created At": repo["created_at"],
"Last Updated": repo["pushed_at"],
"days_since_creation": round(days_since_creation, 2),
"days_since_update": round(days_since_update, 2),
"URL": repo["html_url"],
}
def gradio_interface(topics, start_date, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order):
global discovered_repos
if not topics.strip() and not start_date.strip():
return pd.DataFrame(), "Please provide at least a topic or a start date."
topics_list = [topic.strip() for topic in topics.split(",") if topic.strip()]
stars_range = (stars_min, stars_max)
forks_range = (forks_min, forks_max)
df = pd.DataFrame()
all_repos_data = []
try:
if not topics_list:
query = f"stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}"
if start_date.strip():
query += f" created:>{start_date.strip()}"
if language_filter:
query += f" language:{language_filter}"
repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
for repo in repos:
repo_data = {
"Name": repo["name"],
"Owner": repo["owner"]["login"],
"Stars": repo["stargazers_count"],
"Forks": repo["forks_count"],
"Description": repo.get("description", "N/A"),
}
repo_data.update(calculate_additional_metrics(repo))
all_repos_data.append(repo_data)
else:
for topic in topics_list:
query = f"topic:{topic} stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}"
if start_date.strip():
query += f" created:>{start_date.strip()}"
if language_filter:
query += f" language:{language_filter}"
repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
for repo in repos:
repo_data = {
"Name": repo["name"],
"Owner": repo["owner"]["login"],
"Stars": repo["stargazers_count"],
"Forks": repo["forks_count"],
"Description": repo.get("description", "N/A"),
}
repo_data.update(calculate_additional_metrics(repo))
all_repos_data.append(repo_data)
discovered_repos.append(f"{repo['owner']['login']}/{repo['name']}")
if not all_repos_data:
return pd.DataFrame(), "No repositories found matching the criteria."
discovered_repos = list(set(discovered_repos))
df = pd.DataFrame(all_repos_data)
except Exception as e:
print(f"Error: {e}")
return pd.DataFrame(), f"Error fetching repositories: {str(e)}"
csv_file = None
if not df.empty:
csv_file = "discovered_repositories.csv"
df.to_csv(csv_file, index=False)
return df, csv_file
def fetch_org_repositories(org_names, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos):
try:
org_list = [org.strip() for org in org_names.split(",") if org.strip()]
if not org_list:
return pd.DataFrame(), "Enter at least one organization."
all_repos_data = []
for org in org_list:
query = f"user:{org} stars:{stars_min}..{stars_max} forks:{forks_min}..{forks_max}"
if language_filter:
query += f" language:{language_filter}"
repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
for repo in repos:
repo_data = {
"Name": repo["name"],
"Owner": repo["owner"]["login"],
"Stars": repo["stargazers_count"],
"Forks": repo["forks_count"],
"Description": repo.get("description", "N/A"),
}
repo_data.update(calculate_additional_metrics(repo))
all_repos_data.append(repo_data)
if not all_repos_data:
return pd.DataFrame(), "No repositories found for the specified organizations."
df = pd.DataFrame(all_repos_data)
csv_file = "organization_repositories.csv"
df.to_csv(csv_file, index=False)
return df, csv_file
except Exception as e:
print(f"Error in fetch_org_repositories: {e}")
return pd.DataFrame(), f"Error: {str(e)}"
def get_discovered_repos():
global discovered_repos
return discovered_repos
def process_readme(owner, repo, branch):
url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/README.md"
response = requests.get(url, headers=HEADERS)
if response.status_code == 200:
readme_content = response.text
else:
return f"Failed to fetch README content from branch {branch}.", "", "", None
MODEL = "gpt-4o-mini"
completion = client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": "You are a helpful assistant that extracts keywords, named entities, and generates summaries from text."},
{"role": "user", "content": f"""
Perform the following tasks on the following README file:
1. Extract the top 25 most important keywords from the text only.
2. Extract All Major named entities (e.g., people, organizations, technologies).
3. Summarize the content in one paragraph.
Return the results in the following JSON format:
{{
"keywords": ["keyword1", "keyword2", ...],
"entities": ["entity1", "entity2", ...],
"summary": "A concise summary of the README."
}}
README file:
{readme_content}
"""}
],
response_format={"type": "json_object"}
)
result = completion.choices[0].message.content
result_json = json.loads(result)
keywords = ", ".join(result_json["keywords"])
entities = ", ".join(result_json["entities"])
summary = result_json["summary"]
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(keywords)
    fig = plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    return keywords, entities, summary, fig
def get_branches(owner, repo):
url = f"https://api.github.com/repos/{owner}/{repo}/branches"
response = requests.get(url, headers=HEADERS)
if response.status_code == 200:
branches = [branch["name"] for branch in response.json()]
return branches
else:
return []
def get_default_branch(owner, repo):
url = f"https://api.github.com/repos/{owner}/{repo}"
response = requests.get(url, headers=HEADERS)
if response.status_code == 200:
repo_data = response.json()
return repo_data["default_branch"]
else:
return None
def fetch_files(owner, repo, path=""):
url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}" if path else f"https://api.github.com/repos/{owner}/{repo}/contents"
response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        logging.warning(f"Failed to fetch files at '{path}': {response.status_code}")
        return []  # keep the return type consistent so recursive callers can extend safely
files = []
for item in response.json():
if item["type"] == "file": # Only add files
if any(item["name"].endswith(ext) for ext in ALLOWED_EXTENSIONS):
files.append({
"name": item["name"],
"path": item["path"],
"download_url": item["download_url"]
})
elif item["type"] == "dir":
sub_files = fetch_files(owner, repo, item["path"])
files.extend(sub_files)
return files
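# Note: this walks the tree with one API call per directory, which can be slow
# and rate-limited on large repos; GitHub's git/trees endpoint with ?recursive=1
# is a lighter-weight alternative.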
def fetch_file_content(owner, repo, branch, file_path):
file_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{file_path}"
response = requests.get(file_url)
if response.status_code == 200:
return response.text
else:
return f"Failed to fetch file content: {response.status_code}"
def ask_code_question(code_content, question):
if not code_content.strip():
return "No code content available to analyze."
if not question.strip():
return "Please enter a question about the code."
prompt = f"""
Here is a Python file from a GitHub repository:
{code_content}
Please answer the following question about this file:
- {question}
"""
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a helpful assistant skilled in understanding code."},
{"role": "user", "content": prompt}
]
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"Error querying the LLM: {str(e)}"
def upload_image_to_imgur(image_path):
"""
Upload an image to Imgur and return the hosted URL.
Args:
image_path (str): Path to the image file to upload.
Returns:
str: The URL of the uploaded image or an error message.
"""
url = "https://api.imgur.com/3/image"
headers = {
"Authorization": f"Client-ID {IMGUR_CLIENT_ID}"
}
with open(image_path, "rb") as image_file:
payload = {
"image": image_file,
"type": "file"
}
try:
response = requests.post(url, headers=headers, files=payload)
if response.status_code == 200:
data = response.json()
return data["data"]["link"] # URL of the uploaded image
else:
return f"Failed to upload image. Status code: {response.status_code}, Response: {response.text}"
except Exception as e:
return f"Error uploading image to Imgur: {str(e)}"
def generate_dot_code_from_code(code_content, diagram_type):
if not code_content.strip():
return "No code content available to analyze."
prompt = f"""
Here is some Python code from a GitHub repository:
{code_content}
Please generate a {diagram_type} for this code in Graphviz DOT/digraph format. Ensure the DOT code is valid and renderable.
Don't include any other text. Don't provide any other explanatory commentary.
    Ensure the DOT code includes all necessary opening and closing braces {{ }} for graphs and subgraphs.
"""
try:
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful assistant that generates Graphviz DOT code for visualizing Python code. You are restricted to only generate Graphviz Code starting with digraph & ending with }"},
{"role": "user", "content": prompt}
]
)
raw_dot_code = response.choices[0].message.content.strip()
validated_dot_code = validate_and_fix_dot_code(raw_dot_code) # Fix any missing brackets
pattern = r"digraph\b[\s\S]*?^\}"
match = re.search(pattern, validated_dot_code,re.MULTILINE | re.DOTALL)
if match:
validated_dot_code = match.group(0) # Extract the matched content
else:
return "Failed to extract valid Graphviz code."
return validated_dot_code
    except Exception as e:
        return f"Error querying the LLM: {str(e)}"
def validate_and_fix_dot_code(dot_code):
open_brackets = dot_code.count("{")
close_brackets = dot_code.count("}")
if open_brackets > close_brackets:
missing_brackets = open_brackets - close_brackets
dot_code += "}" * missing_brackets
return dot_code
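# This only patches unbalanced closing braces; any other DOT syntax errors are
# left for Graphviz to surface at render time.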
def render_dot_code(dot_code, filename=None):
if not filename:
filename = f"diagram_{uuid.uuid4().hex}.png" # Generate a unique filename
try:
# Render the DOT code to an in-memory PNG
src = Source(dot_code, format="png")
rendered_png = src.pipe() # In-memory PNG image data
# Upload the rendered PNG to S3
s3_url = upload_image_to_s3(rendered_png, filename)
# Return the S3 URL
return s3_url
except Exception as e:
return f"Error rendering or uploading diagram: {str(e)}"
import time

def handle_generate_diagram(code_content, diagram_type, retries=5, wait_time=1):
    """Generate DOT code for the file, render it to S3, and return display HTML."""
    s3_url = "diagram generation was not attempted"
    for _ in range(retries):
        s3_url = render_dot_code(generate_dot_code_from_code(code_content, diagram_type))
        if s3_url.startswith("http"):  # a URL means rendering and upload succeeded
            return f'<img src="{s3_url}" alt="Generated diagram" style="max-width: 100%;">'
        time.sleep(wait_time)  # brief pause before retrying
    # Return the error message in HTML format
    return f"<p style='color: red;'>Error: {s3_url}</p>"
" # Return the error message in HTML format # Gradio Interface with gr.Blocks() as demo: # Tab 1: Repository Discovery with gr.Tab("Repository Discovery"): with gr.Row(): topics_input = gr.Textbox( label="Topics (comma-separated, leave empty to fetch by date only)", placeholder="e.g., machine-learning, deep-learning (leave empty for date-based search)" ) similar_topics = gr.Textbox( label="Similar Topics (based on embeddings)", interactive=False ) gr.Button("Get Similar Topics").click( search_similar_topics, inputs=[topics_input], outputs=[similar_topics] ) with gr.Row(): start_date_input = gr.Textbox( label="Start Date (YYYY-MM-DD, leave empty if not filtering by date)", placeholder="Set to filter recent repositories by date or leave empty" ) language_filter = gr.Dropdown( choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"], label="Language Filter", value="" ) stars_min = gr.Number(label="Stars Min", value=10) stars_max = gr.Number(label="Stars Max", value=1000) with gr.Row(): forks_min = gr.Number(label="Forks Min", value=0) forks_max = gr.Number(label="Forks Max", value=500) total_repos = gr.Number(label="Total Repositories", value=10, step=10) sort_order = gr.Dropdown( choices=["stars", "forks", "updated"], label="Sort Order", value="stars" ) with gr.Row(): output_data = gr.Dataframe(label="Discovered Repositories") output_file = gr.File(label="Download CSV", file_count="single") gr.Button("Discover Repositories").click( gradio_interface, inputs=[ topics_input, start_date_input, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order ], outputs=[output_data, output_file] ) # Tab 2: Organization Watch with gr.Tab("Organization Watch"): with gr.Row(): org_input = gr.Textbox( label="Organizations (comma-separated)", placeholder="e.g., facebookresearch, openai" ) with gr.Row(): language_filter = gr.Dropdown( choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"], label="Language Filter", value="" ) stars_min = gr.Number(label="Stars Min", value=10) stars_max = gr.Number(label="Stars Max", value=1000) with gr.Row(): forks_min = gr.Number(label="Forks Min", value=0) forks_max = gr.Number(label="Forks Max", value=500) total_repos = gr.Number(label="Total Repositories", value=10, step=10) sort_order = gr.Dropdown( choices=["stars", "forks", "updated"], label="Sort Order", value="stars" ) with gr.Row(): output_data = gr.Dataframe(label="Repositories by Organizations") output_file = gr.File(label="Download CSV", file_count="single") gr.Button("Fetch Organization Repositories").click( fetch_org_repositories, inputs=[ org_input, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos ], outputs=[output_data, output_file] ) # Tab 3: Code Analysis with gr.Tab("Code Analysis"): with gr.Row(): repo_dropdown = gr.Dropdown( label="Select Repository", choices=[], interactive=True ) refresh_button = gr.Button("Refresh Repositories") with gr.Row(): branch_dropdown = gr.Dropdown( label="Select Branch", choices=[], interactive=True ) with gr.Row(): keywords_output = gr.Textbox(label="Keywords") entities_output = gr.Textbox(label="Entities") with gr.Row(): summary_output = gr.Textbox(label="Summary") wordcloud_output = gr.Plot(label="Word Cloud") with gr.Row(): files_list = gr.Dropdown( label="Files in Repository", choices=[], interactive=True ) with gr.Row(): file_content_box = gr.Textbox( label="File Content", lines=20, interactive=True ) with gr.Row(): question_input = gr.Textbox( label="Ask a Question", placeholder="Enter 
your question about the code...", lines=1 ) question_button = gr.Button("Get Answer") with gr.Row(): answer_output = gr.Textbox(label="Bot's Answer", lines=10, interactive=False) with gr.Row(): diagram_type = gr.Dropdown( label="Select Diagram Type", choices=["Call Graph", "Data Flow Diagram", "Sequence Diagram", "Class Diagram", "Component Diagram", "Workflow Diagram"], value="Call Graph" ) generate_diagram_button = gr.Button("Generate Diagram") with gr.Row(): diagram_output = gr.HTML( label="Generated Diagram", ) question_button.click( ask_code_question, inputs=[file_content_box, question_input], outputs=[answer_output] ) def generate_and_render_diagram(code_content, diagram_type): dot_code = generate_dot_code_from_code(code_content, diagram_type) if not dot_code.strip().startswith("digraph"): return "Invalid DOT code generated." unique_filename = f"diagram_{uuid.uuid4().hex}" return render_dot_code(dot_code, filename=unique_filename) generate_diagram_button.click( handle_generate_diagram, inputs=[file_content_box, diagram_type], outputs=[diagram_output] ) refresh_button.click( lambda: gr.update(choices=get_discovered_repos()), inputs=[], outputs=[repo_dropdown] ) def update_branches(repo): if repo: owner, repo_name = repo.split("/") branches = get_branches(owner, repo_name) default_branch = get_default_branch(owner, repo_name) return gr.update(choices=branches, value=default_branch) return gr.update(choices=[], value=None) repo_dropdown.change( update_branches, inputs=[repo_dropdown], outputs=[branch_dropdown] ) def analyze_readme(repo, branch): if repo and branch: owner, repo_name = repo.split("/") return process_readme(owner, repo_name, branch) return "No repository or branch selected.", "", "", None repo_dropdown.change( analyze_readme, inputs=[repo_dropdown, branch_dropdown], outputs=[keywords_output, entities_output, summary_output, wordcloud_output] ) branch_dropdown.change( analyze_readme, inputs=[repo_dropdown, branch_dropdown], outputs=[keywords_output, entities_output, summary_output, wordcloud_output] ) def update_files(repo): global files_data if repo: owner, repo_name = repo.split("/") files = fetch_files(owner, repo_name) files_data = files file_names = [f"{file['name']} ({file['path']})" for file in files] return gr.update(choices=file_names, value=None) files_data = [] return gr.update(choices=[], value=None) repo_dropdown.change( lambda repo: update_files(repo), inputs=[repo_dropdown], outputs=[files_list] ) def display_file_content(repo, branch, selected_file): if repo and branch and selected_file: owner, repo_name = repo.split("/") file_path = selected_file.split(" (")[1][:-1] content = fetch_file_content(owner, repo_name, branch, file_path) return content return "No file selected." files_list.change( display_file_content, inputs=[repo_dropdown, branch_dropdown, files_list], outputs=[file_content_box] ) #demo.launch() #demo.launch(share=True, server_name="0.0.0.0", server_port=7860, static_dirs={"images": "./images"}) demo.launch(share=True)