|
import socket |
|
import paramiko |
|
|
|
|
|
host_key = paramiko.RSAKey(filename='test_rsa.key') |
|
host, port = 'localhost', 2222 |
|
|
|
|
|
class SSHServer(paramiko.ServerInterface): |
|
def __init__(self): |
|
self.event = threading.Event() |
|
|
|
def check_channel_request(self, kind, chanid): |
|
if kind == 'session': |
|
return paramiko.OPEN_SUCCEEDED |
|
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED |
|
|
|
def check_auth_password(self, username, password): |
|
if username == 'test' and password == 'password': |
|
return paramiko.AUTH_SUCCESSFUL |
|
return paramiko.AUTH_FAILED |
|
|
|
|
|
def start_server(): |
|
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|
server.bind((host, port)) |
|
server.listen(100) |
|
|
|
print(f'[*] Listening for connection on {host}:{port}...') |
|
|
|
while True: |
|
client, addr = server.accept() |
|
print(f'[*] Accepted connection from {addr[0]}:{addr[1]}') |
|
|
|
try: |
|
transport = paramiko.Transport(client) |
|
transport.add_server_key(host_key) |
|
server = SSHServer() |
|
|
|
transport.start_server(server=server) |
|
|
|
chan = transport.accept(20) |
|
if chan is None: |
|
print('*** No channel.') |
|
continue |
|
|
|
print(f'[*] Authenticated!') |
|
chan.send('Welcome to my SSH server!') |
|
|
|
while True: |
|
command = chan.recv(1024) |
|
if not command: |
|
break |
|
print(f'[*] Received command: {command.decode()}') |
|
chan.send(command) |
|
|
|
transport.close() |
|
|
|
except Exception as e: |
|
print(f'[-] Caught exception: {e.__class__.__name__}: {e}') |
|
try: |
|
transport.close() |
|
except: |
|
pass |
|
|
|
if __name__ == '__main__': |
|
start_server() |
|
|