ariansyahdedy commited on
Commit
07fbc67
·
1 Parent(s): 79f04af

Add Gemnini Flash

Browse files
.env ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ WHATSAPP_API_URL = "adsads"
2
+ API_URL = "adsads"
3
+ ACCESS_TOKEN = "EAAQNYsHYo2cBO30HZCtSkpRppS8ZA6Du2MV9PPFFepInIb2og5cjP1ZAZBJrJhHTlkmcrcN0BS6NZAqMltXhOZBzAyGdNaZCL6XtC6ZCdcug5JR5fZCdXVMzOBbuwPKPBwZABhFsWDIAORYaCI97ajvUxrZCSALNZAIH1E5hs2ZApETvWoVMiZCnoT5MqpyAZBoFFJ84K6vFgZDZD"
4
+
5
+ ENV="development"
6
+ usernameDb = 'hidh4125_admin'
7
+ password = 'Alberto471'
8
+ host = 'hidigi.asia'
9
+ port = '3306'
10
+ MySQLDb_name = 'hidh4125_speechRecognition'
11
+
12
+ mongodb_uri = "mongodb+srv://dedyariansyah:Haleluyah30@test.c5ul8at.mongodb.net/?retryWrites=true&w=majority&appName=Test"
13
+
14
+ DB_NAME='traveloka'
15
+ mongodb_name = "llm_wa_hf"
16
+ WHATSAPP_API_URL ='https://graph.facebook.com/v21.0/360447720489034/messages'
17
+ SECRET_KEY = "cf6352b5ac05bfc2761e2973da54c4c1751ddcb765c0036197d9799f79257296"
18
+
19
+ client = "http://localhost:5000"
20
+
21
+ fastapiusername = "PPR"
22
+ fastapipassword = "HiDigi"
23
+ MEDIA_UPLOAD_URL = "http://localhost:5000/upload"
24
+
25
+ GEMINI_API = "AIzaSyAJxmpiEGSDGAPHiJDbIuclq6EUaQCpPyg"
26
+ OPENAI_API = "sk-proj-gYAJTf4yUP55p9KBrQNMT3BlbkFJTT8TNjrp2qaZbbnqszwM"
27
+ ACCESS_TOKEN = "EAAQNYsHYo2cBO30HZCtSkpRppS8ZA6Du2MV9PPFFepInIb2og5cjP1ZAZBJrJhHTlkmcrcN0BS6NZAqMltXhOZBzAyGdNaZCL6XtC6ZCdcug5JR5fZCdXVMzOBbuwPKPBwZABhFsWDIAORYaCI97ajvUxrZCSALNZAIH1E5hs2ZApETvWoVMiZCnoT5MqpyAZBoFFJ84K6vFgZDZD"
28
+
app/core/config.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ # app/core/config.py
3
+ import os
4
+ from pydantic_settings import BaseSettings
5
+ # from dotenv import load_dotenv
6
+
7
+ # load_dotenv()
8
+
9
+ class Settings(BaseSettings):
10
+ MONGO_DETAILS: str = os.getenv("mongodb_uri")
11
+ MongoDB_NAME: str = "llm_wa_hf"
12
+ COLLECTION_NAMES: list = ["users", "transactions"]
13
+ ACCESS_TOKEN:str = os.getenv("ACCESS_TOKEN")
14
+ API_URL: str = os.getenv("API_URL")
15
+ MEDIA_UPLOAD_URL: str = os.getenv("MEDIA_UPLOAD_URL")
16
+ SECRET_KEY: str = os.getenv("SECRET_KEY")
17
+ ALGORITHM: str = "HS256"
18
+ ACCESS_TOKEN_EXPIRE_MINUTES: int = 100
19
+
20
+ settings = Settings()
21
+
22
+ print(f"SECRET_KEY: {settings.SECRET_KEY}") # Add this line to verify the SECRET_KEY
app/crud/transaction.py ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from app.db.database import db
2
+ from app.schemas.user import TransactionResponse
3
+ from bson.objectid import ObjectId
4
+ from datetime import datetime
5
+
6
+ async def record_transaction(user_id: str, amount: float, description: str):
7
+ transaction = {
8
+ "user_id": ObjectId(user_id),
9
+ "amount": amount,
10
+ "description": description,
11
+ "timestamp": datetime.utcnow()
12
+ }
13
+ result = await db.transactions.insert_one(transaction)
14
+ transaction["_id"] = result.inserted_id
15
+ transaction["id"] = str(transaction.pop("_id"))
16
+ return transaction
17
+
18
+ async def get_user_transactions(user_id: str):
19
+ cursor = db.transactions.find({"user_id": ObjectId(user_id)})
20
+ transactions = []
21
+ async for transaction in cursor:
22
+ transaction["id"] = str(transaction.pop("_id"))
23
+ transactions.append(transaction)
24
+ return transactions
app/crud/user.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from app.db.database import db
2
+ from app.schemas.user import UserCreate
3
+ from bson.objectid import ObjectId
4
+
5
+ async def create_user(user: UserCreate):
6
+ result = await db.users.insert_one(user.dict())
7
+ return str(result.inserted_id)
8
+
9
+ async def get_user_by_id(user_id: str):
10
+ user = await db.users.find_one({"_id": ObjectId(user_id)})
11
+ if user:
12
+ user["id"] = str(user.pop("_id"))
13
+ return user
14
+
15
+ async def update_user_credits(user_id: str, amount: float):
16
+ user = await db.users.find_one({"_id": ObjectId(user_id)})
17
+ if not user:
18
+ return None
19
+
20
+ new_remaining_credits = user["remaining_credits"] + amount
21
+ payment_status = new_remaining_credits > 0
22
+
23
+ await db.users.update_one(
24
+ {"_id": ObjectId(user_id)},
25
+ {"$set": {"remaining_credits": new_remaining_credits, "payment_status": payment_status}}
26
+ )
27
+
28
+ user["remaining_credits"] = new_remaining_credits
29
+ user["payment_status"] = payment_status
30
+ return user
app/db/database.py ADDED
@@ -0,0 +1,46 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/core/database.py
2
+ from motor.motor_asyncio import AsyncIOMotorClient
3
+ from app.core.config import settings
4
+ from pymongo import IndexModel, ASCENDING
5
+ import os
6
+ from dotenv import load_dotenv
7
+
8
+ load_dotenv()
9
+
10
+
11
+ client = AsyncIOMotorClient(settings.MONGO_DETAILS)
12
+ db = client[settings.MongoDB_NAME]
13
+
14
+ def get_database(db_name:str):
15
+ return client[db_name]
16
+
17
+ async def create_collection(db_name:str, collection_name:str):
18
+ database = get_database(db_name)
19
+ existing_collections = await database.list_collection_names()
20
+ if collection_name not in existing_collections:
21
+ await database.create_collection(collection_name)
22
+ else:
23
+ print(f"Collection '{collection_name}' already exists in database '{db_name}'")
24
+
25
+
26
+ async def list_collection_names(db_name: str):
27
+ database = get_database(db_name)
28
+ collection_names = await database.list_collection_names()
29
+ return collection_names
30
+
31
+ async def init_db():
32
+ print(f"Initializing database '{settings.MongoDB_NAME}'")
33
+ for collection_name in settings.COLLECTION_NAMES:
34
+ await create_collection(settings.MongoDB_NAME, collection_name)
35
+
36
+ collections = await list_collection_names(settings.MongoDB_NAME)
37
+
38
+ await create_indexes()
39
+ print(f"Collections in '{settings.MongoDB_NAME}': {collections}")
40
+
41
+ # Indexes for users and transactions
42
+ async def create_indexes():
43
+ user_index = IndexModel([("email", ASCENDING)], unique=True)
44
+ transaction_index = IndexModel([("user_id", ASCENDING), ("timestamp", ASCENDING)])
45
+ await db.users.create_indexes([user_index])
46
+ await db.transactions.create_indexes([transaction_index])
app/endpoints/v1/users.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException, Depends
2
+ from app.schemas.user import UserCreate, UserResponse, TransactionResponse
3
+ from app.crud.user import create_user, get_user_by_id, update_user_credits
4
+ from app.crud.transaction import record_transaction, get_user_transactions
5
+
6
+ router = APIRouter()
7
+
8
+ @router.post("/", response_model=UserResponse)
9
+ async def create_new_user(user: UserCreate):
10
+ user_id = await create_user(user)
11
+ user_response = await get_user_by_id(user_id)
12
+ return user_response
13
+
14
+ @router.get("/{user_id}", response_model=UserResponse)
15
+ async def get_user(user_id: str):
16
+ user = await get_user_by_id(user_id)
17
+ if not user:
18
+ raise HTTPException(status_code=404, detail="User not found")
19
+ return user
20
+
21
+ @router.post("/{user_id}/top-up", response_model=TransactionResponse)
22
+ async def top_up_credits(user_id: str, amount: float):
23
+ if amount <= 0:
24
+ raise HTTPException(status_code=400, detail="Amount must be greater than 0")
25
+
26
+ transaction = await record_transaction(user_id, amount, "Top-up credits")
27
+ user = await update_user_credits(user_id, amount)
28
+ if not user:
29
+ raise HTTPException(status_code=404, detail="User not found")
30
+ return transaction
31
+
32
+ @router.post("/{user_id}/deduct", response_model=TransactionResponse)
33
+ async def deduct_credits(user_id: str, amount: float):
34
+ if amount <= 0:
35
+ raise HTTPException(status_code=400, detail="Amount must be greater than 0")
36
+
37
+ transaction = await record_transaction(user_id, -amount, "Deduct credits")
38
+ user = await update_user_credits(user_id, -amount)
39
+ if not user:
40
+ raise HTTPException(status_code=404, detail="User not found")
41
+ return transaction
42
+
43
+ @router.get("/{user_id}/transactions", response_model=list[TransactionResponse])
44
+ async def list_transactions(user_id: str):
45
+ transactions = await get_user_transactions(user_id)
46
+ if not transactions:
47
+ raise HTTPException(status_code=404, detail="No transactions found")
48
+ return transactions
app/main.py CHANGED
@@ -7,121 +7,166 @@ from app.services.message import generate_reply, send_reply, process_message_wit
7
  import logging
