|
|
|
import tensorflow as tf |
|
from ultralytics import YOLO |
|
import numpy as np |
|
from sklearn.preprocessing import StandardScaler |
|
|
|
class SuspiciousActivityModel:
    """Label the activity in a video frame as 'Suspicious' or 'Normal'.

    A YOLO model (``yolo_model_path``) is used both to find boxes in the frame
    and to extract pose keypoints from each cropped box. The flattened
    keypoints are passed to the model loaded from ``lstm_model_path`` and its
    first output value is thresholded to produce the label.
    """

    # Class id that ``process_frame`` treats as a detection worth analysing.
    # NOTE(review): presumably the "person" class of the YOLO model - confirm.
    PERSON_CLASS_ID = 0
    # A box must score strictly above this confidence to be analysed.
    MIN_DETECTION_CONFIDENCE = 0.5
    # Classifier outputs strictly above this value are labelled suspicious.
    SUSPICION_THRESHOLD = 0.5

    def __init__(self, lstm_model_path, yolo_model_path, scaler=None):
        """Load both models.

        Args:
            lstm_model_path: Path given to ``tf.keras.models.load_model``.
            yolo_model_path: Path given to ``YOLO``.
            scaler: Optional scaler that has ALREADY been fitted (normally on
                the training data). When omitted, an unfitted
                ``StandardScaler`` is stored and keypoints are passed to the
                classifier unscaled.
        """
        self.yolo_model = YOLO(yolo_model_path)
        self.lstm_model = tf.keras.models.load_model(lstm_model_path)
        self.scaler = scaler if scaler is not None else StandardScaler()

    def extract_keypoints(self, frame):
        """
        Extracts normalized keypoints from a frame using YOLO pose model.

        Returns:
            A flat list ``[x0, y0, x1, y1, ...]`` built from the first entry
            of ``keypoints.xyn`` of the first result that has keypoints, or
            ``None`` when no result provides any.
        """
        for result in self.yolo_model(frame, verbose=False):
            pose = result.keypoints
            if pose is None:
                continue
            normalized = pose.xyn
            # Guard the ``[0]`` lookup below: nothing detected in this result.
            if len(normalized) == 0:
                continue
            first_entry = normalized.tolist()[0]
            return [coord for point in first_entry for coord in point[:2]]
        return None

    def _is_suspicious(self, keypoints):
        """Return True when the classifier flags one flattened keypoint list."""
        features = np.asarray([keypoints], dtype=float)
        # Only apply a scaler that has already been fitted. Fitting on this
        # single sample (the previous ``fit_transform`` call) makes every
        # feature equal to its own mean, so the classifier would always be
        # fed an all-zero vector regardless of the pose.
        if hasattr(self.scaler, "scale_"):
            features = np.asarray(self.scaler.transform(features))
        # Shape fed to the classifier: (1, 1, number_of_features).
        batch = features.reshape((1, 1, len(keypoints)))
        # verbose=0 avoids printing a progress bar for every frame.
        prediction = self.lstm_model.predict(batch, verbose=0)
        score = float(np.asarray(prediction).reshape(-1)[0])
        return score > self.SUSPICION_THRESHOLD

    def process_frame(self, frame):
        """Classify a frame.

        Every box of class ``PERSON_CLASS_ID`` with confidence above
        ``MIN_DETECTION_CONFIDENCE`` is cropped, its keypoints are extracted
        and classified.

        Args:
            frame: Image array indexed as ``frame[y, x]``.

        Returns:
            'Suspicious' as soon as any analysed box is flagged, otherwise
            'Normal' (including when nothing usable is detected).
        """
        results = self.yolo_model(frame, verbose=False)
        if not results:
            return 'Normal'
        boxes = results[0].boxes
        if boxes is None:
            return 'Normal'

        frame_height, frame_width = frame.shape[:2]
        for box in boxes:
            if int(box.cls[0]) != self.PERSON_CLASS_ID:
                continue
            if float(box.conf[0]) <= self.MIN_DETECTION_CONFIDENCE:
                continue

            x1, y1, x2, y2 = map(int, box.xyxy[0])
            # Clamp to the frame: a negative bound would otherwise be read as
            # an index counted from the far edge and crop the wrong region.
            x1 = max(0, min(x1, frame_width))
            x2 = max(0, min(x2, frame_width))
            y1 = max(0, min(y1, frame_height))
            y2 = max(0, min(y2, frame_height))

            roi = frame[y1:y2, x1:x2]
            if roi.size == 0:
                continue

            keypoints = self.extract_keypoints(roi)
            if not keypoints:
                continue

            # Keep scanning after a 'Normal' box so a later suspicious one is
            # not masked by an earlier harmless one.
            if self._is_suspicious(keypoints):
                return 'Suspicious'

        return 'Normal'

    def detect_activity(self, frame):
        """Public entry point; delegates to ``process_frame``."""
        return self.process_frame(frame)
|
|