首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >paramiko模块

paramiko模块

作者头像
Eric杂货铺
发布2020-09-24 10:45:24
1.2K0
发布2020-09-24 10:45:24
举报
文章被收录于专栏:Eric杂货铺Eric杂货铺

介绍

ssh是一个协议,OpenSSH是其中一个开源实现,paramiko是Python的一个库,实现了SSHv2协议(底层使用cryptography)。

有了Paramiko以后,我们就可以在Python代码中直接使用SSH协议对远程服务器执行操作,而不是通过ssh命令对远程服务器进行操作。

由于paramiko属于第三方库,所以需要使用如下命令先行安装

pip3 install paramiko

组件

  • SSHClient的作用类似于Linux的ssh命令,是对SSH会话的封装,该类封装了传输(Transport),通道(Channel)及SFTPClient建立的方法(open_sftp),通常用于执行远程命令。
client = SSHClient()
client.load_system_host_keys()
client.connect('ssh.example.com')
stdin, stdout,stderr = client.exec_command('ls -l')
==============================================================
1.1 connect方法
connect(hostname, port=22, username=None, password=None, pkey=None, key_filename=None, timeout=None, allow_agent=True, look_for_keys=True, compress=False, sock=None, gss_auth=False, gss_kex=False, gss_deleg_creds=True, gss_host=None, banner_timeout=None, auth_timeout=None, gss_trust_dns=True, passphrase=None)

hostname(str类型),连接的目标主机地址;
port(int类型),连接目标主机的端口,默认为22;
username(str类型),校验的用户名(默认为当前的本地用户名);
password(str类型),密码用于身份校验或解锁私钥;
pkey(Pkey类型),私钥方式用于身份验证;
key_filename(str or list(str)类型),一个文件名或文件名列表,用于私钥的身份验证;
timeout(float类型),一个可选的超时时间(以秒为单位)的TCP连接;
allow_agent(bool类型),设置为False时用于禁用连接到SSH代理;
look_for_keys(bool类型),设置为False时用于来禁用在~/.ssh中搜索私钥文件;
compress(bool类型),设置为True时打开压缩。

1.2 exec_command方法
#远程命令执行方法,该命令的输入与输出流为标准输入(stdin)、输出(stdout)、错误(stderr)的Python文件对像。
exec_command(command, bufsize=-1, timeout=None, get_pty=False, environment=None)
command(str类型),执行的命令串;
bufsize(int类型),文件缓冲区大小,默认为-1(不限制)

1.3 load_system_host_keys方法
load_system_host_keys(self,filename=None)

1.4 set_missing_host_policy方法
敲入“yes”,key的信息将被保存到“known_hosts”文件中。这些密钥很重要,因为它是与主机之间的信任机制。如果key被破坏或更改,那么客户端会拒绝连接并不会通知你,而paramiko也采用相同的规则。如果在“hnown_hosts”中没有保存相关的信息,SSHClient 默认行为是拒绝连接。如果是工作在系统反反复复安装的实验环境中时,这将变得无比的烦人。设置host key的规则调用的方法叫ssh client 实例的"set_missing_host_key_policy()",它设定了你所期望的方法来管理host key.如果你像我一样懒惰,你可以用"paramiko.AutoAddPolicy()"方法来自动接收未知的key:

AutoAddPolicy,目标添加主机名及主机密钥到本地HostKeys对象,并将其保存,不依赖load_system_host_keys()的配置,即使~/.ssh/hnown_hosts不存在也不产生影响;
RejectPolicy,自动拒绝未知的主机名和密钥,依赖load_system_host_keys()的配置;
WarningPolicy,用于记录一个未知的主机密钥的Python警告,并接收它,功能上AutoAddPolicy相似,但未知主机会有告警。
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  • SFTPClient的作用类似与Linux的sftp命令,是对SFTP客户端的封装,用以实现远程文件操作,如文件上传、下载、修改文件权限等操作。
import paramiko

t = paramiko.Transport(('192.168.56.132',22))
t.connect(username='root',password='1234567')
sftp = paramiko.SFTPClient.from_transport(t)
put
put(localpath, remotepath, callback=None, confirm=True) #上传
	localpath(str类型),需上传的本地文件(源);
	remotepath(str类型),远程路径(目标);
	callback(funcation(int,int)),获取已接收的字节数及总传输字节数,以便回调函数调用,默认为None;
	confirm(bool类型),文件上传完毕后是否调用stat()方法,以便确认文件的大小。

比如:
localpath='/home/access.log'
remotepath='/data/logs/access.log'
sftp.put(localpath,remotepath)

get#下载
get(remotepath, localpath, callback=None)
	remotepath(str类型),需要下载的远程文件(源);
	callback(funcation(int,int)),获取已接收的字节数及总和传输字节数,以便回调函数调用,默认为None.
比如:
remotepath = '/data/logs/access.log'
localpath = '/home/access.log'
sftp.get(remotepath,localpath)

