funapi / routes /helpers.py
imperialwool's picture
Update routes/helpers.py
209c610
raw
history blame
4.19 kB
import os
import time
import json
import random
import string
import sqlite3
import hashlib
import requests
# SQLite3 class
class EazySQLite3():
    """Small convenience wrapper around one ``sqlite3`` connection.

    ``query`` runs a single statement, commits, and reports the outcome as a
    ``{"status": bool, "details": ...}`` dict instead of raising.
    """

    def __init__(self, db: str = "sqlite.db"):
        """Open a connection to the SQLite database at *db*."""
        self.connection = sqlite3.connect(db)

    def query(self, query: str = "", params=()):
        """Execute *query*, commit, and return the outcome as a dict.

        Args:
            query: SQL statement to run. Use ``?`` placeholders for values.
            params: Values bound to the placeholders. Defaults to no
                parameters, which matches the previous one-argument call.

        Returns:
            ``{"status": True, "details": rows}`` on success, where ``rows``
            is the ``fetchall()`` result (``None`` if fetching failed), or
            ``{"status": False, "details": message}`` when execution failed.
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params)
            self.connection.commit()
            try:
                record = cursor.fetchall()
            except sqlite3.Error:
                # The statement itself succeeded; only the fetch failed.
                record = None
            return {"status": True, "details": record}
        except Exception as e:
            # Deliberately broad: callers rely on getting a status dict back
            # rather than an exception (bad SQL, wrong argument types, ...).
            return {"status": False, "details": str(e)}
        finally:
            # Always release the cursor, including on the failure path.
            try:
                cursor.close()
            except sqlite3.Error:
                pass

    def close(self):
        """Close the underlying database connection."""
        self.connection.close()
# Deleting audio
def deleteAudio(path: str, waitInSeconds: int = 0):
    """Delete the file *path* from the configured static directory.

    The target is ``"<static-path>/<path>"`` where ``static-path`` comes from
    the config file. Deletion is best-effort: a failure is reported on stdout
    and never raised.

    The file is removed directly instead of through a shell ``rm`` command,
    so *path* is treated as one literal file name: shell metacharacters,
    spaces and wildcards in it are no longer interpreted.

    Args:
        path: File name relative to the static directory.
        waitInSeconds: Accepted for compatibility with existing callers; this
            function does not use it.
    """
    config = configFile()
    target = "{}/{}".format(config['static-path'], path)
    try:
        os.remove(target)
    except FileNotFoundError:
        # Already gone, nothing to do.
        pass
    except OSError as e:
        print(f"Could not delete {target}: {e}")
# Config file reading
def configFile(path: str = "/app/routes/config.json"):
    """Load and return the JSON configuration.

    Args:
        path: Location of the JSON file. Defaults to the application's
            config file, so existing zero-argument calls are unaffected.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Read as UTF-8 explicitly instead of the platform's locale encoding.
    with open(path, "r", encoding="utf-8") as file:
        return json.load(file)
# Signatures check!!! Wow!!!
def checkSignature(signature: str):
    """Return True if *signature* is a known, unexpired key.

    Looks the signature up in the database named by the ``signatures-db``
    config entry. The first column of a matching row is compared with the
    signature and the second column is treated as its expiry timestamp.
    On a match the expiry is pushed forward to 24 hours from now when that
    is later than the stored value.

    Args:
        signature: Key supplied by the client (untrusted input).

    Returns:
        True when a matching unexpired row exists, otherwise False. Also
        False when the lookup itself fails.
    """
    config = configFile()
    db = EazySQLite3(config['signatures-db'])
    try:
        # Bound parameters keep the untrusted signature out of the SQL text.
        # The former LIKE clause is dropped: only exact matches were accepted
        # by the row check below anyway.
        rows = db.connection.execute(
            "SELECT * FROM `table` WHERE `key` = ? AND `endtime` > ?",
            (signature, time.time()),
        ).fetchall()
        for row in rows:
            if signature == row[0]:
                newTTL = round(time.time()) + 86400
                if newTTL > row[1]:
                    try:
                        db.connection.execute(
                            "UPDATE `table` SET `endtime` = ? WHERE `key` = ?",
                            (newTTL, signature),
                        )
                        db.connection.commit()
                    except sqlite3.Error:
                        # Refreshing the TTL is best-effort; the signature
                        # itself is still valid.
                        pass
                return True
        return False
    except sqlite3.Error:
        return False
    finally:
        # Release the connection on every path.
        db.close()
# Hook for yt-dlp
def thisIsHook(d):
    """Progress hook that aborts when the reported file size exceeds 50 MB.

    Args:
        d: Status mapping passed to the hook; only its ``total_bytes`` entry
            is read.

    Raises:
        Exception: If ``total_bytes`` is greater than 52428800 bytes.
    """
    # Only the lookup is guarded. Previously the size check and its raise
    # sat inside a blanket try/except, so the abort was always swallowed.
    try:
        total = d['total_bytes']
    except (KeyError, TypeError):
        return
    # The size may be missing or not a number; nothing to enforce then.
    if not isinstance(total, (int, float)):
        return
    if total > 52428800:
        print("\nFILE IS BIG, ABORT!!!\n")
        raise Exception(f"Too long file (recieved {total/1048576}MB, max is 50MB)")
