|
import os |
|
from fastapi import FastAPI, HTTPException, UploadFile, Depends, Query |
|
from fastapi.security import APIKeyHeader |
|
import aiohttp |
|
import asyncio |
|
import json |
|
import tempfile |
|
from typing import List, Dict |
|
import logging |
|
import textract |
|
import boto3 |
|
from botocore.exceptions import NoCredentialsError |
|
from duckduckgo_search import DDGS |
|
from bs4 import BeautifulSoup |
|
|
|
app = FastAPI() |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
API_KEY_NAME = "X-API-Key" |
|
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) |
|
|
|
|
|
INVIDIOUS_INSTANCES = [ |
|
"https://invidious.privacydev.net", |
|
"https://invidious.reallyaweso.me", |
|
"https://invidious.adminforge.de" |
|
] |
|
API_KEY = os.environ.get("API_KEY") |
|
|
|
|
|
S3_ACCESS_KEY_ID = os.environ.get("S3_ACCESS_KEY_ID") |
|
S3_SECRET_ACCESS_KEY = os.environ.get("S3_SECRET_ACCESS_KEY") |
|
S3_BUCKET = os.environ.get("S3_BUCKET") |
|
S3_REGION = os.environ.get("S3_REGION") |
|
|
|
if not all([API_KEY, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY, S3_BUCKET, S3_REGION]): |
|
raise ValueError("Missing required environment variables") |
|
|
|
|
|
async def search_and_get_videos(query: str, num_videos: int = 2) -> List[Dict]: |
|
for instance in INVIDIOUS_INSTANCES: |
|
url = f"{instance}/api/v1/search?q={query}&type=video" |
|
try: |
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(url) as response: |
|
response.raise_for_status() |
|
search_results = await response.json() |
|
videos = [ |
|
{ |
|
"id": video.get("videoId"), |
|
"title": video.get("title"), |
|
"thumbnail": video["videoThumbnails"][0]["url"] |
|
if video.get("videoThumbnails") |
|
else "", |
|
} |
|
for video in search_results |
|
][:num_videos] |
|
return videos |
|
except aiohttp.ClientError as e: |
|
logger.error(f"Error performing video search on {instance}: {e}") |
|
logger.error("All Invidious instances failed") |
|
return [] |
|
|
|
async def get_youtube_audio(video_id: str, max_retries: int = 3) -> Dict: |
|
for instance in INVIDIOUS_INSTANCES: |
|
for attempt in range(max_retries): |
|
try: |
|
url = f"{instance}/api/v1/videos/{video_id}" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(url) as response: |
|
response.raise_for_status() |
|
video_data = await response.json() |
|
|
|
audio_format = next((format for format in video_data.get('adaptiveFormats', []) |
|
if format.get('type', '').startswith('audio/mp4')), None) |
|
|
|
if audio_format: |
|
audio_url = audio_format.get('url') |
|
if audio_url: |
|
try: |
|
async with session.get(audio_url) as audio_response: |
|
audio_content = await audio_response.read() |
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.m4a') as temp_file: |
|
temp_file.write(audio_content) |
|
temp_file_path = temp_file.name |
|
|
|
return {'success': True, 'temp_file_path': temp_file_path} |
|
except aiohttp.ServerDisconnectedError: |
|
if attempt == max_retries - 1: |
|
logger.error(f"Max retries reached for video ID {video_id} on {instance}") |
|
break |
|
await asyncio.sleep(1 * (attempt + 1)) |
|
continue |
|
|
|
logger.warning(f"No suitable audio format found for video ID {video_id} on {instance}") |
|
break |
|
except aiohttp.ClientError as e: |
|
logger.error(f"Network error fetching YouTube audio for video ID {video_id} on {instance}: {e}") |
|
except json.JSONDecodeError: |
|
logger.error(f"Error decoding JSON response for video ID {video_id} on {instance}") |
|
except Exception as e: |
|
logger.error(f"Unexpected error fetching YouTube audio for video ID {video_id} on {instance}: {e}") |
|
if attempt == max_retries - 1: |
|
break |
|
await asyncio.sleep(1 * (attempt + 1)) |
|
|
|
return {'success': False, 'error': "Failed to fetch audio after multiple attempts on all instances"} |
|
|
|
def extract_text_from_document(file: UploadFile) -> dict: |
|
try: |
|
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1]) as temp_file: |
|
content = file.file.read() |
|
temp_file.write(content) |
|
temp_file_path = temp_file.name |
|
|
|
text = textract.process(temp_file_path).decode('utf-8') |
|
|
|
os.unlink(temp_file_path) |
|
|
|
return { |
|
'success': True, |
|
'extracted_text': text |
|
} |
|
except Exception as e: |
|
return { |
|
'success': False, |
|
'error': f"Error extracting text from document: {str(e)}" |
|
} |
|
|
|
def upload_to_s3(local_file, s3_file): |
|
s3_client = boto3.client( |
|
"s3", |
|
aws_access_key_id=S3_ACCESS_KEY_ID, |
|
aws_secret_access_key=S3_SECRET_ACCESS_KEY, |
|
region_name=S3_REGION, |
|
) |
|
|
|
try: |
|
s3_client.upload_file(local_file, S3_BUCKET, s3_file) |
|
s3_url = f"https://{S3_BUCKET}.s3.{S3_REGION}.amazonaws.com/{s3_file}" |
|
return s3_url |
|
except NoCredentialsError: |
|
logger.error("Credentials not available") |
|
return None |
|
|
|
def image_search(query: str, num_results: int = 5) -> dict: |
|
try: |
|
with DDGS( |
|
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} |
|
) as ddgs: |
|
results = list(ddgs.images(query, max_results=num_results)) |
|
formatted_results = [ |
|
{ |
|
'title': result['title'], |
|
'image_url': result['image'], |
|
'thumbnail_url': result['thumbnail'], |
|
'source_url': result['url'], |
|
'width': result['width'], |
|
'height': result['height'] |
|
} |
|
for result in results |
|
] |
|
return { |
|
'success': True, |
|
'results': formatted_results |
|
} |
|
except Exception as e: |
|
logger.error(f"Error performing image search: {e}") |
|
return { |
|
'success': False, |
|
'error': f"Error performing image search: {str(e)}" |
|
} |
|
|
|
async def verify_api_key(api_key: str = Depends(api_key_header)): |
|
if api_key != API_KEY: |
|
raise HTTPException(status_code=401, detail="Invalid API Key") |
|
return api_key |
|
|
|
@app.get("/search-videos/") |
|
async def search_videos( |
|
query: str, |
|
num_videos: int = Query(default=2, ge=1, le=10), |
|
api_key: str = Depends(verify_api_key) |
|
): |
|
videos = await search_and_get_videos(query, num_videos) |
|
if not videos: |
|
raise HTTPException(status_code=404, detail="No videos found or an error occurred during the search.") |
|
return {"videos": videos} |
|
|
|
@app.get("/get-audio/{video_id}") |
|
async def get_audio(video_id: str, api_key: str = Depends(verify_api_key)): |
|
result = await get_youtube_audio(video_id) |
|
if not result['success']: |
|
raise HTTPException(status_code=404, detail=result['error']) |
|
|
|
s3_file_name = f"{video_id}.m4a" |
|
s3_url = upload_to_s3(result['temp_file_path'], s3_file_name) |
|
|
|
if s3_url: |
|
os.unlink(result['temp_file_path']) |
|
return {"audio_url": s3_url} |
|
else: |
|
raise HTTPException(status_code=500, detail="Failed to upload audio to S3") |
|
|
|
@app.post("/extract-text/") |
|
async def extract_text(file: UploadFile, api_key: str = Depends(verify_api_key)): |
|
result = extract_text_from_document(file) |
|
if not result['success']: |
|
raise HTTPException(status_code=500, detail=result['error']) |
|
return {"extracted_text": result['extracted_text']} |
|
|
|
@app.get("/image-search/") |
|
async def image_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)): |
|
result = image_search(query, num_results) |
|
if not result['success']: |
|
raise HTTPException(status_code=500, detail=result['error']) |
|
return result |
|
|
|
class DuckDuckGoSearch: |
|
async def search(self, query: str, num_results: int = 5) -> list: |
|
url = f"https://html.duckduckgo.com/html/?q={query}" |
|
headers = { |
|
"User-Agent": "Mozilla/5.0", |
|
"Referer": "https://google.com/", |
|
"Cookie": "kl=wt-wt", |
|
} |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(url, headers=headers) as response: |
|
if response.status != 200: |
|
raise Exception("Failed to fetch data from DuckDuckGo") |
|
|
|
html = await response.text() |
|
soup = BeautifulSoup(html, "html.parser") |
|
results = [] |
|
|
|
for result in soup.select(".result"): |
|
title = result.select_one(".result__title .result__a") |
|
url = result.select_one(".result__url") |
|
desc = result.select_one(".result__snippet") |
|
|
|
if title and url and desc: |
|
results.append({ |
|
"title": title.get_text(strip=True), |
|
"body": desc.get_text(strip=True), |
|
"href": f"https://{url.get_text(strip=True)}", |
|
}) |
|
|
|
if len(results) >= num_results: |
|
break |
|
|
|
return results |
|
|
|
async def web_search(query: str, num_results: int = 5) -> dict: |
|
try: |
|
results = await DuckDuckGoSearch().search(query, num_results) |
|
return { |
|
'success': True, |
|
'results': results |
|
} |
|
except Exception as e: |
|
return { |
|
'success': False, |
|
'error': str(e) |
|
} |
|
|
|
@app.get("/web-search/") |
|
async def web_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)): |
|
result = await web_search(query, num_results) |
|
if not result['success']: |
|
raise HTTPException(status_code=500, detail=result['error']) |
|
return result |