saifeddinemk committed
Commit 5aaa320 · Parent(s): 79b52c8

Fixed app v2

Files changed (1):
  1. app.py +69 -73

app.py CHANGED
@@ -1,86 +1,82 @@
- from fastapi import FastAPI
  from pydantic import BaseModel
  from typing import List
- from transformers import pipeline
-
- # Initialize FastAPI app
  app = FastAPI()
-
- unmasker = pipeline("fill-mask", model="s2w-ai/CyBERTuned-SecurityLLM")
-
- # Define request model
- class LogRequest(BaseModel):
-     log: str
-
- # Define response model
- class ThreatResponse(BaseModel):
-     log: str
-     prompt: str
-     #threat_level_predictions: List[str]
-     #threat_type_predictions: List[str]
-     #detected_threat_level: str
-     #detected_threat_type: str
-     pred : List[object]
-
- # Function to predict masked words for threat level and type
- def predict_threat(log: str, unmasker, topk=5) -> List[List[str]]:
-     # Create prompt with masked tokens for threat level and threat type
-     prompt = f"{log} Source Ip : <mask> Dest Ip : <mask> , Threat Level : <mask> , Threat Type : <mask>"
-
-     # Predict top options for each <mask>
-     predictions = unmasker(prompt, top_k=topk)
-
-     # Extract top predictions for each <mask>
-     #threat_level_predictions = [pred["token_str"].strip() for pred in predictions[:topk]]
-     #threat_type_predictions = [pred["token_str"].strip() for pred in predictions[topk:2*topk]]
-
-     return predictions
-
- def get_maximum_predictions(data):
-     # Initialize list to store maximum values for each prediction array
-     max_predictions = []
-
-     # Loop over each prediction array in "pred"
-     for index, predictions in enumerate(data["pred"]):
-         max_score = float('-inf')
-         max_prediction = None
-
-         # Find the prediction with the highest score in the current array
-         for pred in predictions:
-             if pred["score"] > max_score:
-                 max_score = pred["score"]
-                 max_prediction = pred["token_str"].strip()
-
-         # Append the result with the max prediction for this array
-         max_predictions.append({
-             f"max_prediction_{index + 1}": max_prediction
-         })
-
-     return max_predictions
-
- # Get result
-
- # FastAPI endpoint for detecting threat level and type
- @app.post("/detect_threat", response_model=ThreatResponse)
- async def detect_threat(log_request: LogRequest):
-     log = log_request.log
-
-     # Predict the threat level and type for the given log entry
-     predictions = predict_threat(log, unmasker)
-
-     # Extract top predictions for threat level and type
-     ##threat_level_predictions = predictions[0] if len(predictions) > 0 else ["Unknown"]
-     ## threat_type_predictions = predictions[1] if len(predictions) > 1 else ["Unknown"]
-
-     # Use the top prediction as the most likely threat level and type
-     ##detected_threat_level = threat_level_predictions[0] if threat_level_predictions else "Unknown"
-     #detected_threat_type = threat_type_predictions[0] if threat_type_predictions else "Unknown"
-
-     # Prepare response
-     response = ThreatResponse(
-         log=log,
-         prompt=f"{log} Source Ip : <mask> \n Dest Ip : <mask> \n , Threat Level : <mask> \n Threat Type : <mask>",
-         pred=predictions
-     )
-
-     return response
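For reference, the removed version above exposed a fill-mask `/detect_threat` endpoint taking a single `log` string. A minimal sketch of how a client would have called it (the host, port, and example log line are assumptions, not part of the commit):

```python
# Hypothetical client call against the old /detect_threat endpoint,
# assuming the app was served locally with uvicorn on port 8000.
import requests

resp = requests.post(
    "http://localhost:8000/detect_threat",
    json={"log": "Failed password for root from 203.0.113.7 port 22"},  # example log line
)
print(resp.json()["pred"])  # list of fill-mask predictions returned by the old API
```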
 
+ import torch
+ import json
+ from fastapi import FastAPI, HTTPException
  from pydantic import BaseModel
+ from transformers import AutoModelForCausalLM, AutoTokenizer
  from typing import List

+ # Initialize the FastAPI app
  app = FastAPI()

+ # Model and tokenizer paths and loading
+ model_path = "WhiteRabbitNeo/WhiteRabbitNeo-2.5-Qwen-2.5-Coder-7B"
+ output_file_path = "/home/user/conversations.jsonl"
+
+ model = AutoModelForCausalLM.from_pretrained(
+     model_path,
+     torch_dtype=torch.float16,
+     device_map="auto",
+     load_in_4bit=False,
+     trust_remote_code=False,
+ )
+
+ tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
+
+ # Function to generate text
+ def generate_text(instruction):
+     tokens = tokenizer.encode(instruction)
+     tokens = torch.LongTensor(tokens).unsqueeze(0)
+     tokens = tokens.to("cuda")
+
+     instance = {
+         "input_ids": tokens,
+         "top_p": 1.0,
+         "temperature": 0.75,
+         "generate_len": 2048,
+         "top_k": 50,
+     }
+
+     length = len(tokens[0])
+     with torch.no_grad():
+         rest = model.generate(
+             input_ids=tokens,
+             max_length=length + instance["generate_len"],
+             use_cache=True,
+             do_sample=True,
+             top_p=instance["top_p"],
+             temperature=instance["temperature"],
+             top_k=instance["top_k"],
+             num_return_sequences=1,
+             pad_token_id=tokenizer.eos_token_id,
+         )
+     output = rest[0][length:]
+     string = tokenizer.decode(output, skip_special_tokens=True)
+     return f"{string}"
+
+ # Data model for FastAPI input
+ class UserInput(BaseModel):
+     conversation: str
+     user_input: str
+
+ @app.post("/generate/")
+ async def generate_response(user_input: UserInput):
+     try:
+         # Construct the prompt
+         conversation = user_input.conversation
+         llm_prompt = f"{conversation}{user_input.user_input}<|im_end|>\n<|im_start|>assistant\nSure! Let me provide a complete and a thorough answer to your question, with functional and production-ready code.\n"
+
+         # Generate response
+         answer = generate_text(llm_prompt)
+
+         # Update conversation for future requests
+         updated_conversation = f"{llm_prompt}{answer}<|im_end|>\n<|im_start|>user\n"
+
+         return {
+             "response": answer,
+             "updated_conversation": updated_conversation
+         }
+     except Exception as e:
+         raise HTTPException(status_code=500, detail=str(e))
+
+ # Run the app
+ # To start the server, use the command: uvicorn filename:app --host 0.0.0.0 --port 8000
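The new `/generate/` endpoint expects the caller to manage the running conversation and to feed `updated_conversation` back in on the next turn. A minimal client sketch under stated assumptions: the host/port, the initial ChatML-style `<|im_start|>` preamble, and the example questions are illustrative and not part of the commit.

```python
# Hypothetical client for the /generate/ endpoint added in this commit,
# assuming the app runs locally via: uvicorn app:app --host 0.0.0.0 --port 8000
import requests

# Assumed ChatML-style preamble; the commit leaves the initial conversation
# string entirely up to the caller.
conversation = (
    "<|im_start|>system\nYou are a helpful coding assistant.<|im_end|>\n"
    "<|im_start|>user\n"
)

# First turn: send the conversation so far plus the new user input.
payload = {"conversation": conversation, "user_input": "Write a function that parses an nginx access log."}
first = requests.post("http://localhost:8000/generate/", json=payload).json()
print(first["response"])

# Second turn: reuse updated_conversation returned by the server so the
# model sees the full history.
payload = {
    "conversation": first["updated_conversation"],
    "user_input": "Now return the parsed fields as a dataclass.",
}
second = requests.post("http://localhost:8000/generate/", json=payload).json()
print(second["response"])
```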