首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用Python和SNMP (LLDP)查找交换机的邻居

使用Python和SNMP (LLDP)查找交换机的邻居
EN

Code Review用户
提问于 2020-03-12 16:42:47
回答 1查看 3.8K关注 0票数 7

INTRO

关于SNMP

SNMP表示简单的网络管理协议。这是服务器可以共享其当前状态信息的一种方式,也是管理人员修改预定义值的通道。SNMP是在网络堆栈的应用层上实现的协议(单击此处了解网络层)。该协议是为了以一致的方式从非常不同的系统收集信息而制定的。

您可以在上面的链接中阅读更多关于SNMP、OID和SNMP方法的信息。作为总结,此脚本使用:

  • snmp版本2c
  • snmpwalk作为获取数据的主要snmp方法。
  • 从设备获取的数据是从特定的OID中获取的(可以找到更多的LLDP OID 这里)。

关于自民党

链路层发现协议(LLDP)是一种供应商中立的第2层协议,可由附加到特定LAN段的站点使用,以宣传其标识和功能,并从物理相邻的第2层对等点接收该协议。

使用python结合SNMP和LLDP的K218

我的程序的目的是,通过使用Python3.6并提供一个交换机数据文件(社区字符串、snmp端口和交换机ip),返回文件中所有交换机的邻居数据(本地和远程端口+邻居的名称)。

示例配置文件:

代码语言:javascript
运行
复制
community_string1, snmp_port1, ip1
community_string2, snmp_port2, ip2
community_string3, snmp_port3, ip3

示例输出:

代码语言:javascript
运行
复制
[
    {
        "name1": {
            "ip": "ip1",
            "neighbours": [
                {
                    "neighbour_name1": "neighbour_name1",
                    "local_port1": "local_port1",
                    "remote_port1": "remote_port1"
                },
                {
                    "neighbour_name2": "neighbour_name2",
                    "local_port2": "local_port2",
                    "remote_port2": "remote_port2"
                },
                {
                    "neighbour_name3": "neighbour_name3",
                    "local_port3": "local_port3",
                    "remote_port3": "remote_port3"
                },
            ]
        },
        "name2":  {data here},
        "name3":  {data here},
    }
]

Explaining输出

  • name1表示配置文件第一行中开关的名称(通过为PARENT_NAME_OID执行snmp检索)
  • ip1表示配置文件第一行的交换机的ip (这是从配置文件中获得的)。
  • 邻居都是通过snmp使用特定的OID检索的(参见下面的代码)。

我认为这个JSON输出格式是最相关的,但是如果您有更好的想法,我想听听。

The代码

现在,代码有点混乱,但是它使用pysnmp库来完成它的工作,这个库可以很容易地通过pip安装。它接收配置文件作为CLI参数,解析它并处理其中的信息。

代码语言:javascript
运行
复制
"""
Parse a file which contains switches information (community, snmp_port, ip)
and query those devices (neighbours information) via LLDP. Return the data
as a JSON object.
"""

import argparse
import itertools
import pprint
import os
import re

from pysnmp.hlapi import *


NEIGHBOUR_PORT_OID = '1.0.8802.1.1.2.1.4.1.1.8.0'
NEIGHBOUR_NAME_OID = '1.0.8802.1.1.2.1.4.1.1.9'
PARENT_NAME_OID = '1.0.8802.1.1.2.1.3.3'


class MissingOidParameter(Exception):
    """
    Custom exception used when the OID is missing.
    """
    pass


def is_file_valid(filepath):
    """
    Check if a file exists or not.

    Args:
        filepath (str): Path to the switches file
    Returns:
        filepath or raise exception if invalid
    """

    if not os.path.exists(filepath):
        raise ValueError('Invalid filepath')
    return filepath


def get_cli_arguments():
    """
    Simple command line parser function.
    """

    parser = argparse.ArgumentParser(description="")
    parser.add_argument(
        '-f',
        '--file',
        type=is_file_valid,
        help='Path to the switches file'
    )
    return parser


def get_switches_from_file():
    """Return data as a list from a file.

    The file format is the following:

    community_string1, snmp_port1, ip1
    community_string2, snmp_port2, ip2
    community_string3, snmp_port3, ip3

    The output:

    [
        {"community": "community_string1", "snmp_port": "snmp_port1", "ip": "ip1"},
        {"community": "community_string2", "snmp_port": "snmp_port2", "ip": "ip2"},
        {"community": "community_string3", "snmp_port": "snmp_port3", "ip": "ip3"},
    ]
    """

    args = get_cli_arguments().parse_args()
    switches_info = []
    with open(args.file) as switches_info_fp:
        for line in switches_info_fp:
            line = line.rstrip().split(',')
            switches_info.append({
                'community': line[0].strip(),
                'snmp_port': line[1].strip(),
                'ip': line[2].strip(),
            })
    return switches_info


def parse_neighbours_ports_result(result):
    """
    One line of result looks like this:

    result = iso.0.8802.1.1.2.1.4.1.1.8.0.2.3 = 2

    Where the last "2" from the OID is the local port and the value
    after '=' is the remote port (2)
    """
    if not result:
        raise MissingOidParameter('No OID provided.')

    value = result.split(' = ')
    if not value:
        return 'Missing entire value for OID={}'.format(result)
    else:
        oid, port = value
        local_port = re.search(r'{}\.(\d+)'.format(NEIGHBOUR_PORT_OID[2:]), oid).group(1)

        if port:
            remote_port = re.search(r'(\d+)', port).group(1)
        else:
            remote_port = 'Unknown'

    return 'local_port', local_port, 'remote_port', remote_port


