前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >随笔记录之自定义 SSH 服务

随笔记录之自定义 SSH 服务

原创
作者头像
bowenerchen
修改2023-08-27 11:28:20
1.1K1
修改2023-08-27 11:28:20
举报
文章被收录于专栏:梵高先生梵高先生

前言

SSH(Secure Shell)协议是一种加密的网络传输协议,使得在不安全的网络环境中可以安全的执行远程登录、远程命令执行和数据传输等操作。

SSH 协议的组成可以大致分成三个模块:

  • 传输层协议(Transport Layer Protocol):传输层协议负责在客户端和服务器之间建立安全的连接。它提供了一种可靠的数据流,用于在不安全的网络环境中传输加密的数据。传输层协议还负责进行密钥交换、协商加密算法和数据完整性检查等操作。
  • 用户认证协议(User Authentication Protocol):用户认证协议用于验证客户端用户的身份。在 SSH 连接建立之后,客户端需要提供有效的认证凭据(如用户名和密码、公钥等),以证明其具有访问服务器的权限。SSH 支持多种认证方法,包括基于密码的认证、基于公钥的认证和基于其他安全令牌的认证等。
  • 连接协议(Connection Protocol):连接协议负责在已建立的安全连接上提供多种服务,如交互式 Shell、远程命令执行、端口转发和文件传输等。连接协议允许客户端在同一个 SSH 连接上同时运行多个会话,每个会话可以使用不同的服务。

传输层协议比较偏向于基础设施类协议,本文中我们所说的定制集中在用户认证协议和连接层协议的定制。

本文中的所有代码基于 Python3 + asyncssh 库进行实现。

关于 asyncssh 库

在基于 asyncssh 库实现 ssh 服务时,应用程序需要继承并实现 SSHServer 类,

常见的需要重载的方法如:

connection_made

begin_auth

connection_lost

xxxx_supported

validate_xxxx

SSHServer 类主要控制客户端与SSH服务之间的认证行为。

在认证完成后,客户端与服务之间的连接行为,需要通过 SSHServerSession 类来完成,业务需要继承这个类并重载相关方法如:

connection_made

pty_requested

shell_requested

exec_requested

subsystem_requested

session_started

data_received

还有一点需要注意的是,在阅读 asyncssh 的文档以及源码时,发现有部分错误,详情可以参考:关于 SSHServerSession 的 Issue

用户认证协议的定制

本文主要侧重于三种比较常见的认证方式:

  • 用户名密码认证
  • 公钥认证
  • 交互式认证

我们在使用 ssh 命令行时,可以指定认证方式的优先级:

代码语言:shell
复制
命令行工具使用 -o 参数指定认证方式的优先级
none -- 不需要认证
keyboard-interactive -- 键盘交互式认证
publickey -- 公钥认证
password  -- 密码认证
例如:
ssh -o PreferredAuthentications=none,keyboard-interactive, publickey, password user@example.com

用户名密码认证

用户名密码认证主要有两方面的功能:

  • 校验当前用户名与密码是否正确
  • 校验当前用户名的密码是否已经过期,如果已经过期则需要改密
代码语言:python
复制
import asyncio
import json
import sys
import time
from typing import Any, Dict, Optional

import asyncssh
from asyncssh.misc import MaybeAwait, PasswordChangeRequired, PermissionDenied

