专栏首页爱编码学习kafka教程(二)

学习kafka教程(二)

本文主要介绍【KafkaStreams】

简介

Kafka Streams编写关键任务实时应用程序和微服务的最简单方法,是一个用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在Kafka集群中。它结合了在客户端编写和部署标准Java和Scala应用程序的简单性和Kafka服务器端集群技术的优点。

Kafka Streams是一个用于构建关键任务实时应用程序和微服务的客户端库,其中输入和/或输出数据存储在Kafka集群中。Kafka Streams结合了在客户端编写和部署标准Java和Scala应用程序的简单性和Kafka服务器端集群技术的优点,使这些应用程序具有高度可伸缩性、灵活性、容错性、分布式等等。

目标

  • 了解kafka Streams
  • 会使用kafka Streams

过程

1.首先WordCountDemo示例代码(Java8以上)

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde);

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count()

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

它实现了WordCount算法,该算法从输入文本计算单词出现的直方图。然而,与您以前可能看到的对有界数据进行操作的其他WordCount示例不同,WordCount演示应用程序的行为略有不同,因为它被设计为对无限、无界的数据流进行操作。与有界变量类似,它是一种有状态算法,用于跟踪和更新单词的计数。然而,由于它必须假定输入数据可能是无界的,因此它将周期性地输出当前状态和结果,同时继续处理更多的数据,因为它不知道何时处理了“所有”输入数据。

2.安装并启动zookeeper和kafka

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

3.创建主题 接下来,我们创建名为streams-plain -input的输入主题和名为streams-wordcount-output的输出主题:

bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input"

我们创建启用压缩的输出主题,因为输出流是一个变更日志流.

bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output"

创建的主题也可以使用相同的kafka主题进行描述

bin/kafka-topics.sh --zookeeper localhost:2181 --describe

4.启动Wordcount应用程序

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

a)演示应用程序将从输入主题流(明文输入)中读取,对每个读取的消息执行WordCount算法的计算,并不断将其当前结果写入输出主题流(WordCount -output)。因此,除了日志条目之外,不会有任何STDOUT输出,因为结果是用Kafka写回去的。

b)现在我们可以在一个单独的终端上启动控制台生成器,向这个主题写入一些输入数据和检查输出的WordCount演示应用程序从其输出主题与控制台消费者在一个单独的终端.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

c)输入端:现在让我们使用控制台生成器将一些消息写入输入主题流——纯文本输入,方法是输入一行文本,然后单击。这将发送新消息输入主题,消息键为空和消息值是刚才输入的字符串编码的文本行。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

此时你可以在控制台输入如下字符:

all streams lead to kafka

d))输出端:此消息将由Wordcount应用程序处理,以下输出数据将写入streams-wordcount-output主题并由控制台使用者打印:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

这个时候会接收到刚刚在控制台输入的单词统计结果:

all     1
streams 1
lead    1
to      1
kafka   1

如此类推:你可以在输入端输入单词,对应的在输出端就会有统计结果。

小结: 可以看到,Wordcount应用程序的输出实际上是连续的更新流,其中每个输出记录(即上面原始输出中的每一行)是单个单词的更新计数,也就是记录键,如“kafka”。对于具有相同键的多个记录,后面的每个记录都是前一个记录的更新。

下面的两个图说明了幕后的本质。第一列显示KTable的当前状态的演变,该状态为count计算单词出现的次数。第二列显示KTable的状态更新所产生的更改记录,这些记录被发送到输出Kafka主题流-wordcount-output。

本文分享自微信公众号 - 爱编码(ilovecode),作者:明大侦探

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-01-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 学习kafka教程(三)

    Kafka流通过构建Kafka生产者和消费者库,并利用Kafka的本地功能来提供数据并行性、分布式协调、容错和操作简单性,从而简化了应用程序开发。 下图展示了一...

    用户3467126
  • kafka教程(一)

    kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点,并已在成千上万家公司运行。

    用户3467126
  • 【Netty】Netty初识篇

    Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

    用户3467126
  • linux学习第六十四篇:Shell脚本中的逻辑判断,文件目录属性判断, if特殊用法,case判断

    Shell脚本中的逻辑判断 格式1:if 条件 ; then 语句; fi 格式2:if 条件; then 语句; else 语句; fi 格式3:if …; ...

    用户1215343
  • 现代前端技术解析:前端与协议

    完整的HTTP报文由头部、空行、正文三部分组成。目前最广泛使用的是HTTP1.1。

    奋飛
  • elasticsearch的一些小知识点

    A little nonsense now and then is relished by the wisest people.

    小闫同学啊
  • Ubuntu下安装Yarm-PM2

    首先打开yarm的官网。https://www.yarnpkg.com/zh-Hant/

    用户2417870
  • kafka 配置kerberos校验以及开启acl实践

    转载请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7131626.html

    sanmutongzi
  • 菲尔兹奖得主维拉尼:七个点子帮你找到科研灵感

    大数据文摘
  • 001golang中的字符串编码问题无标题文章

    HTMLEscape 函数将json编码的src中的<、>、&、U+2028 和U+2029字符替换为\u003c、\u003e、\u0026、\u2028、\...

    上善若水.夏

扫码关注云+社区

领取腾讯云代金券