|
import os |
|
import random |
|
import shutil |
|
import tempfile |
|
import zipfile |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
from datetime import datetime |
|
|
|
import fitz |
|
import gradio as gr |
|
from huggingface_hub import DatasetCard, DatasetCardData, HfApi |
|
|
|
from dataset_card_template import DATASET_CARD_TEMPLATE |
|
|
|
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" |
|
|
|
|
|
def process_pdf(pdf_file, sample_size, temp_dir):
    """Render pages of a single PDF to PNG files inside ``temp_dir``.

    Args:
        pdf_file: Uploaded file object; only its ``name`` attribute (the path
            of the PDF on disk) is read.
        sample_size: How many pages to render. ``0``, ``None`` or a negative
            value renders every page. A positive value smaller than the page
            count renders that many randomly chosen pages, in page order.
        temp_dir: Directory that receives the PNG files.

    Returns:
        ``(image_paths, None)`` on success, or ``([], error_message)`` when
        opening or rendering the document fails for any reason.
    """
    try:
        # The value may arrive as None (empty number box) or as a float;
        # random.sample() requires a real int.
        sample_size = int(sample_size or 0)

        pdf_path = pdf_file.name
        pdf_name = os.path.basename(pdf_path)

        # The context manager closes the document even if rendering a page
        # raises, so the file handle is never leaked.
        with fitz.open(pdf_path) as doc:
            total_pages = len(doc)

            if 0 < sample_size < total_pages:
                selected_pages = sorted(
                    random.sample(range(total_pages), sample_size)
                )
            else:
                selected_pages = range(total_pages)

            images = []
            for page_num in selected_pages:
                pix = doc[page_num].get_pixmap()
                # File names use 1-based page numbers.
                image_path = os.path.join(
                    temp_dir, f"{pdf_name}_page_{page_num + 1}.png"
                )
                pix.save(image_path)
                images.append(image_path)

        return images, None
    except Exception as e:
        # Best-effort contract: the caller collects the message and skips
        # this PDF instead of aborting the whole batch.
        return [], f"Error processing {pdf_file.name}: {str(e)}"
