import random from fastapi import FastAPI, WebSocket, WebSocketDisconnect import uvicorn app = FastAPI() parties = {} users = {} class User: def __init__(self, username, websocket, status): self.username = username self.websocket = websocket self.status = status self.device_id = None def generate_party_code(): return ''.join([str(random.randint(0, 9)) for _ in range(6)]) async def broadcast_message(party_id, sender, message: str): if party_id in parties: if message.startswith("[006]"): formatted_message = f"[system]:{message}" elif message.startswith("TMPN:") and sender.status == "owner": formatted_message = message else: status = 'Owner' if sender.status == 'owner' else 'User' formatted_message = f"[chat]:[{status}]:[{sender.username}]:[{sender.device_id}]:{message}" for user_name in parties[party_id]['members']: user = users[user_name] if user != sender: # if user.websocket: await user.websocket.send_text(formatted_message) @app.get("/") async def root_data(): return { "status" : True, "message" : "Working" } @app.get("/newroom") async def create_new_room(status: str, link: str, name: str, username: str, device_id: str, password: str=None): party_code = generate_party_code() parties[party_code] = { 'owner': username, 'members': [], 'password' : password, 'title' : name, 'link' : link, 'type' : 'Private' } users[username] = User(username.lower(), None, "owner") users[username].device_id = device_id return { "status" : True, "party" : party_code, "message" : "Created success" } @app.websocket("/ws") async def party_server(websocket: WebSocket): await websocket.accept() username = None party_code = None password = None user_status = None await websocket.send_text("[system]:[001]:ENTER DATA:") data = await websocket.receive_text() data = data.lower() action = data.split("|")[0] if not action.lower() in ["create", "join"]: await websocket.send_text("[system]:[004]:Invalid action") await websocket.close(1000, reason="Invalid action") return if action.lower() == "create": if data.count("|") != 5: await websocket.send_text("[system]:[004]:Invalid input data") await websocket.close(1000, "Invalid input data") return else: if data.count("|") != 3: await websocket.send_text("[system]:[004]:Invalid input data") await websocket.close(1000, "Invalid input data") return if data.lower() == "system": await websocket.send_text("[system]:[004]:You can't set system as your name") await websocket.close(1000, "You can't set system as your name") return username = data.split("|")[-2] device_id = data.split("|")[-1] if not username in users: users[username] = User(username, websocket, "user") if action == "create": link = data.split("|")[2] name = data.split("|")[3] act = data.split("|")[1] if "[" in act: act = act.split("[")[0].strip().lower() if act == "private": password = data.split("|")[1].split("[")[1].split("]")[0] party_code = generate_party_code() party_code = str(party_code) parties[party_code] = { 'owner': username, 'members': [username], 'password' : password, 'title' : name, 'link' : link, 'type' : 'Private' } else: party_code = generate_party_code() parties[party_code] = { 'owner': username, 'members': [username], 'password' : None, 'title' : name, 'link' : link, 'type' : 'Public' } users[username].status = "owner" users[username].device_id = device_id users[username].websocket = websocket await websocket.send_text(f"[system]:[006]:Party created! Your party code is :{party_code}.") await broadcast_message(party_code, users[username], f'[system]::{username} has created the party.') elif action == "join": party_code = data.split("|")[1] party_code = str(party_code) if "[" in party_code: password = party_code.split("[")[1].split("]")[0] # Get and verify the password party_code = party_code.split("[")[0].strip() if party_code in parties: if parties.get(party_code).get('password') != None: # Its a private party if parties.get(party_code).get('password') != password: await websocket.send_text(f"[system]:[004]: Invalid password") await websocket.close(1000, "Invalid password") return # if device_id == parties[party_code]['owner'] if device_id == users[parties[party_code]['owner']].device_id: parties[party_code]['owner'] = username users[username].status = "owner" else: users[username].status = "user" parties[party_code]['members'].append(username) users[username].device_id = device_id users[username].websocket = websocket await websocket.send_text(f"[system]:[006]:You have joined the party with code {party_code}.") await websocket.send_text(f"[info]:[006]:{parties[party_code]['title']}|{parties[party_code]['link']}|{len(parties[party_code]['members'])}") await broadcast_message(party_code, users[username], f'[006]:{username} has joined the party.') print(f"Sending brodcast message to {party_code}, {username}") else: await websocket.send_text("[system]:[004]:Invalid party code.") await websocket.close(code=1000, reason="Invalid party code") try: while True: data = await websocket.receive_text() if data.startswith("[006]:"): await websocket.send_text("[system]:[004]:You can't send this message") elif data.startswith("[004]:"): await websocket.send_text("[system]:[004]:You can't send this message") elif data.startswith("[001]:"): await websocket.send_text("[system]:[004]:You can't send this message") elif party_code: await broadcast_message(party_code, users[username], data) except WebSocketDisconnect: if party_code and party_code in parties: parties[party_code]['members'].remove(username) if parties[party_code]['owner'] == username: await broadcast_message(party_code, users[username], f'[system]:{username} (Owner) has left the party. All members are being kicked out.') for member in list(parties[party_code]['members']): member_user = users[member] await member_user.websocket.send_text("[system]:[004]:The owner has left, and the party is deleted.") parties[party_code]['members'].remove(member) del parties[party_code] else: await broadcast_message(party_code, users[username], f'[system]:[004]:{username} has left the party.') del users[username] if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=5000)