Quantx-WA / Llama_Utility.py
rairo's picture
Update Llama_Utility.py
29bd2f1 verified
import re
import os
from datetime import datetime
import openai
from google.cloud import firestore
from dotenv import load_dotenv
import pandas as pd
import ast
import json
from pandasai.responses.response_parser import ResponseParser
# Load environment variables and configure Firestore
load_dotenv()
db = firestore.Client.from_service_account_json("firestore-key.json")
from langchain_google_genai import ChatGoogleGenerativeAI
# Initialize the ChatSambaNovaCloud LLM client (using the latest model)
from langchain_community.chat_models.sambanova import ChatSambaNovaCloud
import google.generativeai as genai
llm = ChatSambaNovaCloud(
model="Meta-Llama-3.1-70B-Instruct",
max_tokens=1024,
temperature=0.7,
top_k=1,
top_p=0.01,
)
# Response parser for formatting outputs from pandasai
class FlaskResponse(ResponseParser):
def __init__(self, context) -> None:
super().__init__(context)
def format_dataframe(self, result):
return result['value'].to_html()
def format_plot(self, result):
# Save the plot using savefig
try:
img_path = result['value']
except ValueError:
img_path = str(result['value'])
print("value error!", img_path)
print("response_class_path:", img_path)
return img_path
def format_other(self, result):
return str(result['value'])
def generateResponse(prompt, model='Meta-Llama-3.1-70B-Instruct'):
# Templates for extracting transaction information
relevant_info_template = """
Intent: The CRUD operation (create, read, update, delete)
Transaction Type: e.g. Purchase, Sale, Inventory
Details: A list of key detail fields extracted.
"""
sample_single_transaction_template = """
*Intent*: Create
*Transaction Type*: Purchase
*Details*: - Item: Car, - Quantity: 1, - Cost: 10000, etc.
"""
sample_multi_transaction_template = """
*Intent*: Create
Transaction 1:
*Transaction Type*: Purchase
*Details*: - Item: Car, - Quantity: 1, etc.
Transaction 2:
*Transaction Type*: Sale
*Details*: - Item: Chair, - Quantity: 2, etc.
"""
response = openai.OpenAI(
api_key=os.environ.get("SAMBANOVA_API_KEY"),
base_url="https://api.sambanova.ai/v1",
).chat.completions.create(
model=model,
messages=[
{"role": "system", "content":
f"You are a helpful assistant that classifies transactions. Format your output with these guidelines: {relevant_info_template} "
f"Sample single transaction: {sample_single_transaction_template} "
f"Sample multi-transaction: {sample_multi_transaction_template}"},
{"role": "user", "content": prompt}
]
)
try:
response_text = response.choices[0].message.content
except Exception as e:
print(f'An error occurred: {str(e)}')
response_text = None
return response_text
def parse_value(value):
value = value.strip()
try:
# Try to detect currency symbols or codes
currency_match = re.search(r"([A-Z]{3}|\$|€|£)", value)
currency = currency_match.group(1) if currency_match else None
# Remove currency symbols/codes for numeric conversion
cleaned_value = re.sub(r"([A-Z]{3}|\$|€|£)", "", value).replace(",", "").strip()
if "%" in cleaned_value:
return float(cleaned_value.replace("%", "")), currency
elif cleaned_value.replace(".", "", 1).isdigit():
return float(cleaned_value) if "." in cleaned_value else int(cleaned_value), currency
return value, currency
except ValueError:
return value, None
def extract_transaction_details(text):
details = {}
transaction_currency = None # Default currency
# Use regex to extract key: value pairs (handles bold formatting if present)
detail_matches = re.findall(
r"-\s*\*{0,2}([\w\s]+)\*{0,2}:\s*([\w\s,.$%-]+?)(?:\s*[\n]|$)",
text,
re.DOTALL
)
for field, value in detail_matches:
field_key = field.strip().lower().replace(" ", "_")
parsed_value, detected_currency = parse_value(value)
if detected_currency and not transaction_currency:
transaction_currency = detected_currency
details[field_key] = parsed_value
if transaction_currency:
details["currency"] = transaction_currency
return details
def parse_ai_response(response_text):
data = {
"intent": None,
"transaction_type": None,
"details": {},
"created_at": datetime.now().isoformat()
}
intent_match = re.search(r"\*Intent\*:\s*(\w+)", response_text)
if intent_match:
data["intent"] = intent_match.group(1)
transaction_type_match = re.search(r"\*Transaction Type\*:\s*(\w+)", response_text)
if transaction_type_match:
data["transaction_type"] = transaction_type_match.group(1)
data["details"] = extract_transaction_details(response_text)
return data
def parse_multiple_transactions(response_text):
transactions = []
# Split the response into sections by "Transaction <number>:"
transaction_sections = re.split(r"Transaction \d+:", response_text, flags=re.IGNORECASE)
transaction_sections = [section.strip() for section in transaction_sections if section.strip()]
# If the first section does not contain transaction info, remove it
if transaction_sections and not re.search(r"\*Transaction Type\*", transaction_sections[0], re.IGNORECASE):
transaction_sections.pop(0)
intent_match = re.search(r"\*Intent\*:\s*(\w+)", response_text)
intent = intent_match.group(1) if intent_match else None
for section in transaction_sections:
data = {
"intent": intent,
"transaction_type": None,
"details": {},
"created_at": datetime.now().isoformat()
}
transaction_type_match = re.search(r"\*Transaction Type\*:\s*(\w+)", section)
if transaction_type_match:
data["transaction_type"] = transaction_type_match.group(1)
data["details"] = extract_transaction_details(section)
transactions.append(data)
return transactions
def read_datalake(user_phone, user_question):
inventory_ref = db.collection("users").document(user_phone).collection("inventory")
sales_ref = db.collection("users").document(user_phone).collection("sales")
inventory_list = [doc.to_dict() for doc in inventory_ref.stream()]
sales_list = [doc.to_dict() for doc in sales_ref.stream()]
inventory_df = pd.DataFrame(inventory_list)
sales_df = pd.DataFrame(sales_list)
from pandasai import SmartDatalake
lake = SmartDatalake(
[inventory_df, sales_df],
config={
"llm": llm,
"custom_whitelisted_dependencies": ["ast"],
"response_parser": FlaskResponse,
"enable_cache": False,
"save_logs": False
}
)
response = lake.chat(user_question)
return response
def create_inventory(user_phone, transaction_data):
for transaction in transaction_data:
item_name = transaction['details'].get('item')
if item_name:
doc_ref = db.collection("users").document(user_phone).collection("inventory").document(item_name)
doc_ref.set(transaction)
return True
def create_sale(user_phone, transaction_data):
for transaction in transaction_data:
item_name = transaction['details'].get('item')
if not item_name:
continue
inventory = fetch_transaction(user_phone, item_name)
if inventory and inventory['details'].get('quantity') is not None:
new_stock = inventory['details'].get('quantity') - transaction['details'].get('quantity', 0)
inventory['details']['quantity'] = new_stock
inv_ref = db.collection("users").document(user_phone).collection("inventory").document(item_name)
inv_ref.set(inventory)
sale_ref = db.collection("users").document(user_phone).collection("sales").document(item_name)
sale_ref.set(transaction)
return True
def update_transaction(user_phone, transaction_id, update_data):
doc_ref = db.collection("users").document(user_phone).collection("transactions").document(transaction_id)
doc_ref.update(update_data)
return True
def fetch_transaction(user_phone, transaction_id=None):
if transaction_id:
doc_ref = db.collection("users").document(user_phone).collection("inventory").document(transaction_id)
transaction = doc_ref.get()
if transaction.exists:
return transaction.to_dict()
return None
else:
collection_ref = db.collection("users").document(user_phone).collection("inventory")
transactions = [doc.to_dict() for doc in collection_ref.stream()]
return transactions
def delete_transaction(user_phone, transaction_data):
# Assume transaction id defaults to the item name in transaction details
transaction_id = transaction_data[0]['details'].get('item')
transaction_type = transaction_data[0]['transaction_type']
if transaction_type:
transaction_type = transaction_type.lower()
else:
transaction_type = "inventory"
doc_ref = db.collection("users").document(user_phone).collection(transaction_type).document(transaction_id)
item_doc = doc_ref.get()
if item_doc.exists:
doc_ref.delete()
return True
else:
return False
def persist_temporary_transaction(transactions, mobile):
temp_ref = db.collection("users").document(mobile).collection("temp_transactions").document('pending-user-action')
data = {
"transactions": transactions,
"status": "pending",
"created_at": datetime.now().isoformat()
}
temp_ref.set(data)
return True
def process_intent(parsed_trans_data, mobile):
"""
Process the detected intent and handle transactions accordingly.
- For 'create', it differentiates between purchase/inventory and sale.
- For 'delete', it attempts to delete the transaction.
- Returns a message indicating the result.
"""
intent = parsed_trans_data[0]['intent'].lower() if parsed_trans_data and parsed_trans_data[0].get('intent') else None
trans_type = parsed_trans_data[0]['transaction_type'].lower() if parsed_trans_data and parsed_trans_data[0].get('transaction_type') else None
if intent == 'create':
if trans_type in ('purchase', 'purchases', 'inventory'):
if create_inventory(mobile, parsed_trans_data):
firestore_msg = "Transaction recorded successfully!"
else:
firestore_msg = "Sorry, could not record transaction!"
elif trans_type in ('sale', 'sales'):
if create_sale(mobile, parsed_trans_data):
firestore_msg = "Transaction recorded successfully!"
else:
firestore_msg = "Sorry, could not record transaction!"
else:
firestore_msg = f"Transaction type '{trans_type}' is not supported for create operations."
elif intent == 'update':
# Update logic would be implemented here.
firestore_msg = "Update operation not implemented yet."
elif intent == 'delete':
item = parsed_trans_data[0]['details'].get('item', 'item')
item_deleted = delete_transaction(mobile, parsed_trans_data)
if item_deleted:
firestore_msg = f"You successfully deleted {item} from {trans_type}!"
else:
firestore_msg = f"Sorry, could not delete {item} from {trans_type}!"
else:
firestore_msg = f"The detected intent, {intent}, is not currently supported!"
return firestore_msg