参考博文:kafka 配置文件参数详解
参考博文:Kafka【第一篇】Kafka集群搭建
参考博文:如何为Kafka集群选择合适的Partitions数量
参考博文:kafka常用配置【重要】
参考博文:kafka常用配置
主机名称 | IP信息 | 内网IP | 操作系统 | 安装软件 | 备注:运行程序 |
---|---|---|---|---|---|
mini01 | 10.0.0.11 | 172.16.1.11 | CentOS 7.4 | jdk、zookeeper、kafka | QuorumPeerMain、Kafka |
mini02 | 10.0.0.12 | 172.16.1.12 | CentOS 7.4 | jdk、zookeeper、kafka | QuorumPeerMain、Kafka |
mini03 | 10.0.0.13 | 172.16.1.13 | CentOS 7.4 | jdk、zookeeper、kafka | QuorumPeerMain、Kafka |
mini04 | 10.0.0.14 | 172.16.1.14 | CentOS 7.4 | jdk、zookeeper | QuorumPeerMain |
mini05 | 10.0.0.15 | 172.16.1.15 | CentOS 7.4 | jdk、zookeeper | QuorumPeerMain |
其中zookeeper的安装可参见:Hbase-2.0.0_01_安装部署
添加hosts信息,保证每台Linux都可以相互ping通
1 [root@mini01 ~]# cat /etc/hosts
2 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
3 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
4
5 172.16.1.11 mini01
6 172.16.1.12 mini02
7 172.16.1.13 mini03
8 172.16.1.14 mini04
9 172.16.1.15 mini05
windows的hosts文件也追加如下信息
1 c:\windows\system32\drivers\etc
2 ########################################## 追加信息如下:
3 10.0.0.11 mini01
4 10.0.0.12 mini02
5 10.0.0.13 mini03
6 10.0.0.14 mini04
7 10.0.0.15 mini05
1 [yun@mini01 software]$ pwd
2 /app/software
3 [yun@mini01 software]$ tar xf kafka_2.11-2.0.0.tgz
4 [yun@mini01 software]$ mv kafka_2.11-2.0.0 /app/
5 [yun@mini01 software]$ cd /app/
6 [yun@mini01 ~]$ ln -s kafka_2.11-2.0.0/ kafka
7 [yun@mini01 ~]$ ll -d kafka*
8 lrwxrwxrwx 1 yun yun 17 Sep 15 11:46 kafka -> kafka_2.11-2.0.0/
9 drwxr-xr-x 6 yun yun 89 Jul 24 22:19 kafka_2.11-2.0.0
需要使用root权限
1 [root@mini01 ~]# tail /etc/profile
2 done
3
4 unset i
5 unset -f pathmunge
6
7 # kafka
8 export KAFKA_HOME=/app/kafka
9 export PATH=$KAFKA_HOME/bin:$PATH
10
11 [root@mini01 profile.d]# logout
12 [yun@mini01 hbase]$ source /etc/profile # 使用yun用户,并重新加载环境变量
1 [yun@mini01 config]$ pwd
2 /app/kafka/config
3 [yun@mini01 config]$ vim server.properties
4 ############################# Server Basics #############################
5 # 每一个broker在集群中的唯一标示★★★
6 # 比如mini01 为0 mini02 为1 mini03 为2
7 broker.id=0
8
9 ############################# Socket Server Settings #############################
10 # The address the socket server listens on. It will get the value returned from
11 # java.net.InetAddress.getCanonicalHostName() if not configured.
12 # FORMAT:
13 # listeners = listener_name://host_name:port
14 # EXAMPLE:
15 # listeners = PLAINTEXT://your.host.name:9092
16 # 启动kafka服务监听的ip和端口,默认为java.net.InetAddress.getCanonicalHostName()获取的ip
17 #listeners=PLAINTEXT://:9092
18
19 # broker通知到producers和consumers的主机地址和端口号
20 # 如果未设置,使用listeners的配置。否则,使用java.net.InetAddress.getCanonicalHostName()返回的值
21 # 对于ipv4,基本就是localhost了 127.0.0.1 最后就是访问失败
22 #advertised.listeners=PLAINTEXT://your.host.name:9092
23 # 在不同的机器,名称不同 如mini01、mini02、mini03★★★
24 advertised.listeners=PLAINTEXT://mini01:9092
25
26 # broker 处理消息的最大线程数,一般情况下不需要去修改
27 num.network.threads=3
28
29 # broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
30 num.io.threads=8
31
32 # 发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
33 socket.send.buffer.bytes=102400
34
35 # kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
36 socket.receive.buffer.bytes=102400
37
38 # 这个参数是向kafka请求消息或者向kafka发送消息的请求的最大数,这个值不能超过java的堆栈大小
39 socket.request.max.bytes=104857600
40
41 ############################# Log Basics #############################
42 # kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
43 log.dirs=/app/kafka/logs
44
45 # 默认的分区数,一个topic默认1个分区数
46 num.partitions=1
47
48 # 每个数据目录在启动时用于日志恢复和关闭时用于刷新的线程数。
49 num.recovery.threads.per.data.dir=1
50
51 ############################# Internal Topic Settings #############################
52 # 组元数据内部主题“__consumer_offset”和“__transaction_state”的复制因子
53 # 对于开发测试之外的任何内容,建议使用大于1的值来确保可用性,比如3。
54 offsets.topic.replication.factor=3
55 transaction.state.log.replication.factor=3
56 transaction.state.log.min.isr=3
57
58 ############################# Log Flush Policy #############################
59 # 在持久化到磁盘前message最大接收条数
60 #log.flush.interval.messages=10000
61 log.flush.interval.messages=10000
62
63 # 持久化的最大时间间隔
64 #log.flush.interval.ms=1000
65 log.flush.interval.ms=3000
66
67 ############################# Log Retention Policy #############################
68 # 默认消息的最大持久化时间,168小时,7天
69 # segment 文件保留的最长时间,超时则被删除
70 log.retention.hours=168
71
72 # 当分片的大小超过该值时,就会被删除。该功能不依赖于log.retention.hours。为 -1没有大小限制
73 #log.retention.bytes=1073741824
74 log.retention.bytes=-1
75
76 # 滚动生成新的segment文件的最大时间
77 log.roll.hours=168
78
79 # 单个分片的上限,达到该大小后会生成新的日志分片 1G
80 log.segment.bytes=1073741824
81
82 # 日志分片的检测时间间隔,每隔该时间会根据log保留策略决定是否删除log分片
83 log.retention.check.interval.ms=300000
84
85 # 默认为true 启用日志清理器进程在服务器上运行
86 log.cleaner.enable=true
87
88 # 默认为true 【当前版本】 启用删除主题。 如果此配置已关闭,则通过管理工具删除主题将不起作用
89 delete.topic.enable=true
90 ############################# Zookeeper #############################
91 # Zookeeper connection string (see zookeeper docs for details).
92 # This is a comma separated host:port pairs, each corresponding to a zk
93 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
94 # You can also append an optional chroot string to the urls to specify the
95 # root directory for all kafka znodes.
96 zookeeper.connect=mini01:2181,mini02:2181,mini03:2181,mini04:2181,mini05:2181
97
98 # Timeout in ms for connecting to zookeeper
99 zookeeper.connection.timeout.ms=6000
100
101
102 ############################# Group Coordinator Settings #############################
103 # 在开发测试环境下该值设置为0,保证启动后马上可以使用。但在生产环境下,默认值3秒更适合
104 group.initial.rebalance.delay.ms=3000
1 [yun@mini01 config]$ pwd
2 /app/kafka/config
3 [yun@mini01 config]$ vim producer.properties
4 ………………
5 # 用于建立到Kafka集群的初始连接的主机/端口对列表。客户机将使用所有服务器,而不管这里为引导绑定指定了哪些服务器——此列表只影响用于发现完整服务器集的初始主机。
6 # 由于这些服务器仅用于初始连接,以发现完整的集群成员关系(可能会动态更改),因此这个列表不需要包含完整的服务器集(但是,如果服务器宕机,您可能需要多个服务器)。
7 bootstrap.servers=mini01:9092,mini02:9092,mini03:9092
8 ………………
9 [yun@mini01 config]$ vim consumer.properties
10 ………………
11 bootstrap.servers=mini01:9092,mini02:9092,mini03:9092
12 ………………
1 [yun@mini01 ~]$ kafka-server-start.sh -daemon /app/kafka/config/server.properties # -daemon 可选参数,后台运行
2 # 如果是后台运行,则控制台没有下面日志信息
3 [2018-09-15 18:38:38,700] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
4 [2018-09-15 18:38:39,564] INFO starting (kafka.server.KafkaServer)
5 ……………………
6 [yun@mini01 ~]$ kafka-server-stop.sh # 停止kafka
1 [zk: localhost:2181(CONNECTED) 10] ls /
2 [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zhang01, consumers, latest_producer_id_block, config, hbase]
3 [zk: localhost:2181(CONNECTED) 11] ls /brokers
4 [ids, topics, seqid]
5 [zk: localhost:2181(CONNECTED) 12] ls /brokers/ids
6 [0, 1, 2]