# app.py
import gradio as gr
import pandas as pd
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
from sklearn.ensemble import RandomForestClassifier
import joblib
import os

# Load the Hugging Face model used for log anomaly detection.
# NOTE: this checkpoint was fine-tuned as a masked language model, so the
# sequence-classification head loaded here is freshly initialized; its
# LABEL_0/LABEL_1 outputs are placeholders until the model is fine-tuned
# on labeled log data.
MODEL_NAME = "huggingface-course/distilbert-base-uncased-finetuned-imdb"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
anomaly_detection = pipeline("text-classification", model=model, tokenizer=tokenizer)

# Train or load the Random Forest model for failure prediction.
MODEL_PATH = 'failure_prediction_model.pkl'
if not os.path.exists(MODEL_PATH):
    # Toy training set for demonstration; replace with real device telemetry.
    data = pd.DataFrame({
        'cpu_usage': [10, 20, 15, 35, 55],
        'memory_usage': [30, 60, 45, 50, 80],
        'error_rate': [0, 1, 0, 2, 5],
        'failure': [0, 1, 0, 1, 1]
    })
    X = data[['cpu_usage', 'memory_usage', 'error_rate']]
    y = data['failure']
    failure_prediction_model = RandomForestClassifier(n_estimators=100, random_state=42)
    failure_prediction_model.fit(X, y)
    joblib.dump(failure_prediction_model, MODEL_PATH)
else:
    failure_prediction_model = joblib.load(MODEL_PATH)

# Normalize logs before classification.
def preprocess_logs(logs):
    logs['timestamp'] = pd.to_datetime(logs['timestamp'])
    logs['log_message'] = logs['log_message'].str.lower()
    return logs

# Classify each log message and map raw model labels to readable ones.
def detect_anomaly(logs):
    preprocessed_logs = preprocess_logs(logs)
    label_map = {
        "LABEL_0": "Normal",
        "LABEL_1": "Anomaly"
    }
    results = []
    for log in preprocessed_logs['log_message']:
        # truncation=True keeps messages longer than the model's maximum
        # sequence length from raising an error.
        anomaly_result = anomaly_detection(log, truncation=True)
        label = anomaly_result[0]['label']
        results.append(label_map.get(label, label))  # fall back to the raw label
    return results

# Predict failure from device metrics.
def predict_failure(device_metrics):
    if device_metrics is None:
        return "Device metrics are missing."
    required = ('cpu_usage', 'memory_usage', 'error_rate')
    if not isinstance(device_metrics, dict) or any(key not in device_metrics for key in required):
        return "Invalid metrics format. Please provide 'cpu_usage', 'memory_usage', and 'error_rate'."
    # Build a one-row frame so the feature names match those used at fit time.
    features = pd.DataFrame([{key: device_metrics[key] for key in required}])
    prediction = failure_prediction_model.predict(features)[0]
    return "Failure likely" if prediction == 1 else "No failure expected"

# Read the uploaded log file, detect anomalies, and predict failures.
def process_logs_and_predict(log_file, metrics):
    # Depending on the Gradio version, a file input arrives as a path string
    # or as a tempfile-like object with a .name attribute.
    path = getattr(log_file, 'name', log_file)
    try:
        logs = pd.read_json(path)
    except ValueError as e:
        return f"Error reading JSON file: {e}"
    if logs.empty or not {'timestamp', 'log_message'}.issubset(logs.columns):
        return ("Invalid log file format. Please upload a JSON array of log "
                "entries with 'timestamp' and 'log_message' fields.")

    anomalies = detect_anomaly(logs)
    failure_pred = predict_failure(metrics)
    return f"Anomalies Detected: {anomalies}, Failure Prediction: {failure_pred}"

# Gradio interface
iface = gr.Interface(fn=process_logs_and_predict,
                     inputs=["file", "json"],
                     outputs="text",
                     title="Cisco Device Monitoring",
                     description="Upload log files to detect anomalies and predict potential device failures.")

iface.launch()
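
# --- Usage sketch (illustrative only; the file name and values below are
# assumptions, derived from the column and key checks in the code above) ---
# The first input expects a JSON array of log records with 'timestamp' and
# 'log_message' fields, e.g. a file logs.json containing:
#   [{"timestamp": "2024-05-01T10:00:00", "log_message": "Interface GigabitEthernet0/1 down"},
#    {"timestamp": "2024-05-01T10:05:00", "log_message": "User admin logged in"}]
# The second input expects a JSON object with the three metric keys, e.g.:
#   {"cpu_usage": 45, "memory_usage": 70, "error_rate": 2}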