Kafka Connect 旨在通过将数据移入和移出 Kafka 进行标准化,以更轻松地构建大规模的实时数据管道。我们可以使用 Kafka Connector 读取或写入外部系统、管理数据流以及扩展系统,所有这些都无需开发新代码。Kafka Connect 管理与其他系统连接时的所有常见问题(Schema 管理、容错、并行性、延迟、投递语义等),每个 Connector 只关注如何在目标系统和 Kafka 之间复制数据。
如果有对 Kafka Connect 不了解的,可以参考Kafka Connect 构建大规模低延迟的数据管道
Kafka Connect 是与 Apache Kafka 一起发布的,所以没有必要单独安装,对于生产使用,特别是计划使用 Connect 移动大量数据或运行多个 Connector 时,应该在单独的服务器上运行 Connect。在这种情况下,所有的机器上安装 Apache Kafka,并在部分服务器上启动 broker,然后在其他服务器上启动 Connect。Kafka Connect 目前支持两种执行模式:Standalone 模式和分布式模式。
在 Standalone 模式下,所有的工作都在单个进程中完成。这种模式更容易配置以及入门,但不能充分利用 Kafka Connect 的某些重要功能,例如,容错。我们可以使用如下命令启动 Standalone 进程:
bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
第一个参数 config/connect-standalone.properties 是 worker 的配置。这其中包括 Kafka 连接参数、序列化格式以及提交 Offset 的频率等配置:
bootstrap.servers=localhost:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
上述提供的默认配置适用于使用 config/server.properties 提供的默认配置运行的本地集群。如果使用不同配置或者在生产部署,那就需要对默认配置做调整。但无论怎样,所有 Worker(独立的和分布式的)都需要一些配置:
这些配置参数可以让 Kafka Connect 的生产者和消费者访问配置、Offset 和状态 Topic。配置 Kafka Source 任务使用的生产者和 Kafka Sink 任务使用的消费者,可以使用相同的参数,但需要分别加上 ‘producer.’ 和 ‘consumer.’ 前缀。bootstrap.servers 是唯一不需要添加前缀的 Kafka 客户端参数。
分布式模式可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错。分布式模式的执行与 Standalone 模式非常相似:
bin/connect-distributed.sh config/connect-distributed.properties
不同之处在于启动的脚本以及配置参数。在分布式模式下,使用 connect-distributed.sh 来代替 connect-standalone.sh。第一个 worker 配置参数使用的是 config/connect-distributed.properties 配置文件:
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
offset.flush.interval.ms=10000
Kafka Connect 将 Offset、配置以及任务状态存储在 Kafka Topic 中。建议手动创建 Offset、配置和状态的 Topic,以达到所需的分区数和复制因子。如果在启动 Kafka Connect 时尚未创建 Topic,将使用默认分区数和复制因子来自动创建 Topic,这可能不适合我们的应用。在启动集群之前配置如下参数至关重要:
Connector 配置是简单的键值对。对于 Standalone 模式,配置参数在配置文件中定义并通过命令行传递给 Connect 进程。但在分布式模式下,需要使用 REST API 来提交 Connector 配置,来请求创建或者修改 Connector。如下是文件 Source Connector 的示例:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
大多数配置都依赖于具体的 Connector,因此无法在此处进行概述。但是,有一些常见的配置参数:
connector.class 配置支持多种格式:Connector 类的全名或别名。对于文件 Source Connector,我们可以全名 org.apache.kafka.connect.file.FileStreamSinkConnector,也可以使用简写的 FileStreamSink 或 FileStreamSinkConnector。Sink Connector 还需要一些其他的参数来控制输入。每个 Sink Connector 都必须设置如下参数:
启动 Connect 进程与启动 broker 进程差不多,在调用脚本时传入一个配置文件即可,如下使用分布式执行模式来启动 Connect:
bin/connect-distributed.sh config/connect-distributed.properties &
我们一般通过 Connect 的 REST API 来配置和监控 rest.host.name 和 rest.port。你可以为 REST API 指定特定的端口:
rest.port=9083
默认端口号为 8083,在这里我们为了防止端口号冲突,特意修改为 9083。
启动 Worker 集群之后,可以通过 REST API 来验证它们是否正常运行:
localhost:script wy$ curl http://localhost:9083/
{"version":"2.4.0","commit":"77a89fcf8d7fa018","kafka_cluster_id":"jNjfTPnOTHOYxyaafsGU6A"}
这个 REST API 会返回当前 Connect 的版本号。我们运行的是 Kafka 2.4.0 版本。我们还可以检查已经安装好的 Connector 插件:
localhost:script wy$ curl http://localhost:9083/connector-plugins
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.4.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.4.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
在这里,我们使用 Kafka 自带的文件连接器(FileStreamSource、FileStreamSink)来演示如何将一个文件发送到 Kafka Topic 上,再从 Kafka Topic 导出到文件中。
如下所示创建一个文件 Source Connector。在这里,直接让它读取我们创建的 a.txt 文件,即把 a.txt 文件发送到 Topic 上:
echo '{"name":"file-source-connector", "config":{"connector.class":"FileStreamSource","file":"a.txt","topic":"file-connector-topic"}}' | curl -X POST -d @- http://localhost:9083/connectors --header "content-Type:application/json"
上述命令使用 Kafka Connect REST API ‘POST /connectors’ 创建一个新的 Connector,请求是一个 JSON 对象,其中包含一个字符串名称字段 name 以及一个带有 Connector 配置参数的对象配置字段 config。我们通过 echo 命令把 JSON 内容发送给 REST API。从 JSON 中我们可以知道:
a.txt 文件内容如下:
1
2
3
4
5
6
7
8
创建完 Connector 之后,我们通过如下命令查看目前已经创建的 Connector:
curl -X GET http://localhost:9083/connectors
可以通过该命令删除对应的 Connector:curl -X DELETE http://localhost:9083/connectors/
下面通过 Kafka 的控制台消费者来验证指定的文件是否已经加载到 Topic 中:
bin/kafka-console-consumer.sh --topic file-connector-topic --from-beginning --bootstrap-server localhost:9092
如果一切正常,可以看到如下输出:
以上输出的是 a.txt 文件的内容,这些内容被一行一行的转成 JSON 记录,并被 Connector 发送到 file-connector-topic Topic 上。默认情况下,JSON 转换器会在每一条记录上附带上 schema 信息。payload 列对应了文件里的一行内容。
文件已经发送到 Kafka Topic 上了,现在使用文件 Sink Connector 再把 Topic 里的内容导出到 a-backup.txt 文件中。导出的文件应该与原始文件 a.txt 的内容完全一样,JSON转换器会把每个 JSON 记录转成单行文本:
echo '{"name":"file-sink-connector", "config":{"connector.class":"FileStreamSink","file":"a-backup.txt","topics":"file-connector-topic"}}' | curl -X POST -d @- http://localhost:9083/connectors --header "content-Type:application/json"
上述命令还是使用 Kafka Connect REST API ‘POST /connectors’ 创建一个新的 Connector,请求同样是一个 JSON 对象,其中有几个配置参数发生了变化,connector.class 使用 FileStreamSink,而不是 FileStreamSource;file 参数指向目标文件,而不是原始文件;我们使用 topics,而不是 topic 来指定读取的 Topic。如果一切正常,我们会得到一个名为 a-backup.txt 的文件。
参考: