# personTst / server.py
# 5m4ck3r's picture
# Update server.py
# 4982eb6 verified
import random
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()
# In-memory registry of parties, keyed by party code. Each value is a dict
# with the keys 'owner', 'members', 'password', 'title', 'link' and 'type'.
parties = {}
# In-memory registry of User objects, keyed by username.
users = {}
class User:
    """A participant known to the server.

    Attributes:
        username: Name the participant is registered under.
        websocket: Connection object used to reach the participant; callers
            may pass None when no connection exists yet.
        status: Role string; this module uses "owner" and "user".
        device_id: Device identifier, None until a caller assigns one.
    """

    def __init__(self, username, websocket, status):
        # device_id is not a constructor argument; callers set it afterwards.
        self.device_id = None
        self.status = status
        self.websocket = websocket
        self.username = username
def generate_party_code():
    """Return a random party code: a string of six decimal digits."""
    digits = (str(random.randint(0, 9)) for _ in range(6))
    return "".join(digits)
async def broadcast_message(party_id, sender, message: str):
    """Deliver *message* to every member of a party except *sender*.

    The outgoing frame depends on the message:

    * a message starting with ``[006]`` is sent as ``[system]:<message>``;
    * a message starting with ``TMPN:`` from a sender whose status is
      ``"owner"`` is forwarded unchanged;
    * anything else is wrapped as
      ``[chat]:[Owner|User]:[<username>]:[<device_id>]:<message>``.

    Delivery is best-effort: members without a user record or without a
    connection are skipped, and a failed send to one member does not stop
    delivery to the others.

    Args:
        party_id: Key into ``parties``; an unknown id makes this a no-op.
        sender: User object the message originates from; it never receives
            its own message.
        message: Raw message text.
    """
    party = parties.get(party_id)
    if party is None:
        return

    if message.startswith("[006]"):
        formatted_message = f"[system]:{message}"
    elif message.startswith("TMPN:") and sender.status == "owner":
        formatted_message = message
    else:
        status = 'Owner' if sender.status == 'owner' else 'User'
        formatted_message = f"[chat]:[{status}]:[{sender.username}]:[{sender.device_id}]:{message}"

    # Iterate over a snapshot: other handlers may add or remove members while
    # this coroutine is suspended in a send.
    for user_name in list(party['members']):
        user = users.get(user_name)
        # Skip the sender, members whose record is gone, and records that
        # have no connection attached (e.g. created over HTTP only).
        if user is None or user is sender or user.websocket is None:
            continue
        try:
            await user.websocket.send_text(formatted_message)
        except Exception as exc:
            # Deliberately broad: a dead or closing peer must not prevent the
            # remaining members from receiving the message. The peer's own
            # handler is responsible for cleaning up its registration.
            print(f"Could not deliver message to {user_name}: {exc!r}")
@app.get("/")
async def root_data():
    """Liveness endpoint: report that the service is responding."""
    payload = dict(status=True, message="Working")
    return payload
@app.get("/newroom")
async def create_new_room(status: str, link: str, name: str, username: str, device_id: str, password: str = None):
    """Create a party over HTTP and reserve its owner's user record.

    The owner is stored without a connection (``websocket`` is None).

    Args:
        status: Accepted for interface compatibility; not used here.
            NOTE(review): possibly meant to select the party type, which is
            currently always stored as 'Private' -- confirm with clients.
        link: Link stored with the party.
        name: Title stored with the party.
        username: Owner's name; stored lower-cased.
        device_id: Owner's device identifier, stored on the user record.
        password: Optional party password; None means no password.

    Returns:
        A dict with ``status`` (True), ``party`` (the new party code) and
        ``message``.
    """
    # Use one normalised name for the registry key, the User attribute and
    # the party's owner field, so all three can be matched against each other.
    owner_name = username.lower()

    # Never reuse the code of a live party: assigning to an existing key
    # would silently replace that party.
    party_code = generate_party_code()
    while party_code in parties:
        party_code = generate_party_code()

    parties[party_code] = {
        'owner': owner_name,
        'members': [],
        'password': password,
        'title': name,
        'link': link,
        'type': 'Private',
    }

    owner = User(owner_name, None, "owner")
    owner.device_id = device_id
    users[owner_name] = owner

    return {
        "status": True,
        "party": party_code,
        "message": "Created success",
    }
# Client messages starting with one of these are refused. "[006]" is matched
# without the colon because broadcast_message() treats any "[006]" prefix as a
# system message.
_RESERVED_PREFIXES = ("[006]", "[004]:", "[001]:")


async def _reject(websocket, message, reason):
    """Send a ``[system]:[004]:`` error frame and close the socket."""
    await websocket.send_text(f"[system]:[004]:{message}")
    await websocket.close(1000, reason)


def _split_bracket(token):
    """Split ``"head[inner]"`` into ``(head, inner)``; *inner* is None without a ``[``."""
    parts = token.split("[")
    if len(parts) == 1:
        return token, None
    return parts[0].strip(), parts[1].split("]")[0]


def _unused_party_code():
    """Return a generated party code that is not currently a key of ``parties``."""
    code = generate_party_code()
    while code in parties:
        code = generate_party_code()
    return code


