File size: 2,749 Bytes
f311c70 180d0f0 f311c70 180d0f0 c48a85b f311c70 180d0f0 c48a85b 1b297c3 c48a85b 180d0f0 f311c70 60fbaa9 180d0f0 f311c70 60fbaa9 b2eddc3 60fbaa9 b2eddc3 60fbaa9 c48a85b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
import os
import joblib
import torch
import numpy as np
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification
import torch.nn.functional as F
class EndpointHandler:
    """Inference handler around a DistilBERT sequence-classification model.

    The handler loads the model, its tokenizer and a label mapping from
    ``model_dir``, classifies text, and post-processes the prediction with an
    entropy-based confidence adjustment plus a keyword heuristic that helps
    separate the ``INJECTION`` and ``JAILBREAK`` labels.
    """

    # Substrings (matched against the lower-cased input) used to break ties
    # between the INJECTION and JAILBREAK labels.
    INJECTION_KEYWORDS = ('ignore', 'previous', 'instructions', "don't", 'matter')
    JAILBREAK_KEYWORDS = ('bypass', 'restrictions', 'override', 'security')

    def __init__(self, model_dir):
        """Load model, tokenizer and label mapping from ``model_dir``.

        Args:
            model_dir: Directory holding the pretrained model/tokenizer files
                and a ``label_mapping.joblib`` file.
        """
        self.model = DistilBertForSequenceClassification.from_pretrained(model_dir)
        self.tokenizer = DistilBertTokenizerFast.from_pretrained(model_dir)
        # NOTE(review): joblib.load unpickles arbitrary objects, so model_dir
        # must only ever point at trusted artifacts.
        self.label_mapping = joblib.load(os.path.join(model_dir, "label_mapping.joblib"))
        # label_mapping is label -> class index; invert it for index -> label.
        self.labels = {v: k for k, v in self.label_mapping.items()}

    def __call__(self, inputs):
        """Classify a request payload.

        Args:
            inputs: Either a dict carrying the text under the ``'inputs'`` key,
                or the text itself.

        Returns:
            Whatever :meth:`predict` returns for the extracted text.
        """
        if isinstance(inputs, dict) and 'inputs' in inputs:
            return self.predict(inputs['inputs'])
        return self.predict(inputs)

    def predict(self, text):
        """Classify ``text`` and return the label, a confidence and raw scores.

        Args:
            text: A single string, or a list of strings (each item is
                classified independently).

        Returns:
            For a string: a dict with ``"label"`` (predicted label after the
            keyword heuristic), ``"score"`` (adjusted confidence as a float)
            and ``"raw_scores"`` (label -> softmax probability).
            For a list: a list of such dicts, one per item.
        """
        # The keyword heuristic below needs a single string, so fan a batch
        # out into independent single-text predictions.
        if isinstance(text, list):
            return [self.predict(item) for item in text]

        # Tokenize and encode the input
        encoded_input = self.tokenizer(text, return_tensors='pt', truncation=True, max_length=512)

        # Get model prediction
        with torch.no_grad():
            logits = self.model(**encoded_input).logits

        # Take the first (only) batch row explicitly instead of squeeze() so
        # the result stays a 1-D vector even when the model has one class.
        probabilities = F.softmax(logits, dim=-1)[0].numpy()

        # Get predicted class and confidence
        predicted_class_idx = int(np.argmax(probabilities))
        predicted_label = self.labels[predicted_class_idx]
        confidence = probabilities[predicted_class_idx]

        # Shannon entropy of the distribution; the epsilon avoids log(0).
        entropy = -np.sum(probabilities * np.log(probabilities + 1e-9))

        # Scale confidence down by the entropy normalised to [0, 1]. With a
        # single class the maximum entropy is 0, so skip the normalisation.
        max_entropy = np.log(len(probabilities))
        if max_entropy > 0:
            adjusted_confidence = confidence * (1 - entropy / max_entropy)
        else:
            adjusted_confidence = confidence

        # Post-processing to better distinguish between INJECTION and JAILBREAK:
        # each score is the fraction of that label's keywords found in the text.
        lowered = text.lower()
        injection_score = (
            sum(keyword in lowered for keyword in self.INJECTION_KEYWORDS)
            / len(self.INJECTION_KEYWORDS)
        )
        jailbreak_score = (
            sum(keyword in lowered for keyword in self.JAILBREAK_KEYWORDS)
            / len(self.JAILBREAK_KEYWORDS)
        )

        if predicted_label in ('INJECTION', 'JAILBREAK'):
            # On a tie the model's own choice between the two labels stands.
            if injection_score > jailbreak_score:
                predicted_label = 'INJECTION'
            elif jailbreak_score > injection_score:
                predicted_label = 'JAILBREAK'
            # Strong keyword evidence acts as a floor for the confidence.
            adjusted_confidence = max(adjusted_confidence, injection_score, jailbreak_score)

        # Look labels up by class index: the insertion order of self.labels is
        # not guaranteed to match the order of the probability vector.
        raw_scores = {
            self.labels.get(class_idx, str(class_idx)): float(prob)
            for class_idx, prob in enumerate(probabilities)
        }

        return {
            "label": predicted_label,
            "score": float(adjusted_confidence),
            "raw_scores": raw_scores,
        }
def get_pipeline():
    """Return the handler class itself (not an instance).

    The caller is expected to instantiate it with a model directory.
    """
    return EndpointHandler