|
|
|
|
|
def pdf_to_images(pdf_files, sample_size, temp_dir, progress=gr.Progress()):
    """Convert all uploaded PDFs to page images using a thread pool.

    Args:
        pdf_files: Iterable of uploaded file objects, each passed to
            ``process_pdf``.
        sample_size: Pages-per-PDF setting forwarded to ``process_pdf``.
        temp_dir: Directory for the generated images; created if missing.
        progress: Gradio progress tracker used for the status bar.

    Returns:
        ``(all_images, message)`` where ``all_images`` lists the image paths
        of every successfully converted PDF (in completion order) and
        ``message`` is a human-readable summary including skipped PDFs.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(temp_dir, exist_ok=True)

    progress(0, desc="Starting conversion")
    all_images = []
    skipped_pdfs = []

    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        # Map each future back to its PDF so failures can be reported by name.
        future_to_pdf = {
            executor.submit(process_pdf, pdf_file, sample_size, temp_dir): pdf_file
            for pdf_file in pdf_files
        }

        for future in progress.tqdm(
            as_completed(future_to_pdf), total=len(pdf_files), desc="Converting PDFs"
        ):
            pdf_file = future_to_pdf[future]
            # process_pdf reports failures through its return value.
            images, error = future.result()
            if error:
                skipped_pdfs.append(pdf_file.name)
                gr.Info(error)
            else:
                all_images.extend(images)

    message = f"Saved {len(all_images)} images to temporary directory"
    if skipped_pdfs:
        message += f"\nSkipped {len(skipped_pdfs)} PDFs due to errors: {', '.join(skipped_pdfs)}"
    return all_images, message
|
|
|
|
|
def get_size_category(num_images):
    """Return the Hugging Face ``size_categories`` tag for a dataset size.

    Args:
        num_images: Number of items in the dataset.

    Returns:
        The bucket label whose range contains ``num_images``; lower bounds
        are inclusive (1000 maps to ``"1K<n<10K"``). Sizes of one million and
        above map to the larger Hub buckets (``"1M<n<10M"`` up to ``"n>1T"``)
        rather than the non-standard ``"n>1M"``.
    """
    # (exclusive upper bound, label) pairs in ascending order.
    buckets = (
        (10**3, "n<1K"),
        (10**4, "1K<n<10K"),
        (10**5, "10K<n<100K"),
        (10**6, "100K<n<1M"),
        (10**7, "1M<n<10M"),
        (10**8, "10M<n<100M"),
        (10**9, "100M<n<1B"),
        (10**10, "1B<n<10B"),
        (10**11, "10B<n<100B"),
        (10**12, "100B<n<1T"),
    )
    for upper_bound, label in buckets:
        if num_images < upper_bound:
            return label
    return "n>1T"
|
|
|
|
|
def process_pdfs(
    pdf_files,
    sample_size,
    hf_repo,
    create_zip,
    private_repo,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(),
):
    """Convert uploaded PDFs to images and optionally zip and upload them.

    Args:
        pdf_files: Uploaded PDF file objects.
        sample_size: Pages to convert per PDF; ``0`` or ``None`` means all.
        hf_repo: Target dataset repo id (``username/repo-name``); when empty
            nothing is uploaded.
        create_zip: Whether to bundle the images into a ZIP file.
        private_repo: Whether the created dataset repo should be private.
        oauth_token: Hugging Face OAuth token, or ``None`` when the user is
            not logged in.
        progress: Gradio progress tracker.

    Returns:
        A 3-tuple ``(images, zip_path, status)``: the list of image paths,
        the ZIP path (or ``None``), and a status message. On validation
        failure or unexpected error the first two items are ``None``.
    """
    if not pdf_files:
        return (
            None,
            None,
            gr.Markdown(
                "⚠️ No PDF files uploaded. Please upload at least one PDF file."
            ),
        )

    if oauth_token is None:
        return (
            None,
            None,
            gr.Markdown(
                "⚠️ Not logged in to Hugging Face. Please log in to upload to a Hugging Face dataset."
            ),
        )

    # Sentinel so the error handler knows whether there is anything to remove.
    temp_dir = None
    try:
        # The number box may deliver None or a float; the comparisons and
        # the page sampling below need an int.
        sample_size = int(sample_size or 0)

        temp_dir = tempfile.mkdtemp()
        images_dir = os.path.join(temp_dir, "images")
        os.makedirs(images_dir)

        progress(0, desc="Starting PDF processing")
        images, message = pdf_to_images(pdf_files, sample_size, images_dir)

        zip_path = None
        if create_zip:
            zip_path = os.path.join(temp_dir, "converted_images.zip")
            with zipfile.ZipFile(zip_path, "w") as zipf:
                progress(0, desc="Zipping images")
                for image in progress.tqdm(images, desc="Zipping images"):
                    # Store files flat, without the temp directory prefix.
                    zipf.write(image, os.path.basename(image))
            message += f"\nCreated zip file with {len(images)} images"

        if hf_repo:
            # Upload problems are reported in the status message but do not
            # discard the locally converted images.
            try:
                hf_api = HfApi(token=oauth_token.token)
                hf_api.create_repo(
                    hf_repo,
                    repo_type="dataset",
                    private=private_repo,
                )
                hf_api.upload_large_folder(
                    folder_path=temp_dir,
                    repo_id=hf_repo,
                    repo_type="dataset",
                )

                size_category = get_size_category(len(images))
                card_data = DatasetCardData(
                    tags=["created-with-pdfs-to-page-images-converter", "pdf-to-image"],
                    size_categories=[size_category],
                )

                # Shared values so the card metadata and body agree,
                # including a single creation timestamp.
                card_fields = {
                    "hf_repo": hf_repo,
                    "num_images": len(images),
                    "num_pdfs": len(pdf_files),
                    "sample_size": sample_size if sample_size > 0 else "All pages",
                    "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                }
                card = DatasetCard.from_template(
                    card_data,
                    template_path=None,
                    **card_fields,
                )
                # Replace the default body with the project's own template.
                card.text = DATASET_CARD_TEMPLATE.format(
                    size_category=size_category,
                    **card_fields,
                )

                card.push_to_hub(hf_repo, token=oauth_token.token)

                # Report success only after the push has actually succeeded.
                repo_url = f"https://huggingface.co/datasets/{hf_repo}"
                message += f"\nUploaded dataset card to Hugging Face repo: [{hf_repo}]({repo_url})"
            except Exception as e:
                message += f"\nFailed to upload to Hugging Face: {str(e)}"

        return images, zip_path, message
    except Exception as e:
        # Remove partial output; cleanup failures must not mask the error.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return None, None, f"An error occurred: {str(e)}"
|
|
|
|
|
|
|
# Gradio UI: upload PDFs, choose options, then run process_pdfs on click.
with gr.Blocks() as demo:
    gr.HTML(
        """<h1 style='text-align: center;'> PDFs to Page Images Converter</h1>
        <center><i> 📁 Convert PDFs to an image dataset, splitting pages into individual images 📁 </i></center>"""
    )
    gr.Markdown(
        """
        This app allows you to:
        1. Upload one or more PDF files
        2. Convert each page of the PDFs into separate image files
        3. (Optionally) sample a specific number of pages from each PDF
        4. (Optionally) Create a downloadable ZIP file of the converted images
        5. (Optionally) Upload the images to a Hugging Face dataset repository
        """
    )

    with gr.Row():
        gr.LoginButton(size="sm")

    with gr.Row():
        # file_types takes extensions such as ".pdf", not glob patterns.
        pdf_files = gr.File(
            file_count="multiple", label="Upload PDF(s)", file_types=[".pdf"]
        )
    with gr.Row():
        # Default to 0 ("all pages") and round to an integer so the handler
        # never receives None or a fractional page count.
        sample_size = gr.Number(
            value=0,
            precision=0,
            minimum=0,
            label="Pages per PDF (0 for all pages)",
            info="Specify how many pages to convert from each PDF. Use 0 to convert all pages.",
        )
        hf_repo = gr.Textbox(
            label="Hugging Face Repo",
            placeholder="username/repo-name",
            info="Enter the Hugging Face repository name in the format 'username/repo-name'",
        )
    with gr.Row():
        create_zip = gr.Checkbox(label="Create ZIP file of images?", value=False)
        private_repo = gr.Checkbox(label="Make repository private?", value=False)
    with gr.Accordion("View converted images", open=False):
        output_gallery = gr.Gallery(label="Converted Images")
    status_text = gr.Markdown(label="Status")
    download_button = gr.File(label="Download Converted Images")

    submit_button = gr.Button("Convert PDFs to page images")
    # oauth_token and progress are not listed as inputs; process_pdfs
    # declares them through its type annotation and default value.
    submit_button.click(
        process_pdfs,
        inputs=[pdf_files, sample_size, hf_repo, create_zip, private_repo],
        outputs=[output_gallery, download_button, status_text],
    )


demo.launch(debug=True)
|
|