other:
mkdir,在SFTP服务端创建目录,如sftp.mkdir("/home/userdir",mode=0777),默认模式是0777(八进制),在某些系统上,mode被忽略。在使用它的地方,当前的umask值首先被屏蔽掉。
remove,删除SFTP服务端指定目录,如sftp.remove("/home/userdir")。
rename,重命名SFTP服务端文件或目录,如sftp.rename("/home/test.sh","/home/testfile.sh")
stat,获取远程SFTP服务端指定文件信息,如sftp.stat("/home/testfile.sh")。
listdir,获取远程SFTP服务端指定目录列表,以Python的列表(List)形式返回,如sftp.listdir("/home")。
1. Channel:是一种类Socket,一种安全的SSH传输通道;
2. Transport:是一种加密的会话,使用时会同步创建了一个加密的Tunnels(通道),这个Tunnels叫做Channel;
3. Session:是client与Server保持连接的对象,用connect()/start_client()/start_server()开始会话。

实例

import paramiko

hostname = '192.168.1.1'
username = 'root'
password = '123456'
paramiko.util.log_to_file('syslogin.log')     #发送paramiko日志到syslogin.log文件

ssh = paramiko.SSHClient()          #创建一个SSH客户端client对象
ssh.load_system_host_keys() #获取客户端host_keys,默认~/.ssh/known_hosts 
ssh.connect(hostname=hostname,username=username,password=password)    #创建SSH连接
stdin,stdout,stderr = ssh.exec_command('free -h')      #调用远程执行命令方法exec_command()
print(stdout.read().decode('utf-8'))        #打印命令执行结果,得到Python列表形式,可以使用stdout_readlines()
ssh.close()
#免密登陆
import paramiko
ssh = paramiko.SSHClient()
SSH_PRIVATE_KEY ='/root/.ssh/id_rsa'  #本地密钥文件路径

try:
    key = paramiko.RSAKey.from_private_key_file(SSH_PRIVATE_KEY) # 无解密密码时
    #key = paramiko.RSAKey.from_private_key_file(SSH_PRIVATE_KEY, password='******')  # 有解密密码时,

    ssh.load_system_host_keys() #通过known_hosts 方式进行认证可以用这个,如果known_hosts 文件未定义还需要定义 known_hosts
    #ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # 通过公共方式进行认证 (不需要在known_hosts 文件中存在)
    #以上两条任选一条 如果没有就会报错:<class 'paramiko.ssh_exception.SSHException'>:Server '192.168.1.61' not found in known_hosts


    ssh.connect(hostname='192.168.1.61', port=22, username='gota', pkey=key)
    stdin, stdout, stderr = ssh.exec_command("df -h")
    # 获取命令结果
    result = stdout.read()
    # 打印输出
    print(result.decode())
except Exception as e:
    print("%s:%s" % (e.__class__, e))
finally:
    # 关闭
    ssh.close()


'''
注意:生成密码的方法
A、进入本地 ssh文件夹 cd .ssh/
B、使用ssh-keygen生产本地公钥和私钥 ssh-keygen 
root@ubuntu:~/.ssh$ ls 
id_rsa id_rsa.pub
C、将生成的id_rsa.pub文件中的内容copy到目标机的.ssh/authorized_keys中就可以了,如果没有authorized_keys,自己创建。但是要注意authorized_keys的权限一般是600

或者直接在本地使用一条命令也可以实现公钥的复制,ssh-copy-id后面接入的用户就是要支持免密登录的用户。
morra@ubuntu:~/.ssh$ ssh-copy-id "gota@192.168.1.61"
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/home/morra/.ssh/id_rsa.pub"
The authenticity of host '192.168.1.42 (192.168.1.42)' can't be established.
ECDSA key fingerprint is SHA256:/ufx+/OLtdsYy7vsdk4KDu9xJsBp6zHonRAf2jjT0GI.
Are you sure you want to continue connecting (yes/no)? n^H
Please type 'yes' or 'no': yes
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
Password:

Number of key(s) added: 1

Now try logging into the machine, with:   "ssh 'morra@192.168.1.42'"   and check to make sure that only the key(s) you wanted were added.


#去目标机器下,检查authorized_keys文件
localhost:.ssh gota$ cat authorized_keys
'''

免密登录执行命令
#coding:utf8
#自动备份
import paramiko
import os
import traceback

class SshConnectError(Exception):#
    pass

class controlHost:
    def __init__(self, host, username, password, port=22, key_file='/root/.ssh/id_rsa'):##本地密钥文件路径
        self.host = host
        self.pkey = paramiko.RSAKey.from_private_key_file(key_file)
        self.ssh = controlHost.__sshConn(self.host, username, password, self.pkey, int(port)) #调用类中的静态方法__sshConn 返回ssh连接对象
        self.sftp = self.__sftpConn()


    def close(self):
        if hasattr(self.ssh, "close"):
            self.ssh.close()

    @staticmethod
    def __sshConn(host, username, password, pkey, port):
        ssh = paramiko.SSHClient()##创建一个SSH客户端client对象
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(hostname=host, port=int(port), username=username, pkey=pkey) #免密登陆方式
            # print('免密登陆方式')
        except:
            try:
                ssh.connect(hostname=host, port=int(port), username=username, password=password)#密码认证
                # print('密码认证')
            except:
                raise SshConnectError("SSH Connect %s Error!" %host)
            else:
                return ssh
        else:
            return ssh

