"""Transaction assistant helpers.

Classifies free-text transaction messages with an LLM, parses the structured
reply into dictionaries, and persists/reads the result in Firestore under
``users/<phone>/...``.
"""

import ast
import json
import os
import re
from datetime import datetime

import google.generativeai as genai
import openai
import pandas as pd
from dotenv import load_dotenv
from google.cloud import firestore
from langchain_community.chat_models.sambanova import ChatSambaNovaCloud
from langchain_google_genai import ChatGoogleGenerativeAI
from pandasai.responses.response_parser import ResponseParser

# Load environment variables and configure Firestore
load_dotenv()
db = firestore.Client.from_service_account_json("firestore-key.json")

# ChatSambaNovaCloud LLM client handed to pandasai in read_datalake()
llm = ChatSambaNovaCloud(
    model="Meta-Llama-3.1-70B-Instruct",
    max_tokens=1024,
    temperature=0.7,
    top_k=1,
    top_p=0.01,
)

# Patterns shared by the parsing helpers below.
_CURRENCY_RE = re.compile(r"([A-Z]{3}|\$|€|£)")
_NUMBER_RE = re.compile(r"[+-]?(?:\d+\.?\d*|\.\d+)")
# The value class includes the currency symbols parse_value() understands, so
# that "- Cost: €100" is captured instead of being dropped.
_DETAIL_RE = re.compile(
    r"-\s*\*{0,2}([\w\s]+)\*{0,2}:\s*([\w\s,.$€£%-]+?)(?:\s*[\n]|$)",
    re.DOTALL,
)
_INTENT_RE = re.compile(r"\*Intent\*:\s*(\w+)")
_TRANSACTION_TYPE_RE = re.compile(r"\*Transaction Type\*:\s*(\w+)")

# Firestore sub-collection that stores each recognised transaction type.
_COLLECTION_BY_TYPE = {
    "purchase": "inventory",
    "purchases": "inventory",
    "inventory": "inventory",
    "sale": "sales",
    "sales": "sales",
}