class demo_ssh_server(asyncssh.SSHServer):
    
    def __init__(self):
        self.AUTH_MAX_RETRIES: int = 2
        self.CHANGE_MAX_RETRIES: int = 2
        self.password_retried: int = 0
        self.update_password_retried: int = 0
        self.connected_time = ''
        self.auth_method = 'none'
        self.auth_username = ''
        self.auth_password = ''
        self.auth_new_password = ''
        self.auth_success: bool = False
    
    def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
        self.connected_time = time.strftime('%Y-%m-%d %H:%M:%S %p %A %B')
    
    def session_requested(self):
        return demo_ssh_session(self)
    
    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file = sys.stderr)
        else:
            print('SSH connection closed.')
    
    def begin_auth(self, username: str) -> bool:
        # If the user's password is the empty string, no auth is required
        print('begin_auth, username:{}'.format(username))
        return True  # 需要认证
    
    def password_auth_supported(self) -> bool:
        # 开启密码认证
        return True
    
    def validate_password(self, username: str, password: str) -> MaybeAwait[bool]:
        # 标记当前的认证方式为 password
        self.auth_method = 'password'
        self.auth_username = username
        self.auth_password = password
        
        print('username:{}, password:{}, auth password retried:{}, MAX LIMIT:{}'.format(
            username,
            password,
            self.update_password_retried,
            self.AUTH_MAX_RETRIES))
        
        # 如果密码重试次数已经超过了最大限度,则返回失败
        if self.password_retried > self.AUTH_MAX_RETRIES or self.update_password_retried > self.CHANGE_MAX_RETRIES:
            # 告知客户端当前密码登录验证失败,并将失败信息通过 reason 字段传递给客户端
            raise PermissionDenied(reason = 'demo ssh server, password auth failed, user:[{}]'.format(username))
        
        # 用户名不为 test 和 change 时,认为校验失败
        if username != 'test' and username != 'change':
            self.password_retried += 1
            return False
        
        # 校验 username 和 password 是否匹配,如果匹配则返回 True
        # 当用户名为 test 时,密码必须为 test
        if username == 'test' and password != 'test':
            self.password_retried += 1
            return False
        
        # 当用户名为 change 时,要求客户端更改密码
        if username == 'change' and len(password) > 0:
            self.password_retried += 1
            raise PasswordChangeRequired(prompt = 'change your password for username:[{}]'.format(username))
        self.auth_success = True
        return True
    
    def change_password(self, username: str, old_password: str,
                        new_password: str) -> MaybeAwait[bool]:
        
        print('update password retried:{}, MAX LIMIT:{}'.format(self.update_password_retried, self.CHANGE_MAX_RETRIES))
        
        # 如果重置密码的次数超过上限则返回 False
        if self.update_password_retried > self.CHANGE_MAX_RETRIES:
            # 告知客户端当前密码登录验证失败,并将失败信息通过 reason 字段传递给客户端
            raise PermissionDenied(reason = 'demo ssh server, update password failed, user:[{}]'.format(username))
        
        # 如果老的密码和新的密码一致 或 新密码为空,则要求再次改变密码
        if old_password == new_password or len(new_password) <= 0:
            self.update_password_retried += 1
            raise PasswordChangeRequired('retry change password for [{}]'.format(username))
        self.auth_new_password = new_password
        # 密码认证且更新了密码
        self.auth_method = 'password and updated'
        self.auth_success = True
        return True
    
    def auth_completed(self) -> None:
        print(
            'demo ssh server, auth_method:{}, '
            'auth_username:{}, auth_password:{}, '
            'auth_pwd_retried:{}, changed_pwd_retried:{}, '
            'new_password:{}, auth_result:{}'.format(
                self.auth_method,
                self.auth_username,
                self.auth_password,
                self.password_retried,
                self.update_password_retried,
                self.auth_new_password,
                self.auth_success))
        pass


class demo_ssh_session(asyncssh.SSHServerSession):
    def __init__(self, server: demo_ssh_server):
        self.server: demo_ssh_server = server
    
    def connection_made(self, channel):
        super().connection_made(channel)
        ret: Dict[str, Any] = {
            'auth_method': self.server.auth_method,
            'auth_username': self.server.auth_username,
            'auth_password': self.server.auth_password,
            'password_retried': self.server.password_retried,
            'update_password_retried': self.server.update_password_retried,
            'auth_new_password': self.server.auth_new_password,
            'auth_succeed': self.server.auth_success,
        }
        data = json.dumps(ret, ensure_ascii = False, sort_keys = True)
        # 当密码校验成功后,客户端与服务端建立了 session,此时回写登录信息给客户端
        channel.write("{}\n".format(data))
        channel.exit(0)


async def start_server() -> None:
    """
    create_server 是一个封装了 listen 方法的协程,
    它提供与旧版AsyncSSH的向后兼容性。
    唯一的区别是,server_factory参数在此调用中是一个位置参数,
    而不是关键字参数或通过SSHServerConnectionOptions对象指定,
    这与asyncio.AbstractEventLoop.create_server相似。
    :return: None
    """
    await asyncssh.create_server(
        server_factory = demo_ssh_server,
        host = '127.0.0.1',
        port = 18822,
        server_version = 'DEMO-TEST-AsyncSSH_2.13.1',  # 自定义服务器 SSH 协议版本名称
        server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
    )
    await asyncio.Event().wait()  # 添加一个永远不会触发的事件,使服务器保持运行状态

