Test / app_utils.py
Zheng Qu
testmongdb port
fcf296b
raw
history blame
8.02 kB
import json
import os, sys
import datetime
import logging
import traceback
import sqlite3
from pathlib import Path
from dotenv import load_dotenv
from tinydb import TinyDB, Query
from tinydb_serialization import SerializationMiddleware
from tinydb_sqlite import SQLiteStorage
import tempfile
import socket
# Logging setup: every record goes both to the file app.log and to the console.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.FileHandler('app.log'), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Load variables from a .env file (if present) before the settings below are read.
load_dotenv()

# Session settings; the second argument is the fallback used when the
# environment variable is not set.
SESSION_ID = os.environ.get('SESSION_ID', 'default_session')
SESSION_TITLE = os.environ.get('SESSION_TITLE', 'Default Session')
logger.info(f"Session ID: {SESSION_ID}")

# NOTE(review): when ADMIN_PASSWD is unset the admin password becomes the
# literal 'default_password' -- confirm this fallback is acceptable.
ADMIN_PASSWD = os.environ.get('ADMIN_PASSWD', 'default_password')
import os
def is_mongodb_port_open(ip: str, port: int, timeout: float = 5) -> bool:
    """Check whether a TCP connection to ``ip``:``port`` can be established.

    Args:
        ip: Host name or IP address of the MongoDB server. A missing value
            (``None`` or an empty string) is reported as not reachable.
        port: TCP port to probe.
        timeout: Seconds to wait for the connection attempt before giving up.

    Returns:
        True if the connection succeeded; False if the host is missing or the
        attempt failed with a socket-level error (refused, timed out,
        unresolvable name, ...).
    """
    # Without this guard a None host is handed to address resolution as
    # "no host", which resolves to the local machine -- so an unset
    # configuration value would end up probing localhost.
    if not ip:
        return False
    try:
        with socket.create_connection((ip, port), timeout=timeout):
            return True
    except OSError:
        # socket.timeout and ConnectionRefusedError are both OSError subclasses.
        return False
# Startup connectivity probe: report whether the configured MongoDB host is
# reachable on its default port. Runs once, when this module is imported.
MongoDBDomain = os.environ.get("MONGODB_DOMAIN")
port_number = 27017  # Default MongoDB port
if not MongoDBDomain:
    # Nothing to probe: a missing host would otherwise be resolved as the
    # local machine and produce a misleading report.
    print("MONGODB_DOMAIN is not set; skipping the MongoDB port check.")
elif is_mongodb_port_open(MongoDBDomain, port_number):
    print(f"The MongoDB {MongoDBDomain} port {port_number} is open.")
else:
    print(f"The MongoDB {MongoDBDomain} port {port_number} is blocked.")
def explore_persistent_volumes():
    """Print which candidate storage directories exist and are writable.

    For each candidate directory a small test file is created and removed
    again, and the outcome is printed. The current working directory and the
    home directory are printed afterwards. Nothing is returned.

    The probe only shows that a directory is writable right now; it cannot
    show that data written there survives a restart.
    """
    # Candidate directories (per the original author's notes: locations that
    # are commonly persistent in Hugging Face Spaces).
    persistent_dirs = [
        '/data',  # Most reliable persistent storage
        '/workspace',  # Often persistent, but less guaranteed
        os.path.expanduser('~'),  # Home directory, typically persistent
    ]

    print("Checking available persistent directories:")
    for directory in persistent_dirs:
        try:
            if not os.path.exists(directory):
                print(f"{directory}: Does not exist")
                continue

            # Try to create a test file to prove the directory is writable.
            test_file = os.path.join(directory, 'huggingface_persistent_test.txt')
            with open(test_file, 'w', encoding='utf-8') as f:
                f.write('Persistent volume test')
            # Remove the probe file before reporting so it is not left behind
            # if printing the result fails.
            os.remove(test_file)

            print(f"{directory}: ✅ Writable and appears persistent")
        except Exception as e:
            # Best effort: report the failure and move on to the next candidate.
            print(f"{directory}: ❌ Not accessible. Error: {e}")

    # Additional system information
    print("\nCurrent Working Directory:", os.getcwd())
    print("Home Directory:", os.path.expanduser('~'))
# Choose where the SQLite file lives. On macOS a file relative to the working
# directory is used; on every other platform the storage report is printed
# first and a fixed path under /home/user is used.
if sys.platform != 'darwin':
    explore_persistent_volumes()
    db_path = '/home/user/submissions.db'
