Spaces:
Runtime error
Runtime error
File size: 3,151 Bytes
5c48e61 5d322d8 5c48e61 093a866 4bfa0b3 093a866 5c48e61 093a866 5c48e61 5d322d8 093a866 5c48e61 5d322d8 d8804e5 ba9fae4 5c48e61 5d322d8 d8804e5 5c48e61 eb92911 5c48e61 eb92911 5c48e61 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
import json
import os
from fastapi import FastAPI, Header, Request, Response, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
import traceback
from typing import Annotated, Optional
from fetchYoutubeSubtitle import fetchSubtitle, fetchSubtitleUrls, fetchSubtitleByInfo
# Shared secret that the route handlers compare against the ``X-Token``
# request header. ``os.getenv`` returns None when HF_X_TOKEN is not set.
token = os.getenv("HF_X_TOKEN")
# FastAPI application instance; the ``@app.get`` decorators below register
# their handlers on it.
app = FastAPI()
@app.get("/")
def read_root(request: Request):
    """Log the caller's headers and address, then return a fixed greeting.

    Args:
        request: The incoming request; its headers and client address are
            printed to stdout for debugging.

    Returns:
        The dict ``{"Hello": "World!"}``.
    """
    print(request.headers)
    # ``request.client`` is None when the server cannot supply a peer
    # address, so guard the attribute access instead of failing the request.
    client = request.client
    if client is not None:
        print(client.host)
        print(client.port)
    return {"Hello": "World!"}
@app.get("/json")
def read_json():
    """Return the fixed greeting wrapped in an explicit ``JSONResponse``."""
    greeting = {"Hello": "World!"}
    return JSONResponse(content=greeting)
@app.get("/subtitle/")
async def get_subtitle(
    url: str,
    subtype: str = "srt",
    lang: str = "en",
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Return the result of ``fetchSubtitle`` for ``url`` as JSON.

    Args:
        url: Passed through to ``fetchSubtitle`` (presumably a YouTube video
            URL, going by the helper module's name -- confirm).
        subtype: Forwarded as ``subType``; defaults to ``"srt"``.
        lang: Forwarded as ``lang``; defaults to ``"en"``.
        proxy: Optional proxy value forwarded to ``fetchSubtitle``.
        x_token: Value of the ``X-Token`` request header.

    Returns:
        A ``JSONResponse`` with the fetched subtitle, or a 401
        ``JSONResponse`` with ``{"error": "Invalid token"}`` when the header
        does not match the configured token.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None and a request
    # without an X-Token header passes this check -- confirm that is intended.
    if token != x_token:
        # Same body as before, but with a 401 status so clients can tell an
        # auth failure from a successful fetch (matches /subtitle-dl/).
        return JSONResponse(content={"error": "Invalid token"}, status_code=401)
    subtitle = await fetchSubtitle(url, lang=lang, subType=subtype, proxy=proxy)
    return JSONResponse(content=subtitle)
@app.get("/subtitle-urls/")
async def get_subtitleUrls(
    url: str,
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Return the result of ``fetchSubtitleUrls`` for ``url`` as JSON.

    Args:
        url: Passed through to ``fetchSubtitleUrls``.
        proxy: Optional proxy value forwarded to ``fetchSubtitleUrls``.
        x_token: Value of the ``X-Token`` request header.

    Returns:
        A ``JSONResponse`` with the helper's result, or a 401
        ``JSONResponse`` with ``{"error": "Invalid token"}`` when the header
        does not match the configured token.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None and a request
    # without an X-Token header passes this check -- confirm that is intended.
    if token != x_token:
        # Same body as before, but with a 401 status so clients can tell an
        # auth failure from a successful fetch (matches /subtitle-dl/).
        return JSONResponse(content={"error": "Invalid token"}, status_code=401)
    subtitles = await fetchSubtitleUrls(url, proxy=proxy)
    return JSONResponse(content=subtitles)
def download_file(content, chunk_size):
    """Yield ``content`` in consecutive slices of at most ``chunk_size`` items.

    Args:
        content: Any object supporting ``len()`` and slicing (e.g. ``str``
            or ``bytes``).
        chunk_size: Maximum length of each yielded slice; must be positive.

    Yields:
        Non-empty slices of ``content`` in order. Nothing is yielded for
        empty ``content``, and no trailing empty slice is produced when the
        length is an exact multiple of ``chunk_size``.

    Raises:
        ValueError: If ``chunk_size`` is not positive (raised on first
            iteration, since this is a generator).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # Stepping the start offset avoids the extra empty chunk that a
    # ``len // chunk_size + 1`` count produces on exact multiples.
    for start in range(0, len(content), chunk_size):
        yield content[start:start + chunk_size]
@app.get("/subtitle-dl/")
async def download(
    url: str,
    fileName: str,
    fileType: str,
    info: str,  # download info
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
    request: Request = None,
):
    """Stream the result of ``fetchSubtitleByInfo`` as a file attachment.

    Args:
        url: Passed through to ``fetchSubtitleByInfo``.
        fileName: Base name used in the ``Content-Disposition`` header.
        fileType: Passed to ``fetchSubtitleByInfo`` and used as the
            attachment's file extension.
        info: JSON-encoded download info; decoded and passed to
            ``fetchSubtitleByInfo``.
        proxy: Optional proxy value forwarded to ``fetchSubtitleByInfo``.
        x_token: Value of the ``X-Token`` request header.
        request: The incoming request. Kept for signature compatibility; it
            is no longer read.

    Returns:
        A ``StreamingResponse`` that sends the subtitle in 8192-item chunks
        as a ``text/plain`` attachment.

    Raises:
        HTTPException: 401 when the token does not match, 400 when ``info``
            is not valid JSON, 500 when fetching or building the response
            fails.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None and a request
    # without an X-Token header passes this check -- confirm that is intended.
    if token != x_token:
        raise HTTPException(status_code=401, detail="Invalid token")

    try:
        dlInfo = json.loads(info)
    except (ValueError, RecursionError) as err:
        # JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested input. Both are client errors.
        raise HTTPException(status_code=400, detail="Invalid params") from err

    try:
        subtitle = await fetchSubtitleByInfo(url, fileType, dlInfo, proxy=proxy)
        # Presumably re-decoded as latin-1 so the UTF-8 bytes of a non-ASCII
        # name survive header encoding -- confirm against the server stack.
        header_name = fileName.encode("utf-8").decode("latin-1")
        # Only these two headers are sent. The caller's own request headers
        # are deliberately not copied into the response: doing so echoes
        # values such as X-Token/Cookie back and can emit a second,
        # conflicting content-type header.
        headers = {
            "Content-Type": "text/plain",
            "Content-Disposition": f'attachment; filename="{header_name}.{fileType}"',
        }
        return StreamingResponse(
            download_file(subtitle, 8192), headers=headers, status_code=200
        )
    except Exception as e:
        # Top-level boundary: log the failure and hide details from the client.
        print(e)
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
|