专栏首页OSSIMOSSIM传感器代理传送机制分析
原创

OSSIM传感器代理传送机制分析

OSSIM Agent的主要职责是收集网络上存在的各种设备发送的所有数据,然后按照一种标准方式有序发给OSSIM Server,Agent收集到数据后在发送给Server之前要对这些数据进行归一化处理,本文主要就如何有序发送数据与如何完成归一化进行讨论。 OSSIM传感器在通过GET框架实现OSSIM代理和OSSIM服务器之间通信协议和数据格式的之间转换。下面我们先简要看一下ossim-agent脚本:

#!/usr/bin/python -OOt
import sys
sys.path.append('/usr/share/ossim-agent/')
sys.path.append('/usr/local/share/ossim-agent/')
from ossim_agent.Agent import Agent
agent = Agent()
agent.main()

这里需要GET作为OSSIM代理向OSSIM服务器输送数据。实现紧密整合所需的两个主要操作是“生成”(或)OSSIM兼容事件的“映射Mapping”)和此类数据向OSSIM的“传输”服务器。它负责此类操作的GET框架的两个组件是EventHandler和Sender Agent,如图1所示。

图1 将Get框架内容集成到OSSIM

Event Handler的主要任务是映射数据源插件采集的事件到SIEM实例警报的OSSIM标准化事件格式。为了执行这样的过程,原始消息经历由RAW LOG转换为现有归一化数据字段格式的一个转变;在上图中我们将这些机制表示为“归一化Normalization”和“OSSIM消息”。部分日志归一化代码:

from Logger import Logger
from time import mktime, strptime
logger = Logger.logger
class Event:
    EVENT_TYPE = 'event'
    EVENT_ATTRS = [
        "type",
        "date",
        "sensor",
        "interface",
        "plugin_id",
        "plugin_sid",
        "priority",
        "protocol",
        "src_ip",
        "src_port",
        "dst_ip",
        "dst_port",
        "username",
        "password",
        "filename",
        "userdata1",
        "userdata2",
        "userdata3",
        "userdata4",
        "userdata5",
        "userdata6",
        "userdata7",
        "userdata8",
        "userdata9",
        "occurrences",
        "log",
        "data",
        "snort_sid",    # snort specific
        "snort_cid",    # snort specific
        "fdate",
        "tzone"
    ]

    def __init__(self):
        self.event = {}
        self.event["event_type"] = self.EVENT_TYPE

    def __setitem__(self, key, value):

        if key in self.EVENT_ATTRS:
            self.event[key] = self.sanitize_value(value)
            if key == "date":
                # 以秒为单位
                self.event["fdate"]=self.event[key]
                try:
                    self.event["date"]=int(mktime(strptime(self.event[key],"%Y-%m-%d %H:%M:%S")))
                except:
                    logger.warning("There was an error parsing date (%s)" %\
                        (self.event[key]))
        elif key != 'event_type':
            logger.warning("Bad event attribute: %s" % (key))

    def __getitem__(self, key):
        return self.event.get(key, None)
    # 事件表示
    def __repr__(self):
        event = self.EVENT_TYPE
        for attr in self.EVENT_ATTRS:
            if self[attr]:
                event += ' %s="%s"' % (attr, self[attr])
        return event + "\n"
    # 返回内部哈希值
    def dict(self):
        return self.event

    def sanitize_value(self, string):
        return str(string).strip().replace("\"", "\\\"").replace("'", "")

class EventOS(Event):
    EVENT_TYPE = 'host-os-event'
    EVENT_ATTRS = [
        "host",
        "os",
        "sensor",
        "interface",
        "date",
        "plugin_id",
        "plugin_sid",
        "occurrences",
        "log",
        "fdate",
    ]

class EventMac(Event):
    EVENT_TYPE = 'host-mac-event'
    EVENT_ATTRS = [
        "host",
        "mac",
        "vendor",
        "sensor",
        "interface",
        "date",
        "plugin_id",
        "plugin_sid",
        "occurrences",
        "log",
        "fdate",
    ]

