import argparse
import numpy as np
from nltk.translate.bleu_score import sentence_bleu
import sys
sys.path.append('/home/ataallka/minigpt_video/minigpt_multi_img')
from minigpt4.common.registry import registry
from minigpt4.common.config import Config
# imports modules for registration
from minigpt4.datasets.builders import *
from minigpt4.models import *
from minigpt4.processors import *
# from minigpt4.runners import *
from minigpt4.tasks import *
from pycocoevalcap.cider.cider import Cider
import os
import openai
from tqdm import tqdm
import json
import ast
import time
def eval_parser():
    """Build the command-line argument parser used by the evaluation scripts.

    The parser is returned un-parsed so that callers can register extra
    arguments before calling ``parse_args()``.

    Returns:
        argparse.ArgumentParser: parser exposing ``--cfg-path``, ``--ckpt``,
        ``--eval_opt``, ``--max_new_tokens``, ``--lora_r``, ``--lora_alpha``
        and ``--options``.
    """
    parser = argparse.ArgumentParser(description="Demo")
    parser.add_argument(
        "--cfg-path",
        help="path to configuration file.",
        default="test_configs/llama2_test_config.yaml",
    )
    parser.add_argument(
        "--ckpt",
        type=str,
        default='checkpoints/video_llama_checkpoint_last.pth',
        help="path to checkpoint",
    )
    # The help text here used to be a copy of the --cfg-path help, which made
    # ``--help`` describe two different flags as the configuration file path.
    parser.add_argument(
        "--eval_opt",
        type=str,
        default='all',
        help="evaluation option to run (default: all).",
    )
    parser.add_argument("--max_new_tokens", type=int, default=512, help="max number of generated tokens")
    parser.add_argument("--lora_r", type=int, default=64, help="lora rank of the model")
    parser.add_argument("--lora_alpha", type=int, default=16, help="lora alpha")
    parser.add_argument(
        "--options",
        nargs="+",
        help="override some settings in the used config, the key-value pair "
        "in xxx=yyy format will be merged into config file (deprecate), "
        "change to --cfg-options instead.",
    )
    return parser
def prepare_texts(texts, conv_temp, template='', lengths=None):
    """Wrap each input text in a fresh copy of a conversation template.

    Args:
        texts: sequence of user texts, one per prompt to build.
        conv_temp: conversation object providing ``copy()``, ``roles``,
            ``append_message(role, message)`` and ``get_prompt()``.
        template: string placed before every text, separated by one space.
        lengths: optional per-text repeat counts; when given, ``template`` is
            repeated ``length`` times for the matching text.

    Returns:
        list: one prompt string per entry of ``texts``.
    """
    conversations = [conv_temp.copy() for _ in range(len(texts))]

    if lengths is None:
        prefixes = [template] * len(conversations)
    else:
        prefixes = [template * length for length in lengths]

    # User turn: "<prefix> <text>". zip() stops at the shortest input, so a
    # short ``lengths`` leaves the remaining conversations without a user turn.
    for prefix, conversation, text in zip(prefixes, conversations, texts):
        conversation.append_message(conversation.roles[0], '{} {}'.format(prefix, text))

    # Open (empty) turn for the second role in every conversation.
    for conversation in conversations:
        conversation.append_message(conversation.roles[1], None)

    return [conversation.get_prompt() for conversation in conversations]
def init_model(args):
    """Instantiate the model and its visual processor from parsed arguments.

    Args:
        args: parsed command-line namespace; it is passed to ``Config`` and
            must also carry ``ckpt``, ``lora_r`` and ``lora_alpha``.

    Returns:
        tuple: ``(model, vis_processor)`` where the model has been moved to
        ``cuda:0``.
    """
    print('Initialization Model')
    cfg = Config(args)

    # Command-line values overwrite whatever the config file specified.
    for option in ('ckpt', 'lora_r', 'lora_alpha'):
        value = getattr(args, option)
        setattr(cfg.model_cfg, option, value)

    model_config = cfg.model_cfg
    model_config.low_resource = True  # always forced on here
    model = registry.get_model_class(model_config.arch).from_config(model_config).to('cuda:0')

    # The visual processor is taken from the first dataset listed in the
    # config, using that dataset's "train" processor settings.
    first_dataset = list(cfg.datasets_cfg.keys())[0]
    vis_processor_cfg = cfg.datasets_cfg.get(first_dataset).vis_processor.train
    print(vis_processor_cfg)
    processor_cls = registry.get_processor_class(vis_processor_cfg.name)
    vis_processor = processor_cls.from_config(vis_processor_cfg)

    print('Initialization Finished')
    return model, vis_processor
