Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, File, UploadFile | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import FileResponse | |
from PIL import Image | |
import numpy as np | |
import urllib.request | |
import io | |
import os | |
from moviepyTest import test | |
from typing import * | |
from fastapi.responses import PlainTextResponse #执行其他py的plaintext返回 | |
import subprocess | |
from fastapi import BackgroundTasks | |
import time | |
app = FastAPI() | |
def inference(): | |
return "<p>Hello, World!</p>" | |
def t5(input): | |
return {"output": input} | |
def t5(): | |
result = test() | |
return {"output": result} | |
async def getOriginalMangaList(images: List[UploadFile] = File(...)): | |
for idx, image in enumerate(images): | |
img = await image.read() | |
image = Image.open(io.BytesIO(img)).convert("L").convert("RGB") | |
path_to_image = f"/manga/{idx}.jpg" | |
image.save(path_to_image) | |
return "获取图片保存成功" | |
async def delete_files(directory: str): | |
for filename in os.listdir(directory): | |
file_path = os.path.join(directory, filename) | |
if os.path.isfile(file_path): | |
os.remove(file_path) | |
return {"message": f"成功删除{directory}目录下的所有文件"} | |
async def execute_py_file(file_name: str): | |
try: | |
result = subprocess.check_output(["python", f"{file_name}.py"]).decode("utf-8") | |
return PlainTextResponse(result) | |
except subprocess.CalledProcessError as e: | |
return PlainTextResponse(f"Error executing {file_name}.py: {e}") | |
def someTask(): | |
time.sleep(20) | |
print("睡眠20s结束") | |
def returnRandomSubscribeUrl(background_tasks: BackgroundTasks)->str: | |
#返回 | |
result = "先返回" | |
background_tasks.add_task(someTask) | |
return result | |