async def _leave_party(user, party_code, party):
    """Remove *user* from the registries and tell the remaining members.

    When the leaving user owns the party, every remaining member is notified
    and the party is deleted; otherwise the members are told the user left.
    """
    username = user.username
    # Only drop the registry entry if it is still this connection's record.
    if users.get(username) is user:
        del users[username]
    if username in party['members']:
        party['members'].remove(username)
    # The party may already have been deleted because its owner left earlier.
    if parties.get(party_code) is not party:
        return

    if party['owner'] != username:
        await broadcast_message(party_code, user, f'[system]:[004]:{username} has left the party.')
        return

    try:
        await broadcast_message(party_code, user, f'[system]:{username} (Owner) has left the party. All members are being kicked out.')
        for member in list(party['members']):
            peer = users.get(member)
            if peer is None or peer.websocket is None:
                continue
            try:
                await peer.websocket.send_text("[system]:[004]:The owner has left, and the party is deleted.")
            except (WebSocketDisconnect, RuntimeError, OSError):
                # Best-effort notification: that peer is already gone and its
                # own handler cleans up after it.
                continue
    finally:
        # Delete the party even if a notification failed, so no ownerless
        # party is left behind.
        party['members'].clear()
        if parties.get(party_code) is party:
            del parties[party_code]


@app.websocket("/ws")
async def party_server(websocket: WebSocket):
    """Serve one party participant over a websocket.

    After accepting, the server sends ``[system]:[001]:ENTER DATA:`` and
    expects a single ``|``-separated frame:

    * ``create|<type>|<link>|<title>|<username>|<device_id>`` where ``<type>``
      is ``private[<password>]`` for a password-protected party and anything
      else for a public one;
    * ``join|<code>|<username>|<device_id>`` where ``<code>`` may carry a
      password as ``<code>[<password>]``.

    Only the action, the party type keyword and the username are lower-cased;
    link, title, password and device id are kept exactly as sent. Invalid
    input is answered with a ``[system]:[004]:`` frame and the socket is
    closed. After a successful create or join, each received message is
    relayed to the other members via ``broadcast_message`` until the client
    disconnects, at which point the user is unregistered. If the leaving user
    owns the party, the party is deleted.
    """
    await websocket.accept()
    await websocket.send_text("[system]:[001]:ENTER DATA:")
    try:
        fields = (await websocket.receive_text()).split("|")
    except WebSocketDisconnect:
        # Client left before sending the handshake; nothing is registered yet.
        return

    action = fields[0].lower()
    if action not in ("create", "join"):
        await _reject(websocket, "Invalid action", "Invalid action")
        return
    if len(fields) != (6 if action == "create" else 4):
        await _reject(websocket, "Invalid input data", "Invalid input data")
        return

    username = fields[-2].lower()
    device_id = fields[-1]
    if username == "system":
        await _reject(websocket, "You can't set system as your name", "You can't set system as your name")
        return

    # A name is taken when another live connection holds it, or when it was
    # reserved (record without a connection) for a different device. Sharing
    # one record between two connections would cross their messages and make
    # the first disconnect delete the record the other still relies on.
    existing = users.get(username)
    if existing is not None and (existing.websocket is not None or existing.device_id != device_id):
        await _reject(websocket, "Username already in use", "Username already in use")
        return

    if action == "create":
        kind, password = _split_bracket(fields[1])
        if kind.lower() == "private":
            if password is None:
                # "private" without "[password]" cannot be honoured.
                await _reject(websocket, "Invalid input data", "Invalid input data")
                return
            party_type = 'Private'
        else:
            password = None
            party_type = 'Public'
        party_code = _unused_party_code()
        party = {
            'owner': username,
            'members': [],
            'password': password,
            'title': fields[3],
            'link': fields[2],
            'type': party_type,
        }
        parties[party_code] = party
        status = "owner"
    else:
        party_code, password = _split_bracket(fields[1])
        party = parties.get(party_code)
        if party is None:
            await _reject(websocket, "Invalid party code.", "Invalid party code")
            return
        if party.get('password') is not None and party['password'] != password:
            await _reject(websocket, " Invalid password", "Invalid password")
            return
        # Whoever connects from the owner's device takes over ownership,
        # possibly under a new username.
        owner = users.get(party['owner'])
        if owner is not None and owner.device_id == device_id:
            party['owner'] = username
            status = "owner"
        else:
            status = "user"

    # Register only after every check has passed, so a rejected handshake
    # leaves no record behind.
    user = User(username, websocket, status)
    user.device_id = device_id
    users[username] = user
    if username not in party['members']:
        party['members'].append(username)

    try:
        if action == "create":
            await websocket.send_text(f"[system]:[006]:Party created! Your party code is :{party_code}.")
            await broadcast_message(party_code, user, f'[system]::{username} has created the party.')
        else:
            await websocket.send_text(f"[system]:[006]:You have joined the party with code {party_code}.")
            await websocket.send_text(f"[info]:[006]:{party['title']}|{party['link']}|{len(party['members'])}")
            await broadcast_message(party_code, user, f'[006]:{username} has joined the party.')
            print(f"Sending brodcast message to {party_code}, {username}")

        while True:
            data = await websocket.receive_text()
            if data.startswith(_RESERVED_PREFIXES):
                await websocket.send_text("[system]:[004]:You can't send this message")
            elif parties.get(party_code) is party:
                # Relay only while this exact party still exists; its code may
                # have been reused by a newer party after deletion.
                await broadcast_message(party_code, user, data)
    except WebSocketDisconnect:
        # Normal end of the session; cleanup happens in ``finally``.
        pass
    finally:
        # Runs on unexpected errors too, so no stale member with a dead
        # socket is left in the party.
        await _leave_party(user, party_code, party)
if __name__ == "__main__":
    # Direct execution: serve the app on every interface, port 5000.
    server_options = {"host": "0.0.0.0", "port": 5000}
    uvicorn.run(app, **server_options)