#返回sftp通道实例对象 方法
    def __sftpConn(self):
        transport = self.ssh.get_transport() #1.先ssh连上,2.再建立通道
        sftp = paramiko.SFTPClient.from_transport(transport) #创建一个已连通的SFTP客户端通道。
        return sftp


#执行命令方法
    def exeCommand(self, cmd, timeout=300):
        _, stdout, stderr = self.ssh.exec_command(cmd, timeout=timeout)
        try:
            channel = stdout.channel
            #print('channel',channel)
            exit_code = channel.recv_exit_status()
            #print('exit_code',exit_code)报错返回码是127,没有报错是0
            stdout = stdout.read().strip()
            stderr = stderr.read().strip()
            return {"status": 1, "stdout": stdout, "stderr": stderr, 'exit_code': exit_code}
        except:
            return {"status": 0, "stdout": stdout, "stderr": stderr, 'exit_code': 127}

#文件上传下载方法
    def sftpFile(self, localpath, remotepath, action):
        try:
            if action == 'push':
                dirname = os.path.dirname(remotepath)
                self.exeCommand("mkdir -p %s" % dirname)
                self.sftp.put(localpath, remotepath)
                return {"status": 1, "message": 'sftp %s %s success!' % (self.host, action)}
            elif action == "pull":
                dirname = os.path.dirname(localpath)
                if not os.path.exists(dirname):
                    os.makedirs(dirname)
                # if os.path.exists(localpath):
                #     os.remove(localpath)
                self.sftp.get(remotepath, localpath)
                return {"status": 1, "stdout": 'sftp %s %s success!' % (self.host, action), "stderr": ""}
        except Exception as e:
            return {"status": 0, "stderr": 'sftp %s %s failed %s' % (self.host, action, str(e)), "stdout": ""}

    @staticmethod
    def iter_local_path(abs_path):
        '''遍历本机该目录中所以的文件,并返回'''
        result = set([])
        for j in os.walk(abs_path):
            print(j)
            base_path = j[0]
            file_list = j[2]
            for k in file_list:
                p = os.path.join(base_path, k)
                result.add(p)
        return result

    def iter_remote_path(self, abs_path):
        '''获取远程主机abs_path下的所以文件'''
        result = set([])
        try:
            stat = str(self.sftp.lstat(abs_path))
            print('stat',stat)
        except FileNotFoundError:
            return result
        else:
            if stat.startswith("d"):
                file_list = self.exeCommand("ls %s" %abs_path)["stdout"].decode(encoding='utf-8').strip().splitlines()
                #Python strip() 方法用于移除字符串头尾指定的字符(默认为空格或换行符)或字符序列。注意:该方法只能删除开头或是结尾的字符,不能删除中间部分的字符。
                 #Python splitlines() 按照行('\r', '\r\n', \n')分隔,返回一个包含各行作为元素的列表,如果参数 keepends 为(默认值) False,不包含换行符,如果为 True,则保留换行符。

                for j in file_list:
                    p = os.path.join(abs_path, j)
                    result.update(self.iter_remote_path(p))  #合并 并集U
            else:
                result.add(abs_path)
        return result





if __name__ == '__main__':
    x = controlHost("192.168.1.40", 'root', 'Gota34cc')

    #测试 获取本地某个目录的所有文件
    # w = x.iter_local_path("/root/test")
    # print(w)

    # 测试 获取远程主机某个目录的所有文件
    # y = x.iter_remote_path("/root/test")
    # print(y)

    # 测试 命令执行方法
    # y = x.exeCommand("uname -r")
    # print(y)

    # 测试 上传下载
    w = x.sftpFile("/tmp/ansible.txt", '/tmp/xx.txt', "push")  #将本地机器的/tmp/ansible.txt,上传至远程主机/tmp目录下并命名为xx.sh
    # w = x.sftpFile('/tmp/aaaa.py', '/tmp/xyz.py', 'pull')
    print(w)

    x.close()

自动化备份平台项目公用类lib/sshConn.py
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 组件
  • 实例
相关产品与服务
多因子身份认证
多因子身份认证(Multi-factor Authentication Service,MFAS)的目的是建立一个多层次的防御体系,通过结合两种或三种认证因子(基于记忆的/基于持有物的/基于生物特征的认证因子)验证访问者的身份,使系统或资源更加安全。攻击者即使破解单一因子(如口令、人脸),应用的安全依然可以得到保障。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档