专栏首页SmartSiKafka Connect 如何构建实时数据管道

Kafka Connect 如何构建实时数据管道

Kafka Connect 旨在通过将数据移入和移出 Kafka 进行标准化,以更轻松地构建大规模的实时数据管道。我们可以使用 Kafka Connector 读取或写入外部系统、管理数据流以及扩展系统,所有这些都无需开发新代码。Kafka Connect 管理与其他系统连接时的所有常见问题(Schema 管理、容错、并行性、延迟、投递语义等),每个 Connector 只关注如何在目标系统和 Kafka 之间复制数据。

如果有对 Kafka Connect 不了解的,可以参考Kafka Connect 构建大规模低延迟的数据管道

1. 执行模式

Kafka Connect 是与 Apache Kafka 一起发布的,所以没有必要单独安装,对于生产使用,特别是计划使用 Connect 移动大量数据或运行多个 Connector 时,应该在单独的服务器上运行 Connect。在这种情况下,所有的机器上安装 Apache Kafka,并在部分服务器上启动 broker,然后在其他服务器上启动 Connect。Kafka Connect 目前支持两种执行模式:Standalone 模式和分布式模式。

1.1 Standalone 模式

在 Standalone 模式下,所有的工作都在单个进程中完成。这种模式更容易配置以及入门,但不能充分利用 Kafka Connect 的某些重要功能,例如,容错。我们可以使用如下命令启动 Standalone 进程:

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

第一个参数 config/connect-standalone.properties 是 worker 的配置。这其中包括 Kafka 连接参数、序列化格式以及提交 Offset 的频率等配置:

bootstrap.servers=localhost:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

上述提供的默认配置适用于使用 config/server.properties 提供的默认配置运行的本地集群。如果使用不同配置或者在生产部署,那就需要对默认配置做调整。但无论怎样,所有 Worker(独立的和分布式的)都需要一些配置:

  • bootstrap.servers:该参数列出了将要与 Connect 协同工作的 broker 服务器,Connector 将会向这些 broker 写入数据或者从它们那里读取数据。你不需要指定集群的所有 broker,但是建议至少指定3个。
  • key.converter 和 value.converter:分别指定了消息键和消息值所使用的的转换器,用于在 Kafka Connect 格式和写入 Kafka 的序列化格式之间进行转换。这控制了写入 Kafka 或从 Kafka 读取的消息中键和值的格式。由于这与 Connector 没有任何关系,因此任何 Connector 可以与任何序列化格式一起使用。默认使用 Kafka 提供的 JSONConverter。有些转换器还包含了特定的配置参数。例如,通过将 key.converter.schemas.enable 设置成 true 或者 false 来指定 JSON 消息是否包含 schema。
  • offset.storage.file.filename:用于存储 Offset 数据的文件。

这些配置参数可以让 Kafka Connect 的生产者和消费者访问配置、Offset 和状态 Topic。配置 Kafka Source 任务使用的生产者和 Kafka Sink 任务使用的消费者,可以使用相同的参数,但需要分别加上 ‘producer.’ 和 ‘consumer.’ 前缀。bootstrap.servers 是唯一不需要添加前缀的 Kafka 客户端参数。

1.2 分布式模式

分布式模式可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错。分布式模式的执行与 Standalone 模式非常相似:

bin/connect-distributed.sh config/connect-distributed.properties

不同之处在于启动的脚本以及配置参数。在分布式模式下,使用 connect-distributed.sh 来代替 connect-standalone.sh。第一个 worker 配置参数使用的是 config/connect-distributed.properties 配置文件:

bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
offset.flush.interval.ms=10000

Kafka Connect 将 Offset、配置以及任务状态存储在 Kafka Topic 中。建议手动创建 Offset、配置和状态的 Topic,以达到所需的分区数和复制因子。如果在启动 Kafka Connect 时尚未创建 Topic,将使用默认分区数和复制因子来自动创建 Topic,这可能不适合我们的应用。在启动集群之前配置如下参数至关重要:

  • group.id:Connect 集群的唯一名称,默认为 connect-cluster。具有相同 group id 的 worker 属于同一个 Connect 集群。需要注意的是这不能与消费者组 ID 冲突。
  • config.storage.topic:用于存储 Connector 和任务配置的 Topic,默认为 connect-configs。需要注意的是这是一个只有一个分区、高度复制、压缩的 Topic。我们可能需要手动创建 Topic 以确保配置的正确,因为自动创建的 Topic 可能有多个分区或自动配置为删除而不是压缩。
  • offset.storage.topic:用于存储 Offset 的 Topic,默认为 connect-offsets。这个 Topic 可以有多个分区。
  • status.storage.topic:用于存储状态的 Topic,默认为 connect-status。这个 Topic 可以有多个分区。

2. 配置 Connector

