\
\
\
\
\
\
\
创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1。当producer发布一个消息到某个指定的Topic,这个Topic如果不存在,就自动创建。
\
/opt/kafka_2.13-2.7.1/bin/kafka-topics.sh --create --zookeeper 106.14.132.94:2181 --replication-factor 1 --partitions 1 --topic test
\
\
/opt/kafka_2.13-2.7.1/bin/kafka-topics.sh --list --zookeeper 106.14.132.94:2181
\
列表中有一个__consumer_offsets主题,这个主题不能删除哟
\
\
/opt/kafka_2.13-2.7.1/bin/kafka-topics.sh --delete --topic test --zookeeper 106.14.132.94:2181
\
如果出现This will have no impact if delete.topic.enable is not set to true.
\
彻底删除topic:
[1. ] 删除Topic,delete.topic.enable=true这里要设置为true,需要在$KAFKA/config/server.properties中配置delete.topic.enble=true
[2. ] 删除log日志
[3. ] 删除ZK中的Topic记录
\
删除列表中有一个consumer_offsets主题会出现,Topic consumer_offsets is a kafka internal topic and is not allowed to be marked for deletion.
\
__consumer_offsets这个topic是由kafka自动创建的,默认49个,这个topic是不能被删除的!
\
为什么这里会是这样存储__consumer_offsets的呢?
\
[1.] 将所有 N Broker 和待分配的 i 个 Partition 排序
[2.] 将第 i 个 Partition 分配到第(i mod n)个 Broker 上
\
\
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容。
\
/opt/kafka_2.13-2.7.1/bin/kafka-console-producer.sh --broker-list 106.14.132.94:9092 --topic test
\
this is a news this is a another news
\
\
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。由于我们已经发送了消息了,想要消费之前的消息可以通过--from-beginning参数指定。
\
\
/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --from-beginning --topic test
\
\
/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --topic test
\
通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。
\
\
/opt/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --from-beginning --whitelist "test|test2"
\