Kafka 是一种高吞吐量的分布式发布订阅消息系统
producer:生产者。
consumer:消费者。
topic: 消息以为类别记录,Kafka将消息种子(Feed)分类, 每一类的消息称之为一个。
broker:以集群
的方式运行,可以由一个或多个服务组成,每个服务
叫做一个broker
;消费者可以订阅一个或多个主题(topic
), 并从Broker
拉数据,从而消费这些已发布的消息。
你的本地环境必须安装有Java 8+。
Apache Kafka2.8版本之后可以不需要使用ZooKeeper。
加压即可无需编译安装。
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.12-3.1.0.tgz
tar -xzf kafka_2.12-3.1.0.tgz
cd kafka_2.12-3.1.0
#Apache Kafka2.8版本之前需要使用ZooKeeper,启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
#打开另一个命令终端启动kafka服务,启动完成Kafka已经可以使用了
bin/kafka-server-start.sh config/server.properties &
#启动kafka客户端,创建一个只有一个分区和一个备份名称为"test"的Topic,partitions分区参数、replication-factor参数
#Apache Kafka2.8版本之前命令
bin/kafka-topics.sh --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic test
#Apache Kafka2.8版本之后命令
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
#查看已创建的topic信息
#Apache Kafka2.8版本之前命令
bin/kafka-topics.sh --describe --zookeeper localhost:9092 --topic test
#Apache Kafka2.8版本之后命令
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
#显示所有话题
#Apache Kafka2.8版本之前命令
bin/kafka-topics.sh --list --zookeeper localhost:2181 test
#Apache Kafka2.8版本之后命令
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --topic test
#除了手工创建topic外,也可以配置你的broker,当发布一个不存在的topic时自动创建topic
#设置自动创建topic时设置默认的分区和副本数(在server.properties中配置)
# 自动创建主题
auto.create.topics.enable=true
# 默认主题的分区数
num.partitions=8
# 默认分区副本,default.replication.factor 默认分区副本数不得超过kafka节点数,一个节点放2份没意义,每个节点都需要配置,然后重启即可。
default.replication.factor=3
#启动一个生产者,每一行是一条消息,在控制台输入几条消息到服务器
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test #[等待输入自己的内容]
#输入message1,换行输入message2,换行输入message3
#启动一个消费者(等待消息)注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning #[等待消息]
#显示message1换行message2换行message3