class EventService(Event):
    EVENT_TYPE = 'host-service-event'
    EVENT_ATTRS = [
        "host",
        "sensor",
        "interface",
        "port",
        "protocol",
        "service",
        "application",
        "date",
        "plugin_id",
        "plugin_sid",
        "occurrences",
        "log",
        "fdate",
    ]

class EventHids(Event):
    EVENT_TYPE = 'host-ids-event'
    EVENT_ATTRS = [
        "host",
        "hostname",
        "hids_event_type",
        "target",
        "what",
        "extra_data",
        "sensor",
        "date",
        "plugin_id",
        "plugin_sid",
        "username",
        "password",
        "filename",
        "userdata1",
        "userdata2",
        "userdata3",
        "userdata4",
        "userdata5",
        "userdata6",
        "userdata7",
        "userdata8",
        "userdata9",
        "occurrences",
        "log",
        "fdate",
    ]

class WatchRule(Event):

    EVENT_TYPE = 'event'
    EVENT_ATTRS = [
        "type",
    "date",
    "fdate",
    "sensor",
    "interface",
    "src_ip",
    "dst_ip",
    "protocol",
        "plugin_id",
        "plugin_sid",
        "condition",
        "value",
        "port_from",
        "src_port",
        "port_to",
        "dst_port",
        "interval",
        "from",
        "to",
        "absolute",
    "log",
        "userdata1",
        "userdata2",
        "userdata3",
        "userdata4",
        "userdata5",
        "userdata6",
        "userdata7",
        "userdata8",
        "userdata9",
        "filename",
        "username",
    ]
class Snort(Event):
    EVENT_TYPE = 'snort-event'
    EVENT_ATTRS = [
        "sensor",
        "interface",
        "gzipdata",
        "unziplen",
        "event_type",
        "plugin_id",
        "type",
        "occurrences"
    ]

日志编码代码:

import threading, time
from Logger import Logger
logger = Logger.logger
from Output import Output
import Config
import Event
from Threshold import EventConsolidation
from Stats import Stats
from ConnPro import ServerConnPro
class Detector(threading.Thread):
    def __init__(self, conf, plugin, conn):

        self._conf = conf
        self._plugin = plugin
        self.os_hash = {}
        self.conn = conn
        self.consolidation = EventConsolidation(self._conf)
        logger.info("Starting detector %s (%s).." % \
                    (self._plugin.get("config", "name"),
                     self._plugin.get("config", "plugin_id")))
        threading.Thread.__init__(self)
    def _event_os_cached(self, event):
        if isinstance(event, Event.EventOS):
            import string
            current_os = string.join(string.split(event["os"]), ' ')
            previous_os = self.os_hash.get(event["host"], '')
            if current_os == previous_os:
                return True
            else:
                # 失败并添加到缓存
                self.os_hash[event["host"]] = \
                    string.join(string.split(event["os"]), ' ')
        return False
    def _exclude_event(self, event):

        if self._plugin.has_option("config", "exclude_sids"):
            exclude_sids = self._plugin.get("config", "exclude_sids")
            if event["plugin_sid"] in Config.split_sids(exclude_sids):
                logger.debug("Excluding event with " +\
                    "plugin_id=%s and plugin_sid=%s" %\
                    (event["plugin_id"], event["plugin_sid"]))
                return True
        return False

    def _thresholding(self):
        self.consolidation.process()
    def _plugin_defaults(self, event):
        # 从配置文件中获取默认参数
        if self._conf.has_section("plugin-defaults"):

        # 1) 日期
            default_date_format = self._conf.get("plugin-defaults",
                                                 "date_format")
            if event["date"] is None and default_date_format and \
               'date' in event.EVENT_ATTRS:
                event["date"] = time.strftime(default_date_format, 
                                              time.localtime(time.time()))
        # 2) 传感器
            default_sensor = self._conf.get("plugin-defaults", "sensor")
            if event["sensor"] is None and default_sensor and \
               'sensor' in event.EVENT_ATTRS:
                event["sensor"] = default_sensor
        # 3) 网络接口
            default_iface = self._conf.get("plugin-defaults", "interface")
            if event["interface"] is None and default_iface and \
               'interface' in event.EVENT_ATTRS:
                event["interface"] = default_iface
        # 4) 源IP
            if event["src_ip"] is None and 'src_ip' in event.EVENT_ATTRS:
                event["src_ip"] = event["sensor"]
        # 5) 时区
            default_tzone = self._conf.get("plugin-defaults", "tzone")
            if event["tzone"] is None and 'tzone' in event.EVENT_ATTRS:
                event["tzone"] = default_tzone

        # 6) sensor,source ip and dest != localhost
            if event["sensor"] in ('127.0.0.1', '127.0.1.1'):
                event["sensor"] = default_sensor

            if event["dst_ip"] in ('127.0.0.1', '127.0.1.1'):
                event["dst_ip"] = default_sensor

            if event["src_ip"] in ('127.0.0.1', '127.0.1.1'):
                event["src_ip"] = default_sensor
        # 检测日志的类型
        if event["type"] is None and 'type' in event.EVENT_ATTRS:
            event["type"] = 'detector'
        return event
    def send_message(self, event):
        if self._event_os_cached(event):
            return

        if self._exclude_event(event):
            return
        #对于一些空属性使用默认值。
        event = self._plugin_defaults(event)

        # 合并之前检查
        if self.conn is not None:
            try:
                self.conn.send(str(event))
            except:
                id = self._plugin.get("config", "plugin_id")
                c = ServerConnPro(self._conf, id)
                self.conn = c.connect(0, 10)
                try:
                    self.conn.send(str(event))
                except:
                    return
            logger.info(str(event).rstrip())
        elif not self.consolidation.insert(event):
            Output.event(event)
        Stats.new_event(event)
    def stop(self):
        #self.consolidation.clear()
        pass
