File size: 7,838 Bytes
1784957 f88f1f8 1784957 4c560e4 1784957 4c560e4 1784957 4c560e4 1784957 4982eb6 1784957 4982eb6 1784957 f88f1f8 1784957 f88f1f8 1784957 a05f91b 4982eb6 f88f1f8 4982eb6 f88f1f8 4982eb6 f88f1f8 51e44ec f88f1f8 51e44ec 1784957 f88f1f8 1784957 4982eb6 1784957 f88f1f8 4982eb6 f88f1f8 4982eb6 f88f1f8 4982eb6 f88f1f8 4982eb6 1784957 f88f1f8 4982eb6 f88f1f8 1784957 f88f1f8 1784957 f88f1f8 4982eb6 f88f1f8 4982eb6 f88f1f8 4c560e4 4982eb6 1784957 f88f1f8 1784957 4c560e4 1784957 f88f1f8 1784957 f88f1f8 1784957 |
|
import random
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
app = FastAPI()
parties = {}
users = {}
class User:
def __init__(self, username, websocket, status):
self.username = username
self.websocket = websocket
self.status = status
self.device_id = None
def generate_party_code():
return ''.join([str(random.randint(0, 9)) for _ in range(6)])
async def broadcast_message(party_id, sender, message: str):
if party_id in parties:
if message.startswith("[006]"):
formatted_message = f"[system]:{message}"
elif message.startswith("TMPN:") and sender.status == "owner":
formatted_message = message
else:
status = 'Owner' if sender.status == 'owner' else 'User'
formatted_message = f"[chat]:[{status}]:[{sender.username}]:[{sender.device_id}]:{message}"
for user_name in parties[party_id]['members']:
user = users[user_name]
if user != sender:
# if user.websocket:
await user.websocket.send_text(formatted_message)
@app.get("/")
async def root_data():
return {
"status" : True,
"message" : "Working"
}
@app.get("/newroom")
async def create_new_room(status: str, link: str, name: str, username: str, device_id: str, password: str=None):
party_code = generate_party_code()
parties[party_code] = {
'owner': username,
'members': [],
'password' : password,
'title' : name,
'link' : link,
'type' : 'Private'
}
users[username] = User(username.lower(), None, "owner")
users[username].device_id = device_id
return {
"status" : True,
"party" : party_code,
"message" : "Created success"
}
@app.websocket("/ws")
async def party_server(websocket: WebSocket):
await websocket.accept()
username = None
party_code = None
password = None
user_status = None
await websocket.send_text("[system]:[001]:ENTER DATA:")
data = await websocket.receive_text()
data = data.lower()
action = data.split("|")[0]
if not action.lower() in ["create", "join"]:
await websocket.send_text("[system]:[004]:Invalid action")
await websocket.close(1000, reason="Invalid action")
return
if action.lower() == "create":
if data.count("|") != 5:
await websocket.send_text("[system]:[004]:Invalid input data")
await websocket.close(1000, "Invalid input data")
return
else:
if data.count("|") != 3:
await websocket.send_text("[system]:[004]:Invalid input data")
await websocket.close(1000, "Invalid input data")
return
if data.lower() == "system":
await websocket.send_text("[system]:[004]:You can't set system as your name")
await websocket.close(1000, "You can't set system as your name")
return
username = data.split("|")[-2]
device_id = data.split("|")[-1]
if not username in users:
users[username] = User(username, websocket, "user")
if action == "create":
link = data.split("|")[2]
name = data.split("|")[3]
act = data.split("|")[1]
if "[" in act:
act = act.split("[")[0].strip().lower()
if act == "private":
password = data.split("|")[1].split("[")[1].split("]")[0]
party_code = generate_party_code()
party_code = str(party_code)
parties[party_code] = {
'owner': username,
'members': [username],
'password' : password,
'title' : name,
'link' : link,
'type' : 'Private'
}
else:
party_code = generate_party_code()
parties[party_code] = {
'owner': username,
'members': [username],
'password' : None,
'title' : name,
'link' : link,
'type' : 'Public'
}
users[username].status = "owner"
users[username].device_id = device_id
users[username].websocket = websocket
await websocket.send_text(f"[system]:[006]:Party created! Your party code is :{party_code}.")
await broadcast_message(party_code, users[username], f'[system]::{username} has created the party.')
elif action == "join":
party_code = data.split("|")[1]
party_code = str(party_code)
if "[" in party_code:
password = party_code.split("[")[1].split("]")[0] # Get and verify the password
party_code = party_code.split("[")[0].strip()
if party_code in parties:
if parties.get(party_code).get('password') != None:
# Its a private party
if parties.get(party_code).get('password') != password:
await websocket.send_text(f"[system]:[004]: Invalid password")
await websocket.close(1000, "Invalid password")
return
# if device_id == parties[party_code]['owner']
if device_id == users[parties[party_code]['owner']].device_id:
parties[party_code]['owner'] = username
users[username].status = "owner"
else:
users[username].status = "user"
parties[party_code]['members'].append(username)
users[username].device_id = device_id
users[username].websocket = websocket
await websocket.send_text(f"[system]:[006]:You have joined the party with code {party_code}.")
await websocket.send_text(f"[info]:[006]:{parties[party_code]['title']}|{parties[party_code]['link']}|{len(parties[party_code]['members'])}")
await broadcast_message(party_code, users[username], f'[006]:{username} has joined the party.')
print(f"Sending brodcast message to {party_code}, {username}")
else:
await websocket.send_text("[system]:[004]:Invalid party code.")
await websocket.close(code=1000, reason="Invalid party code")
try:
while True:
data = await websocket.receive_text()
if data.startswith("[006]:"):
await websocket.send_text("[system]:[004]:You can't send this message")
elif data.startswith("[004]:"):
await websocket.send_text("[system]:[004]:You can't send this message")
elif data.startswith("[001]:"):
await websocket.send_text("[system]:[004]:You can't send this message")
elif party_code:
await broadcast_message(party_code, users[username], data)
except WebSocketDisconnect:
if party_code and party_code in parties:
parties[party_code]['members'].remove(username)
if parties[party_code]['owner'] == username:
await broadcast_message(party_code, users[username], f'[system]:{username} (Owner) has left the party. All members are being kicked out.')
for member in list(parties[party_code]['members']):
member_user = users[member]
await member_user.websocket.send_text("[system]:[004]:The owner has left, and the party is deleted.")
parties[party_code]['members'].remove(member)
del parties[party_code]
else:
await broadcast_message(party_code, users[username], f'[system]:[004]:{username} has left the party.')
del users[username]
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5000)
|