前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >教程|运输IoT中的Kafka

教程|运输IoT中的Kafka

作者头像
大数据杂货铺
发布2021-04-30 10:50:55
1.5K0
发布2021-04-30 10:50:55
举报
文章被收录于专栏:大数据杂货铺大数据杂货铺

介绍

本教程介绍了Apache Kafka的核心概念及其在可靠性、可伸缩性、持久性和性能至关重要的环境中所扮演的角色。

我们将创建Kafka主题(类别队列),来处理数据管道中的大量数据,充当物联网(IoT)数据和Storm拓扑之间的连接。

Kafka消息系统

目标

要了解分布式系统中的消息系统背后的概念消,以及如何使用它们来转移生产者(发布者,发送者)和消费者(订阅者,接收者)之间的消息。在此示例中,您将了解Kafka。

什么是消息系统?

消息系统在客户端应用程序之间传输数据。一个应用程序生成数据,例如从嵌入在车辆上的传感器读取数据,另一个应用程序接收数据,对其进行处理以使其可视化以显示有关驾驶这些车辆的驾驶员驾驶行为的特征。如您所见,每个应用程序开发人员都可以专注于编写代码来分析数据,而不必担心如何共享数据。在这种情况下使用两种消息传递系统,即点对点和发布订阅。最常用的系统是发布订阅,但我们将同时介绍两者。

点对点系统

点对点是将消息传输到队列中

以上通用图的主要特征:

  • 生产者将消息发送到队列中,每个消息仅由一个消费者读取
  • 一旦消息被使用,该消息就会消失
  • 多个使用者可以从队列中读取消息

发布-订阅系统

发布-订阅是传送到主题中的消息

  • 消息生产者被称为发布者
  • 消息使用者称为订阅者

如何将发布-订阅消息系统的工作?

  • 发布者将消息发送到1个或多个主题中
  • 订阅者可以安排接收1个或多个主题,然后使用所有消息

什么是Kafka

Apache Kafka是一个基于发布-订阅的开源消息传递系统,负责将数据从一个应用程序传输到另一个应用程序。

架构概述

总体而言,我们的数据管道如下所示:

NiFi生产者

生产者实现为Kafka Producer的NiFi处理器,从卡车传感器和交通信息生成连续的实时数据提要,这些信息分别发布到两个Kafka主题中。

要了解有关Kafka Producer API示例代码的更多信息,请访问开发Kafka Producers

Kafka集群

具有1个或多个主题,用于支持由Kafka代理管理的1个或多个类别的消息,这些消息可创建每个主题的副本(类别队列)以实现持久性。

Storm消费者

从Kafka Cluster读取消息,并将其发送到Apache Storm拓扑中进行处理。

要了解有关Kafka消费者API示例代码的更多信息,请访问开发Kafka消费者

Kafka的好处

可靠性

  • 分布式,分区,复制和容错

可扩展性

  • 消息系统可轻松扩展,而无需停机

持久性

  • “分布式提交日志”,即使在创建该数据的进程结束后,消息仍可以继续存在于磁盘上

性能

  • 高吞吐量,用于发布和订阅消息
  • 保持许多TB的稳定性能

在Demo中探索Kafka

环境设定

如果您安装了最新的Cloudera DataFlow(CDF)沙盒,则该演示已预先安装。

打开本地计算机上的终端,然后通过开箱即用”的方法访问沙箱。

在对数据执行Kafka操作之前,我们必须首先在Kafka中包含数据,因此让我们运行NiFi DataFlow应用程序。请参阅本模块中的步骤:在Trucking IoT Demo中运行NiFi,然后您就可以开始探索Kafka。

如果尚未通过Ambari打开Kafka组件,则将其打开。

将数据持久化到Kafka主题中

NiFi模拟器会生成两种类型的数据:TruckData和TrafficData作为CSV字符串。数据上会进行一些预处理,以准备将其拆分并由NiFi的Kafka生产者发送给两个单独的Kafka主题:trucking_data_truck和trucking_data_traffic。

列出Kafka主题

在终端上,我们可以看到已创建的两个Kafka主题:

代码语言:javascript
复制
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper localhost:2181

结果:

代码语言:javascript
复制
Output:
trucking_data_driverstats
trucking_data_joined
trucking_data_traffic
trucking_data_truck_enriched

在Kafka主题中查看数据

由于生产者将消息保留在Kafka主题中,因此您可以通过编写以下命令在每个主题中看到它们:

查看Kafka的数据主题:trucking_data_truck_enriched:

代码语言:javascript
复制
/usr/hdf/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server sandbox-hdf.hortonworks.com:6667 --topic trucking_data_truck_enriched --from-beginning

查看Kafka的数据主题:trucking_data_traffic:

代码语言:javascript
复制
/usr/hdf/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server sandbox-hdf.hortonworks.com:6667 --topic trucking_data_traffic --from-beginning

如您所见,Kafka充当了一个健壮的队列,可以接收数据并将其传输到其他系统。

代码语言:javascript
复制
Note: You may notice the is data encoded in a format we cannot read, this format is necessary for Schema Registry. The reason we are using Schema Registry is because we need it for Stream Analytics Manager to pull data from Kafka.

了解Kafka的基本操作

Kafka组件

