File size: 2,749 Bytes
f311c70
 
180d0f0
f311c70
 
 
180d0f0
c48a85b
 
f311c70
 
 
 
180d0f0
c48a85b
1b297c3
 
c48a85b
180d0f0
f311c70
60fbaa9
 
 
 
180d0f0
f311c70
60fbaa9
 
 
 
 
 
 
 
 
 
 
 
b2eddc3
 
 
60fbaa9
b2eddc3
 
 
 
 
 
 
 
 
 
 
 
 
 
60fbaa9
 
 
 
 
 
c48a85b
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import os
import joblib
import torch
import numpy as np
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification
import torch.nn.functional as F

class EndpointHandler:
    def __init__(self, model_dir):
        self.model = DistilBertForSequenceClassification.from_pretrained(model_dir)
        self.tokenizer = DistilBertTokenizerFast.from_pretrained(model_dir)
        self.label_mapping = joblib.load(os.path.join(model_dir, "label_mapping.joblib"))
        self.labels = {v: k for k, v in self.label_mapping.items()}

    def __call__(self, inputs):
        if isinstance(inputs, dict) and 'inputs' in inputs:
            return self.predict(inputs['inputs'])
        return self.predict(inputs)

    def predict(self, text):
        # Tokenize and encode the input
        encoded_input = self.tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
        
        # Get model prediction
        with torch.no_grad():
            outputs = self.model(**encoded_input)
            logits = outputs.logits
        
        # Get probabilities
        probabilities = F.softmax(logits, dim=-1).squeeze().numpy()
        
        # Get predicted class and confidence
        predicted_class_idx = np.argmax(probabilities)
        predicted_label = self.labels[predicted_class_idx]
        confidence = probabilities[predicted_class_idx]

        # Additional analysis
        entropy = -np.sum(probabilities * np.log(probabilities + 1e-9))
        
        # Adjust confidence based on entropy
        adjusted_confidence = confidence * (1 - entropy/np.log(len(probabilities)))

        # Post-processing to better distinguish between INJECTION and JAILBREAK
        injection_keywords = ['ignore', 'previous', 'instructions', 'don\'t', 'matter']
        jailbreak_keywords = ['bypass', 'restrictions', 'override', 'security']
        
        injection_score = sum(keyword in text.lower() for keyword in injection_keywords) / len(injection_keywords)
        jailbreak_score = sum(keyword in text.lower() for keyword in jailbreak_keywords) / len(jailbreak_keywords)
        
        if predicted_label in ['INJECTION', 'JAILBREAK']:
            if injection_score > jailbreak_score:
                predicted_label = 'INJECTION'
            elif jailbreak_score > injection_score:
                predicted_label = 'JAILBREAK'
            
            adjusted_confidence = max(adjusted_confidence, injection_score, jailbreak_score)

        return {
            "label": predicted_label,
            "score": float(adjusted_confidence),
            "raw_scores": {label: float(prob) for label, prob in zip(self.labels.values(), probabilities)}
        }

def get_pipeline():
    return EndpointHandler