|
import re |
|
import os |
|
from datetime import datetime |
|
import openai |
|
from google.cloud import firestore |
|
from dotenv import load_dotenv |
|
import pandas as pd |
|
import ast |
|
import json |
|
from pandasai.responses.response_parser import ResponseParser |
|
|
|
# Populate os.environ from a local .env file before anything below reads it.
load_dotenv()

# Module-wide Firestore client used by every persistence helper in this file.
# The service-account key path is relative to the current working directory.
db = firestore.Client.from_service_account_json("firestore-key.json")
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
|
from langchain_community.chat_models.sambanova import ChatSambaNovaCloud |
|
import google.generativeai as genai |
|
|
|
# Chat model instance that read_datalake passes to SmartDatalake as its "llm".
# NOTE(review): top_k=1 together with top_p=0.01 presumably makes sampling
# near-greedy regardless of temperature -- confirm this is intended.
llm = ChatSambaNovaCloud(
    model="Meta-Llama-3.1-70B-Instruct",
    max_tokens=1024,
    temperature=0.7,
    top_k=1,
    top_p=0.01,
)
|
|
|
|
|
|
|
|
|
class FlaskResponse(ResponseParser):
    """Response parser that turns PandasAI results into plain Python values.

    Each ``format_*`` hook receives a ``result`` mapping and reads its
    ``'value'`` entry.
    """

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Return ``result['value'].to_html()``."""
        frame = result['value']
        return frame.to_html()

    def format_plot(self, result):
        """Return ``result['value']`` (the plot reference) after logging it."""
        try:
            plot_ref = result['value']
        except ValueError:
            # Fallback kept from the original: retry the lookup as a string.
            plot_ref = str(result['value'])
            print("value error!", plot_ref)

        print("response_class_path:", plot_ref)
        return plot_ref

    def format_other(self, result):
        """Return ``result['value']`` converted with ``str``."""
        payload = result['value']
        return str(payload)
|
|
|
def generateResponse(prompt, model='Meta-Llama-3.1-70B-Instruct'):
    """Ask the SambaNova chat-completions endpoint to classify ``prompt``.

    The system message tells the model to answer with an intent, a transaction
    type and a list of details, and embeds one single-transaction and one
    multi-transaction sample of the expected layout.

    Args:
        prompt: User message describing one or more transactions.
        model: Model identifier forwarded to the endpoint.

    Returns:
        The text of the first completion choice, or ``None`` when the request
        or the extraction of the reply fails (the error is printed).
    """
    relevant_info_template = """
    Intent: The CRUD operation (create, read, update, delete)
    Transaction Type: e.g. Purchase, Sale, Inventory
    Details: A list of key detail fields extracted.
    """
    sample_single_transaction_template = """
    *Intent*: Create
    *Transaction Type*: Purchase
    *Details*: - Item: Car, - Quantity: 1, - Cost: 10000, etc.
    """
    sample_multi_transaction_template = """
    *Intent*: Create
    Transaction 1:
    *Transaction Type*: Purchase
    *Details*: - Item: Car, - Quantity: 1, etc.
    Transaction 2:
    *Transaction Type*: Sale
    *Details*: - Item: Chair, - Quantity: 2, etc.
    """
    system_message = (
        f"You are a helpful assistant that classifies transactions. Format your output with these guidelines: {relevant_info_template} "
        f"Sample single transaction: {sample_single_transaction_template} "
        f"Sample multi-transaction: {sample_multi_transaction_template}"
    )

    # The request itself is the step most likely to fail (network, auth, rate
    # limits), so it lives inside the same try as the reply extraction; any
    # failure is reported and turned into a None result.
    try:
        client = openai.OpenAI(
            api_key=os.environ.get("SAMBANOVA_API_KEY"),
            base_url="https://api.sambanova.ai/v1",
        )
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt},
            ],
        )
        response_text = response.choices[0].message.content
    except Exception as e:
        print(f'An error occurred: {str(e)}')
        response_text = None
    return response_text
|
|
|
def parse_value(value):
    """Convert a raw detail string into a typed value plus a currency marker.

    Args:
        value: Text of a single detail, e.g. ``"$1,000"``, ``"15%"``, ``"Car"``.

    Returns:
        A ``(parsed, currency)`` tuple. ``parsed`` is a ``float`` for
        percentages and decimals, an ``int`` for whole numbers, and otherwise
        the stripped input string. ``currency`` is the first currency marker
        found (``$``, ``€``, ``£`` or a standalone three-capital code such as
        ``USD``), or ``None``. If numeric conversion raises ``ValueError`` the
        stripped input is returned with ``None`` as currency.
    """
    # A currency code is exactly three capitals that are not part of a longer
    # run of letters; without the lookarounds "LAPTOP" would yield "LAP" and
    # "100 UNITS" would yield "UNI". Digits may still touch the code ("USD100").
    currency_pattern = r"((?<![A-Za-z])[A-Z]{3}(?![A-Za-z])|\$|€|£)"

    value = value.strip()
    try:
        currency_match = re.search(currency_pattern, value)
        currency = currency_match.group(1) if currency_match else None

        # Drop currency markers and thousands separators before testing for a number.
        cleaned_value = re.sub(currency_pattern, "", value).replace(",", "").strip()

        if "%" in cleaned_value:
            return float(cleaned_value.replace("%", "")), currency
        if cleaned_value.replace(".", "", 1).isdigit():
            number = float(cleaned_value) if "." in cleaned_value else int(cleaned_value)
            return number, currency
        return value, currency
    except ValueError:
        return value, None
|
|
|
def extract_transaction_details(text):
    """Collect ``- Field: value`` bullet entries from ``text`` into a dict.

    Field names are lower-cased with spaces replaced by underscores; values
    are converted by ``parse_value``. The first currency marker detected in
    any value is stored under the ``"currency"`` key.

    Args:
        text: Model reply, or one transaction section of it.

    Returns:
        Dict mapping normalised field names to parsed values; empty when no
        bullet entry is found.
    """
    details = {}
    transaction_currency = None

    # The value class lists every symbol parse_value understands, including
    # "€" and "£"; without them an entry such as "- Cost: €100" would not
    # match at all and the field would be lost.
    detail_matches = re.findall(
        r"-\s*\*{0,2}([\w\s]+)\*{0,2}:\s*([\w\s,.$€£%-]+?)(?:\s*[\n]|$)",
        text,
    )
    for field, value in detail_matches:
        field_key = field.strip().lower().replace(" ", "_")
        parsed_value, detected_currency = parse_value(value)
        # Only the first detected currency is kept for the whole transaction.
        if detected_currency and not transaction_currency:
            transaction_currency = detected_currency
        details[field_key] = parsed_value

    if transaction_currency:
        details["currency"] = transaction_currency
    return details
|
|
|
def parse_ai_response(response_text):
    """Parse a single-transaction model reply into a record dict.

    Args:
        response_text: Reply text containing ``*Intent*:`` and
            ``*Transaction Type*:`` markers; may be ``None`` or empty.

    Returns:
        Dict with ``intent``, ``transaction_type``, ``details`` and
        ``created_at`` (ISO timestamp of parsing). Fields that cannot be found
        stay ``None`` / ``{}``.
    """
    data = {
        "intent": None,
        "transaction_type": None,
        "details": {},
        "created_at": datetime.now().isoformat(),
    }

    # generateResponse yields None when the request fails; return the empty
    # record instead of letting re.search raise TypeError.
    if not response_text:
        return data

    intent_match = re.search(r"\*Intent\*:\s*(\w+)", response_text)
    if intent_match:
        data["intent"] = intent_match.group(1)

    transaction_type_match = re.search(r"\*Transaction Type\*:\s*(\w+)", response_text)
    if transaction_type_match:
        data["transaction_type"] = transaction_type_match.group(1)

    data["details"] = extract_transaction_details(response_text)
    return data
|
|
|
def parse_multiple_transactions(response_text):
    """Split a model reply into one record per transaction.

    The reply is cut at every ``Transaction <n>:`` heading. A leading section
    without a ``*Transaction Type*`` marker (the preamble carrying the intent)
    is discarded. The intent is read once from the whole reply and copied into
    every record.

    Args:
        response_text: Reply text; may be ``None`` or empty.

    Returns:
        List of dicts with ``intent``, ``transaction_type``, ``details`` and
        ``created_at``; empty when there is nothing to parse.
    """
    # generateResponse yields None when the request fails; treat that like an
    # empty reply instead of letting re.split raise TypeError.
    if not response_text:
        return []

    pieces = re.split(r"Transaction \d+:", response_text, flags=re.IGNORECASE)
    transaction_sections = [piece.strip() for piece in pieces if piece.strip()]

    # Drop the preamble when it holds no transaction of its own.
    if transaction_sections and not re.search(
        r"\*Transaction Type\*", transaction_sections[0], re.IGNORECASE
    ):
        transaction_sections.pop(0)

    intent_match = re.search(r"\*Intent\*:\s*(\w+)", response_text)
    intent = intent_match.group(1) if intent_match else None

    transactions = []
    for section in transaction_sections:
        data = {
            "intent": intent,
            "transaction_type": None,
            "details": {},
            "created_at": datetime.now().isoformat(),
        }
        transaction_type_match = re.search(r"\*Transaction Type\*:\s*(\w+)", section)
        if transaction_type_match:
            data["transaction_type"] = transaction_type_match.group(1)
        data["details"] = extract_transaction_details(section)
        transactions.append(data)
    return transactions
|
|
|
def read_datalake(user_phone, user_question):
    """Answer ``user_question`` over the user's inventory and sales documents.

    Streams every document of the user's ``inventory`` and ``sales``
    sub-collections, loads each set into a DataFrame, and passes both frames
    (inventory first) to a ``SmartDatalake`` configured with the module-level
    ``llm`` and the ``FlaskResponse`` parser.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        user_question: Question forwarded to ``SmartDatalake.chat``.

    Returns:
        Whatever ``SmartDatalake.chat`` returns.
    """
    frames = []
    for collection_name in ("inventory", "sales"):
        collection_ref = (
            db.collection("users").document(user_phone).collection(collection_name)
        )
        rows = [snapshot.to_dict() for snapshot in collection_ref.stream()]
        frames.append(pd.DataFrame(rows))

    from pandasai import SmartDatalake

    lake_config = {
        "llm": llm,
        "custom_whitelisted_dependencies": ["ast"],
        "response_parser": FlaskResponse,
        "enable_cache": False,
        "save_logs": False,
    }
    datalake = SmartDatalake(frames, config=lake_config)
    return datalake.chat(user_question)
|
|
|
def create_inventory(user_phone, transaction_data):
    """Store each transaction as an inventory document named after its item.

    Transactions whose ``details`` carry no (truthy) ``item`` are skipped. An
    existing document with the same item name is replaced.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_data: Iterable of transaction dicts with a ``details`` dict.

    Returns:
        Always ``True``.
    """
    for entry in transaction_data:
        name = entry['details'].get('item')
        if not name:
            continue
        user_doc = db.collection("users").document(user_phone)
        user_doc.collection("inventory").document(name).set(entry)
    return True
|
|
|
def create_sale(user_phone, transaction_data):
    """Record sales and reduce the matching inventory quantities.

    For every transaction that names an item, the inventory document for that
    item is looked up with ``fetch_transaction``. When it exists and has a
    ``quantity``, the sold quantity (0 if absent) is subtracted and the
    inventory document is written back. The sale itself is then stored under
    the user's ``sales`` collection in a document named after the item.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_data: Iterable of transaction dicts with a ``details`` dict.

    Returns:
        Always ``True``.
    """
    for sale in transaction_data:
        sold_item = sale['details'].get('item')
        if sold_item:
            stock_record = fetch_transaction(user_phone, sold_item)
            if stock_record and stock_record['details'].get('quantity') is not None:
                on_hand = stock_record['details'].get('quantity')
                sold_qty = sale['details'].get('quantity', 0)
                stock_record['details']['quantity'] = on_hand - sold_qty
                db.collection("users").document(user_phone).collection(
                    "inventory"
                ).document(sold_item).set(stock_record)

            db.collection("users").document(user_phone).collection(
                "sales"
            ).document(sold_item).set(sale)
    return True
|
|
|
def update_transaction(user_phone, transaction_id, update_data):
    """Apply ``update_data`` to a document in the user's ``transactions`` collection.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_id: Id of the document to update.
        update_data: Field updates passed to the document's ``update`` call.

    Returns:
        Always ``True``.
    """
    user_doc = db.collection("users").document(user_phone)
    user_doc.collection("transactions").document(transaction_id).update(update_data)
    return True
|
|
|
def fetch_transaction(user_phone, transaction_id=None):
    """Read one or all documents from the user's ``inventory`` collection.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_id: Id of a single inventory document; when falsy, every
            document in the collection is returned.

    Returns:
        With an id: the document as a dict, or ``None`` if it does not exist.
        Without an id: a list of dicts, one per document.
    """
    inventory = db.collection("users").document(user_phone).collection("inventory")

    if not transaction_id:
        return [snapshot.to_dict() for snapshot in inventory.stream()]

    snapshot = inventory.document(transaction_id).get()
    return snapshot.to_dict() if snapshot.exists else None
|
|
|
def delete_transaction(user_phone, transaction_data):
    """Delete the stored document for the first transaction's item.

    Only ``transaction_data[0]`` is used. Its transaction type selects the
    collection and its ``item`` detail is the document id.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_data: List of parsed transaction dicts.

    Returns:
        ``True`` if the document existed and was deleted; ``False`` when the
        list is empty, no item is named, or no such document exists.
    """
    if not transaction_data:
        return False

    first = transaction_data[0]
    transaction_id = first['details'].get('item')
    if not transaction_id:
        # Nothing to look up without an item name.
        return False

    # Purchases are written to "inventory" and sales to "sales" by
    # create_inventory / create_sale, so the type must be mapped to those
    # collection names rather than used verbatim.
    collection_by_type = {
        "purchase": "inventory",
        "purchases": "inventory",
        "inventory": "inventory",
        "sale": "sales",
        "sales": "sales",
    }
    transaction_type = first.get('transaction_type')
    if transaction_type:
        transaction_type = transaction_type.lower()
        # Unknown types keep their lower-cased name as the collection.
        collection_name = collection_by_type.get(transaction_type, transaction_type)
    else:
        collection_name = "inventory"

    doc_ref = (
        db.collection("users")
        .document(user_phone)
        .collection(collection_name)
        .document(transaction_id)
    )
    if doc_ref.get().exists:
        doc_ref.delete()
        return True
    return False
|
|
|
def persist_temporary_transaction(transactions, mobile):
    """Save ``transactions`` as the user's single pending temporary record.

    The record is written to the ``pending-user-action`` document of the
    user's ``temp_transactions`` collection with status ``"pending"`` and the
    current timestamp, replacing whatever was stored there.

    Args:
        transactions: Parsed transactions to park.
        mobile: Document id of the user under the ``users`` collection.

    Returns:
        Always ``True``.
    """
    pending_doc = (
        db.collection("users")
        .document(mobile)
        .collection("temp_transactions")
        .document('pending-user-action')
    )
    pending_doc.set({
        "transactions": transactions,
        "status": "pending",
        "created_at": datetime.now().isoformat(),
    })
    return True
|
|
|
def process_intent(parsed_trans_data, mobile):
    """Dispatch parsed transactions by intent and report the outcome.

    Intent and transaction type are taken (lower-cased) from the first
    transaction only.

    - ``create``: purchase/inventory types go to ``create_inventory``, sale
      types go to ``create_sale``; other types are rejected.
    - ``update``: not implemented, only a message is returned.
    - ``delete``: delegates to ``delete_transaction``.
    - anything else: reported as unsupported.

    Args:
        parsed_trans_data: List of parsed transaction dicts (may be empty).
        mobile: Document id of the user under the ``users`` collection.

    Returns:
        A user-facing message describing what happened.
    """
    head = parsed_trans_data[0] if parsed_trans_data else {}
    intent = head['intent'].lower() if head.get('intent') else None
    trans_type = head['transaction_type'].lower() if head.get('transaction_type') else None

    if intent == 'create':
        if trans_type in ('purchase', 'purchases', 'inventory'):
            recorded = create_inventory(mobile, parsed_trans_data)
        elif trans_type in ('sale', 'sales'):
            recorded = create_sale(mobile, parsed_trans_data)
        else:
            return f"Transaction type '{trans_type}' is not supported for create operations."
        if recorded:
            return "Transaction recorded successfully!"
        return "Sorry, could not record transaction!"

    if intent == 'update':
        return "Update operation not implemented yet."

    if intent == 'delete':
        item = parsed_trans_data[0]['details'].get('item', 'item')
        if delete_transaction(mobile, parsed_trans_data):
            return f"You successfully deleted {item} from {trans_type}!"
        return f"Sorry, could not delete {item} from {trans_type}!"

    return f"The detected intent, {intent}, is not currently supported!"