# Media_Wiki.py
# Description: This file contains the functions to import MediaWiki dumps into the media_db and Chroma databases.
#######################################################################################################################
#
# Imports
import json
import logging
import os
import re
import traceback
from typing import List, Dict, Any, Iterator, Optional
# 3rd-Party Imports
import mwparserfromhell
import mwxml
import yaml
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import add_media_with_keywords
from App_Function_Libraries.RAG.ChromaDB_Library import process_and_store_content
#
#######################################################################################################################
#
# Functions:

# Load configuration
def load_mediawiki_import_config():
    with open(os.path.join('Config_Files', 'mediawiki_import_config.yaml'), 'r') as f:
        return yaml.safe_load(f)


config = load_mediawiki_import_config()


def setup_logger(name: str, level: int = logging.INFO, log_file: Optional[str] = None) -> logging.Logger:
    """Set up and return a logger with the given name and level."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    return logger


# Usage
logger = setup_logger('mediawiki_import', log_file='mediawiki_import.log')
# End of setup
#######################################################################################################################
#
# Functions:

def parse_mediawiki_dump(file_path: str, namespaces: Optional[List[int]] = None,
                         skip_redirects: bool = False) -> Iterator[Dict[str, Any]]:
    # Stream pages out of the XML dump one revision at a time, stripped down to plain text.
    with open(file_path, encoding='utf-8') as dump_file:
        dump = mwxml.Dump.from_file(dump_file)
        for page in dump.pages:
            if skip_redirects and page.redirect:
                continue
            if namespaces and page.namespace not in namespaces:
                continue

            for revision in page:
                wikicode = mwparserfromhell.parse(revision.text)
                plain_text = wikicode.strip_code()
                yield {
                    "title": page.title,
                    "content": plain_text,
                    "namespace": page.namespace,
                    "page_id": page.id,
                    "revision_id": revision.id,
                    "timestamp": revision.timestamp
                }
            logger.debug(f"Yielded page: {page.title}")


def optimized_chunking(text: str, chunk_options: Dict[str, Any]) -> List[Dict[str, Any]]:
    # re.split with a capturing group yields [lead, title_1, body_1, title_2, body_2, ...]
    sections = re.split(r'\n==\s*(.*?)\s*==\n', text)
    chunks = []
    current_chunk = ""
    current_size = 0

    logging.debug(f"optimized_chunking: Processing text with {len(sections) // 2} sections")
    for i in range(0, len(sections), 2):
        # sections[0] is the lead text before the first heading; every later body at
        # index i is preceded by its heading title at index i - 1.
        section_title = sections[i - 1] if i > 0 else "Introduction"
        section_content = sections[i]

        if current_size + len(section_content) > chunk_options['max_size']:
            if current_chunk:
                chunks.append({"text": current_chunk, "metadata": {"section": section_title}})
            current_chunk = section_content
            current_size = len(section_content)
        else:
            current_chunk += f"\n== {section_title} ==\n" + section_content
            current_size += len(section_content)

    if current_chunk:
        chunks.append({"text": current_chunk, "metadata": {"section": "End"}})

    return chunks
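
# Illustrative sketch (never called in this module): what optimized_chunking returns for a
# tiny article. The 500-character 'max_size' is an assumed demo value, not one taken from
# mediawiki_import_config.yaml.
def _demo_optimized_chunking() -> List[Dict[str, Any]]:
    sample_text = (
        "A short lead paragraph.\n"
        "== History ==\n"
        "A few sentences about the article's history.\n"
        "== Usage ==\n"
        "A few sentences about how the subject is used.\n"
    )
    # Returns a list of {"text": ..., "metadata": {"section": ...}} dicts; with a
    # 500-character budget this whole sample fits into a single chunk.
    return optimized_chunking(sample_text, {'max_size': 500})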

def process_single_item(content: str, title: str, wiki_name: str, chunk_options: Dict[str, Any],
                        is_combined: bool = False, item: Optional[Dict[str, Any]] = None,
                        api_name: Optional[str] = None):
    try:
        logging.debug(f"process_single_item: Processing item: {title}")

        # Create a unique URL using the wiki name and article title
        encoded_title = title.replace(" ", "_")
        url = f"mediawiki:{wiki_name}:{encoded_title}"
        logging.debug(f"Generated URL: {url}")

        result = add_media_with_keywords(
            url=url,  # Use the generated URL here
            title=title,
            media_type="mediawiki_dump" if is_combined else "mediawiki_article",
            content=content,
            keywords=f"mediawiki,{wiki_name}" + (",full_dump" if is_combined else ",article"),
            prompt="",
            summary="",
            transcription_model="",
            author="MediaWiki",
            ingestion_date=item['timestamp'].strftime('%Y-%m-%d') if item else None
        )
        logging.debug(f"Result from add_media_with_keywords: {result}")

        # Unpack the result
        media_id, message = result
        logging.info(f"Media item result: {message}")
        logging.debug(f"Final media_id: {media_id}")

        chunks = optimized_chunking(content, chunk_options)
        for i, chunk in enumerate(chunks):
            logging.debug(f"Processing chunk {i + 1}/{len(chunks)} for item: {title}")
            # FIXME
            # def process_and_store_content(content: str, collection_name: str, media_id: int, file_name: str,
            #                               create_embeddings: bool = False, create_summary: bool = False,
            #                               api_name: str = None):
            if api_name:
                process_and_store_content(chunk['text'], f"mediawiki_{wiki_name}", media_id, title,
                                           True, True, api_name)
            else:
                process_and_store_content(chunk['text'], f"mediawiki_{wiki_name}", media_id, title)
        logging.info(f"Successfully processed item: {title}")
    except Exception as e:
        logging.error(f"Error processing item {title}: {str(e)}")
        logging.error(f"Exception details: {traceback.format_exc()}")


def load_checkpoint(file_path: str) -> int:
    if os.path.exists(file_path):
        with open(file_path, 'r') as f:
            return json.load(f)['last_processed_id']
    return 0


def save_checkpoint(file_path: str, last_processed_id: int):
    with open(file_path, 'w') as f:
        json.dump({'last_processed_id': last_processed_id}, f)
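
# Illustrative sketch of the checkpoint round-trip that import_mediawiki_dump (below) relies
# on. The checkpoint is plain JSON of the form {"last_processed_id": <page id>}; the path used
# here is a throwaway example, real runs use f"{wiki_name}_import_checkpoint.json".
def _demo_checkpoint_roundtrip() -> int:
    demo_path = "demo_wiki_import_checkpoint.json"
    save_checkpoint(demo_path, 42)              # record the last page id that was processed
    resumed_from = load_checkpoint(demo_path)   # -> 42; load_checkpoint returns 0 if the file is absent
    os.remove(demo_path)                        # clean up the throwaway file
    return resumed_from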

def import_mediawiki_dump(
        file_path: str,
        wiki_name: str,
        namespaces: Optional[List[int]] = None,
        skip_redirects: bool = False,
        chunk_options: Optional[Dict[str, Any]] = None,
        single_item: bool = False,
        progress_callback: Any = None,
        api_name: Optional[str] = None,
        api_key: Optional[str] = None
) -> Iterator[str]:
    try:
        logging.info(f"Importing MediaWiki dump: {file_path}")
        if chunk_options is None:
            chunk_options = config['chunking']

        checkpoint_file = f"{wiki_name}_import_checkpoint.json"
        last_processed_id = load_checkpoint(checkpoint_file)

        total_pages = count_pages(file_path, namespaces, skip_redirects)
        processed_pages = 0
        yield f"Found {total_pages} pages to process."

        for item in parse_mediawiki_dump(file_path, namespaces, skip_redirects):
            # Skip pages already handled in a previous (checkpointed) run.
            if item['page_id'] <= last_processed_id:
                continue
            # FIXME - ensure this works...
            # With an API name, chunks get embeddings and summaries; without one they are stored as-is.
            if api_name is not None:
                # FIXME - add API key to the call/params
                process_single_item(item['content'], item['title'], wiki_name, chunk_options,
                                    False, item, api_name)
            else:
                process_single_item(item['content'], item['title'], wiki_name, chunk_options,
                                    False, item)
            save_checkpoint(checkpoint_file, item['page_id'])

            processed_pages += 1
            if progress_callback is not None:
                progress_callback(processed_pages / total_pages, f"Processed page: {item['title']}")
            yield f"Processed page {processed_pages}/{total_pages}: {item['title']}"

        os.remove(checkpoint_file)  # Remove checkpoint file after successful import
        yield f"Successfully imported and indexed MediaWiki dump: {wiki_name}"
    except FileNotFoundError:
        logger.error(f"MediaWiki dump file not found: {file_path}")
        yield f"Error: File not found - {file_path}"
    except PermissionError:
        logger.error(f"Permission denied when trying to read: {file_path}")
        yield f"Error: Permission denied - {file_path}"
    except Exception as e:
        logger.exception(f"Error during MediaWiki import: {str(e)}")
        yield f"Error during import: {str(e)}"


def count_pages(file_path: str, namespaces: Optional[List[int]] = None, skip_redirects: bool = False) -> int:
    """
    Count the number of pages in a MediaWiki XML dump file.

    Args:
        file_path (str): Path to the MediaWiki XML dump file.
        namespaces (List[int], optional): List of namespace IDs to include. If None, include all namespaces.
        skip_redirects (bool, optional): Whether to skip redirect pages.

    Returns:
        int: The number of pages in the dump file.
    """
    try:
        with open(file_path, encoding='utf-8') as dump_file:
            dump = mwxml.Dump.from_file(dump_file)
            count = 0
            for page in dump.pages:
                if skip_redirects and page.redirect:
                    continue
                if namespaces and page.namespace not in namespaces:
                    continue
                count += 1
            return count
    except Exception as e:
        logger.error(f"Error counting pages in MediaWiki dump: {str(e)}")
        return 0

#
# End of Media_Wiki.py
#######################################################################################################################
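
# Illustrative manual driver (a sketch only): how the import_mediawiki_dump generator is
# meant to be consumed. The dump path and wiki name below are placeholder assumptions,
# not files shipped with the project.
if __name__ == '__main__':
    for status in import_mediawiki_dump(
            file_path='dumps/example-wiki-latest-pages-articles.xml',
            wiki_name='example_wiki',
            namespaces=[0],          # main/article namespace only
            skip_redirects=True):
        # Each yielded string is a human-readable progress or error message.
        print(status)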