"""Minimal paramiko-based SSH server that echoes back whatever it receives.

Clients are served one at a time: the accept loop does not take the next
connection until the current session has ended.
"""
import socket
import threading

import paramiko

# Define host and port
host_key = paramiko.RSAKey(filename='test_rsa.key')
host, port = 'localhost', 2222


# Create SSH server class
class SSHServer(paramiko.ServerInterface):
    """Server policy: 'session' channels only, one fixed username/password."""

    def __init__(self):
        self.event = threading.Event()

    def check_channel_request(self, kind, chanid):
        """Allow 'session' channels; refuse every other channel kind."""
        if kind == 'session':
            return paramiko.OPEN_SUCCEEDED
        return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED

    def check_auth_password(self, username, password):
        """Accept only the hard-coded credentials 'test' / 'password'."""
        # NOTE(review): hard-coded credentials; looks intended for local
        # testing only -- confirm before exposing this server anywhere.
        if username == 'test' and password == 'password':
            return paramiko.AUTH_SUCCESSFUL
        return paramiko.AUTH_FAILED


def _handle_client(client):
    """Run one SSH echo session over the accepted socket *client*.

    Every failure is reported on stdout rather than raised, so a broken
    client cannot take the accept loop down. The transport (or the raw
    socket, if no transport could be created) is always closed on exit.
    """
    transport = None
    try:
        transport = paramiko.Transport(client)
        transport.add_server_key(host_key)
        transport.start_server(server=SSHServer())

        # Wait up to 20 seconds for the client to open a channel.
        chan = transport.accept(20)
        if chan is None:
            print('*** No channel.')
            return

        print('[*] Authenticated!')
        # sendall() loops until everything is written; send() may stop short.
        chan.sendall('Welcome to my SSH server!')

        while True:
            command = chan.recv(1024)
            if not command:
                # Empty read: the channel was closed by the peer.
                break
            # errors='replace' keeps binary input from aborting the session
            # merely because it cannot be logged as UTF-8.
            text = command.decode(errors='replace')
            print(f'[*] Received command: {text}')
            chan.sendall(command)
    except Exception as e:
        print(f'[-] Caught exception: {e.__class__.__name__}: {e}')
    finally:
        # Best-effort cleanup: a failure here must not stop the server.
        try:
            if transport is not None:
                transport.close()
            else:
                client.close()
        except Exception as close_err:
            print(f'[-] Error while closing connection: {close_err}')


# Main function to start the server
def start_server():
    """Listen on (host, port) and serve SSH clients sequentially, forever."""
    # The listening socket has its own name so that nothing created while
    # serving a client can rebind it, and `with` closes it on the way out.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((host, port))
        listener.listen(100)
        print(f'[*] Listening for connection on {host}:{port}...')

        while True:
            client, addr = listener.accept()
            print(f'[*] Accepted connection from {addr[0]}:{addr[1]}')
            _handle_client(client)


if __name__ == '__main__':
    start_server()