生产上建议对一些核心改动启用审计日志,既可用于合规检查,也便于问题追踪排查。
具体操作如下
创建目录 D:\mssql2022\auditlogs\
USE [master] ;
ALTER SERVER AUDIT [AuditLog] WITH (STATE = OFF) ;
DROP SERVER AUDIT [AuditLog] ;
CREATE SERVER AUDIT AuditLog
TO FILE
(
FILEPATH = 'D:\mssql2022\auditlogs\',
MAXSIZE = 200 MB,
MAX_ROLLOVER_FILES = 5
) WITH (QUEUE_DELAY = 1000, ON_FAILURE = CONTINUE);
ALTER SERVER AUDIT AuditLog WITH (STATE = ON);
-- 官方文档 https://learn.microsoft.com/zh-cn/sql/relational-databases/security/auditing/sql-server-audit-action-groups-and-actions?view=sql-server-ver17
USE [master];
ALTER SERVER AUDIT SPECIFICATION [AuditSpec] WITH (STATE = OFF) ;
DROP SERVER AUDIT SPECIFICATION [AuditSpec] ;
CREATE SERVER AUDIT SPECIFICATION AuditSpec
FOR SERVER AUDIT AuditLog
ADD (SUCCESSFUL_LOGIN_GROUP),
ADD (FAILED_LOGIN_GROUP),
ADD (SCHEMA_OBJECT_PERMISSION_CHANGE_GROUP),
ADD (SERVER_ROLE_MEMBER_CHANGE_GROUP),
ADD (DBCC_GROUP),
ADD (BACKUP_RESTORE_GROUP),
ADD (SERVER_PERMISSION_CHANGE_GROUP),
ADD (LOGIN_CHANGE_PASSWORD_GROUP),
ADD (DATABASE_OBJECT_CHANGE_GROUP),
ADD (DATABASE_CHANGE_GROUP),
ADD (DATABASE_LOGOUT_GROUP),
ADD (DATABASE_OBJECT_OWNERSHIP_CHANGE_GROUP),
ADD (DATABASE_OWNERSHIP_CHANGE_GROUP),
ADD (DATABASE_PERMISSION_CHANGE_GROUP),
ADD (DATABASE_ROLE_MEMBER_CHANGE_GROUP),
ADD (USER_CHANGE_PASSWORD_GROUP),
ADD (APPLICATION_ROLE_CHANGE_PASSWORD_GROUP),
ADD (AUDIT_CHANGE_GROUP),
ADD (SERVER_PRINCIPAL_CHANGE_GROUP);
ALTER SERVER AUDIT SPECIFICATION AuditSpec WITH (STATE = ON);
ALTER LOGIN [devops] WITH PASSWORD = 'NewPassword123!', CHECK_POLICY = ON;
SELECT
event_time AS UTC时间,
DATEADD(HOUR, 8, event_time) as CST时间 ,
-- host_name, -- mssql 2014版本里没有这个列
database_name as 库名, -- 发生操作的数据库上下文
object_name as 发生审计的实体的名称, -- 发生审计的实体的名称
statement AS SQL语句 ,
action_id ,
succeeded AS 成功,
server_principal_name as 当前登陆名, -- 当前登陆名
target_server_principal_name as 操作的目标登录名 -- 操作的目标登录名
FROM sys.fn_get_audit_file ('D:\mssql2022\auditlogs\Audit*.sqlaudit', NULL, NULL)
where 1=1 and statement not like '%network protocol%' and object_name !=''
order by event_time desc ;
将日志通过 logstash 的odbc协议采集起来,发送到ELK中
或者使用python脚本采集数据并写入ELK也可以。
代码DEMO如下:
# -*- coding: utf-8 -*-
# pip install elasticsearch==7.17.12
# pip install pymssql==2.2.7
import pymssql
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
import hashlib
def collect_and_push(host, port, user, password):
conn = pymssql.connect(
host=host,
port=port,
user=user,
password=password,
database="master",
charset="utf8",
)
cursor = conn.cursor()
# 计算最近10分钟的时间(实际范围预留多一些,防止数据采集遗漏)
ten_minutes_ago_str = (datetime.now() - timedelta(minutes=13)).strftime(
"%Y-%m-%d %H:%M:%S"
)
es = Elasticsearch(
[
"https://192.168.31.11:9200",
"https://192.168.31.12:9200",
"https://192.168.31.13:9200",
],
verify_certs=False,
http_auth=("elastic", "elastic1234"),
)
# 索引名以年月为单位
es_index_name = "sqlserver-auditlog-" + datetime.now().strftime("%Y.%m")
# SQL查询语句
query = (
"""
SELECT
@@SERVERNAME as server_name,
cast(event_time AS VARCHAR(26)) + '-' + database_name + '-' + object_name + '-' + statement as concat_uniq,
-- 拼接后用于后续生成es的_id
event_time,
DATEADD(HOUR, 8, event_time) as cst_event_time,
database_name,
object_name,
statement,
action_id,
succeeded,
server_principal_name,
session_server_principal_name,
target_server_principal_name
FROM sys.fn_get_audit_file ('D:\\mssql2022\\auditlogs\\Audit*.sqlaudit', NULL, NULL)
WHERE 1=1
AND DATEADD(HOUR, 8, event_time) > '"""
+ str(ten_minutes_ago_str)
+ """' AND statement NOT LIKE '%network protocol%'
AND object_name!=''
ORDER BY event_time DESC
"""
)
cursor.execute(query)
res = cursor.fetchall()
if res:
for row in res:
(
server_name,
concat_uniq,
event_time,
cst_event_time,
database_name,
object_name,
statement,
action_id,
succeeded,
server_principal_name,
session_server_principal_name,
target_server_principal_name
) = row
# 拼接成es的_id
es_id = hashlib.md5(concat_uniq.encode()).hexdigest()
doc = {
"@timestamp": event_time,
"server_name": server_name,
"event_time": event_time,
"cst_event_time": cst_event_time,
"database_name": database_name,
"object_name": object_name,
"statement": statement,
"action_id": action_id,
"succeeded": succeeded,
"server_principal_name": server_principal_name,
"session_server_principal_name": session_server_principal_name,
"target_server_principal_name": target_server_principal_name,
}
es.index(index=es_index_name, id=es_id, body=doc)
if __name__ == "__main__":
# 连接SQL Server
port = "1433"
user = "sa"
password = "Abcd1234"
for host in ['192.168.31.181','192.168.31.182','192.168.31.183']:
collect_and_push(host, port, user, password)
对sqlserver接触的不是很多,如果上面写的有不对的地方,欢迎指出。谢谢。
官方文档 https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/sys-fn-get-audit-file-transact-sql?view=sql-server-ver17&tabs=sqlserver
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。