import numpy as np | |
from sklearn.ensemble import IsolationForest | |
from sklearn.preprocessing import StandardScaler | |
class AnomalyDetector:
    """Flag anomalous rows of a DataFrame with an Isolation Forest.

    Only the numeric columns of the input are used as features. They are
    standardized with a ``StandardScaler`` before being passed to the
    ``IsolationForest``. Both estimators are re-fitted on every call to
    :meth:`detect`, so results always reflect the data passed to that call.
    """

    def __init__(self, contamination=0.1, random_state=42):
        """Create the underlying model and scaler.

        Args:
            contamination: Value forwarded to ``IsolationForest`` as its
                ``contamination`` argument. Defaults to ``0.1``.
            random_state: Seed forwarded to ``IsolationForest`` so repeated
                runs are reproducible. Defaults to ``42``.
        """
        self.model = IsolationForest(
            contamination=contamination,
            random_state=random_state,
        )
        self.scaler = StandardScaler()

    def detect(self, data):
        """Fit on ``data`` and return the rows flagged as anomalies.

        Args:
            data: A pandas DataFrame. Non-numeric columns are ignored for
                scoring but are kept in the returned frame. The input is
                not modified.

        Returns:
            A ``(summary, anomalies)`` tuple. ``summary`` is a one-line
            human-readable string. ``anomalies`` is a copy of the flagged
            rows with two extra columns, ``is_anomaly`` and
            ``anomaly_score``, sorted by ascending score (most anomalous
            first).

        Raises:
            ValueError: If ``data`` has rows but no numeric columns, since
                there is nothing to score on.
        """
        # Zero rows: nothing to fit. Return a well-formed empty result
        # rather than letting the estimators reject the empty array.
        if len(data) == 0:
            anomaly_data = data.copy()
            anomaly_data['is_anomaly'] = np.zeros(0, dtype=bool)
            anomaly_data['anomaly_score'] = np.zeros(0, dtype=float)
            summary = f"Detected 0 anomalies out of {len(data)} data points."
            return summary, anomaly_data

        # Select numeric columns
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        if len(numeric_columns) == 0:
            raise ValueError(
                "Cannot detect anomalies: data has no numeric columns."
            )
        X = data[numeric_columns]

        # Scale the data
        X_scaled = self.scaler.fit_transform(X)

        # Fit the model and predict (-1 marks an outlier, 1 an inlier)
        self.model.fit(X_scaled)
        anomaly_labels = self.model.predict(X_scaled)

        # Work on a copy so the caller's DataFrame is left untouched
        anomaly_data = data.copy()
        anomaly_data['is_anomaly'] = anomaly_labels == -1

        # Calculate anomaly scores
        anomaly_data['anomaly_score'] = self.model.decision_function(X_scaled)

        # Sort by anomaly score (most anomalous first)
        anomaly_data = anomaly_data.sort_values('anomaly_score')

        # Return summary of anomalies
        n_anomalies = anomaly_data['is_anomaly'].sum()
        summary = f"Detected {n_anomalies} anomalies out of {len(data)} data points."
        return summary, anomaly_data[anomaly_data['is_anomaly']]