File size: 2,700 Bytes
3ac9dae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
927c472
 
 
3ac9dae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
927c472
 
 
 
 
3ac9dae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7fe5844
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import os
import ssl
import time
from threading import Thread

import requests
from telegram import Update
from telegram import __version__ as TG_VER
from telegram.ext import (
    Application,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

from app_modules.init import app_init

# Build the LLM loader and the QA chain once, at import time. ``qa_chain`` is
# the object chat_command() queries for every incoming text message.
llm_loader, qa_chain = app_init()

# Default SSL context with its cipher list set to the "DEFAULT" cipher string.
# NOTE(review): ``ctx`` is not referenced anywhere else in the visible code --
# confirm whether it is still needed.
ctx = ssl.create_default_context()
ctx.set_ciphers("DEFAULT")

# If ``telegram`` does not export ``__version_info__``, fall back to an
# all-zero tuple so that the compatibility check below fails.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Refuse to run on python-telegram-bot versions older than 20.0.0 alpha 1.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )

# Bot token read from the environment; ``None`` when TELEGRAM_API_TOKEN is unset.
TOKEN = os.getenv("TELEGRAM_API_TOKEN")


# Define a few command handlers. These usually take the two arguments update and
# context.
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the start command with an HTML greeting that mentions the sender."""
    sender = update.effective_user
    send_html = update.message.reply_html
    greeting = rf"Hi {sender.mention_html()}! You are welcome to ask questions on anything!"
    await send_html(greeting)


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the help text when the user sends /help."""
    incoming = update.message
    await incoming.reply_text("Help!")


async def chat_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer the user's text message with the QA chain's response.

    The message text is sent to ``qa_chain.call_chain`` as the question (with
    an empty chat history) and the ``"answer"`` entry of the result is sent
    back as a reply. Answers longer than Telegram's per-message limit are
    split across several consecutive replies instead of being rejected or
    truncated.

    Any exception raised while querying the chain or replying is printed and
    swallowed so that a single failing message does not propagate out of the
    handler.

    NOTE(review): ``call_chain`` is invoked synchronously inside this
    coroutine, so it appears to block the event loop while the chain runs --
    confirm whether that is acceptable for this deployment.
    """
    # Telegram rejects text messages longer than 4096 characters.
    max_message_length = 4096

    print(update)
    message = update.message
    if message is None:
        # E.g. an edited message: there is no new message to answer.
        return

    tic = time.perf_counter()
    try:
        result = qa_chain.call_chain(
            {"question": message.text, "chat_history": []}, None
        )

        answer = result["answer"]
        print(answer)
        # Send the answer in limit-sized chunks; a short answer is one reply.
        for offset in range(0, len(answer), max_message_length):
            await message.reply_text(answer[offset : offset + max_message_length])
        toc = time.perf_counter()
        print(f"Response time in {toc - tic:0.4f} seconds")
    except Exception as e:
        # Top-level handler boundary: report the failure and keep the bot alive.
        print("error", e)


def start_telegram_bot() -> None:
    """Build the Telegram application, register the handlers, and start polling.

    The bot token is taken from the module-level ``TOKEN``. Three handlers are
    registered: the start command, ``/help``, and a catch-all for plain-text
    messages that are not commands. The call to ``run_polling()`` is the last
    statement, so nothing in this function runs after it until it returns.
    """
    print("starting telegram bot ...")
    # Create the Application and pass it your bot's token.
    application = Application.builder().token(TOKEN).build()

    # on different commands - answer in Telegram
    # "/start" is the command the handler's docstring describes; the previous
    # registration only matched "/start_command", which is kept as an alias so
    # existing users of that command keep working.
    application.add_handler(
        CommandHandler(["start", "start_command"], start_command)
    )
    application.add_handler(CommandHandler("help", help_command))

    # on non-command text messages - answer the message via chat_command
    application.add_handler(
        MessageHandler(filters.TEXT & ~filters.COMMAND, chat_command)
    )

    application.run_polling()


if __name__ == "__main__":
    start_telegram_bot()