"""Download a video and score sampled frames against a text query.

The script builds a SentenceTransformer around ``CLIPModel``, downloads one
video with youtube_dl, decodes it to raw RGB frames through ffmpeg, keeps
every ``FRAME_STEP``-th frame and prints the cosine similarity between the
query embedding and each kept frame's embedding.

Everything (model construction and the download) runs at import time.
"""
# NOTE(review): `device` is not referenced anywhere in this file and comes
# from a private torch module; it looks like an accidental auto-import.
# Kept so that nothing relying on it breaks -- confirm and drop.
from torch._C import device
import ffmpeg
import youtube_dl
import numpy as np
from PIL import Image
import requests
import torch
from sentence_transformers import SentenceTransformer, util, models
from clip import CLIPModel
# from sentence_transformers.models import CLIPModel

# Keep every N-th decoded frame.
FRAME_STEP = 10
# Text the frames are compared against.
QUERY = "two white puppies"
# Video that is downloaded and scored when the script runs.
VIDEO_URL = 'https://youtu.be/I3AaW9ZevIU'

clip = CLIPModel()
model = SentenceTransformer(modules=[clip]).to(dtype=torch.float32, device=torch.device('cpu'))


def get_embedding(query, video):
    """Encode a text query and a sequence of frames with the shared model.

    Args:
        query: Text passed unchanged to ``model.encode``.
        video: Iterable of arrays; each one is converted with
            ``PIL.Image.fromarray`` before encoding.

    Returns:
        A ``(text_emb, img_embs)`` tuple holding the two results of
        ``model.encode`` (both computed on the CPU).
    """
    text_emb = model.encode(query, device='cpu')

    # The model is fed PIL images rather than raw arrays.
    images = [Image.fromarray(img) for img in video]
    img_embs = model.encode(images, device='cpu')

    return text_emb, img_embs


# # Encode an image:
# url = "http://images.cocodataset.org/val2017/000000039769.jpg"
# img = Image.fromarray(np.array(Image.open(requests.get(url, stream=True).raw))).convert('RGB')
# img_emb = model.encode([img, img], device='cpu')

# # Encode text descriptions
# text_emb = model.encode(['Two dogs in the snow', 'Two cats laying on a sofa',
#                          'A picture of London at night'], device='cpu')

# # Compute cosine similarities
# cos_scores = util.cos_sim(img_emb, text_emb)
# print(cos_scores)


def _probe_frame_size(filename):
    """Return ``(width, height)`` of the first video stream in *filename*.

    Raises:
        ValueError: If ffmpeg's probe reports no stream whose
            ``codec_type`` is ``'video'``.
    """
    probe = ffmpeg.probe(filename)
    video_stream = next(
        (stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
        None,
    )
    if video_stream is None:
        # Previously this fell through to subscripting None and failed with
        # an unhelpful TypeError.
        raise ValueError(f"no video stream found in {filename!r}")
    return int(video_stream['width']), int(video_stream['height'])


def _extract_frames(filename, step=FRAME_STEP):
    """Decode *filename* to RGB frames and keep every *step*-th one.

    Returns:
        A ``uint8`` array of shape ``(n, height, width, 3)``.
    """
    width, height = _probe_frame_size(filename)
    # NOTE: the whole video is decoded into memory as raw rgb24 before it is
    # subsampled, so long or high-resolution videos need a lot of RAM.
    out, _ = (
        ffmpeg
        .input(filename)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .run(capture_stdout=True)
    )
    frames = np.frombuffer(out, np.uint8).reshape([-1, height, width, 3])
    return frames[::step]


def my_hook(d):
    """youtube_dl progress hook: score the video once its download finishes.

    Args:
        d: Progress dict supplied by youtube_dl. Only ``d['status']`` and
            ``d['filename']`` are read; any status other than ``'finished'``
            is ignored.

    Raises:
        ValueError: If the downloaded file contains no video stream.
    """
    if d['status'] != 'finished':
        return

    print(d)
    print('Done downloading, now extracting frames ...')
    video = _extract_frames(d["filename"])
    print(video.shape)

    txt_embd, img_embds = get_embedding(QUERY, video)
    cos_scores = util.cos_sim(txt_embd, img_embds)
    print(cos_scores)


ydl_opts = {
    "format": "mp4",
    "progress_hooks": [my_hook],
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
    ydl.download([VIDEO_URL])

# # out, _ = (
# #     ffmpeg
# #     .input('in.mp4')
# #     .output('pipe:', format='rawvideo', pix_fmt='rgb24')
# #     .run(capture_stdout=True)
# # )
# # video = (
# #     np
# #     .frombuffer(out, np.uint8)
# #     .reshape([-1, height, width, 3])
# # )