File size: 7,838 Bytes
1784957 f88f1f8 1784957 4c560e4 1784957 4c560e4 1784957 4c560e4 1784957 4982eb6 1784957 4982eb6 1784957 f88f1f8 1784957 f88f1f8 1784957 a05f91b 4982eb6 f88f1f8 4982eb6 f88f1f8 4982eb6 f88f1f8 51e44ec f88f1f8 51e44ec 1784957 f88f1f8 1784957 4982eb6 1784957 f88f1f8 4982eb6 f88f1f8 4982eb6 f88f1f8 4982eb6 f88f1f8 4982eb6 1784957 f88f1f8 4982eb6 f88f1f8 1784957 f88f1f8 1784957 f88f1f8 4982eb6 f88f1f8 4982eb6 f88f1f8 4c560e4 4982eb6 1784957 f88f1f8 1784957 4c560e4 1784957 f88f1f8 1784957 f88f1f8 1784957 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
import random
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
app = FastAPI()
parties = {}
users = {}
class User:
def __init__(self, username, websocket, status):
self.username = username
self.websocket = websocket
self.status = status
self.device_id = None
def generate_party_code():
return ''.join([str(random.randint(0, 9)) for _ in range(6)])
async def broadcast_message(party_id, sender, message: str):
if party_id in parties:
if message.startswith("[006]"):
formatted_message = f"[system]:{message}"
elif message.startswith("TMPN:") and sender.status == "owner":
formatted_message = message
else:
status = 'Owner' if sender.status == 'owner' else 'User'
formatted_message = f"[chat]:[{status}]:[{sender.username}]:[{sender.device_id}]:{message}"
for user_name in parties[party_id]['members']:
user = users[user_name]
if user != sender:
# if user.websocket:
await user.websocket.send_text(formatted_message)
@app.get("/")
async def root_data():
return {
"status" : True,
"message" : "Working"
}
@app.get("/newroom")
async def create_new_room(status: str, link: str, name: str, username: str, device_id: str, password: str=None):
party_code = generate_party_code()
parties[party_code] = {
'owner': username,
'members': [],
'password' : password,
'title' : name,
'link' : link,
'type' : 'Private'
}
users[username] = User(username.lower(), None, "owner")
users[username].device_id = device_id
return {
"status" : True,
"party" : party_code,
"message" : "Created success"
}
@app.websocket("/ws")
async def party_server(websocket: WebSocket):
await websocket.accept()
username = None
party_code = None
password = None
user_status = None
await websocket.send_text("[system]:[001]:ENTER DATA:")
data = await websocket.receive_text()
data = data.lower()
action = data.split("|")[0]
if not action.lower() in ["create", "join"]:
await websocket.send_text("[system]:[004]:Invalid action")
await websocket.close(1000, reason="Invalid action")
return
if action.lower() == "create":
if data.count("|") != 5:
await websocket.send_text("[system]:[004]:Invalid input data")
await websocket.close(1000, "Invalid input data")
return
else:
if data.count("|") != 3:
await websocket.send_text("[system]:[004]:Invalid input data")
await websocket.close(1000, "Invalid input data")
return
if data.lower() == "system":
await websocket.send_text("[system]:[004]:You can't set system as your name")
await websocket.close(1000, "You can't set system as your name")
return
username = data.split("|")[-2]
device_id = data.split("|")[-1]
if not username in users:
users[username] = User(username, websocket, "user")
if action == "create":
link = data.split("|")[2]
name = data.split("|")[3]
act = data.split("|")[1]
if "[" in act:
act = act.split("[")[0].strip().lower()
if act == "private":
password = data.split("|")[1].split("[")[1].split("]")[0]
party_code = generate_party_code()
party_code = str(party_code)
parties[party_code] = {
'owner': username,
'members': [username],
'password' : password,
'title' : name,
'link' : link,
'type' : 'Private'
}
else:
party_code = generate_party_code()
parties[party_code] = {
'owner': username,
'members': [username],
'password' : None,
'title' : name,
'link' : link,
'type' : 'Public'
}
users[username].status = "owner"
users[username].device_id = device_id
users[username].websocket = websocket
await websocket.send_text(f"[system]:[006]:Party created! Your party code is :{party_code}.")
await broadcast_message(party_code, users[username], f'[system]::{username} has created the party.')
elif action == "join":
party_code = data.split("|")[1]
party_code = str(party_code)
if "[" in party_code:
password = party_code.split("[")[1].split("]")[0] # Get and verify the password
party_code = party_code.split("[")[0].strip()
if party_code in parties:
if parties.get(party_code).get('password') != None:
# Its a private party
if parties.get(party_code).get('password') != password:
await websocket.send_text(f"[system]:[004]: Invalid password")
await websocket.close(1000, "Invalid password")
return
# if device_id == parties[party_code]['owner']
if device_id == users[parties[party_code]['owner']].device_id:
parties[party_code]['owner'] = username
users[username].status = "owner"
else:
users[username].status = "user"
parties[party_code]['members'].append(username)
users[username].device_id = device_id
users[username].websocket = websocket
await websocket.send_text(f"[system]:[006]:You have joined the party with code {party_code}.")
await websocket.send_text(f"[info]:[006]:{parties[party_code]['title']}|{parties[party_code]['link']}|{len(parties[party_code]['members'])}")
await broadcast_message(party_code, users[username], f'[006]:{username} has joined the party.')
print(f"Sending brodcast message to {party_code}, {username}")
else:
await websocket.send_text("[system]:[004]:Invalid party code.")
await websocket.close(code=1000, reason="Invalid party code")
try:
while True:
data = await websocket.receive_text()
if data.startswith("[006]:"):
await websocket.send_text("[system]:[004]:You can't send this message")
elif data.startswith("[004]:"):
await websocket.send_text("[system]:[004]:You can't send this message")
elif data.startswith("[001]:"):
await websocket.send_text("[system]:[004]:You can't send this message")
elif party_code:
await broadcast_message(party_code, users[username], data)
except WebSocketDisconnect:
if party_code and party_code in parties:
parties[party_code]['members'].remove(username)
if parties[party_code]['owner'] == username:
await broadcast_message(party_code, users[username], f'[system]:{username} (Owner) has left the party. All members are being kicked out.')
for member in list(parties[party_code]['members']):
member_user = users[member]
await member_user.websocket.send_text("[system]:[004]:The owner has left, and the party is deleted.")
parties[party_code]['members'].remove(member)
del parties[party_code]
else:
await broadcast_message(party_code, users[username], f'[system]:[004]:{username} has left the party.')
del users[username]
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5000)
|