Spaces:
Runtime error
Runtime error
# database/db_manager.py
import json
import logging
import os
import sqlite3
from datetime import date, datetime
from typing import Any, Dict, List, Optional
# Logging configuration: install the root handler/format once at import time
# and expose a module-level logger named after this module.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Central manager for the application's SQLite database.

    The manager owns a single, lazily opened ``sqlite3`` connection to
    ``db_path`` and creates every table it needs on construction. Write
    methods run inside ``with conn:`` so that a failed write is rolled back
    instead of lingering in the open transaction and being committed by a
    later, unrelated call.

    Instances can be used as context managers; leaving the ``with`` block
    closes the connection.
    """

    # DDL executed by initialize_database(); every statement is idempotent
    # thanks to IF NOT EXISTS.
    _SCHEMA = (
        # Users
        '''
        CREATE TABLE IF NOT EXISTS users (
            user_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE,
            registration_date DATE,
            target_date DATE,
            study_hours INTEGER,
            weak_areas TEXT,
            preferences TEXT
        )''',
        # Study progress
        '''
        CREATE TABLE IF NOT EXISTS study_progress (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT,
            date DATE,
            topic TEXT,
            hours_studied FLOAT,
            questions_answered INTEGER,
            performance_score FLOAT,
            notes TEXT,
            FOREIGN KEY (user_id) REFERENCES users(user_id)
        )''',
        # Questions from previous exams
        '''
        CREATE TABLE IF NOT EXISTS previous_questions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            year INTEGER,
            area TEXT,
            question_text TEXT,
            options TEXT,
            correct_answer TEXT,
            explanation TEXT,
            difficulty TEXT,
            ref_sources TEXT,
            tags TEXT,
            times_used INTEGER DEFAULT 0,
            success_rate FLOAT DEFAULT 0.0
        )''',
        # Clinical cases
        '''
        CREATE TABLE IF NOT EXISTS clinical_cases (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            area TEXT,
            difficulty TEXT,
            description TEXT,
            steps TEXT,
            expected_answers TEXT,
            hints TEXT,
            ref_sources TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )''',
        # Mock exams ("simulados")
        '''
        CREATE TABLE IF NOT EXISTS simulados (
            id TEXT PRIMARY KEY,
            user_id TEXT,
            created_at TIMESTAMP,
            completed_at TIMESTAMP,
            difficulty TEXT,
            questions TEXT,
            answers TEXT,
            score FLOAT,
            time_taken INTEGER,
            analysis TEXT,
            FOREIGN KEY (user_id) REFERENCES users(user_id)
        )''',
        # Study sessions
        '''
        CREATE TABLE IF NOT EXISTS study_sessions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT,
            start_time TIMESTAMP,
            end_time TIMESTAMP,
            topic TEXT,
            activity_type TEXT,
            productivity_score FLOAT,
            notes TEXT,
            FOREIGN KEY (user_id) REFERENCES users(user_id)
        )''',
        # Goals and objectives
        '''
        CREATE TABLE IF NOT EXISTS goals (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT,
            goal_type TEXT,
            description TEXT,
            target_date DATE,
            progress FLOAT,
            status TEXT,
            priority INTEGER,
            FOREIGN KEY (user_id) REFERENCES users(user_id)
        )''',
    )

    def __init__(self, db_path: str = 'revalida.db'):
        """Store ``db_path`` and create the schema.

        Args:
            db_path: Path of the SQLite database file.

        Raises:
            Exception: Whatever ``initialize_database`` re-raises when the
                schema cannot be created.
        """
        self.db_path = db_path
        self.conn: Optional[sqlite3.Connection] = None
        self.initialize_database()

    @staticmethod
    def _to_db_date(value: Any) -> Any:
        """Convert ``date``/``datetime`` values to ISO text; pass others through.

        Doing the conversion here avoids sqlite3's implicit default adapters
        (deprecated since Python 3.12) while storing the same text they
        produced: ``YYYY-MM-DD`` for dates, ``YYYY-MM-DD HH:MM:SS[.ffffff]``
        for datetimes.
        """
        # datetime must be tested first because it is a subclass of date.
        if isinstance(value, datetime):
            return value.isoformat(" ")
        if isinstance(value, date):
            return value.isoformat()
        return value

    def get_connection(self) -> sqlite3.Connection:
        """Return the shared connection, opening it on first use.

        Rows are returned as ``sqlite3.Row`` so columns can be read by name.
        """
        if self.conn is None:
            self.conn = sqlite3.connect(self.db_path)
            self.conn.row_factory = sqlite3.Row
        return self.conn

    def initialize_database(self) -> None:
        """Create every table the application needs if it does not exist.

        Raises:
            Exception: Any error raised while creating the tables is logged
                and re-raised.
        """
        try:
            conn = self.get_connection()
            with conn:  # commit on success, roll back on failure
                for ddl in self._SCHEMA:
                    conn.execute(ddl)
            logger.info("Banco de dados inicializado com sucesso")
        except Exception as e:
            logger.error(f"Erro ao inicializar banco de dados: {e}")
            raise

    def load_questions_from_json(self, file_path: str) -> bool:
        """Load questions from a JSON file into ``previous_questions``.

        The file must contain a sequence of objects. ``options``,
        ``references`` (stored in the ``ref_sources`` column) and ``tags`` are
        serialized to JSON text; ``difficulty`` defaults to ``'medium'``.
        The whole file is inserted in one transaction, so either every
        question is stored or none is.

        Args:
            file_path: Path of the UTF-8 encoded JSON file.

        Returns:
            True when the questions were stored, False when the file is
            missing or any error occurred (the error is logged).
        """
        try:
            if not os.path.exists(file_path):
                logger.error(f"Arquivo não encontrado: {file_path}")
                return False
            with open(file_path, 'r', encoding='utf-8') as f:
                questions = json.load(f)
            # Build every row before touching the database so malformed
            # entries abort the load without inserting anything.
            rows = [
                (
                    q.get('year'),
                    q.get('area'),
                    q.get('question_text'),
                    json.dumps(q.get('options', {})),
                    q.get('correct_answer'),
                    q.get('explanation'),
                    q.get('difficulty', 'medium'),
                    # The JSON key stays 'references'; only the column is
                    # named ref_sources.
                    json.dumps(q.get('references', [])),
                    json.dumps(q.get('tags', [])),
                )
                for q in questions
            ]
            conn = self.get_connection()
            # NOTE(review): previous_questions has no UNIQUE constraint other
            # than its autoincrement id, so OR IGNORE does not deduplicate;
            # loading the same file twice inserts the rows twice.
            with conn:
                conn.executemany('''
                    INSERT OR IGNORE INTO previous_questions
                    (year, area, question_text, options, correct_answer,
                     explanation, difficulty, ref_sources, tags)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', rows)
            logger.info(f"Questões carregadas com sucesso de {file_path}")
            return True
        except Exception as e:
            logger.error(f"Erro ao carregar questões: {e}")
            return False

    def add_user(self, user_data: Dict[str, Any]) -> bool:
        """Insert a user, replacing any existing row with the same key.

        Args:
            user_data: Must contain ``user_id`` and ``name``. Optional keys:
                ``email``, ``registration_date`` (defaults to today),
                ``target_date``, ``study_hours``, ``weak_areas`` (stored as
                JSON, default ``[]``) and ``preferences`` (stored as JSON,
                default ``{}``).

        Returns:
            True on success, False if any error occurred (it is logged).
        """
        try:
            params = (
                user_data['user_id'],
                user_data['name'],
                user_data.get('email'),
                self._to_db_date(
                    user_data.get('registration_date', date.today())
                ),
                self._to_db_date(user_data.get('target_date')),
                user_data.get('study_hours'),
                json.dumps(user_data.get('weak_areas', [])),
                json.dumps(user_data.get('preferences', {})),
            )
            conn = self.get_connection()
            # NOTE(review): per SQLite's conflict-resolution rules, OR REPLACE
            # deletes any row that conflicts on user_id *or* on the UNIQUE
            # email before inserting - confirm this is the intended behavior.
            with conn:
                conn.execute('''
                    INSERT OR REPLACE INTO users
                    (user_id, name, email, registration_date, target_date,
                     study_hours, weak_areas, preferences)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', params)
            logger.info(f"Usuário {user_data['user_id']} adicionado/atualizado com sucesso")
            return True
        except Exception as e:
            logger.error(f"Erro ao adicionar usuário: {e}")
            return False

    def get_user_profile(self, user_id: str) -> Optional[Dict]:
        """Return the user's row enriched with progress and open goals.

        The result holds every ``users`` column (``weak_areas`` and
        ``preferences`` decoded from JSON), plus:

        * ``progress``: ``{topic: {'hours': total, 'score': average}}``
          aggregated from ``study_progress``;
        * ``active_goals``: the user's goals whose status is not
          ``'completed'``, highest priority first.

        Args:
            user_id: Identifier of the user to look up.

        Returns:
            The profile dict, or None when the user does not exist or an
            error occurred (the error is logged).
        """
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            # Basic user data
            cursor.execute('SELECT * FROM users WHERE user_id = ?', (user_id,))
            user = cursor.fetchone()
            if not user:
                return None
            user_dict = dict(user)
            # The JSON columns are NULL for rows not written by add_user();
            # fall back to the same defaults add_user() stores so an existing
            # user is not reported as missing.
            user_dict['weak_areas'] = json.loads(user_dict['weak_areas'] or '[]')
            user_dict['preferences'] = json.loads(user_dict['preferences'] or '{}')
            # Per-topic progress
            cursor.execute('''
                SELECT topic, SUM(hours_studied) as total_hours,
                       AVG(performance_score) as avg_score
                FROM study_progress
                WHERE user_id = ?
                GROUP BY topic
            ''', (user_id,))
            user_dict['progress'] = {
                row['topic']: {
                    'hours': row['total_hours'],
                    'score': row['avg_score'],
                }
                for row in cursor.fetchall()
            }
            # Active goals
            cursor.execute('''
                SELECT * FROM goals
                WHERE user_id = ? AND status != 'completed'
                ORDER BY priority DESC
            ''', (user_id,))
            user_dict['active_goals'] = [dict(row) for row in cursor.fetchall()]
            return user_dict
        except Exception as e:
            logger.error(f"Erro ao obter perfil do usuário: {e}")
            return None

    def update_study_progress(self, progress_data: Dict[str, Any]) -> bool:
        """Append one study-progress record.

        Args:
            progress_data: Must contain ``user_id``, ``topic`` and
                ``hours_studied``. Optional keys: ``date`` (defaults to
                today), ``questions_answered`` (default 0),
                ``performance_score`` (default 0.0) and ``notes``.

        Returns:
            True on success, False if any error occurred (it is logged).
        """
        try:
            params = (
                progress_data['user_id'],
                self._to_db_date(progress_data.get('date', date.today())),
                progress_data['topic'],
                progress_data['hours_studied'],
                progress_data.get('questions_answered', 0),
                progress_data.get('performance_score', 0.0),
                progress_data.get('notes'),
            )
            conn = self.get_connection()
            with conn:
                conn.execute('''
                    INSERT INTO study_progress
                    (user_id, date, topic, hours_studied,
                     questions_answered, performance_score, notes)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                ''', params)
            logger.info(f"Progresso atualizado para usuário {progress_data['user_id']}")
            return True
        except Exception as e:
            logger.error(f"Erro ao atualizar progresso: {e}")
            return False

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def __enter__(self):
        """Context-manager support: return the manager itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection when the ``with`` block exits.

        Returns None, so exceptions raised inside the block propagate.
        """
        self.close()
# Initialization helper
def initialize_database(db_path: str = 'revalida.db') -> DatabaseManager:
    """Build a ``DatabaseManager`` for ``db_path`` and return it.

    Any exception raised while constructing the manager is logged and then
    re-raised unchanged.
    """
    try:
        return DatabaseManager(db_path)
    except Exception as exc:
        logger.error(f"Erro ao inicializar gerenciador de banco de dados: {exc}")
        raise
if __name__ == "__main__":
    # Manual smoke test: create a scratch database, insert one user and read
    # the profile back.
    # Bound up front so the cleanup below cannot hit a NameError when
    # initialize_database() itself fails.
    db_manager = None
    try:
        db_manager = initialize_database('test_revalida.db')
        # User insertion test
        test_user = {
            'user_id': 'test123',
            'name': 'Teste User',
            'email': 'test@example.com',
            'study_hours': 4,
            'weak_areas': ['ClínicaMédica', 'Cirurgia']
        }
        success = db_manager.add_user(test_user)
        if success:
            print("Sistema de banco de dados funcionando corretamente")
            # Read the profile back to verify the round trip
            profile = db_manager.get_user_profile('test123')
            print(f"Perfil recuperado: {json.dumps(profile, indent=2)}")
    except Exception as e:
        print(f"Erro nos testes: {e}")
    finally:
        # Close only if the manager was actually created.
        if db_manager is not None:
            db_manager.close()