import os | |
import logging | |
import asyncio | |
import nest_asyncio | |
import httpx | |
from telegram import Update, Bot | |
from telegram.ext import ( | |
Application, | |
CommandHandler, | |
MessageHandler, | |
filters, | |
CallbackContext | |
) | |
# ------------------------- | |
# Configure logging | |
# ------------------------- | |
# Root logging setup: timestamp, severity and message at INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
# Module-level logger shared by every handler in this file.
logger = logging.getLogger(__name__)
# ------------------------- | |
# Environment variables | |
# ------------------------- | |
# Read the Telegram bot token from the environment; abort start-up when it is
# unset or empty.
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
if TOKEN is None or TOKEN == "":
    raise ValueError("Missing Telegram Bot Token. Please set TELEGRAM_BOT_TOKEN environment variable.")
# # Get the domain for webhook (publicly accessible, e.g., your-space.hf.space) | |
# WEBHOOK_DOMAIN = os.getenv("WEBHOOK_DOMAIN") | |
# if not WEBHOOK_DOMAIN: | |
# raise ValueError("Missing Webhook Domain. Please set WEBHOOK_DOMAIN environment variable.") | |
# # ------------------------- | |
# # Webhook configuration | |
# # ------------------------- | |
# # Define a unique webhook path using the bot token | |
# # WEBHOOK_PATH = f"/{TOKEN}" | |
# # Construct the full webhook URL (must be HTTPS as required by Telegram) | |
# WEBHOOK_URL = f"https://{WEBHOOK_DOMAIN}" | |
# ------------------------- | |
# API URL of the FastAPI server (running on Hugging Face) | |
# ------------------------- | |
# Endpoint that generates the bot's replies. An optional API_URL environment
# variable overrides it; when unset or empty the hosted default is used.
API_URL = os.getenv("API_URL") or "https://demaking-decision-helper-bot.hf.space/generate_response"
# bot = Bot(token=TOKEN) | |
# ------------------------- | |
# Function to fetch response from FastAPI (unchanged) | |
# ------------------------- | |
async def fetch_response(user_text: str) -> dict:
    """
    Send the user's text to the API and return the decoded JSON object.

    Args:
        user_text: Text to forward; it is posted as ``{"text": user_text}``
            to ``API_URL``.

    Returns:
        The JSON object returned by the API. On any failure (HTTP error
        status, transport error, a body that is not a JSON object, or any
        other exception) a dict of the form ``{"response": "Error: ..."}`` is
        returned instead, so callers can always use ``.get("response")``.
    """
    async with httpx.AsyncClient(timeout=45.0) as client:
        try:
            response = await client.post(API_URL, json={"text": user_text})
            response.raise_for_status()  # Raise exception for HTTP 4XX/5XX errors
            payload = response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP Error: {e.response.status_code} - {e.response.text}")
            return {"response": "Error: API returned an error."}
        except httpx.RequestError as e:
            logger.error(f"Request Error: {e}")
            return {"response": "Error: Could not reach API."}
        except Exception as e:
            # Includes a body that is not valid JSON; keep the traceback in the log.
            logger.exception(f"Unexpected Error: {e}")
            return {"response": "Error: Unexpected error occurred."}

    # Valid JSON is not necessarily an object (it may be a list, string, ...).
    # Callers rely on dict access, so normalise anything else to an error dict.
    if not isinstance(payload, dict):
        logger.error(f"Unexpected API payload type: {type(payload).__name__}")
        return {"response": "Error: API returned an unexpected response."}
    return payload
# ------------------------- | |
# Command handler for /start | |
# ------------------------- | |
async def start(update: Update, context: CallbackContext):
    """
    Handler for the /start command.

    Replies with a short greeting that invites the user to describe their
    decision-making issue.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Callback context supplied by the dispatcher (unused).
    """
    # update.message is None when the command arrives as an edited message;
    # effective_message resolves to whichever message the update carries.
    message = update.effective_message
    if message is None:
        logger.warning("Start command received without a message to reply to.")
        return
    await message.reply_text("Hello! Tell me your decision-making issue, and I'll try to help.")
    logger.info("Start command received.")
# ------------------------- | |
# Message handler for incoming text messages | |
# ------------------------- | |
async def handle_message(update: Update, context: CallbackContext):
    """
    Handler for incoming text messages.

    Forwards the user's text to the API through ``fetch_response`` and replies
    with the ``"response"`` field of the result.

    Args:
        update: Incoming Telegram update carrying the text message.
        context: Callback context supplied by the dispatcher (unused).
    """
    # update.message is None for edited messages and channel posts, which the
    # text filter also lets through; effective_message covers those cases.
    message = update.effective_message
    if message is None or not message.text:
        return

    user_text = message.text
    logger.info(f"User message: {user_text}")

    # Send the user text to the FastAPI server and get the response.
    result = await fetch_response(user_text)
    response_text = result.get("response") if isinstance(result, dict) else None

    # Fall back when the field is missing, not a string, or blank, since such
    # a value cannot be sent as a Telegram message.
    if not isinstance(response_text, str) or not response_text.strip():
        response_text = "Error generating response."

    logger.info(f"API Response: {response_text}")
    await message.reply_text(response_text)
