前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 服务器集群部署

Kafka 服务器集群部署

作者头像
IT技术小咖
发布2019-06-26 15:51:04
1.7K0
发布2019-06-26 15:51:04
举报
文章被收录于专栏:码上修行码上修行

上篇文章 Kafka 工作机制 讲述了 Kafka 的各组件(包括配置中心、Broker、消息生产者和消费者)的作用,分区与复制的机制等。有了这些概念,本文以三个 Broker 为例,讲述了 Kafka 集群的搭建步骤和方法,并以官方自带的命令行脚本进行消息的生产、消费、查看等操作。

1 下载

最新版本为 1.0.0,发布于 2017-11-01。 http://kafka.apache.org/downloads https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

2 安装(解压)

代码语言:javascript
复制
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
sudo tar -zxf kafka_2.11-1.0.0.tgz -C /opt

3 调整配置

打算部署成三个节点的集群,把 config/server.properties 复制成三份:

代码语言:javascript
复制
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-3.properties

修改三个文件 config/server-X.properties 如下内容(前三行末字符分别用 1/2/3): 修改后用命令检查:

代码语言:javascript
复制
grep -E "^broker.id|^log.dirs|^listeners" $KAFKA_HOME/config/server-?.properties

其中的参数 zookeeper.connect 用来指定 ZooKeeper 服务器地址,三个文件内容一样。 注意:三个 host:port 共用一个 /kafka,表示三个 ZooKeeper 服务器中都使用 /kafka 作为 kafka 存储的根目录。

代码语言:javascript
复制
### 必设参数
broker.id=1 ## 服务器的代理ID(默认值-1),需与 zookeeper 的代理ID不同,建议 brokerId 从 maxZookeeperId+1 开始设置;
log.dirs=/tmp/kafka-logs-1 ## 消息日志数据保存的目录;
listeners=PLAINTEXT://:9091 ## 面向客户端的监听器列表;
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka ## ZooKeeper服务器连接字符串;
### 重要参数
auto.create.topics.enable=true ## 当主题不存在时,允许自动创建主题(默认为 true);
num.partitions=3 ## 每个主题的默认日志分区数量(默认为1);
default.replication.factor=3 ## 自动创建主题的默认复制因子(默认为3);
log.retention.hours=168 ## 比这个保留期更早的消息将被丢弃(默认为 168小时,即7天);
delete.topic.enable=true ## 允许删除主题(默认为 true);
controlled.shutdown.enable=true ## 支持优雅的关机(默认为true)

4 设置环境变量

代码语言:javascript
复制
## sudo vim /etc/profile ## 所有用户有效
export KAFKA_HOME=/opt/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

保存文件,Linux 用户重新登录后生效。

5 防火墙放行

如果非本机应用需要连接,必须把监听端口放行。

代码语言:javascript
复制
### sudo vim /etc/sysconfig/iptables
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9091 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9093 -j ACCEPT
### 重启生效: sudo systemctl restart iptables

6 启动服务器

指定选项 -daemon 时采用“守护进程”启动,否则以“控制台进程”启动。

代码语言:javascript
复制
## sudo vim $KAFKA_HOME/bin/kafka-server-start-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-start-all.sh
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties

7 停止服务器

Kafka 集群将自动检测到任何 Broker 故障或关机(包括人为地),并为该机器上的分区选择新的领导。 因为维护或配置更改而故意停机时,Kafka 支持更优雅的关机机制(默认配置已开启 controlled.shutdown.enable=true):

  • 将所有的日志同步到磁盘,以避免重新启动时需要做任何日志恢复;
  • 在关闭之前将服务器领导者的任何分区迁移到其他副本;
代码语言:javascript
复制
## sudo vim $KAFKA_HOME/bin/kafka-server-stop-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-stop-all.sh
## cat $KAFKA_HOME/bin/kafka-server-{stop,start}-all.sh | sudo tee $KAFKA_HOME/bin/kafka-server-restart-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-restart-all.sh
sudo $KAFKA_HOME/bin/kafka-server-stop.sh

