有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

简介

Apache Kafka 连接器用于连接 Kafka 集群和腾讯云数据连接器集群,可消费 Kafka 集群的消息并作为集成流的 trigger,也可将集成流的消息生产发布到 Kafka 集群中。
Apache Kafka 是一个分布式发布-订阅消息传递系统,kafka 有以下一些基本概念:
Producer:消息生产者,即向 kafka broker 发布消息的客户端。
Consumer:消息消费者,负责消费 Kafka 服务器上的消息。
Topic:主题,用于建立 Producer 和 Consumer 之间的订阅关系。
Partition:消息分区,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列,partition 中的每条消息都会被分配一个有序的 ID(offset)。
Broker:一台 kafka 服务器即为一个 broker,一个集群由多个 broker 组成,一个 broker 可以容纳多个 topic。
Consumer Group:消费者分组,用于归组同类消费者,每个 consumer 属于一个特定的 consumer group,多个消费者可以共同消息一个 Topic 下的消息,每个消费者消费其中的部分消息,这些消费者就组成一个分组,拥有同一个分组名称。
Offset:消息在 partition 中的偏移量,每一条消息在 partition 都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。
Producer、Consumer、Broker 关系如下图:


Broker 保存消息数据,和 topic、partition 关系示意图如下:


同一 topic 的各 partition 逻辑关系如下图,kafka 仅保证同一 partition 下的消息有序。
img



连接器配置

通用配置标签页-连接配置
通用配置标签页-消费者配置
高级配置标签页-消费者组配置
高级配置标签页-生产者配置
参数
数据类型
描述
是否必填
默认值
集群地址
string
Apache Kafka 集群地址,配置的格式:ip:port,ip:port

集群 Kafka 版本
enum
选择 Kafka 集群的版本号
1.1
SASL 安全认证模式
enum
选择连接到 Kafka 集群时的安全认证模式
PlainText
SASL 用户名
string
SASL/Plain 和 SASL/SCRAM 安全认证模式下的用户名

SASL 密码
string
SASL/Plain 和 SASL/SCRAM 安全认证模式下的密码

SASL/SCRAM 加密类型
enum
SASL/SCRAM 安全认证模式下的加密类型

使能 TLS 安全传输协议
bool
是否使用 TLS 加密和 Kafka 集群间的连接
false
TLS 客户端证书
file
可选,使用提供的证书对连接进行加密,仅当使能 TLS 安全传输协议设置为 True 才可配置

TLS 客户端Key
file
可选,使用提供的证书对连接进行加密,需和客户端证书同时提供,仅当使能 TLS 安全传输协议设置为 True 才可配置

TLS 服务端证书
file
可选,使用提供的证书对连接进行加密,仅当使能TLS安全传输协议设置为 True 才可配置




参数
数据类型
描述
是否必填
默认值
消费者组
string
消费时的所属的消费者组名称

主题
string
订阅的主题列表,支持正则表达式

消费者数量
int
消费者组内的消费者数量,必须小于主题的 partition 数量
1




参数
数据类型
描述
是否必填
默认值
消息确认模式
enum
消费消息后,在 Kafka 上确认 Offset 的模式,有两种:

消费后直到触发的流成功结束后才确认 Offset
消费后立即确认 Offset
流运行成功后确认
隔离级别
enum
Kafka 的事务隔离级别,可选 Read Uncommitted 和 Read committed
Read Uncommitted
重试阻塞时间
int
重试阻塞时间(毫秒),避免在失败场景下以紧密循环的方式重复发送请求
2000
请求超时时间
int
请求超时时间(毫秒)
30000
会话超时时间
int
会话超时时间(毫秒)
10000
心跳时间
int
心跳时间(毫秒)
3000
Rebalance 检测频率
int
Rebalance 检测频率(毫秒),Kafka 消费者组重平衡检测的间隔时间
60000
Rebalance 策略
enum
Rebalance 策略,按区间划分(Range)和轮询(RoundRobin)两种策略
Range
Rebalance 最大重试次数
int
Rebalance 最大重试次数
4
Rebalance 重试阻塞时间
int
Rebalance 重试阻塞时间(毫秒)
2000
最小拉取数据量
int
最小拉取数据量(字节)
1
默认拉取数据量
int
默认拉取数据量(字节) ,默认为1048756(1024*1024)
1048576
最大拉取数据量
int
最大拉取数据量(兆),设置为0时无限制
0
拉取阻塞时间
int
拉取阻塞时间(毫秒)
250
提交频率
int
提交频率(毫秒)
1000
初始偏移量
enum
初始偏移量,取值为 Offset Newest 和 Offset Latest
Offset Newest
偏移量保留的时间
int
偏移量保留的时间(秒),0表示不过期
0