if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

当使用错误的用户名登陆时:

当使用正确的用户名,先输错密码,最后输入正确密码时:

使用需要改密的用户名时:

交互式认证

交互式认证的流程为,先向用户给出提示,获得客户端的输入,最后根据用户的输入判断认证是否成功。这里可以结合双因子认证来做,比如结合短信验证码或小程序 OTP 码等等。

代码语言:python
复制
"""
ssh -o PreferredAuthentications=keyboard-interactive user@example.com
"""
import asyncio
import sys
from typing import Optional

import asyncssh
from asyncssh.auth import KbdIntChallenge, KbdIntResponse
from asyncssh.misc import MaybeAwait


class demo_ssh_server(asyncssh.SSHServer):
    def __init__(self):
        self.first_auth = False
        self.second_auth = False
        pass
    
    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file = sys.stderr)
        else:
            print('SSH connection closed.')
    
    def begin_auth(self, username: str) -> MaybeAwait[bool]:
        """
        :param username: 认证的用户名
        :return: 不需要认证则返回 False,需要认证返回 True
        """
        return True
    
    def kbdint_auth_supported(self) -> bool:
        """
        :return: 是否支持键盘交互式认证
        """
        return True
    
    def get_kbdint_challenge(self, username: str, lang: str,
                             submethods: str) -> MaybeAwait[KbdIntChallenge]:
        """
        :param username: 登录的用户名 username@xxxx
        :param lang:
        :param submethods:
        :return: 这个方法应该返回 True,如果认证应该在没有任何挑战的情况下成功;
                 返回 False,如果认证应该在没有任何挑战的情况下失败;
                 返回一个认证挑战,包括
                 ( 挑战名称, 说明, 语言标签, [ ( 提示字符串, 布尔值 ) ] ),
                 当为该提示输入值时,布尔值表示是否应该回显输入。
        """
        self.first_auth = True
        print('username:{}, lang:{}, submethods:{}'.format(username, lang, submethods))
        # 挑战名称、说明、语言标签、[(提示符字符串、是否回显)]
        return (' ChallengeName', 'ChallengeDesc', 'Tag',
                [('Input something with echo:', True),
                 ('Input something without echo:', False)])
    
    def validate_kbdint_response(self, username: str, responses: KbdIntResponse) -> MaybeAwait[KbdIntChallenge]:
        if self.first_auth and not self.second_auth:
            print('username:{}, responses:{}'.format(username, responses))
            self.second_auth = True
            return ('Second ChallengeName', 'Second ChallengeDesc', 'Second Tag',
                    [('Input something AGAIN with echo:', True),
                     ('Input something AGAIN without echo:', False)])
        if self.first_auth and self.second_auth:
            print('AGAIN username:{}, responses:{}'.format(username, responses))
            return True
        return False


async def start_server() -> None:
    """
    create_server 是一个封装了 listen 方法的协程,
    它提供与旧版AsyncSSH的向后兼容性。
    唯一的区别是,server_factory参数在此调用中是一个位置参数,
    而不是关键字参数或通过SSHServerConnectionOptions对象指定,
    这与asyncio.AbstractEventLoop.create_server相似。
    :return: None
    """
    await asyncssh.create_server(
        server_factory = demo_ssh_server,
        host = '127.0.0.1',
        port = 18822,
        server_version = 'DEMO-TEST-AsyncSSH_2.13.1',  # 自定义服务器 SSH 协议版本名称
        server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
    )
    await asyncio.Event().wait()  # 添加一个永远不会触发的事件,使服务器保持运行状态


if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

公钥认证方式

公钥认证方式指的是,客户端提前将公钥种到了服务器中(.ssh/authorized_keys),然后客户端在 ssh 登录时,会直接和 ssh 服务器验证公钥的合法性。

当然,ssh 服务器也可以通过解析 ssh 客户端发过来的公钥数据,进行实时的放行或拦截。

基于authorized_keys文件进行的 ssh 公钥认证

代码语言:python
复制
import asyncio
import sys
from typing import Optional

import asyncssh


class demo_ssh_server(asyncssh.SSHServer):
    
    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file = sys.stderr)
        else:
            print('SSH connection closed.')
    
    def public_key_auth_supported(self) -> bool:
        return True


