import os import random import shutil import tempfile import zipfile from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime import fitz # PyMuPDF import gradio as gr from huggingface_hub import DatasetCard, DatasetCardData, HfApi from dataset_card_template import DATASET_CARD_TEMPLATE os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" def process_pdf(pdf_file, sample_size, temp_dir): try: pdf_path = pdf_file.name doc = fitz.open(pdf_path) total_pages = len(doc) pages_to_convert = ( total_pages if sample_size == 0 else min(sample_size, total_pages) ) selected_pages = ( sorted(random.sample(range(total_pages), pages_to_convert)) if sample_size > 0 and sample_size < total_pages else range(total_pages) ) images = [] for page_num in selected_pages: page = doc[page_num] pix = page.get_pixmap() image_path = os.path.join( temp_dir, f"{os.path.basename(pdf_path)}_page_{page_num+1}.png" ) pix.save(image_path) images.append(image_path) doc.close() return images, None except Exception as e: return [], f"Error processing {pdf_file.name}: {str(e)}" def pdf_to_images(pdf_files, sample_size, temp_dir, progress=gr.Progress()): if not os.path.exists(temp_dir): os.makedirs(temp_dir) progress(0, desc="Starting conversion") all_images = [] skipped_pdfs = [] with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: future_to_pdf = { executor.submit(process_pdf, pdf_file, sample_size, temp_dir): pdf_file for pdf_file in pdf_files } for future in progress.tqdm( as_completed(future_to_pdf), total=len(pdf_files), desc="Converting PDFs" ): pdf_file = future_to_pdf[future] images, error = future.result() if error: skipped_pdfs.append(pdf_file.name) gr.Info(error) else: all_images.extend(images) message = f"Saved {len(all_images)} images to temporary directory" if skipped_pdfs: message += f"\nSkipped {len(skipped_pdfs)} PDFs due to errors: {', '.join(skipped_pdfs)}" return all_images, message def get_size_category(num_images): if num_images < 1000: return "n<1K" elif num_images < 10000: return "1K1M" def process_pdfs( pdf_files, sample_size, hf_repo, create_zip, private_repo, oauth_token: gr.OAuthToken | None, progress=gr.Progress(), ): if not pdf_files: return ( None, None, gr.Markdown( "⚠️ No PDF files uploaded. Please upload at least one PDF file." ), ) if oauth_token is None: return ( None, None, gr.Markdown( "⚠️ Not logged in to Hugging Face. Please log in to upload to a Hugging Face dataset." ), ) try: temp_dir = tempfile.mkdtemp() images_dir = os.path.join(temp_dir, "images") os.makedirs(images_dir) progress(0, desc="Starting PDF processing") images, message = pdf_to_images(pdf_files, sample_size, images_dir) zip_path = None if create_zip: # Create a zip file of the images zip_path = os.path.join(temp_dir, "converted_images.zip") with zipfile.ZipFile(zip_path, "w") as zipf: progress(0, desc="Zipping images") for image in progress.tqdm(images, desc="Zipping images"): zipf.write(image, os.path.basename(image)) message += f"\nCreated zip file with {len(images)} images" if hf_repo: try: hf_api = HfApi(token=oauth_token.token) hf_api.create_repo( hf_repo, repo_type="dataset", private=private_repo, ) hf_api.upload_large_folder( folder_path=temp_dir, repo_id=hf_repo, repo_type="dataset", # path_in_repo="images", ) # Determine size category size_category = get_size_category(len(images)) # Create DatasetCardData instance card_data = DatasetCardData( tags=["created-with-pdfs-to-page-images-converter", "pdf-to-image"], size_categories=[size_category], ) # Create and populate the dataset card card = DatasetCard.from_template( card_data, template_path=None, # Use default template hf_repo=hf_repo, num_images=len(images), num_pdfs=len(pdf_files), sample_size=sample_size if sample_size > 0 else "All pages", creation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ) # Add our custom content to the card card.text = DATASET_CARD_TEMPLATE.format( hf_repo=hf_repo, num_images=len(images), num_pdfs=len(pdf_files), sample_size=sample_size if sample_size > 0 else "All pages", creation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), size_category=size_category, ) repo_url = f"https://huggingface.co/datasets/{hf_repo}" message += f"\nUploaded dataset card to Hugging Face repo: [{hf_repo}]({repo_url})" card.push_to_hub(hf_repo, token=oauth_token.token) except Exception as e: message += f"\nFailed to upload to Hugging Face: {str(e)}" return images, zip_path, message except Exception as e: if "temp_dir" in locals(): shutil.rmtree(temp_dir) return None, None, f"An error occurred: {str(e)}" # Define the Gradio interface with gr.Blocks() as demo: gr.HTML( """

PDFs to Page Images Converter

📁 Convert PDFs to an image dataset, splitting pages into individual images 📁
""" ) gr.Markdown( """ This app allows you to: 1. Upload one or more PDF files 2. Convert each page of the PDFs into separate image files 3. (Optionally) sample a specific number of pages from each PDF 4. (Optionally) Create a downloadable ZIP file of the converted images 5. (Optionally) Upload the images to a Hugging Face dataset repository """ ) with gr.Row(): gr.LoginButton(size="sm") with gr.Row(): pdf_files = gr.File( file_count="multiple", label="Upload PDF(s)", file_types=["*.pdf"] ) with gr.Row(): sample_size = gr.Number( value=None, label="Pages per PDF (0 for all pages)", info="Specify how many pages to convert from each PDF. Use 0 to convert all pages.", ) hf_repo = gr.Textbox( label="Hugging Face Repo", placeholder="username/repo-name", info="Enter the Hugging Face repository name in the format 'username/repo-name'", ) with gr.Row(): create_zip = gr.Checkbox(label="Create ZIP file of images?", value=False) private_repo = gr.Checkbox(label="Make repository private?", value=False) with gr.Accordion("View converted images", open=False): output_gallery = gr.Gallery(label="Converted Images") status_text = gr.Markdown(label="Status") download_button = gr.File(label="Download Converted Images") submit_button = gr.Button("Convert PDFs to page images") submit_button.click( process_pdfs, inputs=[pdf_files, sample_size, hf_repo, create_zip, private_repo], outputs=[output_gallery, download_button, status_text], ) # Launch the app demo.launch(debug=True)