# Hugging Face Space (status shown on the page: Paused)
import csv
import io
import json
import os
import random
import shutil
import time
import uuid
import zipfile

import gradio as gr
import requests
from bs4 import BeautifulSoup
from datasets import Dataset
from huggingface_hub import HfApi, HfFolder, Repository, create_repo
from PIL import Image
# Root directory for persisted state; the dataset JSON, CSV and ZIP files are
# written directly inside it.
DATA_DIR = "/data"

# Downloaded image files are kept in an "images" sub-directory of DATA_DIR.
IMAGES_DIR = os.path.join(DATA_DIR, "images")

# Pool of browser User-Agent strings; get_headers() picks one at random.
USER_AGENTS = [
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/14.0.3 Safari/605.1.15"
    ),
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) "
        "Gecko/20100101 Firefox/89.0"
    ),
]
def get_headers(cookies=None):
    """Build the HTTP headers sent with every scraping request.

    A User-Agent is drawn at random from USER_AGENTS on each call. When
    *cookies* is truthy it is sent verbatim as the ``Cookie`` header;
    otherwise no ``Cookie`` header is included.
    """
    cookie_header = {"Cookie": cookies} if cookies else {}
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.google.com/",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        **cookie_header,
    }
def make_request(url, cookies=None):
    """Perform a throttled GET request for *url* and return the response.

    Sleeps for a random 1-3 seconds first, then sends the request with the
    headers from get_headers(cookies) and a 10 second timeout. The HTTP
    status is not checked here; callers call ``raise_for_status()``.
    """
    pause_seconds = random.uniform(1, 3)
    time.sleep(pause_seconds)  # random delay between requests
    request_headers = get_headers(cookies)
    return requests.get(url, headers=request_headers, timeout=10)
def extract_image_url(html_content):
    """Return the image URL embedded in a post page, or None if none is found.

    Two strategies are tried in order:

    1. A ``<script type="text/javascript">`` whose text contains ``image =``.
       The object literal after that marker is parsed as JSON (single quotes
       are swapped for double quotes first) and the URL is assembled from its
       ``domain``, ``base_dir``, ``dir`` and ``img`` fields.
    2. The ``src`` of the first ``<img>`` element that has an ``alt``
       attribute.

    Raises:
        Exception: if the script is found but its object literal is not
            valid JSON (the original JSONDecodeError is chained as the cause).
        KeyError: if the parsed object lacks one of the expected fields.
    """
    marker = 'image ='
    soup = BeautifulSoup(html_content, 'html.parser')
    script = soup.find(
        'script',
        type='text/javascript',
        string=lambda text: bool(text) and marker in text,
    )
    if script:
        # Cut at the marker that was searched for, not at the first '=' in the
        # script: an earlier assignment would otherwise leave unrelated
        # JavaScript in front of the object literal and break parsing.
        js_object_str = script.string.partition(marker)[2].strip().rstrip(';')
        js_object_str = js_object_str.replace("'", '"')
        try:
            image_data = json.loads(js_object_str)
        except json.JSONDecodeError as e:
            raise Exception(f"Failed to decode JSON: {str(e)}") from e
        return f"{image_data['domain']}{image_data['base_dir']}/{image_data['dir']}/{image_data['img']}"

    img_tag = soup.find('img', alt=True)
    if img_tag and 'src' in img_tag.attrs:
        return img_tag['src']
    return None
