Spaces:
Running
on
Zero
Running
on
Zero
File size: 4,957 Bytes
1f074d8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi.requests import Request
from fastapi.security.utils import get_authorization_scheme
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship
from pydantic import BaseModel
from typing import List
app = FastAPI()
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True)
password = Column(String)
profile = Column(String)
team_id = Column(Integer, ForeignKey("teams.id"))
team = relationship("Team", backref="users")
class Team(Base):
__tablename__ = "teams"
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
engine = create_async_engine("sqlite:///database.sqlite")
Base.metadata.create_all(engine)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
@app.on_event("startup")
async def startup():
async with async_session() as session:
await session.execute("PRAGMA foreign_keys=ON")
@app.on_event("shutdown")
async def shutdown():
await engine.dispose()
class UserSchema(BaseModel):
username: str
password: str
profile: str
team_id: int
class TeamSchema(BaseModel):
name: str
@app.post("/register")
async def register_user(user: UserSchema):
async with async_session() as session:
existing_user = await session.execute(User.__table__.select().where(User.username == user.username))
if existing_user.scalar():
return JSONResponse(status_code=400, content={"error": "Username already exists"})
new_user = User(username=user.username, password=user.password, profile=user.profile, team_id=user.team_id)
session.add(new_user)
await session.commit()
return JSONResponse(status_code=201, content={"message": "User created successfully"})
@app.post("/login")
async def login_user(username: str, password: str):
async with async_session() as session:
user = await session.execute(User.__table__.select().where(User.username == username))
user = user.scalar()
if not user or user.password != password:
return JSONResponse(status_code=401, content={"error": "Invalid username or password"})
return JSONResponse(status_code=200, content={"message": "Logged in successfully"})
@app.get("/teams/")
async def get_teams():
async with async_session() as session:
teams = await session.execute(Team.__table__.select())
teams = teams.scalars().all()
return JSONResponse(status_code=200, content=[{"id": team.id, "name": team.name} for team in teams])
@app.post("/teams/")
async def create_team(team: TeamSchema):
async with async_session() as session:
new_team = Team(name=team.name)
session.add(new_team)
await session.commit()
return JSONResponse(status_code=201, content={"message": "Team created successfully"})
@app.get("/users/")
async def get_users():
async with async_session() as session:
users = await session.execute(User.__table__.select())
users = users.scalars().all()
return JSONResponse(status_code=200, content=[{"id": user.id, "username": user.username, "profile": user.profile} for user in users])
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with async_session() as session:
user = await session.execute(User.__table__.select().where(User.id == user_id))
user = user.scalar()
if not user:
return JSONResponse(status_code=404, content={"error": "User not found"})
return JSONResponse(status_code=200, content={"username": user.username, "profile": user.profile, "team_id": user.team_id})
@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserSchema):
async with async_session() as session:
user_db = await session.execute(User.__table__.select().where(User.id == user_id))
user_db = user_db.scalar()
if not user_db:
return JSONResponse(status_code=404, content={"error": "User not found"})
user_db.username = user.username
user_db.profile = user.profile
user_db.team_id = user.team_id
await session.commit()
return JSONResponse(status_code=200, content={"message": "User updated successfully"})
@app.get("/search")
async def search_users(q: str):
async with async_session() as session:
users = await session.execute(User.__table__.select().where(User.profile.like(f"%{q}%")))
users = users.scalars().all()
return JSONResponse(status_code=200, content=[{"id": user.id, "username": user.username, "profile": user.profile} for user in users]) |