前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >代码远程执行主机命令-Paramiko入门

代码远程执行主机命令-Paramiko入门

作者头像
怪盗LYL
发布2023-10-08 09:19:18
4330
发布2023-10-08 09:19:18
举报
文章被收录于专栏:测试开发真货测试开发真货

什么是 「Paramiko」? Paramiko是一个Python实现的SSHv2协议的库,可以用于在远程服务器上执行命令、上传和下载文件等操作。它使用了加密算法,可以提供安全的远程访问。由于其简单易用的API和丰富的功能,Paramiko被广泛用于自动化运维和云计算等领域。

1 使用的目的

  • 执行远程主机脚本。
  • 发起接口测试以后查询系统日志。

2 Paramiko 安装

使用效果图

demo类

  • 可以看到查询需要用到的信息,主要是ip,用户名,密码,端口号
  • 这些我们可以配置到yaml文件里面维护

3 使用yaml文件配置主机连接信息

  • 创建一个工具类
代码语言:javascript
复制
import re
import time
import urllib
from time import sleep

# 定义一个类,表示一台远端linux主机
import paramiko
from paramiko import util


class Linux(object):
    # 通过IP, 用户名,密码,超时时间初始化一个远程Linux主机
    def __init__(self, ip, username, password,env ='默认环境',timeout=5):
        self.ip = ip
        self.username = username
        self.password = password
        self.timeout = timeout
        # transport和chanel
        self.trans = ''
        self.chan = ''
        # 链接失败的重试次数
        self.try_times = 3

    # 调用该方法连接远程主机
    def connect(self):
        while True:
            # 连接过程中可能会抛出异常,比如网络不通、链接超时
            try:
                self.trans = paramiko.Transport(sock=(self.ip, 22))
                self.trans.set_log_channel('paramiko')
                util.log_to_file('test.log')
                self.trans.connect(username=self.username, password=self.password)
                self.chan = self.trans.open_session()
                self.chan.settimeout(self.timeout)
                self.chan.get_pty()
                self.chan.invoke_shell()
                # 如果没有抛出异常说明连接成功,直接返回
                print(u'连接%s成功' % self.ip)
                # 接收到的网络数据解码为str
                print(self.chan.recv(65535).decode('utf-8'))
                return
            # 这里不对可能的异常如socket.error, socket.timeout细化,直接一网打尽
            except Exception:
                if self.try_times != 0:
                    print(u'连接%s失败,进行重试' % self.ip)
                    self.try_times -= 1
                else:
                    print(u'重试3次失败,结束程序')
                    exit(1)

    # 断开连接
    def close(self):
        print('关闭'+self.ip)
        self.chan.close()
        self.trans.close()

    # 发送要执行的命令
    def send(self, cmds=''):
        cmds += '\r'
        res = ''
        self.chan.send(cmds)
        # 特别备注,这里需要判断一条语句已经执行通过,结尾来判断。
        # 终端结尾最后是']$ ',注意后边还有一个空格
        shellhead = ']$ '
        try:
            while True:
                # 日志预留回显时间
                time.sleep(1)
                ret = self.chan.recv(65535).decode()
                res += ret
                # 判断的结尾,就停止循环
                if ret.endswith(shellhead):
                    break
            return res
        except Exception as e:
            # 日志预留
            self.chan.close()
            self.trans.close()
    def is_alive(self):
        print(self.chan.closed)   # 是否关闭
        return self.chan.active    # 是否活动
    # def upload(self,local_path,target_path):
    #     # 连接,上传
    #     # file_name = self.create_file()
    #     sftp = paramiko.SFTPClient.from_transport(self.t)
    #     # 将location.py 上传至服务器 /tmp/mytest.py
    #     sftp.put(local_path, target_path)
    # def download(self,remote_path,local_path):
    #     sftp = paramiko.SFTPClient.from_transport(self.t)
    #     sftp.get(remote_path,local_path)


if __name__ == '__main__':
    transid = "流水关键字"
    path, cmd = 'cd /路径', f'grep {transid} *文件名*'
    hosts = "主机ip"
    host_username,host_password =  "账号", "密码"
    host = Linux(hosts, host_username,host_password)
    host.connect()
    try:
        host.send(path)
        interface_str = host.send(cmd)
        host.close()
        new_text = urllib.parse.unquote(interface_str) #用同一个字符换成%xx转义。相当于JS中的urldecode(),对url进行解码。
        print(new_text)
        print(interface_str)
    except Exception as e:
        host.close()

