# instagram-test / app.py
# jonathanjordan21's picture
# Update app.py
# 5b0e283 verified
# from fastapi import FastAPI, Query, HTTPException
import hmac
import hashlib
from fastapi import FastAPI, Header, HTTPException, Request, Query
from typing import Optional
# FastAPI application instance; the @app.get / @app.post decorators in this
# file register their handlers on it.
app = FastAPI()
# Shared secret that the GET /webhooks handler compares against the
# `hub.verify_token` query parameter.
# NOTE(review): the token is hard-coded in source control; presumably it should
# come from configuration or an environment variable — confirm with deployment.
VERIFY_TOKEN = "lintasmediadanawa" # Replace with your actual verify token
@app.get("/webhooks")
async def handle_webhook(
    hub_mode: str = Query(..., alias="hub.mode"),
    hub_challenge: int = Query(..., alias="hub.challenge"),
    hub_verify_token: str = Query(..., alias="hub.verify_token"),
):
    """Answer a webhook subscription verification request.

    Args:
        hub_mode: Value of the ``hub.mode`` query parameter; must be
            ``"subscribe"`` for verification to succeed.
        hub_challenge: Value of the ``hub.challenge`` query parameter, echoed
            back to the caller on success.
        hub_verify_token: Value of the ``hub.verify_token`` query parameter;
            must equal ``VERIFY_TOKEN``.

    Returns:
        ``hub_challenge`` unchanged when the mode and token are both valid.

    Raises:
        HTTPException: 403 when the mode is not ``"subscribe"`` or the token
            does not match.
    """
    # Compare the secret in constant time so response timing does not reveal
    # how much of a guessed token was correct. Comparing bytes (rather than
    # str) keeps compare_digest from raising on non-ASCII input.
    token_matches = hmac.compare_digest(
        hub_verify_token.encode("utf-8"),
        VERIFY_TOKEN.encode("utf-8"),
    )
    if hub_mode != "subscribe" or not token_matches:
        raise HTTPException(status_code=403, detail="Verification failed")
    # Already validated as an int by the parameter declaration.
    return hub_challenge
# @app.post("/webhooks")
# async def handle_event_notifications(
# request: Request,
# x_hub_signature_256: Optional[str] = Header(None) # Header for signature verification
# ):
# # # Read and verify the request body
# # body = await request.body()
# # # Verify the X-Hub-Signature-256 header
# # if not x_hub_signature_256:
# # raise HTTPException(status_code=400, detail="Missing X-Hub-Signature-256 header")
# # # Compute the expected signature
# # expected_signature = (
# # "sha256="
# # + hmac.new(VERIFY_TOKEN.encode(), body, hashlib.sha256).hexdigest()
# # )
# # if not hmac.compare_digest(expected_signature, x_hub_signature_256):
# # raise HTTPException(status_code=403, detail="Signature verification failed")
# print("POST SUCCESS!", request)
# # Parse the JSON payload
# payload = await request.json()
# print("POST SUCCESS!", request)
# print(payload)
# object_type = payload.get("object")
# entries = payload.get("entry", [])
# # Log the received event (or process as needed)
# for entry in entries:
# changes = entry.get("changes", [])
# for change in changes:
# field = change.get("field")
# value = change.get("value")
# # Handle specific fields or values as required
# print(f"Received change for field: {field}, value: {value}")
# # Return a 200 OK response to acknowledge receipt
# return {"status": "ok"}
@app.post("/webhooks")
async def get_all_params(request: Request):
    """Echo back everything received on a POST to ``/webhooks``.

    Gathers the request headers, the query parameters and the body, prints
    them to stdout, and returns them in a single dictionary.

    Args:
        request: The incoming request.

    Returns:
        A dict with the keys ``"headers"``, ``"query_params"`` and ``"body"``.
        ``"body"`` holds the parsed JSON payload when the body is valid JSON;
        otherwise it holds the raw body decoded as UTF-8 text, with
        undecodable bytes replaced by U+FFFD.
    """
    print("POST GET ALL PARAMS!")

    try:
        body = await request.json()
    except (ValueError, RecursionError):
        # Not parseable as JSON (malformed, wrong encoding, empty, or nested
        # too deeply): fall back to the raw payload. It is decoded leniently
        # here because returning raw bytes makes response serialization fail
        # on any payload that is not valid UTF-8.
        raw_body = await request.body()
        body = raw_body.decode("utf-8", errors="replace")

    all_params = {
        "headers": dict(request.headers),
        "query_params": dict(request.query_params),
        "body": body,
    }

    # NOTE(review): this prints and returns every header verbatim, which may
    # include credentials or signatures — confirm this is debug-only.
    print(all_params)
    return all_params