用户可以通过提交例行导入作业,直接订阅 Kafka 中的消息数据,以近实时的方式进行数据同步。
Doris 自身能够保证不丢不重的订阅 Kafka 中的消息,即
Exactly-Once
消费语义。订阅 Kafka 消息
订阅 Kafka 消息使用了 Doris 中的例行导入(Routine Load)功能。
用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。
请注意以下使用限制:
1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
2. 支持的消息格式如下:
csv 文本格式。每一个 message 为一行,且行尾不包含换行符。
Json 格式。
3. 仅支持 Kafka 0.10.0.0(含) 以上版本。
访问 SSL 认证的 Kafka 集群
例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。
访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过
CREAE FILE
命令上传到 Plao 中,并且 catalog 名称为 kafka
。CREATE FILE
命令的具体帮助可以参见 CREATE FILE 命令手册。这里给出示例:上传文件
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
创建例行导入作业
1. 访问无认证的 Kafka 集群。
CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1COLUMNS TERMINATED BY ","PROPERTIES("max_batch_interval" = "20","max_batch_rows" = "300000","max_batch_size" = "209715200",)FROM KAFKA("kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092","kafka_topic" = "my_topic","property.group.id" = "xxx","property.client.id" = "xxx","property.kafka_default_offsets" = "OFFSET_BEGINNING");
max_batch_interval/max_batch_rows/max_batch_size
用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。2. 访问 SSL 认证的 Kafka 集群。
CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1COLUMNS TERMINATED BY ",",PROPERTIES("max_batch_interval" = "20","max_batch_rows" = "300000","max_batch_size" = "209715200",)FROM KAFKA("kafka_broker_list"= "broker1:9091,broker2:9091","kafka_topic" = "my_topic","property.security.protocol" = "ssl","property.ssl.ca.location" = "FILE:ca.pem","property.ssl.certificate.location" = "FILE:client.pem","property.ssl.key.location" = "FILE:client.key","property.ssl.key.password" = "******");
查看导入作业状态
查看作业状态的具体命令和示例请参阅 SHOW ROUTINE LOAD 命令文档。
查看某个作业的任务运行状态的具体命令和示例请参阅 SHOW ROUTINE LOAD TASK 命令文档。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
修改作业属性
作业控制
用户可以通过
STOP/PAUSE/RESUME
三个命令来控制作业的停止,暂停和重启。
具体命令请参阅:STOP ROUTINE LOAD、 PAUSE ROUTINE LOAD 和 RESUME ROUTINE LOAD 命令文档。