import json import re from collections import defaultdict import evaluate # import nltk import numpy as np from nervaluate import Evaluator from rouge_score import rouge_scorer from sacrebleu.metrics import BLEU, CHRF from sklearn.metrics import f1_score from tqdm import tqdm from transformers import AutoTokenizer from transformers import AutoTokenizer import re import string class TF_Tokenizer: def __init__(self, model_str): tok = AutoTokenizer.from_pretrained(model_str) def __call__(self, txt): return self.tok.tokenize(txt) class WS_Tokenizer: def __init__(self): pass def __call__(self, txt): return re.findall(r"[{}]|\w+".format(string.punctuation), txt) def convert_spans_to_bio(txt, roles, tokenizer_func): roles = sorted(roles, key=lambda x: x["start"]) roles_left = [r["start"] for r in roles] ttxt = tokenizer_func(txt) c = 0 cr = -1 prev = "O" troles = [] for tok in ttxt: if c >= len(txt): break while txt[c] == " ": c += 1 else: if c in roles_left: # Start of a new role ind = roles_left.index(c) cr = roles[ind]["end"] prev = "I-" + roles[ind]["label"] troles.append("B-" + roles[ind]["label"]) else: if c < cr: # Assign previous role troles.append(prev) else: # Assign 'O' troles.append("O") c += len(tok) if len(ttxt) != len(troles): troles += ["O"] * (len(ttxt) - len(troles)) assert len(ttxt) == len(troles) return troles def convert_bio_to_spans(txt, troles, tokenizer_func): c = 0 c2 = 0 cr = -1 cs = -1 prev = "O" roles = [] ttxt = tokenizer_func(txt) if len(ttxt) != len(troles): ttxt = ttxt[: len(troles)] for j, tok in enumerate(ttxt): if c >= len(txt): break while c < len(txt) and txt[c].isspace(): c += 1 if tok[:2] == "##" or tok == "[UNK]": c += len(tok) - 2 if tok[:2] == "##" else 1 else: if troles[j].startswith("B-"): if cs >= cr: cr = c if cs >= 0: roles.append({"start": cs, "end": c2, "label": prev}) cs = c prev = troles[j][2:] else: if troles[j] == "O": if cs >= cr: cr = c if cs >= 0: roles.append({"start": cs, "end": c2, "label": prev}) c += len(tok) c2 = c if cs >= cr: if cs >= 0: roles.append({"start": cs, "end": c2, "label": prev}) return roles def span2bio(txt, labels): roles = sorted(labels, key=lambda x: x["label"]) roles_left = [r["start"] for r in roles] ttxt = re.findall(r"[{}]|\w+".format(string.punctuation), txt) c = 0 cr = -1 prev = "O" troles = [] for tok in ttxt: if c >= len(txt): break while txt[c] == " ": c += 1 else: if c in roles_left: # Start of a new role ind = roles_left.index(c) cr = roles[ind]["end"] prev = "I-" + roles[ind]["label"] troles.append("B-" + roles[ind]["label"]) else: if c < cr: # Assign previous role troles.append(prev) else: # Assign 'O' troles.append("O") c += len(tok) if len(ttxt) != len(troles): troles += ["O"] * (len(ttxt) - len(troles)) assert len(ttxt) == len(troles) return ttxt, troles def load_json(file_path): with open(file_path, "r") as f: return json.load(f) def get_micro_at_k(gold, pred, k): gold_set = set(gold) pred_set = set(pred[:k]) return len(gold_set & pred_set), len(gold_set), len(pred_set) def evaluate_bail(gold_data, pred_data): gold_labels = [] pred_labels = [] for id, label in gold_data.items(): gold_labels.append(label) pred_labels.append(pred_data.get(id, 0)) f1 = f1_score(gold_labels, pred_labels, average="macro") print("Macro-F1 on HLDC-all-districts test set:", f1) return f"{f1:.2f}" def evaluate_cjpe(gold_data, pred_data): # Evaluate prediction gold_labels = [] pred_labels = [] for id, label in gold_data["prediction"].items(): gold_labels.append(label) pred_labels.append(pred_data["prediction"].get(id, 0)) f1 = f1_score(gold_labels, pred_labels, average="macro") prediction_result = {"cjpe-eval": f1} # Evaluate explanation rouge = evaluate.load("rouge") bleu = evaluate.load("bleu") gold_explanations = [exp["expert_1"] for exp in gold_data["explanation"].values()] pred_explanations = [exp["expert_1"] for exp in pred_data["explanation"].values()] rouge_scores = rouge.compute( predictions=pred_explanations, references=gold_explanations ) bleu_score = bleu.compute( predictions=pred_explanations, references=gold_explanations ) explanation_result = { "cjpe-exp-eval": { "rouge": [rouge_scores], "bleu": [bleu_score], } } return {**prediction_result, **explanation_result} def evaluate_lner(gold_data, pred_data, text_data): labels = [ "APP", "RESP", "A.COUNSEL", "R.COUNSEL", "JUDGE", "WIT", "AUTH", "COURT", "STAT", "PREC", "DATE", "CASENO", ] results_per_fold = {} for fold in range(1, 4): gold = gold_data[f"fold_{fold}"] pred = pred_data[f"fold_{fold}"] text = text_data[f"fold_{fold}"] texts, gold_labels, pred_labels = [], [], [] for id, gold_label in tqdm(gold.items()): txt = text[id] pred_label = pred.get(id, []) txt_seg, gold_bio = span2bio(txt, gold_label) _, pred_bio = span2bio(txt, pred_label) texts.append(txt_seg) gold_labels.append(gold_bio) pred_labels.append(pred_bio) evaluator = Evaluator(gold_labels, pred_labels, tags=labels, loader="list") results, results_per_tag, _, _ = evaluator.evaluate() f1_scores = [results_per_tag[l]["strict"]["f1"] for l in results_per_tag] avg_f1 = sum(f1_scores) / len(f1_scores) print(f"Strict Macro-F1 on Fold {fold}:", avg_f1) results_per_fold[f"fold_{fold}"] = avg_f1 return {"strict mF1": f"{np.mean(list(results_per_fold.values()))}:.2f"} def evaluate_rr(gold_data, pred_data): all_gold_labels = [] all_pred_labels = [] for id, gold_labels in gold_data.items(): pred_labels = pred_data.get(id, ["None"] * len(gold_labels)) all_gold_labels.extend(gold_labels) all_pred_labels.extend(pred_labels) mf1 = f1_score(all_gold_labels, all_pred_labels, average="macro") print(f"Macro-F1 on combined test set:", mf1) return {"mF1": f"{mf1:.2f}"} def evaluate_lsi(gold_data, pred_data): with open("lsi_label_vocab.json") as f: label_vocab = json.load(f) gold_matrix = np.zeros((len(gold_data), len(label_vocab))) pred_matrix = np.zeros((len(gold_data), len(label_vocab))) for i, (id, gold_labels) in enumerate(gold_data.items()): pred_labels = pred_data.get(id, []) for label in gold_labels: if label in label_vocab: gold_matrix[i, label_vocab[label]] = 1 for label in pred_labels: if label in label_vocab: pred_matrix[i, label_vocab[label]] = 1 f1 = f1_score(gold_matrix, pred_matrix, average="macro") print("Macro-F1 on ILSI test set:", f1) return f1 def evaluate_pcr(gold_data, pred_data): f1_scores = [] for k in range(1, 21): correct, gold_total, pred_total = 0, 0, 0 for id, gold_candidates in gold_data.items(): pred_candidates = pred_data.get(id, []) gold_candidates = [c for c in gold_candidates if c != id] pred_candidates = [c for c in pred_candidates if c != id] c, g, p = get_micro_at_k(gold_candidates, pred_candidates, k) correct += c gold_total += g pred_total += p precision = correct / pred_total if pred_total > 0 else 0 recall = correct / gold_total if gold_total > 0 else 0 f1 = ( 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0 ) f1_scores.append(f1) print(f"Micro-F1@{k} on IL-PCR test set:", f1) return np.mean(f1_scores) def evaluate_summ(gold_data, pred_data): gold_summaries = [] pred_summaries = [] for id, gold_summary in gold_data.items(): if id in pred_data: gold_summary = re.sub(r"\s+", " ", gold_summary.replace("\n", " ")).strip() pred_summary = re.sub(r"\s+", " ", pred_data[id].replace("\n", " ")).strip() gold_summaries.append(gold_summary) pred_summaries.append(pred_summary) rouge = evaluate.load("rouge") rouge_scores = rouge.compute(predictions=pred_summaries, references=gold_summaries) print("Rouge-L:", rouge_scores) return {"ROUGE-L": rouge_scores, "BERTSCORE": "-"} def evaluate_lmt(gold_data, pred_data): tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert") bleu = BLEU() chrfp = CHRF(word_order=2) gleu = evaluate.load("google_bleu") G = defaultdict(lambda: defaultdict(list)) P = defaultdict(lambda: defaultdict(list)) for dataset in gold_data: for id, gold_text in gold_data[dataset].items(): lang = id.split("/")[1].strip() gold_tokens = " ".join(tokenizer.tokenize(gold_text)) pred_tokens = " ".join(tokenizer.tokenize(pred_data[dataset][id])) G[dataset][lang].append(gold_tokens) P[dataset][lang].append(pred_tokens) bleu_scores, chrfpp_scores, gleu_scores = [], [], [] for dataset in G: print("Dataset", dataset) dataset_bleu, dataset_chrfpp, dataset_gleu = [], [], [] for lang in G[dataset]: gold = G[dataset][lang] pred = P[dataset][lang] bleu_score = bleu.corpus_score(pred, [gold]).score chrfpp_score = chrfp.corpus_score(pred, [gold]).score gleu_score = gleu.compute(predictions=pred, references=gold)["google_bleu"] dataset_bleu.append(bleu_score) dataset_chrfpp.append(chrfpp_score) dataset_gleu.append(gleu_score) bleu_scores.append(sum(dataset_bleu) / len(dataset_bleu)) chrfpp_scores.append(sum(dataset_chrfpp) / len(dataset_chrfpp)) gleu_scores.append(sum(dataset_gleu) / len(dataset_gleu)) return { "BLEU": sum(bleu_scores) / len(bleu_scores), "GLEU": sum(gleu_scores) / len(gleu_scores), "chrF++": sum(chrfpp_scores) / len(chrfpp_scores), } def create_output_json(evaluation_results): output = { "Method": "GPT-5 (2-shot)", "Submitted By": "IL-TUR", "Github Link": "dummy submission", "L-NER": {"strict mF1": evaluation_results["lner"]["strict mF1"]}, "RR": {"mF1": evaluation_results["rr"]["mF1"]}, "CJPE": { "mF1": evaluation_results["cjpe"]["mF1"], "ROUGE-L": evaluation_results["cjpe"]["ROUGE-L"], "BLEU": evaluation_results["cjpe"]["BLEU"], }, "BAIL": {"mF1": evaluation_results["bail"]}, "LSI": {"mF1": evaluation_results["lsi"]}, "PCR": {"muF1@K": evaluation_results["pcr"]}, "SUMM": { "ROUGE-L": evaluation_results["summ"]["ROUGE-L"], "BERTSCORE": "-", # Placeholder BERTSCORE }, "L-MT": { "BLEU": evaluation_results["lmt"]["BLEU"], "GLEU": evaluation_results["lmt"]["GLEU"], "chrF++": evaluation_results["lmt"]["chrF++"], }, } return [output] # Wrap in a list to match the desired format def main(): # gold_data = load_json("IL_TUR_eval_gold.json") # pred_data = load_json("IL_TUR_eval_submission2.json") gold_data = load_json("submissions/baseline/IL_TUR_eval_gold_small.json") pred_data = load_json("submissions/baseline/IL_TUR_eval_submission_small.json") pred_data = gold_data evaluation_results = {} for task in pred_data.keys(): print(f"Task: {task}") if task == "bail": evaluation_results[task] = evaluate_bail(gold_data[task], pred_data[task]) elif task == "cjpe": evaluation_results.update(evaluate_cjpe(gold_data[task], pred_data[task])) elif task == "lner": text_data = load_json("lner-text.json") evaluation_results[task] = evaluate_lner( gold_data[task], pred_data[task], text_data ) elif task == "rr": evaluation_results[task] = evaluate_rr(gold_data[task], pred_data[task]) elif task == "lsi": evaluation_results[task] = evaluate_lsi(gold_data[task], pred_data[task]) elif task == "pcr": evaluation_results[task] = evaluate_pcr(gold_data[task], pred_data[task]) elif task == "summ": evaluation_results[task] = evaluate_summ(gold_data[task], pred_data[task]) elif task == "lmt": evaluation_results[task] = evaluate_lmt(gold_data[task], pred_data[task]) # convert the evaluation results to the required format for task, result in evaluation_results.items(): if isinstance(result, dict): for subtask, subresult in result.items(): if isinstance(subresult, dict): for subsubtask, subsubresult in subresult.items(): evaluation_results[task][subtask][ subsubtask ] = f"{subsubresult:.2f}" else: if isinstance(subresult, str): evaluation_results[task][subtask] = subresult else: evaluation_results[task][subtask] = f"{subresult:.2f}" else: if isinstance(result, str): evaluation_results[task] = result else: evaluation_results[task] = f"{result:.2f}" blank_scores = { "lner": {"strict mF1": "-"}, "rr": {"mF1": "-"}, "cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"}, "bail": {"mF1": "-"}, "lsi": {"mF1": "-"}, "pcr": {"muF1@K": "-"}, "summ": {"ROUGE-L": "-", "BERTSCORE": "-"}, "lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"}, } print("--------------------------Evaluation Summary--------------------------") for task, result in evaluation_results.items(): print(f"{task}: {result}") print("---------------------------------------------------------------------") # for tasks that were not present in the submission, add blank scores for task in gold_data.keys(): if task not in pred_data: evaluation_results[task] = blank_scores[task] # Generate the output JSON output_json = create_output_json(evaluation_results) with open("evaluation_results.json", "w") as f: json.dump(output_json, f, indent=2) print("Evaluation results saved to evaluation_results.json") def get_evaluation_scores(gold_data, submission_data): evaluation_results = {} for task in submission_data.keys(): print(f"Task: {task}") if task == "bail": evaluation_results[task] = evaluate_bail( gold_data[task], submission_data[task] ) elif task == "cjpe": evaluation_results.update( evaluate_cjpe(gold_data[task], submission_data[task]) ) elif task == "lner": text_data = load_json("lner-text.json") evaluation_results[task] = evaluate_lner( gold_data[task], submission_data[task], text_data ) elif task == "rr": evaluation_results[task] = evaluate_rr( gold_data[task], submission_data[task] ) elif task == "lsi": evaluation_results[task] = evaluate_lsi( gold_data[task], submission_data[task] ) elif task == "pcr": evaluation_results[task] = evaluate_pcr( gold_data[task], submission_data[task] ) elif task == "summ": evaluation_results[task] = evaluate_summ( gold_data[task], submission_data[task] ) elif task == "lmt": evaluation_results[task] = evaluate_lmt( gold_data[task], submission_data[task] ) # convert the evaluation results to the required format for task, result in evaluation_results.items(): if isinstance(result, dict): for subtask, subresult in result.items(): if isinstance(subresult, dict): for subsubtask, subsubresult in subresult.items(): evaluation_results[task][subtask][ subsubtask ] = f"{subsubresult:.2f}" else: if isinstance(subresult, str): evaluation_results[task][subtask] = subresult else: evaluation_results[task][subtask] = f"{subresult:.2f}" else: if isinstance(result, str): evaluation_results[task] = result else: evaluation_results[task] = f"{result:.2f}" blank_scores = { "lner": {"strict mF1": "-"}, "rr": {"mF1": "-"}, "cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"}, "bail": {"mF1": "-"}, "lsi": {"mF1": "-"}, "pcr": {"muF1@K": "-"}, "summ": {"ROUGE-L": "-", "BERTSCORE": "-"}, "lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"}, } # for tasks that were not present in the submission, add blank scores for task in gold_data.keys(): if task not in submission_data: evaluation_results[task] = blank_scores[task] print("--------------------------Evaluation Summary--------------------------") for task, result in evaluation_results.items(): print(f"{task}: {result}") print("---------------------------------------------------------------------") output_json = create_output_json(evaluation_results) return output_json if __name__ == "__main__": main()