|
import gradio as gr |
|
import os |
|
import jpype |
|
import jpype.imports |
|
from jpype.types import * |
|
import subprocess |
|
import uno |
|
from com.sun.star.beans import PropertyValue |
|
from com.sun.star.connection import NoConnectException |
|
import time |
|
import sys |
|
import logging |
|
from pathlib import Path |
|
|
|
class DocumentConverter:
    """Convert office documents with Apache POI (via JPype) or LibreOffice (via UNO).

    ``convert_file`` is the main entry point: it sends the few conversions the
    POI code path can really produce to ``convert_with_poi`` and everything
    else to ``convert_with_libreoffice``.

    Every ``convert_*`` method returns the output file path (a string) on
    success and a string starting with ``"Error: "`` on failure; conversion
    errors are logged and never propagate to the caller.
    """

    # (input extension, output format) pairs that convert_with_poi implements.
    # Anything else (e.g. .xlsx -> pdf, .doc -> pdf) must go through LibreOffice.
    _POI_ROUTES = frozenset({(".docx", "pdf"), (".xlsx", "xlsx")})

    # Default LibreOffice export filter per output format.
    _FILTERS = {
        'pdf': 'writer_pdf_Export',
        'doc': 'MS Word 97',
        'docx': 'Office Open XML Text',
        'odt': 'writer8',
        'rtf': 'Rich Text Format',
        'txt': 'Text',
        'html': 'HTML (StarWriter)',
        'ppt': 'MS PowerPoint 97',
        'pptx': 'Impress Office Open XML',
        'xls': 'MS Excel 97',
        'xlsx': 'Calc Office Open XML',
    }

    # Filters whose name depends on the kind of document being exported.
    _FAMILY_FILTERS = {
        ('pdf', 'calc'): 'calc_pdf_Export',
        ('pdf', 'impress'): 'impress_pdf_Export',
        ('pdf', 'draw'): 'draw_pdf_Export',
        ('html', 'calc'): 'HTML (StarCalc)',
        ('html', 'impress'): 'impress_html_Export',
        ('html', 'draw'): 'draw_html_Export',
    }

    # UNO service -> document family, checked in order (presentation before
    # drawing so that a presentation is not classified as a plain drawing).
    _DOCUMENT_SERVICES = (
        ("com.sun.star.sheet.SpreadsheetDocument", "calc"),
        ("com.sun.star.presentation.PresentationDocument", "impress"),
        ("com.sun.star.drawing.DrawingDocument", "draw"),
        ("com.sun.star.text.TextDocument", "writer"),
    )

    _UNO_URL = "uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext"

    # Instance state; class-level defaults keep the methods usable even when
    # __init__ did not run or a backend failed to initialise.
    _java = None      # dict of loaded Java classes, or None when POI is unavailable
    _soffice = None   # Popen handle of the LibreOffice process started by this object

    def __init__(self):
        self.init_jpype()
        self.init_libreoffice()

    def init_jpype(self):
        """Start the JVM if needed and load the Java classes used by the POI path.

        The classes are stored on the instance (``self._java``) because names
        imported inside this method would otherwise be invisible to
        ``convert_with_poi``. If the JVM or the POI/XDocReport jars are not
        available the failure is logged and POI is simply disabled, so
        LibreOffice conversions keep working.
        """
        try:
            if not jpype.isJVMStarted():
                jpype.startJVM()

            # Java packages become importable only once the JVM is running.
            from java.io import FileInputStream, FileOutputStream
            from org.apache.poi.xwpf.usermodel import XWPFDocument
            from org.apache.poi.xssf.usermodel import XSSFWorkbook
            from fr.opensagres.poi.xwpf.converter.pdf import PdfConverter, PdfOptions

            self._java = {
                "FileInputStream": FileInputStream,
                "FileOutputStream": FileOutputStream,
                "XWPFDocument": XWPFDocument,
                "XSSFWorkbook": XSSFWorkbook,
                "PdfConverter": PdfConverter,
                "PdfOptions": PdfOptions,
            }
        except Exception as e:  # JVM start-up and class loading fail in many ways
            logging.error("Failed to initialise Apache POI: %s", e)
            self._java = None

    def init_libreoffice(self):
        """Launch a headless LibreOffice listening on localhost:2002 (best effort).

        The method does not wait for the listener to come up;
        ``connect_to_libreoffice`` retries until it is reachable.
        """
        try:
            self._soffice = subprocess.Popen(
                ['soffice', '--headless', '--accept=socket,host=localhost,port=2002;urp;'])
        except OSError as e:
            logging.error("Failed to start LibreOffice: %s", e)

    def connect_to_libreoffice(self, retries=10, delay=0.5):
        """Return the LibreOffice ``Desktop`` service of the running instance.

        Args:
            retries: Maximum number of connection attempts (at least one is made).
            delay: Seconds to wait between attempts.

        Raises:
            NoConnectException: If the listener is still unreachable after the
                last attempt.
        """
        local_context = uno.getComponentContext()
        resolver = local_context.ServiceManager.createInstanceWithContext(
            "com.sun.star.bridge.UnoUrlResolver", local_context)

        attempts = max(1, retries)
        for attempt in range(1, attempts + 1):
            try:
                context = resolver.resolve(self._UNO_URL)
                break
            except NoConnectException:
                # soffice may still be starting; give it time before giving up.
                if attempt == attempts:
                    raise
                time.sleep(delay)

        return context.ServiceManager.createInstanceWithContext(
            "com.sun.star.frame.Desktop", context)

    @staticmethod
    def _input_path(input_file):
        """Return the filesystem path for an upload given as a path or file-like object."""
        if input_file is None:
            raise ValueError("No input file provided")
        if isinstance(input_file, (str, os.PathLike)):
            return os.fspath(input_file)
        name = getattr(input_file, "name", None)
        if not name:
            raise ValueError("Input file has no usable path")
        return name

    @staticmethod
    def _output_path(input_filepath, output_format):
        """Return ``<cwd>/<input stem>.<output_format>``."""
        stem = os.path.splitext(os.path.basename(input_filepath))[0]
        return os.path.join(os.getcwd(), f"{stem}.{output_format}")

    @staticmethod
    def _file_url(path):
        """Return a properly escaped ``file://`` URL for *path*."""
        return Path(os.path.abspath(path)).as_uri()

    @staticmethod
    def _report(progress, fraction, message):
        """Forward a progress update when a callback was supplied."""
        # Compare with None: a progress tracker object need not be truthy.
        if progress is not None:
            progress(fraction, message)

    @staticmethod
    def _close_quietly(resource, label):
        """Close *resource* if set, logging instead of raising on failure."""
        if resource is None:
            return
        try:
            resource.close()
        except Exception as e:  # cleanup must not mask the conversion result
            logging.warning("Failed to close %s: %s", label, e)

    @classmethod
    def _document_family(cls, doc):
        """Classify a loaded UNO document as writer, calc, impress or draw."""
        for service, family in cls._DOCUMENT_SERVICES:
            if doc.supportsService(service):
                return family
        return "writer"

    def convert_with_poi(self, input_file, output_format, progress=None):
        """Convert with Apache POI: ``.docx`` to PDF, or re-save ``.xlsx`` as xlsx.

        Args:
            input_file: Path string, ``os.PathLike`` or object with a ``.name`` path.
            output_format: Target extension without the dot.
            progress: Optional callable ``progress(fraction, message)``.

        Returns:
            The output path, or ``"Error: ..."`` when the combination is not
            supported, POI is unavailable, or the conversion fails.
        """
        input_stream = None
        output_stream = None
        document = None
        try:
            input_filepath = self._input_path(input_file)
            extension = os.path.splitext(input_filepath)[1].lower()

            # Refuse combinations that would yield a missing or mislabeled file.
            if (extension, output_format) not in self._POI_ROUTES:
                raise ValueError(
                    f"POI cannot convert '{extension}' files to '{output_format}'")
            java = self._java
            if not java:
                raise RuntimeError("Apache POI is not available")

            output_filepath = self._output_path(input_filepath, output_format)
            input_stream = java["FileInputStream"](input_filepath)

            if extension == '.docx':
                self._report(progress, 0.1, "Loading document...")
                document = java["XWPFDocument"](input_stream)

                self._report(progress, 0.5, "Converting document...")
                output_stream = java["FileOutputStream"](output_filepath)
                java["PdfConverter"].getInstance().convert(
                    document, output_stream, java["PdfOptions"].create())
            else:
                self._report(progress, 0.1, "Loading spreadsheet...")
                document = java["XSSFWorkbook"](input_stream)

                self._report(progress, 0.5, "Writing spreadsheet...")
                output_stream = java["FileOutputStream"](output_filepath)
                document.write(output_stream)

            self._report(progress, 1.0, "Conversion complete!")
            return output_filepath

        except Exception as e:
            logging.error("POI conversion error: %s", e)
            return f"Error: {e}"

        finally:
            # Release Java resources on every path, output first.
            self._close_quietly(output_stream, "output stream")
            self._close_quietly(document, "document")
            self._close_quietly(input_stream, "input stream")

    def convert_with_libreoffice(self, input_file, output_format, progress=None):
        """Convert with a running LibreOffice instance over UNO.

        Args:
            input_file: Path string, ``os.PathLike`` or object with a ``.name`` path.
            output_format: Target extension without the dot.
            progress: Optional callable ``progress(fraction, message)``.

        Returns:
            The output path, or ``"Error: ..."`` when connecting, loading or
            exporting fails.
        """
        doc = None
        try:
            input_filepath = self._input_path(input_file)
            output_filepath = self._output_path(input_filepath, output_format)

            self._report(progress, 0.1, "Connecting to LibreOffice...")
            desktop = self.connect_to_libreoffice()

            self._report(progress, 0.2, "Loading document...")
            # UNO expects a sequence of PropertyValue, hence the one-element tuples.
            load_props = (PropertyValue(Name="Hidden", Value=True),)
            doc = desktop.loadComponentFromURL(
                self._file_url(input_filepath), "_blank", 0, load_props)
            if doc is None:
                raise RuntimeError(f"LibreOffice could not open {input_filepath}")

            self._report(progress, 0.5, "Converting document...")
            filter_name = self.get_filter_name(output_format, self._document_family(doc))
            store_props = (PropertyValue(Name="FilterName", Value=filter_name),)
            doc.storeToURL(self._file_url(output_filepath), store_props)

            self._report(progress, 1.0, "Conversion complete!")
            return output_filepath

        except Exception as e:
            logging.error("LibreOffice conversion error: %s", e)
            return f"Error: {e}"

        finally:
            # Close the hidden document even when the export failed.
            if doc is not None:
                try:
                    doc.close(True)
                except Exception as e:
                    logging.warning("Failed to close LibreOffice document: %s", e)

    def get_filter_name(self, format, family=None):
        """Return the LibreOffice export filter name for an output format.

        Args:
            format: Target extension without the dot (e.g. ``'pdf'``).
            family: Optional document family (``'writer'``, ``'calc'``,
                ``'impress'`` or ``'draw'``) used to pick family-specific
                filters such as the PDF exporters.

        Returns:
            The filter name; unknown formats fall back to the PDF exporter.
        """
        specific = self._FAMILY_FILTERS.get((format, family))
        if specific is not None:
            return specific
        pdf_fallback = self._FAMILY_FILTERS.get(('pdf', family), 'writer_pdf_Export')
        return self._FILTERS.get(format, pdf_fallback)

    def convert_file(self, input_file, output_format, progress=None):
        """Convert *input_file* to *output_format* with the appropriate backend.

        POI is used only for the combinations in ``_POI_ROUTES`` and only when
        it initialised successfully; LibreOffice handles everything else.

        Returns:
            The output path, or ``"Error: ..."`` on failure.
        """
        try:
            input_filepath = self._input_path(input_file)
        except ValueError as e:
            return f"Error: {e}"

        extension = os.path.splitext(input_filepath)[1].lower()
        if self._java and (extension, output_format) in self._POI_ROUTES:
            return self.convert_with_poi(input_file, output_format, progress)
        return self.convert_with_libreoffice(input_file, output_format, progress)
