import json
import logging
from datetime import datetime
from typing import Dict, List
from urllib.parse import urlparse

import pandas as pd
import requests
from bs4 import BeautifulSoup

from config import RAW_DIR, PROCESSED_DIR, LOG_DIR


class DataProcessor:

    def __init__(self):
        # Log each run to a separate timestamped file.
        log_file = LOG_DIR / f"processor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename=log_file
        )
        self.processed_data = {}

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL"""
        try:
            return urlparse(url).netloc
        except Exception:
            return ""

    def _scrape_webpage(self, url: str) -> str:
        """Scrape additional content from webpage"""
        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')

            # Strip non-content elements before extracting visible text.
            for element in soup(['script', 'style', 'nav', 'footer']):
                element.decompose()

            return ' '.join(soup.stripped_strings)
        except Exception as e:
            logging.error(f"Error scraping {url}: {e}")
            return ""

    def process_category(self, category: str) -> List[Dict]:
        """Process data for a single category"""
        input_file = RAW_DIR / f"{category}_results.json"

        try:
            with open(input_file, 'r') as f:
                raw_results = json.load(f)
        except Exception as e:
            logging.error(f"Error loading {input_file}: {e}")
            return []

        processed_results = []

        for result in raw_results:
            processed_result = {
                'title': result.get('title', ''),
                'snippet': result.get('snippet', ''),
                'url': result.get('link', ''),
                'domain': self._extract_domain(result.get('link', '')),
                'category': category
            }

            # Scrape full-page content only for trusted local domains.
            if any(domain in processed_result['domain']
                   for domain in ['visitbloomington.com', 'indiana.edu', 'bloomington.in.gov']):
                additional_content = self._scrape_webpage(processed_result['url'])
                processed_result['additional_content'] = additional_content[:5000]

            processed_results.append(processed_result)

        # Persist the processed results as both JSON and CSV.
        output_file = PROCESSED_DIR / f"{category}_processed.json"
        with open(output_file, 'w') as f:
            json.dump(processed_results, f, indent=2)

        df = pd.DataFrame(processed_results)
        df.to_csv(PROCESSED_DIR / f"{category}_processed.csv", index=False)

        self.processed_data[category] = processed_results
        return processed_results

    def process_all_categories(self) -> Dict[str, List[Dict]]:
        """Process all categories"""
        categories = [f.stem.replace('_results', '')
                      for f in RAW_DIR.glob('*_results.json')]

        for category in categories:
            logging.info(f"Processing category: {category}")
            self.process_category(category)

        # Combine every category's results into a single CSV.
        all_results = []
        for category_results in self.processed_data.values():
            all_results.extend(category_results)

        combined_df = pd.DataFrame(all_results)
        combined_df.to_csv(PROCESSED_DIR / "all_processed.csv", index=False)

        # Summary statistics for the whole run.
        stats = {
            'total_results': len(all_results),
            'results_per_category': {
                category: len(results)
                for category, results in self.processed_data.items()
            },
            'domains_distribution': combined_df['domain'].value_counts().to_dict(),
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

        with open(PROCESSED_DIR / "processing_stats.json", 'w') as f:
            json.dump(stats, f, indent=2)

        return self.processed_data
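

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: it assumes
    # config.py defines RAW_DIR, PROCESSED_DIR, and LOG_DIR as existing
    # pathlib.Path directories and that RAW_DIR already contains
    # "<category>_results.json" files produced by an earlier collection step.
    processor = DataProcessor()
    data = processor.process_all_categories()
    print(f"Processed {sum(len(v) for v in data.values())} results across {len(data)} categories")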