8 命令行测试

注意:zookeeper 的 host:port/kafka,表示 ZooKeeper 中使用 /kafka 作为 kafka 存储信息的根目录。

8.1 主题的创建与查看

创建一个分区数为1、复制因子为 3 的主题,名称为 topicName 默认配置时(auto.create.topics.enable=true),针对不存在的主题发布或消费时,主题会自动创建,而且采用的分区数和复制因子都有相应的配置(num.partitions=1default.replication.factor=3)。

代码语言:javascript
复制
kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 1 --replication-factor 3 --topic topicName
# Result: Created topic "topicName".
kafka-topics.sh --list --zookeeper localhost:2181/kafka
# Result: topicName / topicName2 / topicName3

8.2 主题描述信息查询

代码语言:javascript
复制
# kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 3 --replication-factor 2 --topic topicName
kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic topicName

输出信息如下:

代码语言:javascript
复制
Topic:topicName3       PartitionCount:3             ReplicationFactor:2   Configs:
   Topic: topicName3   Partition: 0     Leader: 1   Replicas: 1,2         Isr: 1,2
   Topic: topicName3   Partition: 1     Leader: 2   Replicas: 2,3         Isr: 2,3
   Topic: topicName3   Partition: 2     Leader: 3   Replicas: 3,1         Isr: 3,1

可看到每个 broker 的角色:

  • 第一行(主题概要):分区数 3,复制因子 2;
  • 后面各行是各个分区(0/1/2)的信息,字段含义如下:
  •   Leader: 作为主题 Leader 的 brokerId;
  •   Replicas: 表示复制数据的节点的 brokerId(Leader 也可以在其中);
  •   isr(in-sync replicas): 是 Replicas 的子集(当前存活者),等待升级为 Leader 的 brokerId;

8.3 消息的发布和消费

代码语言:javascript
复制
kafka-console-producer.sh --broker-list localhost:9092 --topic topicName
## 等待提示符(大于号)出现后,输入文本,每行作为一个消息来发布。按 CTRL+C 结束
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName
## 会逐行打印收到的消息,按 CTRL+C 结束。如果要从头接收,要增加选项 --from-beginning

8.4 主题的删除【测试环境用】

代码语言:javascript
复制
grep "delete.topic.enable" $KAFKA_HOME/logs/server.log ## 确保为 true(默认)
kafka-topics.sh --delete --zookeeper localhost:2181/kafka --topic topicName
## 【可能不需要此步】前面只是对主题做了删除标记,必须手工再删除
$KAFKA_HOME/bin/kafka-server-stop-all.sh
zkCli.sh -server localhost:2181 <<EOF
   rmr /kafka/brokers/topics/topicName
   rmr /kafka/admin/delete_topics/topicName
   rmr /kafka/config/topics/topicName
   quit
EOF
sudo rm -rf /tmp/kafka-logs-{1,2,3}/topicName
$KAFKA_HOME/bin/kafka-server-start-all.sh
## 确认删除结果
kafka-topics.sh --list --zookeeper localhost:2181/kafka

9 清空数据【测试环境用】

代码语言:javascript
复制
sudo $KAFKA_HOME/bin/kafka-server-stop-all.sh
sudo rm -rf /tmp/kafka-logs /tmp/kafka-logs-{1,2,3}
sudo $KAFKA_HOME/bin/kafka-server-start-all.sh

作者:王克锋

出处:https://kefeng.wang/2017/11/16/kafka-deploy/

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码上修行 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 下载
  • 2 安装(解压)
  • 3 调整配置
  • 4 设置环境变量
  • 5 防火墙放行
  • 6 启动服务器
  • 7 停止服务器
  • 8 命令行测试
    • 8.1 主题的创建与查看
      • 8.2 主题描述信息查询
        • 8.3 消息的发布和消费
          • 8.4 主题的删除【测试环境用】
          • 9 清空数据【测试环境用】
          相关产品与服务
          微服务引擎 TSE
          微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档