|
import random |
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect |
|
import uvicorn |
|
|
|
# ASGI application object; the HTTP routes and the /ws endpoint in this
# module are registered on it through its decorators.
app = FastAPI()

# Live parties keyed by party code. Each value is a dict with the keys
# 'owner', 'members', 'password', 'title', 'link' and 'type'.
parties: dict = {}

# Known users keyed by username; each value is a User instance.
users: dict = {}
|
|
|
class User:
    """A participant tracked by the server.

    Attributes:
        username: Name the user is known by.
        websocket: Connection used to reach the user, or None when the user
            was registered without one.
        status: Role string; the rest of the module uses "owner" and "user".
        device_id: Client-supplied device identifier; None until assigned.

    Instances compare by identity (no ``__eq__`` is defined), which is what
    lets a broadcast skip the sender object itself.
    """

    def __init__(self, username, websocket, status):
        self.username, self.websocket, self.status = username, websocket, status
        # Filled in later by whichever route registers the user.
        self.device_id = None
|
|
|
def generate_party_code(length: int = 6) -> str:
    """Return a random party code made of decimal digits.

    Args:
        length: Number of digits in the code. Defaults to 6, the size the
            rest of the module has always used.

    Returns:
        A string of exactly ``length`` characters, each in ``0``-``9``.
        Leading zeros are possible, which is why the code is a string.

    Raises:
        ValueError: If ``length`` is smaller than 1 (an empty code could
            never identify a party).

    Note:
        The function does not know which codes are already taken; callers
        that store the code must check for collisions themselves.
    """
    if length < 1:
        raise ValueError("length must be at least 1")
    return "".join(str(random.randint(0, 9)) for _ in range(length))
|
|
|
async def broadcast_message(party_id, sender, message: str):
    """Relay ``message`` to every member of a party except ``sender``.

    The outgoing frame depends on the message:

    * starts with ``[006]``: sent as ``[system]:<message>``;
    * starts with ``TMPN:`` and the sender's status is ``"owner"``: sent
      unchanged;
    * anything else: sent as
      ``[chat]:[Owner|User]:[<username>]:[<device_id>]:<message>``.

    Args:
        party_id: Code of the party to address. Unknown codes are ignored.
        sender: The User the message originates from; never a recipient.
        message: Raw message text.

    Delivery is best effort: a member that is missing from ``users``, has no
    websocket, or whose socket is already closed is skipped so that one dead
    connection cannot block delivery to the remaining members.
    """
    party = parties.get(party_id)
    if party is None:
        return

    if message.startswith("[006]"):
        formatted_message = f"[system]:{message}"
    elif message.startswith("TMPN:") and sender.status == "owner":
        formatted_message = message
    else:
        status = 'Owner' if sender.status == 'owner' else 'User'
        formatted_message = f"[chat]:[{status}]:[{sender.username}]:[{sender.device_id}]:{message}"

    # Iterate over a snapshot: every send awaits, and other connections may
    # join or leave the party while this coroutine is suspended.
    for user_name in list(party['members']):
        user = users.get(user_name)
        if user is None or user is sender or user.websocket is None:
            continue
        try:
            await user.websocket.send_text(formatted_message)
        except (WebSocketDisconnect, RuntimeError):
            # The peer is gone; its own handler is responsible for cleanup.
            continue
|
|
|
@app.get("/")
async def root_data():
    """Liveness probe: always answers with a fixed "Working" payload."""
    return dict(status=True, message="Working")