# Response parser for formatting outputs from pandasai
class FlaskResponse(ResponseParser):
    """Format pandasai results as HTML / plain strings."""

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Return ``result['value']`` rendered through its ``to_html()``."""
        return result['value'].to_html()

    def format_plot(self, result):
        """Return ``result['value']`` (the plot reference) and log it."""
        try:
            img_path = result['value']
        except ValueError:
            img_path = str(result['value'])
            print("value error!", img_path)
        print("response_class_path:", img_path)
        return img_path

    def format_other(self, result):
        """Return ``result['value']`` converted to ``str``."""
        return str(result['value'])


def generateResponse(prompt, model='Meta-Llama-3.1-70B-Instruct'):
    """Ask the SambaNova chat endpoint to classify the transaction in *prompt*.

    Args:
        prompt: The user's free-text message.
        model: Model name passed to the chat-completions call.

    Returns:
        The text of the first completion choice, or ``None`` when the request
        or the response handling fails (the error is printed).
    """
    # Templates for extracting transaction information
    relevant_info_template = """
    Intent: The CRUD operation (create, read, update, delete)
    Transaction Type: e.g. Purchase, Sale, Inventory
    Details: A list of key detail fields extracted.
    """
    sample_single_transaction_template = """
    *Intent*: Create
    *Transaction Type*: Purchase
    *Details*:
    - Item: Car,
    - Quantity: 1,
    - Cost: 10000,
    etc.
    """
    sample_multi_transaction_template = """
    *Intent*: Create
    Transaction 1:
    *Transaction Type*: Purchase
    *Details*:
    - Item: Car,
    - Quantity: 1,
    etc.
    Transaction 2:
    *Transaction Type*: Sale
    *Details*:
    - Item: Chair,
    - Quantity: 2,
    etc.
    """
    system_message = (
        f"You are a helpful assistant that classifies transactions. Format your output with these guidelines: {relevant_info_template} "
        f"Sample single transaction: {sample_single_transaction_template} "
        f"Sample multi-transaction: {sample_multi_transaction_template}"
    )
    # The client construction and the request are inside the try block so that
    # configuration, network and API errors yield None like any other failure.
    try:
        client = openai.OpenAI(
            api_key=os.environ.get("SAMBANOVA_API_KEY"),
            base_url="https://api.sambanova.ai/v1",
        )
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt},
            ],
        )
        response_text = response.choices[0].message.content
    except Exception as e:
        print(f'An error occurred: {str(e)}')
        response_text = None
    return response_text


def parse_value(value):
    """Convert one extracted detail value into a number where possible.

    Args:
        value: Raw text captured after ``field:``.

    Returns:
        A ``(parsed, currency)`` tuple. ``parsed`` is an ``int``/``float`` for
        numeric or percentage text, otherwise the trimmed text. ``currency``
        is the first currency symbol or three-capital-letter code found in the
        text, or ``None``.
    """
    # The prompt templates end each detail with a comma ("- Item: Car,"), so a
    # trailing comma is list punctuation, not part of the value.
    text = value.strip().rstrip(",").strip()

    currency_match = _CURRENCY_RE.search(text)
    currency = currency_match.group(1) if currency_match else None

    # Remove currency symbols/codes and thousands separators before converting
    cleaned = _CURRENCY_RE.sub("", text).replace(",", "").strip()
    try:
        if "%" in cleaned:
            return float(cleaned.replace("%", "")), currency
        if _NUMBER_RE.fullmatch(cleaned):
            number = float(cleaned) if "." in cleaned else int(cleaned)
            return number, currency
    except ValueError:
        return text, None
    return text, currency


def extract_transaction_details(text):
    """Collect ``- field: value`` pairs from *text* into a dictionary.

    Field names are lower-cased with spaces replaced by underscores; values go
    through :func:`parse_value`. The first currency detected in any value is
    stored under ``"currency"``.

    Args:
        text: LLM reply (or one section of it); ``None`` is treated as empty.

    Returns:
        Dictionary of parsed details (empty when nothing matches).
    """
    details = {}
    transaction_currency = None
    for field, value in _DETAIL_RE.findall(text or ""):
        field_key = field.strip().lower().replace(" ", "_")
        parsed_value, detected_currency = parse_value(value)
        if detected_currency and not transaction_currency:
            transaction_currency = detected_currency
        details[field_key] = parsed_value
    if transaction_currency:
        details["currency"] = transaction_currency
    return details


def _first_group(pattern, text):
    """Return group 1 of the first *pattern* match in *text*, else ``None``."""
    match = pattern.search(text)
    return match.group(1) if match else None


def parse_ai_response(response_text):
    """Parse a single-transaction LLM reply.

    Args:
        response_text: Reply text; ``None`` (a failed LLM call) is treated as
            an empty reply.

    Returns:
        Dictionary with ``intent``, ``transaction_type``, ``details`` and an
        ISO-formatted ``created_at`` timestamp.
    """
    response_text = response_text or ""
    return {
        "intent": _first_group(_INTENT_RE, response_text),
        "transaction_type": _first_group(_TRANSACTION_TYPE_RE, response_text),
        "details": extract_transaction_details(response_text),
        "created_at": datetime.now().isoformat(),
    }


def parse_multiple_transactions(response_text):
    """Parse an LLM reply that may contain several ``Transaction N:`` sections.

    The single ``*Intent*`` found in the reply is applied to every section.

    Args:
        response_text: Reply text; ``None`` is treated as an empty reply.

    Returns:
        List of transaction dictionaries (same shape as
        :func:`parse_ai_response`), one per section containing transaction
        information.
    """
    response_text = response_text or ""
    # Split the response into sections by "Transaction <n>:"
    sections = [
        section.strip()
        for section in re.split(r"Transaction \d+:", response_text, flags=re.IGNORECASE)
        if section.strip()
    ]
    # If the first section does not contain transaction info, remove it
    if sections and not re.search(r"\*Transaction Type\*", sections[0], re.IGNORECASE):
        sections.pop(0)

    intent = _first_group(_INTENT_RE, response_text)
    return [
        {
            "intent": intent,
            "transaction_type": _first_group(_TRANSACTION_TYPE_RE, section),
            "details": extract_transaction_details(section),
            "created_at": datetime.now().isoformat(),
        }
        for section in sections
    ]


def read_datalake(user_phone, user_question):
    """Answer *user_question* over the user's inventory and sales documents.

    Loads both Firestore sub-collections into DataFrames and queries them
    through a pandasai ``SmartDatalake`` configured with the module ``llm``.

    Args:
        user_phone: Document id of the user under ``users``.
        user_question: Natural-language question passed to ``lake.chat``.

    Returns:
        Whatever ``SmartDatalake.chat`` returns for the question.
    """
    user_ref = db.collection("users").document(user_phone)
    inventory_list = [doc.to_dict() for doc in user_ref.collection("inventory").stream()]
    sales_list = [doc.to_dict() for doc in user_ref.collection("sales").stream()]
    inventory_df = pd.DataFrame(inventory_list)
    sales_df = pd.DataFrame(sales_list)

    from pandasai import SmartDatalake

    lake = SmartDatalake(
        [inventory_df, sales_df],
        config={
            "llm": llm,
            "custom_whitelisted_dependencies": ["ast"],
            "response_parser": FlaskResponse,
            "enable_cache": False,
            "save_logs": False,
        },
    )
    response = lake.chat(user_question)
    return response


def create_inventory(user_phone, transaction_data):
    """Store each transaction as ``users/<phone>/inventory/<item>``.

    Transactions without an ``item`` detail are skipped.

    Returns:
        Always ``True``.
    """
    inventory_ref = db.collection("users").document(user_phone).collection("inventory")
    for transaction in transaction_data:
        item_name = transaction['details'].get('item')
        if item_name:
            inventory_ref.document(item_name).set(transaction)
    return True


def _is_number(value):
    """Return True for int/float values (bool excluded)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def create_sale(user_phone, transaction_data):
    """Record sales and reduce the matching inventory quantity.

    For each transaction with an ``item`` detail, the inventory document of
    the same name (if any) has its quantity reduced by the sold quantity, and
    the transaction is written to ``users/<phone>/sales/<item>``.

    Returns:
        Always ``True``.
    """
    user_ref = db.collection("users").document(user_phone)
    for transaction in transaction_data:
        item_name = transaction['details'].get('item')
        if not item_name:
            continue

        inventory = fetch_transaction(user_phone, item_name)
        inventory_details = inventory.get('details') if inventory else None
        if isinstance(inventory_details, dict):
            stock = inventory_details.get('quantity')
            sold = transaction['details'].get('quantity', 0)
            # Parsed quantities may be free text (e.g. "2 boxes"); only adjust
            # stock when both sides are numeric instead of raising TypeError.
            if _is_number(stock) and _is_number(sold):
                inventory_details['quantity'] = stock - sold
                user_ref.collection("inventory").document(item_name).set(inventory)

        # NOTE(review): the sale document is keyed by item name, so a later
        # sale of the same item replaces the earlier record - confirm intended.
        user_ref.collection("sales").document(item_name).set(transaction)
    return True