def parse_parent_name(result):
    """
    One line of result looks like this:

    result = iso.0.8802.1.1.2.1.3.3.0 = Switch01

    The name of the parent is "Switch01"
    """

    if not result:
        raise MissingOidParameter('No OID provided.')

    value = result.split(' = ')
    if not value:
        return 'Missing entire value for OID={}'.format(result)
    else:
        return 'Unknown' if not value[-1] else value[-1]


def parse_neighbour_names_results(result):
    """
    One line of result looks like this:

    result = iso.0.8802.1.1.2.1.4.1.1.9.0.2.3 = HP-2920-24G

    The name of the parent is "Switch01"
    """

    if not result:
        raise MissingOidParameter('No OID provided.')

    value = result.split(' = ')
    if not value:
        return 'Missing entire value for OID={}'.format(result)
    else:
        return ('name', 'Unknown') if not value[-1] else ('name', value[-1])


def main():
    data = {}
    switches_filedata = get_switches_from_file()

    for switch in switches_filedata:
        community = switch.get('community')
        snmp_port = switch.get('snmp_port')
        ip = switch.get('ip')

        name = ''
        for (error_indication, error_status, error_index, var_binds) in nextCmd(
                SnmpEngine(),
                CommunityData(community),
                UdpTransportTarget((ip, snmp_port)),
                ContextData(),
                ObjectType(ObjectIdentity(PARENT_NAME_OID)),
                lexicographicMode=False
        ):
            # this should always return one result
            name = parse_parent_name(str(var_binds[0]))

        if not name:
            print('Could not retrieve name of switch. Moving to the next one...')
            continue

        neighbour_names = []
        neighbour_local_remote_ports = []

        for (error_indication, error_status, error_index, var_binds) in nextCmd(
                SnmpEngine(),
                CommunityData(community),
                UdpTransportTarget((ip, snmp_port)),
                ContextData(),
                ObjectType(ObjectIdentity(NEIGHBOUR_NAME_OID)),
                lexicographicMode=False
        ):
            for var_bind in var_binds:
                neighbour_names.append(
                    parse_neighbour_names_results(str(var_bind))
                )

        for (error_indication, error_status, error_index, var_binds) in nextCmd(
                SnmpEngine(),
                CommunityData(community),
                UdpTransportTarget((ip, snmp_port)),
                ContextData(),
                ObjectType(ObjectIdentity(NEIGHBOUR_PORT_OID)),
                lexicographicMode=False
        ):
            for var_bind in var_binds:
                neighbour_local_remote_ports.append(
                    parse_neighbours_ports_result(str(var_bind))
                )

        neighbours = []
        for a, b in itertools.zip_longest(
            neighbour_names,
            neighbour_local_remote_ports,
            fillvalue='unknown'
        ):
            neighbours.append({
                a[0]: a[1],
                b[0]: b[1],
                b[2]: b[3]
            })

        data[name] = {
            'ip': ip,
            'neighbors': neighbours
        }

    return data


if __name__ == '__main__':
    all_data = main()
    pprint.pprint(all_data, indent=4)

我特别关心的What:

  • 使用pysnmp的S功能的更好/更高效的方法(也许我只能完成一次SNMP来存储所有数据,然后从那里获得所有OID所需的数据)--就像我们解析lxmls html树时所做的那样。
  • 构造我的代码的更好方法
  • 关于函数/名称名称的改进
  • 我试着坚持使用PEP8,但是我并没有真正关注它。我对一切都很熟悉,所以我希望你们不要过分关注这个问题。
EN

回答 1

Code Review用户

回答已采纳

发布于 2020-03-14 19:08:19

类型提示

PEP484类型提示将有所帮助;例如:

代码语言:javascript
运行
复制
def is_file_valid(filepath: str) -> bool:

函数收缩

实际上,is_file_valid并不是正在发生的事情。您正在使用这个函数做两件事:

  • 通过an解析系统将输入字符串转换到程序所需的任何内容。
  • 验证输入字符串是否正确。

文档展示了实际应该如何做到这一点:

代码语言:javascript
运行
复制
parser.add_argument('bar', type=open)

这将调用open并返回一个文件对象,如果文件不存在,则失败。

解析

get_switches_from_file可以使用元组解压缩:

代码语言:javascript
运行
复制
community, port, ip = (t.strip() for t in line.split(','))

这有一个额外的优点,即包含三个以上部分的不规则行将导致错误,而不是被忽略。

更好的是,将其委托给一个类:

代码语言:javascript
运行
复制
class Switch:
    def __init__(self, line: str):
        self.community, self.port, self.ip = (
            field.strip() for field in line.split(',')
        )

避免迭代级联

与其维护switches_info,不如简单地从内部循环中yield每个字典。这将导致您的方法从O(n)内存转到O(1)内存,而运行时可能会付出较小的代价。

如果使用上面的Switch类,这可能如下所示

代码语言:javascript
运行
复制
    args = get_cli_arguments().parse_args()
    with args.file as switches_info_fp:
        for line in switches_info_fp:
            yield Switch(line)

关注点分离

parse_neighbours_ports_result有一种奇怪的返回格式。还不清楚第一个和第三个字符串是否有用。或者返回一个带有实际端口值的二元组,或者返回一个命名的元组或类实例。

隐式元组解包

代码语言:javascript
运行
复制
for (error_indication, error_status, error_index, var_binds)

可能会失去父母。

进口

要简化代码,请执行一些from x import y

代码语言:javascript
运行
复制
from argparse import ArgumentParser
from itertools import zip_longest
票数 5
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/238781

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档