import requests
import pandas as pd
from datetime import datetime
import gradio as gr
import pickle
from sentence_transformers import SentenceTransformer, util
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import base64
from io import BytesIO
import json
import re
import os
import uuid
import time
import logging
from openai import OpenAI
from graphviz import Source
from PIL import Image
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError

# Set up logging
logging.basicConfig(level=logging.DEBUG)

# Access the secrets (no need to explicitly set them in the code)
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_DEFAULT_REGION', 'us-east-1')  # Default region if not set

# Initialize the S3 client with these credentials
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=aws_region
)

BUCKET_NAME = "wt-video-dl"  # Replace with your bucket name
IMAGE_DIR = "/tmp"
os.makedirs(IMAGE_DIR, exist_ok=True)

GITHUB_API_URL = "https://api.github.com/search/repositories"
ACCESS_TOKEN = os.getenv("github_pat")
if not ACCESS_TOKEN:
    raise ValueError("Missing GitHub Personal Access Token.")
HEADERS = {"Authorization": f"Bearer {ACCESS_TOKEN}"}

OPENAI_API_KEY = os.getenv("openai_key")
if not OPENAI_API_KEY:
    raise ValueError("Missing OpenAI API Key. Please set it as a secret in Hugging Face.")
client = OpenAI(api_key=OPENAI_API_KEY)

ALLOWED_EXTENSIONS = [".py", ".js", ".md", ".toml", ".yaml"]

with open("github_topics_embeddings.pkl", "rb") as f:
    topic_data = pickle.load(f)
topics = topic_data["topics"]
embeddings = topic_data["embeddings"]

discovered_repos = []


# Function to upload image to S3
def upload_image_to_s3(image_data, filename):
    try:
        # Upload the image data to S3
        s3_client.put_object(
            Bucket=BUCKET_NAME,
            Key=filename,
            Body=image_data,
            ContentType='image/png',
        )
        # Generate the S3 URL
        s3_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{filename}"
        return s3_url
    except (NoCredentialsError, PartialCredentialsError) as e:
        return f"Error with AWS credentials: {str(e)}"
    except Exception as e:
        return f"Error uploading image to S3: {str(e)}"


def search_similar_topics(input_text):
    if not input_text.strip():
        return "Enter topics to see suggestions."
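    # Note: the embedding model is reloaded on every call; caching a single
    # module-level SentenceTransformer instance would avoid the repeated load cost.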
    try:
        model = SentenceTransformer('all-MiniLM-L6-v2')
        query_embedding = model.encode(input_text, convert_to_tensor=True)
        similarities = util.pytorch_cos_sim(query_embedding, embeddings)
        top_indices = similarities[0].argsort(descending=True)[:10]
        return ", ".join([topics[i] for i in top_indices])
    except Exception as e:
        return f"Error in generating suggestions: {str(e)}"


def search_repositories(query, sort="stars", order="desc", total_repos=10):
    all_repos = []
    per_page = 100 if total_repos > 100 else total_repos
    total_pages = (total_repos // per_page) + 1
    for page in range(1, total_pages + 1):
        params = {
            "q": query,
            "sort": sort,
            "order": order,
            "per_page": per_page,
            "page": page,
        }
        response = requests.get(GITHUB_API_URL, headers=HEADERS, params=params)
        if response.status_code != 200:
            raise Exception(f"GitHub API error: {response.status_code} {response.text}")
        items = response.json().get("items", [])
        if not items:
            break
        all_repos.extend(items)
        if len(all_repos) >= total_repos:
            break
    return all_repos[:total_repos]


def calculate_additional_metrics(repo):
    created_date = datetime.strptime(repo["created_at"], "%Y-%m-%dT%H:%M:%SZ")
    updated_date = datetime.strptime(repo["updated_at"], "%Y-%m-%dT%H:%M:%SZ")
    days_since_creation = (datetime.utcnow() - created_date).days
    days_since_update = (datetime.utcnow() - updated_date).days
    star_velocity = repo["stargazers_count"] / days_since_creation if days_since_creation > 0 else 0
    fork_to_star_ratio = (repo["forks_count"] / repo["stargazers_count"] * 100) if repo["stargazers_count"] > 0 else 0
    hidden_gem = "Yes" if repo["stargazers_count"] < 500 and repo["forks_count"] < 50 else "No"
    hidden_gem_trend = "Rising" if star_velocity > 1 else "Stable"
    rising_score = ((star_velocity * 10)
                    + (repo["forks_count"] * 0.2)
                    + (repo.get("watchers_count", 0) * 0.3)
                    + (1 / (days_since_update + 1) * 20)
                    - (repo["open_issues_count"] * 0.01))
    legacy_score = (repo["stargazers_count"] * 0.6) + \
                   (repo["forks_count"] * 0.3) + \
                   (repo.get("watchers_count", 0) * 0.1) - \
                   (repo["open_issues_count"] * 0.05)
    owner, repo_name = repo["owner"]["login"], repo["name"]
    repo_details_url = f"https://api.github.com/repos/{owner}/{repo_name}"
    response = requests.get(repo_details_url, headers=HEADERS)
    if response.status_code == 200:
        repo_details = response.json()
        actual_watchers = repo_details.get("subscribers_count", 0)
    else:
        actual_watchers = 0
    watcher_to_stars_ratio = (actual_watchers / repo["stargazers_count"]) * 100 if repo["stargazers_count"] > 0 else 0
    return {
        "Rising Score": round(rising_score, 2),
        "Legacy Score": round(legacy_score, 2),
        "Star Velocity (Stars/Day)": round(star_velocity, 2),
        "Fork-to-Star Ratio (%)": round(fork_to_star_ratio, 2),
        "Watchers": actual_watchers,
        "Watcher-to-Stars Ratio (%)": round(watcher_to_stars_ratio, 2),
        "Language": repo.get("language", "N/A"),
        "Topics": ", ".join(repo.get("topics", [])),
        "Hidden Gem": hidden_gem,
        "Hidden Gem Trend": hidden_gem_trend,
        "Open Issues": repo["open_issues_count"],
        "Created At": repo["created_at"],
        "Last Updated": repo["pushed_at"],
        "days_since_creation": round(days_since_creation, 2),
        "days_since_update": round(days_since_update, 2),
        "URL": repo["html_url"],
    }


def gradio_interface(topics, start_date, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order):
    global discovered_repos
    if not topics.strip() and not start_date.strip():
        return pd.DataFrame(), "Please provide at least a topic or a start date."
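    # Build one GitHub search query per topic, or a single date/stars/forks-only
    # query when no topics are given, e.g.:
    #   topic:machine-learning stars:10..1000 forks:0..500 created:>2024-01-01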
    topics_list = [topic.strip() for topic in topics.split(",") if topic.strip()]
    stars_range = (stars_min, stars_max)
    forks_range = (forks_min, forks_max)
    df = pd.DataFrame()
    all_repos_data = []
    try:
        if not topics_list:
            query = f"stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}"
            if start_date.strip():
                query += f" created:>{start_date.strip()}"
            if language_filter:
                query += f" language:{language_filter}"
            repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
            for repo in repos:
                repo_data = {
                    "Name": repo["name"],
                    "Owner": repo["owner"]["login"],
                    "Stars": repo["stargazers_count"],
                    "Forks": repo["forks_count"],
                    "Description": repo.get("description", "N/A"),
                }
                repo_data.update(calculate_additional_metrics(repo))
                all_repos_data.append(repo_data)
                # Track these repos as well so they show up in the Code Analysis tab
                discovered_repos.append(f"{repo['owner']['login']}/{repo['name']}")
        else:
            for topic in topics_list:
                query = f"topic:{topic} stars:{stars_range[0]}..{stars_range[1]} forks:{forks_range[0]}..{forks_range[1]}"
                if start_date.strip():
                    query += f" created:>{start_date.strip()}"
                if language_filter:
                    query += f" language:{language_filter}"
                repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
                for repo in repos:
                    repo_data = {
                        "Name": repo["name"],
                        "Owner": repo["owner"]["login"],
                        "Stars": repo["stargazers_count"],
                        "Forks": repo["forks_count"],
                        "Description": repo.get("description", "N/A"),
                    }
                    repo_data.update(calculate_additional_metrics(repo))
                    all_repos_data.append(repo_data)
                    discovered_repos.append(f"{repo['owner']['login']}/{repo['name']}")
        if not all_repos_data:
            return pd.DataFrame(), "No repositories found matching the criteria."
        discovered_repos = list(set(discovered_repos))
        df = pd.DataFrame(all_repos_data)
    except Exception as e:
        print(f"Error: {e}")
        return pd.DataFrame(), f"Error fetching repositories: {str(e)}"

    csv_file = None
    if not df.empty:
        csv_file = "discovered_repositories.csv"
        df.to_csv(csv_file, index=False)
    return df, csv_file


def fetch_org_repositories(org_names, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos):
    try:
        org_list = [org.strip() for org in org_names.split(",") if org.strip()]
        if not org_list:
            return pd.DataFrame(), "Enter at least one organization."

        all_repos_data = []
        for org in org_list:
            query = f"user:{org} stars:{stars_min}..{stars_max} forks:{forks_min}..{forks_max}"
            if language_filter:
                query += f" language:{language_filter}"
            repos = search_repositories(query=query, sort=sort_order, total_repos=total_repos)
            for repo in repos:
                repo_data = {
                    "Name": repo["name"],
                    "Owner": repo["owner"]["login"],
                    "Stars": repo["stargazers_count"],
                    "Forks": repo["forks_count"],
                    "Description": repo.get("description", "N/A"),
                }
                repo_data.update(calculate_additional_metrics(repo))
                all_repos_data.append(repo_data)
        if not all_repos_data:
            return pd.DataFrame(), "No repositories found for the specified organizations."
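        # Write the results to CSV so the Gradio File output can offer a download.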
        df = pd.DataFrame(all_repos_data)
        csv_file = "organization_repositories.csv"
        df.to_csv(csv_file, index=False)
        return df, csv_file
    except Exception as e:
        print(f"Error in fetch_org_repositories: {e}")
        return pd.DataFrame(), f"Error: {str(e)}"


def get_discovered_repos():
    global discovered_repos
    return discovered_repos


def process_readme(owner, repo, branch):
    url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/README.md"
    response = requests.get(url, headers=HEADERS)
    if response.status_code == 200:
        readme_content = response.text
    else:
        return f"Failed to fetch README content from branch {branch}.", "", "", None

    MODEL = "gpt-4o-mini"
    completion = client.chat.completions.create(
        model=MODEL,
        messages=[
            {"role": "system", "content": "You are a helpful assistant that extracts keywords, named entities, and generates summaries from text."},
            {"role": "user", "content": f"""
            Perform the following tasks on the following README file:
            1. Extract the top 25 most important keywords from the text only.
            2. Extract all major named entities (e.g., people, organizations, technologies).
            3. Summarize the content in one paragraph.

            Return the results in the following JSON format:
            {{
                "keywords": ["keyword1", "keyword2", ...],
                "entities": ["entity1", "entity2", ...],
                "summary": "A concise summary of the README."
            }}

            README file:
            {readme_content}
            """}
        ],
        response_format={"type": "json_object"}
    )
    result = completion.choices[0].message.content
    result_json = json.loads(result)
    keywords = ", ".join(result_json["keywords"])
    entities = ", ".join(result_json["entities"])
    summary = result_json["summary"]

    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(keywords)
    fig = plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    # Return the Figure object so gr.Plot can render it
    return keywords, entities, summary, fig


def get_branches(owner, repo):
    url = f"https://api.github.com/repos/{owner}/{repo}/branches"
    response = requests.get(url, headers=HEADERS)
    if response.status_code == 200:
        branches = [branch["name"] for branch in response.json()]
        return branches
    else:
        return []


def get_default_branch(owner, repo):
    url = f"https://api.github.com/repos/{owner}/{repo}"
    response = requests.get(url, headers=HEADERS)
    if response.status_code == 200:
        repo_data = response.json()
        return repo_data["default_branch"]
    else:
        return None


def fetch_files(owner, repo, path=""):
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}" if path else f"https://api.github.com/repos/{owner}/{repo}/contents"
    response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        # Return an empty list (not a tuple) so recursive callers can extend() safely
        logging.error(f"Failed to fetch files: {response.status_code}")
        return []
    files = []
    for item in response.json():
        if item["type"] == "file":  # Only add files
            if any(item["name"].endswith(ext) for ext in ALLOWED_EXTENSIONS):
                files.append({
                    "name": item["name"],
                    "path": item["path"],
                    "download_url": item["download_url"]
                })
        elif item["type"] == "dir":
            sub_files = fetch_files(owner, repo, item["path"])
            files.extend(sub_files)
    return files


def fetch_file_content(owner, repo, branch, file_path):
    file_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{file_path}"
    response = requests.get(file_url)
    if response.status_code == 200:
        return response.text
    else:
        return f"Failed to fetch file content: {response.status_code}"


def ask_code_question(code_content, question):
    if not code_content.strip():
        return "No code content available to analyze."
    if not question.strip():
        return "Please enter a question about the code."
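    # The whole file plus the question goes into a single prompt; very large
    # files may exceed the model's context window (no chunking is attempted).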
prompt = f""" Here is a Python file from a GitHub repository: {code_content} Please answer the following question about this file: - {question} """ try: response = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are a helpful assistant skilled in understanding code."}, {"role": "user", "content": prompt} ] ) return response.choices[0].message.content.strip() except Exception as e: return f"Error querying the LLM: {str(e)}" def upload_image_to_imgur(image_path): """ Upload an image to Imgur and return the hosted URL. Args: image_path (str): Path to the image file to upload. Returns: str: The URL of the uploaded image or an error message. """ url = "https://api.imgur.com/3/image" headers = { "Authorization": f"Client-ID {IMGUR_CLIENT_ID}" } with open(image_path, "rb") as image_file: payload = { "image": image_file, "type": "file" } try: response = requests.post(url, headers=headers, files=payload) if response.status_code == 200: data = response.json() return data["data"]["link"] # URL of the uploaded image else: return f"Failed to upload image. Status code: {response.status_code}, Response: {response.text}" except Exception as e: return f"Error uploading image to Imgur: {str(e)}" def generate_dot_code_from_code(code_content, diagram_type): if not code_content.strip(): return "No code content available to analyze." prompt = f""" Here is some Python code from a GitHub repository: {code_content} Please generate a {diagram_type} for this code in Graphviz DOT/digraph format. Ensure the DOT code is valid and renderable. Don't include any other text. Don't provide any other explanatory commentary. Ensure the DOT code includes all necessary opening and closing brackets {"brackets"} for graphs and subgraphs. """ try: response = client.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": "You are a helpful assistant that generates Graphviz DOT code for visualizing Python code. You are restricted to only generate Graphviz Code starting with digraph & ending with }"}, {"role": "user", "content": prompt} ] ) raw_dot_code = response.choices[0].message.content.strip() validated_dot_code = validate_and_fix_dot_code(raw_dot_code) # Fix any missing brackets pattern = r"digraph\b[\s\S]*?^\}" match = re.search(pattern, validated_dot_code,re.MULTILINE | re.DOTALL) if match: validated_dot_code = match.group(0) # Extract the matched content else: return "Failed to extract valid Graphviz code." 
        return validated_dot_code
    except Exception as e:
        return f"Error querying GPT-4o: {str(e)}"


def validate_and_fix_dot_code(dot_code):
    open_brackets = dot_code.count("{")
    close_brackets = dot_code.count("}")
    if open_brackets > close_brackets:
        missing_brackets = open_brackets - close_brackets
        dot_code += "}" * missing_brackets
    return dot_code


def render_dot_code(dot_code, filename=None):
    if not filename:
        filename = f"diagram_{uuid.uuid4().hex}.png"  # Generate a unique filename
    try:
        # Render the DOT code to an in-memory PNG
        src = Source(dot_code, format="png")
        rendered_png = src.pipe()  # In-memory PNG image data
        # Upload the rendered PNG to S3 and return the S3 URL
        s3_url = upload_image_to_s3(rendered_png, filename)
        return s3_url
    except Exception as e:
        return f"Error rendering or uploading diagram: {str(e)}"


def handle_generate_diagram(code_content, diagram_type, retries=5, wait_time=1):
    # retries and wait_time are accepted but currently unused
    s3_url = render_dot_code(generate_dot_code_from_code(code_content, diagram_type))
    if s3_url.startswith("http"):  # Check if the response is a valid URL
        return f'<img src="{s3_url}" alt="Generated Diagram">'
    else:
        return f"<p>Error: {s3_url}</p>"  # Return the error message in HTML format
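# Illustrative usage (an assumption, not executed anywhere in this script):
# on success, handle_generate_diagram returns an <img> tag pointing at the
# uploaded PNG, suitable for a gr.HTML component, e.g.:
#   html = handle_generate_diagram("def f():\n    g()", "Call Graph")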

" # Return the error message in HTML format # Gradio Interface with gr.Blocks() as demo: # Tab 1: Repository Discovery with gr.Tab("Repository Discovery"): with gr.Row(): topics_input = gr.Textbox( label="Topics (comma-separated, leave empty to fetch by date only)", placeholder="e.g., machine-learning, deep-learning (leave empty for date-based search)" ) similar_topics = gr.Textbox( label="Similar Topics (based on embeddings)", interactive=False ) gr.Button("Get Similar Topics").click( search_similar_topics, inputs=[topics_input], outputs=[similar_topics] ) with gr.Row(): start_date_input = gr.Textbox( label="Start Date (YYYY-MM-DD, leave empty if not filtering by date)", placeholder="Set to filter recent repositories by date or leave empty" ) language_filter = gr.Dropdown( choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"], label="Language Filter", value="" ) stars_min = gr.Number(label="Stars Min", value=10) stars_max = gr.Number(label="Stars Max", value=1000) with gr.Row(): forks_min = gr.Number(label="Forks Min", value=0) forks_max = gr.Number(label="Forks Max", value=500) total_repos = gr.Number(label="Total Repositories", value=10, step=10) sort_order = gr.Dropdown( choices=["stars", "forks", "updated"], label="Sort Order", value="stars" ) with gr.Row(): output_data = gr.Dataframe(label="Discovered Repositories") output_file = gr.File(label="Download CSV", file_count="single") gr.Button("Discover Repositories").click( gradio_interface, inputs=[ topics_input, start_date_input, language_filter, stars_min, stars_max, forks_min, forks_max, total_repos, sort_order ], outputs=[output_data, output_file] ) # Tab 2: Organization Watch with gr.Tab("Organization Watch"): with gr.Row(): org_input = gr.Textbox( label="Organizations (comma-separated)", placeholder="e.g., facebookresearch, openai" ) with gr.Row(): language_filter = gr.Dropdown( choices=["", "Python", "JavaScript", "Java", "C++", "Ruby", "Go"], label="Language Filter", value="" ) stars_min = gr.Number(label="Stars Min", value=10) stars_max = gr.Number(label="Stars Max", value=1000) with gr.Row(): forks_min = gr.Number(label="Forks Min", value=0) forks_max = gr.Number(label="Forks Max", value=500) total_repos = gr.Number(label="Total Repositories", value=10, step=10) sort_order = gr.Dropdown( choices=["stars", "forks", "updated"], label="Sort Order", value="stars" ) with gr.Row(): output_data = gr.Dataframe(label="Repositories by Organizations") output_file = gr.File(label="Download CSV", file_count="single") gr.Button("Fetch Organization Repositories").click( fetch_org_repositories, inputs=[ org_input, language_filter, stars_min, stars_max, forks_min, forks_max, sort_order, total_repos ], outputs=[output_data, output_file] ) # Tab 3: Code Analysis with gr.Tab("Code Analysis"): with gr.Row(): repo_dropdown = gr.Dropdown( label="Select Repository", choices=[], interactive=True ) refresh_button = gr.Button("Refresh Repositories") with gr.Row(): branch_dropdown = gr.Dropdown( label="Select Branch", choices=[], interactive=True ) with gr.Row(): keywords_output = gr.Textbox(label="Keywords") entities_output = gr.Textbox(label="Entities") with gr.Row(): summary_output = gr.Textbox(label="Summary") wordcloud_output = gr.Plot(label="Word Cloud") with gr.Row(): files_list = gr.Dropdown( label="Files in Repository", choices=[], interactive=True ) with gr.Row(): file_content_box = gr.Textbox( label="File Content", lines=20, interactive=True ) with gr.Row(): question_input = gr.Textbox( label="Ask a Question", placeholder="Enter 
        with gr.Row():
            answer_output = gr.Textbox(label="Bot's Answer", lines=10, interactive=False)
        with gr.Row():
            diagram_type = gr.Dropdown(
                label="Select Diagram Type",
                choices=["Call Graph", "Data Flow Diagram", "Sequence Diagram",
                         "Class Diagram", "Component Diagram", "Workflow Diagram"],
                value="Call Graph"
            )
            generate_diagram_button = gr.Button("Generate Diagram")
        with gr.Row():
            diagram_output = gr.HTML(
                label="Generated Diagram",
            )

        question_button.click(
            ask_code_question,
            inputs=[file_content_box, question_input],
            outputs=[answer_output]
        )

        # Currently unused; the button below is wired to handle_generate_diagram instead.
        def generate_and_render_diagram(code_content, diagram_type):
            dot_code = generate_dot_code_from_code(code_content, diagram_type)
            if not dot_code.strip().startswith("digraph"):
                return "Invalid DOT code generated."
            unique_filename = f"diagram_{uuid.uuid4().hex}"
            return render_dot_code(dot_code, filename=unique_filename)

        generate_diagram_button.click(
            handle_generate_diagram,
            inputs=[file_content_box, diagram_type],
            outputs=[diagram_output]
        )

        refresh_button.click(
            lambda: gr.update(choices=get_discovered_repos()),
            inputs=[],
            outputs=[repo_dropdown]
        )

        def update_branches(repo):
            if repo:
                owner, repo_name = repo.split("/")
                branches = get_branches(owner, repo_name)
                default_branch = get_default_branch(owner, repo_name)
                return gr.update(choices=branches, value=default_branch)
            return gr.update(choices=[], value=None)

        repo_dropdown.change(
            update_branches,
            inputs=[repo_dropdown],
            outputs=[branch_dropdown]
        )

        def analyze_readme(repo, branch):
            if repo and branch:
                owner, repo_name = repo.split("/")
                return process_readme(owner, repo_name, branch)
            return "No repository or branch selected.", "", "", None

        repo_dropdown.change(
            analyze_readme,
            inputs=[repo_dropdown, branch_dropdown],
            outputs=[keywords_output, entities_output, summary_output, wordcloud_output]
        )
        branch_dropdown.change(
            analyze_readme,
            inputs=[repo_dropdown, branch_dropdown],
            outputs=[keywords_output, entities_output, summary_output, wordcloud_output]
        )

        def update_files(repo):
            global files_data
            if repo:
                owner, repo_name = repo.split("/")
                files = fetch_files(owner, repo_name)
                files_data = files
                file_names = [f"{file['name']} ({file['path']})" for file in files]
                return gr.update(choices=file_names, value=None)
            files_data = []
            return gr.update(choices=[], value=None)

        repo_dropdown.change(
            lambda repo: update_files(repo),
            inputs=[repo_dropdown],
            outputs=[files_list]
        )

        def display_file_content(repo, branch, selected_file):
            if repo and branch and selected_file:
                owner, repo_name = repo.split("/")
                file_path = selected_file.split(" (")[1][:-1]
                content = fetch_file_content(owner, repo_name, branch, file_path)
                return content
            return "No file selected."

        files_list.change(
            display_file_content,
            inputs=[repo_dropdown, branch_dropdown, files_list],
            outputs=[file_content_box]
        )

demo.launch(share=True)
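# Note: share=True requests a public gradio.live link, which mainly matters for
# local runs; when hosted on Hugging Face Spaces the app is already served publicly.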