#在子类中重写
    def process(self):
        pass
    def run(self):
        self.process()
class ParserSocket(Detector):
    def process(self):
        self.process()
class ParserDatabase(Detector):
    def process(self):
        self.process()

… …

从上可以看出,传感器的归一化主要负责对每个LOG内数据字段进行重新编码,使其生成一个全新的可能用于发送到OSSIM服务器的完整事件。为达成这种目的GET框架中包含了一些特定的功能,以便将所有的功能转换需要BASE64转换的字段。“OSSIM消息”负责填充GET生成的原始事件中不存在的字段。所以上面讲的plugin_id、plugin_sid是用来表示日志消息来源类型和子类型,这也是生成SIEM事件的必填字段。为事件格式完整性考虑,有些时候在无法确认源或目标IP时,系统默认会采用0.0.0.0来填充该字段。

注意:这种必填字段我们可利用phpmyadmin工具查看OSSIM的MySQL数据库。 Sender Agent负责完成下面两个任务: 发送由GET收集并由事件格式化的事件发送到OSSIM服务器,这项任务由Event Hander创建的消息组成消息队列发送到消息中间件实现,时序图如图2所示。

图2 序列图: 从安全探测器的日志转换到OSSIM服务器事件

2)管理GET框架和OSSIM服务器之间的通信,通信端口为TCP 40001通过双向握手实现。归一化原始日志是规范化过程的一个重要环节,OSSIM在归一化处理日志的同时保留了原始日志,可用于日志归档,提供了一种从规范化事件中提取原始日志的手段。 经过归一化处理的EVENTS,存储到MySQL数据库中,如图3所示。接着就由关联引擎根据规则、优先级、可靠性等参数进行交叉关联分析,得出风险值并发出各种报警提示信息。

图3 OSSIM平台日志存储机制