def computeIoU(bbox1, bbox2):
    """Return the intersection-over-union of two boxes.

    Each box is a 4-item sequence unpacked as ``(x1, y1, x2, y2)``. Extents
    are measured as ``high - low + 1``, i.e. both end coordinates count as
    part of the box.

    Args:
        bbox1: first box.
        bbox2: second box.

    Returns:
        The intersection area divided by the union area.
    """
    ax1, ay1, ax2, ay2 = bbox1
    bx1, by1, bx2, by2 = bbox2

    def extent(low, high):
        # Length of a range whose two end points are both included.
        return high - low + 1

    # A negative overlap extent means the boxes do not overlap on that axis.
    overlap_w = max(0, extent(max(ax1, bx1), min(ax2, bx2)))
    overlap_h = max(0, extent(max(ay1, by1), min(ay2, by2)))
    overlap_area = overlap_w * overlap_h

    area_a = extent(ax1, ax2) * extent(ay1, ay2)
    area_b = extent(bx1, bx2) * extent(by1, by2)

    return overlap_area / (area_a + area_b - overlap_area)
def eval_bleu(results):
    """Compute mean sentence-level BLEU-1..4 over a list of predictions.

    Args:
        results: iterable of dicts with string entries ``'gt'`` (reference)
            and ``'pred'`` (hypothesis); both are tokenised on whitespace.

    Returns:
        dict: keys ``'bleu1'``, ``'bleu2'``, ``'bleu3'`` and ``'bleu4'``, each
        mapped to the mean of the per-sample scores.
    """
    per_sample = {'bleu1': [], 'bleu2': [], 'bleu3': [], 'bleu4': []}
    for result in tqdm(results, desc="bleu_eval"):
        references = [result['gt'].split()]
        hypothesis = result['pred'].split()
        per_sample['bleu1'].append(sentence_bleu(references, hypothesis, weights=(1, 0, 0, 0)))
        per_sample['bleu2'].append(sentence_bleu(references, hypothesis, weights=(0.5, 0.5, 0, 0)))
        per_sample['bleu3'].append(sentence_bleu(references, hypothesis, weights=(0.33, 0.33, 0.33, 0)))
        # BLEU-4 relies on sentence_bleu's default weights.
        per_sample['bleu4'].append(sentence_bleu(references, hypothesis))
    return {name: np.mean(values) for name, values in per_sample.items()}
# Module-level CIDEr scorer: created once when this module is imported and
# used by eval_cider() below.
cider_scorer = Cider()
def eval_cider(pred_result, gt_result):
    """Score predictions against references with the module-level CIDEr scorer.

    Args:
        pred_result: predictions; iterating it must yield the sample ids
            (presumably a dict keyed by video id -- confirm against caller).
        gt_result: references; iterated in lockstep with ``pred_result``.

    Returns:
        dict: ``'mean_cider_scores'`` holds the overall score returned by the
        scorer and ``'cider_scores'`` maps each id to its individual score.

    Raises:
        AssertionError: if the ids of ``pred_result`` and ``gt_result`` do not
            line up pairwise.
    """
    overall, individual = cider_scorer.compute_score(gt_result, pred_result)

    aligned = zip(individual.tolist(), pred_result, gt_result)
    scores_by_id = {}
    for value, pred_id, gt_id in tqdm(aligned, desc="cider_eval"):
        # Both inputs must list the same ids in the same order, otherwise the
        # per-sample scores would be attributed to the wrong sample.
        assert pred_id == gt_id
        scores_by_id[pred_id] = value

    return {'mean_cider_scores': overall, 'cider_scores': scores_by_id}