现在我们已经了解了Kafka的功能,下面让我们探讨其不同的组件,定义Kafka流程时的构建基块以及使用它们的原因。

  • 生产者:发布一个或多个主题的消息的发布者。将数据发送给Kafka代理。
  • 主题:属于类别的消息流,分为多个分区。一个主题必须至少具有一个分区。
  • 分区:消息具有不可变的序列,并实现为大小相等的段文件。他们还可以处理任意数量的数据。
  • 分区偏移量:分区消息中的唯一序列ID。
  • 分区副本:分区的“备份”。它们从不读取或写入数据,并且可以防止数据丢失。
  • Kafka Brokers:责任是维护发布的数据。
  • Lead Broker:负责在给定分区上执行的所有读取或写入的节点。
  • 追随者代理:遵循领导者指示的节点。如果领导者失败,它将代替领导者。还像接收方一样拉入消息并更新其数据存储。
  • Kafka群集:如果存在多个代理,则Kafka被视为Kafka群集。拥有多个代理的主要原因是要管理消息数据的持久性和复制,并在没有繁华的情况下进行扩展。
  • 消费者组:来自相同组ID的消费者。
  • 消费者:通过提取数据从经纪人读取数据。他们订阅1个或更多主题。

创建两个Kafka主题

最初在构建此演示时,我们验证了Zookeeper是否正在运行,因为Kafka使用Zookeeper。如果Zookeeper已关闭,则我们从Cloudera Manager运行或打开该命令:

代码语言:javascript
复制
/usr/hdf/current/kafka-broker/bin/zookeeper-server-start.sh config/zookeeper.properties

然后,我们通过Ambari或命令启动Kafka Broker:

代码语言:javascript
复制
/usr/hdf/current/kafka-broker/bin/kafka-server-start.sh config/server.properties

如果要查看正在运行的守护程序,请键入 jps

代码语言:javascript
复制
Example of Output:


2306 drpc
932 AmbariServer
2469 core
2726 logviewer
3848 NiFiRegistry
5201 StreamlineApplication
3602 NiFi
3026 TlsToolkitMain
18194 Jps
1684 Kafka
3829 RunNiFiRegistry
2649 Supervisor
1530 RegistryApplication
4762 LogSearch
4987 LogFeeder
3581 RunNiFi
4383 jar
1375 QuorumPeerMain

我们使用以下命令创建了两个Kafka主题:trucking_data_truck_enriched和trucking_data_traffic:

代码语言:javascript
复制
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper sandbox-hdf.hortonworks.com:2181 --replication-factor 1 --partitions 10 --topic trucking_data_truck_enriched
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper sandbox-hdf.hortonworks.com:2181 --replication-factor 1 --partitions 10 --topic trucking_data_traffic

创建了两个Kafka主题,每个主题有10个分区,每个分区有一个分区。创建主题后,Kafka代理终端会发送一条通知,该通知可以在创建主题的日志中找到:“ /tmp/kafka-logs/”

启动生产者发送消息

在我们的演示中,我们利用称为Apache NiFi的数据流框架生成传感器卡车数据和在线交通数据,对其进行处理并集成Kafka的Producer API,因此NiFi可以将其流文件的内容转换为可以发送给Kafka的消息。

启动NiFi流程中的所有处理器(包括Kafka处理器),数据将保留在两个Kafka主题中。

启动消费者以接收消息

在我们的演示中,我们利用称为Apache Storm的流处理框架来消耗来自Kafka的消息。Storm集成了Kafka的Consumer API,以从Kafka代理获取消息,然后执行复杂的处理并将数据发送到目的地以进行存储或可视化。

提交Storm拓扑,来自Kafka主题的消息将被拉入Storm。

总结

恭喜你!现在,您将了解Kafka在演示应用程序中扮演的角色,如何创建Kafka主题以及如何使用Kafka的Producer API和Kafka的Consumer API在主题之间传输数据。在我们的演示中,我们向您展示了NiFi将Kafka的Producer API包装到其框架中,Storm对Kafka的Consumer API进行了同样的处理。

进一步阅读

  • 要了解有关Apache Kafka的更多信息,请访问Kafka文档
  • 要了解有关NiFi Kafka集成的更多信息,请访问集成Apache NiFi和Apache Kafka。
  • 要了解有关Storm Kafka集成的更多信息,请访问Storm Kafka Integration

附录:Kafka额外操作

修改Kafka主题

如果您需要修改Kafka主题,请运行以下命令:

/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-name --partitions X

您的主题名称帐户将有所不同,并且您要添加的分区数量也将有所不同。

  • X代表您要更改主题的分区数

如果需要删除Kafka主题,请运行以下命令:

/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

原文链接:https://www.cloudera.com/tutorials/kafka-in-trucking-iot.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据杂货铺 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • Kafka消息系统
  • 什么是消息系统?
  • 点对点系统
  • 发布-订阅系统
  • 什么是Kafka
  • 架构概述
  • Kafka的好处
    • 在Demo中探索Kafka
      • 将数据持久化到Kafka主题中
        • 列出Kafka主题
          • 在Kafka主题中查看数据
            • 了解Kafka的基本操作
              • Kafka组件
                • 创建两个Kafka主题
                  • 启动生产者发送消息
                    • 启动消费者以接收消息
                      • 总结
                        • 进一步阅读
                          • 附录:Kafka额外操作
                          相关产品与服务
                          数据保险箱
                          数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档