This document describes how to consume CLS logs with Filebeat, Logstash, and Flume, and how to forward the consumed logs to a SIEM (Splunk, Devo) over HTTP.
Consuming CLS Logs with Filebeat
Note:
Filebeat 7.5.0 or later is recommended.
filebeat.inputs:
- type: kafka
  hosts:
    - kafkaconsumer-${region}.cls.tencentyun.com:9095
  topics: "your consumption topic"
  group_id: "your consumer group name"
  username: "${logsetID}"
  password: "${SecretId}#${SecretKey}"
  sasl.mechanism: "PLAIN"
  processors:
    - decode_json_fields:
        fields: ["message"]
        target: ""
        overwrite_keys: true

output.file:
  path: /tmp
  filename: filebeat_data.log
  rotate_every_kb: 102400
  number_of_files: 7
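To try the configuration, save it as filebeat.yml (a hypothetical file name) and start Filebeat in the foreground:

filebeat -e -c filebeat.yml

With the file output above, the consumed logs are written to /tmp/filebeat_data.log.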
Consuming CLS Logs with Logstash
Note:
Logstash 8.0 or later is recommended.
input {
  kafka {
    # Topic name shown on the Kafka protocol consumption page of the CLS console, e.g. XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console
    topics => "your consumption topic"
    # Service address + port: 9096 for the public network, 9095 for the private network. This example uses private-network consumption; adjust it to your actual situation
    bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"
    group_id => "your consumer group name"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    # The username is the logset ID, e.g. ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
    # The password is the string SecretId#SecretKey, e.g. AKID********************************#XXXXuXtymIXT0Lac. Do not omit the #. Use a sub-account key whenever possible; when the root account grants permissions to the sub-account, follow the principle of least privilege, i.e. restrict the action and resource in the sub-account's access policy to the minimum scope needed for the operation.
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecretId}#${SecretKey}';"
  }
}
output {
  stdout { codec => json }
}
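To try the pipeline, save the configuration as cls_kafka.conf (a hypothetical file name) and start Logstash with it:

bin/logstash -f cls_kafka.conf

The stdout output above prints each consumed event to the console as JSON.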
Consuming CLS Logs with Flume
If you need to ship the log data to a self-built HDFS or Kafka cluster, you can relay it through Flume; see the example below (a hedged HDFS sink sketch follows the configuration).
Note:
Flume 1.9.0 or later is recommended.
Enable the Kafka consumption protocol for the log topic
Flume configuration
a1.sources = source_kafka
a1.sinks = sink_local
a1.channels = channel1

# Configure the source
a1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source_kafka.batchSize = 10
a1.sources.source_kafka.batchDurationMillis = 200000
# Service address + port: 9096 for the public network, 9095 for the private network. This example uses private-network consumption; adjust it to your actual situation
a1.sources.source_kafka.kafka.bootstrap.servers = kafkaconsumer-${region}.cls.tencentyun.com:9095
# Topic name shown on the Kafka protocol consumption page of the CLS console, e.g. XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console
a1.sources.source_kafka.kafka.topics = your consumption topic
# Replace with your consumer group name
a1.sources.source_kafka.kafka.consumer.group.id = your consumer group name
a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliest
a1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN
# The username is the logset ID, e.g. ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# The password is the string SecretId#SecretKey, e.g. AKID********************************#XXXXuXtymIXT0Lac. Do not omit the #. Use a sub-account key whenever possible; when the root account grants permissions to the sub-account, follow the principle of least privilege, i.e. restrict the action and resource in the sub-account's access policy to the minimum scope needed for the operation. Note that sasl.jaas.config must end with a semicolon (;); omitting it causes an error.
a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";

# Configure the sink
a1.sinks.sink_local.type = logger
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.source_kafka.channels = channel1
a1.sinks.sink_local.channel = channel1
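The configuration above only prints events through a logger sink. If, as mentioned earlier, you want to land the data in a self-built HDFS cluster, the sink can be swapped for Flume's built-in HDFS sink. The following is a minimal sketch; the NameNode address and directory are placeholders to be replaced with your own:

a1.sinks = sink_hdfs
a1.sinks.sink_hdfs.type = hdfs
# Placeholder path; replace the NameNode address and directory with your own
a1.sinks.sink_hdfs.hdfs.path = hdfs://your-namenode:8020/cls/logs/%Y%m%d
a1.sinks.sink_hdfs.hdfs.fileType = DataStream
a1.sinks.sink_hdfs.hdfs.writeFormat = Text
# Roll files every 5 minutes or 128 MB, never by event count
a1.sinks.sink_hdfs.hdfs.rollInterval = 300
a1.sinks.sink_hdfs.hdfs.rollSize = 134217728
a1.sinks.sink_hdfs.hdfs.rollCount = 0
# Resolve %Y%m%d from the local clock, since the events carry no timestamp header
a1.sinks.sink_hdfs.hdfs.useLocalTimeStamp = true
a1.sinks.sink_hdfs.channel = channel1

In either case the agent can be started with the standard flume-ng command, for example bin/flume-ng agent --conf conf --conf-file cls.conf --name a1 -Dflume.root.logger=INFO,console (cls.conf is a hypothetical file name).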
Shipping Logs to a SIEM (Splunk)
SIEM: Security Information and Event Management system, for example Splunk.

1. Configure an HEC (HTTP Event Collector) in your Splunk deployment. The demo shipper code needs the following (the curl sketch after the shipper code can be used to check that the HEC is reachable):
Splunk HEC endpoint, for example http://192.168.1.1:8088/services/collector.
HEC token, for example 1329e5cc-2c0d-XXXX-XXXX-332c753f54bd.
2. Deploy the demo shipper. This code consumes logs from CLS and, at the same time, sends them to the Splunk HEC over HTTP.
import json
import time
import threading
import queue
from kafka import KafkaConsumer
import requests

# CLS consumer configuration
cls_consumer_config = {
    'bootstrap_servers': ['kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096'],
    'topic_name': '1254139626-XXXXXXXXX-XXXX-XXXX-bdd00b95d678',  # consumption topic ID; copy it from the Kafka protocol consumption page of the console
    'group_id': 'xhm_consumer_group',
    'auto_offset_reset': 'earliest',
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': "e60e9e91-XXXX-XXXX-XXXXX-f19fb6f4e626",  # set to ${LogSetID}, i.e. the logset ID
    'sasl_plain_password': "AKIDnTG#SfehOq3",  # set to ${SecretId}#${SecretKey}
    'api_version': (2, 5, 1)
}

# Splunk HEC configuration
SPLUNK_CONFIG = {
    'url': 'http://192.168.1.1:8088/services/collector',  # Splunk HEC endpoint; obtain it on the Splunk side
    'token': 'Splunk 1329e5cc-2c0d-XXXX-XXXX-332c753f54bd',  # HEC token; obtain it on the Splunk side
    'max_retries': 3,  # number of retries on failure
    'retry_delay': 1   # retry delay (seconds)
}

# Create a thread-safe queue
log_queue = queue.Queue(maxsize=1000)  # adjust the queue size to your memory budget

def splunk_sender():
    """Splunk log sending thread"""
    session = requests.Session()
    session.headers.update({
        'Authorization': SPLUNK_CONFIG['token'],
        'Content-Type': 'application/json'
    })
    while True:
        try:
            # Take a log entry from the queue
            log_data = log_queue.get()
            if log_data is None:  # termination signal
                break
            # Build the Splunk event
            event = {
                'event': {
                    'message': log_data.decode('utf-8'),
                    'kafka_topic': cls_consumer_config['topic_name']
                }
            }
            # Send the log to Splunk (with retries)
            for attempt in range(SPLUNK_CONFIG['max_retries'] + 1):
                try:
                    response = session.post(
                        SPLUNK_CONFIG['url'],
                        data=json.dumps(event),
                        verify=False,  # skip SSL verification (like curl -k)
                        timeout=5
                    )
                    response.raise_for_status()
                    print(f"Successfully sent to Splunk, status: {response.status_code}")
                    break  # exit the retry loop on success
                except Exception as e:
                    if attempt < SPLUNK_CONFIG['max_retries']:
                        print(f"Attempt {attempt + 1} failed: {str(e)}, retrying...")
                        time.sleep(SPLUNK_CONFIG['retry_delay'])
                    else:
                        print(f"Failed to send to Splunk after {SPLUNK_CONFIG['max_retries']} attempts: {str(e)}")
                        # dead-letter queue handling or local persistence could be added here
        except Exception as e:
            print(f"Error in splunk_sender: {str(e)}")
        finally:
            log_queue.task_done()

def consume_messages():
    """Main Kafka consumer loop"""
    consumer = KafkaConsumer(
        cls_consumer_config['topic_name'],
        bootstrap_servers=cls_consumer_config['bootstrap_servers'],
        group_id=cls_consumer_config['group_id'],
        auto_offset_reset=cls_consumer_config['auto_offset_reset'],
        security_protocol=cls_consumer_config['security_protocol'],
        sasl_mechanism=cls_consumer_config['sasl_mechanism'],
        sasl_plain_username=cls_consumer_config['sasl_plain_username'],
        sasl_plain_password=cls_consumer_config['sasl_plain_password'],
        api_version=cls_consumer_config['api_version']
    )
    print('Start consuming messages...')
    try:
        for message in consumer:
            print(f"Received message from Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
            # Put the message into the queue (blocks until a slot is free)
            log_queue.put(message.value)
    except Exception as e:
        print(f"Kafka consumer error: {str(e)}")
    finally:
        consumer.close()

if __name__ == "__main__":
    # Start the Splunk sender thread
    sender_thread = threading.Thread(target=splunk_sender, daemon=True)
    sender_thread.start()
    try:
        consume_messages()
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        # Wait for the queue to drain
        log_queue.join()
        # Send the termination signal
        log_queue.put(None)
        sender_thread.join(timeout=10)
        print("All done.")
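Before deploying the shipper, you can verify the HEC endpoint and token from step 1 with a plain curl request. This is a minimal sketch using the example values above; -k only matters if your HEC uses HTTPS with a self-signed certificate:

curl -k "http://192.168.1.1:8088/services/collector" -H "Authorization: Splunk 1329e5cc-2c0d-XXXX-XXXX-332c753f54bd" -d '{"event": "HEC connectivity test"}'

A successful call typically returns {"text":"Success","code":0}.

The shipper itself only depends on the kafka-python and requests packages; install them before running the script, for example:

pip install kafka-python requests
python3 shipper.py   # shipper.py is a hypothetical file name for the code above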
View the logs in the SIEM (Splunk):

Shipping Logs to a SIEM (Devo)
Via HTTP: similar to Splunk, the SIEM Devo also provides an HTTP endpoint, so you can send the consumed log data to Devo over HTTP; see the Devo documentation Sending data to the HTTP API endpoint. A minimal Python sketch follows the Logstash example below.
Via Logstash: for details, see Sending data from Logstash to Devo. Example:
input {
  kafka {
    # Topic name shown on the Kafka protocol consumption page of the CLS console, e.g. XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console
    topics => "your consumption topic"
    # Service address + port: 9096 for the public network, 9095 for the private network. This example uses private-network consumption; adjust it to your actual situation
    bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"
    group_id => "your consumer group name"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    # The username is the logset ID, e.g. ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
    # The password is the string SecretId#SecretKey, e.g. AKID********************************#XXXXuXtymIXT0Lac. Do not omit the #. Use a sub-account key whenever possible; when the root account grants permissions to the sub-account, follow the principle of least privilege, i.e. restrict the action and resource in the sub-account's access policy to the minimum scope needed for the operation.
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecretId}#${SecretKey}';"
  }
}
output {
  syslog {
    facility => "local7"
    host => "ecollector-eu.devo.io"
    port => "443"
    severity => "informational"
    appname => "my.app.oracle.test"
    protocol => "ssl-tcp"
    ssl_cert => "/Users/Bob/logstash/ca/domain.crt"
    ssl_key => "/Users/Bob/logstash/ca/domain.key"
    ssl_cacert => "/Users/Bob/logstash/ca/chain.crt"
    codec => line {
      format => "%{name},%{id}"
    }
  }
}
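For the HTTP route, the following is only a minimal Python sketch that reuses the Kafka consumer pattern from the Splunk demo above. The Devo endpoint URL, authentication header, and payload format are placeholders: take the real values from the Devo documentation Sending data to the HTTP API endpoint.

import requests
from kafka import KafkaConsumer

# Placeholders: copy the real endpoint and credentials from the Devo documentation
DEVO_HTTP_ENDPOINT = "https://your-devo-http-endpoint"      # hypothetical, see Devo docs
DEVO_AUTH_HEADERS = {"Authorization": "your-devo-token"}    # hypothetical auth scheme, see Devo docs

consumer = KafkaConsumer(
    "your consumption topic",                               # topic name from the CLS console
    bootstrap_servers=["kafkaconsumer-${region}.cls.tencentyun.com:9095"],
    group_id="your consumer group name",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="${logsetID}",
    sasl_plain_password="${SecretId}#${SecretKey}",
)

for message in consumer:
    # Forward each consumed log line to Devo over HTTP; adapt the body to what Devo expects
    resp = requests.post(DEVO_HTTP_ENDPOINT,
                         headers=DEVO_AUTH_HEADERS,
                         data=message.value,
                         timeout=5)
    resp.raise_for_status()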