本文介绍使用 Filebeat、 Logstash、Flume 消费日志,以及通过 HTTP 方式将您消费的日志发送至 SIEM(Splunk、Devo)。
Filebeat 消费 CLS 日志
说明:
推荐使用版本7.5.0及以上。
filebeat.inputs:- type: kafkahosts:- kafkaconsumer-${region}.cls.tencentyun.com:9095topics: "您的消费主题"group_id: "您的消费组名称"username: "${logsetID}"password: "${SecretId}#${SecretKey}"sasl.mechanism: "PLAIN"processors:- decode_json_fields:fields: ["message"]target: ""overwrite_keys: trueoutput.file:path: /tmpfilename: filebeat_data.logrotate_every_kb: 102400number_of_files: 7
Logstash 消费 CLS 日志
说明:
推荐使用 Logstash 8.0及以上版本。
input {kafka {# cls kafka 协议消费控制台给出的的主题名称,例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制topics => "您的消费主题"# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"group_id => "您的消费组名称"security_protocol => "SASL_PLAINTEXT"sasl_mechanism => "PLAIN"# 用户名是日志集合 ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac,注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecretId}#${SecretKey}';"}}output {stdout { codec => json }}
Flume 消费 CLS 日志
若您需将日志数据消费至自建的 HDFS、Kafka 集群,则可以通过 Flume 组件来中转,具体操作参见如下示例。
说明:
推荐使用 Flume1.9.0及以上。
开启日志的 Kafka 消费协议
Flume 配置
a1.sources = source_kafkaa1.sinks = sink_locala1.channels = channel1# 配置Sourcea1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSourcea1.sources.source_kafka.batchSize = 10a1.sources.source_kafka.batchDurationMillis = 200000# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写a1.sources.source_kafka.kafka.bootstrap.servers = kafkaconsumer-${region}.cls.tencentyun.com:9095# cls kafka 协议消费控制台给出的的主题名称,例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制a1.sources.source_kafka.kafka.topics = 您的消费主题# 请替换为您的消费组名称a1.sources.source_kafka.kafka.consumer.group.id = 您的消费组名称a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliesta1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXTa1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN# 用户名是日志集合 ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";# 配置 sinka1.sinks.sink_local.type = loggera1.channels.channel1.type = memorya1.channels.channel1.capacity = 1000a1.channels.channel1.transactionCapacity = 100# 将 source 和 sink 绑定到 channela1.sources.source_kafka.channels = channel1a1.sinks.sink_local.channel = channel1