前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >搭建Kafka集群( 2.8.0版本)之二

搭建Kafka集群( 2.8.0版本)之二

作者头像
程裕强
发布2021-09-08 15:57:04
5550
发布2021-09-08 15:57:04
举报
文章被收录于专栏:大数据学习笔记

1、创建Topic

(1)创建topic

代码语言:javascript
复制
[root@node3 kafka-2.8.0]# bin/kafka-topics.sh --create --topic demo --bootstrap-server node1:9092
Created topic demo.
[root@node3 kafka-2.8.0]#

(2)查看topic

代码语言:javascript
复制
[root@node3 kafka-2.8.0]# bin/kafka-topics.sh  --list --zookeeper node1:2181
__consumer_offsets
demo
[root@node3 kafka-2.8.0]#

(3)查看名称为test的topic消息

代码语言:javascript
复制
[root@node3 kafka-2.8.0]# bin/kafka-topics.sh --describe --topic demo --bootstrap-server node1:9092
Topic: demo     TopicId: wdH8eCg_SNqaWi3N1crN8Q PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: demo     Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: demo     Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: demo     Partition: 2    Leader: 3       Replicas: 3     Isr: 3
[root@node3 kafka-2.8.0]# 

2、启动生产者,发送消息

Kafka软件包提供了kafka-console-producer.sh脚本,这是一个简易的生产者控制台,供我们测试之用。我们可以通过Java编写一个具有具体业务意义的消息生产者。

代码语言:javascript
复制
[root@node3 kafka-2.8.0]# bin/kafka-console-producer.sh --topic demo --bootstrap-server node1:9092
>hello
>This is my second message
>^C[root@node3 kafka-2.8.0]#

其中--bootstrap-server是kafka-console-producer.sh脚本必要参数,指明连接的服务器,形如host1:prot1,host2:prot2

3、启动消费者,消费消息

Kafka软件包提供了kafka-console-consumer.sh脚本,这是一个简易的消费者控制台,供我们测试之用。我们可以通过Java编写一个具有具体业务意义的消息消费者。

(1)在node1节点启动一个消费者

代码语言:javascript
复制
[root@node1 kafka-2.8.0]# bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092
hello
This is my second message

同样--bootstrap-server是kafka-console-consumer.sh脚本必要参数,指明连接的服务器,形如host1:prot1,host2:prot2 (2)在node2节点上再启动一个消费者

代码语言:javascript
复制
[root@node2 kafka-2.8.0]#  bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092,node2:9092,node3:9092
hello
This is my second message

4、Key/Value消息

默认情况下,所生产的消息是没有key,或者认为所有消息内容都是value值,其可以值为null。

下面测试一下Key/Value消息 (1)启动生产者,启用key解析

代码语言:javascript
复制
[root@node1 kafka-2.8.0]# bin/kafka-console-producer.sh --topic demo --bootstrap-server node1:9092 --property parse.key=true
>hello  Kafka
>

--property表示设置消费者相关的配置,其后的parse.key=true表示启用parse.key。

(2)启动消费者

代码语言:javascript
复制
[root@node2 kafka-2.8.0]# bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server node1:9092 --property print.key=true
null    hello
hello   Kafka
null    This is my second message

可以看到之前普通消息的key是null

5、消费者无法收到消息

最近遇到一个问题,重新搭建了一套kafka集群,但是kafka-console-consumer.sh无法收到kafka-console-producer.sh 发送的消息。 经过一番折腾后,问题终于解决了(问题产生的原因尚不明确),解决流程如下:

4.1 停止kafka集群,清空数据

(1)新建脚本文件init_kafka.sh

代码语言:javascript
复制
#! /bin/bash
 
echo "------正在停止Kafka集群------"
SERVERS="node1 node2 node3"

stop_kafka() {
    for SERVER in $SERVERS
    do
         ssh $SERVER "/opt/kafka-2.8.0/bin/kafka-server-stop.sh;jps -m"
    done
}
rm_data() {
    for SERVER in $SERVERS
    do
         ssh $SERVER " rm -rf /var/kafka-logs/*;ls /var/kafka-logs/"
    done
}
echo "------停止Kafka集群------"
stop_kafka
echo "------删除Kafka集群数据----"
rm_data

(2)执行过程

代码语言:javascript
复制
[root@node1 ~]# chmod +x init_kafka.sh 
[root@node1 ~]# sh init_kafka.sh 
------正在停止Kafka集群------
------停止Kafka集群------
10400 PaloFe
20006 Jps -m
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
23787 Kafka /opt/kafka-2.8.0/config/server.properties
22446 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
6144 Worker --webui-port 8081 spark://node1:7077
5009 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
4762 Jps -m
5918 Kafka /opt/kafka-2.8.0/config/server.properties
4786 Kafka /opt/kafka-2.8.0/config/server.properties
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
3892 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
22940 BrokerBootstrap
4638 Jps -m
------删除Kafka集群数据----
[root@node1 ~]# 
4.2 停止zookeeper集群,清空数据

