Spaces:
Sleeping
Sleeping
import os | |
import string | |
from typing import Any, Dict, List, Tuple, Union | |
import chromadb | |
import numpy as np | |
import openai | |
import pandas as pd | |
import requests | |
import streamlit as st | |
from datasets import load_dataset | |
from langchain.document_loaders import TextLoader | |
from langchain.embeddings.sentence_transformer import SentenceTransformerEmbeddings | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.vectorstores import Chroma | |
from scipy.spatial.distance import cosine | |
# Configure the OpenAI client from the environment at import time.
# os.environ[...] raises KeyError when OPENAI_API_KEY is not set, so importing
# this module fails unless the variable is defined.
openai.api_key = os.environ["OPENAI_API_KEY"]
def merge_dataframes(dataframes: List[pd.DataFrame]) -> pd.DataFrame:
    """
    Stack several DataFrames vertically and keep only the Q&A columns.

    The frames are concatenated row-wise with a fresh 0..n-1 index, then the
    result is narrowed to the columns 'context', 'questions' and 'answers'
    (in that order).

    Parameters:
        dataframes (List[pd.DataFrame]): The DataFrames to combine.

    Returns:
        pd.DataFrame: The combined rows restricted to the three columns above.

    Raises:
        KeyError: If any of the three columns is absent from the combined frame.
    """
    wanted_columns = ["context", "questions", "answers"]
    # ignore_index=True discards the per-frame indices and renumbers the rows.
    return pd.concat(dataframes, ignore_index=True)[wanted_columns]
def call_chatgpt(prompt: str) -> str:
    """
    Ask the OpenAI completions endpoint to continue a prompt.

    Args:
        prompt: The text sent to the model.

    Returns:
        The text of the first choice in the API response.
    """
    # Fixed sampling configuration used for every request.
    request_settings = {
        "model": "gpt-3.5-turbo-instruct",
        "temperature": 0.5,
        "max_tokens": 500,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }
    completion = openai.Completion.create(prompt=prompt, **request_settings)
    # Only the first choice is read from the response.
    return completion.choices[0]["text"]
def openai_text_embedding(prompt: str) -> List[float]:
    """
    Fetch the embedding vector for a piece of text from OpenAI.

    The request uses the "text-embedding-ada-002" model and returns the
    'embedding' field of the first entry in the response's 'data' list.

    Parameters:
        prompt (str): The text to embed.

    Returns:
        List[float]: The embedding vector (not a string); callers in this
        module convert it with np.asarray before computing distances.
    """
    response = openai.Embedding.create(
        input=prompt, model="text-embedding-ada-002"
    )
    # A single input is sent, so only the first data entry is read.
    return response["data"][0]["embedding"]
def calculate_sts_openai_score(sentence1: str, sentence2: str) -> float:
    """
    Score how semantically close two sentences are using OpenAI embeddings.

    Each sentence is embedded via openai_text_embedding, and the score is the
    cosine similarity of the two vectors (1 minus SciPy's cosine distance).
    Larger values mean the sentences are more alike.

    Parameters:
        sentence1 (str): First sentence to compare.
        sentence2 (str): Second sentence to compare.

    Returns:
        float: Cosine similarity between the two sentence embeddings.
    """
    # Embed both sentences (sentence1 first) and turn them into NumPy vectors.
    vector_a, vector_b = (
        np.asarray(openai_text_embedding(text)) for text in (sentence1, sentence2)
    )
    # scipy's `cosine` is a distance, so subtract it from 1 to get a similarity.
    distance = cosine(vector_a, vector_b)
    return 1 - distance
