from fastapi import FastAPI, Request from fastapi.responses import JSONResponse import httpx from telegram import Update from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes import os import logging # from transformers import pipeline from huggingface_hub import InferenceClient, login import langid # Configure logging logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO) logger = logging.getLogger(__name__) # Replace this with your Hugging Face Space URL HUGGING_FACE_SPACE_URL = "https://demaking-decision-helper-bot.hf.space" # Get Telegram bot token from environment variables TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") if not TOKEN: raise ValueError("Missing Telegram Bot Token. Please set TELEGRAM_BOT_TOKEN environment variable.") # Get Hugging Face API token from environment variable HF_HUB_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN") if not HF_HUB_TOKEN: raise ValueError("Missing Hugging Face API token. Please set HUGGINGFACEHUB_API_TOKEN.") # Login and initialize the client login(token=HF_HUB_TOKEN) client = InferenceClient(api_key=HF_HUB_TOKEN) app = FastAPI() # Function to detect language def detect_language(user_input): try: lang, _ = langid.classify(user_input) return "hebrew" if lang == "he" else "english" if lang == "en" else "unsupported" except Exception as e: logging.error(f"Language detection error: {e}") return "unsupported" # Function to generate response def generate_response(text): language = detect_language(text) if language == "hebrew": content = "תענה בקצרה אבל תשתף את תהליך קבלת ההחלטות שלך, " + text model = "microsoft/Phi-3.5-mini-instruct" elif language == "english": content = "keep it short but tell your decision making process, " + text model = "mistralai/Mistral-Nemo-Instruct-2407" else: return "Sorry, I only support Hebrew and English." messages = [{"role": "user", "content": content}] completion = client.chat.completions.create( model=model, messages=messages, max_tokens=2048, temperature=0.5, top_p=0.7 ) return completion.choices[0].message.content @app.post("/generate_response") async def generate_text(request: Request): """ Endpoint to generate a response from the chat model. Expects a JSON with a "text" field. """ try: data = await request.json() text = data.get("text", "").strip() if not text: return {"error": "No text provided"} response = generate_response(text) return {"response": response} except Exception as e: logging.error(f"Error processing request: {e}") return {"error": "An unexpected error occurred."} @app.get("/") async def root(): """ Root endpoint to check that the API is running. """ return {"message": "Decision Helper API is running!"} # ------------------------- # Function to fetch response from FastAPI # ------------------------- async def call_hugging_face_space(input_data: str): """ Sends a POST request to the FastAPI API with the user's imput and returns the JSON response. """ async with httpx.AsyncClient(timeout=45.0) as client: try: response = await client.post(HUGGING_FACE_SPACE_URL, json={"input": input_data}) response.raise_for_status() # Raise exception for HTTP 4XX/5XX errors return response.json() except httpx.HTTPStatusError as e: logger.error(f"HTTP Error: {e.response.status_code} - {e.response.text}") return {"response": "Error: API returned an error."} except httpx.RequestError as e: logger.error(f"Request Error: {e}") return {"response": "Error: Request Error. Could not reach API."} except httpx.ConnectError as e: logger.error(f"Connection error: {e}") return {"error": "Could not connect to the Hugging Face Space"} except Exception as e: logger.error(f"Unexpected Error: {e}") return {"response": "Error: Unexpected error occurred."} @app.post("/webhook/{token}") async def webhook(token: str, request: Request): if token != TOKEN: logger.error(f"Tokens doesn't match. {e}") return JSONResponse(status_code=403, content={"message": "Forbidden"}) update = Update.de_json(await request.json(), None) message_text = update.message.text result = await call_hugging_face_space(message_text) return JSONResponse(content=result) def start_telegram_bot(): application = ApplicationBuilder().token(TOKEN).build() # Set up a command handler async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("Hello! Tell me your decision-making issue, and I'll try to help.") logger.info("Start command received.") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): user_text = update.message.text logger.info(f"User message: {user_text}") # Send the user text to the FastAPI server and get the response. result = await call_hugging_face_space(user_text) response_text = result.get("response", "Error generating response.") logger.info(f"API Response: {response_text}") await update.message.reply_text(response_text) application.add_handler(CommandHandler("start", start)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) # Start the bot application.run_polling() if __name__ == "__main__": import threading # Start the Telegram bot in a separate thread threading.Thread(target=start_telegram_bot).start() # Start the FastAPI app import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)