什么是 「Paramiko」? Paramiko是一个Python实现的SSHv2协议的库,可以用于在远程服务器上执行命令、上传和下载文件等操作。它使用了加密算法,可以提供安全的远程访问。由于其简单易用的API和丰富的功能,Paramiko被广泛用于自动化运维和云计算等领域。
使用效果图
demo类
import re
import time
import urllib
from time import sleep
# 定义一个类,表示一台远端linux主机
import paramiko
from paramiko import util
class Linux(object):
# 通过IP, 用户名,密码,超时时间初始化一个远程Linux主机
def __init__(self, ip, username, password,env ='默认环境',timeout=5):
self.ip = ip
self.username = username
self.password = password
self.timeout = timeout
# transport和chanel
self.trans = ''
self.chan = ''
# 链接失败的重试次数
self.try_times = 3
# 调用该方法连接远程主机
def connect(self):
while True:
# 连接过程中可能会抛出异常,比如网络不通、链接超时
try:
self.trans = paramiko.Transport(sock=(self.ip, 22))
self.trans.set_log_channel('paramiko')
util.log_to_file('test.log')
self.trans.connect(username=self.username, password=self.password)
self.chan = self.trans.open_session()
self.chan.settimeout(self.timeout)
self.chan.get_pty()
self.chan.invoke_shell()
# 如果没有抛出异常说明连接成功,直接返回
print(u'连接%s成功' % self.ip)
# 接收到的网络数据解码为str
print(self.chan.recv(65535).decode('utf-8'))
return
# 这里不对可能的异常如socket.error, socket.timeout细化,直接一网打尽
except Exception:
if self.try_times != 0:
print(u'连接%s失败,进行重试' % self.ip)
self.try_times -= 1
else:
print(u'重试3次失败,结束程序')
exit(1)
# 断开连接
def close(self):
print('关闭'+self.ip)
self.chan.close()
self.trans.close()
# 发送要执行的命令
def send(self, cmds=''):
cmds += '\r'
res = ''
self.chan.send(cmds)
# 特别备注,这里需要判断一条语句已经执行通过,结尾来判断。
# 终端结尾最后是']$ ',注意后边还有一个空格
shellhead = ']$ '
try:
while True:
# 日志预留回显时间
time.sleep(1)
ret = self.chan.recv(65535).decode()
res += ret
# 判断的结尾,就停止循环
if ret.endswith(shellhead):
break
return res
except Exception as e:
# 日志预留
self.chan.close()
self.trans.close()
def is_alive(self):
print(self.chan.closed) # 是否关闭
return self.chan.active # 是否活动
# def upload(self,local_path,target_path):
# # 连接,上传
# # file_name = self.create_file()
# sftp = paramiko.SFTPClient.from_transport(self.t)
# # 将location.py 上传至服务器 /tmp/mytest.py
# sftp.put(local_path, target_path)
# def download(self,remote_path,local_path):
# sftp = paramiko.SFTPClient.from_transport(self.t)
# sftp.get(remote_path,local_path)
if __name__ == '__main__':
transid = "流水关键字"
path, cmd = 'cd /路径', f'grep {transid} *文件名*'
hosts = "主机ip"
host_username,host_password = "账号", "密码"
host = Linux(hosts, host_username,host_password)
host.connect()
try:
host.send(path)
interface_str = host.send(cmd)
host.close()
new_text = urllib.parse.unquote(interface_str) #用同一个字符换成%xx转义。相当于JS中的urldecode(),对url进行解码。
print(new_text)
print(interface_str)
except Exception as e:
host.close()
测试
主机配置
# -*- coding: utf-8 -*-
# @Time : 2020/3/9 23:48
# @Author : 怪盗LYL
# @File : testthreading.py
#!/usr/bin/python3
import concurrent
import threading
import time
from loguru import logger
from unit.paramiko_unit import Linux
class grepThread (threading.Thread):
def __init__(self,host, host_username, host_password,path,cmd):
threading.Thread.__init__(self)
self.host = host
self.host_username = host_username
self.host_password = host_password
self.path = path
self.cmd = cmd
def run(self):
logger.info("开始线程查询主机:" + self.host)
host = Linux(self.host, self.host_username, self.host_password)
host.connect()
try:
host.send(self.path)
interface_str = host.send(self.cmd)
host.close()
self.result = interface_str
except Exception as e:
logger.error(e.__str__())
host.close()
logger.info ("退出线程:" + self.host)
def get_result(self):
return self.result,self.host
def do_grep(arg):
linux = arg[0]
path = arg[1]
cmd = arg[2]
linux.connect()
try:
linux.send(path)
interface_str = linux.send(cmd)
linux.close()
result = interface_str
except Exception as e:
logger.error(e.__str__())
linux.close()
logger.info("退出线程:" + linux.ip)
return result
def grepfunction(hosts, host_username, host_password,path,cmd):
linuxs = []
for i in range(0,len(hosts)):
linux = Linux(hosts[i], host_username, host_password)
linuxs.append(linux)
with concurrent.futures.ThreadPoolExecutor(len(hosts)) as executor:
to_do = []
for i in range(0,len(linuxs)): # 模拟多个任务
future = executor.submit(do_grep,(linuxs[i],path,cmd))
to_do.append(future)
result_list = []
for future in concurrent.futures.as_completed(to_do): # 并发执行
result_list.append(future.result())
return result_list
if __name__ == '__main__':
pass
try:
transid = "关键字"
path, cmd = 'cd /路径', f'grep {transid} 文件名'
thred_list = []
hosts = [主机列表]
for i in range(0, len(hosts)):
thread = grepThread(hosts[i], "账号", "密码", path, cmd)
thred_list.append(thread)
thread.start()
gateway_str = ""
geted_host = ""
for i in thred_list:
i.join()
# pattern = re.compile(r'\[[^\r\t\n]*?\]') # []包起来的
result, host = i.get_result()
# list_str = pattern.findall(result)
# for j in list_str:
if "ACTIONNAME" in result:
gateway_str = result
geted_host = host
logger.info("退出网关查询主线程")
print(gateway_str, geted_host)
except Exception as e:
# logger.exception(e)
logger.error(e)
print({"error_code": 9999})
测试
#root = ET.fromstring("报文")
#minidom.parseString(ET.tostring(et)).toprettyxml(indent=' ')
#json.dumps(ssprequest.json_response, indent=2, ensure_ascii=False)
TRANS_ID_str = TRANS_ID_str.strip()
TRANS_ID_str = re.sub(r'\\[.*?\[K', '', TRANS_ID_str)