前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >集群部署kafka和zookeeper

集群部署kafka和zookeeper

原创
作者头像
mariolu
修改2020-06-22 00:48:14
1K0
修改2020-06-22 00:48:14
举报

一、前置条件:安装zookpeer和kafka

下载zookeeper版本,3.5.16源码安装后发现启动脚本一些缺少.class等的java lib库,网上有人建议用3.4系列,

图1 错误示例
图1 错误示例

这里以3.4.14为例:

代码语言:shell
复制
cd /opt/
wget http://apache.communilink.net/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
tar zxvf zookeeper-3.4.14.tar.gz 
cd zookeeper-3.4.14/conf
cp zoo_sample.cfg zoo.cfg
vim /opt/zookeeper-3.4.14/conf/zoo.cfg

1.1 安装zookeeper

1.1.1 配置运行参数zookeeper

zoo.cfg需要可能更改的配置包括:

  • tickTime: 心跳时间和超时时间
  • dataDir: 磁盘保存zookeeper内存的快照
  • clientPort: 监听端口:默认值是2181
  • initLimit: 在主从模式下leader-follower,follow连接leader的的最大允许丢失的tick次数,否则超时。在initLimit*tickTime时间内,如果folloer没有连接上leader,则超时了
  • syncLimit:folloer和leader同步的允许最大时间(tickTime*syncLimit),
  • server.X=host:port1:port2, X是全局唯一的数字,和dataDir配置路径下,有一个myid文件的内容数字对应。X是zookeeper的位移ID,在zoo.cfg和myid文件中设置且两者相等,host是该主机的名字或者ip,port1用于folloer连接leader的监听端口 ,port2用于leader选举

1.1.2 启动zookeeper

代码语言:javascript
复制
/opt/zookeeper-3.4.14
bin/zkServer.sh start conf/zoo.cfg 

启动完查看是否成功用

代码语言:javascript
复制
bin/zkServer.sh status conf/zoo.cfg 

执行jps可以看到QuorumPeerMain,也就是刚刚启动zooker进程

1.2. 安装kafka

1.2.1 前置处理

代码语言:javascript
复制
配置用户信息:
useradd kafka -m
usermod --shell /bin/bash kafka
usermod -aG wheel kafka
下载kafka:
curl http://apache.stu.edu.tw/kafka/2.3.1/kafka_2.11-2.3.1.tgz -o /home/kafka/Downloads/kafka.tgz
cd /home/kafka;tar -xvzf Downloads/kafka.tgz --strip 1
mkdir /home/kafka/Downloads

1.2.2 配置kafka

代码语言:javascript
复制
broker.id=0                                                                                                                                                             delete.topic.enable=true
listeners=PLAINTEXT://localhost:9192
log.dirs=/tmp/kafka-logs_cluster1
zookeeper.connect=localhost:2281,localhost:2282
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

broker.id设置了全局标识符,集群里是唯一,listener设置该broker的监听地址和端口,logs.dirs配置了kafka的磁盘路径,这个在kafka磁盘, 每条消息持久化底层存储,规定数量的broker成功接收才通知client发送成功。zookeeper.connect

配置了zookeeper的连接地址(?确认下)

1.2.3 启动kafka

cd /home/kafka

bin/kafka-server-start.sh -x -daemon config/server.properties

也可以把kafka写成一个服务,后续可以直接用service kafka start/stop/restart/status执行kafka相应操作。

代码语言:javascript
复制
[Unit]                                                                                                                                                                    
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/bin/kafka-server-start.sh /home/kafka/config/server.properties > /home/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

如果是跟我一样在内存很小的vps主机做集群环境搭建,那么启动kafka可能会这种内存错误

图2 加上jvm限制内存使用编译参数
图2 加上jvm限制内存使用编译参数

二、集群部署

2.1 集群部署zk

拷贝一份zoo.cfg

图3 拷贝zk配置
图3 拷贝zk配置

在zoo2.cfg修改以下参数:换个内存快照存放硬盘路径dataDir。换下kafka监听端口,配置和server.X的互动地址和端口

图4. zoo.cfg文件修改内容
图4. zoo.cfg文件修改内容

在dataDir路径的写一个myid文件,这个值为kafka的broker.id

启动zk会产生dataDir配置的目录(不用手动创建,用户组是kafka.kafka)

重复以上步骤,可以创建多份拷贝zooX.cfg,启动这些配置,同时观察状态:可以看到有个follower和leader的角色

图6、启动集群zk和观察集群zk角色
图6、启动集群zk和观察集群zk角色

jps观察到多个zk

图7 jps观察zk进程
图7 jps观察zk进程

2.2 集群部署kafka

复制多份kafka的配置文件。

在/home/kafka/config/目录下,做多分server.properties拷贝

图8 复制kafka配置文件
图8 复制kafka配置文件

需要个性化配置:(需要设置broker id和监听端口,和log.dirs目录,)

代码语言:javascript
复制
broker.id=0 
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9192
log.dirs=/tmp/kafka-logs_cluster1
zookeeper.connect=localhost:2281,localhost:2282
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

如果logs.dirs是已存在的,记得先rm这个目录,清空log.dirs目录,也就是/tmp/kafka-logs_cluster1

相应的目录

在/etc/systemd/system/kafka1.service

写入脚本:

代码语言:javascript
复制
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service] 
Environment="KAFKA_HEAP_OPTS=-Xmx256M -Xms128M"
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/bin/kafka-server-start.sh /home/kafka/config/server_cluster1.properties > /home/kafka/kafka_cluster1.log 2>&1'
ExecStop=/home/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

其中设置变量Environment="KAFKA_HEAP_OPTS=-Xmx256M -Xms128M",是因为我的虚拟机起几个kafka,内存不够

后续就可以用service kafka1 restart了

三、测试

图9 创建一个topic测试
图9 创建一个topic测试
  • 第一个行显示总结,后面每行描述一个partition,这里例子我们的节点是(broker.0和broker.2, 没有broker.1)
  • leader 是所有partitons中负责读写的节点
  • replicas 显示给定partiton所有副本所存储节点的节点列表,不管该节点是否是leader或者是否存活。
  • isr 副本都已同步的的节点集合,状态是存活,并且跟leader同步

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前置条件:安装zookpeer和kafka
    • 1.1 安装zookeeper
      • 1.1.1 配置运行参数zookeeper
      • 1.1.2 启动zookeeper
    • 1.2. 安装kafka
      • 1.2.1 前置处理
        • 1.2.2 配置kafka
          • 1.2.3 启动kafka
          • 二、集群部署
            • 2.1 集群部署zk
              • 2.2 集群部署kafka
              • 三、测试
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档