# Spaces: Runtime error
import asyncio

import aioredis
import asyncpg
from loguru import logger

from app.core.config import settings
async def test_postgres_connection(database_url: str):
    """Probe PostgreSQL by opening one connection and closing it again.

    The outcome is reported through the loguru ``logger``: an info line on
    success, an error line carrying the exception text on failure. Nothing
    is returned, and any ``Exception`` raised along the way is caught here
    rather than propagated.

    Args:
        database_url: Connection URL handed unchanged to ``asyncpg.connect``.
    """
    try:
        pg_connection = await asyncpg.connect(database_url)
        logger.info("Successfully connected to PostgreSQL!")
        await pg_connection.close()
    except Exception as e:
        # Broad on purpose: this is a diagnostic probe, so every failure
        # (including one raised by close()) is logged instead of raised.
        failure_message = f"Failed to connect to PostgreSQL: {e}"
        logger.error(failure_message)
async def test_redis_connection(redis_url: str):
    """Check that Redis answers a PING at ``redis_url`` and log the outcome.

    A client is built with ``aioredis.from_url``, a single ``PING`` is sent,
    and the result is reported through the loguru ``logger``. The function
    returns nothing and does not propagate ``Exception`` subclasses; every
    failure is logged instead.

    The client is closed in a ``finally`` block, so it is released even when
    the ping fails after the client was created (for example on an
    authentication or response error).

    Args:
        redis_url: Connection URL handed unchanged to ``aioredis.from_url``.
    """
    client = None
    try:
        client = await aioredis.from_url(redis_url)
        await client.ping()  # Send a ping to check connection
        logger.info("Successfully connected to Redis!")
    except Exception as e:
        logger.error(f"Failed to connect to Redis: {e}")
    finally:
        # client stays None when from_url() itself failed; nothing to close.
        if client is not None:
            try:
                await client.close()
            except Exception as e:
                # Keep the probe best-effort: a failing close is reported
                # as such, not as a connection failure, and never raised.
                logger.error(f"Failed to close Redis client: {e}")
async def main():
    """Log the loaded settings, then probe PostgreSQL and Redis in turn."""
    # NOTE(review): this writes every settings field to the log; if the
    # settings hold credentials (e.g. inside the URLs) they end up there
    # too. Confirm that is acceptable for where this script runs.
    logger.info(f"Settings: {settings.dict()}")

    # Drop the "+asyncpg" marker before the URL is passed on; presumably a
    # SQLAlchemy-style driver suffix that asyncpg itself does not accept.
    postgres_url = settings.database_url.replace("+asyncpg", "")
    await test_postgres_connection(postgres_url)

    redis_url = settings.redis_url
    await test_redis_connection(redis_url)
if __name__ == "__main__":
    # Script entry point: run the connectivity checks on a new event loop.
    # Nothing is executed when the module is merely imported. ``asyncio`` is
    # imported with the other modules at the top of the file.
    asyncio.run(main())