# hf_extractor / app.py
import os
import subprocess
import gradio as gr
from tqdm import tqdm
import chardet
import logging
import tempfile
import shutil
import concurrent.futures
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configurable supported file types and size limit
SUPPORTED_FILE_TYPES = ["txt", "python", "markdown", "yaml", "json", "csv", "tsv", "xml", "html"]
MAX_FILE_SIZE = 32 * 1024  # 32 KB default; the UI slider below supplies the actual limit
def validate_url(url):
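    """Minimal sanity check: only HTTPS repository URLs are accepted."""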
return url.startswith('https://')
def clone_repo(url, repo_dir, hf_token, hf_user):
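    """Clone a (possibly private) repo into repo_dir using HTTPS token auth.

    Returns (True, None) on success, (False, stderr) on failure.
    """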
env = os.environ.copy()
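    # Skip LFS smudge so large binary files stay as lightweight pointer files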
env['GIT_LFS_SKIP_SMUDGE'] = '1'
token_url = url.replace('https://', f'https://{hf_user}:{hf_token}@')
result = subprocess.run(["git", "clone", token_url, repo_dir], env=env, capture_output=True, text=True)
if result.returncode != 0:
return False, result.stderr
return True, None
def get_file_summary(file_path, file_type, base_dir):
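    """Return a small header dict: path relative to base_dir, content type, and size in bytes."""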
size = os.path.getsize(file_path)
return {
"name": os.path.relpath(file_path),
"type": file_type,
"size": size,
}
def read_file_content(file_path):
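    """Decode a file as text, sniffing the encoding with chardet.

    Returns None for binary files or when chardet finds no usable encoding.
    """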
with open(file_path, "rb") as file:
file_bytes = file.read()
encoding = chardet.detect(file_bytes)["encoding"]
try:
content = file_bytes.decode(encoding)
return content
except (UnicodeDecodeError, TypeError):
return None
def validate_file_types(directory, supported_file_types):
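    """Walk the tree and label each file's content type with Magika.

    Files whose detected type is not in supported_file_types are marked
    "Unsupported". Returns {file_path: type_label_or_error_string}.
    """
    # Imported lazily below: Magika loads a model at construction time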
from magika import Magika
m = Magika()
file_types = {}
    for root, _, files in os.walk(directory):
        # Skip VCS metadata and Python bytecode caches (match path components, not substrings)
        if any(part in ('.git', '__pycache__') for part in root.split(os.sep)):
            continue
for file_name in files:
file_path = os.path.join(root, file_name)
try:
with open(file_path, 'rb') as file:
file_bytes = file.read()
result = m.identify_bytes(file_bytes)
file_type = result.output.ct_label
if file_type not in supported_file_types:
file_type = "Unsupported"
file_types[file_path] = file_type
except Exception as e:
file_types[file_path] = f"Error: {str(e)}"
return file_types
def process_file(file_path, file_type, max_file_size, base_dir):
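    """Build the output record for one file: header plus content or a skip reason."""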
    file_summary = get_file_summary(file_path, file_type, base_dir)
content = {"header": file_summary}
if file_type != "Unsupported" and file_summary["size"] <= max_file_size:
try:
file_content = read_file_content(file_path)
if file_content is not None:
content["content"] = file_content
else:
content["content"] = "Failed to read file content: Unsupported encoding or binary file."
except Exception as e:
content["content"] = f"Failed to read file content: {str(e)}"
else:
content["content"] = f"Skipped: {'File size exceeds limit.' if file_summary['size'] > max_file_size else 'Unsupported file type.'}"
return content
def extract_repo_content(url, hf_token, hf_user, supported_file_types, max_file_size):
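    """Clone the repo, classify its files, and extract text content in parallel."""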
if not validate_url(url):
return [{"header": {"name": "Error", "type": "error", "size": 0}, "content": "Invalid URL"}]
repo_dir = tempfile.mkdtemp(prefix="temp_repo_")
success, error = clone_repo(url, repo_dir, hf_token, hf_user)
    if not success:
        shutil.rmtree(repo_dir, ignore_errors=True)
        return [{"header": {"name": "Error", "type": "error", "size": 0}, "content": f"Failed to clone repository: {error}"}]
file_types = validate_file_types(repo_dir, supported_file_types)
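    # File reads are I/O-bound, so a thread pool gives cheap parallelism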
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for file_path, file_type in file_types.items():
            future = executor.submit(process_file, file_path, file_type, max_file_size, repo_dir)
futures.append(future)
extracted_content = []
with tqdm(total=len(futures), desc="Processing files") as progress_bar:
for future in concurrent.futures.as_completed(futures):
content = future.result()
extracted_content.append(content)
progress_bar.update(1)
    # Clean up the temporary clone portably instead of shelling out to rm
    shutil.rmtree(repo_dir, ignore_errors=True)
return extracted_content
def format_output(extracted_content, repo_url):
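    """Render the extracted records as a single Markdown document."""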
formatted_output = f"# Repository URL: {repo_url}\n\n"
for file_data in extracted_content:
if isinstance(file_data, dict) and 'header' in file_data:
formatted_output += f"### File: {file_data['header']['name']}\n"
formatted_output += f"**Type:** {file_data['header']['type']}\n"
formatted_output += f"**Size:** {file_data['header']['size']} bytes\n"
formatted_output += "#### Content:\n"
formatted_output += f"```\n{file_data['content']}\n```\n\n"
else:
formatted_output += "Error in file data format.\n"
return formatted_output
def extract_and_display(url, supported_file_types, max_file_size):
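    """Gradio callback: pull credentials from the environment and run the extraction."""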
hf_token = os.getenv("HF_TOKEN")
hf_user = os.getenv("SPACE_AUTHOR_NAME")
if not hf_token:
raise ValueError("HF_TOKEN environment variable is not set")
if not hf_user:
raise ValueError("SPACE_AUTHOR_NAME environment variable is not set")
    # The UI slider reports KB; convert to bytes before comparing against file sizes
    extracted_content = extract_repo_content(url, hf_token, hf_user, supported_file_types, max_file_size * 1024)
formatted_output = format_output(extracted_content, url)
return formatted_output
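
# Build the Gradio UI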
app = gr.Blocks()
with app:
gr.Markdown("# Hugging Face Space / Model Repository Content Extractor")
    url_input = gr.Textbox(label="Repository URL (https://)", placeholder="Enter the repository URL here OR select an example below...")
url_examples = gr.Examples(
examples=[
["https://huggingface.co/spaces/big-vision/paligemma-hf"],
["https://huggingface.co/google/paligemma-3b-mix-224"],
["https://huggingface.co/microsoft/Phi-3-vision-128k-instruct"],
["https://huggingface.co/llava-hf/llava-v1.6-mistral-7b-hf"]
],
inputs=url_input
)
    supported_file_types = gr.CheckboxGroup(SUPPORTED_FILE_TYPES, value=SUPPORTED_FILE_TYPES, label="Supported File Types", info="Select the file types to include in the extraction.")
max_file_size = gr.Slider(1, 1024, value=32, step=1, label="Max File Size (KB)", info="Files larger than this size will be skipped.")
    output_display = gr.Textbox(label="Extracted Repository Content", show_copy_button=True, lines=20, placeholder="Repository content will be extracted here...\n\nMetadata is captured for all files, but text content is provided only for files under the specified size limit.\n\nReview and search the content here, or copy it for offline analysis! 🤖")
extract_button = gr.Button("Extract Content")
extract_button.click(fn=extract_and_display, inputs=[url_input, supported_file_types, max_file_size], outputs=output_display)
app.launch()