|
|
|
@app.get("/newroom")
async def create_new_room(status: str, link: str, name: str, username: str, device_id: str, password: str=None):
    """Create a party over HTTP and register ``username`` as its owner.

    The party starts with no members; the owner is expected to connect to
    the ``/ws`` endpoint afterwards and join with the same device id.

    Args:
        status: Accepted for API compatibility; not used by this handler.
        link: Link stored with the party.
        name: Title stored with the party.
        username: Name of the owning user.
        device_id: Device identifier used later to recognise the owner.
        password: Optional password required to join the party.

    Returns:
        A dict with ``status`` (True), ``party`` (the new party code) and
        ``message``.
    """
    # The /ws handshake is lowercased before usernames, device ids and
    # passwords are compared, so store them lowercased here as well;
    # otherwise a mixed-case value could never match on join.
    username = username.lower()
    device_id = device_id.lower()
    if password is not None:
        password = password.lower()

    # Codes are random, so retry instead of overwriting a live party.
    party_code = generate_party_code()
    while party_code in parties:
        party_code = generate_party_code()

    parties[party_code] = {
        'owner': username,
        'members': [],
        'password': password,
        'title': name,
        'link': link,
        # NOTE(review): always 'Private', even without a password, while the
        # /ws create flow stores 'Public' for password-less parties. Confirm
        # whether `status` was meant to drive this field.
        'type': 'Private',
    }

    # Reuse an existing entry so a user who is already connected keeps the
    # websocket; replacing the object would discard it.
    owner = users.get(username)
    if owner is None:
        owner = users[username] = User(username, None, "owner")
    else:
        owner.status = "owner"
    owner.device_id = device_id

    return {
        "status": True,
        "party": party_code,
        "message": "Created success",
    }
|
|
|
# Frame prefixes a client may not originate. broadcast_message relays any
# text starting with "[006]" under the "[system]:" tag, so the bare marker is
# blocked (not only "[006]:"); "[004]:" and "[001]:" are the server's own
# error and prompt codes.
_RESERVED_PREFIXES = ("[006]", "[004]:", "[001]:")


async def _reject(websocket, message, reason):
    """Send a ``[system]:[004]:`` error frame, then close the socket."""
    await websocket.send_text(f"[system]:[004]:{message}")
    await websocket.close(1000, reason)


def _split_bracket(field):
    """Split ``value[secret]`` into ``(value, secret)``.

    Without a ``[`` the field is returned untouched and the secret is None.
    The secret is lowercased because passwords are compared
    case-insensitively.
    """
    pieces = field.split("[")
    if len(pieces) == 1:
        return field, None
    return pieces[0].strip(), pieces[1].split("]")[0].lower()


def _register(username, websocket, device_id, status):
    """Bind this connection to the user's registry entry, creating it if needed."""
    member = users.get(username)
    if member is None:
        member = users[username] = User(username, websocket, status)
    member.status = status
    member.device_id = device_id
    member.websocket = websocket
    return member


def _open_party(owner, kind_field, link, title):
    """Store a new party owned by ``owner`` and return its code.

    Returns None, without storing anything, when a private party is
    requested without a ``[password]`` part.
    """
    kind, password = _split_bracket(kind_field)
    if kind.lower() == "private":
        if password is None:
            return None
        room_type = 'Private'
    else:
        password = None
        room_type = 'Public'

    # Codes are random, so retry instead of overwriting a live party.
    party_code = generate_party_code()
    while party_code in parties:
        party_code = generate_party_code()

    parties[party_code] = {
        'owner': owner,
        'members': [owner],
        'password': password,
        'title': title,
        'link': link,
        'type': room_type,
    }
    return party_code


async def _leave_party(party_code, username, leaver):
    """Remove ``username`` from a party; dissolve the party if they own it."""
    party = parties.get(party_code)
    if party is None:
        return
    if username in party['members']:
        party['members'].remove(username)

    owner_left = party['owner'] == username
    if owner_left:
        notice = f'[system]:{username} (Owner) has left the party. All members are being kicked out.'
    else:
        notice = f'[system]:[004]:{username} has left the party.'
    try:
        await broadcast_message(party_code, leaver, notice)
    except (WebSocketDisconnect, RuntimeError):
        # A dead peer must not abort the teardown below.
        pass
    if not owner_left:
        return

    # Unpublish the party before the notices go out so nobody can join a
    # party that is being dissolved.
    remaining = list(party['members'])
    parties.pop(party_code, None)
    party['members'].clear()
    # NOTE(review): members are told the party is deleted, but their
    # sockets stay open, as before; confirm whether they should be closed.
    for member in remaining:
        peer = users.get(member)
        if peer is None or peer.websocket is None:
            continue
        try:
            await peer.websocket.send_text("[system]:[004]:The owner has left, and the party is deleted.")
        except (WebSocketDisconnect, RuntimeError):
            continue


