# Bloomington/processor.py
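"""Clean and enrich raw search results for Bloomington-related categories.

For every ``{category}_results.json`` file in RAW_DIR this module keeps the
title, snippet, URL, and domain of each result, scrapes extra page text from
a few local domains (visitbloomington.com, indiana.edu, bloomington.in.gov),
and writes per-category JSON/CSV files plus summary statistics to
PROCESSED_DIR.
"""
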
import json
import logging
from datetime import datetime
from typing import Dict, List
import pandas as pd
from bs4 import BeautifulSoup
import requests
from urllib.parse import urlparse
from config import RAW_DIR, PROCESSED_DIR, LOG_DIR
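# config.py is expected to provide RAW_DIR, PROCESSED_DIR, and LOG_DIR as
# pathlib.Path objects pointing at the pipeline's data and log directories
# (they are used with the / operator and .glob() below).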


class DataProcessor:
    def __init__(self):
        # Set up logging
        log_file = LOG_DIR / f"processor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename=log_file
        )
        self.processed_data = {}

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL"""
        try:
            return urlparse(url).netloc
        except Exception:
            return ""

    def _scrape_webpage(self, url: str) -> str:
        """Scrape additional content from webpage"""
        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()  # Don't scrape error pages
            # The 'lxml' parser requires the lxml package to be installed
            soup = BeautifulSoup(response.text, 'lxml')
            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer']):
                element.decompose()
            return ' '.join(soup.stripped_strings)
        except Exception as e:
            logging.error(f"Error scraping {url}: {e}")
            return ""

    def process_category(self, category: str) -> List[Dict]:
        """Process data for a single category"""
        input_file = RAW_DIR / f"{category}_results.json"
        try:
            with open(input_file, 'r') as f:
                raw_results = json.load(f)
        except Exception as e:
            logging.error(f"Error loading {input_file}: {e}")
            return []

        processed_results = []
        for result in raw_results:
            processed_result = {
                'title': result.get('title', ''),
                'snippet': result.get('snippet', ''),
                'url': result.get('link', ''),
                'domain': self._extract_domain(result.get('link', '')),
                'category': category
            }
            # Add additional content for certain domains
            if any(domain in processed_result['domain']
                   for domain in ['visitbloomington.com', 'indiana.edu', 'bloomington.in.gov']):
                additional_content = self._scrape_webpage(processed_result['url'])
                processed_result['additional_content'] = additional_content[:5000]  # Limit content length
            processed_results.append(processed_result)

        # Save processed results
        output_file = PROCESSED_DIR / f"{category}_processed.json"
        with open(output_file, 'w') as f:
            json.dump(processed_results, f, indent=2)

        # Also save as CSV for easy viewing
        df = pd.DataFrame(processed_results)
        df.to_csv(PROCESSED_DIR / f"{category}_processed.csv", index=False)

        self.processed_data[category] = processed_results
        return processed_results

    def process_all_categories(self) -> Dict[str, List[Dict]]:
        """Process all categories"""
        categories = [f.stem.replace('_results', '')
                      for f in RAW_DIR.glob('*_results.json')]
        for category in categories:
            logging.info(f"Processing category: {category}")
            self.process_category(category)

        # Save combined results
        all_results = []
        for category_results in self.processed_data.values():
            all_results.extend(category_results)
        combined_df = pd.DataFrame(all_results)
        combined_df.to_csv(PROCESSED_DIR / "all_processed.csv", index=False)

        # Generate and save statistics (guard against an empty frame when
        # no category files were found)
        stats = {
            'total_results': len(all_results),
            'results_per_category': {
                category: len(results)
                for category, results in self.processed_data.items()
            },
            'domains_distribution': (combined_df['domain'].value_counts().to_dict()
                                     if not combined_df.empty else {}),
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        with open(PROCESSED_DIR / "processing_stats.json", 'w') as f:
            json.dump(stats, f, indent=2)

        return self.processed_data
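

# Minimal entry-point sketch, assuming config.py points RAW_DIR at a directory
# that already contains one or more "{category}_results.json" files and that
# PROCESSED_DIR and LOG_DIR exist.
if __name__ == "__main__":
    processor = DataProcessor()
    processed = processor.process_all_categories()
    print(f"Processed {sum(len(r) for r in processed.values())} results "
          f"across {len(processed)} categories")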