else:
    db_path = 'submissions.db'
def get_storage():
    """Create and return a SQLite-backed storage instance for TinyDB.

    Opens a SQLite connection to the module-level ``db_path`` with
    ``check_same_thread=False`` (so the connection may be used from threads
    other than the one that created it) and wraps it in ``SQLiteStorage``
    together with a ``SerializationMiddleware`` serializer.

    Returns:
        The constructed ``SQLiteStorage`` instance.

    Raises:
        Exception: Any error raised while connecting or constructing the
            storage is logged and re-raised unchanged.
    """
    connection = None
    try:
        serialization = SerializationMiddleware()
        connection = sqlite3.connect(db_path, check_same_thread=False)
        return SQLiteStorage(
            connection=connection,
            serializer=serialization
        )
    except Exception as e:
        logger.error(f"Error creating storage: {str(e)}\n{traceback.format_exc()}")
        # Do not leak the open connection when building the storage failed.
        if connection is not None:
            connection.close()
        raise
# Build the module-wide TinyDB instance. get_storage is passed as the storage
# factory (it is not called on this line), and a handle to the 'submissions'
# table is kept next to the database object. Any failure is logged and then
# propagated, so importing this module fails if the database cannot be set up.
try:
    _db = TinyDB(storage=get_storage)
    _submissions_table = _db.table('submissions')
except Exception as exc:
    logger.error(f"Database initialization failed: {str(exc)}\n{traceback.format_exc()}")
    raise
else:
    logger.info("Database initialized successfully")
def get_db():
    """Return the shared database objects.

    Returns:
        A ``(db, submissions_table)`` tuple holding the module-level TinyDB
        instance and its 'submissions' table.
    """
    database, table = _db, _submissions_table
    return database, table
def load_problems():
    """Load every problem definition stored as JSON in the session directory.

    The directory is named by ``SESSION_ID`` (relative to the current working
    directory). Each ``*.json`` file in it is parsed, and the file name
    without its extension is stored under the ``'id'`` key of the parsed
    problem.

    Files are read as UTF-8 and processed in sorted path order, so the result
    does not depend on the platform's default encoding or on the order in
    which the directory happens to be listed.

    Returns:
        A list of problem dicts. Files that cannot be read or parsed are
        logged and skipped; if scanning the directory itself fails, the
        problems collected so far (possibly none) are returned.
    """
    problems = []
    try:
        # Named problems_dir rather than db_path to avoid confusion with the
        # module-level SQLite path of that name.
        problems_dir = Path(SESSION_ID)
        for json_file in sorted(problems_dir.glob('*.json')):
            try:
                with open(json_file, encoding='utf-8') as f:
                    problem = json.load(f)
                problem['id'] = json_file.stem
                problems.append(problem)
                logger.info(f"Loaded problem: {json_file.stem}")
            except Exception as e:
                # One broken file must not prevent the others from loading.
                logger.error(f"Error loading problem {json_file}: {str(e)}\n{traceback.format_exc()}")
    except Exception as e:
        logger.error(f"Error in load_problems: {str(e)}\n{traceback.format_exc()}")
    return problems