def extract_tags(html_content):
    """Return the general tags of a post page as one comma-separated string.

    Every ``<li class="tag-type-general">`` element contributes the text of
    its second ``<a>`` child; elements with fewer than two anchors are
    skipped. An empty string is returned when nothing matches.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    tag_names = []
    for tag_item in soup.find_all('li', class_='tag-type-general'):
        anchors = tag_item.find_all('a')
        if len(anchors) > 1:
            tag_names.append(anchors[1].text)
    return ','.join(tag_names)
def download_image(url, cookies=None):
    """Fetch *url* and return its body opened as a PIL image.

    Raises:
        Exception: if the request fails or returns an HTTP error status.
        Errors raised by ``Image.open`` itself are not wrapped.
    """
    try:
        response = make_request(url, cookies)
        response.raise_for_status()
        payload = response.content
    except requests.RequestException as err:
        raise Exception(f"Failed to download image: {str(err)}")
    return Image.open(io.BytesIO(payload))
class DatasetBuilder:
    """Collect scraped images and their tags into a named, on-disk dataset.

    The entry list is persisted as ``<dataset_name>.json`` inside DATA_DIR and
    the image files live in IMAGES_DIR. Each entry is a dict with an
    ``'image'`` file name and a comma-separated tag string. ``add_image``
    stores the tags under ``'text'``; readers also accept ``'tags'``.
    """

    # Image modes Pillow's JPEG writer can encode directly. Anything else
    # (e.g. RGBA, P, LA) is converted to RGB before saving as ".jpg".
    _JPEG_SAFE_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    def __init__(self, dataset_name):
        """Load the entries of *dataset_name* (if saved) and ensure IMAGES_DIR exists."""
        self.dataset_name = dataset_name
        self.dataset = self.load_dataset()
        os.makedirs(IMAGES_DIR, exist_ok=True)
        self.hf_token = os.getenv("HF_Token")  # Access the token from the environment variable

    @staticmethod
    def _item_tags(item):
        """Return the tag string of an entry, whichever key ('text' or 'tags') holds it."""
        if 'text' in item:
            return item['text']
        return item.get('tags', '')

    def get_dataset_file(self):
        """Return the path of the JSON file that stores this dataset's entries."""
        return os.path.join(DATA_DIR, f"{self.dataset_name}.json")

    def load_dataset(self):
        """Return the saved entry list, or an empty list if no file exists yet."""
        dataset_file = self.get_dataset_file()
        if os.path.exists(dataset_file):
            with open(dataset_file, 'r') as f:
                return json.load(f)
        return []

    def save_dataset(self):
        """Write the current entry list to the dataset's JSON file."""
        dataset_file = self.get_dataset_file()
        with open(dataset_file, 'w') as f:
            json.dump(self.dataset, f)

    def resize_images(self, min_size=512, max_size=768):
        """Shrink every image of the dataset in place to fit *max_size* x *max_size*.

        The aspect ratio is preserved and each file is overwritten at its
        existing path. *min_size* is accepted for interface compatibility but
        is not used.
        """
        for item in self.dataset:
            image_path = os.path.join(IMAGES_DIR, item['image'])
            # The context manager releases the file handle even if saving fails.
            with Image.open(image_path) as image:
                image.thumbnail((max_size, max_size), resample=Image.BICUBIC)
                image.save(image_path)

    def resize_dataset(self):
        """Create the "<name> (resized)" dataset and resize its images.

        NOTE(review): the resized dataset references the same image files as
        this one, so resize_images() overwrites the originals in place -
        confirm this is intended.
        """
        resized_dataset_name = f"{self.dataset_name} (resized)"
        resized_dataset_builder = DatasetBuilder(resized_dataset_name)
        resized_dataset_builder.dataset = self.dataset
        resized_dataset_builder.resize_images()
        resized_dataset_builder.save_dataset()
        return f"Resized dataset '{self.dataset_name}' to '{resized_dataset_name}'."

    def create_downloadable_dataset(self):
        """Bundle the dataset into ``<name>.zip`` in DATA_DIR.

        The archive holds ``<name>.csv`` (columns ``image``, ``tags``) and
        every image under ``images/``.

        Returns:
            ``(zip_path, message)`` on success, ``(None, message)`` when the
            dataset is empty or an error occurs.
        """
        if not self.dataset:
            return None, "Dataset is empty. Add some images first."
        try:
            zip_path = os.path.join(DATA_DIR, f"{self.dataset_name}.zip")
            dataset_file_path = os.path.join(DATA_DIR, f"{self.dataset_name}.csv")

            # Write the CSV index first; tags may contain non-ASCII text, so
            # the encoding is fixed instead of depending on the locale.
            with open(dataset_file_path, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['image', 'tags'])
                for item in self.dataset:
                    writer.writerow([item['image'], self._item_tags(item)])

            with zipfile.ZipFile(zip_path, 'w') as zipf:
                zipf.write(dataset_file_path, os.path.basename(dataset_file_path))
                for item in self.dataset:
                    image_path = os.path.join(IMAGES_DIR, item['image'])
                    zipf.write(image_path, os.path.join("images", item['image']))
            return zip_path, f"Dataset '{self.dataset_name}' ready for download."
        except Exception as e:
            return None, f"Error creating downloadable dataset: {str(e)}"

    def add_image(self, url, cookies=None):
        """Scrape the post page at *url*, store its image and tags, and save.

        The image is written to IMAGES_DIR as ``<uuid4>.jpg``. Returns a
        status string; any failure is reported as ``"Error: ..."`` instead of
        being raised, because the result is shown directly in the UI.
        """
        try:
            response = make_request(url, cookies)
            response.raise_for_status()
            html_content = response.text
            image_url = extract_image_url(html_content)
            if not image_url:
                raise Exception("Failed to extract image URL")
            tags = extract_tags(html_content)
            image = download_image(image_url, cookies)
            # The file is always saved as JPEG, which cannot encode alpha or
            # palette modes; convert those so the save does not fail.
            if image.mode not in self._JPEG_SAFE_MODES:
                image = image.convert("RGB")
            filename = f"{uuid.uuid4()}.jpg"
            filepath = os.path.join(IMAGES_DIR, filename)
            image.save(filepath)
            self.dataset.append({
                'image': filename,
                'text': tags
            })
            self.save_dataset()
            return f"Added image with tags: {tags}"
        except Exception as e:
            return f"Error: {str(e)}"

    def build_huggingface_dataset(self):
        """Build a ``datasets.Dataset`` from the entries and report the outcome.

        The Dataset object is only constructed (columns ``image`` = file path,
        ``text`` = tags); it is not kept or returned.
        """
        if not self.dataset:
            return "Dataset is empty. Add some images first."
        try:
            Dataset.from_dict({
                'image': [os.path.join(IMAGES_DIR, item['image']) for item in self.dataset],
                'text': [self._item_tags(item) for item in self.dataset]
            })
            return "HuggingFace Dataset created successfully!"
        except Exception as e:
            return f"Error creating HuggingFace Dataset: {str(e)}"

    def get_dataset_info(self):
        """Return a one-line summary with the dataset name and entry count."""
        return f"Current dataset size ({self.dataset_name}): {len(self.dataset)} images"

    def get_dataset_preview(self, num_images=5):
        """Return ``(image_path, tags)`` tuples for the last *num_images* entries."""
        # A slice of [-0:] would return the whole list, so handle it explicitly.
        if num_images <= 0:
            return []
        preview = []
        for item in self.dataset[-num_images:]:
            image_path = os.path.join(IMAGES_DIR, item['image'])
            preview.append((image_path, self._item_tags(item)))
        return preview

    def upload_to_huggingface(self, private=True):
        """Push the dataset JSON and images to a Hugging Face Hub repository.

        The repository is ``<user>/<dataset_name>``, where the user is the
        owner of the token in ``self.hf_token``.

        Args:
            private: bool, or the strings ``"private"`` / ``"public"`` as
                produced by the UI radio button.

        Returns:
            A status string; errors are reported in the string, not raised.
        """
        if not self.dataset:
            return "Dataset is empty. Add some images first."
        if not self.hf_token:
            return "Error: Hugging Face Token not found. Please make sure the token is correctly set as an environment variable."
        # A non-empty string such as "public" is truthy, so map it explicitly.
        if isinstance(private, str):
            private = private.strip().lower() != "public"
        try:
            hf_api = HfApi(token=self.hf_token)  # Use the token
            hf_user = hf_api.whoami()["name"]
            repo_id = f"{hf_user}/{self.dataset_name}"
            # Create or update the repository
            # NOTE(review): no repo_type is passed - confirm which repository
            # type is intended.
            create_repo(repo_id, token=self.hf_token, private=private, exist_ok=True)
            # Save the dataset locally as a JSON file
            self.save_dataset()
            # Initialize a local repository
            # NOTE(review): the clone target is DATA_DIR itself, which already
            # holds the dataset files - confirm Repository accepts that.
            repo = Repository(local_dir=DATA_DIR, clone_from=repo_id, use_auth_token=self.hf_token)
            # Copy dataset files to the repository directory
            repo.git_pull(lfs=True)  # Pull the latest changes
            os.makedirs(os.path.join(DATA_DIR, "images"), exist_ok=True)
            for item in self.dataset:
                src_image_path = os.path.join(IMAGES_DIR, item['image'])
                dst_image_path = os.path.join(repo.local_dir, "images", item['image'])
                if not os.path.exists(dst_image_path):
                    os.makedirs(os.path.dirname(dst_image_path), exist_ok=True)
                    # Copy without a shell: no quoting problems, and a missing
                    # source file is reported instead of silently ignored.
                    shutil.copyfile(src_image_path, dst_image_path)
            # Add files to the repository and push
            repo.git_add(pattern=".")
            repo.git_commit("Add dataset and images")
            repo.git_push()
            return f"Dataset '{self.dataset_name}' successfully uploaded to Hugging Face Hub as a {'private' if private else 'public'} repository."
        except Exception as e:
            return f"Error uploading dataset to Hugging Face: {str(e)}"
