|
from datasets import load_dataset, load_from_disk, Dataset
|
|
import os
|
|
from transformers import AutoTokenizer, AutoModel
|
|
import torch
|
|
import pandas as pd
|
|
import xml.etree.ElementTree as ET
|
|
import glob, os
|
|
|
|
# Directory that holds the input *.xml files and receives result.csv.
rootFolder = "c:/317"

# Output file for the accepted texts, one per line.  It stays open at module
# level because parseXML() writes to it; it is closed once the dataset has
# been built.
file = open(f"{rootFolder}/result.csv", mode="w", encoding="utf-8")
|
|
|
|
def parseXML(xmlFile):
    """Extract the text of one XML file and derive a prompt from its name.

    The prompt is the file name with every ``Using_`` and ``.xml``
    occurrence removed and every ``.`` / ``_`` replaced by a space.  The
    text is the content of all ``<text>`` elements found anywhere in the
    document, each followed by one space, truncated to 500 characters.

    Args:
        xmlFile: File name relative to the module-level ``rootFolder``.

    Returns:
        ``{"text": ..., "prompt": ...}`` for an accepted file, or ``None``
        when the file cannot be read or parsed (the error is printed) or
        when the text contains the filter character checked below.

    Side effects:
        Writes the accepted text plus a newline to the module-level
        ``file`` handle.
    """
    prompt = (
        xmlFile.replace("Using_", "")
        .replace(".xml", "")
        .replace(".", " ")
        .replace("_", " ")
    )

    # Only the parse step is best-effort: an unreadable or malformed file is
    # reported and skipped instead of aborting the whole run.
    try:
        root = ET.parse(rootFolder + "/" + xmlFile).getroot()
    except (ET.ParseError, OSError, ValueError) as error:
        print(error)
        return None

    # Element.text is None for an empty element such as <text/>; treat it as
    # an empty string so one empty element does not discard the whole file.
    text = "".join((item.text or "") + " " for item in root.findall(".//text"))
    text = text[:500]

    # NOTE(review): the filter character is presumably the Cyrillic letter
    # "а" (not the Latin "a"), used to skip non-English texts -- confirm.
    if "а" in text:
        return None

    file.write(text + "\n")
    return {"text": text, "prompt": prompt}
|
|
|
|
|
|
def generator():
    """Yield one record for every usable ``*.xml`` file in ``rootFolder``.

    Each matching file name is printed, then passed to ``parseXML``; files
    for which ``parseXML`` returns ``None`` are skipped.

    Yields:
        The dictionaries returned by ``parseXML``.
    """
    # glob's root_dir keyword makes the returned names relative to
    # rootFolder, which is the form parseXML expects (Python 3.10+).
    for xmlFile in glob.glob("*.xml", root_dir=rootFolder):
        print(xmlFile)
        data = parseXML(xmlFile)
        if data is not None:
            yield data
|
|
|
|
|
|
# Build the dataset from the XML files.  parseXML() writes to `file` while
# the generator is consumed, so the handle is closed only afterwards -- and
# in a `finally` so it is released even when dataset construction fails.
# NOTE(review): `datasets` may reuse a cached result without running the
# generator again; result.csv would then stay empty because it was already
# truncated when opened with mode "w" -- confirm whether that matters.
try:
    ds = Dataset.from_generator(generator)
finally:
    file.close()
|
|
|
|
|
|
|
|
|
|
# Checkpoint used for both the tokenizer and the encoder.
# Alternative checkpoint that was assigned here before being overwritten:
# "sentence-transformers/multi-qa-mpnet-base-dot-v1".
model_ckpt = "nomic-ai/nomic-embed-text-v1.5"

tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
# NOTE(security): trust_remote_code=True allows Python code shipped with the
# checkpoint repository to run on this machine; only use trusted checkpoints.
model = AutoModel.from_pretrained(model_ckpt, trust_remote_code=True)

# Use the GPU when one is available, otherwise fall back to the CPU so the
# script still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
|
|
|
|
def cls_pooling(model_output):
    """Select index 0 along the second axis of ``last_hidden_state``.

    Args:
        model_output: Object exposing a ``last_hidden_state`` attribute that
            supports ``[:, 0]`` indexing (presumably a tensor shaped
            ``(batch, sequence, hidden)`` -- confirm against the model).

    Returns:
        ``last_hidden_state[:, 0]``, i.e. one vector per entry of the first
        axis (presumably the first, CLS-style token of each sequence).
    """
    hidden_states = model_output.last_hidden_state
    return hidden_states[:, 0]
|
|
|
|
def get_embeddings(text_list):
    """Encode text with the module-level tokenizer/model and pool the output.

    Args:
        text_list: A string or list of strings accepted by ``tokenizer``.

    Returns:
        The result of ``cls_pooling`` applied to the model output, located
        on ``device``.  The forward pass runs without gradient tracking, so
        the result does not require grad; calling ``.detach()`` on it, as
        the callers in this file do, remains valid.
    """
    encoded_input = tokenizer(
        text_list, padding=True, truncation=True, return_tensors="pt"
    )
    # Move every tokenizer output tensor to the device the model lives on.
    encoded_input = {k: v.to(device) for k, v in encoded_input.items()}

    # Inference only: skip autograd bookkeeping so no computation graph is
    # kept alive for each encoded text.
    with torch.no_grad():
        model_output = model(**encoded_input)

    return cls_pooling(model_output)
|
|
|
|
def _embed_row(row):
    """Return the embedding of ``row["text"]`` under the "embeddings" key."""
    # get_embeddings returns a batch; a single text yields one row, so take
    # element 0 after moving the result to a NumPy array on the CPU.
    vector = get_embeddings(row["text"]).detach().cpu().numpy()[0]
    return {"embeddings": vector}


# Add an "embeddings" column computed row by row.
embeddings_dataset = ds.map(_embed_row)

# Persist the embedded dataset, then continue with the copy read from disk.
embeddings_dataset.save_to_disk("dataset/embeddings")
embeddings_dataset = Dataset.load_from_disk("dataset/embeddings")

# Build a FAISS index over the embedding column and store it on disk.
embeddings_dataset.add_faiss_index(column="embeddings")
embeddings_dataset.save_faiss_index("embeddings", "index/embeddings")
|
|
|
|
# Example query: embed the question and fetch its 10 nearest dataset rows.
question = "Download license key"
question_embedding = get_embeddings([question]).cpu().detach().numpy()

scores, samples = embeddings_dataset.get_nearest_examples(
    "embeddings", question_embedding, k=10
)

# Tabulate the hits together with their scores, lowest score first.
samples_df = pd.DataFrame.from_dict(samples)
samples_df["scores"] = scores
samples_df = samples_df.sort_values("scores", ascending=True)

for _, hit in samples_df.iterrows():
    print(
        f"COMMENT: {hit.text}",
        f"SCORE: {hit.scores}",
        f"PROMPT: {hit.prompt}",
        "=" * 50,
        "",
        sep="\n",
    )
|
|
|
|
|