async def start_server() -> None:
    """
    create_server 是一个封装了 listen 方法的协程,
    它提供与旧版AsyncSSH的向后兼容性。
    唯一的区别是,server_factory参数在此调用中是一个位置参数,
    而不是关键字参数或通过SSHServerConnectionOptions对象指定,
    这与asyncio.AbstractEventLoop.create_server相似。
    :return: None
    
    服务器从名为 authorized_keys 的文件中读取允许的客户端公钥。这个文件应该包含一行或多行 OpenSSH 格式的公钥
    
    当执行:
        ssh  -i ~/.ssh/id_rsa -o PreferredAuthentications=publickey test@127.0.0.1 -p 18822
    会得到:
        channel 0: open failed: connect failed: Session refused
    
    当执行:
        ssh  -i ~/.ssh/devcloud_ssh/devcloud_id_rsa -o PreferredAuthentications=publickey test@127.0.0.1 -p 18822
    会得到:
        test@127.0.0.1: Permission denied (publickey).
    """
    await asyncssh.create_server(
        server_factory = demo_ssh_server,
        host = '127.0.0.1',
        port = 18822,
        server_version = 'DEMO-TEST-AsyncSSH_2.13.1',  # 自定义服务器 SSH 协议版本名称
        server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
        authorized_client_keys = '/Users/cyx/.ssh/authorized_keys'
    )
    await asyncio.Event().wait()  # 添加一个永远不会触发的事件,使服务器保持运行状态


if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

此时服务器中的公钥文件为:

客户端分别使用合法的公钥以及非法的公钥访问时,会得到:

ssh服务器自动解析客户端发送来的公钥

代码语言:python
复制
"""
SSH 指定私钥文件路径:
    ssh  -i ~/.ssh/id_rsa -o PreferredAuthentications=publickey test@127.0.0.1 -p 18822
在发起 SSH 连接时,会由这个私钥派生出对应的公钥,并使用派生出的公钥进行用户认证
"""
import asyncio
import hashlib
import sys
from typing import List, Optional

import asyncssh
from asyncssh import SSHKey
from asyncssh.misc import MaybeAwait
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

from openssh_key_parse import protocol_binary_to_authorized_key_rsa


class demo_ssh_server(asyncssh.SSHServer):
    def __init__(self):
        self.auth_client_pub_keys: List[bytes] = []
        pass
    
    def begin_auth(self, username: str) -> MaybeAwait[bool]:
        """
         预先加载好客户认证需要的密钥对
         以 "-----BEGIN OPENSSH PRIVATE KEY-----" 开头的密钥实际上是 OpenSSH 格式的私钥,而不是传统的 PEM 格式。
         OpenSSH 格式是 OpenSSH 6.5 及更高版本中引入的新格式。
         尽管它们看起来很像 PEM 格式,但它们之间有一些差异。
         
         PEM 格式的密钥文件是以文本形式存储的,可以直接用文本编辑器打开。它们以 "-----BEGIN" 开头,
         然后是密钥类型(如 RSA PRIVATE KEY、RSA PUBLIC KEY、PUBLIC KEY 等),接着是 "-----"。
         文件结尾处以 "-----END" 开头,后面跟着与开始相同的密钥类型和 "-----"
         
        :param username: 登录的用户名
        :return: True 表示需要进行认证,False 标识不需要认证
        """
        local_key_path = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa',
                          '/Users/cyx/.ssh/id_rsa']
        private_key = None
        for p in local_key_path:
            with open(p, 'rb') as key_file:
                key_file_data = key_file.read()
                if key_file_data.startswith('-----BEGIN OPENSSH PRIVATE KEY-----'.encode(encoding = 'utf-8')):
                    private_key = serialization.load_ssh_private_key(
                        key_file_data,
                        password = None,  # 如果私钥文件有密码,请在此处提供
                        backend = default_backend())
                
                if key_file_data.startswith('-----BEGIN RSA PRIVATE KEY-----'.encode(encoding = 'utf-8')):
                    private_key = serialization.load_pem_private_key(
                        key_file_data,
                        password = None,
                        backend = default_backend()
                    )
                """
                将 encoding 参数设置为 serialization.Encoding.PEM,表示希望以 PEM 编码输出公钥。
                将 format 参数设置为 serialization.PublicFormat.SubjectPublicKeyInfo,表示希望输出公钥的 SubjectPublicKeyInfo 格式
                这是 PEM 格式公钥的标准格式
                
                将 encoding 参数设置为 serialization.Encoding.DER,表示希望以 DER 编码输出公钥。
                将 format 参数设置为 serialization.PublicFormat.SubjectPublicKeyInfo,表示希望输出公钥的 SubjectPublicKeyInfo 格式。
                这是 DER 格式公钥的标准格式。
                
                SubjectPublicKeyInfo works only with PEM or DER encoding
                
                 RSA 公钥转换为 SSH RSA 公钥格式的二进制数据,需要将 Encoding 和 PublicFormat 填充为:OpenSSH
                """
                pub_key_data = private_key.public_key().public_bytes(encoding = serialization.Encoding.OpenSSH,
                                                                     format = serialization.PublicFormat.OpenSSH)
                # print(p, pub_key_data)
                self.auth_client_pub_keys.append(pub_key_data)
        return True
    
    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file = sys.stderr)
        else:
            print('SSH connection closed.')
    
    def password_auth_supported(self) -> bool:
        return False
    
    def kbdint_auth_supported(self) -> bool:
        return False
    
    def public_key_auth_supported(self) -> bool:
        return True
    
    def validate_public_key(self, username: str, key: SSHKey) -> MaybeAwait[bool]:
        print('try login, username:{}, pub key:{}'.format(username, hashlib.md5(key.public_data[:-16]).hexdigest()))
        for i in self.auth_client_pub_keys:
            if i == protocol_binary_to_authorized_key_rsa(key.public_data):
                print(username, '==> pub key matched, login success!!!')
                return True
        print(username, '--> pub key mis-matched')
        return False