# Tell the openai client which file to use for the API key. This runs at import time.
# NOTE(review): hard-coded, user-specific absolute path -- importing this
# module on another machine presumably needs this changed; confirm.
openai.api_key_path = "/home/ataallka/chatgpt_api.txt"
def chat_gpt_eval(results, output_path, max_trials=None):
    """Score generated video summaries with ChatGPT, caching one file per video.

    Every scored video is stored as ``<output_path>/<video_name>.json`` so an
    interrupted run can be resumed without paying for the same request twice.
    Videos whose request fails, or whose reply is not a number, are retried in
    the next trial.

    Args:
        results: list of dicts with keys ``'video_name'``, ``'A'`` (ground
            truth description) and ``'pred'`` (generated text). Each scored
            entry gets a ``'chatgpt_score'`` key holding the raw reply.
        output_path: existing directory used as the per-video score cache.
        max_trials: optional upper bound on the number of passes over the
            still-unscored videos. ``None`` (default) retries until every
            video has a score.

    Returns:
        tuple: ``(results, average_score)`` where the average is the sum of
        the available scores divided by ``len(results)`` (``0.0`` when
        ``results`` is empty).
    """
    # video_name -> (raw reply as stored on disk, numeric score)
    scores = {}

    # Load scores written by previous runs. Malformed cache files are skipped
    # so the affected video is simply evaluated again.
    for file_name in os.listdir(output_path):
        if not file_name.endswith(".json"):
            continue
        with open(f'{output_path}/{file_name}') as json_file:
            try:
                entry = json.load(json_file)[0]
                scores[entry['video_name']] = (
                    entry['chatgpt_score'],
                    float(entry['chatgpt_score']),
                )
            except (ValueError, KeyError, IndexError, TypeError) as e:
                print("skipping unreadable cached result", file_name, e)

    trial = 0
    # Progress is tracked per video name rather than by counting directory
    # entries, so unrelated files or duplicate names cannot stall the loop.
    pending = [res for res in results if res['video_name'] not in scores]
    while pending and (max_trials is None or trial < max_trials):
        for res in tqdm(pending, desc="chatgpt_eval"):
            video_name = res['video_name']
            if video_name in scores:
                # Same video listed twice and already scored in this pass.
                continue
            sentence_1 = res['A']
            sentence_2 = res['pred']
            try:
                prompt = f"given these 2 sentences the first one is the ground truth descrption of a video and the second sentence is the generated text from a video summarization model,give it a score from 0 to 5 to evaluate the model summarization performance.the output should be only the score number without any additional information\nfirst sentence: {sentence_1}\nsecond sentence: {sentence_2}\nscore:"
                response = openai.ChatCompletion.create(
                    model="gpt-3.5-turbo",
                    messages=[
                        {
                            "role": "user",
                            "content": prompt
                        }],
                )
                raw_score = response.choices[0].message['content']
                # Validate before caching: a non-numeric reply must not be
                # written to disk, or the next run would fail while loading it.
                numeric_score = float(raw_score)
                out = {'video_name': video_name, 'chatgpt_score': raw_score}
                # save each video result in a json file
                with open(f'{output_path}/{video_name}.json', 'w') as f:
                    json.dump([out], f)
            except Exception as e:
                # Best effort: report and leave the video pending for a retry.
                print("chat gpt error", e)
                continue
            scores[video_name] = (raw_score, numeric_score)
        print("Finished chat gpt evaluation in trial", trial)
        trial += 1
        pending = [res for res in pending if res['video_name'] not in scores]

    # Attach scores (cached and new alike) and sum only those that belong to
    # the requested results.
    total_score = 0.0
    for res in results:
        cached = scores.get(res['video_name'])
        if cached is None:
            continue
        res['chatgpt_score'] = cached[0]
        total_score += cached[1]

    if not results:
        return results, 0.0
    return results, total_score / len(results)
