import hashlib
import logging
import os
import tempfile
from urllib.request import urlopen
from zipfile import ZipFile

import numpy as np
import streamlit as st
from tqdm import tqdm
from twilio.rest import Client

logger = logging.getLogger(__name__)


@st.cache_data
def get_ice_servers(name="twilio"):
    """Get ICE servers from the given provider ("twilio" or "metered").

    Falls back to a free Google STUN server if the provider credentials
    are not set in the environment.

    Returns:
        List of ICE server dictionaries.
    """
    if name == "twilio":
        # Ref: https://www.twilio.com/docs/stun-turn/api
        try:
            account_sid = os.environ["TWILIO_ACCOUNT_SID"]
            auth_token = os.environ["TWILIO_AUTH_TOKEN"]
        except KeyError:
            logger.warning("Twilio credentials are not set. Falling back to a free STUN server from Google.")
            return [{"urls": ["stun:stun.l.google.com:19302"]}]
        client = Client(account_sid, auth_token)
        token = client.tokens.create()
        return token.ice_servers
    elif name == "metered":
        try:
            username = os.environ["METERED_USERNAME"]
            credential = os.environ["METERED_CREDENTIAL"]
        except KeyError:
            logger.warning("Metered credentials are not set. Falling back to a free STUN server from Google.")
            return [{"urls": ["stun:stun.l.google.com:19302"]}]
        ice_servers = [
            {"url": "stun:a.relay.metered.ca:80", "urls": "stun:a.relay.metered.ca:80"},
            {
                "url": "turn:a.relay.metered.ca:80",
                "username": username,
                "urls": "turn:a.relay.metered.ca:80",
                "credential": credential,
            },
            {
                "url": "turn:a.relay.metered.ca:80?transport=tcp",
                "username": username,
                "urls": "turn:a.relay.metered.ca:80?transport=tcp",
                "credential": credential,
            },
            {
                "url": "turn:a.relay.metered.ca:443",
                "username": username,
                "urls": "turn:a.relay.metered.ca:443",
                "credential": credential,
            },
            {
                "url": "turn:a.relay.metered.ca:443?transport=tcp",
                "username": username,
                "urls": "turn:a.relay.metered.ca:443?transport=tcp",
                "credential": credential,
            },
        ]
        return ice_servers
    else:
        raise ValueError(f"Unknown name: {name}")
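
# Illustrative usage sketch (not part of the original module): the ICE servers are
# typically passed to a WebRTC component's RTC configuration. The use of
# streamlit_webrtc and the "example" key below are assumptions for demonstration only.
#
#     from streamlit_webrtc import webrtc_streamer
#
#     webrtc_streamer(
#         key="example",
#         rtc_configuration={"iceServers": get_ice_servers("twilio")},
#     )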

# Format floats (including nested lists/arrays of floats) as two-decimal strings
def format_dflist(val):
    if isinstance(val, list):
        return [format_dflist(num) for num in val]
    if isinstance(val, np.ndarray):
        return np.asarray([format_dflist(num) for num in val])
    if isinstance(val, (float, np.float32)):
        return f"{val:.2f}"
    return val


def rgb(r, g, b):
    """Convert an (r, g, b) triple of 0-255 integers to a hex color string."""
    return "#{:02x}{:02x}{:02x}".format(r, g, b)

def tflite_inference(model, img):
    """Run inference on an image with a TFLite interpreter on the CPU.

    :param model: a tflite Interpreter loaded with a model
    :param img: image as an array-like, HWC or NHWC
    :return: list of outputs of the model
    """
    # Ensure the input is a numpy array
    if not isinstance(img, np.ndarray):
        img = np.asarray(img)
    # Add a batch dimension if the image is HWC
    if len(img.shape) == 3:
        img = np.expand_dims(img, axis=0)
    input_details = model.get_input_details()
    output_details = model.get_output_details()
    model.resize_tensor_input(input_details[0]["index"], img.shape)
    model.allocate_tensors()
    model.set_tensor(input_details[0]["index"], img.astype(np.float32))
    model.invoke()
    return [model.get_tensor(elem["index"]) for elem in output_details]
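
# Illustrative usage sketch (assumption, not from the original source): loading a
# model with the TensorFlow Lite interpreter and running it through
# tflite_inference. The model path and input shape are hypothetical.
#
#     import tensorflow as tf
#
#     interpreter = tf.lite.Interpreter(model_path="model.tflite")
#     img = np.zeros((112, 112, 3), dtype=np.float32)  # dummy input
#     outputs = tflite_inference(interpreter, img)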

def get_file(origin, file_hash, is_zip=False):
    """Download a file to a temporary directory, verifying it by SHA-256 hash.

    :param origin: URL of the file to download
    :param file_hash: expected SHA-256 hex digest of the file
    :param is_zip: if True, extract the downloaded archive and return the extraction path
    :return: path to the downloaded file, or to the extracted directory if is_zip
    """
    tmp_file = os.path.join(tempfile.gettempdir(), "FaceIDLight", origin.split("/")[-1])
    os.makedirs(os.path.dirname(tmp_file), exist_ok=True)
    if not os.path.exists(tmp_file):
        download = True
    else:
        # A local copy exists; re-download only if its hash does not match
        hasher = hashlib.sha256()
        with open(tmp_file, "rb") as file:
            for chunk in iter(lambda: file.read(65535), b""):
                hasher.update(chunk)
        if hasher.hexdigest() != file_hash:
            print(
                "A local file was found, but it seems to be incomplete or outdated because its hash does not "
                "match the expected value of " + file_hash + ", so the data will be downloaded."
            )
            download = True
        else:
            download = False
    if download:
        response = urlopen(origin)
        with open(tmp_file, "wb") as raw_file, tqdm.wrapattr(
            raw_file,
            "write",
            miniters=1,
            desc="Downloading " + origin.split("/")[-1] + " to: " + tmp_file,
            total=getattr(response, "length", None),
        ) as file:
            for chunk in response:
                file.write(chunk)
    if is_zip:
        with ZipFile(tmp_file, "r") as zipObj:
            zipObj.extractall(tmp_file.split(".")[0])
        tmp_file = tmp_file.split(".")[0]
    return tmp_file
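
# Illustrative usage sketch (assumption, not from the original source): downloading
# a file and verifying it against a known SHA-256 digest. The URL and digest below
# are placeholders, not real values.
#
#     model_path = get_file(
#         "https://example.com/models/model.tflite",
#         file_hash="<expected sha256 hex digest>",
#     )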

def get_hash(filepath):
    """Compute the SHA-256 hex digest of a file, read in chunks."""
    hasher = hashlib.sha256()
    with open(filepath, "rb") as file:
        for chunk in iter(lambda: file.read(65535), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
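
# Illustrative usage sketch (assumption, not from the original source): get_hash can
# be used to compute the SHA-256 digest that get_file later expects for verification.
#
#     digest = get_hash("model.tflite")  # hypothetical local file
#     # later: get_file("https://example.com/models/model.tflite", digest)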