Connector 配置是简单的键值对。对于 Standalone 模式,配置参数在配置文件中定义并通过命令行传递给 Connect 进程。但在分布式模式下,需要使用 REST API 来提交 Connector 配置,来请求创建或者修改 Connector。如下是文件 Source Connector 的示例:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt

大多数配置都依赖于具体的 Connector,因此无法在此处进行概述。但是,有一些常见的配置参数:

  • name:Connector 的唯一名称。使用相同名称注册会失败。
  • connector.class:Connector 对应的的 Java 类。
  • tasks.max:应为此 Connector 创建的最大任务数。如果 Connector 无法达到这种并行度级别,可能会创建比配置要少的任务。

connector.class 配置支持多种格式:Connector 类的全名或别名。对于文件 Source Connector,我们可以全名 org.apache.kafka.connect.file.FileStreamSinkConnector,也可以使用简写的 FileStreamSink 或 FileStreamSinkConnector。Sink Connector 还需要一些其他的参数来控制输入。每个 Sink Connector 都必须设置如下参数:

  • topic:Connector 的输入 Topic,以逗号分隔的列表
  • topic.regex:Connector 输入 Topic 的 Java 正则表达式

3. 运行 Connect

启动 Connect 进程与启动 broker 进程差不多,在调用脚本时传入一个配置文件即可,如下使用分布式执行模式来启动 Connect:

bin/connect-distributed.sh config/connect-distributed.properties &

我们一般通过 Connect 的 REST API 来配置和监控 rest.host.name 和 rest.port。你可以为 REST API 指定特定的端口:

rest.port=9083

默认端口号为 8083,在这里我们为了防止端口号冲突,特意修改为 9083。

启动 Worker 集群之后,可以通过 REST API 来验证它们是否正常运行:

localhost:script wy$ curl http://localhost:9083/
{"version":"2.4.0","commit":"77a89fcf8d7fa018","kafka_cluster_id":"jNjfTPnOTHOYxyaafsGU6A"}

这个 REST API 会返回当前 Connect 的版本号。我们运行的是 Kafka 2.4.0 版本。我们还可以检查已经安装好的 Connector 插件:

localhost:script wy$ curl http://localhost:9083/connector-plugins
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.4.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.4.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

4. Connector 示例

在这里,我们使用 Kafka 自带的文件连接器(FileStreamSource、FileStreamSink)来演示如何将一个文件发送到 Kafka Topic 上,再从 Kafka Topic 导出到文件中。

如下所示创建一个文件 Source Connector。在这里,直接让它读取我们创建的 a.txt 文件,即把 a.txt 文件发送到 Topic 上:

echo '{"name":"file-source-connector", "config":{"connector.class":"FileStreamSource","file":"a.txt","topic":"file-connector-topic"}}' | curl -X POST -d @- http://localhost:9083/connectors --header "content-Type:application/json"

上述命令使用 Kafka Connect REST API ‘POST /connectors’ 创建一个新的 Connector,请求是一个 JSON 对象,其中包含一个字符串名称字段 name 以及一个带有 Connector 配置参数的对象配置字段 config。我们通过 echo 命令把 JSON 内容发送给 REST API。从 JSON 中我们可以知道:

  • Connector 名称: file-source-connector
  • Connector 类: FileStreamSource
  • 要加载的文件: a.txt
  • 把文件加载到 Kaffa 的 topic:file-connector-topic

a.txt 文件内容如下:

1
2
3
4
5
6
7
8

创建完 Connector 之后,我们通过如下命令查看目前已经创建的 Connector:

curl -X GET http://localhost:9083/connectors

可以通过该命令删除对应的 Connector:curl -X DELETE http://localhost:9083/connectors/

下面通过 Kafka 的控制台消费者来验证指定的文件是否已经加载到 Topic 中:

bin/kafka-console-consumer.sh --topic file-connector-topic --from-beginning --bootstrap-server localhost:9092

如果一切正常,可以看到如下输出:

以上输出的是 a.txt 文件的内容,这些内容被一行一行的转成 JSON 记录,并被 Connector 发送到 file-connector-topic Topic 上。默认情况下,JSON 转换器会在每一条记录上附带上 schema 信息。payload 列对应了文件里的一行内容。

文件已经发送到 Kafka Topic 上了,现在使用文件 Sink Connector 再把 Topic 里的内容导出到 a-backup.txt 文件中。导出的文件应该与原始文件 a.txt 的内容完全一样,JSON转换器会把每个 JSON 记录转成单行文本:

echo '{"name":"file-sink-connector", "config":{"connector.class":"FileStreamSink","file":"a-backup.txt","topics":"file-connector-topic"}}' | curl -X POST -d @- http://localhost:9083/connectors --header "content-Type:application/json"

