专栏首页MySQL解决方案工程师MySQL审计数据归档演示

MySQL审计数据归档演示

作者:Mike Frank 译:徐轶韬

在此博客中,我将演示如何在许多mysql实例之间将审计日志进行合并归档。在后续文章中,我将展示如何通过在该归档文件上创建一个简单的哈希链来扩展此示例–这样您就可以证明是否可以通过任何方式对其进行了修改或污染,以及在何处进行了修改。

在示例代码中,我将使用mysql audit_log_read函数的新扩展功能,并说明为什么mysqlx API可以使某些任务更加简单。这个新的审计阅读功能已在MySQL 8.0.22企业版中发布。示例内容使用以SQL和python模式运行的MySQL Shell。

将展示一些的其他技巧包括:

  1. 从JSON审计数据中提取行–使用JSON_TABLE函数将JSON数据转换为表格式。
  2. 将这些行从已审计的数据库插入到审计数据归档的MySQL数据库中。如您所见,mysqlx API将使事情变得更加简单。

一些事实。正如许多DBA可以告诉您的那样,无论是法规阻止还是出于其他安全原因,DBA通常不想(或无法)访问运行MySQL的底层OS服务器。DBA没有SSH!通常从安全角度来看,运行数据库服务的OS上的内容越少越好。

由于安全性、分析等多种原因,最佳做法是经常从MySQL服务器上获取审计数据,并将其收集到一些中央数据存储中,您可以在其中查看所有MySQL服务器上的活动。为什么会这样做?

  1. 易于分析
  2. 防止数据被破坏
  3. 法规要求
  4. 存储管理

当然,可以使用多种方法通过各种产品来执行移动审计数据任务。这只是一种可能的设计模式,可以轻松地进行第三方集成或更改为将数据写入对象存储或某些其他审计数据存储库。

在术语方面,我将合并审计数据的服务器称为“归档服务器”。该服务器将拥有一个帐户,我将称其为“ auditarchiver”,该帐户只能在audit_data表中插入并选择。(它不能更改数据)。

将要提取审计数据的每个服务器都有一个帐户,该帐户通过SQL连接读取审计数据,并从审计文件中读取JSON数据。

首先让我们以管理员身份登录到归档MySQL服务器实例上–我将使用root。整个示例都需要使用mysql shell。它包括用于从目标服务器提取审计数据进行计划批处理归档的python。

步骤1 –审计归档数据库设置。 在归档服务器上创建模式和表 在审计数据归档服务器上

> mysqlsh

\sql
\connect root@<archiving server>;
create schema audit_archive;

use audit_archive;