def update_transaction(user_phone, transaction_id, update_data):
    """Apply *update_data* to ``users/<phone>/transactions/<transaction_id>``.

    NOTE(review): no other function in this file writes to the "transactions"
    sub-collection (they use "inventory" and "sales") - confirm the target.

    Returns:
        ``True`` after the update call.
    """
    doc_ref = (
        db.collection("users")
        .document(user_phone)
        .collection("transactions")
        .document(transaction_id)
    )
    doc_ref.update(update_data)
    return True


def fetch_transaction(user_phone, transaction_id=None):
    """Read from the user's inventory sub-collection.

    Args:
        user_phone: Document id of the user under ``users``.
        transaction_id: Inventory document id; when falsy, all documents are
            returned.

    Returns:
        The document as a dict (``None`` if it does not exist) when an id is
        given, otherwise a list of all inventory documents as dicts.
    """
    inventory_ref = db.collection("users").document(user_phone).collection("inventory")
    if not transaction_id:
        return [doc.to_dict() for doc in inventory_ref.stream()]

    snapshot = inventory_ref.document(transaction_id).get()
    return snapshot.to_dict() if snapshot.exists else None


def delete_transaction(user_phone, transaction_data):
    """Delete the document for the first transaction's item.

    The transaction type is mapped to the sub-collection the create functions
    write to (purchase/inventory -> "inventory", sale -> "sales"); unknown
    types fall back to the lower-cased type name, and a missing type to
    "inventory".

    Returns:
        ``True`` if the document existed and was deleted, otherwise ``False``
        (including when there is no transaction or no item name).
    """
    if not transaction_data:
        return False
    first = transaction_data[0]

    # Assume transaction id defaults to the item name in transaction details
    transaction_id = first['details'].get('item')
    if not transaction_id:
        return False

    transaction_type = first['transaction_type']
    if transaction_type:
        type_key = transaction_type.lower()
        collection_name = _COLLECTION_BY_TYPE.get(type_key, type_key)
    else:
        collection_name = "inventory"

    doc_ref = (
        db.collection("users")
        .document(user_phone)
        .collection(collection_name)
        .document(transaction_id)
    )
    if doc_ref.get().exists:
        doc_ref.delete()
        return True
    return False


def persist_temporary_transaction(transactions, mobile):
    """Save *transactions* as pending under ``temp_transactions/pending-user-action``.

    Returns:
        ``True`` after the write.
    """
    temp_ref = (
        db.collection("users")
        .document(mobile)
        .collection("temp_transactions")
        .document('pending-user-action')
    )
    temp_ref.set({
        "transactions": transactions,
        "status": "pending",
        "created_at": datetime.now().isoformat(),
    })
    return True


def process_intent(parsed_trans_data, mobile):
    """
    Process the detected intent and handle transactions accordingly.
    - For 'create', it differentiates between purchase/inventory and sale.
    - For 'delete', it attempts to delete the transaction.
    - Returns a message indicating the result.
    """
    # Intent and type are taken from the first transaction only.
    first = parsed_trans_data[0] if parsed_trans_data else {}
    intent = first['intent'].lower() if first.get('intent') else None
    trans_type = first['transaction_type'].lower() if first.get('transaction_type') else None

    if intent == 'create':
        target = _COLLECTION_BY_TYPE.get(trans_type)
        if target == "inventory":
            recorded = create_inventory(mobile, parsed_trans_data)
        elif target == "sales":
            recorded = create_sale(mobile, parsed_trans_data)
        else:
            return f"Transaction type '{trans_type}' is not supported for create operations."
        if recorded:
            return "Transaction recorded successfully!"
        return "Sorry, could not record transaction!"

    if intent == 'update':
        # Update logic would be implemented here.
        return "Update operation not implemented yet."

    if intent == 'delete':
        item = first['details'].get('item', 'item')
        if delete_transaction(mobile, parsed_trans_data):
            return f"You successfully deleted {item} from {trans_type}!"
        return f"Sorry, could not delete {item} from {trans_type}!"

    return f"The detected intent, {intent}, is not currently supported!"