消费 Demo-第三方软件

最近更新时间:2025-06-27 15:38:52

我的收藏
本文介绍使用 Filebeat、 Logstash、Flume 消费日志,以及通过 HTTP 方式将您消费的日志发送至 SIEM(Splunk、Devo)。

Filebeat 消费 CLS 日志

说明:
推荐使用版本7.5.0及以上。

filebeat.inputs:
- type: kafka
hosts:
- kafkaconsumer-${region}.cls.tencentyun.com:9095
topics: "您的消费主题"
group_id: "您的消费组名称"
username: "${logsetID}"
password: "${SecretId}#${SecretKey}"
sasl.mechanism: "PLAIN"
processors:
- decode_json_fields:
fields: ["message"]
target: ""
overwrite_keys: true
output.file:
path: /tmp
filename: filebeat_data.log
rotate_every_kb: 102400
number_of_files: 7


Logstash 消费 CLS 日志

说明:
推荐使用 Logstash 8.0及以上版本。
input {
kafka {
# cls kafka 协议消费控制台给出的的主题名称,例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
topics => "您的消费主题"
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"
group_id => "您的消费组名称"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
# 用户名是日志集合 ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac,注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecretId}#${SecretKey}';"
}
}
output {
stdout { codec => json }
}


Flume 消费 CLS 日志

若您需将日志数据消费至自建的 HDFS、Kafka 集群,则可以通过 Flume 组件来中转,具体操作参见如下示例。
说明:
推荐使用 Flume1.9.0及以上。

开启日志的 Kafka 消费协议

参见 操作步骤 开启日志的 Kafka 消费协议,并获取消费的服务域名和 Topic。

Flume 配置

a1.sources = source_kafka
a1.sinks = sink_local
a1.channels = channel1

# 配置Source
a1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source_kafka.batchSize = 10
a1.sources.source_kafka.batchDurationMillis = 200000
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
a1.sources.source_kafka.kafka.bootstrap.servers = kafkaconsumer-${region}.cls.tencentyun.com:9095
# cls kafka 协议消费控制台给出的的主题名称,例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
a1.sources.source_kafka.kafka.topics = 您的消费主题
# 请替换为您的消费组名称
a1.sources.source_kafka.kafka.consumer.group.id = 您的消费组名称
a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliest
a1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN
# 用户名是日志集合 ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。
a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";

# 配置 sink
a1.sinks.sink_local.type = logger

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# 将 source 和 sink 绑定到 channel
a1.sources.source_kafka.channels = channel1
a1.sinks.sink_local.channel = channel1

将日志消费至 SIEM(Splunk)

SIEM:安全信息与事件管理系统(Security Information and Event Management),例如 Splunk。


1. 请在您的 Splunk 配置 HEC (HTTP事件收集器),您在 Demo Shipper 代码中需要使用:
Splunk HEC endpoint,例如 http:#192.168.1.1:8088/services/collector。
HEC token,例如 1329e5cc-2c0d-XXXX-XXXX-332c753f54bd。
2. 部署 Demo shipper。该代码的作用:从 CLS 日志消费日志的同时,将日志通过 HTTP 的方式发送到 Splunk HEC。
import json
import time
import threading
import queue
from kafka import KafkaConsumer
import requests

# 日志消费参数配置
cls_consumer_config = {
'bootstrap_servers': ['kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096'],
'topic_name': '1254139626-XXXXXXXXX-XXXX-XXXX-bdd00b95d678', #消费主题 ID,请在 Kafka 协议消费的控制台复制。
'group_id': 'xhm_consumer_group',
'auto_offset_reset': 'earliest',
'security_protocol': 'SASL_PLAINTEXT',
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': "e60e9e91-XXXX-XXXX-XXXXX-f19fb6f4e626",#配置为 ${LogSetID},即日志集 ID。
'sasl_plain_password': "AKIDnTG#SfehOq3",#配置为 ${SecretId}#${SecretKey},
'api_version': (2, 5, 1)
}

# Splunk HEC 配置
SPLUNK_CONFIG = {
'url': 'http:#192.168.1.1:8088/services/collector',#splunk HEC endpoint,请在Splunk侧获取
'token': 'Splunk 1329e5cc-2c0d-XXXX-XXXX-332c753f54bd', #HEC token,请在Splunk侧获取
'max_retries': 3, # 失败重试次数
'retry_delay': 1 # 重试延迟(秒)
}

