趁着毕设初期,还能摸会儿?,了解波 Kafka。
Kafka,分布式消息引擎系统,主要功能是提供一套完备的消息发布与订阅解决方案。Kafka 也是一个分布式的、分区的、多副本的多订阅者,基于 Zookeeper 协调的分布式日志系统,可用于处理 Web 日志和消息服务。
与 Kafka 相关的几个问题:
Partition 机制,一个 Topic 划分为多个 Partition,防止单台 Broker 机器无法容纳太多的数据,Partition 机制与 Replica 机制联系紧密,每个 Partition 可以有多个 Replica(1 Leader + N Followers)。
Zookeeper 可为分布式系统提供分布式配置服务、同步服务和命名注册服务。
从前文可知,Kafka 的消息存储在 Topic 中,一个 Topic 又可以划分为多个 Partition,多 Partition 时,Kafka 只能保证 Partition 内的消息有序(Offset保证有序),如需保证 Topic 消息的有序,那么只能使用单个Partition了。如果仍要使用多个 Partition,消息的分区写入策略应选择按键(Key)保存。
既然只是玩一下,不如使用 Docker 搭建 Kafka 环境吧,“即用即焚”。
环境:Windows 10 Docker Desktop + WSL
这里通过 Docker-Compose 搭建个单机版的 kafka 集群,编排文件如下:
version: '3.4'
services:
zoo1:
container_name: zookeeper-one
image: zookeeper:3.4.9
hostname: zoo1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888
volumes:
- ./zk-single-kafka-single/zoo1/data:/data
- ./zk-single-kafka-single/zoo1/datalog:/datalog
kafka1:
container_name: kafka-one
image: confluentinc/cp-kafka:5.3.1
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
depends_on:
- zoo1
该编排文件来自:https://github.com/simplesteph/kafka-stack-docker-compose 的 zk-single-kafka-single.yml。使用 docker-compose up
启动容器。
编排文件中所使用到的镜像 confluentinc/cp-kafka:5.3.1
和 zookeeper:3.4.9
配置参考:
Kafka 和 Zookeeper 容器启动后,配合 IDEA 的两个插件 Kafkalytic
和 Zoolytic
,我们可以很方便的观察集群的情况:
通过 vscode 插件我们可以方便的对启动的容器进行管理(日志追踪、shell attach等):
通过 Kafka 自带的命令行工具可以查看 Topic:(先连接到 Kafka 容器:docker exec -it kafka-one bash
)
root@kafka1:/# kafka-topics --describe --zookeeper zoo1:2181
Topic:__confluent.support.metrics PartitionCount:1 ReplicationFactor:1 Configs:retention.ms=31536000000
Topic: __confluent.support.metrics Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:s
......
Go 中有两个比较有名的 Go Client,即 kafka-go 和 confluent-kafka-go。我都不熟悉?,但是前面编排时用到了 confluent 公司的 Kafka 镜像,所以这里选用 confluent-kafka-go
创建 Client。confluent-kafka-go 项目的 example 拿来即用。
1、创建 Go Module
mkdir go-kafka-demo
cd go-kafka-demo
go mod init github.com/yeshan333/go-kafka-demo
go get -u github.com/confluentinc/confluent-kafka-go
2、创建 Consumer。这个 Consumer 订阅的 Topic 为 myTopic。
// kafka_consumer.go
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
}
3、创建 Producer。这个 Producer 向 myTopic Topic 发送了 7 条消息。
// kafka_producer.go
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := "myTopic"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
}
4、两个 terminal,先跑 Consumer,再跑 Producer。
# terminal 1
go run kafka_consumer.go
# ternimal 2
go run kafka_producer.go
收工,其他东西后续慢慢啃。本文源文件:https://github.com/yeshan333/go-kafka-demo
博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议 我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=1utoln9pyvwqu