Spaces:
Sleeping
Sleeping
from flask import ( | |
Flask, | |
json, | |
render_template, | |
request, | |
jsonify, | |
redirect, | |
url_for, | |
send_file, | |
Response | |
) | |
from io import BytesIO | |
import urllib.parse | |
from functools import wraps | |
import requests | |
import hashlib | |
import os | |
from config import Config | |
from models import Database | |
from utils import get_file_type, format_file_size | |
from huggingface_hub import HfApi | |
# 初始化 HuggingFace API | |
api = HfApi(token=Config.HF_TOKEN) | |
app = Flask(__name__) | |
app.config['SECRET_KEY'] = Config.SECRET_KEY | |
db = Database() | |
def require_auth(f): | |
def decorated(*args, **kwargs): | |
if not Config.REQUIRE_LOGIN: | |
return f(*args, **kwargs) | |
if not request.cookies.get('authenticated'): | |
if request.is_json: | |
return jsonify({'error': 'Unauthorized'}), 401 | |
return redirect(url_for('login')) | |
return f(*args, **kwargs) | |
return decorated | |
def login(): | |
if request.method == 'POST': | |
if request.form.get('password') == Config.ACCESS_PASSWORD: | |
response = jsonify({'success': True}) | |
response.set_cookie('authenticated', 'true', secure=True, httponly=True) | |
return response | |
return jsonify({'error': 'Invalid password'}), 401 | |
return render_template('login.html') | |
def logout(): | |
response = redirect(url_for('login')) | |
response.delete_cookie('authenticated') | |
return response | |
def index(): | |
return render_template('index.html') | |
def list_files(directory=''): | |
try: | |
url = f"https://huggingface.co/api/datasets/{Config.HF_DATASET_ID}/tree/{Config.HF_BRANCH}" | |
if directory: | |
url = f"{url}/{directory}" | |
response = requests.get( | |
url, | |
headers={'Authorization': f'Bearer {Config.HF_TOKEN}'} | |
) | |
if not response.ok: | |
return jsonify({'error': 'Failed to fetch files', 'details': response.text}), response.status_code | |
files = response.json() | |
for file in files: | |
if file['type'] == 'file': | |
file['file_type'] = get_file_type(file['path']) | |
file['size_formatted'] = format_file_size(file['size']) | |
# 添加预览和下载URL | |
file['preview_url'] = f"/api/files/preview/{file['path']}" | |
file['download_url'] = f"/api/files/download/{file['path']}" | |
return jsonify(files) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def preview_file(filepath): | |
try: | |
file_type = get_file_type(filepath) | |
if file_type not in ['image', 'video', 'document']: | |
return jsonify({'error': 'File type not supported for preview'}), 400 | |
url = f"https://{Config.PROXY_DOMAIN}/datasets/{Config.HF_DATASET_ID}/resolve/{Config.HF_BRANCH}/{filepath}" | |
response = requests.get( | |
url, | |
headers={'Authorization': f'Bearer {Config.HF_TOKEN}'}, | |
stream=True | |
) | |
if response.ok: | |
def generate(): | |
for chunk in response.iter_content(chunk_size=8192): | |
yield chunk | |
return Response( | |
generate(), | |
mimetype=response.headers.get('content-type', 'application/octet-stream'), | |
direct_passthrough=True | |
) | |
return jsonify({'error': 'Failed to fetch file'}), response.status_code | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def download_file(filepath): | |
try: | |
url = f"https://{Config.PROXY_DOMAIN}/datasets/{Config.HF_DATASET_ID}/resolve/{Config.HF_BRANCH}/{filepath}" | |
# 先发送 HEAD 请求获取文件信息 | |
head_response = requests.head( | |
url, | |
headers={'Authorization': f'Bearer {Config.HF_TOKEN}'}, | |
allow_redirects=True | |
) | |
if head_response.ok: | |
# 获取文件基本信息 | |
content_type = head_response.headers.get('content-type', 'application/octet-stream') | |
content_length = head_response.headers.get('content-length') | |
last_modified = head_response.headers.get('last-modified') | |
etag = head_response.headers.get('etag') | |
# 如果是txt文件但没有指定字符集,设置为text/plain | |
if filepath.lower().endswith('.txt') and 'charset' not in content_type: | |
content_type = 'text/plain' | |
# 获取文件内容 | |
response = requests.get( | |
url, | |
headers={'Authorization': f'Bearer {Config.HF_TOKEN}'}, | |
stream=True | |
) | |
if response.ok: | |
filename = os.path.basename(filepath) | |
encoded_filename = urllib.parse.quote(filename.encode('utf-8')) | |
headers = { | |
'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}', | |
'Content-Type': content_type, | |
'Content-Length': content_length, | |
'Accept-Ranges': 'bytes', | |
'Cache-Control': 'no-cache', | |
'Last-Modified': last_modified, | |
'ETag': etag | |
} | |
# 移除为None的header | |
headers = {k: v for k, v in headers.items() if v is not None} | |
return Response( | |
response.iter_content(chunk_size=1048576), | |
headers=headers | |
) | |
return jsonify({'error': 'File not found'}), 404 | |
except Exception as e: | |
print(f"Download error for {filepath}: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
def upload_file(): | |
if 'file' not in request.files: | |
return jsonify({'error': 'No file provided'}), 400 | |
file = request.files['file'] | |
current_path = request.form.get('path', '').strip('/') | |
try: | |
file_content = file.read() | |
file.seek(0) | |
original_name = file.filename | |
stored_name = original_name | |
full_path = os.path.join(current_path, stored_name).replace("\\", "/") | |
response = api.upload_file( | |
path_or_fileobj=file_content, | |
path_in_repo=full_path, | |
repo_id=Config.HF_DATASET_ID, | |
repo_type="dataset", | |
token=Config.HF_TOKEN | |
) | |
if response: | |
with db.conn.cursor() as cursor: | |
cursor.execute(""" | |
INSERT INTO files ( | |
original_name, stored_name, file_path, | |
file_type, file_size | |
) VALUES (%s, %s, %s, %s, %s) | |
""", ( | |
original_name, | |
stored_name, | |
full_path, | |
get_file_type(original_name), | |
len(file_content) | |
)) | |
db.conn.commit() | |
return jsonify({'success': True}) | |
return jsonify({'error': 'Upload failed'}), 500 | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def search_files(): | |
keyword = request.args.get('keyword', '') | |
if not keyword: | |
return jsonify([]) | |
try: | |
files = db.search_files(keyword) | |
return jsonify([{ | |
'type': 'file', | |
'path': f['file_path'], | |
'file_type': get_file_type(f['file_path']), | |
'size': f['file_size'], | |
'size_formatted': format_file_size(f['file_size']), | |
'preview_url': f'/api/files/preview/{f["file_path"]}', | |
'download_url': f'/api/files/download/{f["file_path"]}' | |
} for f in files]) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def delete_file(filepath): | |
try: | |
# Initialize HuggingFace API | |
api = HfApi(token=Config.HF_TOKEN) | |
# Delete file from HuggingFace Hub | |
api.delete_file( | |
path_in_repo=filepath, | |
repo_id=Config.HF_DATASET_ID, | |
repo_type="dataset" | |
) | |
# Delete file record from database | |
with db.conn.cursor() as cursor: | |
cursor.execute( | |
"DELETE FROM files WHERE file_path = %s", | |
[filepath] | |
) | |
db.conn.commit() | |
return jsonify({'success': True}) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def create_folder(): | |
try: | |
data = request.json | |
path = data.get('path', '/') | |
name = data.get('name') | |
if not name: | |
return jsonify({'error': 'Folder name is required'}), 400 | |
full_path = os.path.join(path, name, '.keep').replace("\\", "/") | |
# 使用 HuggingFace API 创建文件夹 | |
api = HfApi(token=Config.HF_TOKEN) | |
response = api.upload_file( | |
path_or_fileobj=b'', # 空内容 | |
path_in_repo=full_path, | |
repo_id=Config.HF_DATASET_ID, | |
repo_type="dataset", | |
token=Config.HF_TOKEN | |
) | |
if response: | |
# 记录到数据库 | |
with db.conn.cursor() as cursor: | |
cursor.execute(""" | |
INSERT INTO files ( | |
original_name, stored_name, file_path, | |
file_type, file_size | |
) VALUES (%s, %s, %s, %s, %s) | |
""", ( | |
'.keep', | |
'.keep', | |
full_path, | |
'directory', | |
0 | |
)) | |
db.conn.commit() | |
return jsonify({'message': 'Folder created successfully'}) | |
return jsonify({'error': 'Failed to create folder'}), 500 | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=7860, debug=True) |