@app.websocket("/ws")
async def party_server(websocket: WebSocket):
    """Serve one party member over a websocket.

    After accepting, the server prompts with ``[system]:[001]:ENTER DATA:``
    and expects a single ``|``-separated handshake frame:

    * ``create|<type>[<password>]|<link>|<title>|<username>|<device_id>``,
      where the type is ``private`` (password required) or anything else
      for a public party;
    * ``join|<code>[<password>]|<username>|<device_id>``; the bracket part
      is only needed for password-protected parties.

    The action, username, device id and password are matched
    case-insensitively. The link and title keep the casing the client sent,
    so case-sensitive URLs survive.

    A malformed or refused handshake gets a ``[system]:[004]:...`` frame and
    the socket is closed. Otherwise every later frame is relayed to the
    party through ``broadcast_message``, except frames starting with a
    reserved server prefix, which are refused. Whenever the handler exits,
    the user leaves the party (dissolving it if they own it) and is removed
    from the user registry.
    """
    await websocket.accept()
    await websocket.send_text("[system]:[001]:ENTER DATA:")
    try:
        handshake = await websocket.receive_text()
    except WebSocketDisconnect:
        # The peer left before identifying itself; nothing to clean up.
        return

    fields = handshake.split("|")
    action = fields[0].lower()
    if action not in ("create", "join"):
        await _reject(websocket, "Invalid action", "Invalid action")
        return
    if len(fields) != (6 if action == "create" else 4):
        await _reject(websocket, "Invalid input data", "Invalid input data")
        return

    username = fields[-2].lower()
    device_id = fields[-1].lower()
    if username == "system":
        await _reject(websocket, "You can't set system as your name", "You can't set system as your name")
        return

    # A name bound to a live socket belongs to someone else; taking it over
    # would redirect that user's traffic to this connection. Entries without
    # a websocket (pre-registered over HTTP) may still be claimed.
    holder = users.get(username)
    if holder is not None and holder.websocket is not None:
        await _reject(websocket, "Username already in use", "Username already in use")
        return

    me = None
    party_code = None
    try:
        if action == "create":
            party_code = _open_party(username, fields[1], fields[2], fields[3])
            if party_code is None:
                await _reject(websocket, "Invalid input data", "Invalid input data")
                return
            me = _register(username, websocket, device_id, "owner")
            await websocket.send_text(f"[system]:[006]:Party created! Your party code is :{party_code}.")
            await broadcast_message(party_code, me, f'[system]::{username} has created the party.')
        else:
            wanted_code, password = _split_bracket(fields[1])
            party = parties.get(wanted_code)
            if party is None:
                await websocket.send_text("[system]:[004]:Invalid party code.")
                await websocket.close(code=1000, reason="Invalid party code")
                return

            stored = party.get('password')
            # The stored password may come from the HTTP route with its
            # original casing, so compare case-insensitively.
            if stored is not None and (password is None or stored.lower() != password):
                await _reject(websocket, " Invalid password", "Invalid password")
                return

            # Whoever connects from the owner's device takes over ownership.
            owner = users.get(party['owner'])
            owner_device = None if owner is None else owner.device_id
            if owner_device is not None and owner_device.lower() == device_id:
                party['owner'] = username
                role = "owner"
            else:
                role = "user"

            me = _register(username, websocket, device_id, role)
            party['members'].append(username)
            party_code = wanted_code

            await websocket.send_text(f"[system]:[006]:You have joined the party with code {party_code}.")
            await websocket.send_text(f"[info]:[006]:{party['title']}|{party['link']}|{len(party['members'])}")
            await broadcast_message(party_code, me, f'[006]:{username} has joined the party.')
            print(f"Sending brodcast message to {party_code}, {username}")

        while True:
            data = await websocket.receive_text()
            if data.startswith(_RESERVED_PREFIXES):
                await websocket.send_text("[system]:[004]:You can't send this message")
            else:
                await broadcast_message(party_code, me, data)
    except WebSocketDisconnect:
        pass
    finally:
        # Runs for every exit path, so a crash cannot strand the user or
        # the party. `me` is only set once this connection was registered.
        if me is not None:
            try:
                await _leave_party(party_code, username, me)
            finally:
                if users.get(username) is me:
                    del users[username]
|
|
|
if __name__ == "__main__":
    # Started as a script: serve the app on every interface, port 5000.
    uvicorn.run(app, port=5000, host="0.0.0.0")
|
|