# CaesarAI / sendwebphone.py
# CaesarCloudSync - "CaesarAI Deployed" - revision 9d3162f
# (web file-viewer header: raw / history / blame, 6.36 kB)
import cv2
import numpy as np
import requests
import base64
import time
import websockets
import asyncio
import cv2
import json
import os
import imutils
class CaesarSendWeb:
    """Client-side helpers that send camera frames to CaesarAI endpoints.

    Frames come either from the local webcam (``cv2.VideoCapture(0)``) or
    from a phone that serves JPEG snapshots over HTTP.  They are sent to a
    server over plain HTTP (JSON with a base64 payload) or over a WebSocket
    (PNG-encoded bytes), and whatever the server answers is displayed,
    printed, returned or saved.
    """

    @staticmethod
    def _frame_payload(image):
        """Return the JSON body used by the HTTP endpoints for *image*.

        The raw pixel buffer is base64-encoded under ``"frame"`` and the
        height/width are sent under ``"shape"`` so the receiver can rebuild
        the array.
        """
        return {
            # tobytes() always yields C-ordered bytes, even for a
            # non-contiguous view, so the receiver's reshape stays valid.
            "frame": base64.b64encode(image.tobytes()).decode(),
            "shape": [int(image.shape[0]), int(image.shape[1])],
        }

    @staticmethod
    def _decode_frame(payload):
        """Rebuild a ``(height, width, 3)`` uint8 image from a server reply.

        *payload* must carry the base64 pixel data under ``"frame"`` and
        ``[height, width]`` under ``"shape"``.
        """
        height, width = payload["shape"][0], payload["shape"][1]
        raw = base64.b64decode(payload["frame"])
        return np.frombuffer(raw, dtype=np.uint8).reshape(height, width, 3)

    @staticmethod
    def _unique_filename(existing, stem="croppedimage", ext=".png"):
        """Return a file name that does not collide with *existing*.

        The first candidate is ``stem + ext``; after that a counter starting
        at 0 is inserted before the extension (``croppedimage0.png``,
        ``croppedimage1.png``, ...).
        """
        taken = set(existing)  # one O(1)-lookup set instead of re-listing
        candidate = f"{stem}{ext}"
        counter = 0
        while candidate in taken:
            candidate = f"{stem}{counter}{ext}"
            counter += 1
        return candidate

    @staticmethod
    def _capture_single_frame():
        """Grab one frame from camera 0 and always release the device.

        Raises:
            RuntimeError: if the camera did not deliver a frame.
        """
        cap = cv2.VideoCapture(0)
        try:
            success, image = cap.read()
        finally:
            cap.release()
        if not success or image is None:
            raise RuntimeError("Could not read a frame from camera 0.")
        return image

    @staticmethod
    def _grab_phone_frame(phone_url):
        """Download one snapshot from *phone_url* and return it resized.

        Returns ``None`` when the downloaded bytes cannot be decoded as an
        image.
        """
        img_resp = requests.get(phone_url)
        frame_arr = np.frombuffer(img_resp.content, dtype=np.uint8)
        frame = cv2.imdecode(frame_arr, -1)
        if frame is None:
            return None
        return imutils.resize(frame, width=500, height=600)

    @classmethod
    def send_video_https(cls, uri="http://192.168.0.10:7860/caesarobjectdetect"):
        """Stream webcam frames to *uri* over HTTP and show the replies.

        Each frame is POSTed as JSON (see ``_frame_payload``); the reply is
        expected to carry a processed frame in the same ``frame``/``shape``
        format, which is displayed in a window named ``"image"``.  The loop
        stops when ``q`` is pressed or the camera stops delivering frames.

        Args:
            uri: Endpoint that accepts the frame JSON.  A hosted variant is
                http://palondomus-caesarai.hf.space/caesarobjectdetect.
        """
        cap = cv2.VideoCapture(0)
        try:
            while True:
                success, image = cap.read()
                if not success or image is None:
                    break
                response = requests.post(uri, json=cls._frame_payload(image))
                processed = cls._decode_frame(response.json())
                cv2.imshow("image", processed)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Release the device and close the window even if a request or
            # decode step raised.
            cap.release()
            cv2.destroyAllWindows()

    @classmethod
    def send_video_websocket(
        cls,
        uri='wss://palondomus-caesarai.hf.space/caesarobjectdetectws',
        jsondata=None,
        phone=False,
        phone_url="http://192.168.0.14:8080/shot.jpg",
    ):
        """Stream frames to a WebSocket endpoint and handle its replies.

        Every frame is PNG-encoded and sent as a binary message, optionally
        followed by *jsondata* as a text message.  One reply is awaited per
        frame: binary replies are decoded as images and shown in a window
        named ``"frame"``; text replies are parsed as JSON and printed.
        The loop ends when no frame can be obtained, or when ``q`` is
        pressed while the preview window has focus.

        Args:
            uri: WebSocket endpoint to connect to.
            jsondata: Optional JSON-serialisable object sent after each
                frame.
            phone: When true, pull snapshots from *phone_url* instead of
                reading the local webcam.
            phone_url: HTTP URL that returns a single JPEG snapshot.
        """
        camera = None if phone else cv2.VideoCapture(0)
        window_open = False

        async def main():
            nonlocal window_open
            # Connect to the server
            async with websockets.connect(uri) as ws:
                while True:
                    if phone:
                        frame = cls._grab_phone_frame(phone_url)
                    else:
                        success, frame = camera.read()
                        if not success:
                            frame = None
                    if frame is None:
                        break

                    encoded, buffer = cv2.imencode('.png', frame)
                    if not encoded:
                        # Nothing valid to send for this frame; try the next.
                        continue
                    await ws.send(buffer.tobytes())
                    if jsondata:
                        await ws.send(json.dumps(jsondata))

                    contents = await ws.recv()
                    if isinstance(contents, bytes):
                        arr = np.frombuffer(contents, np.uint8)
                        frameobj = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
                        if frameobj is not None:
                            cv2.imshow('frame', frameobj)
                            window_open = True
                            if cv2.waitKey(1) & 0xFF == ord("q"):
                                break
                    elif isinstance(contents, str):
                        print(json.loads(contents))

        try:
            asyncio.run(main())
        finally:
            if camera is not None:
                camera.release()
            # Only touch the GUI layer if a window was actually created, so
            # text-only endpoints keep working without a display.
            if window_open:
                cv2.destroyAllWindows()

    @classmethod
    def send_image_recieve_text(cls, uri="http://192.168.0.10:7860/caesarocr", showimage=False):
        """Send one webcam frame to *uri* and return the JSON reply.

        Args:
            uri: HTTP endpoint that accepts the frame JSON.
            showimage: When true, display the captured frame and block
                until a key is pressed.

        Returns:
            The decoded JSON body of the server response.

        Raises:
            RuntimeError: if the camera did not deliver a frame.
        """
        image = cls._capture_single_frame()
        response = requests.post(uri, json=cls._frame_payload(image))
        messageresp = response.json()
        if showimage:
            cv2.imshow('frame', image)
            cv2.waitKey(0)
            # closing all open windows
            cv2.destroyAllWindows()
        return messageresp

    @classmethod
    def send_image_recieve_image(
        cls,
        uri="http://192.168.0.10:7860/caesarfacesnap",
        showimage=False,
        saveimage=False,
        savedir="CaesarFaceDetection/FaceDataPoints/",
    ):
        """Send one webcam frame to *uri* and return the image it replies with.

        The reply is expected to be JSON whose ``"frame"`` is either the
        literal string ``"no face was detected."`` or base64 pixel data
        accompanied by ``"shape"``.

        Args:
            uri: HTTP endpoint that accepts the frame JSON.
            showimage: When true, display the returned image and block
                until a key is pressed.
            saveimage: When true, write the returned image into *savedir*
                under a name that does not overwrite an existing file.
            savedir: Directory used when *saveimage* is true; it is created
                if missing.

        Returns:
            The returned image as a ``(height, width, 3)`` uint8 array, or
            ``None`` when the server reported that no face was detected.

        Raises:
            RuntimeError: if the camera did not deliver a frame.
        """
        captured = cls._capture_single_frame()
        response = requests.post(uri, json=cls._frame_payload(captured))
        valresp = response.json()

        if valresp["frame"] == "no face was detected.":
            print("No face was detected.")
            return None

        image = cls._decode_frame(valresp)
        if showimage:
            cv2.imshow('frame', image)
            cv2.waitKey(0)
            # closing all open windows
            cv2.destroyAllWindows()
        if saveimage:
            os.makedirs(savedir, exist_ok=True)
            filename = cls._unique_filename(os.listdir(savedir))
            cv2.imwrite(os.path.join(savedir, filename), image)
        return image
if __name__ == "__main__":
    # Demo entry point: stream snapshots from the phone camera to the hosted
    # face-detection WebSocket endpoint.
    #
    # Other endpoints that were exercised from here during development:
    #   send_video_websocket
    #     wss://palondomus-caesarai.hf.space/caesarobjectdetectws
    #     wss://palondomus-caesarai.hf.space/sendvideows
    #     wss://palondomus-caesarai.hf.space/caesarocrws
    #     wss://palondomus-caesarai.hf.space/caesarocrextractionws
    #       (with jsondata={"target_words": ["your", "brain", "power"]})
    #   send_image_recieve_text   http://192.168.0.10:7860/caesarocr
    #   send_image_recieve_image  http://192.168.0.10:7860/caesarfacesnap
    #   send_video_https          http://192.168.0.10:7860/caesarobjectdetect
    CaesarSendWeb.send_video_websocket(
        uri="wss://palondomus-caesarai.hf.space/caesarfacedetectws",
        phone=True,
    )