import os
import subprocess
from datetime import datetime

import fitz  # PyMuPDF
from pptx import Presentation
from llama_index.core import Document

from utils import (
    describe_image, is_graph, process_graph, extract_text_around_item,
    process_text_blocks, save_uploaded_file
)
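# Note: the helpers imported from the local `utils` module are assumed (based on
# how they are used below) to behave roughly as follows; this is a sketch of the
# expected contract, not a definitive spec:
#   describe_image(image_bytes) -> str            caption/description of an image
#   is_graph(image_bytes) -> bool                  True if the image looks like a chart/plot
#   process_graph(image_bytes) -> str              textual description of a chart/plot
#   extract_text_around_item(blocks, bbox, page_height) -> (before_text, after_text)
#   process_text_blocks(blocks) -> [(heading_block, content), ...]
#   save_uploaded_file(file) -> str                path where the upload was written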
def get_pdf_documents(pdf_file):
    """Process a PDF file and extract text, tables, and images."""
    all_pdf_documents = []
    ongoing_tables = {}

    try:
        f = fitz.open(stream=pdf_file.read(), filetype="pdf")
    except Exception as e:
        print(f"Error opening or processing the PDF file: {e}")
        return []

    for i in range(len(f)):
        page = f[i]

        # Keep only text blocks, excluding likely headers/footers in the top and bottom 10% of the page.
        text_blocks = [block for block in page.get_text("blocks", sort=True)
                       if block[-1] == 0 and not (block[1] < page.rect.height * 0.1 or block[3] > page.rect.height * 0.9)]
        grouped_text_blocks = process_text_blocks(text_blocks)

        table_docs, table_bboxes, ongoing_tables = parse_all_tables(pdf_file.name, page, i, text_blocks, ongoing_tables)
        all_pdf_documents.extend(table_docs)

        image_docs = parse_all_images(pdf_file.name, page, i, text_blocks)
        all_pdf_documents.extend(image_docs)

        for text_block_ctr, (heading_block, content) in enumerate(grouped_text_blocks, 1):
            heading_bbox = fitz.Rect(heading_block[:4])
            # Skip text that overlaps a detected table; it is already captured in the table document.
            if not any(heading_bbox.intersects(table_bbox) for table_bbox in table_bboxes):
                bbox = {"x1": heading_block[0], "y1": heading_block[1], "x2": heading_block[2], "y2": heading_block[3]}
                text_doc = Document(
                    text=f"{heading_block[4]}\n{content}",
                    metadata={
                        **bbox,
                        "type": "text",
                        "page_num": i,
                        "source": f"{pdf_file.name[:-4]}-page{i}-block{text_block_ctr}"
                    },
                    id_=f"{pdf_file.name[:-4]}-page{i}-block{text_block_ctr}"
                )
                all_pdf_documents.append(text_doc)

    f.close()
    return all_pdf_documents
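# Example usage (hypothetical filename): get_pdf_documents expects a file-like
# object that exposes both .read() and .name, e.g. a Streamlit UploadedFile or
# an open binary file handle:
#
#     with open("report.pdf", "rb") as fh:
#         docs = get_pdf_documents(fh)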
def parse_all_tables(filename, page, pagenum, text_blocks, ongoing_tables):
    """Extract tables from a PDF page."""
    table_docs = []
    table_bboxes = []
    try:
        tables = page.find_tables(horizontal_strategy="lines_strict", vertical_strategy="lines_strict")
        for tab in tables:
            # Only process tables whose header row lies inside the table body (header.external is False).
            if not tab.header.external:
                pandas_df = tab.to_pandas()
                tablerefdir = os.path.join(os.getcwd(), "vectorstore/table_references")
                os.makedirs(tablerefdir, exist_ok=True)
                df_xlsx_path = os.path.join(tablerefdir, f"table{len(table_docs)+1}-page{pagenum}.xlsx")
                pandas_df.to_excel(df_xlsx_path)
                bbox = fitz.Rect(tab.bbox)
                table_bboxes.append(bbox)

                before_text, after_text = extract_text_around_item(text_blocks, bbox, page.rect.height)

                # Render the table region to an image and derive a textual description from it.
                table_img = page.get_pixmap(clip=bbox)
                table_img_path = os.path.join(tablerefdir, f"table{len(table_docs)+1}-page{pagenum}.jpg")
                table_img.save(table_img_path)
                description = process_graph(table_img.tobytes())

                caption = before_text.replace("\n", " ") + description + after_text.replace("\n", " ")
                if before_text == "" and after_text == "":
                    caption = " ".join(tab.header.names)

                table_metadata = {
                    "source": f"{filename[:-4]}-page{pagenum}-table{len(table_docs)+1}",
                    "dataframe": df_xlsx_path,
                    "image": table_img_path,
                    "caption": caption,
                    "type": "table",
                    "page_num": pagenum
                }
                all_cols = ", ".join(list(pandas_df.columns.values))
                doc = Document(text=f"This is a table with the caption: {caption}\nThe columns are {all_cols}", metadata=table_metadata)
                table_docs.append(doc)
    except Exception as e:
        print(f"Error during table extraction: {e}")
    return table_docs, table_bboxes, ongoing_tables
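# Note: ongoing_tables is accepted and returned unchanged here; it appears to be
# a hook for stitching together tables that continue across page breaks, but no
# cross-page merging is implemented in this version.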
def parse_all_images(filename, page, pagenum, text_blocks):
    """Extract images from a PDF page."""
    image_docs = []
    image_info_list = page.get_image_info(xrefs=True)
    page_rect = page.rect

    for image_info in image_info_list:
        xref = image_info['xref']
        if xref == 0:
            continue

        img_bbox = fitz.Rect(image_info['bbox'])
        if img_bbox.width < page_rect.width / 20 or img_bbox.height < page_rect.height / 20:
            continue

        extracted_image = page.parent.extract_image(xref)
        image_data = extracted_image["image"]
        imgrefpath = os.path.join(os.getcwd(), "vectorstore/image_references")
        os.makedirs(imgrefpath, exist_ok=True)
        image_path = os.path.join(imgrefpath, f"image{xref}-page{pagenum}.png")
        with open(image_path, "wb") as img_file:
            img_file.write(image_data)

        before_text, after_text = extract_text_around_item(text_blocks, img_bbox, page.rect.height)
        if before_text == "" and after_text == "":
            continue

        image_description = " "
        if is_graph(image_data):
            image_description = process_graph(image_data)

        caption = before_text.replace("\n", " ") + image_description + after_text.replace("\n", " ")

        image_metadata = {
            "source": f"{filename[:-4]}-page{pagenum}-image{xref}",
            "image": image_path,
            "caption": caption,
            "type": "image",
            "page_num": pagenum
        }
        image_docs.append(Document(text="This is an image with the caption: " + caption, metadata=image_metadata))
    return image_docs
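# Heuristics above: images narrower or shorter than 1/20 of the page are treated
# as decorations and skipped, and images with no surrounding text are dropped
# because no caption can be built for them. These thresholds are choices of this
# pipeline, not requirements of PyMuPDF; adjust them if small but meaningful
# figures are being missed in your documents.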
def process_ppt_file(ppt_path):
    """Process a PowerPoint file."""
    pdf_path = convert_ppt_to_pdf(ppt_path)
    images_data = convert_pdf_to_images(pdf_path)
    slide_texts = extract_text_and_notes_from_ppt(ppt_path)
    processed_data = []

    for (image_path, page_num), (slide_text, notes) in zip(images_data, slide_texts):
        if notes:
            notes = "\n\nThe speaker notes for this slide are: " + notes

        with open(image_path, 'rb') as image_file:
            image_content = image_file.read()

        image_description = " "
        if is_graph(image_content):
            image_description = process_graph(image_content)

        image_metadata = {
            "source": f"{os.path.basename(ppt_path)}",
            "image": image_path,
            "caption": slide_text + image_description + notes,
            "type": "image",
            "page_num": page_num
        }
        processed_data.append(Document(text="This is a slide with the text: " + slide_text + image_description, metadata=image_metadata))
    return processed_data
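# Example usage (hypothetical path), assuming LibreOffice is installed so the
# PPT -> PDF conversion below can run:
#
#     slide_docs = process_ppt_file("deck.pptx")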
def convert_ppt_to_pdf(ppt_path):
    """Convert a PowerPoint file to PDF using LibreOffice."""
    base_name = os.path.basename(ppt_path)
    ppt_name_without_ext = os.path.splitext(base_name)[0].replace(' ', '_')
    new_dir_path = os.path.abspath("vectorstore/ppt_references")
    os.makedirs(new_dir_path, exist_ok=True)
    pdf_path = os.path.join(new_dir_path, f"{ppt_name_without_ext}.pdf")
    command = ['libreoffice', '--headless', '--convert-to', 'pdf', '--outdir', new_dir_path, ppt_path]
    subprocess.run(command, check=True)
    return pdf_path
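# Caveat: LibreOffice derives the output file name from the input file stem, so
# if ppt_path contains spaces the PDF actually written to new_dir_path may not
# match the underscore-substituted pdf_path computed above. If that case matters
# for your inputs, consider renaming the source file before conversion.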
def convert_pdf_to_images(pdf_path):
    """Convert a PDF file to a series of images using PyMuPDF."""
    doc = fitz.open(pdf_path)
    base_name = os.path.basename(pdf_path)
    pdf_name_without_ext = os.path.splitext(base_name)[0].replace(' ', '_')
    new_dir_path = os.path.join(os.getcwd(), "vectorstore/ppt_references")
    os.makedirs(new_dir_path, exist_ok=True)
    image_paths = []

    for page_num in range(len(doc)):
        page = doc.load_page(page_num)
        pix = page.get_pixmap()
        output_image_path = os.path.join(new_dir_path, f"{pdf_name_without_ext}_{page_num:04d}.png")
        pix.save(output_image_path)
        image_paths.append((output_image_path, page_num))
    doc.close()
    return image_paths
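# Note: get_pixmap() with no arguments renders at PyMuPDF's default 72 dpi. If
# slide text comes out too blurry for downstream description, a zoom matrix can
# be passed, e.g. page.get_pixmap(matrix=fitz.Matrix(2, 2)) for roughly 144 dpi.
# That is an optional tweak, not something the code above does.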
def extract_text_and_notes_from_ppt(ppt_path):
    """Extract text and notes from a PowerPoint file."""
    prs = Presentation(ppt_path)
    text_and_notes = []
    for slide in prs.slides:
        slide_text = ' '.join([shape.text for shape in slide.shapes if hasattr(shape, "text")])
        try:
            # has_notes_slide avoids implicitly creating a notes slide when none exists.
            notes = slide.notes_slide.notes_text_frame.text if slide.has_notes_slide else ''
        except Exception:
            notes = ''
        text_and_notes.append((slide_text, notes))
    return text_and_notes
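# Example usage (hypothetical path): returns one (slide_text, notes) tuple per
# slide, in deck order:
#
#     for slide_text, notes in extract_text_and_notes_from_ppt("deck.pptx"):
#         print(slide_text, notes)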
def load_multimodal_data(files):
    """Load and process multiple file types with timestamp metadata."""
    documents = []
    for file in files:
        # Get current timestamp for this upload. `file` is expected to be an
        # uploaded-file object exposing .name and .read() (e.g. a Streamlit UploadedFile).
        current_timestamp = datetime.now().isoformat()
        file_extension = os.path.splitext(file.name.lower())[1]

        if file_extension in ('.png', '.jpg', '.jpeg'):
            image_content = file.read()
            image_text = describe_image(image_content)
            doc = Document(
                text=image_text,
                metadata={
                    "source": file.name,
                    "type": "image",
                    "timestamp": current_timestamp
                }
            )
            documents.append(doc)
        elif file_extension == '.pdf':
            try:
                pdf_documents = get_pdf_documents(file)
                # Add timestamp to each PDF document
                for pdf_doc in pdf_documents:
                    pdf_doc.metadata['timestamp'] = current_timestamp
                documents.extend(pdf_documents)
            except Exception as e:
                print(f"Error processing PDF {file.name}: {e}")
        elif file_extension in ('.ppt', '.pptx'):
            try:
                ppt_documents = process_ppt_file(save_uploaded_file(file))
                # Add timestamp to each PPT document
                for ppt_doc in ppt_documents:
                    ppt_doc.metadata['timestamp'] = current_timestamp
                documents.extend(ppt_documents)
            except Exception as e:
                print(f"Error processing PPT {file.name}: {e}")
        else:
            text = file.read().decode("utf-8")
            doc = Document(
                text=text,
                metadata={
                    "source": file.name,
                    "type": "text",
                    "timestamp": current_timestamp
                }
            )
            documents.append(doc)
    return documents
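# Example usage (assuming Streamlit-style uploads, i.e. objects with .name and
# .read(), as the code above expects):
#
#     uploaded = st.file_uploader("Upload files", accept_multiple_files=True)
#     if uploaded:
#         documents = load_multimodal_data(uploaded)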
def load_data_from_directory(directory):
    """Load and process multiple file types from a directory with timestamp metadata."""
    documents = []
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        # Get current timestamp
        current_timestamp = datetime.now().isoformat()
        file_extension = os.path.splitext(filename.lower())[1]
        print(filename)

        if file_extension in ('.png', '.jpg', '.jpeg'):
            with open(filepath, "rb") as image_file:
                image_content = image_file.read()
            image_text = describe_image(image_content)
            doc = Document(
                text=image_text,
                metadata={
                    "source": filename,
                    "type": "image",
                    "timestamp": current_timestamp
                }
            )
            print(doc)
            documents.append(doc)
        elif file_extension == '.pdf':
            with open(filepath, "rb") as pdf_file:
                try:
                    pdf_documents = get_pdf_documents(pdf_file)
                    # Add timestamp to each PDF document
                    for pdf_doc in pdf_documents:
                        pdf_doc.metadata['timestamp'] = current_timestamp
                    documents.extend(pdf_documents)
                except Exception as e:
                    print(f"Error processing PDF {filename}: {e}")
        elif file_extension in ('.ppt', '.pptx'):
            try:
                ppt_documents = process_ppt_file(filepath)
                # Add timestamp to each PPT document
                for ppt_doc in ppt_documents:
                    ppt_doc.metadata['timestamp'] = current_timestamp
                documents.extend(ppt_documents)
                print(ppt_documents)
            except Exception as e:
                print(f"Error processing PPT {filename}: {e}")
        else:
            with open(filepath, "r", encoding="utf-8") as text_file:
                text = text_file.read()
            doc = Document(
                text=text,
                metadata={
                    "source": filename,
                    "type": "text",
                    "timestamp": current_timestamp
                }
            )
            documents.append(doc)
    return documents
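# Example usage (hypothetical directory): the returned Document list can be fed
# straight into a LlamaIndex index, e.g.:
#
#     from llama_index.core import VectorStoreIndex
#     documents = load_data_from_directory("uploads/")
#     index = VectorStoreIndex.from_documents(documents)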