In [1]:
# Mount Google Drive at /content/drive so the video and weights files stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!sudo apt update && sudo apt install ffmpeg

In [30]:
# Fetch the project sources; the deployment/ folder inside holds requirements.txt used below.
!git clone https://github.com/k-sashank/ml-nlgma-body-cam.git

Cloning into 'ml-nlgma-body-cam'...
remote: Enumerating objects: 395, done.[K
remote: Counting objects: 100% (292/292), done.[K
remote: Compressing objects: 100% (239/239), done.[K
remote: Total 395 (delta 77), reused 224 (delta 47), pack-reused 103[K
Receiving objects: 100% (395/395), 9.15 MiB | 6.64 MiB/s, done.
Resolving deltas: 100% (117/117), done.


In [34]:
cd ml-nlgma-body-cam/deployment/

/content/ml-nlgma-body-cam/deployment


In [29]:
!pip install -r requirements.txt



In [8]:
import whisper
import cv2
import os
import urllib.request
from PIL import Image
from ultralytics import YOLO
import torch
import matplotlib.pyplot as plt
from tqdm import tqdm
from transformers import pipeline
import moviepy.editor as mp
import json
import re
import gradio as gr
from openai import OpenAI

# Full

In [16]:
def video_transcription(video_path):
  """Transcribe a video with Whisper and return the result as JSON text.

  Args:
    video_path: Path of the media file; handed unchanged to ``model.transcribe``.

  Returns:
    A JSON string encoding the dict produced by ``model.transcribe``; decode it
    with ``json.loads``. If any step fails, the caught exception object is
    returned (not raised), so callers should check the type of the result.
  """
  try:
    model = whisper.load_model('large-v3')
    transcript = model.transcribe(video_path, verbose = True)

    # Serialise in memory: json.dumps returns the text, whereas json.dump
    # requires a file object and raised TypeError when called without one.
    return json.dumps(transcript)

  except Exception as e:
    return e

def action_detection(transcript, openai_key):
  """Ask the chat model to flag transcript sentences and attach their timestamps.

  Args:
    transcript: Dict with a ``'segments'`` list; each segment is read for its
      ``'id'``, ``'text'``, ``'start'`` and ``'end'`` entries.
    openai_key: API key passed to the ``OpenAI`` client.

  Returns:
    A list of strings ``"<sentence> Start Time: <start> End Time: <end>"``, one
    for every transcript segment whose text contains a flagged sentence
    (case-insensitive). The list is empty when nothing was flagged or matched.
    If any step fails, the caught exception object is returned instead.
  """
  try:
    # One "<id>: <text>" line per segment. This is the shape of the example in
    # the user prompt, and the parsing below strips the "<id>: " prefix again.
    transcript_string = ''.join(
      f"{segment['id']}: {str(segment['text']).strip()}\n"
      for segment in transcript['segments']
    )

    client = OpenAI(api_key = openai_key)

    completion = client.chat.completions.create(
      model="gpt-3.5-turbo-1106",
      messages=[
        {"role": "system", "content": f"Given this {transcript_string} You are an AI system specialized in detecting planning issues, critiquing plans, and analyzing conversations between people regarding how to disperse. Additionally, identify any instances suggesting 1st Amendment violations or officers expressing the belief that this protest was anti-police. Finally, flag any aggressive comments found in the audio transcript."},
        {"role": "user", "content":"Give responce like this following examples: Sentence: '18: What do you got?' Explanation: This sentence may indicate confusion or a need for clarification, as the speaker is asking for information. It could potentially be a planning issue if the speaker is seeking information to execute a specific task."}
      ]
    )

    # Treat a missing reply as empty text so the parsing below yields no matches.
    output = completion.choices[0].message.content or ''

    # Accumulate across ALL flagged sentences; initialised once so the result
    # exists even when the reply contains nothing parseable.
    sent_with_time = []

    for paragraph in re.split(r'\n\n', output):
      sentence_match = re.search(r"Sentence: '(.+)'", paragraph)
      explanation_match = re.search(r"Explanation: (.+)", paragraph)

      # Only paragraphs carrying both a sentence and an explanation count.
      if not (sentence_match and explanation_match):
        continue

      # Drop the leading "<id>: " that the model echoes back.
      sentence_to_search = sentence_match.group(1).split(': ')[-1]
      if not sentence_to_search:
        # An empty pattern would match every segment.
        continue

      pattern = re.compile(re.escape(sentence_to_search), re.IGNORECASE)

      for entry in transcript['segments']:
        if pattern.search(entry['text']):
          # start/end are numbers, so format them rather than concatenating.
          sent_with_time.append(
            f"{sentence_to_search} Start Time: {entry['start']} End Time: {entry['end']}"
          )

    return sent_with_time

  except Exception as e:
    return e

def process_video(video_path, weights):
    """Report the time intervals of a video in which a baton is detected.

    Roughly one frame per second is run through the YOLO model built from
    ``weights``. A frame counts as a hit when any detection row has the value 1
    at index 5 (the baton class). Consecutive hits are merged into one interval.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        weights: Weights file handed to ``YOLO``.

    Returns:
        A string with one ``"HH:MM:SS to HH:MM:SS"`` line per interval (empty
        when nothing was detected). An interval ends at the first sampled frame
        without a baton, or at the end of the video if the baton is still
        visible there. If any step fails (including a video that cannot be
        opened), the caught exception object is returned instead of raised.
    """
    def _hms(total_seconds):
        # Whole seconds rendered as zero-padded HH:MM:SS.
        hours, remainder = divmod(int(total_seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    try:
        model = YOLO(weights)
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                raise IOError(f"Could not open video: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            # Also rejects NaN; a zero rate would otherwise divide by zero below.
            if not fps > 0:
                raise ValueError(f"Video reports no usable frame rate: {fps}")

            # Sample about once per second, but compute timestamps from the
            # real frame rate so they do not drift on e.g. 29.97 fps footage.
            stride = max(1, int(round(fps)))

            res = []
            start_time = None  # start of the interval currently open, if any
            current_frame = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if current_frame % stride == 0:
                    timestamp = _hms(current_frame / fps)
                    results = model(frame, verbose = False)
                    baton_seen = any(float(box[5]) == 1 for box in results[0].boxes.data)

                    if baton_seen:
                        if start_time is None:
                            start_time = timestamp
                    elif start_time is not None:
                        res.append(start_time + " to " + timestamp)
                        start_time = None

                current_frame += 1

            # A detection that lasts until the final frame has no later
            # "miss" to close it, so close it at the end of the video.
            if start_time is not None:
                res.append(start_time + " to " + _hms(current_frame / fps))
        finally:
            # Release the capture even when the loop fails midway.
            cap.release()

        return "\n".join(res)

    except Exception as e:

        return e

def all_funcs(openai_key,video_path, yolo_weights):
  """Run transcription, sentence flagging and baton detection on one video.

  Args:
    openai_key: API key forwarded to ``action_detection``.
    video_path: The video, given as a path string, an upload object exposing
      its path as ``.name``, or a list of either (only the first is used).
    yolo_weights: The YOLOv8 weights file, in any of the same forms.

  Returns:
    A ``(sentences, batons)`` tuple: the results of ``action_detection`` and
    ``process_video``. If transcription returned an exception object, that
    object is passed through as ``sentences`` so the real error is visible.

  Raises:
    ValueError: If the video or the weights input is missing or an empty list.
  """
  def _to_path(upload, what):
    # A multi-file upload component (gr.Files) delivers a list; use its first item.
    if isinstance(upload, (list, tuple)):
      upload = upload[0] if upload else None
    if upload is None:
      raise ValueError(f"No {what} file was provided")
    if isinstance(upload, (str, os.PathLike)):
      return os.fspath(upload)
    # Upload wrappers (e.g. temp-file objects) expose the on-disk path as .name.
    return getattr(upload, 'name', upload)

  video_file = _to_path(video_path, "video")
  weights_file = _to_path(yolo_weights, "YOLOv8 weights")

  transcript = video_transcription(video_file)
  if isinstance(transcript, Exception):
    # video_transcription reports failure by returning the exception; feeding
    # that to json.loads would only raise an unrelated TypeError.
    sentences = transcript
  else:
    sentences = action_detection(json.loads(transcript), openai_key)
  batons = process_video(video_file, weights_file)

  return sentences, batons

In [24]:
# Single-page UI around all_funcs: API key + video + weights in, two timestamp reports out.
btn = gr.Interface(
    fn=all_funcs,
    inputs=[
        # Mask the key while it is typed instead of showing it in a plain text box.
        gr.Textbox(label="OpenAI API Key", type="password"),
        # Exactly one file each is needed, so use the single-file component
        # rather than gr.Files, which hands the function a list of uploads.
        gr.File(label="Select Video File"),
        gr.File(label="Select YOLOv8 Weights File"),
    ],
    outputs=[gr.Textbox(label="Audio analysis time stamps",lines=20), gr.Textbox(label="Baton detection timestamps",lines=20)]
)

btn.launch()


Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://868bba52379b375588.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)




# Baton Detection

In [None]:
def process_video(video_path, weights):
    """Report the time intervals of a video in which a baton is detected.

    NOTE: this notebook defines ``process_video`` twice; whichever cell ran last
    is the definition in effect.

    Roughly one frame per second is run through the YOLO model built from
    ``weights``. A frame counts as a hit when any detection row has the value 1
    at index 5 (the baton class). Consecutive hits are merged into one interval.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        weights: Weights file handed to ``YOLO``.

    Returns:
        A string with one ``"HH:MM:SS to HH:MM:SS"`` line per interval (empty
        when nothing was detected). An interval ends at the first sampled frame
        without a baton, or at the end of the video if the baton is still
        visible there. If any step fails (including a video that cannot be
        opened), the caught exception object is returned instead of raised.
    """
    def _hms(total_seconds):
        # Whole seconds rendered as zero-padded HH:MM:SS.
        hours, remainder = divmod(int(total_seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    try:
        model = YOLO(weights)
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                raise IOError(f"Could not open video: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            # Also rejects NaN; a zero rate would otherwise divide by zero below.
            if not fps > 0:
                raise ValueError(f"Video reports no usable frame rate: {fps}")

            # Sample about once per second, but compute timestamps from the
            # real frame rate so they do not drift on e.g. 29.97 fps footage.
            stride = max(1, int(round(fps)))

            res = []
            start_time = None  # start of the interval currently open, if any
            current_frame = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if current_frame % stride == 0:
                    timestamp = _hms(current_frame / fps)
                    results = model(frame, verbose = False)
                    baton_seen = any(float(box[5]) == 1 for box in results[0].boxes.data)

                    if baton_seen:
                        if start_time is None:
                            start_time = timestamp
                    elif start_time is not None:
                        res.append(start_time + " to " + timestamp)
                        start_time = None

                current_frame += 1

            # A detection that lasts until the final frame has no later
            # "miss" to close it, so close it at the end of the video.
            if start_time is not None:
                res.append(start_time + " to " + _hms(current_frame / fps))
        finally:
            # Release the capture even when the loop fails midway.
            cap.release()

        return "\n".join(res)

    except Exception as e:

        return e

In [None]:
# Baton-detection-only UI: two path text boxes in, one text box of detected intervals out.
# Components appear in the page in the order they are created inside the Blocks context.
with gr.Blocks() as demo:

    video_path = gr.Textbox(label = "Enter Path to Video")
    # Transcript-analysis input, disabled in this baton-only demo.
    #openai_keys = gr.Textbox(label = "Enter your OpenAI Key")
    weights = gr.Textbox(label = "Enter Path to YOLOv8 Weights")
    # Transcript-analysis output, disabled in this baton-only demo.
    #sentences = gr.Textbox(label = "Sentences Detected")
    batons = gr.Textbox(label = "Batons Detected")
    btn = gr.Button(value = "Process Video")
    # Clicking runs process_video(video_path, weights) and writes its return value into `batons`.
    btn.click(process_video, inputs = [video_path, weights], outputs = batons)

demo.launch()

Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://7b4faa7028f75417d8.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)




In [None]:
# Test video path: /content/drive/MyDrive/Spark Project/Test_Video.mp4

In [None]:
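# [REDACTED] An OpenAI API key was pasted here in plain text. Revoke that key, and supply a fresh one at run time (e.g. getpass.getpass() or an environment variable) instead of storing it in the notebook.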

In [None]:
# YOLOv8 weights path: /content/drive/MyDrive/Spark Project/Data (For YOLOv8 Training)/Option 3 - Roboflow (60 Images)/YOLOv8 Best Weights.pt

In [None]:
# Manual check: run baton detection directly on the test video with the trained weights (both read from the mounted Drive).
process_video("/content/drive/MyDrive/Spark Project/Test_Video.mp4", "/content/drive/MyDrive/Spark Project/Data (For YOLOv8 Training)/Option 3 - Roboflow (60 Images)/YOLOv8 Best Weights.pt")

In [None]:
# Manual check: transcribe the test video and display whatever video_transcription returned
# (note that it returns the exception object rather than raising when something fails).
a = video_transcription("/content/drive/MyDrive/Spark Project/Test_Video.mp4")
a

In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.0.214-py3-none-any.whl (645 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m645.5/645.5 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
Collecting thop>=0.1.1 (from ultralytics)
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Installing collected packages: thop, ultralytics
Successfully installed thop-0.1.1.post2209072238 ultralytics-8.0.214
