Spaces:
Sleeping
Sleeping
import httpx | |
from app.core.config import settings | |
from typing import Optional, Dict, Any | |
API_BASE_URL = settings.API_BASE_URL  # Base URL for outgoing API calls, read from the app config
async def get(
    endpoint: str,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
):
    """Send a GET request and return the decoded JSON response.

    Args:
        endpoint: Path appended verbatim to ``API_BASE_URL`` to form the URL.
        params: Query parameters forwarded to the request.
        headers: Request headers; an empty dict is used when omitted or empty.

    Returns:
        The decoded JSON body on success. When the server answers with an
        error status, ``{"error": "HTTP <status>: <body>"}``; when anything
        else goes wrong while sending or decoding,
        ``{"error": "Request failed: <message>"}``.
    """
    request_headers = headers if headers else {}
    target = f"{API_BASE_URL}{endpoint}"
    async with httpx.AsyncClient() as session:
        try:
            reply = await session.get(target, headers=request_headers, params=params)
            reply.raise_for_status()
            return reply.json()
        except httpx.HTTPStatusError as http_err:
            # Error status from the server: surface its code and body text.
            return {"error": f"HTTP {http_err.response.status_code}: {http_err.response.text}"}
        except Exception as err:
            # Best-effort helper: report every other failure as a dict too.
            return {"error": f"Request failed: {str(err)}"}
async def post(endpoint: str, payload: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None):
    """Send a POST request with a JSON payload and return the decoded JSON response.

    Args:
        endpoint: Path appended verbatim to ``API_BASE_URL`` to form the URL.
        payload: Data handed to httpx as the ``json=`` request body.
        headers: Request headers. When omitted or empty, a JSON
            ``Content-Type`` header is sent instead.

    Returns:
        The decoded JSON body on success, or ``{}`` when the server replies
        with a success status and an empty body. When the server answers with
        an error status, ``{"error": "HTTP <status>: <body>"}``; when anything
        else goes wrong while sending or decoding,
        ``{"error": "Request failed: <message>"}``.
    """
    url = f"{API_BASE_URL}{endpoint}"
    headers = headers or {"Content-Type": "application/json"}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(url, json=payload, headers=headers)
            response.raise_for_status()
            # A successful reply may carry no body (e.g. 201/204); decoding it
            # as JSON would raise and be misreported as a failed request.
            if not response.content:
                return {}
            return response.json()
        except httpx.HTTPStatusError as http_err:
            return {"error": f"HTTP {http_err.response.status_code}: {http_err.response.text}"}
        except Exception as err:
            # Best-effort helper: report every other failure as a dict too.
            return {"error": f"Request failed: {str(err)}"}
async def put_api(endpoint: str, payload: Dict[str, Any], headers: Optional[Dict[str, str]] = None):
    """Send a PUT request with a JSON payload and return the decoded JSON response.

    Args:
        endpoint: Path appended verbatim to ``API_BASE_URL`` to form the URL.
        payload: Data handed to httpx as the ``json=`` request body.
        headers: Request headers. When omitted or empty, a JSON
            ``Content-Type`` header is sent instead.

    Returns:
        The decoded JSON body on success, or ``{}`` when the server replies
        with a success status and an empty body. When the server answers with
        an error status, ``{"error": "HTTP <status>: <body>"}``; when anything
        else goes wrong while sending or decoding,
        ``{"error": "Request failed: <message>"}``.
    """
    url = f"{API_BASE_URL}{endpoint}"
    headers = headers or {"Content-Type": "application/json"}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.put(url, json=payload, headers=headers)
            response.raise_for_status()
            # PUT is often answered with 204 No Content; decoding an empty
            # body as JSON would raise and turn a success into an error dict.
            if not response.content:
                return {}
            return response.json()
        except httpx.HTTPStatusError as http_err:
            return {"error": f"HTTP {http_err.response.status_code}: {http_err.response.text}"}
        except Exception as err:
            # Best-effort helper: report every other failure as a dict too.
            return {"error": f"Request failed: {str(err)}"}
async def delete_api(endpoint: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None):
    """Send a DELETE request and return the decoded JSON response.

    Args:
        endpoint: Path appended verbatim to ``API_BASE_URL`` to form the URL.
        params: Query parameters forwarded to the request.
        headers: Request headers; an empty dict is used when omitted or empty.

    Returns:
        The decoded JSON body on success, or ``{}`` when the server replies
        with a success status and an empty body. When the server answers with
        an error status, ``{"error": "HTTP <status>: <body>"}``; when anything
        else goes wrong while sending or decoding,
        ``{"error": "Request failed: <message>"}``.
    """
    url = f"{API_BASE_URL}{endpoint}"
    headers = headers or {}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.delete(url, headers=headers, params=params)
            response.raise_for_status()
            # DELETE is commonly answered with 204 No Content; decoding an
            # empty body as JSON would raise and report a successful delete
            # as a failed request.
            if not response.content:
                return {}
            return response.json()
        except httpx.HTTPStatusError as http_err:
            return {"error": f"HTTP {http_err.response.status_code}: {http_err.response.text}"}
        except Exception as err:
            # Best-effort helper: report every other failure as a dict too.
            return {"error": f"Request failed: {str(err)}"}