SNMP表示简单的网络管理协议。这是服务器可以共享其当前状态信息的一种方式,也是管理人员修改预定义值的通道。SNMP是在网络堆栈的应用层上实现的协议(单击此处了解网络层)。该协议是为了以一致的方式从非常不同的系统收集信息而制定的。
您可以在上面的链接中阅读更多关于SNMP、OID和SNMP方法的信息。作为总结,此脚本使用:
链路层发现协议(LLDP)是一种供应商中立的第2层协议,可由附加到特定LAN段的站点使用,以宣传其标识和功能,并从物理相邻的第2层对等点接收该协议。
使用python结合SNMP和LLDP的K218
我的程序的目的是,通过使用Python3.6并提供一个交换机数据文件(社区字符串、snmp端口和交换机ip),返回文件中所有交换机的邻居数据(本地和远程端口+邻居的名称)。
示例配置文件:
community_string1, snmp_port1, ip1
community_string2, snmp_port2, ip2
community_string3, snmp_port3, ip3
示例输出:
[
{
"name1": {
"ip": "ip1",
"neighbours": [
{
"neighbour_name1": "neighbour_name1",
"local_port1": "local_port1",
"remote_port1": "remote_port1"
},
{
"neighbour_name2": "neighbour_name2",
"local_port2": "local_port2",
"remote_port2": "remote_port2"
},
{
"neighbour_name3": "neighbour_name3",
"local_port3": "local_port3",
"remote_port3": "remote_port3"
},
]
},
"name2": {data here},
"name3": {data here},
}
]
Explaining输出
name1
表示配置文件第一行中开关的名称(通过为PARENT_NAME_OID
执行snmp检索)ip1
表示配置文件第一行的交换机的ip (这是从配置文件中获得的)。我认为这个JSON输出格式是最相关的,但是如果您有更好的想法,我想听听。
The代码
现在,代码有点混乱,但是它使用pysnmp
库来完成它的工作,这个库可以很容易地通过pip
安装。它接收配置文件作为CLI参数,解析它并处理其中的信息。
"""
Parse a file which contains switches information (community, snmp_port, ip)
and query those devices (neighbours information) via LLDP. Return the data
as a JSON object.
"""
import argparse
import itertools
import pprint
import os
import re
from pysnmp.hlapi import *
NEIGHBOUR_PORT_OID = '1.0.8802.1.1.2.1.4.1.1.8.0'
NEIGHBOUR_NAME_OID = '1.0.8802.1.1.2.1.4.1.1.9'
PARENT_NAME_OID = '1.0.8802.1.1.2.1.3.3'
class MissingOidParameter(Exception):
"""
Custom exception used when the OID is missing.
"""
pass
def is_file_valid(filepath):
"""
Check if a file exists or not.
Args:
filepath (str): Path to the switches file
Returns:
filepath or raise exception if invalid
"""
if not os.path.exists(filepath):
raise ValueError('Invalid filepath')
return filepath
def get_cli_arguments():
"""
Simple command line parser function.
"""
parser = argparse.ArgumentParser(description="")
parser.add_argument(
'-f',
'--file',
type=is_file_valid,
help='Path to the switches file'
)
return parser
def get_switches_from_file():
"""Return data as a list from a file.
The file format is the following:
community_string1, snmp_port1, ip1
community_string2, snmp_port2, ip2
community_string3, snmp_port3, ip3
The output:
[
{"community": "community_string1", "snmp_port": "snmp_port1", "ip": "ip1"},
{"community": "community_string2", "snmp_port": "snmp_port2", "ip": "ip2"},
{"community": "community_string3", "snmp_port": "snmp_port3", "ip": "ip3"},
]
"""
args = get_cli_arguments().parse_args()
switches_info = []
with open(args.file) as switches_info_fp:
for line in switches_info_fp:
line = line.rstrip().split(',')
switches_info.append({
'community': line[0].strip(),
'snmp_port': line[1].strip(),
'ip': line[2].strip(),
})
return switches_info
def parse_neighbours_ports_result(result):
"""
One line of result looks like this:
result = iso.0.8802.1.1.2.1.4.1.1.8.0.2.3 = 2
Where the last "2" from the OID is the local port and the value
after '=' is the remote port (2)
"""
if not result:
raise MissingOidParameter('No OID provided.')
value = result.split(' = ')
if not value:
return 'Missing entire value for OID={}'.format(result)
else:
oid, port = value
local_port = re.search(r'{}\.(\d+)'.format(NEIGHBOUR_PORT_OID[2:]), oid).group(1)
if port:
remote_port = re.search(r'(\d+)', port).group(1)
else:
remote_port = 'Unknown'
return 'local_port', local_port, 'remote_port', remote_port
def parse_parent_name(result):
"""
One line of result looks like this:
result = iso.0.8802.1.1.2.1.3.3.0 = Switch01
The name of the parent is "Switch01"
"""
if not result:
raise MissingOidParameter('No OID provided.')
value = result.split(' = ')
if not value:
return 'Missing entire value for OID={}'.format(result)
else:
return 'Unknown' if not value[-1] else value[-1]
def parse_neighbour_names_results(result):
"""
One line of result looks like this:
result = iso.0.8802.1.1.2.1.4.1.1.9.0.2.3 = HP-2920-24G
The name of the parent is "Switch01"
"""
if not result:
raise MissingOidParameter('No OID provided.')
value = result.split(' = ')
if not value:
return 'Missing entire value for OID={}'.format(result)
else:
return ('name', 'Unknown') if not value[-1] else ('name', value[-1])
def main():
data = {}
switches_filedata = get_switches_from_file()
for switch in switches_filedata:
community = switch.get('community')
snmp_port = switch.get('snmp_port')
ip = switch.get('ip')
name = ''
for (error_indication, error_status, error_index, var_binds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((ip, snmp_port)),
ContextData(),
ObjectType(ObjectIdentity(PARENT_NAME_OID)),
lexicographicMode=False
):
# this should always return one result
name = parse_parent_name(str(var_binds[0]))
if not name:
print('Could not retrieve name of switch. Moving to the next one...')
continue
neighbour_names = []
neighbour_local_remote_ports = []
for (error_indication, error_status, error_index, var_binds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((ip, snmp_port)),
ContextData(),
ObjectType(ObjectIdentity(NEIGHBOUR_NAME_OID)),
lexicographicMode=False
):
for var_bind in var_binds:
neighbour_names.append(
parse_neighbour_names_results(str(var_bind))
)
for (error_indication, error_status, error_index, var_binds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((ip, snmp_port)),
ContextData(),
ObjectType(ObjectIdentity(NEIGHBOUR_PORT_OID)),
lexicographicMode=False
):
for var_bind in var_binds:
neighbour_local_remote_ports.append(
parse_neighbours_ports_result(str(var_bind))
)
neighbours = []
for a, b in itertools.zip_longest(
neighbour_names,
neighbour_local_remote_ports,
fillvalue='unknown'
):
neighbours.append({
a[0]: a[1],
b[0]: b[1],
b[2]: b[3]
})
data[name] = {
'ip': ip,
'neighbors': neighbours
}
return data
if __name__ == '__main__':
all_data = main()
pprint.pprint(all_data, indent=4)
我特别关心的What:
pysnmp
的S功能的更好/更高效的方法(也许我只能完成一次SNMP来存储所有数据,然后从那里获得所有OID所需的数据)--就像我们解析lxml
s html树时所做的那样。发布于 2020-03-14 19:08:19
PEP484类型提示将有所帮助;例如:
def is_file_valid(filepath: str) -> bool:
实际上,is_file_valid
并不是正在发生的事情。您正在使用这个函数做两件事:
文档展示了实际应该如何做到这一点:
parser.add_argument('bar', type=open)
这将调用open
并返回一个文件对象,如果文件不存在,则失败。
get_switches_from_file
可以使用元组解压缩:
community, port, ip = (t.strip() for t in line.split(','))
这有一个额外的优点,即包含三个以上部分的不规则行将导致错误,而不是被忽略。
更好的是,将其委托给一个类:
class Switch:
def __init__(self, line: str):
self.community, self.port, self.ip = (
field.strip() for field in line.split(',')
)
与其维护switches_info
,不如简单地从内部循环中yield
每个字典。这将导致您的方法从O(n)内存转到O(1)内存,而运行时可能会付出较小的代价。
如果使用上面的Switch
类,这可能如下所示
args = get_cli_arguments().parse_args()
with args.file as switches_info_fp:
for line in switches_info_fp:
yield Switch(line)
parse_neighbours_ports_result
有一种奇怪的返回格式。还不清楚第一个和第三个字符串是否有用。或者返回一个带有实际端口值的二元组,或者返回一个命名的元组或类实例。
for (error_indication, error_status, error_index, var_binds)
可能会失去父母。
要简化代码,请执行一些from x import y
:
from argparse import ArgumentParser
from itertools import zip_longest
https://codereview.stackexchange.com/questions/238781
复制相似问题