消费 Demo-第三方软件

最近更新时间:2025-11-10 17:25:32

我的收藏
本文介绍使用 Filebeat、 Logstash、Flume 消费日志,以及通过 HTTP 方式将您消费的日志发送至 SIEM(Splunk、Devo)。

Filebeat 消费 CLS 日志

说明:
推荐使用版本7.5.0及以上。

filebeat.inputs:
- type: kafka
hosts:
- kafkaconsumer-${region}.cls.tencentyun.com:9095
topics: "您的消费主题"
group_id: "您的消费组名称"
username: "${logsetID}"
password: "${SecretId}#${SecretKey}"
sasl.mechanism: "PLAIN"
processors:
- decode_json_fields:
fields: ["message"]
target: ""
overwrite_keys: true
output.file:
path: /tmp
filename: filebeat_data.log
rotate_every_kb: 102400
number_of_files: 7


Logstash 消费 CLS 日志

说明:
推荐使用 Logstash 8.0及以上版本。
input {
kafka {
# cls kafka 协议消费控制台给出的的主题名称,例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
topics => "您的消费主题"
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"
group_id => "您的消费组名称"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
# 用户名是日志集合 ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac,注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecretId}#${SecretKey}';"
}
}
output {
stdout { codec => json }
}


Flume 消费 CLS 日志

若您需将日志数据消费至自建的 HDFS、Kafka 集群,则可以通过 Flume 组件来中转,具体操作参见如下示例。
说明:
推荐使用 Flume1.9.0及以上。

开启日志的 Kafka 消费协议

参见 操作步骤 开启日志的 Kafka 消费协议,并获取消费的服务域名和 Topic。

Flume 配置

a1.sources = source_kafka
a1.sinks = sink_local
a1.channels = channel1

# 配置Source
a1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source_kafka.batchSize = 10
a1.sources.source_kafka.batchDurationMillis = 200000
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
a1.sources.source_kafka.kafka.bootstrap.servers = kafkaconsumer-${region}.cls.tencentyun.com:9095
# cls kafka 协议消费控制台给出的的主题名称,例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
a1.sources.source_kafka.kafka.topics = 您的消费主题
# 请替换为您的消费组名称
a1.sources.source_kafka.kafka.consumer.group.id = 您的消费组名称
a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliest
a1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN
# 用户名是日志集合 ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。
a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";

# 配置 sink
a1.sinks.sink_local.type = logger

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# 将 source 和 sink 绑定到 channel
a1.sources.source_kafka.channels = channel1
a1.sinks.sink_local.channel = channel1