前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Kafka 消息队列

Apache Kafka 消息队列

作者头像
收心
发布2022-01-19 14:00:01
7080
发布2022-01-19 14:00:01
举报
文章被收录于专栏:Java实战博客

各大厂商选择的消息队列的应用不尽相同,市面上也有很多的产品,为了更好的适应就业,自己必须靠自己去学习,本篇文章讲述的就是,Kafka 消息队列

网络找的 :黑马Kafka笔记代码下载

Kafka 简介:

是一款分布式,基于 发布订阅模式的 消息队列产品,主要应用于大数据实时处理领域。

使用Kafka的好处?

好处就是使用消息队列的好处:削峰填谷、异步解耦

使用kafka的条件

依赖Zookeeper(帮助Kafka 集群存储信息,帮助消费者存储消费的位置信息)

下载Kafka

kafka_2.12-2.7.0下载

安装Kafka

启动 zookeper

进入bin目标,直接

代码语言:javascript
复制
启动
./kafka-server-start.sh -daemon ../config/server.properties
参数说明
    -daemon 的作用是后台启动,不占用当前终端打印台
    ../config/server.properties 是指定配置文件,不指定配置文件不行

停止 Kafka
./kafka-server-stop.sh 

查看是否启动成功

代码语言:javascript
复制
jps

启动成功了!

尚硅谷 在这里 提到了 shell 脚本 https://www.bilibili.com/video/BV1a4411B7V9?p=6&spm_id_from=pageDriver 不会,需要补充学习一下 16分钟之后

这里 补充一下配置文件的说明

  • zookeeper.connect 指明Zookeeper主机地址,如果zookeeper是集群则以逗号隔开,如: 172.6.14.61:2181,172.6.14.62:2181,172.6.14.63:2181
  • listeners 监听列表,broker对外提供服务时绑定的IP和端口。多个以逗号隔开,如果监听器名称不是一个安全的 协议, listener.security.protocol.map也必须设置。主机名称设置0.0.0.0绑定所有的接口,主机名称为 空则绑定默认的接口。如:PLAINTEXT://myhost:9092、SSL://:9091 CLIENT://0.0.0.0:9092、REPLICATION://localhost:9093
  • broker.id broker的唯一标识符,如果不配置则自动生成,建议配置且一定要保证集群中必须唯一,默认-1
  • log.dirs 日志数据存放的目录,如果没有配置则使用log.dir,建议此项配置。
  • message.max.bytes 服务器接受单个消息的最大大小,默认1000012 约等于976.6KB。

命令行操作

一台机器只能拥有一个副本 即replication-factor

topic 主题名称,partitions 分区数,replication-factor 备份数

代码语言:javascript
复制
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic first --partitions 2 --replication-factor 1

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic heima001 --partitions 2 --replication-factor 1

代码语言:javascript
复制
./kafka-topics.sh --list --zookeeper 127.0.0.1:2181

此时 日志里就会出现数据

代码语言:javascript
复制
[root@localhost bin]# ./kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic first 
下面提示
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

此时 去看数据

当在此添加主题相同名字 相同分区的、相同的备份 主题时些数据才会被清除

查看tipics信息

代码语言:javascript
复制
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic first1

读数据

代码语言:javascript
复制
/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heima

发送数据

代码语言:javascript
复制
./kafka-console-producer.sh --broker-list localhost:9092 --topic heima

读、写使用如图

代码语言:javascript
复制
listeners=PLAINTEXT://0.0.0.0:9092

生产者详解:

①、首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic分区Partition键 Key以及 值 Value,主题和值是必须要声明的,分区和键可以不用指定。

②、调用send() 方法进行消息发送。

③、因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key 和 value对象序列化成字节数组。

④、接下来数据传到分区器,如果之间的 ProducerRecord 对象指定了分区,那么分区器将不再做 任何事,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区,选择好分区之 后,生产者就知道该往哪个主题和分区发送记录了。

⑤、接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和 分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。

⑥、Broker成功接收到消息,表示发送成功,返回消息的元数据(包括主题和分区信息以及记录在 分区里的偏移量)。发送失败,可以选择重试或者直接抛出异常。

同步发送
代码语言:javascript
复制
producer.send(record)
异步发送 (相当于单独开线程去发送,不会影响主线程)
代码语言:javascript
复制
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println(metadata.partition() + ":" + metadata.offset());
                    }
                }
            });
序列化器

消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。 Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer), 还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)基本上能够满足大部分场景的需求。

特殊说明: 解决问题的光鲜,藏着磕Bug的痛苦。 万物皆入轮回,谁也躲不掉! 以上文章,均是我实际操作,写出来的笔记资料,不会出现全文盗用别人文章!烦请各位,请勿直接盗用!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka 简介:
  • 使用Kafka的好处?
  • 使用kafka的条件
  • 下载Kafka
  • 安装Kafka
  • 这里 补充一下配置文件的说明
  • 命令行操作
    • 一台机器只能拥有一个副本 即replication-factor
    • 查看tipics信息
    • 读数据
    • 发送数据
    • 读、写使用如图
    • 生产者详解:
    相关产品与服务
    文件存储
    文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档