import json
import logging
import os
from typing import Any, Dict, List

from bs4 import BeautifulSoup

from haystack import Document
from haystack.components.preprocessors.document_splitter import DocumentSplitter

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def load_json_data(file_path: str) -> List[Dict[str, str]]:
    """
    Load data from a JSON file.

    Args:
        file_path: Path to the JSON file

    Returns:
        List of dictionaries containing the data
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        logger.info(f"Successfully loaded {len(data)} records from {file_path}")
        return data
    except Exception as e:
        logger.error(f"Error loading JSON data: {e}")
        return []
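
# Each record in the JSON file is expected to carry at least a "url" and a
# "content" field, since that is what process_documents() reads below. An
# illustrative (not verbatim) record from ltu_programme_data.json:
#   {"url": "https://...", "content": "plain-text page content"}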


def extract_text_from_html(html_content: str) -> str:
    """
    Extract text content from HTML.

    Args:
        html_content: HTML content as string

    Returns:
        Extracted text content
    """
    try:
        soup = BeautifulSoup(html_content, 'html.parser')

        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.extract()

        # Get the visible text
        text = soup.get_text(separator=' ', strip=True)

        # Collapse runs of whitespace into single spaces
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)
        return text
    except Exception as e:
        logger.error(f"Error extracting text from HTML: {e}")
        return ""


def process_documents(data: List[Dict[str, str]]) -> List[Dict[str, Any]]:
    """
    Process documents from the dataset.

    Args:
        data: List of dictionaries containing "url" and "content" fields

    Returns:
        List of processed documents with text content and metadata
    """
    processed_docs = []
    for i, item in enumerate(data):
        try:
            url = item.get('url', '')
            content = item.get('content', '')
            if not url or not content:
                continue

            # The raw-HTML path is disabled: "content" already holds extracted text.
            # text = extract_text_from_html(html)
            # text = html
            # if not text:
            #     continue

            # Create document with metadata
            doc = {
                'content': content,
                'meta': {
                    'url': url,
                    'doc_id': f"doc_{i}"
                }
            }
            processed_docs.append(doc)
        except Exception as e:
            logger.error(f"Error processing document {i}: {e}")

    logger.info(f"Successfully processed {len(processed_docs)} documents")
    return processed_docs


def split_documents(docs: List[Dict[str, Any]], chunk_size: int = 500, overlap: int = 50) -> List[Dict[str, Any]]:
    """
    Split documents into smaller chunks for better retrieval using Haystack.

    Args:
        docs: List of processed documents
        chunk_size: Size of each chunk in words (DocumentSplitter counts words, not characters)
        overlap: Overlap between consecutive chunks in words

    Returns:
        List of document chunks in the same dict format as the input
    """
    # Haystack's DocumentSplitter splits by word by default; "character" is not
    # a supported unit, so chunk_size/overlap are interpreted as word counts.
    document_splitter = DocumentSplitter(
        split_by="word",
        split_length=chunk_size,
        split_overlap=overlap
    )

    chunked_docs = []
    for doc in docs:
        # If the content is already short enough, keep the document as is
        if len(doc['content'].split()) <= chunk_size:
            chunked_docs.append(doc)
            continue

        # Wrap the dict in a Haystack Document for the splitter
        haystack_doc = Document(
            content=doc['content'],
            meta=doc['meta']
        )

        # Split the document
        result = document_splitter.run(documents=[haystack_doc])
        split_docs = result["documents"]

        # Re-label the chunks and convert them back to the plain dict format
        for i, split_doc in enumerate(split_docs):
            split_doc.meta["doc_id"] = f"{doc['meta']['doc_id']}_chunk_{i}"
            split_doc.meta["chunk_id"] = i
            chunked_docs.append({
                'content': split_doc.content,
                'meta': split_doc.meta
            })

    logger.info(f"Split {len(docs)} documents into {len(chunked_docs)} chunks")
    return chunked_docs


if __name__ == "__main__":
    # Test the functions on a small sample
    data_path = "ltu_programme_data.json"
    if os.path.exists(data_path):
        data = load_json_data(data_path)
        processed_docs = process_documents(data[:5])  # Process first 5 docs as a test
        chunked_docs = split_documents(processed_docs)
        print(f"Processed {len(processed_docs)} documents into {len(chunked_docs)} chunks")