# Get variable from request
def getFromRequest(request, thing: str):
    """Return the request field *thing*, or None if it cannot be read.

    For a ``POST`` request the value is taken from ``request.form``; for any
    other method it is taken from ``request.args``.

    Args:
        request: Object exposing ``method``, ``form`` and ``args``.
        thing: Name of the field to read.

    Returns:
        The field value, or None when the lookup fails for any reason.
    """
    try:
        source = request.form if request.method == 'POST' else request.args
        return source[thing]
    except Exception:
        # Best-effort lookup: any failure means "not provided". Unlike a bare
        # except, this lets KeyboardInterrupt and SystemExit through.
        return None
# Recognizing things
def req(access_token, meth, path, params, **kwargs):
    """Send a request to the Wit.ai HTTP API and return the decoded reply.

    Args:
        access_token: Token sent as the ``Bearer`` authorization header.
        meth: HTTP method name.
        path: Path appended to ``https://api.wit.ai``.
        params: Query-string parameters.
        **kwargs: Passed through to ``requests.request``. A ``headers`` entry
            is merged over the default headers.

    Returns:
        The parsed JSON body, or the raw response text when the body is not
        a single JSON document.

    Raises:
        Exception: If the status code is above 200, or the JSON body is an
            object carrying an ``error`` entry.
    """
    full_url = "https://api.wit.ai" + path
    headers = {
        "authorization": "Bearer " + access_token,
        "accept": "application/vnd.wit." + "20221114" + "+json",
    }
    # Tolerate headers=None as well as a missing entry.
    headers.update(kwargs.pop("headers", None) or {})
    rsp = requests.request(meth, full_url, headers=headers, params=params, **kwargs)
    if rsp.status_code > 200:
        raise Exception(f"{rsp.status_code} ({rsp.reason})")
    try:
        r = rsp.json()
    except ValueError:
        # Not a single JSON document; hand the raw text back. The error check
        # below is skipped because it would only be a substring match on the
        # payload.
        return rsp.text
    if isinstance(r, dict) and "error" in r:
        # Previously this indexed the json module instead of the response.
        raise Exception(r["error"])
    return r
def dictation(access_token, audio_file, headers=None, verbose=None):
    """POST *audio_file* to the ``/dictation`` endpoint and return the reply.

    Args:
        access_token: Token forwarded to ``req``.
        audio_file: Request body to upload.
        headers: Optional extra headers; an empty dict is used when falsy.
        verbose: When truthy, ``verbose=True`` is added to the query string.

    Returns:
        Whatever ``req`` returns for the call.
    """
    query_params = {"verbose": True} if verbose else {}
    extra_headers = headers or {}
    return req(
        access_token,
        "POST",
        "/dictation",
        query_params,
        data=audio_file,
        headers=extra_headers,
    )
def clean(text):
    """Strip the characters ``? ! ; . ,`` from *text*.

    Returns None when *text* is None or the empty string.
    """
    if text is None or text == '':
        return None
    stripped = text
    for mark in ('?', '!', ';', '.', ','):
        stripped = stripped.replace(mark, '')
    return stripped
def delivering(text):
    """Join the final transcript of each run found in *text*.

    Every line containing ``"text"`` is read as a transcript by dropping its
    first 11 characters and its last character.
    NOTE(review): this presumably expects lines shaped like
    ``  "text": "<value>"``; confirm against the real payload.

    Consecutive transcripts belong to the same run while they contain the
    run's first transcript. Only the last transcript of each run is kept.

    Args:
        text: Raw multi-line response text.

    Returns:
        The kept transcripts joined by single spaces ('' when there are none).
    """
    result = []
    first = None   # first transcript of the current run
    latest = None  # most recent transcript of the current run
    for dirtLine in text.split('\n'):
        if '"text"' not in dirtLine:
            continue
        line = dirtLine[11:-1]
        if first is not None and first in line:
            latest = line
            continue
        # A new run starts: flush the previous one (if any) first.
        if latest is not None:
            result.append(latest)
        first = latest = line
    # Flush the final run. It used to be dropped, and a run with a single
    # transcript used to push None and break the join.
    if latest is not None:
        result.append(latest)
    return ' '.join(result)
def devRaw(text):
    """Return every transcript found in *text*, in order.

    A line counts when it contains ``"text"``; its first 11 characters and
    its last character are dropped.
    """
    return [
        candidate[11:-1]
        for candidate in text.split('\n')
        if '"text"' in candidate
    ]
def randString(len: int = 16):
    """Return a random string of *len* ASCII letters and digits.

    Characters are drawn with ``random.SystemRandom``.
    """
    alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
    picked = []
    for _ in range(len):
        picked.append(random.SystemRandom().choice(alphabet))
    return ''.join(picked)