CloudVault / app.py
lexlepty's picture
Update app.py
52a0ce7 verified
from flask import (
Flask,
json,
render_template,
request,
jsonify,
redirect,
url_for,
send_file,
Response
)
from io import BytesIO
import urllib.parse
from functools import wraps
import requests
import hashlib
import os
from config import Config
from models import Database
from utils import get_file_type, format_file_size
from huggingface_hub import HfApi
# 初始化 HuggingFace API
api = HfApi(token=Config.HF_TOKEN)
app = Flask(__name__)
app.config['SECRET_KEY'] = Config.SECRET_KEY
db = Database()
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
if not Config.REQUIRE_LOGIN:
return f(*args, **kwargs)
if not request.cookies.get('authenticated'):
if request.is_json:
return jsonify({'error': 'Unauthorized'}), 401
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
if request.form.get('password') == Config.ACCESS_PASSWORD:
response = jsonify({'success': True})
response.set_cookie('authenticated', 'true', secure=True, httponly=True)
return response
return jsonify({'error': 'Invalid password'}), 401
return render_template('login.html')
@app.route('/logout')
def logout():
response = redirect(url_for('login'))
response.delete_cookie('authenticated')
return response
@app.route('/')
@require_auth
def index():
return render_template('index.html')
@app.route('/api/files/list/')
@app.route('/api/files/list/<path:directory>')
@require_auth
def list_files(directory=''):
try:
url = f"https://huggingface.co/api/datasets/{Config.HF_DATASET_ID}/tree/{Config.HF_BRANCH}"
if directory:
url = f"{url}/{directory}"
response = requests.get(
url,
headers={'Authorization': f'Bearer {Config.HF_TOKEN}'}
)
if not response.ok:
return jsonify({'error': 'Failed to fetch files', 'details': response.text}), response.status_code
files = response.json()
for file in files:
if file['type'] == 'file':
file['file_type'] = get_file_type(file['path'])
file['size_formatted'] = format_file_size(file['size'])
# 添加预览和下载URL
file['preview_url'] = f"/api/files/preview/{file['path']}"
file['download_url'] = f"/api/files/download/{file['path']}"
return jsonify(files)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/files/preview/<path:filepath>')
@require_auth
def preview_file(filepath):
try:
file_type = get_file_type(filepath)
if file_type not in ['image', 'video', 'document']:
return jsonify({'error': 'File type not supported for preview'}), 400
url = f"https://{Config.PROXY_DOMAIN}/datasets/{Config.HF_DATASET_ID}/resolve/{Config.HF_BRANCH}/{filepath}"
response = requests.get(
url,
headers={'Authorization': f'Bearer {Config.HF_TOKEN}'},
stream=True
)
if response.ok:
def generate():
for chunk in response.iter_content(chunk_size=8192):
yield chunk
return Response(
generate(),
mimetype=response.headers.get('content-type', 'application/octet-stream'),
direct_passthrough=True
)
return jsonify({'error': 'Failed to fetch file'}), response.status_code
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/files/download/<path:filepath>')
@require_auth
def download_file(filepath):
try:
url = f"https://{Config.PROXY_DOMAIN}/datasets/{Config.HF_DATASET_ID}/resolve/{Config.HF_BRANCH}/{filepath}"
# 先发送 HEAD 请求获取文件信息
head_response = requests.head(
url,
headers={'Authorization': f'Bearer {Config.HF_TOKEN}'},
allow_redirects=True
)
if head_response.ok:
# 获取文件基本信息
content_type = head_response.headers.get('content-type', 'application/octet-stream')
content_length = head_response.headers.get('content-length')
last_modified = head_response.headers.get('last-modified')
etag = head_response.headers.get('etag')
# 如果是txt文件但没有指定字符集,设置为text/plain
if filepath.lower().endswith('.txt') and 'charset' not in content_type:
content_type = 'text/plain'
# 获取文件内容
response = requests.get(
url,
headers={'Authorization': f'Bearer {Config.HF_TOKEN}'},
stream=True
)
if response.ok:
filename = os.path.basename(filepath)
encoded_filename = urllib.parse.quote(filename.encode('utf-8'))
headers = {
'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
'Content-Type': content_type,
'Content-Length': content_length,
'Accept-Ranges': 'bytes',
'Cache-Control': 'no-cache',
'Last-Modified': last_modified,
'ETag': etag
}
# 移除为None的header
headers = {k: v for k, v in headers.items() if v is not None}
return Response(
response.iter_content(chunk_size=1048576),
headers=headers
)
return jsonify({'error': 'File not found'}), 404
except Exception as e:
print(f"Download error for {filepath}: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/files/upload', methods=['POST'])
@require_auth
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
current_path = request.form.get('path', '').strip('/')
try:
file_content = file.read()
file.seek(0)
original_name = file.filename
stored_name = original_name
full_path = os.path.join(current_path, stored_name).replace("\\", "/")
response = api.upload_file(
path_or_fileobj=file_content,
path_in_repo=full_path,
repo_id=Config.HF_DATASET_ID,
repo_type="dataset",
token=Config.HF_TOKEN
)
if response:
with db.conn.cursor() as cursor:
cursor.execute("""
INSERT INTO files (
original_name, stored_name, file_path,
file_type, file_size
) VALUES (%s, %s, %s, %s, %s)
""", (
original_name,
stored_name,
full_path,
get_file_type(original_name),
len(file_content)
))
db.conn.commit()
return jsonify({'success': True})
return jsonify({'error': 'Upload failed'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/files/search')
@require_auth
def search_files():
keyword = request.args.get('keyword', '')
if not keyword:
return jsonify([])
try:
files = db.search_files(keyword)
return jsonify([{
'type': 'file',
'path': f['file_path'],
'file_type': get_file_type(f['file_path']),
'size': f['file_size'],
'size_formatted': format_file_size(f['file_size']),
'preview_url': f'/api/files/preview/{f["file_path"]}',
'download_url': f'/api/files/download/{f["file_path"]}'
} for f in files])
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/files/delete/<path:filepath>', methods=['DELETE'])
@require_auth
def delete_file(filepath):
try:
# Initialize HuggingFace API
api = HfApi(token=Config.HF_TOKEN)
# Delete file from HuggingFace Hub
api.delete_file(
path_in_repo=filepath,
repo_id=Config.HF_DATASET_ID,
repo_type="dataset"
)
# Delete file record from database
with db.conn.cursor() as cursor:
cursor.execute(
"DELETE FROM files WHERE file_path = %s",
[filepath]
)
db.conn.commit()
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/files/create_folder', methods=['POST'])
@require_auth
def create_folder():
try:
data = request.json
path = data.get('path', '/')
name = data.get('name')
if not name:
return jsonify({'error': 'Folder name is required'}), 400
full_path = os.path.join(path, name, '.keep').replace("\\", "/")
# 使用 HuggingFace API 创建文件夹
api = HfApi(token=Config.HF_TOKEN)
response = api.upload_file(
path_or_fileobj=b'', # 空内容
path_in_repo=full_path,
repo_id=Config.HF_DATASET_ID,
repo_type="dataset",
token=Config.HF_TOKEN
)
if response:
# 记录到数据库
with db.conn.cursor() as cursor:
cursor.execute("""
INSERT INTO files (
original_name, stored_name, file_path,
file_type, file_size
) VALUES (%s, %s, %s, %s, %s)
""", (
'.keep',
'.keep',
full_path,
'directory',
0
))
db.conn.commit()
return jsonify({'message': 'Folder created successfully'})
return jsonify({'error': 'Failed to create folder'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=True)