Kafka 服务器集群部署

上篇文章 Kafka 工作机制 讲述了 Kafka 的各组件(包括配置中心、Broker、消息生产者和消费者)的作用,分区与复制的机制等。有了这些概念,本文以三个 Broker 为例,讲述了 Kafka 集群的搭建步骤和方法,并以官方自带的命令行脚本进行消息的生产、消费、查看等操作。

1 下载

最新版本为 1.0.0,发布于 2017-11-01。 http://kafka.apache.org/downloads https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

2 安装(解压)

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
sudo tar -zxf kafka_2.11-1.0.0.tgz -C /opt

3 调整配置

打算部署成三个节点的集群,把 config/server.properties 复制成三份:

sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties
sudo cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-3.properties

修改三个文件 config/server-X.properties 如下内容(前三行末字符分别用 1/2/3): 修改后用命令检查:

grep -E "^broker.id|^log.dirs|^listeners" $KAFKA_HOME/config/server-?.properties

其中的参数 zookeeper.connect 用来指定 ZooKeeper 服务器地址,三个文件内容一样。 注意:三个 host:port 共用一个 /kafka,表示三个 ZooKeeper 服务器中都使用 /kafka 作为 kafka 存储的根目录。

### 必设参数
broker.id=1 ## 服务器的代理ID(默认值-1),需与 zookeeper 的代理ID不同,建议 brokerId 从 maxZookeeperId+1 开始设置;
log.dirs=/tmp/kafka-logs-1 ## 消息日志数据保存的目录;
listeners=PLAINTEXT://:9091 ## 面向客户端的监听器列表;
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka ## ZooKeeper服务器连接字符串;
### 重要参数
auto.create.topics.enable=true ## 当主题不存在时,允许自动创建主题(默认为 true);
num.partitions=3 ## 每个主题的默认日志分区数量(默认为1);
default.replication.factor=3 ## 自动创建主题的默认复制因子(默认为3);
log.retention.hours=168 ## 比这个保留期更早的消息将被丢弃(默认为 168小时,即7天);
delete.topic.enable=true ## 允许删除主题(默认为 true);
controlled.shutdown.enable=true ## 支持优雅的关机(默认为true)

4 设置环境变量

## sudo vim /etc/profile ## 所有用户有效
export KAFKA_HOME=/opt/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

保存文件,Linux 用户重新登录后生效。

5 防火墙放行

如果非本机应用需要连接,必须把监听端口放行。

### sudo vim /etc/sysconfig/iptables
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9091 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9093 -j ACCEPT
### 重启生效: sudo systemctl restart iptables

6 启动服务器

指定选项 -daemon 时采用“守护进程”启动,否则以“控制台进程”启动。

## sudo vim $KAFKA_HOME/bin/kafka-server-start-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-start-all.sh
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties
sudo $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties

7 停止服务器

Kafka 集群将自动检测到任何 Broker 故障或关机(包括人为地),并为该机器上的分区选择新的领导。 因为维护或配置更改而故意停机时,Kafka 支持更优雅的关机机制(默认配置已开启 controlled.shutdown.enable=true):

  • 将所有的日志同步到磁盘,以避免重新启动时需要做任何日志恢复;
  • 在关闭之前将服务器领导者的任何分区迁移到其他副本;
## sudo vim $KAFKA_HOME/bin/kafka-server-stop-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-stop-all.sh
## cat $KAFKA_HOME/bin/kafka-server-{stop,start}-all.sh | sudo tee $KAFKA_HOME/bin/kafka-server-restart-all.sh
## sudo chmod +x $KAFKA_HOME/bin/kafka-server-restart-all.sh
sudo $KAFKA_HOME/bin/kafka-server-stop.sh

8 命令行测试

注意:zookeeper 的 host:port/kafka,表示 ZooKeeper 中使用 /kafka 作为 kafka 存储信息的根目录。

8.1 主题的创建与查看

创建一个分区数为1、复制因子为 3 的主题,名称为 topicName 默认配置时(auto.create.topics.enable=true),针对不存在的主题发布或消费时,主题会自动创建,而且采用的分区数和复制因子都有相应的配置(num.partitions=1default.replication.factor=3)。

kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 1 --replication-factor 3 --topic topicName
# Result: Created topic "topicName".
kafka-topics.sh --list --zookeeper localhost:2181/kafka
# Result: topicName / topicName2 / topicName3

8.2 主题描述信息查询

# kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 3 --replication-factor 2 --topic topicName
kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic topicName

输出信息如下:

Topic:topicName3       PartitionCount:3             ReplicationFactor:2   Configs:
   Topic: topicName3   Partition: 0     Leader: 1   Replicas: 1,2         Isr: 1,2
   Topic: topicName3   Partition: 1     Leader: 2   Replicas: 2,3         Isr: 2,3
   Topic: topicName3   Partition: 2     Leader: 3   Replicas: 3,1         Isr: 3,1

可看到每个 broker 的角色:

  • 第一行(主题概要):分区数 3,复制因子 2;
  • 后面各行是各个分区(0/1/2)的信息,字段含义如下:
  •   Leader: 作为主题 Leader 的 brokerId;
  •   Replicas: 表示复制数据的节点的 brokerId(Leader 也可以在其中);
  •   isr(in-sync replicas): 是 Replicas 的子集(当前存活者),等待升级为 Leader 的 brokerId;

8.3 消息的发布和消费

kafka-console-producer.sh --broker-list localhost:9092 --topic topicName
## 等待提示符(大于号)出现后,输入文本,每行作为一个消息来发布。按 CTRL+C 结束
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName
## 会逐行打印收到的消息,按 CTRL+C 结束。如果要从头接收,要增加选项 --from-beginning

8.4 主题的删除【测试环境用】

grep "delete.topic.enable" $KAFKA_HOME/logs/server.log ## 确保为 true(默认)
kafka-topics.sh --delete --zookeeper localhost:2181/kafka --topic topicName
## 【可能不需要此步】前面只是对主题做了删除标记,必须手工再删除
$KAFKA_HOME/bin/kafka-server-stop-all.sh
zkCli.sh -server localhost:2181 <<EOF
   rmr /kafka/brokers/topics/topicName
   rmr /kafka/admin/delete_topics/topicName
   rmr /kafka/config/topics/topicName
   quit
EOF
sudo rm -rf /tmp/kafka-logs-{1,2,3}/topicName
$KAFKA_HOME/bin/kafka-server-start-all.sh
## 确认删除结果
kafka-topics.sh --list --zookeeper localhost:2181/kafka

9 清空数据【测试环境用】

sudo $KAFKA_HOME/bin/kafka-server-stop-all.sh
sudo rm -rf /tmp/kafka-logs /tmp/kafka-logs-{1,2,3}
sudo $KAFKA_HOME/bin/kafka-server-start-all.sh

作者:王克锋

出处:https://kefeng.wang/2017/11/16/kafka-deploy/

原文发布于微信公众号 - AiSmart4J(smart4j)

原文发表时间:2019-05-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券