(1)新建脚本文件init_zk.sh

代码语言:javascript
复制
#! /bin/bash
 
echo "------正在停止ZK集群------"
SERVERS="node1 node2 node3"

stop_zk() {
    for SERVER in $SERVERS
    do
            ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh stop;jps -m"
    done
}
rm_data() {
    id=0
    for SERVER in $SERVERS
    do
         let id++  
         ssh $SERVER "rm -rf /tpdata/zookeeper/*;echo $id > /tpdata/zookeeper/myid;ls /tpdata/zookeeper/"
    done
}
echo "------停止ZK集群------"
stop_zk
echo "------删除ZK集群数据----"
rm_data

(2)执行过程

代码语言:javascript
复制
[root@node1 ~]# chmod +x  init_zk.sh 
[root@node1 ~]# sh init_zk.sh 
------正在停止ZK集群------
------停止ZK集群------
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
10400 PaloFe
30736 Jps -m
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
22446 -- process information unavailable
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
6144 Worker --webui-port 8081 spark://node1:7077
5009 -- process information unavailable
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
10392 Jps -m
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
3892 -- process information unavailable
10215 Jps -m
22940 BrokerBootstrap
------删除ZK集群数据----
myid
myid
myid
[root@node1 ~]# 
4.3 重新启动zookeeper集群

(1)新建zk集群脚本start_zk.sh

代码语言:javascript
复制
#! /bin/bash
 
echo "------正在启动ZK集群------"
SERVERS="node1 node2 node3"

start_zk() {
    for SERVER in $SERVERS
    do
            ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh start;jps -m"
    done
}
show_status() {
    for SERVER in $SERVERS
    do
         ssh $SERVER "/opt/zookeeper-3.4.10/bin/zkServer.sh status"
    done
}
echo "------启动ZK集群------"
start_zk

echo "------等待5秒,查看ZK集群状态----"
sleep 5
show_status

(2)执行过程

代码语言:javascript
复制
[root@node1 ~]# chmod +x  start_zk.sh 
[root@node1 ~]# sh start_zk.sh 
------正在启动ZK集群------
------启动ZK集群------
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
10400 PaloFe
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
6651 Jps -m
6574 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
6144 Worker --webui-port 8081 spark://node1:7077
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
14569 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
14605 Jps -m
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
14475 Jps -m
22940 BrokerBootstrap
14429 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
------等待5秒,查看ZK集群状态----
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[root@node1 ~]# 
4.4 重新启动kafka集群

(1)新建启动kafka集群脚本start_kafka.sh

代码语言:javascript
复制
#! /bin/bash
 
echo "------正在启动Kafka集群------"
SERVERS="node1 node2 node3"

start_kafka() {
    for SERVER in $SERVERS
    do
            ssh $SERVER "/opt/kafka-2.8.0/bin/kafka-server-start.sh -daemon  /opt/kafka-2.8.0/config/server.properties;jps -m"
    done
}
start_kafka

(2)执行过程

代码语言:javascript
复制
[root@node1 ~]# chmod +x  start_kafka.sh 
[root@node1 ~]# sh start_kafka.sh 
------正在启动Kafka集群------
10400 PaloFe
12744 Worker --webui-port 8081 spark://node1:7077
14153 BrokerBootstrap
12249 Master --host node1 --port 7077 --webui-port 8080
13371 Kafka /opt/kafka-2.8.0/config/server.properties
13372 Jps -m
6574 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
6144 Worker --webui-port 8081 spark://node1:7077
23955 BrokerBootstrap
18516 PaloFe -helper node1:9010
14569 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
17979 Kafka /opt/kafka-2.8.0/config/server.properties
17980 Jps -m
5971 PaloFe -helper node1:9010
1940 Worker --webui-port 8081 spark://node1:7077
18133 Kafka /opt/kafka-2.8.0/config/server.properties
18134 Jps -m
22940 BrokerBootstrap
14429 QuorumPeerMain /opt/zookeeper-3.4.10/bin/../conf/zoo.cfg
[root@node1 ~]#
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/09/05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、创建Topic
  • 2、启动生产者,发送消息
  • 3、启动消费者,消费消息
  • 4、Key/Value消息
  • 5、消费者无法收到消息
    • 4.1 停止kafka集群,清空数据
      • 4.2 停止zookeeper集群,清空数据
        • 4.3 重新启动zookeeper集群
          • 4.4 重新启动kafka集群
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档