def add_dist_score_column(
    dataframe: pd.DataFrame,
    sentence: str,
) -> pd.DataFrame:
    """
    Score every question in a DataFrame against a sentence and return the best five.

    A 'stsopenai' column holding the cosine similarity between `sentence` and
    each entry of the 'questions' column (both embedded with
    openai_text_embedding) is written onto `dataframe`. The rows are then
    sorted by that score, highest first, and the first five are returned.

    The embedding of `sentence` is requested once and reused for every row,
    rather than being re-requested per row.

    Parameters:
        dataframe (pd.DataFrame): Frame with a 'questions' column. It is
            modified in place: the 'stsopenai' column is added to it.
        sentence (str): The sentence each question is compared with.

    Returns:
        pd.DataFrame: Up to five rows with the highest 'stsopenai' score,
        in descending score order.
    """
    # The query sentence is identical for every row, so embed it only once.
    sentence_embedding = np.asarray(openai_text_embedding(sentence))

    def _similarity_to_sentence(question: Any) -> float:
        """Cosine similarity between one question and the query sentence."""
        question_embedding = np.asarray(openai_text_embedding(str(question)))
        # scipy's `cosine` is a distance; 1 - distance gives the similarity.
        return 1 - cosine(question_embedding, sentence_embedding)

    dataframe["stsopenai"] = dataframe["questions"].apply(_similarity_to_sentence)

    # Highest scores first, then keep the top five rows.
    ranked = dataframe.sort_values(by="stsopenai", ascending=False)
    return ranked.iloc[:5, :]
def convert_to_list_of_dict(df: pd.DataFrame) -> List[Dict[str, str]]:
    """
    Turn question/answer rows into a flat list of chat-style messages.

    Each row contributes two dictionaries, in order: a
    {"role": "user", "content": <question>} entry followed by a
    {"role": "assistant", "content": <answer>} entry.

    Args:
        df: A pandas DataFrame with columns named 'questions' and 'answers'.

    Returns:
        A list with two message dictionaries per row, in row order.
    """
    messages = []
    for _, record in df.iterrows():
        # One user turn and one assistant turn per row.
        messages.extend(
            (
                {"role": "user", "content": record["questions"]},
                {"role": "assistant", "content": record["answers"]},
            )
        )
    return messages
def query(
    payload: Dict[str, Any], timeout: Union[float, None] = None
) -> Dict[str, Any]:
    """
    POST a JSON payload to the hard-coded Hugging Face endpoint and decode the reply.

    Args:
        payload (Dict[str, Any]): The JSON payload to be sent to the API.
        timeout (Union[float, None]): Seconds to wait for the server before
            `requests` raises a timeout error. The default of None waits
            indefinitely, which is what happens when no timeout is supplied.

    Returns:
        Dict[str, Any]: The decoded JSON body of the response.
        NOTE(review): llama2_7b_ysa indexes the result with [0], so the
        endpoint presumably returns a list on success — confirm.
    """
    # API endpoint URL
    API_URL = "https://sks7h7h5qkhoxwxo.us-east-1.aws.endpoints.huggingface.cloud"
    # Headers to indicate both the request and response formats are JSON
    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    # The HTTP status is not checked; the body is decoded as JSON regardless.
    response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
    return response.json()
def llama2_7b_ysa(prompt: str) -> str:
    """
    Send a prompt to the hosted model through `query` and return its text output.

    The request carries the prompt under "inputs" and a "parameters" mapping
    that sets "max_new_tokens" to 20. The reply is expected to be a list whose
    first item is a dictionary with a 'generated_text' key.

    Parameters:
    - prompt (str): The text prompt to send to the model.

    Returns:
    - str: The 'generated_text' value of the first item in the response.

    Note:
    - Relies on the module's `query` function to perform the HTTP request.
    - NOTE(review): the model is presumably a Llama-2 7B deployment, judging by
      the function name only — confirm against the endpoint configuration.
    """
    generation_settings = {"max_new_tokens": 20}
    reply = query({"inputs": prompt, "parameters": generation_settings})
    # Only the first generation in the reply is used.
    return reply[0]["generated_text"]
