Spaces:
Sleeping
Sleeping
"""Unstructured file reader. | |
A parser for unstructured text files using Unstructured.io.
Supports .txt, .docx, .pptx, .jpg, .png, .eml, .html, and .pdf documents.
""" | |
from datetime import datetime | |
import mimetypes | |
import os | |
from pathlib import Path | |
import re | |
from typing import Any, Dict, List, Optional | |
from llama_index.readers.base import BaseReader | |
from llama_index.readers.schema.base import Document | |
class UnstructuredReader(BaseReader):
    """Reader that hands file parsing over to Unstructured.io's ``partition``."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the base reader and download the NLTK data it relies on."""
        super().__init__(*args, **kwargs)

        # Unstructured.io needs these NLTK resources before it can run.
        import nltk

        for resource in ("punkt", "averaged_perceptron_tagger"):
            nltk.download(resource)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = True,
    ) -> List[Document]:
        """Partition ``file`` and wrap the extracted text in documents.

        Args:
            file: Path of the file to parse.
            extra_info: Optional metadata attached to every returned document;
                an empty dict is used when it is missing or empty.
            split_documents: When truthy, return one document per partitioned
                element; otherwise return a single document containing all
                elements separated by blank lines.

        Returns:
            The list of documents built from the partitioned elements.
        """
        from unstructured.partition.auto import partition

        # Collapse every run of whitespace inside an element to a single space.
        normalized = []
        for element in partition(str(file)):
            normalized.append(" ".join(str(element).split()))

        if not split_documents:
            merged_text = "\n\n".join(normalized)
            return [Document(text=merged_text, extra_info=extra_info or {})]

        return [
            Document(text=piece, extra_info=extra_info or {})
            for piece in normalized
        ]
class MarkdownReader(BaseReader):
    """Reader for text files organised as numbered knowledge units.

    The file is split by ``parse_knowledge_units``, which starts a new unit at
    each line beginning with ``<number>. ``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = True,
    ) -> List[Document]:
        """Parse ``file`` into documents.

        Args:
            file: Path of the file to parse.
            extra_info: Optional metadata attached to every returned document;
                an empty dict is used when it is missing or empty.
            split_documents: When truthy, return one document per knowledge
                unit; otherwise return a single document containing all units
                separated by blank lines.

        Returns:
            The list of documents built from the parsed knowledge units.
        """
        knowledge_units = parse_knowledge_units(str(file))

        if split_documents:
            return [
                Document(text=unit, extra_info=extra_info or {})
                for unit in knowledge_units
            ]

        # Without this branch the method fell through and returned None,
        # contradicting the declared List[Document] return type.
        return [
            Document(text="\n\n".join(knowledge_units), extra_info=extra_info or {})
        ]
def parse_knowledge_units(file_path: str) -> List[str]:
    """Split a UTF-8 text file into numbered knowledge units.

    A new unit starts at every line whose stripped text begins with one or
    more digits, a period, and a whitespace character (for example
    ``"12. Title"``). All following lines belong to that unit until the next
    such line. Text before the first numbered line forms a unit of its own.

    Args:
        file_path: Path of the file to read.

    Returns:
        The units in file order, each stripped of surrounding whitespace.
        Units that are empty after stripping are omitted.
    """
    unit_start_pattern = re.compile(r"^\d+\.\s")
    knowledge_units: List[str] = []
    current_lines: List[str] = []

    def flush() -> None:
        """Store the pending unit unless it is blank, then reset the buffer."""
        unit = "".join(current_lines).strip()
        # Blank leading lines would otherwise be emitted as an empty unit.
        if unit:
            knowledge_units.append(unit)
        current_lines.clear()

    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            if unit_start_pattern.match(line.strip()):
                flush()
            current_lines.append(line)

    flush()
    return knowledge_units
def default_file_metadata_func(file_path: str) -> Dict:
    """Collect some handy metadata about a file from the filesystem.

    Args:
        file_path: Path of the file, as a string.

    Returns:
        A dict holding the path, base name, MIME type guessed from the name,
        size in bytes, and the ``st_ctime``, ``st_mtime`` and ``st_atime``
        timestamps, each formatted as ``YYYY-MM-DD``.
    """
    stat_result = Path(file_path).stat()

    def as_date(timestamp: float) -> str:
        """Format a POSIX timestamp as a local ``YYYY-MM-DD`` date string."""
        return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")

    guessed_type, _ = mimetypes.guess_type(file_path)

    metadata = {
        "file_path": file_path,
        "file_name": os.path.basename(file_path),
        "file_type": guessed_type,
        "file_size": stat_result.st_size,
        # NOTE(review): the meaning of st_ctime is platform-dependent, so this
        # may not be a true creation date everywhere - confirm.
        "creation_date": as_date(stat_result.st_ctime),
        "last_modified_date": as_date(stat_result.st_mtime),
        "last_accessed_date": as_date(stat_result.st_atime),
    }
    return metadata