接下来我们再看个实例,下面是一段Apache、CiscoASA以及SSH的原始日志,如图4、图5、图6所示。

  1. Apache插件中的正则表达式: [0001 - apache-access] 访问日志event_type=event regexp=((?P\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(:(?P\d{1,5}))? )?(?P\S+) (?P\S+) (?P\S+) \[(?P\d{2}\/\w{3}\/\d{4}:\d{2}:\d{2}:\d{2})\s+[+-]\d{4}\] \"(?P[^\"]*)\" (?P\d{3}) ((?P\d+)|-)( \"(?P[^\"]*)\" \"(?P[^\"]*)\")?$ src_ip={resolv($src)} dst_ip={resolv($dst)} dst_port={$port} date={normalize_date($date)} plugin_sid={$code} username={$user} userdata1={$request} userdata2={$size} userdata3={$referer_uri} userdata4={$useragent} filename={$id}

[0002 - apache-error] 错误日志

event_type=event
regexp=\[(?P\w{3} \w{3} \d{2} \d{2}:\d{2}:\d{2} \d{4})\] \[(?P(emerg|alert|crit|error|warn|notice|info|debug))\] (\[client (?P\S+)\] )?(?P.*)
date={normalize_date($date)}
plugin_sid={translate($type)}
src_ip={resolv($src)}
userdata1={$data}
图4 Apache原始日志

图5 一条Cisco ASA 原始日志

图6 Cisco ASA 事件分类

通过OSSIM归一化处理后,再通过Web前端展现给大家方便阅读的格式。归一化处理后的事件和原始日志的对比方法我们在《开源安全运维平台OSSIM疑难解析:入门篇》一书中还会讲解。而在图7所示的例子当中,仅使用了Userdata1和Userdata2,并没有用到Userdata3~Userdata9这些是扩展位,主要是为了预留给其他设备或服务使用,这里目标地址会标记成IP地址的形式,例如:Host192.168.11.160。实际上归一化处理这种操作发生在系统采集和存储事件之后,关联和数据分析之前,在SIEM工具中把采集过程中把数据转换成易读懂的格式,采用格式化的数据,能更容易理解。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 手动打造Snort+barnyard2+BASE可视化报警平台

    大家在安装基于Snort NIDS系统, 感觉很难,总是出错,其他安装Snort并不难,难的是准备工作做得不充分,如果你做的不好,在配置可视化报警时会遇到各种问...

    OSSIM
  • OSSIM架构与组成综述

    如果运维工程师手里没有高效的管理工具支持,就很难快速处理故障。市面上有很多运维监控工具,例如商业版的 Solarwinds、ManageEngine以及What...

    OSSIM
  • 深度学习OSSIM关联分析

    从海量安全事件中挖掘有用的威胁信息与情报是当今讨论的热门话题,同时这也是一个难点?怎么实现呢?这里用到一种技术叫做关联分析,他也是SIEM(Sec...

    OSSIM
  • PyQt拖放事件(二)

    在PyQt中,重新实现拖放事件处理方法,可用于处理自定义数据,或者实现一些特殊的拖放功能。

    用户6021899
  • python中文件变化监控-watchd

    在python中文件监控主要有两个库,一个是pyinotify ( https://github.com/seb-m/pyinotify/wiki ),一个是w...

    py3study
  • DPDK 报文调度/保序 终极解决方案 Event Dev 简介

    网络报文的分发以及保序一直以来是让人头痛的问题, 为了完整的解释Event Dev的背景,我们可以从两个基本概念的定义开始:

    Linux阅码场
  • google Guava之EventBus

    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现,在应用中可以处理一些异步任务。对于事件监听和发布订阅模式...

    栋先生
  • 乘风破浪的技术大咖再次集结 | 腾讯云TVP持续航行中

    腾讯云最具价值专家,简称 TVP(Tencent Cloud Valuable Professional),是腾讯云颁发给第三方技术专家们的一项荣誉认证,以此感...

    TVP官方团队
  • Typecho 开启HTTPS 以及加入CDN 不能登陆后台问题解决

    因为评论action的地址还是http,所有需要改一下评论文件的Form 提交的action的地址 找到主题的评论文件,一般是comments.php ,在里面...

    Alone88
  • 计算机视觉 OpenCV Android | 基本特征检测之 霍夫直线检测 详析

    对于每个平面空间的像素点坐标(x,y), 随着角度θ的取值不同,都会得到r值, (%+++%要点.B)而对于任意一条直线来说,在极坐标空间它的(r,θ...

    凌川江雪

扫码关注云+社区

领取腾讯云代金券