def save_submission(session, name, email, problem_id, code, hint_requested):
    """Insert or update one student's submission for one problem.

    A record is matched on the combination of session, name, email and
    problem_id and written with ``upsert``. The stored record carries the
    submitted code, the hint flag and the current local time as an ISO 8601
    string.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        table = get_db()[1]
        record = {
            'session': session,
            'name': name,
            'email': email,
            'problem_id': problem_id,
            'student_code': code,
            'hint_requested': hint_requested,
            'timestamp': datetime.datetime.now().isoformat(),
        }

        # Condition identifying "the same submission" for the upsert.
        field = Query()
        same_submission = (
            (field.session == session)
            & (field.name == name)
            & (field.email == email)
            & (field.problem_id == problem_id)
        )
        table.upsert(record, same_submission)

        logger.info(f"Saved submission for {name} - Problem: {problem_id}")
    except Exception as exc:
        logger.error(f"Error in save_submission: {str(exc)}\n{traceback.format_exc()}")
        raise
def save_all_submissions(name, email, codes_dict, hints_dict):
    """Save one submission per problem for a student in the current session.

    Args:
        name: Student name.
        email: Student e-mail address.
        codes_dict: Mapping of problem id to the submitted code.
        hints_dict: Mapping of problem id to whether a hint was requested;
            problems missing from it count as False. ``None`` is treated as
            an empty mapping.

    Returns:
        A status message string: a success message, or an error message
        containing the exception text. Exceptions are logged, not raised.
        Submissions saved before a failure are not rolled back.
    """
    try:
        hints = hints_dict or {}
        for problem_id, code in codes_dict.items():
            hint_requested = hints.get(problem_id, False)
            save_submission(SESSION_ID, name, email, problem_id, code, hint_requested)
        logger.info(f"Successfully saved all submissions for {name}")
        return "✅ All submissions saved successfully!"
    except Exception as e:
        error_msg = f"Error saving submissions: {str(e)}"
        logger.error(f"{error_msg}\n{traceback.format_exc()}")
        return f"❌ {error_msg}"
def check_password(password):
    """Return True if ``password`` matches the configured admin password.

    The comparison uses ``hmac.compare_digest`` on the UTF-8 encoded values
    instead of ``==``, so the time it takes does not depend on how many
    leading characters match. Non-string input (for example ``None``) never
    matches.
    """
    import hmac  # local import so the block does not rely on file-level imports

    if not isinstance(password, str):
        return False
    # Encode to bytes: compare_digest only accepts str arguments when they are
    # ASCII-only. 'surrogatepass' keeps odd input from raising during encoding.
    supplied = password.encode('utf-8', 'surrogatepass')
    expected = ADMIN_PASSWD.encode('utf-8', 'surrogatepass')
    return hmac.compare_digest(supplied, expected)
def get_all_students():
    """Return the distinct student names found in the submissions table.

    Returns:
        A list of unique names in sorted order, so the result is stable
        between calls and between runs. If the lookup fails (including a
        record without a 'name' key) the error is logged and an empty list is
        returned.
    """
    try:
        _, submissions_table = get_db()
        unique_names = {submission['name'] for submission in submissions_table.all()}
        # key=str keeps the sort from failing if a stored name is not a string.
        result = sorted(unique_names, key=str)
        logger.info(f"Retrieved {len(result)} students")
        return result
    except Exception as e:
        logger.error(f"Error in get_all_students: {str(e)}\n{traceback.format_exc()}")
        return []
def get_student_submissions(name):
    """Fetch all submissions stored under ``name``, newest first.

    Returns:
        The matching records ordered by descending 'timestamp' (parsed as an
        ISO 8601 string), or an empty list when anything goes wrong; the
        error is logged.
    """
    try:
        table = get_db()[1]
        matches = table.search(Query().name == name)
        newest_first = sorted(
            matches,
            key=lambda record: datetime.datetime.fromisoformat(record['timestamp']),
            reverse=True,
        )
        logger.info(f"Retrieved {len(newest_first)} submissions for student: {name}")
        return newest_first
    except Exception as exc:
        logger.error(f"Error in get_student_submissions: {str(exc)}\n{traceback.format_exc()}")
        return []
def export_submissions():
    """Write all submissions, newest first, to a temporary JSON file.

    Returns:
        The path of the created ``.json`` file, or None if the export failed
        (the error is logged). The file is created with ``delete=False`` and
        is not removed by this function.
    """
    try:
        _, submissions_table = get_db()
        submissions = submissions_table.all()
        submissions.sort(key=lambda x: datetime.datetime.fromisoformat(x['timestamp']), reverse=True)
        data = json.dumps(submissions, indent=2)
        # The context manager closes the handle even if the write fails;
        # delete=False keeps the file on disk after it is closed.
        with tempfile.NamedTemporaryFile(
            delete=False, mode='w', suffix='.json', encoding='utf-8'
        ) as temp_file:
            temp_file.write(data)
        logger.info(f"Exported {len(submissions)} submissions to {temp_file.name}")
        return temp_file.name
    except Exception as e:
        logger.error(f"Error in export_submissions: {str(e)}\n{traceback.format_exc()}")
        return None
def refresh_submissions():
    """Return every stored submission ordered from newest to oldest.

    Returns:
        All records of the submissions table sorted by descending 'timestamp'
        (parsed as an ISO 8601 string), or an empty list if reading or
        sorting fails; the error is logged.
    """
    def _submitted_at(record):
        # Sort key: the record's timestamp as a datetime object.
        return datetime.datetime.fromisoformat(record['timestamp'])

    try:
        table = get_db()[1]
        records = table.all()
        records.sort(key=_submitted_at, reverse=True)
        logger.info(f"Refreshed {len(records)} submissions")
        return records
    except Exception as exc:
        logger.error(f"Error in refresh_submissions: {str(exc)}\n{traceback.format_exc()}")
        return []