from fastapi import FastAPI
from fastapi.responses import HTMLResponse
import asyncio
import mysql.connector
import json
import logging
import os

import pandas as pd
from llama_cpp import Llama

app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Database configuration, assumed to be stored in the 'db' environment variable
# as a JSON-encoded dict of mysql.connector.connect() keyword arguments
DB_CONFIG = json.loads(os.getenv('db', '{}'))

# System prompt for the LLM
prompt = os.getenv('prompt')


# Insert extracted shipment details into the MySQL database
def insert_data(extracted_details):
    try:
        mydb = mysql.connector.connect(**DB_CONFIG)
        cursor = mydb.cursor()

        # Skip insertion if all required fields are empty
        required_fields = ['origin', 'destination', 'expected_shipment_datetime',
                           'types_of_service', 'warehouse', 'description',
                           'quantities', 'carrier_details']
        if all(extracted_details.get(field) in [None, ""] for field in required_fields):
            logger.info("Skipping insertion: all extracted values are empty.")
            return

        sql = """
            INSERT INTO shipment_details (
                origin, destination, expected_shipment_datetime, types_of_service,
                warehouse, description, quantities, carrier_details
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        values = (
            extracted_details.get('origin'),
            extracted_details.get('destination'),
            extracted_details.get('expected_shipment_datetime'),
            extracted_details.get('types_of_service'),
            extracted_details.get('warehouse'),
            extracted_details.get('description'),
            extracted_details.get('quantities'),
            extracted_details.get('carrier_details')
        )
        cursor.execute(sql, values)
        mydb.commit()
        logger.info("Data inserted successfully.")
    except mysql.connector.Error as db_err:
        logger.error(f"Database error: {db_err}")
    except Exception as ex:
        logger.error(f"Error inserting data: {ex}")


# Lazily loaded Llama model, cached so it is not reloaded on every polling cycle
_llm = None


def get_llm():
    global _llm
    if _llm is None:
        logger.info("Loading Llama model...")
        _llm = Llama.from_pretrained(
            repo_id="microsoft/Phi-3-mini-4k-instruct-gguf",
            filename="Phi-3-mini-4k-instruct-fp16.gguf",
            n_ctx=2048
        )
        logger.info("Llama model loaded.")
    return _llm


# Read emails from the CSV file, extract shipment details with the LLM,
# and store them in the database
def read_email():
    llm = get_llm()

    logger.info("Reading emails from CSV...")
    df = pd.read_csv('./emails.csv')
    for body in df['Body']:
        logger.info(f"Processing email: {body}")
        output = llm(
            f"<|system|>\n{prompt}<|end|><|user|>\n{body}<|end|>\n<|assistant|>",
            max_tokens=256,
            stop=["<|end|>"],
            echo=False)

        logger.info("Extracting details...")
        text = output['choices'][0]['text']
        logger.info("Model output:\n%s", text)

        try:
            # Take the substring between the first '{' and the last '}' and
            # normalise single quotes so it parses as JSON
            json_str = text[text.find('{'):text.rfind('}') + 1].replace("'", '"')
            extracted_details = json.loads(json_str)
        except json.JSONDecodeError as parse_err:
            logger.error(f"Could not parse model output as JSON: {parse_err}")
            continue

        extracted_details = {key.lower().replace(" ", "_"): value
                             for key, value in extracted_details.items()}

        # Add metadata placeholders
        meta_data = {
            'sender': None,
            'receiver': None,
            'cc': None,
            'bcc': None,
            'subject': None
        }
        extracted_details.update(meta_data)

        logger.info(f"Full extracted data: {extracted_details}")
        insert_data(extracted_details)


# Global flag controlling the email processing loop
running = False

# HTML content for the web interface
html_content = """
<!DOCTYPE html>
<html>
<head>
    <title>Email Processing</title>
</head>
<body>

    <h1>Email Processing Status: {{ status }}</h1>
    <!-- Minimal controls for the /start and /stop endpoints -->
    <form action="/start" method="post">
        <button type="submit">Start</button>
    </form>
    <form action="/stop" method="post">
        <button type="submit">Stop</button>
    </form>

""" # Function to process emails in a loop asynchronously async def email_processing_loop(): global running logger.info("Starting email processing loop...") while running: logger.info("Processing emails...") read_email() await asyncio.sleep(10) # Non-blocking delay for the loop # Endpoint to display the current email processor status @app.get("/", response_class=HTMLResponse) async def home(): global running print(os.getenv('db')) status = "Running" if running else "Stopped" return HTMLResponse(content=html_content.replace("{{ status }}", status), status_code=200) # Endpoint to start the email processing loop @app.post("/start") async def start_email_loop(): global running if not running: running = True asyncio.ensure_future(email_processing_loop()) logger.info("Email processing loop started.") return "Running" else: return "Already running" # Endpoint to stop the email processing loop @app.post("/stop") async def stop_email_loop(): global running if running: running = False logger.info("Email processing loop stopped.") return "Stopped" else: return "Already stopped" if __name__ == "__main__": logger.info("Starting FastAPI server...") import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)