def add_image_to_dataset(url, cookies, dataset_name):
    """Gradio callback: add the image at *url* to the named dataset.

    Returns the status message from add_image, the dataset size summary and
    the preview list of the most recent entries, in that order.
    """
    builder = DatasetBuilder(dataset_name)
    status_message = builder.add_image(url, cookies)
    size_summary = builder.get_dataset_info()
    recent_entries = builder.get_dataset_preview()
    return status_message, size_summary, recent_entries
def create_huggingface_dataset(dataset_name):
    """Gradio callback: build a HuggingFace Dataset for *dataset_name* and return the status text."""
    return DatasetBuilder(dataset_name).build_huggingface_dataset()
def view_dataset(dataset_name):
    """Gradio callback: return the preview of up to the last 60 entries of *dataset_name*."""
    return DatasetBuilder(dataset_name).get_dataset_preview(num_images=60)
def upload_huggingface_dataset(dataset_name, privacy):
    """Gradio callback: upload *dataset_name* to the Hugging Face Hub.

    Args:
        dataset_name: name of the dataset to upload.
        privacy: the radio selection ``"private"`` or ``"public"``; a bool is
            also accepted. ``None`` (nothing selected) is treated as private.

    Returns:
        The status string produced by ``DatasetBuilder.upload_to_huggingface``.
    """
    # The radio button delivers a string, and any non-empty string is truthy,
    # so "public" must be mapped to False explicitly.
    if isinstance(privacy, str):
        make_private = privacy.strip().lower() != "public"
    elif privacy is None:
        make_private = True
    else:
        make_private = bool(privacy)
    builder = DatasetBuilder(dataset_name)
    return builder.upload_to_huggingface(private=make_private)