async def start_server() -> None:
    """
    create_server 是一个封装了 listen 方法的协程,
    它提供与旧版AsyncSSH的向后兼容性。
    唯一的区别是,server_factory参数在此调用中是一个位置参数,
    而不是关键字参数或通过SSHServerConnectionOptions对象指定,
    这与asyncio.AbstractEventLoop.create_server相似。
    :return: None
    """
    await asyncssh.create_server(
        server_factory = demo_ssh_server,
        host = '127.0.0.1',
        port = 18822,
        server_version = 'DEMO-TEST-AsyncSSH_2.13.1',  # 自定义服务器 SSH 协议版本名称
        server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
    )
    await asyncio.Event().wait()  # 添加一个永远不会触发的事件,使服务器保持运行状态


if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

其中,ssh 服务器在接收到客户端发送过来的公钥时,需要对公钥数据做一些格式的转化。

代码语言:python
复制
def protocol_binary_to_authorized_key_rsa(data: bytes) -> bytes:
    # 使用 Paramiko 将二进制数据转换为公钥对象
    paramiko_key = paramiko.RSAKey(data = data)
    # 将公钥对象转换为 OpenSSH 格式的文本公钥
    public_key_openssh = paramiko_key.get_base64()
    # 拼接密钥类型(如 "ssh-rsa")和 Base64 编码的公钥数据
    openssh_public_key = b"ssh-rsa " + public_key_openssh.encode("utf-8")
    # print(openssh_public_key)
    return openssh_public_key


def authorized_key_to_protocol_binary(data: bytes) -> bytes:
    # 从 OpenSSH 格式文本公钥中提取 Base64 编码的部分
    key_base64 = data.decode().split(" ", 2)
    key_data = base64.b64decode(key_base64[1])
    # 使用 Paramiko 将二进制数据转换为公钥对象
    paramiko_key = paramiko.RSAKey(data = key_data)
    # 获取二进制 SSH RSA 公钥数据
    binary_ssh_rsa_public_key = paramiko_key.asbytes()
    # print(binary_ssh_rsa_public_key)
    return binary_ssh_rsa_public_key

这里的转化效果可以通过以下的例子来测试:

