Spaces:
Running
on
Zero
Running
on
Zero
File size: 5,082 Bytes
7f41c10 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
class ScoringDiagnostics:
    """Diagnostic helper that records every call to a scoring function.

    Each instance owns one "session" (timestamped at construction time).
    All records produced during that session are appended to a single JSON
    file inside ``log_dir``; ``analyze_diagnostics`` summarizes that file.
    """

    def __init__(self, log_dir: str = "scoring_diagnostics") -> None:
        """Create the log directory and start a new diagnostic session.

        Args:
            log_dir: Directory in which the session's JSON log is stored.
        """
        self.log_dir = log_dir
        self._ensure_log_directory()
        # Session id is part of the log filename, so one instance == one file.
        self.current_session = datetime.now().strftime("%Y%m%d_%H%M%S")

    def _ensure_log_directory(self) -> None:
        """Make sure the log directory exists."""
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(self.log_dir, exist_ok=True)

    def _create_log_filename(self, prefix: str) -> str:
        """Return the path of this session's log file for ``prefix``."""
        return os.path.join(
            self.log_dir,
            f"{prefix}_{self.current_session}.json",
        )

    @staticmethod
    def _breed_name(breed_info):
        """Return ``breed_info['Breed']`` or 'Unknown' if it is unavailable."""
        getter = getattr(breed_info, "get", None)
        if callable(getter):
            return getter('Breed', 'Unknown')
        return 'Unknown'

    @staticmethod
    def _snapshot_user_prefs(user_prefs):
        """Return a loggable snapshot of the user preferences.

        Plain dicts are copied, objects with ``__dict__`` are expanded with
        ``vars()``, and anything else (``None``, slotted objects, ...) is
        recorded via ``repr()`` so that logging never blocks the real call.
        """
        if isinstance(user_prefs, dict):
            return dict(user_prefs)
        try:
            return dict(vars(user_prefs))
        except TypeError:
            return repr(user_prefs)

    @staticmethod
    def _uniform_score_warning(result):
        """Return a warning dict if every sub-score is close to 0.5.

        Only numeric values other than the 'overall' entry are inspected.
        Returns ``None`` when ``result`` is not a dict, when it contains no
        numeric sub-scores, or when at least one score is 0.1 or more away
        from 0.5.
        """
        if not isinstance(result, dict):
            return None
        scores = [
            value
            for key, value in result.items()
            if key != 'overall'
            and isinstance(value, (int, float))
            and not isinstance(value, bool)
        ]
        # all() of an empty sequence is True; no scores means no warning.
        if not scores:
            return None
        if all(abs(score - 0.5) < 0.1 for score in scores):
            return {
                "type": "uniform_scores",
                "message": "所有分數都接近預設值 0.5",
                "scores": scores,
            }
        return None

    def diagnostic_wrapper(self, original_func):
        """Decorator that logs inputs, result and errors of a scoring function.

        Usage:
            @scoring_diagnostics.diagnostic_wrapper
            def calculate_compatibility_score(breed_info, user_prefs):
                ...

        The wrapped function's return value and exceptions are passed
        through unchanged; the diagnostics themselves never raise.

        Args:
            original_func: Callable taking ``(breed_info, user_prefs, ...)``.

        Returns:
            The wrapped callable.
        """
        @wraps(original_func)
        def wrapper(breed_info, user_prefs, *args, **kwargs):
            diagnostic_info = {
                "timestamp": datetime.now().isoformat(),
                "breed": self._breed_name(breed_info),
                "input_data": {
                    "breed_info": breed_info,
                    "user_preferences": self._snapshot_user_prefs(user_prefs),
                },
            }
            try:
                # Only the real call lives in the try body, so failures of
                # the diagnostics are never mistaken for scoring errors.
                result = original_func(breed_info, user_prefs, *args, **kwargs)
            except Exception as e:
                diagnostic_info.update({
                    "status": "error",
                    "error": {
                        "type": str(type(e).__name__),
                        "message": str(e),
                        "traceback": traceback.format_exc(),
                    },
                })
                raise  # propagate the original exception to the caller
            else:
                diagnostic_info.update({
                    "status": "success",
                    "result": result,
                })
                warning = self._uniform_score_warning(result)
                if warning is not None:
                    diagnostic_info["warnings"] = warning
            finally:
                # Persist the record on success and on failure alike.
                self._save_diagnostic_info(diagnostic_info)
            return result
        return wrapper

    def _save_diagnostic_info(self, diagnostic_info) -> None:
        """Append one record to the session log file (best effort).

        Failures are reported on stdout and swallowed, because this runs in
        the wrapper's ``finally`` clause and must not mask the real outcome.
        """
        filename = self._create_log_filename("scoring_diagnostic")
        try:
            self._ensure_log_directory()
            # Load the records already written during this session, if any.
            if os.path.exists(filename):
                with open(filename, 'r', encoding='utf-8') as f:
                    records = json.load(f)
            else:
                records = []
            records.append(diagnostic_info)
            # Serialize BEFORE touching the file so a serialization failure
            # cannot leave a truncated log behind. default=str is deliberate:
            # diagnostic payloads may contain arbitrary project objects.
            payload = json.dumps(
                records, ensure_ascii=False, indent=2, default=str
            )
            temp_filename = filename + ".tmp"
            with open(temp_filename, 'w', encoding='utf-8') as f:
                f.write(payload)
            # Swap the new content in with a single rename.
            os.replace(temp_filename, filename)
        except Exception as e:  # logging boundary: never break the caller
            print(f"保存診斷信息時發生錯誤: {str(e)}")

    def analyze_diagnostics(self):
        """Summarize the current session's diagnostic records.

        Returns:
            A dict with ``total_records``, ``success_count``, ``error_count``,
            ``uniform_scores_count`` and ``error_types`` (error type name ->
            occurrences); or a message string if no log exists or the log
            cannot be analyzed.
        """
        try:
            filename = self._create_log_filename("scoring_diagnostic")
            if not os.path.exists(filename):
                return "沒有找到診斷記錄"
            with open(filename, 'r', encoding='utf-8') as f:
                records = json.load(f)

            analysis = {
                "total_records": len(records),
                "success_count": 0,
                "error_count": 0,
                "uniform_scores_count": 0,
                "error_types": {},
            }
            error_types = analysis["error_types"]
            for record in records:
                # .get() keeps one malformed record from voiding the report.
                status = record.get("status")
                if status == "success":
                    analysis["success_count"] += 1
                elif status == "error":
                    analysis["error_count"] += 1
                    error = record.get("error")
                    if isinstance(error, dict) and "type" in error:
                        error_type = error["type"]
                        error_types[error_type] = (
                            error_types.get(error_type, 0) + 1
                        )
                warnings = record.get("warnings")
                if (isinstance(warnings, dict)
                        and warnings.get("type") == "uniform_scores"):
                    analysis["uniform_scores_count"] += 1
            return analysis
        except Exception as e:  # reporting boundary: return message instead
            return f"分析診斷記錄時發生錯誤: {str(e)}"