from flask import ( Flask, json, render_template, request, jsonify, redirect, url_for, send_file, Response ) from io import BytesIO import urllib.parse from functools import wraps import requests import hashlib import os from config import Config from models import Database from utils import get_file_type, format_file_size from huggingface_hub import HfApi # 初始化 HuggingFace API api = HfApi(token=Config.HF_TOKEN) app = Flask(__name__) app.config['SECRET_KEY'] = Config.SECRET_KEY db = Database() def require_auth(f): @wraps(f) def decorated(*args, **kwargs): if not Config.REQUIRE_LOGIN: return f(*args, **kwargs) if not request.cookies.get('authenticated'): if request.is_json: return jsonify({'error': 'Unauthorized'}), 401 return redirect(url_for('login')) return f(*args, **kwargs) return decorated @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': if request.form.get('password') == Config.ACCESS_PASSWORD: response = jsonify({'success': True}) response.set_cookie('authenticated', 'true', secure=True, httponly=True) return response return jsonify({'error': 'Invalid password'}), 401 return render_template('login.html') @app.route('/logout') def logout(): response = redirect(url_for('login')) response.delete_cookie('authenticated') return response @app.route('/') @require_auth def index(): return render_template('index.html') @app.route('/api/files/list/') @app.route('/api/files/list/') @require_auth def list_files(directory=''): try: url = f"https://huggingface.co/api/datasets/{Config.HF_DATASET_ID}/tree/{Config.HF_BRANCH}" if directory: url = f"{url}/{directory}" response = requests.get( url, headers={'Authorization': f'Bearer {Config.HF_TOKEN}'} ) if not response.ok: return jsonify({'error': 'Failed to fetch files', 'details': response.text}), response.status_code files = response.json() for file in files: if file['type'] == 'file': file['file_type'] = get_file_type(file['path']) file['size_formatted'] = format_file_size(file['size']) # 添加预览和下载URL file['preview_url'] = f"/api/files/preview/{file['path']}" file['download_url'] = f"/api/files/download/{file['path']}" return jsonify(files) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/files/preview/') @require_auth def preview_file(filepath): try: file_type = get_file_type(filepath) if file_type not in ['image', 'video', 'document']: return jsonify({'error': 'File type not supported for preview'}), 400 url = f"https://{Config.PROXY_DOMAIN}/datasets/{Config.HF_DATASET_ID}/resolve/{Config.HF_BRANCH}/{filepath}" response = requests.get( url, headers={'Authorization': f'Bearer {Config.HF_TOKEN}'}, stream=True ) if response.ok: def generate(): for chunk in response.iter_content(chunk_size=8192): yield chunk return Response( generate(), mimetype=response.headers.get('content-type', 'application/octet-stream'), direct_passthrough=True ) return jsonify({'error': 'Failed to fetch file'}), response.status_code except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/files/download/') @require_auth def download_file(filepath): try: url = f"https://{Config.PROXY_DOMAIN}/datasets/{Config.HF_DATASET_ID}/resolve/{Config.HF_BRANCH}/{filepath}" # 先发送 HEAD 请求获取文件信息 head_response = requests.head( url, headers={'Authorization': f'Bearer {Config.HF_TOKEN}'}, allow_redirects=True ) if head_response.ok: # 获取文件基本信息 content_type = head_response.headers.get('content-type', 'application/octet-stream') content_length = head_response.headers.get('content-length') last_modified = head_response.headers.get('last-modified') etag = head_response.headers.get('etag') # 如果是txt文件但没有指定字符集,设置为text/plain if filepath.lower().endswith('.txt') and 'charset' not in content_type: content_type = 'text/plain' # 获取文件内容 response = requests.get( url, headers={'Authorization': f'Bearer {Config.HF_TOKEN}'}, stream=True ) if response.ok: filename = os.path.basename(filepath) encoded_filename = urllib.parse.quote(filename.encode('utf-8')) headers = { 'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}', 'Content-Type': content_type, 'Content-Length': content_length, 'Accept-Ranges': 'bytes', 'Cache-Control': 'no-cache', 'Last-Modified': last_modified, 'ETag': etag } # 移除为None的header headers = {k: v for k, v in headers.items() if v is not None} return Response( response.iter_content(chunk_size=1048576), headers=headers ) return jsonify({'error': 'File not found'}), 404 except Exception as e: print(f"Download error for {filepath}: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/files/upload', methods=['POST']) @require_auth def upload_file(): if 'file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['file'] current_path = request.form.get('path', '').strip('/') try: file_content = file.read() file.seek(0) original_name = file.filename stored_name = original_name full_path = os.path.join(current_path, stored_name).replace("\\", "/") response = api.upload_file( path_or_fileobj=file_content, path_in_repo=full_path, repo_id=Config.HF_DATASET_ID, repo_type="dataset", token=Config.HF_TOKEN ) if response: with db.conn.cursor() as cursor: cursor.execute(""" INSERT INTO files ( original_name, stored_name, file_path, file_type, file_size ) VALUES (%s, %s, %s, %s, %s) """, ( original_name, stored_name, full_path, get_file_type(original_name), len(file_content) )) db.conn.commit() return jsonify({'success': True}) return jsonify({'error': 'Upload failed'}), 500 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/files/search') @require_auth def search_files(): keyword = request.args.get('keyword', '') if not keyword: return jsonify([]) try: files = db.search_files(keyword) return jsonify([{ 'type': 'file', 'path': f['file_path'], 'file_type': get_file_type(f['file_path']), 'size': f['file_size'], 'size_formatted': format_file_size(f['file_size']), 'preview_url': f'/api/files/preview/{f["file_path"]}', 'download_url': f'/api/files/download/{f["file_path"]}' } for f in files]) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/files/delete/', methods=['DELETE']) @require_auth def delete_file(filepath): try: # Initialize HuggingFace API api = HfApi(token=Config.HF_TOKEN) # Delete file from HuggingFace Hub api.delete_file( path_in_repo=filepath, repo_id=Config.HF_DATASET_ID, repo_type="dataset" ) # Delete file record from database with db.conn.cursor() as cursor: cursor.execute( "DELETE FROM files WHERE file_path = %s", [filepath] ) db.conn.commit() return jsonify({'success': True}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/files/create_folder', methods=['POST']) @require_auth def create_folder(): try: data = request.json path = data.get('path', '/') name = data.get('name') if not name: return jsonify({'error': 'Folder name is required'}), 400 full_path = os.path.join(path, name, '.keep').replace("\\", "/") # 使用 HuggingFace API 创建文件夹 api = HfApi(token=Config.HF_TOKEN) response = api.upload_file( path_or_fileobj=b'', # 空内容 path_in_repo=full_path, repo_id=Config.HF_DATASET_ID, repo_type="dataset", token=Config.HF_TOKEN ) if response: # 记录到数据库 with db.conn.cursor() as cursor: cursor.execute(""" INSERT INTO files ( original_name, stored_name, file_path, file_type, file_size ) VALUES (%s, %s, %s, %s, %s) """, ( '.keep', '.keep', full_path, 'directory', 0 )) db.conn.commit() return jsonify({'message': 'Folder created successfully'}) return jsonify({'error': 'Failed to create folder'}), 500 except Exception as e: return jsonify({'error': str(e)}), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=True)