8
  from datetime import datetime
9
  import time
 
 
 
 
10
 
11
  # Configure logging
12
  logging.basicConfig(
13
  level=logging.INFO,
14
  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
15
  )
 
16
  logger = logging.getLogger(__name__)
17
  # Initialize FastAPI app
18
- app = FastAPI()
19
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
  # In-memory cache with timestamp cleanup
21
- class MessageCache:
22
- def __init__(self, max_age_hours: int = 24):
23
- self.messages: Dict[str, float] = {}
24
- self.max_age_seconds = max_age_hours * 3600
25
-
26
- def add(self, message_id: str) -> None:
27
- self.cleanup()
28
- self.messages[message_id] = time.time()
29
-
30
- def exists(self, message_id: str) -> bool:
31
- self.cleanup()
32
- return message_id in self.messages
33
-
34
- def cleanup(self) -> None:
35
- current_time = time.time()
36
- self.messages = {
37
- msg_id: timestamp
38
- for msg_id, timestamp in self.messages.items()
39
- if current_time - timestamp < self.max_age_seconds
40
- }
41
-
42
- message_cache = MessageCache()
43
-
44
- @app.post("/webhook")
45
- async def webhook(request: Request):
46
- request_id = f"req_{int(time.time()*1000)}"
47
- logger.info(f"Processing webhook request {request_id}")
48
- payload = await request.json()
49
-
50
- processed_count = 0
51
- error_count = 0
52
- results = []
53
-
54
- entries = payload.get("entry", [])
55
-
56
- for entry in entries:
57
- entry_id = entry.get("id")
58
- logger.info(f"Processing entry_id: {entry_id}")
59
-
60
- changes = entry.get("changes", [])
61
- for change in changes:
62
- messages = change.get("value", {}).get("messages", [])
63
- for message in messages:
64
- message_id = message.get("id")
65
- timestamp = message.get("timestamp")
66
- content = message.get("text", {}).get("body")
67
- sender_id = message.get("from")
68
-
69
- # Deduplicate messages based on message_id
70
- if message_cache.exists(message_id):
71
- logger.info(f"Duplicate message detected and skipped: {message_id}")
72
- continue
73
-
74
- try:
75
- # Process message with retry logic
76
- result = await process_message_with_retry(
77
- sender_id,
78
- content,
79
- timestamp
80
- )
81
- # Add the message ID to the cache
82
- message_cache.add(message_id)
83
- processed_count += 1
84
- results.append(result)
85
- except Exception as e:
86
- error_count += 1
87
- logger.error(
88
- f"Failed to process message {message_id}: {str(e)}",
89
- exc_info=True
90
- )
91
- results.append({
92
- "status": "error",
93
- "message_id": message_id,
94
- "error": str(e)
95
- })
96
-
97
- response_data = {
98
- "request_id": request_id,
99
- "processed": processed_count,
100
- "errors": error_count,
101
- "results": results
102
- }
103
-
104
- logger.info(
105
- f"Webhook processing completed - "
106
- f"Processed: {processed_count}, Errors: {error_count}"
107
- )
108
-
109
- return JSONResponse(
110
- content=response_data,
111
- status_code=status.HTTP_200_OK
112
- )
113
-
114
-
115
-
116
- @app.get("/webhook")
117
- async def verify_webhook(request: Request):
118
- mode = request.query_params.get('hub.mode')
119
- token = request.query_params.get('hub.verify_token')
120
- challenge = request.query_params.get('hub.challenge')
121
-
122
- # Replace 'your_verification_token' with the token you set in Facebook Business Manager
123
- if mode == 'subscribe' and token == 'test':
124
- # Return the challenge as plain text
125
- return Response(content=challenge, media_type="text/plain")
126
- else:
127
- raise HTTPException(status_code=403, detail="Verification failed")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7
  import logging
