datasetbuilder / app.py
throaway2854's picture
Update app.py
addc6c7 verified
import requests
from bs4 import BeautifulSoup
import os
import json
import gradio as gr
from datasets import Dataset
from PIL import Image
from huggingface_hub import HfApi, HfFolder, Repository, create_repo
import io
import uuid
import time
import random
import zipfile
import csv
DATA_DIR = "/data"
IMAGES_DIR = os.path.join(DATA_DIR, "images")
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0"
]
def get_headers(cookies=None):
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.google.com/",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
}
if cookies:
headers["Cookie"] = cookies
return headers
def make_request(url, cookies=None):
time.sleep(random.uniform(1, 3)) # Add a random delay between requests
return requests.get(url, headers=get_headers(cookies), timeout=10)
def extract_image_url(html_content):
soup = BeautifulSoup(html_content, 'html.parser')
script = soup.find('script', type='text/javascript', string=lambda text: 'image =' in text if text else False)
if script:
try:
js_object_str = script.string.split('=', 1)[1].strip().rstrip(';')
js_object_str = js_object_str.replace("'", '"')
image_data = json.loads(js_object_str)
return f"{image_data['domain']}{image_data['base_dir']}/{image_data['dir']}/{image_data['img']}"
except json.JSONDecodeError as e:
raise Exception(f"Failed to decode JSON: {str(e)}")
img_tag = soup.find('img', alt=True)
if img_tag and 'src' in img_tag.attrs:
return img_tag['src']
return None
def extract_tags(html_content):
soup = BeautifulSoup(html_content, 'html.parser')
tag_elements = soup.find_all('li', class_='tag-type-general')
tags = [tag_element.find_all('a')[1].text for tag_element in tag_elements if len(tag_element.find_all('a')) > 1]
return ','.join(tags)
def download_image(url, cookies=None):
try:
response = make_request(url, cookies)
response.raise_for_status()
return Image.open(io.BytesIO(response.content))
except requests.RequestException as e:
raise Exception(f"Failed to download image: {str(e)}")
class DatasetBuilder:
def __init__(self, dataset_name):
self.dataset_name = dataset_name
self.dataset = self.load_dataset()
os.makedirs(IMAGES_DIR, exist_ok=True)
self.hf_token = os.getenv("HF_Token") # Access the token from the environment variable
def get_dataset_file(self):
return os.path.join(DATA_DIR, f"{self.dataset_name}.json")
def load_dataset(self):
dataset_file = self.get_dataset_file()
if os.path.exists(dataset_file):
with open(dataset_file, 'r') as f:
return json.load(f)
return []
def save_dataset(self):
dataset_file = self.get_dataset_file()
with open(dataset_file, 'w') as f:
json.dump(self.dataset, f)
def resize_images(self, min_size=512, max_size=768):
for item in self.dataset:
image_path = os.path.join(IMAGES_DIR, item['image'])
image = Image.open(image_path)
# Resize the image while maintaining the aspect ratio
image.thumbnail((max_size, max_size), resample=Image.BICUBIC)
# Save the resized image
image.save(image_path)
def resize_dataset(self):
resized_dataset_name = f"{self.dataset_name} (resized)"
resized_dataset_builder = DatasetBuilder(resized_dataset_name)
resized_dataset_builder.dataset = self.dataset
resized_dataset_builder.resize_images()
resized_dataset_builder.save_dataset()
return f"Resized dataset '{self.dataset_name}' to '{resized_dataset_name}'."
def create_downloadable_dataset(self):
if not self.dataset:
return None, "Dataset is empty. Add some images first."
try:
# Create a temporary ZIP file
zip_filename = f"{self.dataset_name}.zip"
zip_path = os.path.join(DATA_DIR, zip_filename)
with zipfile.ZipFile(zip_path, 'w') as zipf:
# Add the dataset CSV file
dataset_file = f"{self.dataset_name}.csv"
dataset_file_path = os.path.join(DATA_DIR, dataset_file)
with open(dataset_file_path, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['image', 'tags'])
for item in self.dataset:
writer.writerow([item['image'], item['tags']])
zipf.write(dataset_file_path, os.path.basename(dataset_file_path))
# Add all images
for item in self.dataset:
image_path = os.path.join(IMAGES_DIR, item['image'])
zipf.write(image_path, os.path.join("images", item['image']))
return zip_path, f"Dataset '{self.dataset_name}' ready for download."
except Exception as e:
return None, f"Error creating downloadable dataset: {str(e)}"
def add_image(self, url, cookies=None):
try:
response = make_request(url, cookies)
response.raise_for_status()
html_content = response.text
image_url = extract_image_url(html_content)
if not image_url:
raise Exception("Failed to extract image URL")
tags = extract_tags(html_content)
image = download_image(image_url, cookies)
filename = f"{uuid.uuid4()}.jpg"
filepath = os.path.join(IMAGES_DIR, filename)
image.save(filepath)
self.dataset.append({
'image': filename,
'text': tags
})
self.save_dataset()
return f"Added image with tags: {tags}"
except Exception as e:
return f"Error: {str(e)}"
def build_huggingface_dataset(self):
if not self.dataset:
return "Dataset is empty. Add some images first."
try:
hf_dataset = Dataset.from_dict({
'image': [os.path.join(IMAGES_DIR, item['image']) for item in self.dataset],
'text': [item['tags'] for item in self.dataset]
})
return "HuggingFace Dataset created successfully!"
except Exception as e:
return f"Error creating HuggingFace Dataset: {str(e)}"
def get_dataset_info(self):
return f"Current dataset size ({self.dataset_name}): {len(self.dataset)} images"
def get_dataset_preview(self, num_images=5):
preview = []
for item in self.dataset[-num_images:]:
image_path = os.path.join(IMAGES_DIR, item['image'])
preview.append((image_path, item['tags']))
return preview
def upload_to_huggingface(self, private=True):
if not self.dataset:
return "Dataset is empty. Add some images first."
if not self.hf_token:
return "Error: Hugging Face Token not found. Please make sure the token is correctly set as an environment variable."
try:
hf_api = HfApi(token=self.hf_token) # Use the token
hf_user = hf_api.whoami()["name"]
repo_id = f"{hf_user}/{self.dataset_name}"
# Create or update the repository
repo_url = create_repo(repo_id, token=self.hf_token, private=private, exist_ok=True)
# Save the dataset locally as a JSON file
dataset_file = self.get_dataset_file()
self.save_dataset()
# Initialize a local repository
repo = Repository(local_dir=DATA_DIR, clone_from=repo_id, use_auth_token=self.hf_token)
# Copy dataset files to the repository directory
repo.git_pull(lfs=True) # Pull the latest changes
os.makedirs(os.path.join(DATA_DIR, "images"), exist_ok=True)
for item in self.dataset:
src_image_path = os.path.join(IMAGES_DIR, item['image'])
dst_image_path = os.path.join(repo.local_dir, "images", item['image'])
if not os.path.exists(dst_image_path):
os.makedirs(os.path.dirname(dst_image_path), exist_ok=True)
os.system(f"cp {src_image_path} {dst_image_path}")
# Add files to the repository and push
repo.git_add(pattern=".")
repo.git_commit("Add dataset and images")
repo.git_push()
return f"Dataset '{self.dataset_name}' successfully uploaded to Hugging Face Hub as a {'private' if private else 'public'} repository."
except Exception as e:
return f"Error uploading dataset to Hugging Face: {str(e)}"
def add_image_to_dataset(url, cookies, dataset_name):
builder = DatasetBuilder(dataset_name)
result = builder.add_image(url, cookies)
return result, builder.get_dataset_info(), builder.get_dataset_preview()
def create_huggingface_dataset(dataset_name):
builder = DatasetBuilder(dataset_name)
return builder.build_huggingface_dataset()
def view_dataset(dataset_name):
builder = DatasetBuilder(dataset_name)
return builder.get_dataset_preview(num_images=60)
def upload_huggingface_dataset(dataset_name, privacy):
builder = DatasetBuilder(dataset_name)
return builder.upload_to_huggingface(private=privacy)
def download_dataset(dataset_name):
builder = DatasetBuilder(dataset_name)
zip_path, message = builder.create_downloadable_dataset()
return zip_path, message
def resize_dataset(dataset_name):
builder = DatasetBuilder(dataset_name)
return builder.resize_dataset()
def download_resized_dataset(dataset_name):
builder = DatasetBuilder(f"{dataset_name} (resized)")
zip_path, message = builder.create_downloadable_dataset()
return zip_path, message
# Create Gradio interface
with gr.Blocks(theme="huggingface") as iface:
gr.Markdown("# Image Dataset Builder")
gr.Markdown("Enter a URL to add an image and its tags to the dataset. Progress is saved automatically.")
with gr.Row():
dataset_name_input = gr.Textbox(lines=1, label="Dataset Name", placeholder="Enter dataset name...", value="default_dataset")
url_input = gr.Textbox(lines=2, label="URL", placeholder="Enter image URL here...")
cookies_input = gr.Textbox(lines=2, label="Cookies (optional)", placeholder="Enter cookies")
add_button = gr.Button("Add Image")
result_output = gr.Textbox(label="Result")
dataset_info = gr.Textbox(label="Dataset Info")
gr.Markdown("## Dataset Preview")
preview_gallery = gr.Gallery(label="Recent Additions", show_label=False, elem_id="preview_gallery", columns=5, rows=1, height="auto")
add_button.click(add_image_to_dataset, inputs=[url_input, cookies_input, dataset_name_input], outputs=[result_output, dataset_info, preview_gallery])
create_hf_button = gr.Button("Create HuggingFace Dataset")
hf_result = gr.Textbox(label="Dataset Creation Result")
create_hf_button.click(create_huggingface_dataset, inputs=[dataset_name_input], outputs=hf_result)
view_dataset_button = gr.Button("View Dataset")
dataset_gallery = gr.Gallery(label="Dataset Contents", show_label=False, elem_id="dataset_gallery", columns=5, rows=4, height="auto")
view_dataset_button.click(view_dataset, inputs=[dataset_name_input], outputs=dataset_gallery)
gr.Markdown("## Upload Dataset to Hugging Face")
privacy_radio = gr.Radio(choices=["private", "public"], value="private", label="Repository Privacy")
upload_hf_button = gr.Button("Upload to Hugging Face")
hf_upload_result = gr.Textbox(label="Upload Result")
upload_hf_button.click(upload_huggingface_dataset, inputs=[dataset_name_input, privacy_radio], outputs=hf_upload_result)
gr.Markdown("## Download Dataset")
download_button = gr.Button("Download Dataset")
download_output = gr.File(label="Download")
download_message = gr.Textbox(label="Download Status")
download_button.click(
download_dataset,
inputs=[dataset_name_input],
outputs=[download_output, download_message]
)
gr.Markdown("## Resize Dataset")
resize_button = gr.Button("Resize Dataset")
resize_result = gr.Textbox(label="Resize Result")
resize_button.click(
resize_dataset,
inputs=[dataset_name_input],
outputs=resize_result
)
gr.Markdown("## Download Resized Dataset")
download_resized_button = gr.Button("Download Resized Dataset")
download_resized_output = gr.File(label="Download Resized")
download_resized_message = gr.Textbox(label="Resized Download Status")
download_resized_button.click(
download_resized_dataset,
inputs=[dataset_name_input],
outputs=[download_resized_output, download_resized_message]
)
# Launch the interface
iface.launch()