代码语言:python
复制
# 二进制格式的 SSH RSA 公钥,其中包含了公钥的类型、长度和实际数据。这种格式在 SSH 协议中用于传输公钥。
ssh_protocol_binary_rsa = b'\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x03\x01\x00\x01\x00\x00\x01\x01\x00\xb4{+(\xf4\xb0\xa6\xc7\xbb\xac\xe8^u\xc2BR\xe4\x89,\\t\xf7t\x87\x91\x1a\xbf#$\x87\xd3MjA+\xb1Z\xee^\xa3\x0f\xcc\t\x05\xb8\xddt\x1ayt\xcdk\x8eZ\xaa\x1a9\x9fzZ\xa2\x05\xe1\x84\t\xbc=\x9d"\x98\x87\xd5\x80C\xd7\x88\x183\x90:\x85\x0e\xafc*\xabI\xb1\xc8\xf0\x9c~\xbe\xc2\xd8\xb1a\x11\xe5\xbeh\x9e\xa8\x9cU\xf9\xbd\x88\x86\x14\xbd\x97\xf9\xb0\xf7n\xb5\xd2Z>#nGs\x8f\x0bL\xc7J\xcd\x1b\xcft\xeaJ\xd9\x98\xcc\x8969\x16\x7f\x10\xbf\x06\x07\x9b\xb6jbUB\x87j8P\x1cc\x11*I\xb8~\x95\xc7\xa1\x0c\xc7\x1fM\x18*[\xabv\x91\txB\r\x1a9\xe8G\x10\'A\xf7\x0c"\xd8g\xe06\xe4\xf6WFvf\xe5\x8ayz8\x06\xf6q\xa5`\x00O\x86\\\x18\xd5v$\xf8\xacz\xfc^S\xa9V8b\xa7\xb8\xa4{\xbd\xf2\xd0I77\xde\x81\x98\xa9\xd6NB\xf9\x13J\x1b\xe5\x08\xfb\xcf\xdb\x19'

# OpenSSH 格式的文本数据,以 "ssh-rsa" 开头,后面跟着 Base64 编码的 RSA 公钥。这种格式通常用于 authorized_keys 文件。
ssh_authorized_keys_rsa = b'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC0eyso9LCmx7us6F51wkJS5IksXHT3dIeRGr8jJIfTTWpBK7Fa7l6jD8wJBbjddBp5dM1rjlqqGjmfelqiBeGECbw9nSKYh9WAQ9eIGDOQOoUOr2Mqq0mxyPCcfr7C2LFhEeW+aJ6onFX5vYiGFL2X+bD3brXSWj4jbkdzjwtMx0rNG8906krZmMyJNjkWfxC/BgebtmpiVUKHajhQHGMRKkm4fpXHoQzHH00YKlurdpEJeEINGjnoRxAnQfcMIthn4Dbk9ldGdmblinl6OAb2caVgAE+GXBjVdiT4rHr8XlOpVjhip7ike73y0Ek3N96BmKnWTkL5E0ob5Qj7z9sZ'

if __name__ == '__main__':
    assert (ssh_authorized_keys_rsa == protocol_binary_to_authorized_key_rsa(ssh_protocol_binary_rsa))
    assert (ssh_protocol_binary_rsa == authorized_key_to_protocol_binary(ssh_authorized_keys_rsa))

如果程序可以正常执行完成,则说明数据格式转化成功。

用户连接协议的定制化

在使用 ssh 的过程中,最常见的场景是,用户通过 ssh 客户端登录进 ssh 服务器后,打开一个 shell 交互的连接,然后用户进行 shell 命令的操作,直到用户或者服务器断开连接。

在用户连接层,其实我们可以自定义 ssh 认证完成后的逻辑,比如服务端给客户端返回一个字符 UI 界面,让用户基于字符 UI 界面进行操作。

完成这种动作,需要在 SSH 服务端,将字符 UI 界面的输入流和输出流与 ssh 客户端session 的输入流和输出流进行对应的绑定。

本文基于 asyncssh 和 prompt_toolkit 进行这种效果的 demo 实现。

源码如下:

代码语言:python
复制
import asyncio
import platform
import sys
import time
from typing import Dict, List, Optional

import asyncssh
from asyncssh.misc import MaybeAwait
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.contrib.ssh import PromptToolkitSSHServer, PromptToolkitSSHSession
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.renderer import print_formatted_text
from prompt_toolkit.shortcuts import print_formatted_text, ProgressBar
from prompt_toolkit.shortcuts.dialogs import input_dialog, yes_no_dialog
from prompt_toolkit.shortcuts.prompt import PromptSession
from pygments.lexers.html import HtmlLexer


def sys_info() -> Dict[str, str]:
    return {
        'PythonVersion': sys.version,
        'ApiVersion': sys.api_version,
        'OSPlatform': sys.platform,
        'OSProcessor': platform.platform(),
        'BytesEndian': sys.byteorder
    }


