前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka系统之连接器(七)

Kafka系统之连接器(七)

作者头像
无涯WuYa
发布2022-01-18 16:32:10
3910
发布2022-01-18 16:32:10
举报
文章被收录于专栏:Python自动化测试

Kafka除了生产者和消费者的核心组件外,它的另外一个核心组件就是连接器,简单的可以把连接器理解为是Kafka系统与其他系统之间实现数据传输的通道。通过Kafka的连接器,可以把大量的数据移入到Kafka的系统,也可以把数据从Kafka的系统移出。具体如下显示:

依据如上,这样Kafka的连接器就完成了输入和输出的数据传输的管道。也就很好的理解了我们从第三方获取到海量的实时流的数据,通过生产者和消费者的模式写入到Kafka的系统,再经过连接器把数据最终存储到目标的可存储的数据库,比如Hbase等。基于如上,Kafka的连接器使用场景具体可以总结为:

1、Kafka作为一个连接的管道,把目标的数据写入到Kafka的系统,再通过Kafka的连接器把数据移出到目标的数据库

2、Kafka作为数据传输的中间介质,仅仅是数据传输的一个管道或者说是数据传输的中间介质而已。如日志文件的信息传输到Kafka的系统后,然后再从Kafka的系统把这些数据移出到ElasticSearch中进行存储并展示。通过Kafka的连接器,可以有效的把Kafka系统的生产者模式和消费者模式进行的整合,完成它的解耦。

启动Kafka系统的连接器可以通过两种方式来进行启动,一种方式是单机模式,另外一种的方式是分布式模式,这里主要是以单机模式来启动Kafka的连接器。在kafka/config的目录下配置连接器的信息,它的配置文件名称为:connect-file-source.properties,配置的内容为:

代码语言:javascript
复制
#设置连接器名称
name=local-file-source
#指定连接器类
connector.class=FileStreamSource
#设置最大任务数
tasks.max=1
#指定读取的文件
file=/tmp/source.txt
#指定主题名
topic=login

我们在该配置文件中制定了读取的文件/tmp/source.txt,下面我们在这个目录下在这个文件里面添加内容,具体内容如下:

下面通过单机的模式来启动连接器的程序,启动命令为:

代码语言:javascript
复制
./connect-standalone.sh  ../config/connect-standalone.properties  ../config/connect-file-source.properties

启动成功后输出的信息,如下所示:

代码语言:javascript
复制
#!/usr/bin/env python
#!coding:utf-8
[2021-06-07 20:43:12,715] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
[2021-06-07 20:43:12,727] INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 12 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:574)

Kafka系统的连接器进程是以后台服务的形式在执行,它的默认端口是8083,我们可以通过REST API的方式来获取到相关的信息,比如获取到活跃连接器的实例列表,它的接口信息为:GET /connectors,具体如下:

下来再来演示下指定连接器的实例信息,返回它的详细信息,具体PostMan输出的信息如下:

下来我们再来看获取连接器的配置信息,也就是config的信息,具体PostMan的请求截图如下:

连接器提供了很多的REST API的接口,这里就不一一的演示。

下面来实现把数据导入到Kafka的login主题中,被导入的数据为

执行命令:

代码语言:javascript
复制
./connect-standalone.sh  ../config/connect-standalone.properties  ../config/connect-file-source.properties

执行成功后,在Kafka的消费者程序里面就可以看到数据被导入到login的主题中,如下所示:

下面实现在单机的模式下,把Kafka主题中的数据导出到本地的具体文件中,在config的配置文件connect-file-sink.properties中指定被导出的数据写入到本地的具体文件中,具体文件内容如下:

代码语言:javascript
复制
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/target.txttopics=login

它的执行顺序具体为:

执行命令先把目标数据消费到Kafka的主题中,命令为:

代码语言:javascript
复制
./connect-standalone.sh  ../config/connect-standalone.properties ../config/connect-file-source.properties

然后在/tmp下创建文件target.txt。下来执行如下命令把Kafka的数据导出到本地,命令为:

代码语言:javascript
复制
./connect-standalone.sh  ../config/connect-standalone.properties ../config/connect-file-sink.properties

控制台打印的log信息为:

代码语言:javascript
复制
[2021-06-08 15:37:11,766] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets asynchronously using sequence number 1: {data-1=OffsetAndMetadata{offset=5, leaderEpoch=null, metadata=''}, data-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, data-3=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}, data-2=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, data-5=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, data-4=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:346)

最后在/tmp下查看target.txt文件的内容,发现和最初被消费的目标文件内容是一致的。

根据如上,通过连接器把目标数据消费到Kafka系统的主题中,最后再通过连接器导出到本地的目标存储数据的地方(可能是数据库,也可能是文本)。这样就实现了最初说的连接数据管道的目的之一。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-06-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档