Spaces:
Running
on
Zero
Running
on
Zero
class ScoringDiagnostics: | |
"""評分系統診斷工具類""" | |
def __init__(self, log_dir="scoring_diagnostics"): | |
""" | |
初始化診斷工具 | |
參數: | |
log_dir (str): 日誌文件存放目錄 | |
""" | |
self.log_dir = log_dir | |
self._ensure_log_directory() | |
self.current_session = datetime.now().strftime("%Y%m%d_%H%M%S") | |
def _ensure_log_directory(self): | |
"""確保日誌目錄存在""" | |
if not os.path.exists(self.log_dir): | |
os.makedirs(self.log_dir) | |
def _create_log_filename(self, prefix): | |
"""生成日誌文件名""" | |
return os.path.join( | |
self.log_dir, | |
f"{prefix}_{self.current_session}.json" | |
) | |
def diagnostic_wrapper(self, original_func): | |
""" | |
包裝原始評分函數的診斷裝飾器 | |
使用方式: | |
@scoring_diagnostics.diagnostic_wrapper | |
def calculate_compatibility_score(breed_info, user_prefs): | |
... | |
""" | |
def wrapper(breed_info, user_prefs, *args, **kwargs): | |
# 準備診斷信息 | |
diagnostic_info = { | |
"timestamp": datetime.now().isoformat(), | |
"breed": breed_info.get('Breed', 'Unknown'), | |
"input_data": { | |
"breed_info": breed_info, | |
"user_preferences": vars(user_prefs) | |
} | |
} | |
try: | |
# 執行原始函數 | |
result = original_func(breed_info, user_prefs, *args, **kwargs) | |
# 記錄成功結果 | |
diagnostic_info.update({ | |
"status": "success", | |
"result": result | |
}) | |
# 檢查是否所有分數都相同或接近預設值 | |
scores = [v for k, v in result.items() if k != 'overall'] | |
if all(abs(score - 0.5) < 0.1 for score in scores): | |
diagnostic_info["warnings"] = { | |
"type": "uniform_scores", | |
"message": "所有分數都接近預設值 0.5", | |
"scores": scores | |
} | |
except Exception as e: | |
# 記錄錯誤信息 | |
diagnostic_info.update({ | |
"status": "error", | |
"error": { | |
"type": str(type(e).__name__), | |
"message": str(e), | |
"traceback": traceback.format_exc() | |
} | |
}) | |
raise # 重新拋出異常 | |
finally: | |
# 保存診斷信息 | |
self._save_diagnostic_info(diagnostic_info) | |
return result | |
return wrapper | |
def _save_diagnostic_info(self, diagnostic_info): | |
"""保存診斷信息到文件""" | |
filename = self._create_log_filename("scoring_diagnostic") | |
try: | |
# 如果文件存在,讀取現有記錄 | |
if os.path.exists(filename): | |
with open(filename, 'r', encoding='utf-8') as f: | |
records = json.load(f) | |
else: | |
records = [] | |
# 添加新記錄 | |
records.append(diagnostic_info) | |
# 保存所有記錄 | |
with open(filename, 'w', encoding='utf-8') as f: | |
json.dump(records, f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
print(f"保存診斷信息時發生錯誤: {str(e)}") | |
def analyze_diagnostics(self): | |
"""分析診斷結果並生成報告""" | |
try: | |
filename = self._create_log_filename("scoring_diagnostic") | |
if not os.path.exists(filename): | |
return "沒有找到診斷記錄" | |
with open(filename, 'r', encoding='utf-8') as f: | |
records = json.load(f) | |
# 分析結果 | |
analysis = { | |
"total_records": len(records), | |
"success_count": sum(1 for r in records if r["status"] == "success"), | |
"error_count": sum(1 for r in records if r["status"] == "error"), | |
"uniform_scores_count": sum(1 for r in records if "warnings" in r and | |
r["warnings"]["type"] == "uniform_scores"), | |
"error_types": {} | |
} | |
# 統計錯誤類型 | |
for record in records: | |
if record["status"] == "error": | |
error_type = record["error"]["type"] | |
analysis["error_types"][error_type] = analysis["error_types"].get(error_type, 0) + 1 | |
return analysis | |
except Exception as e: | |
return f"分析診斷記錄時發生錯誤: {str(e)}" |