# def webhook(): | |
# bot.remove_webhook() | |
# bot.set_webhook(url=WEBHOOK_URL + TOKEN) # set the webhook
# print("webhook set: ", WEBHOOK_URL + TOKEN)
# return "!", 200 | |
# # ------------------------- | |
# # Set Telegram webhook | |
# # ------------------------- | |
# async def set_webhook(): | |
# #bot = Bot(token=TOKEN) | |
# #bot.delete_webhook() | |
# # This call will set the webhook to the given URL. | |
# PATH = WEBHOOK_URL + TOKEN | |
# try: | |
# await bot.set_webhook(url=PATH) | |
# logger.info(f"Webhook set successfully to: {WEBHOOK_URL}") | |
# print("bot webhook success") | |
# except Exception as e: | |
# logger.error(f"Failed to set webhook manually. Error: {e}") | |
# print(f"error setting bot webhook. Error: {e}") | |
# ------------------------- | |
# Delete Telegram webhook | |
# ------------------------- | |
# async def delete_webhook(): | |
# try: | |
# await bot.delete_webhook(drop_pending_updates=True) | |
# logger.info("Webhook deleted successfully") | |
# print("deleted webhook successfully") | |
# except Exception as e: | |
# logger.error(f"Failed to delete webhook manually. Error: {e}") | |
# print(f"error deleting bot webhook. Error: {e}") | |
# ------------------------- | |
# Set Telegram webhook | |
# ------------------------- | |
# async def set_webhook(): | |
# # bot = Bot(token=TOKEN) | |
# try: | |
# await bot.set_webhook(url=WEBHOOK_URL) # This call will set the webhook to the given URL. | |
# logger.info(f"Webhook set successfully to: {WEBHOOK_URL}") | |
# print("webhook set succesfully") | |
# except Exception as e: | |
# logger.error(f"Failed to set webhook. Error: {e}") | |
# print(f"error in setting bot webhook. Error: {e}") | |
# ------------------------- | |
# Main function to run the bot using polling mode
# ------------------------- | |
async def main():
    """
    Run the Telegram bot in polling mode until the task is cancelled.

    Builds the Application from ``TOKEN``, registers the /start command
    handler and the plain-text message handler, then starts polling.

    ``Application.run_polling()`` is a blocking, synchronous convenience
    method that drives its own event loop, so it must not be awaited from
    inside a coroutine. The explicit lifecycle is used here instead:
    initialize (via ``async with``), start, start polling, wait, then stop in
    reverse order.
    """
    # Build the Application with the Telegram Bot Token
    application = Application.builder().token(TOKEN).build()

    # Add command and message handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    logger.info("Starting bot in polling mode...")
    print("Starting bot in polling mode...")

    # "async with" calls application.initialize() on entry and
    # application.shutdown() on exit.
    async with application:
        await application.start()
        await application.updater.start_polling()
        try:
            # Nothing ever sets this event: park here until the task is cancelled.
            await asyncio.Event().wait()
        finally:
            # Stop fetching updates first, then stop processing them.
            await application.updater.stop()
            await application.stop()
# Run the application using webhook mode. | |
# The bot will listen on all interfaces (0.0.0.0) at the specified port. | |
# await application.run_webhook( | |
# listen="0.0.0.0", # Listen on all available interfaces | |
# port=7860, # Port to listen on | |
# # url_path=TELEGRAM_WEBHOOK, # The webhook path; here, we use the bot token | |
# webhook_url=WEBHOOK_URL # The webhook URL that Telegram will use to send updates | |
# ) | |
# ------------------------- | |
# Run the main function | |
# ------------------------- | |
if __name__ == "__main__":
    # Apply nest_asyncio to support nested event loops if required.
    nest_asyncio.apply()
    try:
        print("in try")
        loop = asyncio.get_event_loop()
        # Run main() exactly once. Scheduling it with create_task() and also
        # passing a second main() to run_until_complete() would start two bot
        # instances polling with the same token.
        loop.run_until_complete(main())
    except Exception as e:
        logger.error(f"Error in main loop: {e}")
        print(f"Error in main loop: {e}")
# try: | |
# asyncio.run(main()) | |
# except Exception as e: | |
# logger.error(f"Error in main loop: {e}") | |
# print(f"Error in main loop: {e}") | |
# Instead of asyncio.run(), which may try to close an already running loop, | |
# get the current loop and run main() until complete. | |
# loop = asyncio.get_event_loop() | |
# # loop.run_until_complete(main()) | |
# try: | |
# loop.run_until_complete(main()) | |
# except Exception as e: | |
# logger.error(f"Error in main loop: {e}") | |
# print(f"Error in main loop: {e}") | |
# finally: | |
# await bot.shutdown() | |