image-20210426173044144


参数
数据类型
描述
是否必填
默认值
最大消息限制
int
最大消息限制(字节),单条消息允许发送的最大数据量
1000000
确认模式
enum
确认模式,可选值为:None、LeaderOnly、All。其中 None 为不需确认,LeaderOnly 仅需 Leader 确认,All 需要 Leader 及所有 follower 都确认
LeaderOnly
请求超时时间
int
请求超时时间
10000
幂等性使能开关
bool
幂等性使能开关,设置为 true 时,打开 kafka 幂等性功能,实现 Exactly-Once 语义
false
消息刷新到磁盘的数据量
int
消息刷新到磁盘的数据量(字节),设置为0时不设置该参数
0
消息刷新到磁盘的消息数量
int
消息刷新到磁盘的消息数量,设置为0时不设置该参数
0
消息刷新到磁盘的间隔时间
int
消息刷新到磁盘的间隔时间(毫秒),设置为0时不设置该参数
0
消息刷新的最大消息量
int
消息刷新的最大消息量,设置为0时不设置该参数
0




操作配置

Apache Kafka 连接器包含 Consumer 和 Publish 两种操作。

Consumer 操作

输入参数

None,Consumer 操作没有输入参数。
image-20210426173307712



输出

Consumer 操作执行成功后,输出结果会保存在 message 消息体的 payload;执行失败后,错误信息会保存在 message 消息体的 error。
组件输出的 message 信息如下:
message 属性
payload
执行成功后,payload 为 byte 数组
error
执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息
attribute
执行成功后,attribute 为 dict,属性列表见下表
variable
继承上个组件的 variable 信息
atrrbites 属性列表:
atrrbites 属性
数据类型
描述
topic
string
消息所属的 topic
partition
int32
消息的 partition
offset
int64
消息的 offset
例如:执行成功后,message payload 值为 byte 数组,message attribute 值如下:
{
"topic": "test-topic",
"partition": 1,
"offset": 4
}
执行失败后,message error 值如下:
{
"Code": "CORE:RUNTIME",
"Description": "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
}

案例

1. 按照 Kafka 集群信息,填写 kafka 连接器配置。


2. 新建流,将 kafka consumer 作为 trigger 节点。


3. 添加其他业务逻辑,处理消费到的数据,例如:增加 amqp publish,将消费到的消息发布到 rabbitMQ。


4. 若配置正确,流发布后,可以看到 kafka 中的流被正确消费,且被发布到 rabbitMQ 中。

Publish 操作

输入参数

通用配置
参数
数据类型
说明
是否必填
默认值
要写入的 Kafka 主题名称
string
写入的 kafka 主题

要写入的 Kafka 消息内容
string或byte
待发布的 Kafka 消息内容




高级配置
参数
数据类型
说明
是否必填
默认值
目标 partition
string
指定写入的 partition

key
string
key 相同的 message 会写入到同一 partition

header
string
header的 JSON 字符串





输出

Publish 操作执行成功后,输出属性会保存在 message 消息体的 attribute;执行失败后,错误信息会保存在 message 消息体的 error。
组件输出的 message 信息如下:
message 属性
payload
继承上个组件的 payload 信息
error
执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息
attribute
执行成功后,attribute 为 dict,属性列表见下
variable
继承上个组件的 variable 信息
atrrbites 属性列表:
atrrbites 属性
数据类型
描述
topic
string
消息所属的 topic
partition
int32
消息的 partition
offset
int64
消息的 offset
例如:执行成功后,message attribute 值如下:
{
"topic": "test-topic",
"partition": 1,
"offset": 4
}
执行失败后,message error 值如下:
{
"Code": "CORE:RUNTIME",
"Description": "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
}

案例

1. 按照 Kafka 集群信息,填写 kafka 连接器配置。


2. 新建流,添加 trigger,例如:HTTP listener。


3. 在流中增加 kafka publish 操作,填写消息内容,将消息发布到 Kafka 集群中。


4. 发布流,触发流后,若连接参数及消息参数配置正确,消息将成功投递到所配置的 Kafka 集群的对应队列中。