import gradio as gr
import pickle
import numpy as np
import glob
from tqdm import tqdm
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from peft import PeftModel
from tevatron.retriever.searcher import FaissFlatSearcher
import logging
import os
import spaces
import ir_datasets
import subprocess

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global variables
CUR_MODEL = "orionweller/repllama-instruct-hard-positives-v2-joint"
base_model = "meta-llama/Llama-2-7b-hf"
tokenizer = None
model = None
retriever = None
corpus_lookup = None
queries = None
q_lookup = None
current_dataset = None  # tracks which dataset's corpus embeddings and queries are loaded

def load_model():
    global tokenizer, model
    tokenizer = AutoTokenizer.from_pretrained(base_model)
    tokenizer.pad_token_id = tokenizer.eos_token_id
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"
    base_model_instance = AutoModel.from_pretrained(base_model)
    # Apply the LoRA adapter and merge it into the base weights for plain inference
    model = PeftModel.from_pretrained(base_model_instance, CUR_MODEL)
    model = model.merge_and_unload()
    model.eval()
    model.cuda()
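
# Note: this Space runs on ZeroGPU ("Running on Zero"), where the spaces package makes
# the module-level .cuda() call above safe; the GPU itself is attached only while a
# @spaces.GPU-decorated function is executing.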

def load_corpus_embeddings(dataset_name):
    global retriever, corpus_lookup
    corpus_path = f"{dataset_name}/corpus_emb*"
    index_files = glob.glob(corpus_path)
    if not index_files:
        raise FileNotFoundError(f"No corpus embedding shards found matching {corpus_path}")
    logger.info(f'Pattern match found {len(index_files)} files; loading them into index.')
    p_reps_0, p_lookup_0 = pickle_load(index_files[0])
    retriever = FaissFlatSearcher(p_reps_0)
    shards = [(p_reps_0, p_lookup_0)] + [pickle_load(f) for f in index_files[1:]]
    corpus_lookup = []
    for p_reps, p_lookup in tqdm(shards, desc='Loading shards into index', total=len(index_files)):
        retriever.add(p_reps)
        corpus_lookup += p_lookup

def pickle_load(path):
    with open(path, 'rb') as f:
        reps, lookup = pickle.load(f)
    return np.array(reps), lookup
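
# Each shard file is assumed to hold a pickled (reps, lookup) pair, as produced by a
# Tevatron-style corpus encoder: `reps` is a [num_passages, hidden_dim] float array and
# `lookup` is the parallel list of passage ids, so corpus_lookup[i] maps a Faiss row
# back to its docid.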

def load_queries(dataset_name):
    global queries, q_lookup
    dataset = ir_datasets.load(f"beir/{dataset_name.lower()}/test")
    queries = []
    q_lookup = {}
    for query in dataset.queries_iter():
        queries.append(query.text)
        q_lookup[query.query_id] = query.text
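
# ir_datasets exposes the BEIR test splits (e.g. "beir/scifact/test") as
# (query_id, text) pairs; the original string ids are kept in q_lookup so the
# run file can reference them later.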

def encode_queries(prefix, postfix):
    global queries
    input_texts = [f"{prefix}Query: {query} {postfix}".strip() for query in queries]
    encoded_embeds = []
    batch_size = 32  # Adjust based on available GPU memory
    for start_idx in range(0, len(input_texts), batch_size):
        batch_input_texts = input_texts[start_idx: start_idx + batch_size]
        inputs = tokenizer(batch_input_texts, padding=True, truncation=True, return_tensors="pt").to(model.device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Llama is decoder-only and has no [CLS] token: pool the hidden state of the
        # last non-padding token in each sequence (padding_side is "right")
        hidden = outputs.last_hidden_state
        last_token_indices = inputs["attention_mask"].sum(dim=1) - 1
        embeds = hidden[torch.arange(hidden.size(0), device=hidden.device), last_token_indices]
        embeds = F.normalize(embeds, p=2, dim=-1)
        encoded_embeds.append(embeds.cpu().numpy())
    return np.concatenate(encoded_embeds, axis=0)
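
# A typical invocation for a RepLLaMA-style model (the prompt values here are
# illustrative, not prescribed by the checkpoint) might be:
#   q_reps = encode_queries(prefix="", postfix="</s>")
# which returns an L2-normalized [num_queries, hidden_dim] float32 matrix suitable
# for inner-product search against the Faiss index.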

def search_queries(q_reps, depth=1000):
    all_scores, all_indices = retriever.search(q_reps, depth)
    psg_indices = [[str(corpus_lookup[x]) for x in q_dd] for q_dd in all_indices]
    return all_scores, np.array(psg_indices)

def write_ranking(corpus_indices, corpus_scores, ranking_save_file):
    # Write the run in Tevatron's three-column format (qid, docid, score); it is
    # converted to TREC format downstream by tevatron.utils.format.convert_result_to_trec
    with open(ranking_save_file, 'w') as f:
        for qid, q_doc_scores, q_doc_indices in zip(q_lookup.keys(), corpus_scores, corpus_indices):
            score_list = sorted(zip(q_doc_scores, q_doc_indices), key=lambda x: x[0], reverse=True)
            for s, idx in score_list:
                f.write(f'{qid}\t{idx}\t{s}\n')
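
# Each written line is "<qid>\t<docid>\t<score>", e.g. (values illustrative):
#   1	4983	0.8731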

def evaluate_with_subprocess(dataset, ranking_file):
    # Convert the Tevatron-format run file to TREC format
    trec_file = f"rank.{dataset}.trec"
    convert_cmd = [
        "python", "-m", "tevatron.utils.format.convert_result_to_trec",
        "--input", ranking_file,
        "--output", trec_file,
        "--remove_query"
    ]
    subprocess.run(convert_cmd, check=True)
    # Evaluate with trec_eval against Pyserini's prebuilt BEIR qrels
    eval_cmd = [
        "python", "-m", "pyserini.eval.trec_eval",
        "-c", "-mrecall.100", "-mndcg_cut.10",
        f"beir-v1.0.0-{dataset}-test", trec_file
    ]
    result = subprocess.run(eval_cmd, capture_output=True, text=True, check=True)
    # Parse the output by measure name rather than by line position, so the result
    # does not depend on the order in which trec_eval prints the measures
    metrics = {}
    for line in result.stdout.strip().split('\n'):
        parts = line.split()
        if len(parts) >= 3:
            try:
                metrics[parts[0]] = float(parts[-1])
            except ValueError:
                continue
    ndcg_10 = metrics.get("ndcg_cut_10", float("nan"))
    recall_100 = metrics.get("recall_100", float("nan"))
    # Clean up temporary files
    os.remove(ranking_file)
    os.remove(trec_file)
    return f"nDCG@10: {ndcg_10:.4f}, Recall@100: {recall_100:.4f}"

@spaces.GPU
def run_evaluation(dataset, prefix, postfix):
    global current_dataset
    # Reload corpus embeddings and queries whenever the selected dataset changes
    if current_dataset != dataset:
        load_corpus_embeddings(dataset)
        load_queries(dataset)
        current_dataset = dataset
    # Encode queries
    q_reps = encode_queries(prefix, postfix)
    # Search
    all_scores, psg_indices = search_queries(q_reps)
    # Write ranking
    ranking_file = f"temp_ranking_{dataset}.txt"
    write_ranking(psg_indices, all_scores, ranking_file)
    # Evaluate
    results = evaluate_with_subprocess(dataset, ranking_file)
    return results

def gradio_interface(dataset, prefix, postfix):
    return run_evaluation(dataset, prefix, postfix)

# Load model
load_model()

# Create Gradio interface
iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Dropdown(choices=["scifact", "arguana"], label="Dataset"),
        gr.Textbox(label="Prefix prompt"),
        gr.Textbox(label="Postfix prompt")
    ],
    outputs=gr.Textbox(label="Evaluation Results"),
    title="Query Evaluation with Custom Prompts",
    description="Select a dataset and enter prefix and postfix prompts to evaluate queries using Pyserini."
)

# Launch the interface
iface.launch()
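
# Usage note: this Space assumes precomputed corpus embedding shards on disk under
# ./<dataset>/ matching the pattern "corpus_emb*" (e.g. ./scifact/corpus_emb.0.pkl;
# the exact shard filenames are illustrative), each containing a pickled
# (embeddings, docid_lookup) pair.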