# 创建线程安全的队列
log_queue = queue.Queue(maxsize=1000) # 根据内存调整队列大小


def splunk_sender():
"""Splunk日志发送线程"""
session = requests.Session()
session.headers.update({
'Authorization': SPLUNK_CONFIG['token'],
'Content-Type': 'application/json'
})

while True:
try:
# 从队列获取日志
log_data = log_queue.get()
if log_data is None: # 终止信号
break

# 构造Splunk事件
event = {
'event': {
'message': log_data.decode('utf-8'),
'kafka_topic': cls_consumer_config['topic_name']
}
}

# 发送日志到Splunk(带重试机制)
for attempt in range(SPLUNK_CONFIG['max_retries'] + 1):
try:
response = session.post(
SPLUNK_CONFIG['url'],
data=json.dumps(event),
verify=False, # 跳过SSL验证(-k参数)
timeout=5
)
response.raise_for_status()
print(f"Successfully sent to Splunk, status: {response.status_code}")
break #成功则跳出重试循环
except Exception as e:
if attempt < SPLUNK_CONFIG['max_retries']:
print(f"Attempt {attempt + 1} failed: {str(e)}, retrying...")
time.sleep(SPLUNK_CONFIG['retry_delay'])
else:
print(f"Failed to send to Splunk after {SPLUNK_CONFIG['max_retries']} attempts: {str(e)}")
# 可以添加死信队列处理或本地存储逻辑

except Exception as e:
print(f"Error in splunk_sender: {str(e)}")
finally:
log_queue.task_done()


def consume_messages():
"""Kafka消费主线程"""
consumer = KafkaConsumer(
cls_consumer_config['topic_name'],
bootstrap_servers=cls_consumer_config['bootstrap_servers'],
group_id=cls_consumer_config['group_id'],
auto_offset_reset=cls_consumer_config['auto_offset_reset'],
security_protocol=cls_consumer_config['security_protocol'],
sasl_mechanism=cls_consumer_config['sasl_mechanism'],
sasl_plain_username=cls_consumer_config['sasl_plain_username'],
sasl_plain_password=cls_consumer_config['sasl_plain_password'],
api_version=cls_consumer_config['api_version']
)

print('Start consuming messages...')
try:
for message in consumer:
print(
f"Received message from Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
# 将消息放入队列(阻塞直到有空位)
log_queue.put(message.value)
except Exception as e:
print(f"Kafka consumer error: {str(e)}")
finally:
consumer.close()


if __name__ == "__main__":
# 启动Splunk发送线程
sender_thread = threading.Thread(target=splunk_sender, daemon=True)
sender_thread.start()

try:
consume_messages()
except KeyboardInterrupt:
print("Shutting down...")
finally:
# 等待队列处理完毕
log_queue.join()
# 发送终止信号
log_queue.put(None)
sender_thread.join(timeout=10)
print("All done.")
在 SIEM(Splunk)中查看日志:


将日志消费至 SIEM( Devo)

通过 HTTP 方式:类似于Splunk、SIEM(Devo) 也提供了 HTTP endpoint,您可将消费出的日志数据通过 HTTP 协议发送至Devo,请参考 Devo 的文档 Sending data to the HTTP API endpoint
通过 Logstash,详情参考 Sending data from Logstash to Devo。示例如下:
input {
kafka {
# cls kafka 协议消费控制台给出的的主题名称,例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
topics => "您的消费主题"
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"
group_id => "您的消费组名称"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
# 用户名是日志集合 ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac,注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecretId}#${SecretKey}';"
}
}
output {
syslog {
facility => "local7"
host => "ecollector-eu.devo.io"
port => "443"
severity => "informational"
appname => "my.app.oracle.test"
protocol => "ssl-tcp"
ssl_cert => "/Users/Bob/logstash/ca/domain.crt"
ssl_key => "/Users/Bob/logstash/ca/domain.key"
ssl_cacert => "/Users/Bob/logstash/ca/chain.crt"
codec => line {
format => "%{name},%{id}"
}
}
}