上述命令还是使用 Kafka Connect REST API ‘POST /connectors’ 创建一个新的 Connector,请求同样是一个 JSON 对象,其中有几个配置参数发生了变化,connector.class 使用 FileStreamSink,而不是 FileStreamSource;file 参数指向目标文件,而不是原始文件;我们使用 topics,而不是 topic 来指定读取的 Topic。如果一切正常,我们会得到一个名为 a-backup.txt 的文件。

参考:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 07 Confluent_Kafka权威指南 第七章: 构建数据管道

    当人们讨论使用apache kafka构建数据管道时,他们通常会应用如下几个示例,第一个就是构建一个数据管道,Apache Kafka是其中的终点。丽日,从ka...

    冬天里的懒猫
  • Kafka Connect JDBC Source MySQL 全量同步

    从数据库获取数据到 Apache Kafka 无疑是 Kafka Connect 最流行的用例。Kafka Connect 提供了将数据导入和导出 Kafka ...

    smartsi
  • Kafka核心API——Connect API

    Kafka Connect是一个用于将数据流输入和输出Kafka的框架。Confluent平台附带了几个内置connector,可以使用这些connector进...

    端碗吹水
  • 替代Flume——Kafka Connect简介

    我们知道过去对于Kafka的定义是分布式,分区化的,带备份机制的日志提交服务。也就是一个分布式的消息队列,这也是他最常见的用法。但是Kafka不止于此,打开最新...

    用户6070864
  • 替代Flume——Kafka Connect简介

    我们看到Kafka最新的定义是:Apache Kafka® is a distributed streaming platform

    实时计算
  • Kafka能做什么?十分钟构建你的实时数据流管道

    本文将对Kafka做一个入门简介,并展示如何使用Kafka构建一个文本数据流管道。通过本文,读者可以了解一个流处理数据管道(Pipeline)的大致结构:数据生...

    PP鲁
  • Kafka Connect | 无缝结合Kafka构建高效ETL方案

    Kafka connect是Confluent公司(当时开发出Apache Kafka的核心团队成员出来创立的新公司)开发的confluent platform...

    大数据真好玩
  • Kafka Connect | 无缝结合Kafka构建高效ETL方案

    Kafka connect是Confluent公司(当时开发出Apache Kafka的核心团队成员出来创立的新公司)开发的confluent platform...

    Spark学习技巧
  • Kafka Connect | 无缝结合Kafka构建高效ETL方案

    很多同学可能没有接触过 Kafka Connect,大家要注意不是Connector。 Kafka Connect 是一款可扩展并且可靠地在 Apache Ka...

    王知无-import_bigdata
  • 如何使用 Flupy 构建数据处理管道

    经常使用 Linux 的同学,肯定对|这个符号不陌生,这个符号是 Linux 的管道符号,可以把左边的数据传递给右边。

    青南
  • Kafka 安装及快速入门

    介绍 官网:http://kafka.apache.org/ Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Ap...

    zhisheng
  • 「首席看架构」CDC (捕获数据变化) Debezium 介绍

    Debezium是一个分布式平台,它将您现有的数据库转换为事件流,因此应用程序可以看到数据库中的每一个行级更改并立即做出响应。Debezium构建在Apache...

    首席架构师智库
  • Kafka 连接器使用与开发

    连接器作为 Kafka 的一部分,是随着 Kafka 系统一起发布的,无须独立安装。

    Se7en258
  • Mysql实时数据变更事件捕获kafka confluent之debezium

    如果你的后端应用数据存储使用的MySQL,项目中如果有这样的业务场景你会怎么做呢?

    XING辋
  • Kafka Connect JDBC Source MySQL 增量同步

    上一篇文章 Kafka Connect JDBC Source MySQL 全量同步 中,我们只是将整个表数据导入 Kafka。这对于获取数据快照很有用,但并不...

    smartsi
  • Aache Kafka 入门教程

      在 Kafka 中,客户端和服务器之间的通信是通过简单,高性能,语言无关的TCP协议完成的。此协议已版本化并保持与旧版本的向后兼容性。Kafka 提供 Ja...

    iMike
  • kafka连接器两种部署模式详解

    一 kafka Connector介绍 Kafka Connect是一个用于在Apache Kafka和其他系统之间进行可扩展和可靠数据流传输的工具。这使得快速...

    Spark学习技巧
  • 使用Kafka和ksqlDB构建和部署实时流处理ETL引擎

    在Koverhoop,我们正在保险,医疗保健,房地产和离线分析领域建立一系列大型项目。对于我们的多租户团体保险经纪平台klient.ca,我们将建立强大的搜索功...

    IT大咖说
  • Kafka生态

    Confluent提供了业界唯一的企业级事件流平台,Confluent Platform通过将来自多个源和位置的数据集成到公司的单个中央事件流平台中,可以轻松构...

    用户6969969

扫码关注云+社区

领取腾讯云代金券