测试

  • 有可能我们的日志是随机打印到不同的主机上的,所以可以host配置成列表的。

主机配置

4 添加一个线程类:

  • 多个主机时候我们不会一个个线性去查询,所以我们可以创建一个线程类去多线程调用查询。
代码语言:javascript
复制
   # -*- coding: utf-8 -*-
# @Time    : 2020/3/9 23:48
# @Author  : 怪盗LYL
# @File    : testthreading.py
#!/usr/bin/python3
import concurrent
import threading
import time

from loguru import logger

from unit.paramiko_unit import Linux


class grepThread (threading.Thread):
    def __init__(self,host, host_username, host_password,path,cmd):
        threading.Thread.__init__(self)
        self.host = host
        self.host_username = host_username
        self.host_password = host_password
        self.path = path
        self.cmd = cmd
    def run(self):
        logger.info("开始线程查询主机:" + self.host)
        host = Linux(self.host, self.host_username, self.host_password)
        host.connect()
        try:
            host.send(self.path)
            interface_str = host.send(self.cmd)
            host.close()
            self.result = interface_str
        except Exception as e:
            logger.error(e.__str__())
            host.close()
        logger.info ("退出线程:" + self.host)
    def get_result(self):
        return self.result,self.host

def do_grep(arg):
    linux = arg[0]
    path = arg[1]
    cmd = arg[2]
    linux.connect()
    try:
        linux.send(path)
        interface_str = linux.send(cmd)
        linux.close()
        result = interface_str
    except Exception as e:
        logger.error(e.__str__())
        linux.close()
    logger.info("退出线程:" + linux.ip)
    return result

def grepfunction(hosts, host_username, host_password,path,cmd):
    linuxs = []
    for i in range(0,len(hosts)):
        linux = Linux(hosts[i], host_username, host_password)
        linuxs.append(linux)
    with concurrent.futures.ThreadPoolExecutor(len(hosts)) as executor:
        to_do = []
        for i in range(0,len(linuxs)):  # 模拟多个任务
            future = executor.submit(do_grep,(linuxs[i],path,cmd))
            to_do.append(future)
        result_list = []
        for future in concurrent.futures.as_completed(to_do):  # 并发执行
            result_list.append(future.result())
    return result_list

if __name__ == '__main__':
    pass
  • 简单测试一下
代码语言:javascript
复制
    try:
        transid = "关键字"
        path, cmd = 'cd  /路径', f'grep {transid} 文件名'
        thred_list = []
        hosts = [主机列表]
        for i in range(0, len(hosts)):
            thread = grepThread(hosts[i], "账号", "密码", path, cmd)
            thred_list.append(thread)
            thread.start()
        gateway_str = ""
        geted_host = ""
        for i in thred_list:
            i.join()
            # pattern = re.compile(r'\[[^\r\t\n]*?\]')  # []包起来的
            result, host = i.get_result()
            # list_str = pattern.findall(result)
            # for j in list_str:
            if "ACTIONNAME" in result:
                gateway_str = result
                geted_host = host
        logger.info("退出网关查询主线程")
        print(gateway_str, geted_host)
    except Exception as e:
        # logger.exception(e)
        logger.error(e)
        print({"error_code": 9999})

测试

7 结语

  • 这样使用Paramiko入门操作就介绍完了。大家可以根据自己实际情况在自己的项目里使用。
  • 目前我是封装到了我的测试类里面,请求和返回之后会根据返回的报文解析关键字去主机查询日志,里面再根据json或者xml格式化输出。xml格式化使用ElementTree,json格式化使用json,读者可以自行尝试。
代码语言:javascript
复制
#root = ET.fromstring("报文")
#minidom.parseString(ET.tostring(et)).toprettyxml(indent='  ')


#json.dumps(ssprequest.json_response, indent=2, ensure_ascii=False)
  • 下面这段代码可能用到:
代码语言:javascript
复制
    TRANS_ID_str = TRANS_ID_str.strip()
    TRANS_ID_str = re.sub(r'\\[.*?\[K', '', TRANS_ID_str)
  • 好久没更新了,最近在摆烂。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-19 12:47:17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 测试开发真货 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 使用的目的
  • 2 Paramiko 安装
  • 3 使用yaml文件配置主机连接信息
  • 4 添加一个线程类:
  • 7 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档