Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。
所以说,Kafka还是一个MQ,这时候,你肯定会想到ActiveMQ、RabbitMQ、RocketMQ等,在《Web基础配置篇(十): ActiveMQ与RabbitMQ的安装配置及使用》 一篇中,已经大概讲述了他们之间的区别,这里还是要简单说明一下:
所以,网上一般的推荐就是,中小型公司可以选择RabbitMQ,因为怕阿里不维护RocketMQ了,就没有能力去维护RocketMQ;大型软件公司可以选择rocketMq,因为有钱,所以有人维护。至于kafka,根据业务场景选择,大数据领域中以及日志采集,肯定是首选kafka了。
**如果大家正在寻找一个java的学习环境,或者在开发中遇到困难,可以<a
href="https://jq.qq.com/?_wv=1027&k=52sgH1J"
target="_blank">
加入我们的java学习圈,点击即可加入
</a>
,共同学习,节约学习时间,减少很多在学习中遇到的难题。**
Kafka是依赖于zookeeper的,所以要先安装zookeeper并启动,可以参考《Web基础配置篇(十一): Zookeeper的安装配置及使用》这一篇来安装启动zookeeper。
在这里插入图片描述
配置文件在conf目录下,主要配置是server.properties、producer.properties和consumer.properties;而kafka启动的配置文件是server.properties。
这里只说server.properties几个关键配置:
##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=100
##kafka监听地址,advertised.listeners默认也是用这个地址,如果不配置,检测的是计算机的hostname,比如我的windows是PLAINTEXT://DESKTOP-OL03P5L:9092,所以建议改掉。
listeners=PLAINTEXT://10.247.62.91:9092
##暴漏出去的地址,注册到zookeeper的地址,默认和listeners一样。比如我在docker中绑定地址是172网段的,暴漏出去的就要写192网段的地址了。
advertised.listeners=PLAINTEXT://10.247.62.91:9092
## broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3
## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads=8
##kafka数据的存放地址(不是日志地址,kafka的日志默认在安装目录的logs文件夹下),多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=E:/DevSoft/kafka/kafka_2.12-2.3.0/log
## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions=1
## 数据存储的最大时间
log.retention.hours=168
##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=localhost:2181
检查配置
修改server.properties文件,来满足自己的需求,比如log.dirs、zookeeper.connect、listeners等配置,:
启动
打开powershell或者cmd,首先进入到kafka的安装目录下,输入命令.\bin\windows\kafka-server-start.bat config/server.properties
:
然后它就运行起来了。打开zk的界面,看下,已经有kafka相关信息:
在这里插入图片描述
点开brokers/ids,可以看到:
在这里插入图片描述
检查配置
修改server.properties文件,来满足自己的需求,比如log.dirs、zookeeper.connect、listeners等配置,:
如果注册到同一个zookeeper,注意broker.id不要重复。
启动
进入到kafka的安装目录下,输入命令bin/kafka-server-start.sh config/server.properties
:
然后它就运行起来了。打开zk的界面,看下,已经有新注册的kafka相关信息:
在这里插入图片描述
但是它的endpoints地址竟然是一串字符串,这是因为我用的docker,读hostname只能读到这个,这个是无效的,还是换成ip靠谱,修改server.properties文件的listeners配置和advertised.listeners配置,将它改为ip地址,注意,listeners配置是监听地址,必须是本机的网络接口ip;advertised.listeners配置是暴漏地址,可以写网关地址。
集群和单机的配置是一样的,只要注册到zookeeper的节点可达,可以持续添加节点。但注意的是broker.id不要重复。
我这里分别在windows和linux上安装了一个kafka:
可以正常同步数据。
集群和单机的命令没区别,所以这里直接使用两个机器做测试。
在kafka安装目录下执行下面命令。
windows上使用下面命令:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 -partitions 1 --topic helloTets
Linux上使用: ./bin/kafka-topics.sh --create --zookeeper 10.247.62.91:2181 --replication-factor 2 -partitions 1 --topic helloWorld
这时候就建了两个topics:
zookeeper上显示如下:
在这里插入图片描述
点开上图中的topics,可以查看详情:
在这里插入图片描述
windows的kafka配置的数据文件夹下已经有两个topics(linux上创建的已经同步过来):
在这里插入图片描述
新建一个topics(这里是TestTest)之后,在kafka安装目录下执行下面命令。
windows上使用下面命令:
.\bin\windows\kafka-topics.bat --zookeeper 10.247.62.91:2181 --alter --topic TestTest --partitions 3
Linux上使用: ./bin/kafka-topics.sh --zookeeper 10.247.62.91:2181 --alter --topic TestTest --partitions 4
注意,修改topics的分区数,只能增加,减少会报错:Error while executing topic command : The number of partitions for a topic can only be increased。
在kafka安装目录下执行下面命令。
windows上使用下面命令:
.\bin\windows\kafka-topics.bat --zookeeper 10.247.62.91:2181 --delete --topic TestTest
Linux上使用: ./bin/kafka-topics.sh --zookeeper 10.247.62.91:2181 --delete --topic TestTest
注意,如果delete.topic.enable没有设置为true,这里的删除只是标记删除,并不真删;删除之后再次删除会提示已经删除。
修改producer.properties文件中的bootstrap.servers配置(经测试这个配置改不改不影响),其他配置默认即可。
## brokers节点列表
bootstrap.servers=10.247.62.91:9092,192.168.99.100:9092
#压缩类型,有none, gzip, snappy, lz4, zstd。
compression.type=none
在kafka安装目录下执行下面命令(注意,--broker-list 指定的ip必须是监听地址配置的ip,否则连接不能,尽管localhost是可达的,它也是无效的):
windows上使用下面命令:
.\bin\windows\kafka-console-producer.bat --broker-list 10.247.62.91:9092 --topic helloWorld
Linux上使用: ./bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic helloTets
输入命令后,光标就在等你输入了,你可以任意输入字符串即可。
在任何一个节点写入数据,其他节点很快就会同步过来,打开相应topics目录下的log文件,比如我的是00000000000000000000.log,可以看到刚才输入的数据(有乱码存在,正常的,不乱码才不正常,总要整个看不懂的分割符吧)
修改consumer.properties文件中的bootstrap.servers配置(经测试这个配置改不改不影响使用,因为命令行需要传入)。
## brokers节点列表
bootstrap.servers=10.247.62.91:9092,192.168.99.100:9092
在kafka安装目录下执行下面命令(注意替换topics):
windows上使用下面命令:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 10.247.62.91:9092,192.168.99.100:9092 --topic helloTets --from-beginning
Linux上使用: ./bin/kafka-console-consumer.sh --bootstrap-server 10.247.62.91:9092,192.168.99.100:9092 --topic helloTets —from-beginning
下图是我测试的过程:
在这里插入图片描述
如图所示:
所以,大概能做这样的结论:kafka存储使用的是UTF-8编码。windows默认一般是gbk,所以无法正常显示windows/linux的数据。