def GPT4_answer(question, answer, pred):
    """Ask GPT-4 whether a predicted answer matches the correct answer.

    Args:
        question: the question that was asked about the video.
        answer: the correct (ground truth) answer.
        pred: the answer produced by the model under evaluation.

    Returns:
        The model reply parsed with ``ast.literal_eval`` (the prompt asks for
        a dict with keys ``'pred'`` and ``'score'``), or ``None`` if the
        request or the parsing fails for any reason.
    """
    try:
        system_prompt = (
            "You are an intelligent chatbot designed for evaluating the correctness of generative outputs for question-answer pairs. "
            "Your task is to compare the predicted answer with the correct answer and determine if they match meaningfully. Here's how you can accomplish the task:"
            "------"
            "##INSTRUCTIONS: "
            "- Focus on the meaningful match between the predicted answer and the correct answer.\n"
            "- Consider synonyms or paraphrases as valid matches.\n"
            "- Evaluate the correctness of the prediction compared to the answer."
        )
        # NOTE(review): the prompt demands an INTEGER score but its example
        # shows 4.8; left untouched so scores stay comparable with earlier runs.
        user_prompt = (
            "Please evaluate the following video-based question-answer pair:\n\n"
            f"Question: {question}\n"
            f"Correct Answer: {answer}\n"
            f"Predicted Answer: {pred}\n\n"
            "Provide your evaluation only as a yes/no and score where the score is an integer value between 0 and 5, with 5 indicating the highest meaningful match. "
            "Please generate the response in the form of a Python dictionary string with keys 'pred' and 'score', where value of 'pred' is a string of 'yes' or 'no' and value of 'score' is in INTEGER, not STRING."
            "DO NOT PROVIDE ANY OTHER OUTPUT TEXT OR EXPLANATION. Only provide the Python dictionary string. "
            "For example, your response should look like this: {'pred': 'yes', 'score': 4.8}."
        )
        completion = openai.ChatCompletion.create(
            model='gpt-4',
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        )
        reply_text = completion["choices"][0]["message"]["content"]
        # literal_eval only accepts Python literals, so the reply is parsed
        # without executing arbitrary code.
        return ast.literal_eval(reply_text)
    except Exception as e:
        print(f"Error : {e}")
        return None
def GPT4_evaluation(val_result):
    """Evaluate question-answer predictions with GPT-4 and aggregate the results.

    Each entry is sent to ``GPT4_answer``; entries for which no usable reply
    comes back are skipped.

    Args:
        val_result: iterable of dicts with keys ``'Q'`` (question), ``'A'``
            (correct answer) and ``'pred'`` (predicted answer).

    Returns:
        tuple: ``(avg_score, accuracy)``. ``avg_score`` is the mean of the
        parsed scores and ``accuracy`` is the percentage of "yes" verdicts
        among the yes/no verdicts. Each falls back to ``0.0`` when there is
        nothing to average, instead of raising ``ZeroDivisionError``.
    """
    scores = []
    yes_count = 0
    no_count = 0
    for res in val_result:
        gpt_response = GPT4_answer(res['Q'], res['A'], res['pred'])
        if gpt_response is None:
            continue
        try:
            scores.append(float(gpt_response['score']))
            verdict = gpt_response['pred'].lower()
        except (KeyError, TypeError, ValueError, AttributeError):
            # Reply did not have the requested {'pred': ..., 'score': ...}
            # shape; skip it without hiding unrelated errors such as
            # KeyboardInterrupt, which a bare ``except`` would swallow.
            continue
        if 'yes' in verdict:
            yes_count += 1
        elif 'no' in verdict:
            no_count += 1

    avg_score = sum(scores) / len(scores) if scores else 0.0
    judged = yes_count + no_count
    accuracy = (yes_count / judged) * 100 if judged else 0.0
    print(f"chatgpt score: {avg_score} accuracy: {accuracy}")
    return avg_score, accuracy
# with open('results/ckpt_15_res89_res32_Video_validation_Dataset_subtitles.json','r') as f:
# results = json.load(f)
# t1=time.time()
# avg_score,accuracy=GPT4_evaluation(results)
# print(f"chatgpt score: {avg_score} accuracy: {accuracy}")
# print(f"Time taken: {time.time()-t1}")