"""Gradio app for building a small image/tag dataset from scraped web pages.

A page URL is fetched, an image URL and a comma-separated tag string are
extracted from its HTML, the image is stored under ``IMAGES_DIR`` and a
record is appended to a per-dataset JSON file under ``DATA_DIR``.  The UI
also offers preview, ZIP download, in-place resizing and upload to the
Hugging Face Hub.
"""

import csv
import io
import json
import os
import random
import shutil
import time
import uuid
import zipfile

import gradio as gr
import requests
from bs4 import BeautifulSoup
from datasets import Dataset
from huggingface_hub import HfApi, HfFolder, Repository, create_repo
from PIL import Image

DATA_DIR = "/data"
IMAGES_DIR = os.path.join(DATA_DIR, "images")

# Pool of User-Agent strings; one is picked at random for every request.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
]


def get_headers(cookies=None):
    """Return browser-like request headers with a randomly chosen User-Agent.

    Args:
        cookies: Optional raw ``Cookie`` header value. Added only when truthy.
    """
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.google.com/",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    if cookies:
        headers["Cookie"] = cookies
    return headers


def make_request(url, cookies=None):
    """GET ``url`` after sleeping a random 1-3 seconds.

    Returns the ``requests.Response``; the HTTP status is NOT checked here,
    callers invoke ``raise_for_status()`` themselves.
    """
    time.sleep(random.uniform(1, 3))  # Add a random delay between requests
    return requests.get(url, headers=get_headers(cookies), timeout=10)


def extract_image_url(html_content):
    """Extract the image URL from a page's HTML.

    Looks first for a ``<script type="text/javascript">`` whose text contains
    ``image =`` and parses the assigned object literal as JSON (single quotes
    are rewritten to double quotes first).  Falls back to the ``src`` of the
    first ``<img>`` that has an ``alt`` attribute.

    Returns:
        The URL string, or ``None`` when neither source yields one.

    Raises:
        Exception: If the script's object literal is not valid JSON.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    script = soup.find(
        'script',
        type='text/javascript',
        string=lambda text: 'image =' in text if text else False,
    )
    if script:
        try:
            # Everything after the first '=' is taken as the object literal.
            js_object_str = script.string.split('=', 1)[1].strip().rstrip(';')
            js_object_str = js_object_str.replace("'", '"')
            image_data = json.loads(js_object_str)
            return f"{image_data['domain']}{image_data['base_dir']}/{image_data['dir']}/{image_data['img']}"
        except json.JSONDecodeError as e:
            raise Exception(f"Failed to decode JSON: {str(e)}") from e

    img_tag = soup.find('img', alt=True)
    if img_tag and 'src' in img_tag.attrs:
        return img_tag['src']
    return None


def extract_tags(html_content):
    """Return the page's general tags as one comma-joined string.

    For every ``<li class="tag-type-general">`` that contains at least two
    ``<a>`` elements, the text of the second anchor is taken as the tag.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    tags = []
    for tag_element in soup.find_all('li', class_='tag-type-general'):
        anchors = tag_element.find_all('a')
        if len(anchors) > 1:
            tags.append(anchors[1].text)
    return ','.join(tags)


def download_image(url, cookies=None):
    """Download ``url`` and return it as an opened ``PIL.Image``.

    Raises:
        Exception: If the HTTP request fails (wraps ``requests.RequestException``).
            Errors raised by ``Image.open`` itself propagate unchanged.
    """
    try:
        response = make_request(url, cookies)
        response.raise_for_status()
        return Image.open(io.BytesIO(response.content))
    except requests.RequestException as e:
        raise Exception(f"Failed to download image: {str(e)}") from e


def _item_tags(item):
    """Return a record's tag string.

    New records use the ``'tags'`` key; records written by earlier versions of
    ``DatasetBuilder.add_image`` used ``'text'``, so that key is accepted too.
    """
    if 'tags' in item:
        return item['tags']
    return item.get('text', '')