def download_dataset(dataset_name):
    """Gradio callback: build the ZIP for *dataset_name* and return ``(zip_path, message)``."""
    archive_path, status = DatasetBuilder(dataset_name).create_downloadable_dataset()
    return archive_path, status
def resize_dataset(dataset_name):
    """Gradio callback: resize the images of *dataset_name* and return the status text."""
    return DatasetBuilder(dataset_name).resize_dataset()
def download_resized_dataset(dataset_name):
    """Gradio callback: build the ZIP for the "<name> (resized)" dataset.

    Returns ``(zip_path, message)`` exactly as create_downloadable_dataset does.
    """
    resized_name = f"{dataset_name} (resized)"
    archive_path, status = DatasetBuilder(resized_name).create_downloadable_dataset()
    return archive_path, status
# Create Gradio interface
# NOTE(review): the original indentation of this section was lost. The extent
# of the gr.Row() below (the three input boxes) is an assumption - confirm.
with gr.Blocks(theme="huggingface") as iface:
    gr.Markdown("# Image Dataset Builder")
    gr.Markdown("Enter a URL to add an image and its tags to the dataset. Progress is saved automatically.")

    # --- Add an image -------------------------------------------------------
    with gr.Row():
        dataset_name_input = gr.Textbox(
            lines=1,
            label="Dataset Name",
            placeholder="Enter dataset name...",
            value="default_dataset",
        )
        url_input = gr.Textbox(
            lines=2,
            label="URL",
            placeholder="Enter image URL here...",
        )
        cookies_input = gr.Textbox(
            lines=2,
            label="Cookies (optional)",
            placeholder="Enter cookies",
        )
    add_button = gr.Button("Add Image")
    result_output = gr.Textbox(label="Result")
    dataset_info = gr.Textbox(label="Dataset Info")

    gr.Markdown("## Dataset Preview")
    preview_gallery = gr.Gallery(
        label="Recent Additions",
        show_label=False,
        elem_id="preview_gallery",
        columns=5,
        rows=1,
        height="auto",
    )
    add_button.click(
        fn=add_image_to_dataset,
        inputs=[url_input, cookies_input, dataset_name_input],
        outputs=[result_output, dataset_info, preview_gallery],
    )

    # --- Build a HuggingFace Dataset ----------------------------------------
    create_hf_button = gr.Button("Create HuggingFace Dataset")
    hf_result = gr.Textbox(label="Dataset Creation Result")
    create_hf_button.click(
        fn=create_huggingface_dataset,
        inputs=[dataset_name_input],
        outputs=hf_result,
    )

    # --- Browse the dataset -------------------------------------------------
    view_dataset_button = gr.Button("View Dataset")
    dataset_gallery = gr.Gallery(
        label="Dataset Contents",
        show_label=False,
        elem_id="dataset_gallery",
        columns=5,
        rows=4,
        height="auto",
    )
    view_dataset_button.click(
        fn=view_dataset,
        inputs=[dataset_name_input],
        outputs=dataset_gallery,
    )

    # --- Upload to the Hub --------------------------------------------------
    gr.Markdown("## Upload Dataset to Hugging Face")
    privacy_radio = gr.Radio(
        choices=["private", "public"],
        value="private",
        label="Repository Privacy",
    )
    upload_hf_button = gr.Button("Upload to Hugging Face")
    hf_upload_result = gr.Textbox(label="Upload Result")
    upload_hf_button.click(
        fn=upload_huggingface_dataset,
        inputs=[dataset_name_input, privacy_radio],
        outputs=hf_upload_result,
    )

    # --- Download -----------------------------------------------------------
    gr.Markdown("## Download Dataset")
    download_button = gr.Button("Download Dataset")
    download_output = gr.File(label="Download")
    download_message = gr.Textbox(label="Download Status")
    download_button.click(
        fn=download_dataset,
        inputs=[dataset_name_input],
        outputs=[download_output, download_message],
    )

    # --- Resize -------------------------------------------------------------
    gr.Markdown("## Resize Dataset")
    resize_button = gr.Button("Resize Dataset")
    resize_result = gr.Textbox(label="Resize Result")
    resize_button.click(
        fn=resize_dataset,
        inputs=[dataset_name_input],
        outputs=resize_result,
    )

    # --- Download the resized copy ------------------------------------------
    gr.Markdown("## Download Resized Dataset")
    download_resized_button = gr.Button("Download Resized Dataset")
    download_resized_output = gr.File(label="Download Resized")
    download_resized_message = gr.Textbox(label="Resized Download Status")
    download_resized_button.click(
        fn=download_resized_dataset,
        inputs=[dataset_name_input],
        outputs=[download_resized_output, download_resized_message],
    )

# Launch the interface
iface.launch()