8
  from datetime import datetime
9
  import time
10
+ from contextlib import asynccontextmanager
11
+ from app.db.database import create_indexes, init_db
12
+ from app.endpoints.v1 import users
13
+ from app.services.webhook_handler import webhook, verify_webhook
14
 
15
  # Configure logging
16
  logging.basicConfig(
17
  level=logging.INFO,
18
  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
19
  )
20
+
21
  logger = logging.getLogger(__name__)
22
  # Initialize FastAPI app
23
+ @asynccontextmanager
24
+ async def lifespan(app: FastAPI):
25
+
26
+
27
+ try:
28
+ await init_db()
29
+ logger.info("Connected to the MongoDB database!")
30
+ # collections = app.database.list_collection_names()
31
+ # print(f"Collections in {db_name}: {collections}")
32
+ yield
33
+ except Exception as e:
34
+ logger.error(e)
35
+
36
+ app = FastAPI(lifespan=lifespan)
37
+
38
+
39
+ app.include_router(users.router, prefix="/users", tags=["Users"])
40
+
41
+ # Register webhook routes
42
+ app.post("/webhook")(webhook)
43
+ app.get("/webhook")(verify_webhook)
44
  # In-memory cache with timestamp cleanup
45
+ # class MessageCache:
46
+ # def __init__(self, max_age_hours: int = 24):
47
+ # self.messages: Dict[str, float] = {}
48
+ # self.max_age_seconds = max_age_hours * 3600
49
+
50
+ # def add(self, message_id: str) -> None:
51
+ # self.cleanup()
52
+ # self.messages[message_id] = time.time()
53
+
54
+ # def exists(self, message_id: str) -> bool:
55
+ # self.cleanup()
56
+ # return message_id in self.messages
57
+
58
+ # def cleanup(self) -> None:
59
+ # current_time = time.time()
60
+ # self.messages = {
61
+ # msg_id: timestamp
62
+ # for msg_id, timestamp in self.messages.items()
63
+ # if current_time - timestamp < self.max_age_seconds
64
+ # }
65
+
66
+ # message_cache = MessageCache()
67
+ # user_chats = {}
68
+
69
+ # @app.post("/webhook")
70
+ # async def webhook(request: Request):
71
+ # request_id = f"req_{int(time.time()*1000)}"
72
+ # logger.info(f"Processing webhook request {request_id}")
73
+ # payload = await request.json()
74
+
75
+ # print("Webhook received:", payload)
76
+
77
+ # processed_count = 0
78
+ # error_count = 0
79
+ # results = []
80
+
81
+ # entries = payload.get("entry", [])
82
+
83
+ # for entry in entries:
84
+ # entry_id = entry.get("id")
85
+ # logger.info(f"Processing entry_id: {entry_id}")
86
+
87
+ # changes = entry.get("changes", [])
88
+ # for change in changes:
89
+ # messages = change.get("value", {}).get("messages", [])
90
+ # for message in messages:
91
+ # message_id = message.get("id")
92
+ # timestamp = message.get("timestamp")
93
+ # content = message.get("text", {}).get("body")
94
+ # sender_id = message.get("from")
95
+ # msg_type = message.get('type')
96
+
97
+ # # Deduplicate messages based on message_id
98
+ # if message_cache.exists(message_id):
99
+ # logger.info(f"Duplicate message detected and skipped: {message_id}")
100
+ # continue
101
+
102
+ # if sender_id not in user_chats:
103
+ # user_chats[sender_id] = []
104
+ # user_chats[sender_id].append({
105
+
106
+ # "role": "user",
107
+ # "content": content
108
+ # })
109
+
110
+ # history = "".join([f"{item['role']}: {item['content']}\n" for item in user_chats[sender_id]])
111
+ # print(f"history: {history}")
112
+
113
+ # try:
114
+ # # Process message with retry logic
115
+ # result = await process_message_with_retry(
116
+ # sender_id,content,
117
+ # history,
118
+ # timestamp,
119
+ # )
120
+ # user_chats[sender_id].append({
121
+
122
+ # "role": "assistant",
123
+ # "content": result
124
+ # })
125
+
126
+ # # Add the message ID to the cache
127
+ # message_cache.add(message_id)
128
+ # processed_count += 1
129
+ # results.append(result)
130
+ # except Exception as e:
131
+ # error_count += 1
132
+ # logger.error(
133
+ # f"Failed to process message {message_id}: {str(e)}",
134
+ # exc_info=True
135
+ # )
136
+ # results.append({
137
+ # "status": "error",
138
+ # "message_id": message_id,
139
+ # "error": str(e)
140
+ # })
141
+
142
+ # response_data = {
143
+ # "request_id": request_id,
144
+ # "processed": processed_count,
145
+ # "errors": error_count,
146
+ # "results": results
147
+ # }
148
+
149
+ # logger.info(
150
+ # f"Webhook processing completed - "
151
+ # f"Processed: {processed_count}, Errors: {error_count}"
152
+ # )
153
+
154
+ # return JSONResponse(
155
+ # content=response_data,
156
+ # status_code=status.HTTP_200_OK
157
+ # )
158
+
159
+
160
+
161
+ # @app.get("/webhook")
162
+ # async def verify_webhook(request: Request):
163
+ # mode = request.query_params.get('hub.mode')
164
+ # token = request.query_params.get('hub.verify_token')
165
+ # challenge = request.query_params.get('hub.challenge')
166
+
167
+ # # Replace 'your_verification_token' with the token you set in Facebook Business Manager
168
+ # if mode == 'subscribe' and token == 'test':
169
+ # # Return the challenge as plain text
170
+ # return Response(content=challenge, media_type="text/plain")
171
+ # else:
172
+ # raise HTTPException(status_code=403, detail="Verification failed")
app/models/user.py ADDED
File without changes
app/schemas/user.py ADDED
@@ -0,0 +1,33 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic import BaseModel, EmailStr
2
+ from datetime import datetime
3
+ from bson import ObjectId
4
+ from typing import Optional, List
5
+
6
+ class UserBase(BaseModel):
7
+ username: str
8
+ phone_number: str
9
+ email: EmailStr
10
+
11
+ class UserCreate(UserBase):
12
+ credits: float = 0.0
13
+ remaining_credits: float = 0.0
14
+ payment_status: bool = False
15
+
16
+ class UserResponse(UserBase):
17
+ id: str
18
+ credits: float
19
+ remaining_credits: float
20
+ payment_status: bool
21
+
22
+ class Config:
23
+ orm_mode = True
24
+
25
+ class TransactionResponse(BaseModel):
26
+ id: str
27
+ user_id: str
28
+ amount: float
29
+ description: str
30
+ timestamp: datetime
31
+
32
+ class Config:
33
+ orm_mode = True
app/services/cache.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ from typing import Dict
3
+
4
+ class MessageCache:
5
+ def __init__(self, max_age_hours: int = 24):
6
+ self.messages: Dict[str, float] = {}
7
+ self.max_age_seconds = max_age_hours * 3600
8
+
9
+ def add(self, message_id: str) -> None:
10
+ self.cleanup()
11
+ self.messages[message_id] = time.time()
12
+
13
+ def exists(self, message_id: str) -> bool:
14
+ self.cleanup()
15
+ return message_id in self.messages
16
+
17
+ def cleanup(self) -> None:
18
+ current_time = time.time()
19
+ self.messages = {
20
+ msg_id: timestamp
21
+ for msg_id, timestamp in self.messages.items()
22
+ if current_time - timestamp < self.max_age_seconds
23
+ }
app/services/download_media.py ADDED
@@ -0,0 +1,158 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import aiohttp
2
+ import asyncio
3
+ from typing import Optional
4
+
5
+ import aiohttp
6
+ import logging
7
+ from typing import Optional
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+ import aiohttp
12
+ import aiofiles
13
+ import logging
14
+ import os
15
+ from typing import Optional
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+ async def download_whatsapp_media(media_id: str, access_token: str, file_path: Optional[str] = None) -> Optional[str]:
20
+ """
21
+ Asynchronously download media from WhatsApp Cloud API using media ID and save it to a file.
22
+
23
+ Args:
24
+ media_id (str): The ID of the media to download.
25
+ access_token (str): Your WhatsApp access token.
26
+ file_path (Optional[str]): The path where the media file should be saved.
27
+
28
+ Returns:
29
+ str: The file path if the media is saved successfully.
30
+ None: If download fails.
31
+ """
32
+ timeout = aiohttp.ClientTimeout(total=30) # Set an appropriate timeout
33
+
34
+ async with aiohttp.ClientSession(timeout=timeout) as session:
35
+ # Step 1: Get the media URL
36
+ try:
37
+ url = f"https://graph.facebook.com/v21.0/{media_id}"
38
+ headers = {
39
+ "Authorization": f"Bearer {access_token}"
40
+ }
41
+ async with session.get(url, headers=headers) as response:
42
+ data = await response.json()
43
+ if response.status != 200:
44
+ logger.error(f"Failed to get media URL. Status: {response.status}, Response: {data}")
45
+ return None
46
+
47
+ media_url = data.get('url')
48
+ if not media_url:
49
+ logger.error(f"Failed to get media URL from response: {data}")
50
+ return None
51
+ except aiohttp.ClientError as e:
52
+ logger.error(f"Error getting media URL: {e}")
53
+ return None
54
+ except Exception as e:
55
+ logger.error(f"Unexpected error: {e}")
56
+ return None
57
+
58
+ # Step 2: Download the actual media
59
+ try:
60
+ headers = {
61
+ "Authorization": f"Bearer {access_token}"
62
+ }
63
+
64
+ async with session.get(media_url, headers=headers) as media_response:
65
+ if media_response.status != 200:
66
+ media_data = await media_response.text()
67
+ logger.error(f"Failed to download media. Status: {media_response.status}, Response: {media_data}")
68
+ return None
69
+
70
+ if file_path:
71
+ # Ensure the directory exists
72
+ os.makedirs("user_media", exist_ok=True)
73
+ file_path = os.path.join("user_media", file_path)
74
+
75
+ # Write the media content to the file asynchronously
76
+ async with aiofiles.open(file_path, 'wb') as f:
77
+ async for chunk in media_response.content.iter_chunked(1024):
78
+ await f.write(chunk)
79
+ return file_path
80
+ else:
81
+ logger.error("File path not provided for saving the media.")
82
+ return None
83
+ except aiohttp.ClientError as e:
84
+ logger.error(f"Error downloading media: {e}")
85
+ return None
86
+ except Exception as e:
87
+ logger.error(f"Error saving media to file: {e}")
88
+ return None
89
+
90
+ # async def download_whatsapp_media(media_id: str, access_token: str) -> Optional[bytes]:
91
+ # """
92
+ # Asynchronously download media from WhatsApp Cloud API using media ID
93
+
94
+ # Args:
95
+ # media_id (str): The ID of the media to download
96
+ # access_token (str): Your WhatsApp access token
97
+
98
+ # Returns:
99
+ # bytes: The media content if successful
100
+ # None: If download fails
101
+ # """
102
+ # headers = {
103
+ # "Authorization": f"Bearer {access_token}"
104
+ # }
105
+
106
+ # async with aiohttp.ClientSession() as session:
107
+ # try:
108
+ # # Step 1: Get the media URL
109
+ # url = f"https://graph.facebook.com/v21.0/{media_id}"
110
+ # async with session.get(url, headers=headers) as response:
111
+ # if response.status != 200:
112
+ # print(f"Failed to get media URL. Status: {response.status}")
113
+ # return None
114
+
115
+ # data = await response.json()
116
+ # media_url = data.get('url')
117
+
118
+ # if not media_url:
119
+ # print("Failed to get media URL from response")
120
+ # return None
121
+
122
+ # # Step 2: Download the actual media
123
+ # async with session.get(media_url, headers=headers) as media_response:
124
+ # if media_response.status != 200:
125
+ # print(f"Failed to download media. Status: {media_response.status}")
126
+ # return None
127
+
128
+ # return await media_response.read()
129
+
130
+ # # Step 3: Save the media to a file
131
+
132
+ # except aiohttp.ClientError as e:
133
+ # print(f"Error downloading media: {e}")
134
+ # return None
135
+
136
+ async def save_whatsapp_image(media_id: str, access_token: str, output_path: str) -> None:
137
+ """
138
+ Asynchronously download and save WhatsApp image to file
139
+
140
+ Args:
141
+ media_id (str): The ID of the media to download
142
+ access_token (str): Your WhatsApp access token
143
+ output_path (str): Path where to save the image
144
+ """
145
+ media_content = await download_whatsapp_media(media_id, access_token)
146
+
147
+ if media_content:
148
+ # File operations are blocking, so we use asyncio.to_thread for Python 3.9+
149
+ # For older Python versions, you might want to use loop.run_in_executor
150
+ await asyncio.to_thread(save_file, output_path, media_content)
151
+ print(f"Image saved to {output_path}")
152
+ else:
153
+ print("Failed to download image")
154
+
155
+ def save_file(path: str, content: bytes) -> None:
156
+ """Helper function to save file content"""
157
+ with open(path, 'wb') as f:
158
+ f.write(content)
app/services/message.py CHANGED
@@ -1,14 +1,25 @@
1
  import os