class DatasetBuilder:
    """A named dataset of ``{'image': filename, 'tags': str}`` records.

    Records live in ``DATA_DIR/<dataset_name>.json``; image files for all
    datasets live together in ``IMAGES_DIR``.
    """

    def __init__(self, dataset_name):
        # NOTE(review): dataset_name comes straight from a UI textbox and is
        # used unsanitized in file paths - consider validating it.
        self.dataset_name = dataset_name
        self.dataset = self.load_dataset()
        os.makedirs(IMAGES_DIR, exist_ok=True)
        self.hf_token = os.getenv("HF_Token")  # Access the token from the environment variable

    def get_dataset_file(self):
        """Return the path of this dataset's JSON record file."""
        return os.path.join(DATA_DIR, f"{self.dataset_name}.json")

    def load_dataset(self):
        """Load the record list from disk, or return ``[]`` if no file exists."""
        dataset_file = self.get_dataset_file()
        if os.path.exists(dataset_file):
            with open(dataset_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return []

    def save_dataset(self):
        """Write the in-memory record list to the dataset's JSON file."""
        dataset_file = self.get_dataset_file()
        with open(dataset_file, 'w', encoding='utf-8') as f:
            json.dump(self.dataset, f)

    def resize_images(self, min_size=512, max_size=768):
        """Shrink every image in place so neither side exceeds ``max_size``.

        Aspect ratio is preserved (``Image.thumbnail``).  ``min_size`` is
        accepted for interface compatibility but is currently not used.
        """
        for item in self.dataset:
            image_path = os.path.join(IMAGES_DIR, item['image'])
            # Context manager ensures the source file handle is released.
            with Image.open(image_path) as image:
                # Resize the image while maintaining the aspect ratio
                image.thumbnail((max_size, max_size), resample=Image.BICUBIC)
                # Save the resized image
                image.save(image_path)

    def resize_dataset(self):
        """Create a ``"<name> (resized)"`` dataset and resize its images.

        NOTE(review): the resized dataset references the same image files as
        this one, so the originals are overwritten by the resize - confirm
        this is intended.
        """
        resized_dataset_name = f"{self.dataset_name} (resized)"
        resized_dataset_builder = DatasetBuilder(resized_dataset_name)
        resized_dataset_builder.dataset = self.dataset
        resized_dataset_builder.resize_images()
        resized_dataset_builder.save_dataset()
        return f"Resized dataset '{self.dataset_name}' to '{resized_dataset_name}'."

    def create_downloadable_dataset(self):
        """Bundle a CSV index and all images into ``DATA_DIR/<name>.zip``.

        Returns:
            ``(zip_path, message)`` on success, ``(None, message)`` when the
            dataset is empty or an error occurs.
        """
        if not self.dataset:
            return None, "Dataset is empty. Add some images first."

        try:
            zip_filename = f"{self.dataset_name}.zip"
            zip_path = os.path.join(DATA_DIR, zip_filename)

            # Write the CSV index first so the archive step only adds files.
            dataset_file = f"{self.dataset_name}.csv"
            dataset_file_path = os.path.join(DATA_DIR, dataset_file)
            with open(dataset_file_path, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['image', 'tags'])
                for item in self.dataset:
                    writer.writerow([item['image'], _item_tags(item)])

            with zipfile.ZipFile(zip_path, 'w') as zipf:
                # Add the dataset CSV file
                zipf.write(dataset_file_path, os.path.basename(dataset_file_path))
                # Add all images
                for item in self.dataset:
                    image_path = os.path.join(IMAGES_DIR, item['image'])
                    zipf.write(image_path, os.path.join("images", item['image']))

            return zip_path, f"Dataset '{self.dataset_name}' ready for download."
        except Exception as e:
            return None, f"Error creating downloadable dataset: {str(e)}"

    def add_image(self, url, cookies=None):
        """Scrape ``url``, store its image and tags, and persist the dataset.

        Returns:
            A human-readable status string; errors are reported in the string
            rather than raised.
        """
        try:
            response = make_request(url, cookies)
            response.raise_for_status()
            html_content = response.text

            image_url = extract_image_url(html_content)
            if not image_url:
                raise Exception("Failed to extract image URL")

            tags = extract_tags(html_content)
            image = download_image(image_url, cookies)

            # The file is always written as JPEG, which cannot store alpha or
            # palette modes; convert those to RGB first.
            if image.mode not in ("RGB", "L"):
                image = image.convert("RGB")

            filename = f"{uuid.uuid4()}.jpg"
            filepath = os.path.join(IMAGES_DIR, filename)
            image.save(filepath)

            # Key must be 'tags': that is what every reader of the dataset uses.
            self.dataset.append({
                'image': filename,
                'tags': tags,
            })
            self.save_dataset()
            return f"Added image with tags: {tags}"
        except Exception as e:
            return f"Error: {str(e)}"

    def build_huggingface_dataset(self):
        """Build a ``datasets.Dataset`` from the records and report the outcome.

        The constructed dataset is not stored or returned; only a status
        string is.
        """
        if not self.dataset:
            return "Dataset is empty. Add some images first."

        try:
            Dataset.from_dict({
                'image': [os.path.join(IMAGES_DIR, item['image']) for item in self.dataset],
                'text': [_item_tags(item) for item in self.dataset],
            })
            return "HuggingFace Dataset created successfully!"
        except Exception as e:
            return f"Error creating HuggingFace Dataset: {str(e)}"

    def get_dataset_info(self):
        """Return a one-line summary with the dataset name and record count."""
        return f"Current dataset size ({self.dataset_name}): {len(self.dataset)} images"

    def get_dataset_preview(self, num_images=5):
        """Return ``(image_path, tags)`` tuples for the last ``num_images`` records."""
        return [
            (os.path.join(IMAGES_DIR, item['image']), _item_tags(item))
            for item in self.dataset[-num_images:]
        ]

    def upload_to_huggingface(self, private=True):
        """Push the dataset JSON and images to a Hugging Face Hub repository.

        The repository id is ``<token owner>/<dataset_name>``.

        Args:
            private: Whether the repository is created as private.

        Returns:
            A status string; errors are reported in the string, not raised.
        """
        if not self.dataset:
            return "Dataset is empty. Add some images first."

        if not self.hf_token:
            return "Error: Hugging Face Token not found. Please make sure the token is correctly set as an environment variable."

        try:
            hf_api = HfApi(token=self.hf_token)  # Use the token
            hf_user = hf_api.whoami()["name"]
            repo_id = f"{hf_user}/{self.dataset_name}"

            # Create or update the repository
            # NOTE(review): no repo_type is passed - confirm a dataset
            # repository is not intended here.
            create_repo(repo_id, token=self.hf_token, private=private, exist_ok=True)

            # Save the dataset locally as a JSON file
            self.save_dataset()

            # Initialize a local repository
            repo = Repository(local_dir=DATA_DIR, clone_from=repo_id, use_auth_token=self.hf_token)

            # Copy dataset files to the repository directory
            repo.git_pull(lfs=True)  # Pull the latest changes
            os.makedirs(os.path.join(DATA_DIR, "images"), exist_ok=True)
            for item in self.dataset:
                src_image_path = os.path.join(IMAGES_DIR, item['image'])
                dst_image_path = os.path.join(repo.local_dir, "images", item['image'])
                if not os.path.exists(dst_image_path):
                    os.makedirs(os.path.dirname(dst_image_path), exist_ok=True)
                    # shutil instead of a shell `cp`: no shell quoting issues,
                    # and a failed copy surfaces as an error message below.
                    shutil.copy(src_image_path, dst_image_path)

            # Add files to the repository and push
            repo.git_add(pattern=".")
            repo.git_commit("Add dataset and images")
            repo.git_push()

            return f"Dataset '{self.dataset_name}' successfully uploaded to Hugging Face Hub as a {'private' if private else 'public'} repository."

        except Exception as e:
            return f"Error uploading dataset to Hugging Face: {str(e)}"


def add_image_to_dataset(url, cookies, dataset_name):
    """Gradio callback: add one image; return (status, info line, preview)."""
    builder = DatasetBuilder(dataset_name)
    result = builder.add_image(url, cookies)
    return result, builder.get_dataset_info(), builder.get_dataset_preview()


def create_huggingface_dataset(dataset_name):
    """Gradio callback: build a ``datasets.Dataset`` and return the status."""
    builder = DatasetBuilder(dataset_name)
    return builder.build_huggingface_dataset()


def view_dataset(dataset_name):
    """Gradio callback: return up to the 60 most recent records for a gallery."""
    builder = DatasetBuilder(dataset_name)
    return builder.get_dataset_preview(num_images=60)


def upload_huggingface_dataset(dataset_name, privacy):
    """Gradio callback: upload the dataset to the Hugging Face Hub.

    Args:
        privacy: The radio selection, ``"private"`` or ``"public"``.  A bool is
            also accepted.  Anything other than ``"public"`` is treated as
            private.
    """
    if isinstance(privacy, bool):
        private = privacy
    else:
        # The radio yields a string; passing it through directly would make
        # "public" truthy and therefore private.
        private = str(privacy).lower() != "public"
    builder = DatasetBuilder(dataset_name)
    return builder.upload_to_huggingface(private=private)


def download_dataset(dataset_name):
    """Gradio callback: build the ZIP archive; return (zip path, status)."""
    builder = DatasetBuilder(dataset_name)
    zip_path, message = builder.create_downloadable_dataset()
    return zip_path, message


def resize_dataset(dataset_name):
    """Gradio callback: create the resized variant and return the status."""
    builder = DatasetBuilder(dataset_name)
    return builder.resize_dataset()


def download_resized_dataset(dataset_name):
    """Gradio callback: build the ZIP for the ``"<name> (resized)"`` dataset."""
    builder = DatasetBuilder(f"{dataset_name} (resized)")
    zip_path, message = builder.create_downloadable_dataset()
    return zip_path, message


# Create Gradio interface
with gr.Blocks(theme="huggingface") as iface:
    gr.Markdown("# Image Dataset Builder")
    gr.Markdown("Enter a URL to add an image and its tags to the dataset. Progress is saved automatically.")

    with gr.Row():
        dataset_name_input = gr.Textbox(lines=1, label="Dataset Name", placeholder="Enter dataset name...", value="default_dataset")
        url_input = gr.Textbox(lines=2, label="URL", placeholder="Enter image URL here...")
        cookies_input = gr.Textbox(lines=2, label="Cookies (optional)", placeholder="Enter cookies")
        add_button = gr.Button("Add Image")

    result_output = gr.Textbox(label="Result")
    dataset_info = gr.Textbox(label="Dataset Info")

    gr.Markdown("## Dataset Preview")
    preview_gallery = gr.Gallery(label="Recent Additions", show_label=False, elem_id="preview_gallery", columns=5, rows=1, height="auto")

    add_button.click(
        add_image_to_dataset,
        inputs=[url_input, cookies_input, dataset_name_input],
        outputs=[result_output, dataset_info, preview_gallery],
    )

    create_hf_button = gr.Button("Create HuggingFace Dataset")
    hf_result = gr.Textbox(label="Dataset Creation Result")
    create_hf_button.click(create_huggingface_dataset, inputs=[dataset_name_input], outputs=hf_result)

    view_dataset_button = gr.Button("View Dataset")
    dataset_gallery = gr.Gallery(label="Dataset Contents", show_label=False, elem_id="dataset_gallery", columns=5, rows=4, height="auto")
    view_dataset_button.click(view_dataset, inputs=[dataset_name_input], outputs=dataset_gallery)

    gr.Markdown("## Upload Dataset to Hugging Face")
    privacy_radio = gr.Radio(choices=["private", "public"], value="private", label="Repository Privacy")
    upload_hf_button = gr.Button("Upload to Hugging Face")
    hf_upload_result = gr.Textbox(label="Upload Result")
    upload_hf_button.click(upload_huggingface_dataset, inputs=[dataset_name_input, privacy_radio], outputs=hf_upload_result)

    gr.Markdown("## Download Dataset")
    download_button = gr.Button("Download Dataset")
    download_output = gr.File(label="Download")
    download_message = gr.Textbox(label="Download Status")

    download_button.click(
        download_dataset,
        inputs=[dataset_name_input],
        outputs=[download_output, download_message]
    )

    gr.Markdown("## Resize Dataset")
    resize_button = gr.Button("Resize Dataset")
    resize_result = gr.Textbox(label="Resize Result")

    resize_button.click(
        resize_dataset,
        inputs=[dataset_name_input],
        outputs=resize_result
    )

    gr.Markdown("## Download Resized Dataset")
    download_resized_button = gr.Button("Download Resized Dataset")
    download_resized_output = gr.File(label="Download Resized")
    download_resized_message = gr.Textbox(label="Resized Download Status")

    download_resized_button.click(
        download_resized_dataset,
        inputs=[dataset_name_input],
        outputs=[download_resized_output, download_resized_message]
    )

# Launch the interface
iface.launch()