import os import joblib import torch import numpy as np from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification import torch.nn.functional as F class EndpointHandler: def __init__(self, model_dir): self.model = DistilBertForSequenceClassification.from_pretrained(model_dir) self.tokenizer = DistilBertTokenizerFast.from_pretrained(model_dir) self.label_mapping = joblib.load(os.path.join(model_dir, "label_mapping.joblib")) self.labels = {v: k for k, v in self.label_mapping.items()} def __call__(self, inputs): if isinstance(inputs, dict) and 'inputs' in inputs: return self.predict(inputs['inputs']) return self.predict(inputs) def predict(self, text): # Tokenize and encode the input encoded_input = self.tokenizer(text, return_tensors='pt', truncation=True, max_length=512) # Get model prediction with torch.no_grad(): outputs = self.model(**encoded_input) logits = outputs.logits # Get probabilities probabilities = F.softmax(logits, dim=-1).squeeze().numpy() # Get predicted class and confidence predicted_class_idx = np.argmax(probabilities) predicted_label = self.labels[predicted_class_idx] confidence = probabilities[predicted_class_idx] # Additional analysis entropy = -np.sum(probabilities * np.log(probabilities + 1e-9)) # Adjust confidence based on entropy adjusted_confidence = confidence * (1 - entropy/np.log(len(probabilities))) # Post-processing to better distinguish between INJECTION and JAILBREAK injection_keywords = ['ignore', 'previous', 'instructions', 'don\'t', 'matter'] jailbreak_keywords = ['bypass', 'restrictions', 'override', 'security'] injection_score = sum(keyword in text.lower() for keyword in injection_keywords) / len(injection_keywords) jailbreak_score = sum(keyword in text.lower() for keyword in jailbreak_keywords) / len(jailbreak_keywords) if predicted_label in ['INJECTION', 'JAILBREAK']: if injection_score > jailbreak_score: predicted_label = 'INJECTION' elif jailbreak_score > injection_score: predicted_label = 'JAILBREAK' adjusted_confidence = max(adjusted_confidence, injection_score, jailbreak_score) return { "label": predicted_label, "score": float(adjusted_confidence), "raw_scores": {label: float(prob) for label, prob in zip(self.labels.values(), probabilities)} } def get_pipeline(): return EndpointHandler