日志服务(Cloud Log Service,CLS)目前已支持使用 Kafka Producer SDK 和其他 Kafka 相关 agent 上传日志到 CLS。
使用场景
日志应用中使用 Kafka 作为消息管道是非常普遍的场景。如通过机器上的开源采集客户端或者使用 producer 直接写入日志,再通过消费管道提供给下游如 spark、flink 等进行消费。CLS 具备完整的 Kafka 数据管道上下行能力,以下主要介绍哪些场景适合您使用 Kafka 协议上传日志,更多 Kafka 协议消费场景请参见 Kafka 协议实时消费。
场景1:您已有基于开源采集的自建系统,不希望有复杂的二次改造,您可以通过修改配置文件将日志上传到 CLS。
例如,您之前使用 ELK 搭建日志系统的客户,现在只需要通过修改 Filebeat 或者 Logstash 的配置文件,将 Output 配置(详情请参见 filebeat 配置)到 CLS,即可非常方便简洁地将日志上传。
场景2:您希望通过 Kafka producer SDK 来采集日志并上传,不必再安装采集 Agent。
CLS 支持您使用各类 Kafka producer SDK 采集日志,并通过 Kafka 协议上传到 CLS。(详情请参见本文提供的 SDK 调用示例 )
相关限制
支持 Kafka 协议版本为:0.11.0.X,1.0.X,1.1.X,2.0.X,2.1.X,2.2.X,2.3.X,2.4.X,2.5.X,2.6.X,2.7.X,2.8.X。
支持压缩方式:gzip,snappy,lz4。
当前使用 SASL_PLAINTEXT 认证。
使用 Kafka 协议上传需要配置 RealtimeProducer 权限,详情请参见 CLS 访问策略模板。
配置方式
使用 Kafka 协议上传日志至 CLS Kafka 生产端时,需要配置下 CLS Kafka 访问信息:
参数 | 说明 |
鉴权机制 | 当前支持 SASL_PLAINTEXT |
hosts | |
topic | CLS Kafka topic 名称,配置为日志主题 ID。例如:76c63473-c496-466b-XXXX-XXXXXXXXXXXX |
username | CLS Kafka 访问用户名,配置为日志集 ID。例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX |
password | CLS Kafka 访问密码,格式为 ${SecurityId}#${SecurityKey} 。例如:XXXXXXXXXXXXXX#YYYYYYYY。密钥信息获取请前往 密钥获取。请确保密钥关联的账号具有相应的 Kafka 协议上传日志权限。若要匿名写入,格式为 topic_id#${日志主题 ID}。例如:topic_id#76c63473-c496-466b-XXXX-XXXX。 |
header | 定义在通过 Kafka 协议上传时日志的解析行为。 json_remove_escape:对日志进行 JSON 解析时是否进行去转义,取值为 true、false,若不指定默认为 false。 time_key:日志中的指定时间字段,表示选择日志中的指定字段作为日志采集时间。 time_format:当配置了 time_key,需额外配置指定字段的时间解析格式,详情请参考 配置时间格式。 |
CLS Kafka 地址
通过 Kafka 协议上传日志时,基于访问方式(内网/外网)以及目标日志主题所在地域,CLS Kafka 地址配置为如下:
说明:
1. 地域缩写可参考 可用域名 - Kafka上传日志。
2. 访问方式:
内网访问:数据发起端为腾讯云服务器且服务器所在地域与目标日志主题一致
外网访问:数据发起端为非腾讯云服务器或者服务器所在地域与目标日志主题不一致
访问方式 | CLS Kafka 地址 |
内网 | ${地域缩写}-producer.cls.tencentyun.com:9095 |
外网 | ${地域缩写}-producer.cls.tencentcs.com:9096 |
示例
Beat 调用示例
filebeat/winlogbeat 配置
output.kafka:enabled: truehosts: ["${region}-producer.cls.tencentyun.com:9095"] # TODO 服务地址;外网端口9096,内网端口9095topic: "${ClsTopicID}" # TODO 日志主题IDversion: "0.11.0.2"compression: "${compress}" # TODO 配置压缩方式,支持gzip,snappy,lz4;例如"lz4"username: "${ClslogsetID}" # TODO 日志集ID# 若要匿名写入,password: "topic_id#${日志主题 ID}"password: "${SecurityId}#${SecurityKey}"
Logstash 调用示例
logstash 配置
output {kafka {topic_id => "${ClstopicID}"bootstrap_servers => "${region}-producer.cls.tencentyun.com:${port}"sasl_mechanism => "PLAIN"security_protocol => "SASL_PLAINTEXT"compression_type => "${compress}"# 若要匿名写入,password='topic_id#${日志主题 ID}'sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${ClslogsetID}' password='${securityID}#${securityKEY}';"codec => json}}
FluentTD 调用示例
注意:
该 demo 基于 fluentd-1.15.3 版本验证;依赖的 ruby 版本 ruby=2.7.6。
FluentTD 配置
<match *>@type rdkafka2# brokers setting# TODO 域名参考 https://cloud.tencent.com/document/product/614/18940,注意内网端口9095,公网端口9096brokers "${domain}:${port}" # 例如 gz-producer.cls.tencentyun.com:9095# topic settings# TODO 替换日志主题IDtopic "${topic_id}"# saslrdkafka_options {"sasl.mechanism": "PLAIN","security.protocol": "sasl_plaintext",# TODO topic所属日志集ID"sasl.username": "${logset_id}",# TODO topic所属uin的密钥,格式为 ${secret_id}#${secret_key};若要匿名写入,格式为 topic_id#${日志主题 ID}"sasl.password": "${secret_id}#${secret_key}"}required_acks 1compression_codec gzip<format>@type json</format><buffer tag>flush_at_shutdown trueflush_mode intervalflush_interval 1schunk_limit_size 3MBchunk_full_threshold 1total_limit_size 1024MBoverflow_action block</buffer></match>
FluentBit 调用示例
FluentBit 配置
[OUTPUT]Name kafkaMatch *# TODO 域名参考 https://cloud.tencent.com/document/product/614/18940,注意内网端口9095,公网端口9096Brokers ${domain}:${port} # 例如 gz-producer.cls.tencentyun.com:9095# TODO 替换日志主题IDTopics ${topic_id}# TODO 请求消息的最大大小,最大不能超过5Mrdkafka.message.max.bytes 5242880rdkafka.sasl.mechanisms PLAINrdkafka.security.protocol sasl_plaintext# TODO 根据使用场景选择acks的值rdkafka.acks 1# TODO 配置压缩方式rdkafka.compression.codec lz4# TODO topic所属日志集IDrdkafka.sasl.username ${logset_id}# TODO topic所属uin的密钥,格式为 ${secret_id}#${secret_key};若要匿名写入,格式为 topic_id#${日志主题 ID}rdkafka.sasl.password ${secret_id}#${secret_key}
SDK 调用示例
Golang SDK 调用示例
以下以 sarama.V1_1_0_0为例,其他版本按照类似规则配置:
import ("fmt""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Net.SASL.Mechanism = "PLAIN"config.Net.SASL.Version = int16(1)config.Net.SASL.Enable = true// TODO 日志集 IDconfig.Net.SASL.User = "${logsetID}"// TODO 格式为 ${SecurityId}#${SecurityKey},若要匿名写入,格式为 topic_id#${日志主题 ID}config.Net.SASL.Password = "${SecurityId}#${SecurityKey}"config.Producer.Return.Successes = true// TODO 根据使用场景选择acks的值config.Producer.RequiredAcks = ${acks}config.Version = sarama.V1_1_0_0// TODO 配置压缩方式config.Producer.Compression = ${compress}// TODO 服务地址;外网端口9096,内网端口9095producer, err := sarama.NewSyncProducer([]string{"${region}-producer.cls.tencentyun.com:9095"}, config)if err != nil {panic(err)}msg := &sarama.ProducerMessage{Topic: "${topicID}", // TODO 日志主题IDValue: sarama.StringEncoder("goland sdk sender demo"),}// 发送消息for i := 0; i <= 5; i++ {partition, offset, err := producer.SendMessage(msg)if err != nil {panic(err)}fmt.Printf("send response; partition:%d, offset:%d\\n", partition, offset)}_ = producer.Close()}
Python SDK 调用示例
from kafka import KafkaProducerif __name__ == '__main__':produce = KafkaProducer(# TODO 服务地址;外网端口9096,内网端口9095bootstrap_servers=["${region}-producer.cls.tencentyun.com:9095"],security_protocol='SASL_PLAINTEXT',sasl_mechanism='PLAIN',# TODO 日志集 IDsasl_plain_username='${logsetID}',# TODO 格式为 ${SecurityId}#${SecurityKey}, 若要匿名写入,格式为 topic_id#${日志主题 ID}sasl_plain_password='${SecurityId}#${SecurityKey}',api_version=(0, 11, 0),# TODO 配置压缩方式compression_type="${compress_type}",)for i in range(0, 5):# 发送消息 TODO 日志主题IDfuture = produce.send(topic="${topicID}", value=b'python sdk sender demo')result = future.get(timeout=10)print(result)
Java SDK 调用示例
maven 依赖:
<dependencies><!--https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.2</version></dependency></dependencies>
代码示例:
import org.apache.kafka.clients.producer.*;import java.util.Properties;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {// 0.配置一系列参数Properties props = new Properties();// TODO 使用时props.put("bootstrap.servers", "${region}-producer.cls.tencentyun.com:9095");// TODO 以下值根据业务场景设置props.put("acks", ${acks});props.put("retries", ${retries});props.put("batch.size", ${batch.size});props.put("linger.ms", ${linger.ms});props.put("buffer.memory", ${buffer.memory});props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "${compress_type}"); // TODO 配置压缩方式props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");// TODO 用户名为logsetID;密码为securityID和securityKEY的组合securityID#securityKEY,格式为 ${SecurityId}#${SecurityKey},// 若要匿名写入,密码为 topic_id#${日志主题 ID}props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecurityId}#${SecurityKey}';");// 1.创建一个生产者对象Producer<String, String> producer = new KafkaProducer<String, String>(props);// 2.调用send方法 TODO 日志主题IDFuture<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicID}", ${message}));RecordMetadata recordMetadata = meta.get(${timeout}, TimeUnit.MILLISECONDS);System.out.println("offset = " + recordMetadata.offset());// 3.关闭生产者producer.close();}}
C SDK 调用示例
// https://github.com/edenhill/librdkafka - master#include <iostream>#include <librdkafka/rdkafka.h>#include <string>#include <unistd.h>#define BOOTSTRAP_SERVER "${region}-producer.cls.tencentyun.com:${port}"// USERNAME 为日志集 ID#define USERNAME "${logsetID}"// PASSWORD 格式为 ${SecurityId}#${SecurityKey}, 若要匿名写入,格式为 topic_id#${日志主题 ID}#define PASSWORD "${SecurityId}#${SecurityKey}"// 日志主题ID#define TOPIC "${topicID}"#define ACKS "${acks}"// 配置压缩方式#define COMPRESS_TYPE "${compress_type}"static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {if (rkmessage->err) {fprintf(stdout, "%% Message delivery failed : %s\\n", rd_kafka_err2str(rkmessage->err));} else {fprintf(stdout, "%% Message delivery successful %zu:%d\\n", rkmessage->len, rkmessage->partition);}}int main(int argc, char **argv) {// 1. 初始化配置rd_kafka_conf_t *conf = rd_kafka_conf_new();rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);char errstr[512];if (rd_kafka_conf_set(conf, "bootstrap.servers", BOOTSTRAP_SERVER, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "acks", ACKS, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "compression.codec", COMPRESS_TYPE, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}// 设置认证方式if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.username", USERNAME, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}if (rd_kafka_conf_set(conf, "sasl.password", PASSWORD, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {rd_kafka_conf_destroy(conf);fprintf(stdout, "%s\\n", errstr);return -1;}// 2. 创建 handlerrd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!rk) {rd_kafka_conf_destroy(conf);fprintf(stdout, "create produce handler failed: %s\\n", errstr);return -1;}// 3. 发送数据std::string value = "test lib kafka ---- ";for (int i = 0; i < 100; ++i) {retry:rd_kafka_resp_err_t err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(TOPIC),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),RD_KAFKA_V_VALUE((void *) value.c_str(), value.size()),RD_KAFKA_V_OPAQUE(nullptr), RD_KAFKA_V_END);if (err) {fprintf(stdout, "Failed to produce to topic : %s, error : %s", TOPIC, rd_kafka_err2str(err));if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {rd_kafka_poll(rk, 1000);goto retry;}} else {fprintf(stdout, "send message to topic successful : %s\\n", TOPIC);}rd_kafka_poll(rk, 0);}std::cout << "message flush final" << std::endl;rd_kafka_flush(rk, 10 * 1000);if (rd_kafka_outq_len(rk) > 0) {fprintf(stdout, "%d message were not deliverer\\n", rd_kafka_outq_len(rk));}rd_kafka_destroy(rk);return 0;}
C# SDK 调用示例
/** 该demo只提供了最简单的使用方法,具体生产还需要结合调用放来实现* 在使用过程中,demo中留的todo项需要替换使用** 注意:* 1. 该Demo基于Confluent.Kafka/1.8.2版本验证通过* 2. MessageMaxBytes最大值不能超过5M* 3. 该demo使用同步的方式生产,在使用时也可根据业务场景调整为异步的方式* 4. 其他参数在使用过程中可以根据业务参考文档自己调整:https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ProducerConfig.html** Confluent.Kafka 参考文档:https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html*/using Confluent.Kafka;namespace Producer{class Producer{private static void Main(string[] args){var config = new ProducerConfig{// TODO 域名参考 https://cloud.tencent.com/document/product/614/18940// Kafka 填写,注意内网端口9095,公网端口9096BootstrapServers = "${domain}:${port}",SaslMechanism = SaslMechanism.Plain,// TODO topic所属日志集IDSaslUsername = "${logsetID}",// TODO topic所属uin的密钥,格式为 ${SecurityId}#${SecurityKey},// 若要匿名写入,格式为 topic_id#${日志主题 ID}SaslPassword = "${SecurityId}#${SecurityKey}",SecurityProtocol = SecurityProtocol.SaslPlaintext,// TODO 根据实际使用场景赋值。可取值: Acks.None、Acks.Leader、Acks.AllAcks = Acks.None,// TODO 请求消息的最大大小,最大不能超过5MMessageMaxBytes = 5242880};// deliveryHandlerAction<DeliveryReport<Null, string>> handler =r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}");using (var produce = new ProducerBuilder<Null, string>(config).Build()){try{// TODO 测试验证代码for (var i = 0; i < 100; i++){// TODO 替换日志主题IDproduce.Produce("${topicID}", new Message<Null, string> { Value = "C# demo value" }, handler);}produce.Flush(TimeSpan.FromSeconds(10));}catch (ProduceException<Null, string> pe){Console.WriteLine($"send message receiver error : {pe.Error.Reason}");}}}}}