class ssh_funny_tool(PromptToolkitSSHServer):
    def __init__(self, interact):
        super().__init__(interact)
        self.server_info = sys_info()
        self.connected_time: str = ''
    
    def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
        self.connected_time = time.strftime('%Y-%m-%d %H:%M:%S %p %A %B')
    
    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc:
            print('SSH connection error: ' + str(exc), file = sys.stderr)
        else:
            print('SSH connection closed.')
    
    def begin_auth(self, username: str) -> MaybeAwait[bool]:
        # 不需要认证
        print(username, self.connected_time)
        return False

animal_completer = WordCompleter(
    [
        "alligator",
        "ant",
        "ape",
        "bat",
        "bear",
        "beaver",
        "bee",
        "bison",
        "butterfly",
        "cat",
        "chicken",
        "crocodile",
        "dinosaur",
        "dog",
        "dolphin",
        "dove",
        "duck",
        "eagle",
        "elephant",
        "fish",
        "goat",
        "gorilla",
        "kangaroo",
        "leopard",
        "lion",
        "mouse",
        "rabbit",
        "rat",
        "snake",
        "spider",
        "turkey",
        "turtle",
    ],
    ignore_case = True,
)


async def interact(ssh_session: PromptToolkitSSHSession) -> None:
    """
    The application interaction.

    This will run automatically in a prompt_toolkit AppSession, which means
    that any prompt_toolkit application (dialogs, prompts, etc...) will use the
    SSH channel for input and output.
    """
    prompt_session = PromptSession()
    
    # Alias 'print_formatted_text', so that 'print' calls go to the SSH client.
    print = print_formatted_text
    
    print("We will be running a few prompt_toolkit applications through this ")
    print("SSH connection.\n")
    
    # Simple progress bar.
    with ProgressBar() as pb:
        for i in pb(range(50)):
            await asyncio.sleep(0.1)
    
    # Normal prompt.
    text = await prompt_session.prompt_async("(normal prompt) Type something: ")
    print("You typed", text)
    
    # Prompt with auto completion.
    text = await prompt_session.prompt_async(
        "(autocompletion) Type an animal: ", completer = animal_completer
    )
    print("You typed", text)
    
    # prompt with syntax highlighting.
    text = await prompt_session.prompt_async(
        "(HTML syntax highlighting) Type something: ", lexer = PygmentsLexer(HtmlLexer)
    )
    print("You typed", text)
    
    # Show yes/no dialog.
    await prompt_session.prompt_async("Showing yes/no dialog... [ENTER]")
    user_choice = await yes_no_dialog("Yes/no dialog", "Running over asyncssh").run_async()
    print('user choice:{}'.format(user_choice))
    
    # Show input dialog
    await prompt_session.prompt_async("Showing input dialog... [ENTER]")
    user_input = await input_dialog("Input dialog", "Running over asyncssh").run_async()
    print('user input:{}'.format(user_input))


async def start_server() -> None:
    """
    create_server 是一个封装了 listen 方法的协程,
    它提供与旧版AsyncSSH的向后兼容性。
    唯一的区别是,server_factory参数在此调用中是一个位置参数,
    而不是关键字参数或通过SSHServerConnectionOptions对象指定,
    这与asyncio.AbstractEventLoop.create_server相似。
    :return: None
    """
    await asyncssh.create_server(
        server_factory = lambda: ssh_funny_tool(interact),
        host = '127.0.0.1',
        port = 18822,
        server_version = 'DEMO-TEST-AsyncSSH_2.13.1',  # 自定义服务器 SSH 协议版本名称
        server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
    )
    await asyncio.Event().wait()  # 添加一个永远不会触发的事件,使服务器保持运行状态


if __name__ == '__main__':
    try:
        print(asyncssh.__version__)
        asyncio.run(start_server())
    except (OSError, asyncssh.Error) as exc:
        sys.exit('Error starting server: ' + str(exc))

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 关于 asyncssh 库
  • 用户认证协议的定制
    • 用户名密码认证
      • 交互式认证
        • 公钥认证方式
          • 基于authorized_keys文件进行的 ssh 公钥认证
          • ssh服务器自动解析客户端发送来的公钥
      • 用户连接协议的定制化
      相关产品与服务
      短信
      腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档