|
import re |
|
import os |
|
import datetime |
|
from typing import Type, Dict, List, Tuple |
|
import time |
|
from itertools import compress |
|
import pandas as pd |
|
import numpy as np |
|
|
|
|
|
import torch.cuda |
|
from threading import Thread |
|
from transformers import pipeline, TextIteratorStreamer |
|
|
|
|
|
|
|
|
|
|
|
from langchain.prompts import PromptTemplate |
|
from langchain_community.vectorstores import FAISS |
|
from langchain_community.retrievers import SVMRetriever |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain.docstore.document import Document |
|
|
|
|
|
|
|
|
|
from nltk.corpus import stopwords |
|
from nltk.tokenize import RegexpTokenizer |
|
from nltk.stem import WordNetLemmatizer |
|
|
|
from keybert import KeyBERT |
|
|
|
|
|
|
|
|
|
|
|
|
|
import bm25s |
|
import Stemmer |
|
|
|
|
|
|
|
|
|
|
|
from llama_cpp import Llama |
|
from huggingface_hub import hf_hub_download |
|
|
|
from chatfuncs.prompts import instruction_prompt_template_alpaca, instruction_prompt_mistral_orca, instruction_prompt_phi3, instruction_prompt_llama3, instruction_prompt_qwen |
|
|
|
import gradio as gr |
|
|
|
torch.cuda.empty_cache() |
|
|
|
PandasDataFrame = Type[pd.DataFrame] |
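
# Module-level state shared across the Gradio app. These placeholders are filled in at
# runtime (e.g. when a model, tokenizer or vectorstore is loaded elsewhere in the app).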
|
|
|
embeddings = None |
|
vectorstore = None |
|
model_type = None |
|
|
|
max_memory_length = 0 |
|
|
|
full_text = "" |
|
|
|
model = [] |
|
tokenizer = [] |
|
|
|
|
|
hlt_chunk_size = 12 |
|
hlt_strat = [" ", ". ", "! ", "? ", ": ", "\n\n", "\n", ", "] |
|
hlt_overlap = 4 |
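
# The three settings above control how highlight_found_text() splits a response into
# chunks when matching it back to the source text for highlighting.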
|
|
|
|
|
ner_model = [] |
|
|
|
|
|
|
|
kw_model = pipeline("feature-extraction", model="sentence-transformers/all-MiniLM-L6-v2") |
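# This sentence-transformer feature-extraction pipeline is reused by KeyBERT in
# keybert_keywords() to pick out topic keywords from the conversation.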
|
|
|
|
|
if torch.cuda.is_available(): |
|
torch_device = "cuda" |
|
gpu_layers = 100 |
|
else: |
|
torch_device = "cpu" |
|
gpu_layers = 0 |
|
|
|
print("Running on device:", torch_device) |
|
threads = 8 |
|
print("CPU threads:", threads) |
|
|
|
|
|
temperature: float = 0.1 |
|
top_k: int = 3 |
|
top_p: float = 1 |
|
repetition_penalty: float = 1.15 |
|
flan_alpaca_repetition_penalty: float = 1.3 |
|
last_n_tokens: int = 64 |
|
max_new_tokens: int = 1024 |
|
seed: int = 42 |
|
reset: bool = False |
|
stream: bool = True |
|
threads: int = threads |
|
batch_size:int = 256 |
|
context_length:int = 2048 |
|
sample = True |
|
|
|
|
|
class CtransInitConfig_gpu: |
|
def __init__(self, |
|
last_n_tokens=last_n_tokens, |
|
seed=seed, |
|
n_threads=threads, |
|
n_batch=batch_size, |
|
n_ctx=4096, |
|
n_gpu_layers=gpu_layers): |
|
|
|
self.last_n_tokens = last_n_tokens |
|
self.seed = seed |
|
self.n_threads = n_threads |
|
self.n_batch = n_batch |
|
self.n_ctx = n_ctx |
|
self.n_gpu_layers = n_gpu_layers |
|
|
|
|
|
def update_gpu(self, new_value): |
|
self.n_gpu_layers = new_value |
|
|
|
class CtransInitConfig_cpu(CtransInitConfig_gpu): |
|
def __init__(self): |
|
super().__init__() |
|
self.n_gpu_layers = 0 |
|
|
|
gpu_config = CtransInitConfig_gpu() |
|
cpu_config = CtransInitConfig_cpu() |
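
# Pre-built GPU and CPU load configurations; the app can switch between them (and adjust
# n_gpu_layers via update_gpu) depending on the device detected above.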
|
|
|
|
|
class CtransGenGenerationConfig: |
|
def __init__(self, temperature=temperature, |
|
top_k=top_k, |
|
top_p=top_p, |
|
repeat_penalty=repetition_penalty, |
|
seed=seed, |
|
stream=stream, |
|
max_tokens=max_new_tokens |
|
): |
|
self.temperature = temperature |
|
self.top_k = top_k |
|
self.top_p = top_p |
|
self.repeat_penalty = repeat_penalty |
|
self.seed = seed |
|
self.max_tokens=max_tokens |
|
self.stream = stream |
|
|
|
def update_temp(self, new_value): |
|
self.temperature = new_value |
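
# vars() of this config is unpacked straight into the llama.cpp model call in
# produce_streaming_answer_chatbot(), so the attribute names must match llama-cpp-python's
# generation keyword arguments.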
|
|
|
|
|
|
|
def docs_to_faiss_save(docs_out:PandasDataFrame, embeddings=embeddings): |
|
|
|
print(f"> Total split documents: {len(docs_out)}") |
|
|
|
vectorstore_func = FAISS.from_documents(documents=docs_out, embedding=embeddings) |
|
|
|
    # Optionally persist the vectorstore to disk:
    # with open("vectorstore.pkl", "wb") as f:
    #     pickle.dump(vectorstore, f)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
global vectorstore |
|
|
|
vectorstore = vectorstore_func |
|
|
|
out_message = "Document processing complete" |
|
|
|
|
|
|
|
|
|
return out_message |
|
|
|
|
|
|
|
def base_prompt_templates(model_type = "Qwen 2 0.5B (small, fast)"): |
|
|
|
|
|
|
|
|
|
|
|
|
|
CONTENT_PROMPT = PromptTemplate( |
|
template="{page_content}\n\n", |
|
input_variables=["page_content"] |
|
) |
|
|
|
|
|
|
|
    if model_type == "Qwen 2 0.5B (small, fast)":
        INSTRUCTION_PROMPT = PromptTemplate(template=instruction_prompt_qwen, input_variables=['question', 'summaries'])
    elif model_type == "Phi 3.5 Mini (larger, slow)":
        INSTRUCTION_PROMPT = PromptTemplate(template=instruction_prompt_phi3, input_variables=['question', 'summaries'])
    else:
        # Fall back to the Qwen template for any other model choice so the function always returns a prompt.
        INSTRUCTION_PROMPT = PromptTemplate(template=instruction_prompt_qwen, input_variables=['question', 'summaries'])
|
|
|
return INSTRUCTION_PROMPT, CONTENT_PROMPT |
|
|
|
def write_out_metadata_as_string(metadata_in): |
|
metadata_string = [f"{' '.join(f'{k}: {v}' for k, v in d.items() if k != 'page_section')}" for d in metadata_in] |
|
return metadata_string |
|
|
|
def generate_expanded_prompt(inputs: Dict[str, str], instruction_prompt, content_prompt, extracted_memory, vectorstore, embeddings, relevant_flag = True, out_passages = 2): |
|
|
|
question = inputs["question"] |
|
chat_history = inputs["chat_history"] |
|
|
|
print("relevant_flag in generate_expanded_prompt:", relevant_flag) |
|
|
|
|
|
    if relevant_flag:
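        # Rewrite the question using keywords extracted from the chat history, then pull candidate
        # passages with hybrid retrieval (FAISS similarity + BM25 + SVM re-ranking).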
|
new_question_kworded = adapt_q_from_chat_history(question, chat_history, extracted_memory) |
|
docs_keep_as_doc, doc_df, docs_keep_out = hybrid_retrieval(new_question_kworded, vectorstore, embeddings, k_val = 25, out_passages = out_passages, vec_score_cut_off = 0.85, vec_weight = 1, bm25_weight = 1, svm_weight = 1) |
|
else: |
|
new_question_kworded = question |
|
doc_df = pd.DataFrame() |
|
docs_keep_as_doc = [] |
|
docs_keep_out = [] |
|
|
|
    if (not docs_keep_as_doc) or doc_df.empty:
|
sorry_prompt = """Say 'Sorry, there is no relevant information to answer this question.'""" |
|
return sorry_prompt, "No relevant sources found.", new_question_kworded |
|
|
|
|
|
print("Doc_df columns:", doc_df.columns) |
|
|
|
if 'meta_url' in doc_df.columns: |
|
file_type = determine_file_type(doc_df['meta_url'][0]) |
|
else: |
|
file_type = determine_file_type(doc_df['source'][0]) |
|
|
|
|
|
if (file_type != ".csv") & (file_type != ".xlsx"): |
|
docs_keep_as_doc, doc_df = get_expanded_passages(vectorstore, docs_keep_out, width=3) |
|
|
|
|
|
doc_df['meta_clean'] = write_out_metadata_as_string(doc_df["metadata"]) |
|
|
|
|
|
doc_df['page_content_no_meta'] = doc_df.apply(lambda row: row['page_content'].replace(row['meta_clean'] + ". ", ""), axis=1) |
|
doc_df['content_meta'] = doc_df['meta_clean'].astype(str) + ".<br><br>" + doc_df['page_content_no_meta'].astype(str) |
|
|
|
|
|
modified_page_content = [f" Document {i+1} - {word}" for i, word in enumerate(doc_df['content_meta'])] |
|
docs_content_string = '<br><br>'.join(modified_page_content) |
|
|
|
sources_docs_content_string = '<br><br>'.join(doc_df['content_meta']) |
|
|
|
instruction_prompt_out = instruction_prompt.format(question=new_question_kworded, summaries=docs_content_string) |
|
|
|
print('Final prompt is: ') |
|
print(instruction_prompt_out) |
|
|
|
return instruction_prompt_out, sources_docs_content_string, new_question_kworded |
|
|
|
def create_full_prompt(user_input, history, extracted_memory, vectorstore, embeddings, model_type, out_passages, api_model_choice=None, api_key=None, relevant_flag = True): |
|
|
|
|
|
|
|
|
|
print("\n==== date/time: " + str(datetime.datetime.now()) + " ====") |
|
|
|
|
|
history = history or [] |
|
|
|
if api_model_choice and api_model_choice != "None": |
|
print("API model choice detected") |
|
if api_key: |
|
print("API key detected") |
|
return history, "", None, relevant_flag |
|
else: |
|
return history, "", None, relevant_flag |
|
|
|
|
|
instruction_prompt, content_prompt = base_prompt_templates(model_type=model_type) |
|
|
|
if not user_input.strip(): |
|
user_input = "No user input found" |
|
relevant_flag = False |
|
else: |
|
relevant_flag = True |
|
|
|
print("User input:", user_input) |
|
|
|
instruction_prompt_out, docs_content_string, new_question_kworded =\ |
|
generate_expanded_prompt({"question": user_input, "chat_history": history}, |
|
instruction_prompt, content_prompt, extracted_memory, vectorstore, embeddings, relevant_flag, out_passages) |
|
|
|
history.append(user_input) |
|
|
|
print("Output history is:", history) |
|
print("Final prompt to model is:",instruction_prompt_out) |
|
|
|
return history, docs_content_string, instruction_prompt_out, relevant_flag |
|
|
|
|
|
import boto3 |
|
import json |
|
from chatfuncs.helper_functions import get_or_create_env_var |
|
|
|
|
|
class ResponseObject: |
|
def __init__(self, text, usage_metadata): |
|
self.text = text |
|
self.usage_metadata = usage_metadata |
|
|
|
max_tokens = 4096 |
|
|
|
AWS_DEFAULT_REGION = get_or_create_env_var('AWS_DEFAULT_REGION', 'eu-west-2') |
|
print(f'The value of AWS_DEFAULT_REGION is {AWS_DEFAULT_REGION}') |
|
|
|
bedrock_runtime = boto3.client('bedrock-runtime', region_name=AWS_DEFAULT_REGION) |
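
# Client for Amazon Bedrock, used by call_aws_claude() below to invoke the Anthropic Claude models.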
|
|
|
def call_aws_claude(prompt: str, system_prompt: str, temperature: float, max_tokens: int, model_choice: str) -> ResponseObject: |
|
""" |
|
This function sends a request to AWS Claude with the following parameters: |
|
- prompt: The user's input prompt to be processed by the model. |
|
- system_prompt: A system-defined prompt that provides context or instructions for the model. |
|
- temperature: A value that controls the randomness of the model's output, with higher values resulting in more diverse responses. |
|
- max_tokens: The maximum number of tokens (words or characters) in the model's response. |
|
- model_choice: The specific model to use for processing the request. |
|
|
|
The function constructs the request configuration, invokes the model, extracts the response text, and returns a ResponseObject containing the text and metadata. |
|
""" |
|
|
|
prompt_config = { |
|
"anthropic_version": "bedrock-2023-05-31", |
|
"max_tokens": max_tokens, |
|
"top_p": 0.999, |
|
"temperature":temperature, |
|
"system": system_prompt, |
|
"messages": [ |
|
{ |
|
"role": "user", |
|
"content": [ |
|
{"type": "text", "text": prompt}, |
|
], |
|
} |
|
], |
|
} |
|
|
|
body = json.dumps(prompt_config) |
|
|
|
modelId = model_choice |
|
accept = "application/json" |
|
contentType = "application/json" |
|
|
|
request = bedrock_runtime.invoke_model( |
|
body=body, modelId=modelId, accept=accept, contentType=contentType |
|
) |
|
|
|
|
|
response_body = json.loads(request.get("body").read()) |
|
text = response_body.get("content")[0].get("text") |
|
|
|
response = ResponseObject( |
|
text=text, |
|
usage_metadata=request['ResponseMetadata'] |
|
) |
|
|
|
|
|
|
|
print("Metadata:", response.usage_metadata) |
|
|
|
return response |
|
|
|
def produce_streaming_answer_chatbot(history, |
|
full_prompt, |
|
model_type, |
|
temperature=temperature, |
|
relevant_query_bool=True, |
|
max_new_tokens=max_new_tokens, |
|
sample=sample, |
|
repetition_penalty=repetition_penalty, |
|
top_p=top_p, |
|
top_k=top_k |
|
): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    if not relevant_query_bool:
        # No usable query, so post a single chatbot message and stop streaming.
        history.append(("", "No relevant query found. Please retry your question"))
|
|
|
yield history |
|
return |
|
|
|
if model_type == "Qwen 2 0.5B (small, fast)": |
|
|
|
model_inputs = tokenizer(text=full_prompt, return_tensors="pt", return_attention_mask=False).to(torch_device) |
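
        # Stream the response: generation runs in a background thread while TextIteratorStreamer
        # feeds newly generated tokens back to this generator, which yields partial chat history.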
|
|
|
|
|
|
|
streamer = TextIteratorStreamer(tokenizer, timeout=120., skip_prompt=True, skip_special_tokens=True) |
|
generate_kwargs = dict( |
|
model_inputs, |
|
streamer=streamer, |
|
max_new_tokens=max_new_tokens, |
|
do_sample=sample, |
|
repetition_penalty=repetition_penalty, |
|
top_p=top_p, |
|
temperature=temperature, |
|
top_k=top_k |
|
) |
|
|
|
|
|
|
|
t = Thread(target=model.generate, kwargs=generate_kwargs) |
|
t.start() |
|
|
|
|
|
start = time.time() |
|
NUM_TOKENS=0 |
|
print('-'*4+'Start Generation'+'-'*4) |
|
|
|
history[-1][1] = "" |
|
for new_text in streamer: |
|
try: |
|
                if new_text is None: new_text = ""
|
history[-1][1] += new_text |
|
NUM_TOKENS+=1 |
|
yield history |
|
except Exception as e: |
|
print(f"Error during text generation: {e}") |
|
|
|
time_generate = time.time() - start |
|
print('\n') |
|
print('-'*4+'End Generation'+'-'*4) |
|
print(f'Num of generated tokens: {NUM_TOKENS}') |
|
print(f'Time for complete generation: {time_generate}s') |
|
        print(f'Tokens per second: {NUM_TOKENS/time_generate}')
|
print(f'Time per token: {(time_generate/NUM_TOKENS)*1000}ms') |
|
|
|
elif model_type == "Phi 3.5 Mini (larger, slow)": |
|
|
|
|
|
gen_config = CtransGenGenerationConfig() |
|
gen_config.update_temp(temperature) |
|
|
|
print(vars(gen_config)) |
|
|
|
|
|
start = time.time() |
|
NUM_TOKENS=0 |
|
print('-'*4+'Start Generation'+'-'*4) |
|
|
|
output = model( |
|
full_prompt, **vars(gen_config)) |
|
|
|
history[-1][1] = "" |
|
for out in output: |
|
|
|
if "choices" in out and len(out["choices"]) > 0 and "text" in out["choices"][0]: |
|
history[-1][1] += out["choices"][0]["text"] |
|
NUM_TOKENS+=1 |
|
yield history |
|
else: |
|
print(f"Unexpected output structure: {out}") |
|
|
|
time_generate = time.time() - start |
|
print('\n') |
|
print('-'*4+'End Generation'+'-'*4) |
|
print(f'Num of generated tokens: {NUM_TOKENS}') |
|
print(f'Time for complete generation: {time_generate}s') |
|
        print(f'Tokens per second: {NUM_TOKENS/time_generate}')
|
print(f'Time per token: {(time_generate/NUM_TOKENS)*1000}ms') |
|
|
|
elif model_type == "anthropic.claude-3-haiku-20240307-v1:0" or model_type == "anthropic.claude-3-sonnet-20240229-v1:0": |
|
system_prompt = "You are answering questions from the user based on source material. Respond with short, factually correct answers." |
|
|
|
try: |
|
print("Calling AWS Claude model") |
|
response = call_aws_claude(full_prompt, system_prompt, temperature, max_tokens, model_type) |
|
except Exception as e: |
|
|
|
print(e) |
|
try: |
|
out_message = "API limit hit - waiting 30 seconds to retry." |
|
print(out_message) |
|
|
|
time.sleep(30) |
|
response = call_aws_claude(full_prompt, system_prompt, temperature, max_tokens, model_type) |
|
|
|
except Exception as e: |
|
print(e) |
|
return "", history |
|
|
|
        # Show the Claude response in the same message-pair format used by the local-model branches.
        history[-1][1] = response.text

        yield history
|
|
|
|
|
|
|
def adapt_q_from_chat_history(question, chat_history, extracted_memory, keyword_model=""): |
|
|
|
chat_history_str, chat_history_first_q, chat_history_first_ans, max_memory_length = _get_chat_history(chat_history) |
|
|
|
if chat_history_str: |
|
|
|
|
|
|
|
|
|
new_question_kworded = str(extracted_memory) + ". " + question |
|
|
|
|
|
else: |
|
new_question_kworded = question |
|
|
|
|
|
|
|
return new_question_kworded |
|
|
|
def determine_file_type(file_path): |
|
""" |
|
Determine the file type based on its extension. |
|
|
|
Parameters: |
|
file_path (str): Path to the file. |
|
|
|
Returns: |
|
str: File extension (e.g., '.pdf', '.docx', '.txt', '.html'). |
|
""" |
|
return os.path.splitext(file_path)[1].lower() |
|
|
|
|
|
def create_doc_df(docs_keep_out): |
|
|
|
content=[] |
|
meta=[] |
|
meta_url=[] |
|
page_section=[] |
|
score=[] |
|
|
|
doc_df = pd.DataFrame() |
|
|
|
|
|
|
|
for item in docs_keep_out: |
|
content.append(item[0].page_content) |
|
meta.append(item[0].metadata) |
|
meta_url.append(item[0].metadata['source']) |
|
|
|
file_extension = determine_file_type(item[0].metadata['source']) |
|
if (file_extension != ".csv") & (file_extension != ".xlsx"): |
|
page_section.append(item[0].metadata['page_section']) |
|
else: page_section.append("") |
|
score.append(item[1]) |
|
|
|
|
|
|
|
doc_df = pd.DataFrame(list(zip(content, meta, page_section, meta_url, score)), |
|
columns =['page_content', 'metadata', 'page_section', 'meta_url', 'score']) |
|
|
|
docs_content = doc_df['page_content'].astype(str) |
|
doc_df['full_url'] = "https://" + doc_df['meta_url'] |
|
|
|
return doc_df |
|
|
|
def hybrid_retrieval(new_question_kworded, vectorstore, embeddings, k_val, out_passages, |
|
vec_score_cut_off, vec_weight, bm25_weight, svm_weight): |
|
|
|
|
|
|
|
doc_df = pd.DataFrame() |
|
|
|
|
|
docs = vectorstore.similarity_search_with_score(new_question_kworded, k=k_val) |
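    # similarity_search_with_score returns (Document, score) pairs; the filtering below keeps only
    # passages whose score is under vec_score_cut_off and that are at least 100 characters long.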
|
|
|
print("Docs from similarity search:") |
|
print(docs) |
|
|
|
|
|
docs_len = [len(x[0].page_content) for x in docs] |
|
docs_scores = [x[1] for x in docs] |
|
|
|
|
|
score_more_limit = pd.Series(docs_scores) < vec_score_cut_off |
|
docs_keep = list(compress(docs, score_more_limit)) |
|
|
|
if not docs_keep: |
|
return [], pd.DataFrame(), [] |
|
|
|
|
|
    # Re-compute passage lengths on the score-filtered set so the mask lines up with docs_keep.
    docs_keep_len = [len(x[0].page_content) for x in docs_keep]
    length_more_limit = pd.Series(docs_keep_len) >= 100
    docs_keep = list(compress(docs_keep, length_more_limit))
|
|
|
if not docs_keep: |
|
return [], pd.DataFrame(), [] |
|
|
|
docs_keep_as_doc = [x[0] for x in docs_keep] |
|
docs_keep_length = len(docs_keep_as_doc) |
|
|
|
|
|
|
|
    # If only one passage survives the filters, skip the re-ranking below and return it directly,
    # keeping the same three-value return shape that callers of hybrid_retrieval expect.
    if docs_keep_length == 1:
        doc_df = create_doc_df(docs_keep)

        return docs_keep_as_doc, doc_df, docs_keep
|
|
|
|
|
if out_passages > docs_keep_length: |
|
out_passages = docs_keep_length |
|
k_val = docs_keep_length |
|
|
|
vec_rank = [*range(1, docs_keep_length+1)] |
|
vec_score = [(docs_keep_length/x)*vec_weight for x in vec_rank] |
|
|
|
print("Number of documents remaining: ", docs_keep_length) |
|
|
|
|
|
|
|
content_keep=[] |
|
for item in docs_keep: |
|
content_keep.append(item[0].page_content) |
|
|
|
|
|
corpus = [doc.lower() for doc in content_keep] |
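
    # Lexical re-ranking: tokenise the kept passages (stopword removal + Snowball stemming) and
    # score them against the query with BM25.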
|
|
|
stemmer = Stemmer.Stemmer("english") |
|
corpus_tokens = bm25s.tokenize(corpus, stopwords="en", stemmer=stemmer) |
|
|
|
|
|
retriever = bm25s.BM25() |
|
retriever.index(corpus_tokens) |
|
|
|
|
|
query_tokens = bm25s.tokenize(new_question_kworded.lower(), stemmer=stemmer) |
|
results, scores = retriever.retrieve(query_tokens, corpus=corpus, k=len(corpus)) |
|
|
|
for i in range(results.shape[1]): |
|
doc, score = results[0, i], scores[0, i] |
|
print(f"Rank {i+1} (score: {score:.2f}): {doc}") |
|
|
|
|
|
|
|
|
|
|
|
    # Map each kept passage back to its BM25 rank (results[0] holds the corpus in ranked order),
    # so that bm25_score lines up element-for-element with vec_score and svm_score below.
    ranked_docs = list(results[0])
    bm25_rank = [ranked_docs.index(doc) + 1 for doc in corpus]

    bm25_score = [(docs_keep_length / rank) * bm25_weight for rank in bm25_rank]
|
|
|
|
|
|
|
|
|
embeddings_type = type(embeddings) |
|
print("Type of embeddings object:", embeddings_type) |
|
|
|
|
|
print("embeddings:", embeddings) |
|
|
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
|
|
hf_embeddings = embeddings |
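
    # Third signal: an SVM retriever fitted on the kept passages, using the same embeddings
    # object as the FAISS index.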
|
|
|
svm_retriever = SVMRetriever.from_texts(content_keep, hf_embeddings, k = k_val) |
|
svm_result = svm_retriever.invoke(new_question_kworded) |
|
|
|
|
|
    # Rank each kept passage by its position in the SVM results. Passages the SVM retriever
    # does not return get a score of zero so the three score lists stay the same length.
    svm_rank = []
    svm_score = []

    for vec_item in docs_keep:
        for x, svm_item in enumerate(svm_result, start=1):
            if svm_item.page_content == vec_item[0].page_content:
                svm_rank.append(x)
                svm_score.append((docs_keep_length / x) * svm_weight)
                break
        else:
            svm_rank.append(len(svm_result) + 1)
            svm_score.append(0)
|
|
|
|
|
|
|
final_score = [a + b + c for a, b, c in zip(vec_score, bm25_score, svm_score)] |
|
final_rank = [sorted(final_score, reverse=True).index(x)+1 for x in final_score] |
|
|
|
final_rank = list(pd.Series(final_rank).rank(method='first')) |
|
|
|
|
|
|
|
|
|
best_rank_index_pos = [] |
|
|
|
for x in range(1,out_passages+1): |
|
try: |
|
best_rank_index_pos.append(final_rank.index(x)) |
|
        except ValueError:  # rank x was not found in final_rank
|
pass |
|
|
|
|
|
|
|
best_rank_pos_series = pd.Series(best_rank_index_pos) |
|
|
|
|
|
docs_keep_out = [docs_keep[i] for i in best_rank_index_pos] |
|
|
|
|
|
docs_keep_as_doc = [x[0] for x in docs_keep_out] |
|
|
|
|
|
doc_df = create_doc_df(docs_keep_out) |
|
|
|
print("doc_df:",doc_df) |
|
print("docs_keep_as_doc:",docs_keep_as_doc) |
|
print("docs_keep_out:", docs_keep_out) |
|
|
|
return docs_keep_as_doc, doc_df, docs_keep_out |
|
|
|
def get_expanded_passages(vectorstore, docs, width): |
|
|
|
""" |
|
Extracts expanded passages based on given documents and a width for context. |
|
|
|
Parameters: |
|
- vectorstore: The primary data source. |
|
- docs: List of documents to be expanded. |
|
- width: Number of documents to expand around a given document for context. |
|
|
|
Returns: |
|
- expanded_docs: List of expanded Document objects. |
|
- doc_df: DataFrame representation of expanded_docs. |
|
""" |
|
|
|
from collections import defaultdict |
|
|
|
def get_docs_from_vstore(vectorstore): |
|
vector = vectorstore.docstore._dict |
|
return list(vector.items()) |
|
|
|
def extract_details(docs_list): |
|
docs_list_out = [tup[1] for tup in docs_list] |
|
content = [doc.page_content for doc in docs_list_out] |
|
meta = [doc.metadata for doc in docs_list_out] |
|
return ''.join(content), meta[0], meta[-1] |
|
|
|
def get_parent_content_and_meta(vstore_docs, width, target): |
|
|
|
target_range = range(max(0, target), min(len(vstore_docs), target + width + 1)) |
|
parent_vstore_out = [vstore_docs[i] for i in target_range] |
|
|
|
        # extract_details already concatenates the whole window, so one call is enough.
        content_str, meta_first, meta_last = extract_details(parent_vstore_out)
        return [content_str], [meta_first], [meta_last]
|
|
|
def merge_dicts_except_source(d1, d2): |
|
merged = {} |
|
for key in d1: |
|
if key != "source": |
|
merged[key] = str(d1[key]) + " to " + str(d2[key]) |
|
else: |
|
merged[key] = d1[key] |
|
return merged |
|
|
|
def merge_two_lists_of_dicts(list1, list2): |
|
return [merge_dicts_except_source(d1, d2) for d1, d2 in zip(list1, list2)] |
|
|
|
|
|
vstore_docs = get_docs_from_vstore(vectorstore) |
|
doc_sources = {doc.metadata['source'] for doc, _ in docs} |
|
vstore_docs = [(k, v) for k, v in vstore_docs if v.metadata.get('source') in doc_sources] |
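
    # Group the remaining chunks by source document so that neighbouring sections can be looked
    # up by position when widening each retrieved passage.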
|
|
|
|
|
vstore_by_source = defaultdict(list) |
|
for k, v in vstore_docs: |
|
vstore_by_source[v.metadata['source']].append((k, v)) |
|
|
|
expanded_docs = [] |
|
for doc, score in docs: |
|
search_source = doc.metadata['source'] |
|
|
|
|
|
|
|
|
|
|
|
|
|
search_section = doc.metadata['page_section'] |
|
parent_vstore_meta_section = [doc.metadata['page_section'] for _, doc in vstore_by_source[search_source]] |
|
search_index = parent_vstore_meta_section.index(search_section) if search_section in parent_vstore_meta_section else -1 |
|
|
|
content_str, meta_first, meta_last = get_parent_content_and_meta(vstore_by_source[search_source], width, search_index) |
|
meta_full = merge_two_lists_of_dicts(meta_first, meta_last) |
|
|
|
expanded_doc = (Document(page_content=content_str[0], metadata=meta_full[0]), score) |
|
expanded_docs.append(expanded_doc) |
|
|
|
doc_df = pd.DataFrame() |
|
|
|
doc_df = create_doc_df(expanded_docs) |
|
|
|
return expanded_docs, doc_df |
|
|
|
def highlight_found_text(search_text: str, full_text: str, hlt_chunk_size:int=hlt_chunk_size, hlt_strat:List=hlt_strat, hlt_overlap:int=hlt_overlap) -> str: |
|
""" |
|
Highlights occurrences of search_text within full_text. |
|
|
|
Parameters: |
|
- search_text (str): The text to be searched for within full_text. |
|
- full_text (str): The text within which search_text occurrences will be highlighted. |
|
|
|
Returns: |
|
- str: A string with occurrences of search_text highlighted. |
|
|
|
Example: |
|
>>> highlight_found_text("world", "Hello, world! This is a test. Another world awaits.") |
|
'Hello, <mark style="color:black;">world</mark>! This is a test. Another <mark style="color:black;">world</mark> awaits.' |
|
""" |
|
|
|
def extract_text_from_input(text, i=0): |
|
if isinstance(text, str): |
|
return text.replace(" ", " ").strip() |
|
elif isinstance(text, list): |
|
return text[i][0].replace(" ", " ").strip() |
|
else: |
|
return "" |
|
|
|
def extract_search_text_from_input(text): |
|
if isinstance(text, str): |
|
return text.replace(" ", " ").strip() |
|
elif isinstance(text, list): |
|
return text[-1][1].replace(" ", " ").strip() |
|
else: |
|
return "" |
|
|
|
full_text = extract_text_from_input(full_text) |
|
search_text = extract_search_text_from_input(search_text) |
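
    # Split the response into small chunks, find where each chunk occurs in the full text, then
    # merge nearby matches into contiguous spans before wrapping them in <mark> tags.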
|
|
|
|
|
|
|
text_splitter = RecursiveCharacterTextSplitter( |
|
chunk_size=hlt_chunk_size, |
|
separators=hlt_strat, |
|
chunk_overlap=hlt_overlap, |
|
) |
|
sections = text_splitter.split_text(search_text) |
|
|
|
found_positions = {} |
|
for x in sections: |
|
text_start_pos = 0 |
|
while text_start_pos != -1: |
|
text_start_pos = full_text.find(x, text_start_pos) |
|
if text_start_pos != -1: |
|
found_positions[text_start_pos] = text_start_pos + len(x) |
|
text_start_pos += 1 |
|
|
|
|
|
sorted_starts = sorted(found_positions.keys()) |
|
combined_positions = [] |
|
if sorted_starts: |
|
current_start, current_end = sorted_starts[0], found_positions[sorted_starts[0]] |
|
for start in sorted_starts[1:]: |
|
if start <= (current_end + 10): |
|
current_end = max(current_end, found_positions[start]) |
|
else: |
|
combined_positions.append((current_start, current_end)) |
|
current_start, current_end = start, found_positions[start] |
|
combined_positions.append((current_start, current_end)) |
|
|
|
|
|
pos_tokens = [] |
|
prev_end = 0 |
|
for start, end in combined_positions: |
|
if end-start > 15: |
|
pos_tokens.append(full_text[prev_end:start]) |
|
pos_tokens.append('<mark style="color:black;">' + full_text[start:end] + '</mark>') |
|
prev_end = end |
|
pos_tokens.append(full_text[prev_end:]) |
|
|
|
return "".join(pos_tokens) |
|
|
|
|
|
|
|
|
|
def clear_chat(chat_history_state, sources, chat_message, current_topic): |
|
chat_history_state = [] |
|
sources = '' |
|
chat_message = '' |
|
current_topic = '' |
|
|
|
return chat_history_state, sources, chat_message, current_topic |
|
|
|
def _get_chat_history(chat_history: List[Tuple[str, str]], max_memory_length:int = max_memory_length): |
|
|
|
    if (not chat_history) or (max_memory_length == 0):
|
chat_history = [] |
|
|
|
if len(chat_history) > max_memory_length: |
|
chat_history = chat_history[-max_memory_length:] |
|
|
|
|
|
|
|
first_q = "" |
|
first_ans = "" |
|
for human_s, ai_s in chat_history: |
|
first_q = human_s |
|
first_ans = ai_s |
|
|
|
|
|
break |
|
|
|
conversation = "" |
|
for human_s, ai_s in chat_history: |
|
human = f"Human: " + human_s |
|
ai = f"Assistant: " + ai_s |
|
conversation += "\n" + "\n".join([human, ai]) |
|
|
|
return conversation, first_q, first_ans, max_memory_length |
|
|
|
def add_inputs_answer_to_history(user_message, history, current_topic): |
|
|
|
if history is None: |
|
history = [("","")] |
|
|
|
|
|
|
|
chat_history_str, chat_history_first_q, chat_history_first_ans, max_memory_length = _get_chat_history(history) |
|
|
|
|
|
|
|
if (len(history) == 1) | (len(history) > max_memory_length): |
|
|
|
|
|
|
|
|
|
first_q_and_first_ans = str(chat_history_first_q) + " " + str(chat_history_first_ans) |
|
|
|
keywords = keybert_keywords(first_q_and_first_ans, n = 8, kw_model=kw_model) |
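
        # De-duplicate the keywords while preserving their original order.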
|
|
|
|
|
|
|
ordered_tokens = set() |
|
result = [] |
|
for word in keywords: |
|
if word not in ordered_tokens: |
|
ordered_tokens.add(word) |
|
result.append(word) |
|
|
|
extracted_memory = ' '.join(result) |
|
|
|
else: extracted_memory=current_topic |
|
|
|
print("Extracted memory is:") |
|
print(extracted_memory) |
|
|
|
|
|
return history, extracted_memory |
|
|
|
|
|
|
|
def remove_q_stopwords(question): |
|
|
|
text = question.lower() |
|
|
|
|
|
text = re.sub('[0-9]', '', text) |
|
|
|
tokenizer = RegexpTokenizer(r'\w+') |
|
text_tokens = tokenizer.tokenize(text) |
|
|
|
    stop_words = set(stopwords.words("english"))
    tokens_without_sw = [word for word in text_tokens if word not in stop_words]
|
|
|
|
|
ordered_tokens = set() |
|
result = [] |
|
for word in tokens_without_sw: |
|
if word not in ordered_tokens: |
|
ordered_tokens.add(word) |
|
result.append(word) |
|
|
|
|
|
|
|
new_question_keywords = ' '.join(result) |
|
return new_question_keywords |
|
|
|
def remove_q_ner_extractor(question): |
|
|
|
predict_out = ner_model.predict(question) |
|
|
|
|
|
|
|
predict_tokens = [' '.join(v for k, v in d.items() if k == 'span') for d in predict_out] |
|
|
|
|
|
ordered_tokens = set() |
|
result = [] |
|
for word in predict_tokens: |
|
if word not in ordered_tokens: |
|
ordered_tokens.add(word) |
|
result.append(word) |
|
|
|
|
|
|
|
new_question_keywords = ' '.join(result).lower() |
|
return new_question_keywords |
|
|
|
def apply_lemmatize(text, wnl=WordNetLemmatizer()): |
|
|
|
def prep_for_lemma(text): |
|
|
|
|
|
text = re.sub('[0-9]', '', text) |
|
print(text) |
|
|
|
tokenizer = RegexpTokenizer(r'\w+') |
|
text_tokens = tokenizer.tokenize(text) |
|
|
|
|
|
return text_tokens |
|
|
|
tokens = prep_for_lemma(text) |
|
|
|
def lem_word(word): |
|
|
|
if len(word) > 3: out_word = wnl.lemmatize(word) |
|
else: out_word = word |
|
|
|
return out_word |
|
|
|
return [lem_word(token) for token in tokens] |
|
|
|
def keybert_keywords(text, n, kw_model): |
|
tokens_lemma = apply_lemmatize(text) |
|
lemmatised_text = ' '.join(tokens_lemma) |
|
|
|
keywords_text = KeyBERT(model=kw_model).extract_keywords(lemmatised_text, stop_words='english', top_n=n, |
|
keyphrase_ngram_range=(1, 1)) |
|
keywords_list = [item[0] for item in keywords_text] |
|
|
|
return keywords_list |
|
|
|
|
|
def turn_off_interactivity(user_message, history): |
|
return gr.update(value="", interactive=False), history + [[user_message, None]] |
|
|
|
def restore_interactivity(): |
|
return gr.update(interactive=True) |
|
|
|
def update_message(dropdown_value): |
|
return gr.Textbox(value=dropdown_value) |
|
|
|
def hide_block(): |
|
return gr.Radio(visible=False) |
|
|
|
|
|
|
|
def vote(data: gr.LikeData, chat_history, instruction_prompt_out, model_type): |
|
|
|
|
    chat_history_last = str(chat_history[-1][0]) + " - " + str(chat_history[-1][1])
|
|
|
response_df = pd.DataFrame(data={"thumbs_up":data.liked, |
|
"chosen_response":data.value, |
|
"input_prompt":instruction_prompt_out, |
|
"chat_history":chat_history_last, |
|
"model_type": model_type, |
|
"date_time": pd.Timestamp.now()}, index=[0]) |
|
|
|
if data.liked: |
|
print("You upvoted this response: " + data.value) |
|
|
|
if os.path.isfile("thumbs_up_data.csv"): |
|
existing_thumbs_up_df = pd.read_csv("thumbs_up_data.csv") |
|
thumbs_up_df_concat = pd.concat([existing_thumbs_up_df, response_df], ignore_index=True).drop("Unnamed: 0",axis=1, errors="ignore") |
|
thumbs_up_df_concat.to_csv("thumbs_up_data.csv") |
|
else: |
|
response_df.to_csv("thumbs_up_data.csv") |
|
|
|
else: |
|
print("You downvoted this response: " + data.value) |
|
|
|
if os.path.isfile("thumbs_down_data.csv"): |
|
existing_thumbs_down_df = pd.read_csv("thumbs_down_data.csv") |
|
thumbs_down_df_concat = pd.concat([existing_thumbs_down_df, response_df], ignore_index=True).drop("Unnamed: 0",axis=1, errors="ignore") |
|
thumbs_down_df_concat.to_csv("thumbs_down_data.csv") |
|
else: |
|
response_df.to_csv("thumbs_down_data.csv") |
|
|