2
  import httpx
3
  from dotenv import load_dotenv
4
- from typing import Dict, Any
5
  from datetime import datetime
6
  import logging
7
  import asyncio
 
 
 
 
 
 
 
8
 
9
  # Load environment variables
10
  load_dotenv()
11
 
 
 
 
 
12
  # Configure logging
13
  logging.basicConfig(
14
  level=logging.INFO,
@@ -17,8 +28,9 @@ logging.basicConfig(
17
  logger = logging.getLogger(__name__)
18
 
19
  # Define the WhatsApp API URL and Access Token
20
- WHATSAPP_API_URL = os.environ.get("WHATSAPP_API_URL")
21
- ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN")
 
22
 
23
  # Validate environment variables
24
  if not WHATSAPP_API_URL or not ACCESS_TOKEN:
@@ -69,24 +81,171 @@ async def generate_reply(sender: str, content: str, timestamp: int) -> str:
69
  async def process_message_with_retry(
70
  sender_id: str,
71
  content: str,
72
- timestamp: int
 
 
 
 
73
  ) -> Dict[str, Any]:
74
  """Process message with retry logic"""
75
- retries = 3
76
- delay = 2 # Initial delay in seconds
77
-
78
- for attempt in range(retries):
79
- try:
80
- generated_reply = await generate_reply(sender_id, content, timestamp)
81
- response = await send_reply(sender_id, generated_reply)
82
- return {"status": "success", "reply": generated_reply, "response": response}
83
- except Exception as e:
84
- logger.error(f"Attempt {attempt + 1} failed: {str(e)}", exc_info=True)
85
- if attempt < retries - 1:
86
- await asyncio.sleep(delay)
87
- delay *= 2 # Exponential backoff
88
- else:
89
- raise Exception(f"All {retries} attempts failed.") from e
 
 
 
 
 
90
 
91
  # Example usage
92
  # asyncio.run(process_message_with_retry("1234567890", "hello", 1700424056000))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
  import httpx
3
  from dotenv import load_dotenv
4
+ from typing import Dict, Any, Optional, List
5
  from datetime import datetime
6
  import logging
7
  import asyncio
8
+ from openai import AsyncOpenAI
9
+ import google.generativeai as genai
10
+ import PIL.Image
11
+ from typing import List, Dict, Any, Optional
12
+
13
+ from app.utils.load_env import ACCESS_TOKEN, WHATSAPP_API_URL, GEMNI_API, OPENAI_API
14
+
15
 
16
  # Load environment variables
17
  load_dotenv()
18
 
19
+ genai.configure(api_key=GEMNI_API)
20
+ client = AsyncOpenAI(api_key = OPENAI_API)
21
+ # ACCESS_TOKEN = "EAAQNYsHYo2cBO30HZCtSkpRppS8ZA6Du2MV9PPFFepInIb2og5cjP1ZAZBJrJhHTlkmcrcN0BS6NZAqMltXhOZBzAyGdNaZCL6XtC6ZCdcug5JR5fZCdXVMzOBbuwPKPBwZABhFsWDIAORYaCI97ajvUxrZCSALNZAIH1E5hs2ZApETvWoVMiZCnoT5MqpyAZBoFFJ84K6vFgZDZD"
22
+
23
  # Configure logging
24
  logging.basicConfig(
25
  level=logging.INFO,
 
28
  logger = logging.getLogger(__name__)
29
 
30
  # Define the WhatsApp API URL and Access Token
31
+ # WHATSAPP_API_URL = os.environ.get("WHATSAPP_API_URL")
32
+ # WHATSAPP_API_URL = "https://graph.facebook.com/v21.0/360447720489034/messages"
33
+ # ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN")
34
 
35
  # Validate environment variables
36
  if not WHATSAPP_API_URL or not ACCESS_TOKEN:
 
81
  async def process_message_with_retry(
82
  sender_id: str,
83
  content: str,
84
+ history: List[str],
85
+ timestamp: Optional[int] = None,
86
+ media: Optional[Dict[str, Any]] = None,
87
+ image_file_path: Optional[str] = None,
88
+ doc_path: Optional[str] = None,
89
  ) -> Dict[str, Any]:
90
  """Process message with retry logic"""
91
+ retries = 1
92
+ delay = 0.1 # Initial delay in seconds
93
+
94
+ # for attempt in range(retries):
95
+ try:
96
+ logger.info(f"Sending message to the Gemini model...")
97
+ generated_reply = await generate_response_from_gemini(sender = sender_id, content=content, history = history, timestamp = timestamp, image_file_path = image_file_path, media=media, doc_path = doc_path)
98
+ logger.info(f"Reply generated: {generated_reply}")
99
+ response = await send_reply(sender_id, generated_reply)
100
+ return generated_reply
101
+ return {"status": "success", "reply": generated_reply, "response": response}
102
+ except Exception as e:
103
+ logger.error(f"Error generating reply: {str(e)}", exc_info=True)
104
+ return {"status": "error", "reply": "Sorry, I couldn't generate a response at this time."}
105
+ # logger.error(f"Attempt {attempt + 1} failed: {str(e)}", exc_info=True)
106
+ # if attempt < retries - 1:
107
+ # await asyncio.sleep(delay)
108
+ # delay *= 2 # Exponential backoff
109
+ # else:
110
+ # raise Exception(f"All {retries} attempts failed.") from e
111
 
112
  # Example usage
113
  # asyncio.run(process_message_with_retry("1234567890", "hello", 1700424056000))
114
+
115
+
116
+ async def generate_response_from_gemini(sender: str, content: str, timestamp: str, history: List[Dict[str, str]], media: Optional[Dict[str, Any]] = None, image_file_path: Optional[str] = None, doc_path: Optional[str] = None) -> str:
117
+ try:
118
+ print(f"Sender: {sender}")
119
+ print(f"Content: {content}")
120
+ print(f"Timestamp: {timestamp}")
121
+ print(f"History: {history}")
122
+ print(f"Media: {media}")
123
+
124
+ # Initialize the model
125
+ model = genai.GenerativeModel("gemini-1.5-pro-002")
126
+
127
+ # Define the chat history
128
+ chat = model.start_chat(
129
+ history=history
130
+ )
131
+ logger.info(f"file_path: {image_file_path}")
132
+ if image_file_path: # Should be bytes or a file-like object
133
+
134
+
135
+ prompt = "Describe the following image:"
136
+ image_data = PIL.Image.open(image_file_path)
137
+
138
+ print("Sending image to the Gemini model...")
139
+ response = await chat.send_message_async(image_data)
140
+ print(f"Model response: {response.text}")
141
+ return response.text
142
+
143
+ if doc_path:
144
+ doc_data = genai.upload_file(doc_path)
145
+ print("Sending document to the Gemini model...")
146
+ response = await chat.send_message_async(doc_data)
147
+ print(f"Model response: {response.text}")
148
+ return response.text
149
+
150
+ # Send the user's message
151
+ print("Sending message to the Gemini model...")
152
+ response = await chat.send_message_async(content)
153
+ print(f"Model response: {response.text}")
154
+
155
+ return response.text
156
+
157
+ except Exception as e:
158
+ print("Error generating reply from Gemini:", e)
159
+ return "Sorry, I couldn't generate a response at this time."
160
+
161
+
162
+ async def generate_response_from_chatgpt(sender: str, content: str, timestamp: str, history: str) -> str:
163
+ """
164
+ Generate a reply using OpenAI's ChatGPT API.
165
+ """
166
+ try:
167
+ # # Initialize chat history if not provided
168
+ # chat_history = chat_history or []
169
+
170
+ # # Append the current user message to the chat history
171
+ # chat_history.append({"role": "user", "content": f"From {sender} at {timestamp}: {content}"})
172
+
173
+ messages = [
174
+ {"role": "system", "content": "You're an investor, a serial founder, and you've sold many startups. You understand nothing but business."},
175
+ {"role": "system", "content": f"Message History: {history}"},
176
+ {"role": "user", "content": f"From {sender} at {timestamp}: {content}"}
177
+ ]
178
+
179
+ print(f"Messages: {messages}")
180
+
181
+ response = await client.chat.completions.create(
182
+ model="gpt-3.5-turbo",
183
+ messages=messages,
184
+ max_tokens=200,
185
+ temperature=0.5
186
+ )
187
+
188
+ chatgpt_response = response.choices[0].message.content.strip()
189
+ # Append the assistant's response to the chat history
190
+ # chat_history.append({"role": "assistant", "content": chatgpt_response})
191
+ return chatgpt_response
192
+
193
+ except Exception as e:
194
+ print("Error generating reply:", e)
195
+ return "Sorry, I couldn't generate a response at this time."
196
+
197
+ # async def generate_response_from_chatgpt(
198
+ # sender: str,
199
+ # content: str,
200
+ # timestamp: str,
201
+ # image: Optional[bytes] = None,
202
+ # file: Optional[bytes] = None,
203
+ # file_name: Optional[str] = None,
204
+ # chat_history: Optional[List[Dict[str, str]]] = None,
205
+ # ) -> Dict[str, Any]:
206
+ # """
207
+ # Generate a reply using OpenAI's GPT-4 API, including support for images, files, and maintaining chat history.
208
+ # """
209
+ # try:
210
+ # # Initialize chat history if not provided
211
+ # chat_history = chat_history or []
212
+
213
+ # # Append the current user message to the chat history
214
+ # chat_history.append({"role": "user", "content": f"From {sender} at {timestamp}: {content}"})
215
+
216
+ # # Prepare files for the request
217
+ # files = []
218
+ # if image:
219
+ # files.append({"name": "image.png", "type": "image/png", "content": image})
220
+ # if file:
221
+ # files.append({"name": file_name or "file.txt", "type": "application/octet-stream", "content": file})
222
+
223
+ # logger.debug(f"Chat History Before Response: {chat_history}")
224
+
225
+ # # Send the request to the GPT-4 API
226
+ # response = await client.chat.completions.create(
227
+ # model="gpt-4-vision", # Ensure this is the correct model for multimodal support
228
+ # messages=chat_history,
229
+ # files=files if files else None, # Include files if present
230
+ # max_tokens=200,
231
+ # temperature=0.5,
232
+ # )
233
+
234
+ # # Parse the assistant's response
235
+ # chatgpt_response = response.choices[0].message.content.strip()
236
+
237
+ # # Append the assistant's response to the chat history
238
+ # chat_history.append({"role": "assistant", "content": chatgpt_response})
239
+
240
+ # logger.debug(f"Chat History After Response: {chat_history}")
241
+
242
+ # # Return both the assistant's response and the updated chat history
243
+ # return {"response": chatgpt_response, "chat_history": chat_history}
244
+
245
+ # except Exception as e:
246
+ # logger.error("Error generating reply", exc_info=True)
247
+ # return {"response": "Sorry, I couldn't generate a response at this time.", "chat_history": chat_history}
248
+
249
+
250
+
251
+
app/services/response.py ADDED
File without changes
app/services/webhook_handler.py ADDED
@@ -0,0 +1,58 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from fastapi import Request, status
3
+ from fastapi.responses import JSONResponse, Response
4
+ from fastapi.exceptions import HTTPException
5
+ from app.services.cache import MessageCache
6
+ from app.services.message import process_message_with_retry
7
+ from app.services.download_media import download_whatsapp_media
8
+ from app.utils.handle_message import handle_message
9
+ from app.utils.load_env import ACCESS_TOKEN
10
+ import time
11
+
12
+ logger = logging.getLogger(__name__)
13
+ message_cache = MessageCache()
14
+ user_chats = {}
15
+
16
+ async def webhook(request: Request):
17
+ request_id = f"req_{int(time.time()*1000)}"
18
+
19
+ payload = await request.json()
20
+ logger.info(f"Processing webhook request {payload}")
21
+ processed_count = 0
22
+ error_count = 0
23
+ results = []
24
+
25
+ entries = payload.get("entry", [])
26
+ for entry in entries:
27
+ entry_id = entry.get("id")
28
+ logger.info(f"Processing entry_id: {entry_id}")
29
+
30
+ changes = entry.get("changes", [])
31
+ for change in changes:
32
+ messages = change.get("value", {}).get("messages", [])
33
+ media = {}
34
+ for message in messages:
35
+ logger.info(f"Processing message: {message}")
36
+ response = await handle_message(message=message, user_chats = user_chats, message_cache = message_cache, access_token = ACCESS_TOKEN)
37
+
38
+ results.append(response)
39
+
40
+ response_data = {
41
+ "request_id": request_id,
42
+ # "processed": processed_count,
43
+ # "errors": error_count,
44
+ "results": results,
45
+ }
46
+
47
+ logger.info(f"Webhook processing completed - Processed: {processed_count}, Errors: {error_count}")
48
+ return JSONResponse(content=response_data, status_code=status.HTTP_200_OK)
49
+
50
+ async def verify_webhook(request: Request):
51
+ mode = request.query_params.get('hub.mode')
52
+ token = request.query_params.get('hub.verify_token')
53
+ challenge = request.query_params.get('hub.challenge')
54
+
55
+ if mode == 'subscribe' and token == 'test':
56
+ return Response(content=challenge, media_type="text/plain")
57
+ else:
58
+ raise HTTPException(status_code=403, detail="Verification failed")
app/utils/handle_message.py ADDED
@@ -0,0 +1,75 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from app.services.download_media import download_whatsapp_media
2
+ from app.services.message import process_message_with_retry
3
+ from app.services.cache import MessageCache
4
+ import logging
5
+
6
+ message_cache = MessageCache()
7
+ logger = logging.getLogger(__name__)
8
+ async def handle_message(message, user_chats, message_cache, access_token):
9
+ try:
10
+ # Process the message content
11
+ content = message.get("text", {}).get("body")
12
+ message_id = message.get("id")
13
+ image = message.get("image")
14
+ document = message.get("document")
15
+ video = message.get("video")
16
+ sender_id = message.get("from")
17
+
18
+ logger.info(f"Content: {content}")
19
+ logger.info(f"sender_id: {sender_id}")
20
+
21
+ # Download media if exists
22
+ if image:
23
+ file_path = f"{image.get('id')}.jpg"
24
+ logger.info(f"file_path: {file_path}")
25
+ image_file_path = await download_whatsapp_media(image.get("id"), access_token=access_token, file_path=file_path)
26
+ logger.info(f"image file_path: {image_file_path}")
27
+ else:
28
+ image_file_path = None
29
+
30
+ if document:
31
+ filename = message.get("document", {}).get("filename")
32
+ logger.info(f"file_path: {filename}")
33
+ document_file_path = await download_whatsapp_media(document.get("id"), access_token=access_token, file_path=filename)
34
+ logger.info(f"document file_path: {document_file_path}")
35
+ else:
36
+ document_file_path = None
37
+
38
+ if video:
39
+ video_type = message.get("video", {}).get("mime_type")
40
+ video_filename= f"{video.get('id')}.{video_type.split('/')[-1]}"
41
+ logger.info(f"video_type: {video_type}")
42
+ video_file_path = await download_whatsapp_media(video.get("id"), access_token=access_token, file_path=video_filename)
43
+ else:
44
+ video_file_path = None
45
+ # Check for duplicates
46
+ if message_cache.exists(message_id):
47
+ logger.info(f"Duplicate message detected and skipped: {message_id}")
48
+ return {"status": "duplicate", "message_id": message_id}
49
+
50
+ # Initialize chat history
51
+ if sender_id not in user_chats:
52
+ user_chats[sender_id] = []
53
+
54
+
55
+ # Append content to user chat
56
+ if user_chats[sender_id] == []:
57
+ user_chats[sender_id].append({"role": "user", "parts": "This is the chat history so far"})
58
+
59
+ logger.info(f"user_chats: {user_chats}")
60
+
61
+
62
+
63
+ # Process the message
64
+ result = await process_message_with_retry(sender_id, content, user_chats[sender_id], image_file_path = image_file_path, doc_path = document_file_path)
65
+
66
+ if content != None:
67
+ user_chats[sender_id].append({"role": "user", "parts": content})
68
+
69
+ user_chats[sender_id].append({"role": "model", "parts": result})
70
+
71
+ message_cache.add(message_id)
72
+
73
+ return {"status": "success", "message_id": message_id, "result": result}
74
+ except Exception as e:
75
+ return {"status": "error", "message_id": message.get("id"), "error": str(e)}
app/utils/load_env.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from dotenv import load_dotenv
2
+ import os
3
+
4
+
5
+ # Load the .env file if the environment is development
6
+ ENV = os.getenv("ENV", "production") # Default to production if ENV is not set
7
+
8
+ if ENV == "development":
9
+ print("Environment detected: Development. Loading .env file.")
10
+ load_dotenv()
11
+ else:
12
+ print("Environment detected: Production. Using system environment variables.")
13
+
14
+ # Access your environment variables
15
+ ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")
16
+ WHATSAPP_API_URL = os.getenv("WHATSAPP_API_URL")
17
+ OPENAI_API = os.getenv("OPENAI_API")
18
+ GEMNI_API = os.getenv("GEMINI_API")
19
+
20
+ # Debugging: Print the retrieved ACCESS_TOKEN (for development only)
21
+ if ENV == "development":
22
+ print(f"ACCESS_TOKEN loaded: {ACCESS_TOKEN}")
23
+
24
+
25
+
requirements.txt CHANGED
Binary files a/requirements.txt and b/requirements.txt differ
 
testcode.py ADDED
File without changes
user_media/1058473269087194.jpg ADDED
user_media/1097398131930207.jpg ADDED
user_media/1114757503462834.jpg ADDED
user_media/1206873840414118.jpg ADDED
user_media/1273853277169128.jpg ADDED
user_media/1355136128792793.jpg ADDED
user_media/2054439145017056.jpg ADDED
user_media/3944039259162293.jpg ADDED
user_media/783069647294385.jpg ADDED
user_media/898459985762969.jpg ADDED
user_media/CV_EvanLysandra.pdf ADDED
Binary file (162 kB). View file
 
user_media/Dedy_Resume.pdf ADDED
Binary file (41.1 kB). View file