Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。
Zookeeper集群: 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181
kafka 集群: 192.168.252.124 , 192.168.252.125 , 192.168.252.126
kafka-manager: 192.168.252.127
CentOs7.3 搭建 ZooKeeper-3.4.9 Cluster 集群服务
Zookeeper集群: 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181
主机名依次被我修改成: node1,node2,node3
kafka 集群: 192.168.252.124 , 192.168.252.125 , 192.168.252.126
主机名依次被我修改成: node4,node5,node6
kafka 官网下载 http://kafka.apache.org/downloads
$ cd /opt
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz
$ tar -zxvf kafka_2.12-0.11.0.0.tgz
$ cd kafka_2.12-0.11.0.0
在 node4 操作
$ vi /opt/kafka_2.12-0.11.0.0/config/server.properties
broker.id=0 每台服务器不能重复
#设置zookeeper的集群地址
zookeeper.connect=192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181
把配置复制到 node5,node6 集群
$ for a in {5..6} ; do scp -r /opt/kafka_2.12-0.11.0.0/ node$a:/opt/kafka_2.12-0.11.0.0 ; done
修改 node5,node6 集群的broker.id
$ vi /opt/kafka_2.12-0.11.0.0/config/server.properties
在 node1,启动 Kafka使用的 ZooKeeper,所以先启动ZooKeeper服务器
$ for a in {1..3} ; do ssh node$a "source /etc/profile; /opt/zookeeper-3.4.9/bin/zkServer.sh start" ; done
现在 node4 启动Kafka服务器
$ for a in {4..6} ; do ssh node$a "source /etc/profile; /opt/kafka_2.12-0.11.0.0/bin/kafka-server-start.sh /opt/kafka_2.12-0.11.0.0/config/server.properties" ; done
或者后台启动运行,日志查看去Kafka解压目录有个log
文件夹查看
$ for a in {4..6} ; do ssh node$a "source /etc/profile; nohup /opt/kafka_2.12-0.11.0.0/bin/kafka-server-start.sh /opt/kafka_2.12-0.11.0.0/config/server.properties > /dev/null 2>&1 &" ; done
查看进程,Kafka 是否启动成功
$ jps
3825 Kafka
6360 Jps
如果报错删除
kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.
$ rm -rf /tmp/kafka-logs
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --create --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --replication-factor 2 --partitions 1 --topic ymq
–replication-factor 2 #复制两份 –partitions 1 #创建1个分区 –topic #主题为ymq
运行list topic命令,可以看到该主题:
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --list --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181
Kafka附带一个命令行客户端,它将从文件或标准输入中输入,并将其作为消息发送到Kafka群集。默认情况下,每行将作为单独的消息发送。
在 node5 运行生产者,然后在控制台中输入一些消息以发送到服务器。
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ymq
>www.ymq.io
在node6 运行消费者,将把消息转储到标准输出。
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
www.ymq.io
用describe 查看集群中topic每个节点情况
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --describe --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq
Topic:ymq PartitionCount:1 ReplicationFactor:2 Configs:
Topic: ymq Partition: 0 Leader: 1 Replicas: 1,3 Isr: 3,1
以下是输出的说明。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们这个主题只有一个分区,只有一行。
leader
负责给定分区的读取和写入分配节点编号,每个分区的部分数据会随机指定不同的节点 replicas
是复制此分区的日志的节点列表 isr
一组正在同步的副本列表
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --delete --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq
Topic ymq is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
$ for a in {4..6} ; do ssh node$a "source /etc/profile; /opt/kafka_2.12-0.11.0.0/bin/kafka-server-stop.sh /opt/kafka_2.12-0.11.0.0/config/server.properties " ; done
Yahoo开源Kafka集群管理器Kafka Manager
作为一个分布式的消息发布-订阅系统,Apache Kafka在Yahoo内部已经被很多团队所使用,例如媒体分析团队就将其应用到了实时分析流水线中,同时,Yahoo整个Kafka集群处理的峰值带宽超过了20Gbps(压缩数据)。为了让开发者和服务工程师能够更加简单地维护Kafka集群,Yahoo构建了一个基于Web的管理工具,称为Kafka Manager,日前该项目已经在GitHub上开源。
通过Kafka Manager用户能够更容易地发现集群中哪些主题或者分区分布不均匀,同时能够管理多个集群,能够更容易地检查集群的状态,能够创建主题,执行首选的副本选择,能够基于集群当前的状态生成分区分配,并基于生成的分配执行分区的重分配,此外,Kafka Manager还是一个非常好的可以快速查看集群状态的工具。
Kafka Manager使用Scala语言编写,其Web控制台基于Play Framework实现,除此之外,Yahoo还迁移了一些Apache Kafka的帮助程序以便能够与Apache Curator框架一起工作。
一、它支持以下内容:
在 kafka-manager: 192.168.252.127 node7 部署
编译超级慢
$ yum install git
$ cd /opt/
$ git clone https://github.com/yahoo/kafka-manager
$ cd kafka-manager/
$ ./sbt clean dist
反正我是没编译成功,从网上找了一个编译好的
链接: 百度网盘下载 密码: kha2
$ yum install unzip
$ unzip kafka-manager-1.3.2.1.zip
$ vi /opt/kafka-manager-1.3.2.1/conf/application.conf
修改这个 zk 地址
kafka-manager.zkhosts="192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181"
默认端口 NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
$ /opt/kafka-manager-1.3.2.1/bin/kafka-manager -Dconfig.file=conf/application.conf
或者后台运行 并且配置端口
$ nohup bin/kafka-manager -Dconfig.file=/home/hadoop/app/kafka-manager-1.3.2.1/conf/application.conf -Dhttp.port=9000 &
访问:http://ip:9000/
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有