"""Leaderboard persistence on Nextcloud.

The leaderboard is a JSON document stored at
``arena_config.NEXTCLOUD_LEADERBOARD_PATH``.  It maps a player name to a
record of the form::

    {"wins": int, "losses": int,
     "opponents": {other_name: {"wins": int, "losses": int}}}

This module loads, updates and saves that document and can run a background
thread that periodically uploads a timestamped copy to
``arena_config.NEXTCLOUD_BACKUP_FOLDER``.
"""
import json
import os
import threading
import time
from datetime import datetime
from typing import Any, Dict

from nc_py_api import Nextcloud

import arena_config

# Pause between two backup attempts, in seconds (1 hour).
_BACKUP_INTERVAL_SECONDS = 3600

# Initialize Nextcloud client
nc = Nextcloud(
    nextcloud_url=arena_config.NEXTCLOUD_URL,
    nc_auth_user=arena_config.NEXTCLOUD_USERNAME,
    nc_auth_pass=arena_config.NEXTCLOUD_PASSWORD,
)


def _new_record() -> Dict[str, Any]:
    """Return a fresh, zeroed record for a player seen for the first time."""
    return {"wins": 0, "losses": 0, "opponents": {}}


def _is_not_found(error: Exception) -> bool:
    """Tell whether *error* looks like an HTTP 404 from the server.

    NOTE(review): relies on the exception exposing a ``status_code``
    attribute, as nc_py_api's ``NextcloudException`` is expected to do;
    confirm against the installed nc_py_api version.  Exceptions without
    that attribute are treated as "not a 404".
    """
    return getattr(error, "status_code", None) == 404


def _fetch_leaderboard() -> Dict[str, Any]:
    """Download and parse the leaderboard, raising on any failure.

    Unlike :func:`load_leaderboard`, errors are propagated so that callers
    can tell "the leaderboard is empty" apart from "the leaderboard could
    not be read".  A blank file is treated as an empty leaderboard.

    Raises:
        Exception: whatever the download, UTF-8 decoding or JSON parsing
            raises.
    """
    raw = nc.files.download(arena_config.NEXTCLOUD_LEADERBOARD_PATH)
    text = raw.decode("utf-8")
    if not text.strip():
        return {}
    return json.loads(text)


def load_leaderboard() -> Dict[str, Any]:
    """Return the leaderboard stored on Nextcloud.

    Returns:
        The parsed leaderboard, or an empty dict if it could not be
        downloaded or parsed (the error is printed, not raised).
    """
    try:
        return _fetch_leaderboard()
    except Exception as e:
        print(f"Error loading leaderboard: {e}")
        return {}


def save_leaderboard(leaderboard_data: Dict[str, Any]) -> bool:
    """Upload *leaderboard_data* as indented JSON to the leaderboard path.

    Returns:
        ``True`` if the upload succeeded, ``False`` otherwise (the error is
        printed, not raised).
    """
    try:
        json_data = json.dumps(leaderboard_data, indent=2)
        nc.files.upload(
            arena_config.NEXTCLOUD_LEADERBOARD_PATH,
            json_data.encode("utf-8"),
        )
        return True
    except Exception as e:
        print(f"Error saving leaderboard: {e}")
        return False


def update_leaderboard(winner: str, loser: str) -> Dict[str, Any]:
    """Record one match result and persist the leaderboard.

    The overall and head-to-head counters of both players are incremented;
    players that are not on the leaderboard yet are added.

    If the stored leaderboard cannot be read for a reason other than "file
    not found", the result is applied to an empty in-memory leaderboard and
    returned, but it is NOT uploaded: saving it would replace the real
    leaderboard with one that contains only this match.

    Args:
        winner: Name of the player that won.
        loser: Name of the player that lost.

    Returns:
        The updated leaderboard (persisted or not, see above).
    """
    # NOTE(review): the load-modify-save sequence is not synchronized;
    # concurrent callers could overwrite each other's update.
    safe_to_save = True
    try:
        leaderboard = _fetch_leaderboard()
    except Exception as e:
        print(f"Error loading leaderboard: {e}")
        leaderboard = {}
        # A missing file means there is nothing to lose, so the first
        # result may create it.  Any other failure must not be allowed to
        # clobber the existing data.
        safe_to_save = _is_not_found(e)

    for player in (winner, loser):
        if player not in leaderboard:
            leaderboard[player] = _new_record()

    leaderboard[winner]["wins"] += 1
    leaderboard[winner]["opponents"].setdefault(
        loser, {"wins": 0, "losses": 0}
    )["wins"] += 1

    leaderboard[loser]["losses"] += 1
    leaderboard[loser]["opponents"].setdefault(
        winner, {"wins": 0, "losses": 0}
    )["losses"] += 1

    if safe_to_save:
        save_leaderboard(leaderboard)
    else:
        print(
            "Leaderboard update not saved: the existing leaderboard could "
            "not be read and would have been overwritten."
        )
    return leaderboard


# Function to get the current leaderboard
def get_current_leaderboard() -> Dict[str, Any]:
    """Return the current leaderboard (see :func:`load_leaderboard`)."""
    return load_leaderboard()


def create_backup():
    """Upload a timestamped copy of the leaderboard once per hour, forever.

    Intended to run in a background thread; it never returns.  A cycle in
    which the leaderboard cannot be read or the upload fails prints an error
    and is skipped, so an unreadable leaderboard never produces an empty
    backup file.
    """
    while True:
        try:
            leaderboard_data = _fetch_leaderboard()
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_file_name = f"leaderboard_backup_{timestamp}.json"
            backup_path = (
                f"{arena_config.NEXTCLOUD_BACKUP_FOLDER}/{backup_file_name}"
            )
            json_data = json.dumps(leaderboard_data, indent=2)
            nc.files.upload(backup_path, json_data.encode("utf-8"))
            print(f"Backup created on Nextcloud: {backup_path}")
        except Exception as e:
            print(f"Error creating backup: {e}")
        time.sleep(_BACKUP_INTERVAL_SECONDS)  # Sleep for 1 hour


def start_backup_thread():
    """Start :func:`create_backup` in a daemon thread.

    The thread is a daemon so it does not keep the process alive on exit.
    """
    backup_thread = threading.Thread(target=create_backup, daemon=True)
    backup_thread.start()