|
|
|
def gradio_interface():
    """Build the Gradio UI for the document converter.

    Creates one ``DocumentConverter`` and a ``gr.Blocks`` layout with a file
    upload, an output-format dropdown and a convert button on the left, and
    the converted file plus a status box on the right.

    Returns:
        The ``gr.Blocks`` application; call ``.launch()`` on it to serve it.
    """
    converter = DocumentConverter()

    supported_formats = [
        "pdf", "doc", "docx", "html", "txt", "odt", "rtf",
        "ppt", "pptx", "xls", "xlsx",
    ]

    with gr.Blocks() as demo:
        with gr.Row():
            with gr.Column():
                file_input = gr.File(label="Upload a file")
                format_dropdown = gr.Dropdown(
                    choices=supported_formats,
                    value="pdf",
                    label="Select Output Format",
                )
                convert_button = gr.Button("Convert")

            with gr.Column():
                output_file = gr.File(label="Converted File")
                status_text = gr.Textbox(label="Status", interactive=False)

        # Gradio injects a live tracker only when the handler declares a
        # parameter whose default is a gr.Progress instance.
        def wrapped_convert(input_file, output_format, progress=gr.Progress()):
            """Run the conversion and return ``(file path or None, status message)``."""
            if input_file is None:
                return None, "Error: please upload a file first."

            result = converter.convert_file(input_file, output_format, progress)

            # The converter signals failure with an "Error: ..." string; only a
            # real path may be handed to the File output.
            if isinstance(result, str) and result.startswith("Error:"):
                return None, result
            return result, "Complete!"

        convert_button.click(
            fn=wrapped_convert,
            inputs=[file_input, format_dropdown],
            # Components are updated through return values, so the status box
            # must be listed as an output.
            outputs=[output_file, status_text],
            show_progress="full",
        )

    return demo
|
|
|
if __name__ == "__main__":
    # Running the file as a script builds the UI and starts the Gradio server.
    app = gradio_interface()
    app.launch()