import cv2 import numpy as np import requests import base64 import time import websockets import asyncio import cv2 import json import os import imutils class CaesarSendWeb: @classmethod def send_video_https(self,uri = "http://192.168.0.10:7860/caesarobjectdetect"): cap = cv2.VideoCapture(0) while True: _, image = cap.read() #uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect" response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]}) valresp = response.json() imagebase64 = np.array(valresp["frame"]) image = np.frombuffer(base64.b64decode(imagebase64),dtype="uint8").reshape(valresp["shape"][0],valresp["shape"][1],3) cv2.imshow("image", image) if ord("q") == cv2.waitKey(1): break cap.release() cv2.destroyAllWindows() @classmethod def send_video_websocket(self,uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws',jsondata=None,phone=False): if phone == False: camera = cv2.VideoCapture(0) async def main(): # Connect to the server #uri = 'wss://palondomus-caesarai.hf.space/sendvideows' #uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws' async with websockets.connect(uri) as ws: while True: if phone == False: success, frame = camera.read() elif phone == True: img_resp = requests.get("http://192.168.0.14:8080/shot.jpg") frame_arr = np.array(bytearray(img_resp.content),dtype=np.uint8) frame = cv2.imdecode(frame_arr,-1) frame = imutils.resize(frame,width=500,height=600) success = True #print(frame.shape) #1080, 1920, 3) #print(success) #print(frame.shape) if not success: break else: #print("hi") ret, buffer = cv2.imencode('.png', frame) await ws.send(buffer.tobytes()) if jsondata: await ws.send(json.dumps(jsondata)) contents = await ws.recv() if type(contents) == bytes: arr = np.frombuffer(contents, np.uint8) frameobj = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED) cv2.imshow('frame',frameobj) cv2.waitKey(1) if type(contents) == str: print(json.loads(contents)) asyncio.run(main()) @classmethod def send_image_recieve_text(self,uri ="http://192.168.0.10:7860/caesarocr",showimage=False): cap = cv2.VideoCapture(0) _, image = cap.read() #uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect" response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]}) messageresp = response.json() if showimage == True: cv2.imshow('frame',image) cv2.waitKey(0) # closing all open windows cv2.destroyAllWindows() return messageresp @classmethod def send_image_recieve_image(self,uri ="http://192.168.0.10:7860/caesarfacesnap",showimage=False,saveimage=False): cap = cv2.VideoCapture(0) _, image = cap.read() #uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect" response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]}) valresp = response.json() if valresp["frame"] == "no face was detected.": print("No face was detected.") if valresp["frame"] != "no face was detected.": imagebase64 = np.array(valresp["frame"]) image = np.frombuffer(base64.b64decode(imagebase64),dtype="uint8").reshape(valresp["shape"][0],valresp["shape"][1],3) if showimage == True: cv2.imshow('frame',image) cv2.waitKey(0) # closing all open windows cv2.destroyAllWindows() if saveimage == True: filename = "croppedimage.png" count = 0 while filename in os.listdir("CaesarFaceDetection/FaceDataPoints/"): filename = f"{filename.replace('.png','').replace(f'{count-1}','')}{count}.png" count +=1 cv2.imwrite(f"CaesarFaceDetection/FaceDataPoints/{filename}", image) return image if __name__ == "__main__": #success, frame = camera.read() #yolo_uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws' #CaesarSendWeb.send_video_websocket(uri = yolo_uri) #video_uri = 'wss://palondomus-caesarai.hf.space/sendvideows' #CaesarSendWeb.send_video_websocket(uri = video_uri) #extraction_uri = "wss://palondomus-caesarai.hf.space/caesarocrextractionws" #CaesarSendWeb.send_video_websocket(uri = extraction_uri,jsondata={"target_words":["your","brain","power"]}) #ocrws_uri = "wss://palondomus-caesarai.hf.space/caesarocrws" #CaesarSendWeb.send_video_websocket(uri = ocrws_uri) #ocr_uri = "http://192.168.0.10:7860/caesarocr" #message = CaesarSendWeb.send_image_recieve_text(uri = ocr_uri,showimage=True) #print(message) facedetect_uri = "wss://palondomus-caesarai.hf.space/caesarfacedetectws"#"wss://palondomus-caesarai.hf.space/caesarfacedetectws" CaesarSendWeb.send_video_websocket(uri = facedetect_uri,phone=True) #facesnap_uri = "http://192.168.0.10:7860/caesarfacesnap" #cropped_image = CaesarSendWeb.send_image_recieve_image(uri = facesnap_uri,saveimage=True) #print(cropped_image) #https_uri = 'http://192.168.0.10:7860/caesarobjectdetect' #CaesarSendWeb.send_video_https(uri = https_uri)