def quantize_to_kbit(arr: Union[np.ndarray, Any], k: int = 16) -> np.ndarray:
    """Quantize an array onto k integer levels by min-max scaling.

    Values are shifted and scaled to [0, 1] using the array's own minimum and
    maximum, multiplied by (k - 1), rounded with np.round and cast to int.

    Args:
        arr (Union[np.ndarray, Any]): The input to quantize. Anything that is
            not already an ndarray is converted with np.array.
        k (int): The number of levels (not the number of bits). Defaults to 16,
            i.e. 4-bit quantization.

    Returns:
        np.ndarray: Integer array with values in 0..k-1. When every element of
        the input is equal there is no range to scale by, so all zeros are
        returned instead of dividing by zero.

    Raises:
        ValueError: Propagated from NumPy's min/max when the input is empty.
    """
    if not isinstance(arr, np.ndarray):
        arr = np.array(arr)

    arr_min = arr.min()
    value_range = arr.max() - arr_min

    # A constant array would otherwise produce 0/0 = NaN and an undefined
    # integer cast; map it to the lowest level instead.
    if value_range == 0:
        return np.zeros(arr.shape, dtype=int)

    normalized_arr = (arr - arr_min) / value_range  # values in [0, 1]
    return np.round(normalized_arr * (k - 1)).astype(int)
def quantized_influence(arr1: np.ndarray, arr2: np.ndarray, k: int = 16, use_dagger: bool = False) -> Tuple[float, List[float]]:
    """
    Compute a quantized influence measure of arr1 on arr2.

    Both arrays are quantized to k levels with quantize_to_kbit. The quantized
    arr2 values are grouped by the quantized arr1 value at the same position.
    For each group, the squared difference between the group mean and the
    global mean of quantized arr2 is multiplied by the squared group size.
    The sum of these terms is divided by
    (number of samples * standard deviation of quantized arr2).

    Args:
        arr1 (np.ndarray): First input array; its quantized values define the groups.
        arr2 (np.ndarray): Second input array; its quantized values are averaged per group.
        k (int): The quantization level, defaults to 16 for 4-bit quantization.
        use_dagger (bool): When True, the second element of the result replaces
            each quantized arr1 value by the mean of its group in quantized arr2.
            Defaults to False.

    Returns:
        Tuple[float, List[float]]: (qim, values). `qim` is the measure described
        above, or 0.0 when quantized arr2 has zero standard deviation (the ratio
        would otherwise be 0/0). `values` is the list of per-sample group means
        when use_dagger is True, otherwise the quantized arr1 values as a list.
    """
    # Quantize both arrays to k levels
    arr1_quantized = quantize_to_kbit(arr1, k)
    arr2_quantized = quantize_to_kbit(arr2, k)

    # Distinct quantized levels present in arr1; each one defines a group
    unique_values = np.unique(arr1_quantized)

    total_samples = len(arr2_quantized)
    y_bar_global = np.mean(arr2_quantized)

    # Select each group once and reuse it for both the means and the sizes
    groups = [arr2_quantized[arr1_quantized == val] for val in unique_values]
    local_estimates = [np.mean(group) for group in groups]

    weighted_local_averages = [
        (local_mean - y_bar_global) ** 2 * len(group) ** 2
        for local_mean, group in zip(local_estimates, groups)
    ]

    spread = np.std(arr2_quantized)
    if spread == 0:
        # No variation in arr2: every group mean equals the global mean, so the
        # numerator is 0 as well; report zero influence instead of NaN.
        qim = 0.0
    else:
        qim = np.sum(weighted_local_averages) / (total_samples * spread)

    if use_dagger:
        # Map every quantized arr1 value to the mean of its group
        daggers = dict(zip(unique_values, local_estimates))
        daggered_values = [daggers[level] for level in arr1_quantized]
    else:
        # Without the transformation, hand back the quantized arr1 values
        daggered_values = arr1_quantized.tolist()

    return qim, daggered_values