CREATE TABLE `audit_data` (
`server_uuid` varchar(45) NOT NULL,
`id` int NOT NULL,
`ts` timestamp NOT NULL,
`class` varchar(20) DEFAULT NULL,
`event` varchar(80) DEFAULT NULL,
`the_account` varchar(80) DEFAULT NULL,
`login_ip` varchar(200) DEFAULT NULL,
`login_os` varchar(200) DEFAULT NULL,
`login_user` varchar(200) DEFAULT NULL,
`login_proxy` varchar(200) DEFAULT NULL,
`connection_id` varchar(80) DEFAULT NULL,
`db` varchar(40) DEFAULT NULL,
`status` int DEFAULT NULL,
`connection_type` varchar(40) DEFAULT NULL,
`connect_os` varchar(40) DEFAULT NULL,
`pid` varchar(40) DEFAULT NULL,
`_client_name` varchar(80) DEFAULT NULL,
`_client_version` varchar(80) DEFAULT NULL,
`program_name` varchar(80) DEFAULT NULL,
`_platform` varchar(80) DEFAULT NULL,
`command` varchar(40) DEFAULT NULL,
`sql_command` varchar(40) DEFAULT NULL,
`command_status` varchar(40) DEFAULT NULL,
`query` varchar(40) DEFAULT NULL,
`query_status` int DEFAULT NULL,
`start_server_id` varchar(400) DEFAULT NULL,
`server_os_version` varchar(100) DEFAULT NULL,
`server_mysqlversion` varchar(100) DEFAULT NULL,
`args` varchar(80) DEFAULT NULL,
`account_host` varchar(80) DEFAULT NULL,
`mysql_version` varchar(80) DEFAULT NULL,
`the_os` varchar(80) DEFAULT NULL,
`the_os_ver` varchar(80) DEFAULT NULL,
`server_id` varchar(8) DEFAULT NULL,
PRIMARY KEY (`server_uuid`,`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

步骤2 –在归档服务器上 首先创建帐户。授予auditarchiver帐户选择和插入权限。(没有其他权限)

\connect root@<archiving server>;
create user auditarchiver identified by 'Th3Archivista!';
grant select, insert on audit_archive.audit_data to auditarchiver;

步骤3 –对于每台服务器读取(拉出)审计数据–创建auditreader帐户

> mysqlsh

\sql
\connect root@<servers to pull audit trail>;
select @@version;

/ *如果您未运行mysql企业版8.0.22或更高版本,请停止操作并升级* /

create user auditreader identified by 'Pu11Archivister!';
GRANT AUDIT_ADMIN
ON *.* TO 'auditreader';

步骤4 –如果尚未安装企业审计,请安装。

安装MySQL企业审计 简单地说,执行位于MySQL share目录下面的SQL - audit_log_filter_linux_install.sql

第5步–编辑/etc/my.cnf –此示例至少需要包含前3行。

[mysqld]
plugin-load-add=audit_log.so
audit-log=FORCE_PLUS_PERMANENT
audit-log-format=JSON
audit-log-strategy=SYNCHRONOUS

重新启动服务器

步骤6 –添加审计过滤器并绑定到用户 如果您以前没有添加审计过滤器,则以下内容将记录所有连接的人。这将记录很多内如,如果出于在测试环境中查看这项工作的目的,这是合理的。在生产中,您可能会希望更具选择性。

--- create audit filter - here log everything (this will generate a great deal of data)
SELECT audit_log_filter_set_filter('log_all', '{ "filter": { "log": true } }');
--- attach filter to accounts - here for all users
SELECT audit_log_filter_set_user('%', 'log_all');

对每个审计的实例重复此操作。

步骤7 –生成一些审计数据活动 以各种用户身份在安装mysql企业审计的服务器上运行一些SQL查询。

步骤8 –选择一个可以在批处理模式下调度mysqlsh的服务器

下面是批处理python脚本的工作方式(最后会重复合并后的代码以复制、编辑和运行)。 请更改使用的密码并使用特定的服务器名称等。

首先,我将使用mysqlx API通过自己的会话连接到读取服务器和归档服务器。

将“ localhost”更改为归档服务器的ip /主机名。

archive_session = mysqlx.get_session( {</code> <code>'host': 'localhost', 'port': 33060,</code> <code>'user': 'auditarchiver', 'password': 'Th3Archivista!'} )

将“ localhost”更改为已审计服务器的ip /主机名。

read_session = mysqlx.get_session( {
'host': 'localhost', 'port': 33060,
'user': 'auditreader', 'password': 'Pu11Archivister!'} )

好了,现在我需要看看我是否有之前的归档数据——这样我就可以指出审计数据中我需要开始读取更新数据的地方。如果归档不包含此实例的数据—我将从日志数据的开头开始。 如果归档表不包含此实例的数据(由其server_uuid标识),则在JSON中创建带有“start”的json字符串。“start”告诉该功能执行常规日期时间搜索。

但是,如果已经加载了先前的数据,那么我将获得插入的最后一个时间戳和事件ID,并将其用作审计数据的指针–在这种情况下,JSON搜索字符串中没有“start”。

archive_empty = archive_session.run_sql("select count(*) from audit_archive.audit_data limit 1").fetch_one()

if (archive_empty[0] > 0):
    print("Data in archive getting last event ts and id")
    search_args = archive_session.run_sql("select id, ts from audit_archive.audit_data order by ts desc, id desc limit 1").fetch_one()
    x = "set @nextts ='{ \"timestamp\": \"" + str(search_args[1]) + "\",\"id\":" + str(search_args[0]) + ", \"max_array_length\": 100 }'"
    setnext = read_session.run_sql(x)
else:
    print("The archive is empty - get oldest audit event")
    read_session.run_sql("set @nextts='{ \"start\": { \"timestamp\": \"2020-01-01\"}, \"max_array_length\": 100 }'")

好的,我们现在为我的搜索条件设置了会话变量– @nextts。

如果要查看JSON搜索字符串 view_nextts = read_session.run_sql("select @nextts")

在下一步中,您将在SQL中看到对audit_log_read组件的调用 AUDIT_LOG_READ(@nextts)

您将看到,我希望在归档中以行形式存储数据——因此我使用JSON_TABLE函数将JSON转换为行。

readaudit = read_session.run_sql(" "
"SELECT @@server_uuid as server_uuid, id, ts, class, event, the_account,login_ip,login_os,login_user,login_proxy,connection_id,db, "
" status,connection_type,connect_os,pid,_client_name,_client_version, "
" program_name,_platform,command,sql_command,command_status,query, "
" query_status,start_server_id,server_os_version,server_mysqlversion,args, "
" account_host,mysql_version,the_os,the_os_ver,server_id "
"FROM "
"JSON_TABLE "
"( "
" AUDIT_LOG_READ(@nextts), "
" '$[*]' "
" COLUMNS "
" ( "
" id INT PATH '$.id', "
" ts TIMESTAMP PATH '$.timestamp', "
" class VARCHAR(20) PATH '$.class', "
" event VARCHAR(80) PATH '$.event', "
" the_account VARCHAR(80) PATH '$.account', "
" login_ip VARCHAR(200) PATH '$.login.ip', "
" login_os VARCHAR(200) PATH '$.login.os', "
" login_user VARCHAR(200) PATH '$.login.user', "
" login_proxy VARCHAR(200) PATH '$.login.proxy', "
" connection_id VARCHAR(80) PATH '$.connection_id', "
" db VARCHAR(40) PATH '$.connection_data.db', "
" status INT PATH '$.connection_data.status', "
" connection_type VARCHAR(40) PATH '$.connection_data.connection_type', "
" connect_os VARCHAR(40) PATH '$.connection_data.connection_attributes._os', "
" pid VARCHAR(40) PATH '$.connection_data.connection_attributes._pid', "
" _client_name VARCHAR(80) PATH '$.connection_data.connection_attributes._client_name', "
" _client_version VARCHAR(80) PATH '$.connection_data.connection_attributes._client_version', "
" program_name VARCHAR(80) PATH '$.connection_data.connection_attributes.program_name', "
" _platform VARCHAR(80) PATH '$.connection_data.connection_attributes._platform', "
" command VARCHAR(40) PATH '$.general_data.command', "
" sql_command VARCHAR(40) PATH '$.general_data.sql_command', "
" command_status VARCHAR(40) PATH '$.general_data.status', "
" query VARCHAR(40) PATH '$.genera_data.query', "
" query_status INT PATH '$.general_data.status', "
" start_server_id VARCHAR(400) PATH '$.startup_data.server_id', "
" server_os_version VARCHAR(100) PATH '$.startup_data.os_version', "
" server_mysqlversion VARCHAR(100) PATH '$.startup_data.mysql_version', "
" args VARCHAR(80) PATH '$.startup_data.args', "
" account_host VARCHAR(80) PATH '$.account.host', "
" mysql_version VARCHAR(80) PATH '$.startup_data.mysql_version', "
" the_os VARCHAR(80) PATH '$.startup_data.os', "
" the_os_ver VARCHAR(80) PATH '$.startup_data.os_version', "
" server_id VARCHAR(80) PATH '$.startup_data.server_id' "
" ) "
") AS auditdata; ")

现在您可以使用所有JSON并将每个事件存储为JSON数据类型。这也很简单。但在这里,我存储在一个表中。由你决定。

好了–现在作为Auditarchiver –我将保存刚刚提取的数据。 这是mysqlx api非常方便的地方。我可以循环执行结果,并用很少的代码保存到表中。

aschema=archive_session.get_schema('audit_archive')
atable=aschema.get_table('audit_data')
if (archive_empty[0] > 0):
    evt = readaudit.fetch_one_object()
    print("Archive was not empty - skip first duplicate event")
else:
    print("Archive was empty - load all")

evt = readaudit.fetch_one_object()
while evt:
    atable.insert(evt).execute()
    evt= readaudit.fetch_one_object()

正如您可能已经注意到的那样–我并没有尝试从审计日志中一次提取过多。最大取了100。此外还受到为–audit-log-read-buffer-size设置的缓冲区大小的限制。

参见https://dev.mysql.com/doc/refman/8.0/en/audit-log-reference.html#sysvar_audit_log_read_buffer_size

因此,将此脚本保存到目录中。 cd到目录

现在,您只需以批处理模式运行mysqlsh。 第一次运行

frank@Mike % mysqlsh --py < archiveauditbatch

The archive is empty – get oldest audit event Archive was empty – load all

下一次运行 frank@Mike % mysqlsh --py < archiveauditbatch Data in archive getting last event ts and id Archive was not empty – skip first duplicate event

好的-现在您已经运行了一些测试,使用cron或您喜欢的调度程序创建一个计划的批处理。让它循环直到清空。

第9步–登录到归档服务器,查看数据 select * from audit_archive.audit_data order by server_uuid, id, ts;

运行一些统计

select event, count(event) from audit_archive.audit_data group by event;

select login_user, event, count(event) from audit_archive.audit_data group by login_user, event;

select distinct login_user, sql_command, status, query_status from audit_archive.audit_data ;

最后–这与生产质量代码相差甚远, 我插入了密码,没有为审计服务器提供循环和收集数据等参数。没有检查错误等。重点是演示一些技术来帮助对其进行尝试的人。

在后续博客中- 我将向您展示如何执行哈希链等-这样您就可以证明您的审计数据是不可变的且不受污染。

感谢您使用MySQL。这是完整的批处理脚本 https://mysqlserverteam.com/mysql-audit-data-consolidation-made-simple/archivebatch/

本文分享自微信公众号 - MySQL解决方案工程师(mysqlse),作者:Mike Frank

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-11-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • MySQL查询的生命周期

    当你执行一次MySQL查询时,有没有仔细想过,在查询结果返回之前,经过了哪些步骤呢?这些步骤有可能消耗了超出想象的时间和资源。因此,在对MySQL的查询进行优化...

    MySQLSE
  • MySQL8.0.21——错误日志中的组复制系统消息

    利用组复制,用户可以通过将系统状态复制到一组服务器来创建具有冗余的容错系统。即使某些服务器发生故障,只要不是所有服务器或大多数服务器,系统仍然可用。

    MySQLSE
  • MySQL的Bugs

    相信有小伙伴在使用MySQL的过程中,会遇到一些不明原因的问题,经过不断努力排查之后,还是无法确认这是一个什么情况,这时你可能需要查看一下这个现象是不是MySQ...

    MySQLSE
  • 用户下沉还是走向高端?如何用竞争战略玩转消费升级?

    农历新年后的一个月时间里,拼多多股价已经悄然上涨了约25%,3月5日当天一度涨超6%,目前股价已经创出IPO以来的新高。

    曾响铃
  • 爬虫入门经典(十九) | 难度提升,破解极验验证码

      ♥各位如果想要交流的话,可以加下QQ交流群:974178910,里面有各种你想要的学习资料。♥

    不温卜火
  • 麦肯锡 | 数据分析时代:大数据环境下的商业竞争(附报告全文下载)

    大数据文摘
  • 值得纪念的日子:镭速RaySync FTP关键传输指标超越国际标杆Aspera产品

    2018年3月19日对大部分人来说是一个普通的日子,但是对于我来说,是一个人生中值得纪念的日子。

    云语科技
  • 2019-12-18 设计模式中英文对照

    Albert陈凯
  • 【搭建网站】linux下WDCP建站流程

    2、登陆后,左边是管理栏目,右边就是显示的系统状态,可以很清楚的看到当前服务器的系统各项信息,如图:

    chenzhouliyan
  • x86_64汇编调试程序初步

    掌握此基础,就可以用来修改无源代码的程序等,比如希望jstatd在指定的端口上监听,而不是一个值为0的随机端口号,请参见《防火墙内JVisualVM连接jst...

    一见

扫码关注云+社区

领取腾讯云代金券