kubernetes 中 kafka 和 zookeeper 有状态集群服务部署实践 (一)

引言

Kafka和zookeeper是在两种典型的有状态的集群服务。首先kafka和zookeeper都需要存储盘来保存有状态信息,其次kafka和zookeeper每一个实例都需要有对应的实例Id(Kafka需要broker.id,zookeeper需要my.id)来作为集群内部每个成员的标识,集群内节点之间进行内部通信时需要用到这些标识。

对于这类服务的部署,需要解决两个大的问题,一个是状态保存,另一个是集群管理(多服务实例管理)。kubernetes中提的StatefulSet(1.5版本之前称为Petset)方便了有状态集群服务在上的部署和管理。具体来说是通过Init Container来做集群的初始化工 作,用 Headless Service来维持集群成员的稳定关系,用Persistent Volume和Persistent Volume Claim提供网络存储来持久化数据,从而支持有状态集群服务的部署。

本文将尝试根据社区提供的StatefulSet方案,对kafka和zookeeper服务进行部署。具体的部署过程包括以下几个部署: (1) Persistent Volume 存储的创建

(2) StatefulSet(Petset)资源的创建

(3) headless服务的创建

Persistent Volume存储的创建

PersistentVolume(PV)是集群之中的一块网络存储。跟 Node 一样,也是集群的资源,并且不属于特定的namespace。PV 跟 Volume (卷) 类似,不过会有独立于 Pod 的生命周期。

在有状态服务创建之前,需要先创建对应的PV存储。为了便于环境的搭建,本文PV存储的后端采用NFS。NFS服务的容器化部署,可以参考腾讯云容器服务帮助文档-搭建nfs服务器

$ kubectl get service
NAME         CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
kubernetes   172.16.255.1     <none>        443/TCP                      56m
nfsserver0   172.16.255.44    <none>        20048/TCP,111/TCP,2049/TCP   40m
nfsserver1   172.16.255.6     <none>        2049/TCP,20048/TCP,111/TCP   29m
nfsserver2   172.16.255.131   <none>        2049/TCP,20048/TCP,111/TCP   14m
nfsserver3   172.16.255.231   <none>        2049/TCP,20048/TCP,111/TCP   10m
nfsserver4   172.16.255.12    <none>        2049/TCP,20048/TCP,111/TCP   7m
nfsserver5   172.16.255.223   <none>        2049/TCP,20048/TCP,111/TCP   4m

基于NFS的PV创建示例如下:

  apiVersion: v1
  kind: PersistentVolume
  metadata:
    name: pv0001
    annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
  spec:
    capacity:
      storage: 50Gi
    accessModes:
      - ReadWriteMany
    persistentVolumeReclaimPolicy: Recycle
    nfs:
      path: /exports
      server: 172.16.255.44

其中172.16.255.44为对应的nfs服务的IP。对应创建多个PV,zookeeper服务和kafka服务各3个。

$ kubectl get pv
NAME      CAPACITY   ACCESSMODES   RECLAIMPOLICY  STATUS     CLAIM   REASON  AGE
pv0001    50Gi       RWO           Recycle        Available                   5m
pv0002    50Gi       RWO           Recycle        Available                   5m
pv0003    50Gi       RWO           Recycle        Available                   5m
pv0004    50Gi       RWO           Recycle        Available                   2m
pv0005    50Gi       RWO           Recycle        Available                   2m
pv0006    50Gi       RWO           Recycle        Available                   2m

上面通过手动的方式创建了一个NFS Volume,这在管理很多Volume的时候很方便。kubernetes还提供了StorageClass来动态创建PV,可以大大节省了管理员的时间。作者在这里就不做对应的测试了。

zookeeper中StatefulSet(Petset)资源和headless服务的创建

StatefulSet是为了解决有状态服务的问题(对应Deployments和ReplicaSets是为无状态服务而设计),其应用场景包括

  • 稳定的持久化存储,即Pod重新调度后还是能访问到相同的持久化数据,基于PVC来实现
  • 稳定的网络标志,即Pod重新调度后其PodName和HostName不变,基于Headless Service(即没有Cluster IP的Service)来实现
  • 有序部署,有序扩展,即Pod是有顺序的,在部署或者扩展的时候要依据定义的顺序依次依序进行(即从0到N-1,在下一个Pod运行之前所有之前的Pod必须都是Running和Ready状态),基于init containers来实现
  • 有序收缩,有序删除(即从N-1到0)

在创建好对应的PV后,需要创建对应的StatefulSet(Petset)资源。由于本文使用的kubernetes为1.4.6版本,所以示例中采用的名称仍然为Petset。具体的创建zookeeper的Statefulset(Petset)的示例如下:

# A headless service to create DNS records
apiVersion: v1
kind: Service
metadata:
  annotations:
    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
  name: zk
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: peer
  - port: 3888
    name: leader-election
  # *.zk.default.svc.cluster.local
  clusterIP: None
  selector:
    app: zk
---
apiVersion: apps/v1alpha1
kind: PetSet
metadata:
  name: zoo
spec:
  serviceName: "zk"
  replicas: 3
  template:
    metadata:
      labels:
        app: zk
      annotations:
        pod.alpha.kubernetes.io/initialized: "true"
        pod.alpha.kubernetes.io/init-containers: '[
            {
                "name": "install",
                "image": "gcr.io/google_containers/zookeeper-install:0.1",
                "imagePullPolicy": "Always",
                "args": ["--version=3.5.0-alpha", "--install-into=/opt", "--work-dir=/work-dir"],
                "volumeMounts": [
                    {
                        "name": "opt",
                        "mountPath": "/opt/"
                    },
                    {
                        "name": "workdir",
                        "mountPath": "/work-dir"
                    }
                ]
            },
            {
                "name": "bootstrap",
                "image": "java:openjdk-8-jre",
                "command": ["/work-dir/peer-finder"],
                "args": ["-on-start=\"/work-dir/on-start.sh\"", "-service=zk"],
                "env": [
                  {
                      "name": "POD_NAMESPACE",
                      "valueFrom": {
                          "fieldRef": {
                              "apiVersion": "v1",
                              "fieldPath": "metadata.namespace"
                          }
                      }
                   }
                ],
                "volumeMounts": [
                    {
                        "name": "opt",
                        "mountPath": "/opt/"
                    },
                    {
                        "name": "workdir",
                        "mountPath": "/work-dir"
                    },
                    {
                        "name": "datadir",
                        "mountPath": "/tmp/zookeeper"
                    }
                ]
            }
        ]'
    spec:
      containers:
      - name: zk
        image: java:openjdk-8-jre
        ports:
        - containerPort: 2888
          name: peer
        - containerPort: 3888
          name: leader-election
        command:
        - /opt/zookeeper/bin/zkServer.sh
        args:
        - start-foreground
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "/opt/zookeeper/bin/zkCli.sh ls /"
          initialDelaySeconds: 15
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /tmp/zookeeper
        - name: opt
          mountPath: /opt/
      volumes:
      - name: opt
        emptyDir: {}
      - name: workdir
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.beta.kubernetes.io/storage-class: anything
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 25Gi

描述文件中包括以下几个重要部分: (1) init-containers容器初始化环境信息和对应的配置文件信息

(2) zookeeper容器通过zkServer.sh启动

(3) 通过/opt/zookeeper/bin/zkCli.sh ls /进行健康检测

(4) volumeClaimTemplates声明对应的Persistent Volume Claim

(5) 创建对应的Headless Service

创建过程中查看具体的资源信息:

创建zookeeper服务

$ kubectl create -f zookeeper-petset.yaml
service "zk" created
petset "zoo" created

查看服务实例状态(从服务状态中可以看出,服务实例依次启动)

$ kubectl get po --watch-only
zoo-0     1/1       Running   0          1m
zoo-1     0/1       Pending   0         0s
zoo-1     0/1       Pending   0         0s
zoo-1     0/1       Init:0/2   0         0s
zoo-1     0/1       Init:0/2   0         6s
zoo-1     0/1       Init:1/2   0         25s
zoo-1     0/1       Init:1/2   0         26s
zoo-1     0/1       PodInitializing   0         29s
zoo-1     0/1       Running   0         30s
zoo-1     1/1       Running   0         51s
zoo-2     0/1       Pending   0         0s
zoo-2     0/1       Pending   0         0s
zoo-2     0/1       Init:0/2   0         0s
zoo-2     0/1       Init:0/2   0         6s
zoo-2     0/1       Init:1/2   0         33s
zoo-2     0/1       Init:1/2   0         34s
zoo-2     0/1       PodInitializing   0         37s
zoo-2     0/1       Running   0         38s
zoo-2     1/1       Running   0         1m

查看Persistent Volume Claim (PVC)信息

$ kubectl get pvc
NAME            STATUS    VOLUME    CAPACITY   ACCESSMODES   AGE
datadir-zoo-0   Bound     pv0004    50Gi       RWO           10m
datadir-zoo-1   Bound     pv0001    50Gi       RWO           10m
datadir-zoo-2   Bound     pv0003    50Gi       RWO           10m

查看Persistent Volume (PV)信息

$ kubectl get pv
NAME    CAPACITY ACCESSMODES RECLAIMPOLICY  STATUS    CLAIM                 REASON  AGE
pv0001  50Gi     RWO         Recycle        Bound     default/datadir-zoo-1         14h
pv0002  50Gi     RWO         Recycle        Available                               14h
pv0003  50Gi     RWO         Recycle        Bound     default/datadir-zoo-2         14h
pv0004  50Gi     RWO         Recycle        Bound     default/datadir-zoo-0         14h
pv0005  50Gi     RWO         Recycle        Available                               14h
pv0006  50Gi     RWO         Recycle        Available                               14h

查看服务信息

$ kubectl get service zk
NAME      CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
zk        None         <none>        2888/TCP,3888/TCP   24m

查看Pod的状态信息

$ kubectl get pod -l app=zk -o wide 
NAME      READY     STATUS    RESTARTS   AGE       IP           NODE
zoo-0     1/1       Running   0          26m       172.16.0.5   10.0.0.33
zoo-1     1/1       Running   0          24m       172.16.2.4   10.0.0.40
zoo-2     1/1       Running   0          23m       172.16.0.6   10.0.0.33

查看dns中的域名信息

$  dig @172.16.0.3 zk.default.svc.cluster.local
;; ANSWER SECTION:
zk.default.svc.cluster.local. 30 IN    A    172.16.0.5
zk.default.svc.cluster.local. 30 IN    A    172.16.0.6
zk.default.svc.cluster.local. 30 IN    A    172.16.2.4

$  dig @172.16.0.3 zoo-0.zk.default.svc.cluster.local
;; ANSWER SECTION:
zoo-0.zk.default.svc.cluster.local. 30 IN A    172.16.0.5

$  dig @172.16.0.3 zoo-1.zk.default.svc.cluster.local
;; ANSWER SECTION:
zoo-1.zk.default.svc.cluster.local. 30 IN A    172.16.2.4

$  dig @172.16.0.3 zoo-2.zk.default.svc.cluster.local
;; ANSWER SECTION:
zoo-1.zk.default.svc.cluster.local. 30 IN A    172.16.2.4

查看服务实例内部状态

查看zookeeper配置信息

root@zoo-0:/opt/zookeeper/conf# cat zoo.cfg.dynamic 
server.1=zoo-0.zk.default.svc.cluster.local:2888:3888:participant;0.0.0.0:2181
server.2=zoo-1.zk.default.svc.cluster.local:2888:3888:participant;0.0.0.0:2181
server.3=zoo-2.zk.default.svc.cluster.local:2888:3888:participant;0.0.0.0:2181

root@zoo-0:/tmp/zookeeper# cat myid 
1

查看zookeeper集群状态

root@zoo-0:/opt/zookeeper/bin# ./zkCli.sh 

[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]

kafka中StatefulSet(Petset)资源和headless服务的创建

在创建好zookeeper服务后,我们接下来创建kafka服务。kafka服务的创建过程和zookeeper服务类似。对应的示例文件如下:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: apps/v1alpha1
kind: PetSet
metadata:
  name: kafka
spec:
  serviceName: kafka-svc
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
      annotations:
        pod.alpha.kubernetes.io/initialized: "true"
    spec:
      terminationGracePeriodSeconds: 0
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      serviceAccount: ""
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: gcr.io/google_samples/k8skafka:v1
        resources:
          requests:
            memory: "1Gi"
            cpu: 500m
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zoo-0.zk.default.svc.cluster.local:2181,zoo-1.zk.default.svc.cluster.local:2181,zoo-2.zk.default.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value : "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
           command: 
            - sh 
            - -c 
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.beta.kubernetes.io/storage-class: anything
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 50Gi

创建kafka服务

$ kubectl create -f kafka-petset.yaml
service "kafka-svc" created
petset "kafka" created

查看Pod的状态信息

$ kubectl get pod -l app=kafka -o wide 
NAME      READY     STATUS    RESTARTS   AGE       IP           NODE
kafka-0   1/1       Running   0          1h        172.16.0.7   10.0.0.33
kafka-1   1/1       Running   0          1h        172.16.2.5   10.0.0.40
kafka-2   1/1       Running   0          1h        172.16.1.5   10.0.0.45

其他信息的变化和zookeeper服务创建过程类似,限于篇幅关系,这里不再展示。

进行kafka集群可用性测试

创建topic测试

root@kafka-0:/opt/kafka/config# kafka-topics.sh --create \
> --topic test \
> --zookeeper zoo-0.zk.default.svc.cluster.local:2181,zoo-1.zk.default.svc.cluster.local:2181,zoo-2.zk.default.svc.cluster.local:2181 \
> --partitions 3 \
> --replication-factor 2

Created topic "test".

创建生产消费测试

root@kafka-0:/opt/kafka/config# kafka-console-consumer.sh --topic test --bootstrap-server localhost:9093

root@kafka-1:/# kafka-console-producer.sh --topic test --broker-list localhost:9093                                                             
I like kafka
hello world

#在消费者侧显示为:
I like kafka
hello world

kafka进行扩容测试

创建新的nfsserver和pv

# kubectl get service -l qcloud-app=nfsserver
NAME        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
nfsserver   172.16.255.18   <none>        2049/TCP,20048/TCP,111/TCP   32m

# kubectl get pv  pv0007
NAME    CAPACITY  ACCESSMODES RECLAIMPOLICY   STATUS    CLAIM   REASON    AGE
pv0007  50Gi      RWO         Recycle         Available                   14s

由于创建过程在前文已经描述过,这里不再进行详述创建过程。

kafka副本数修改

#  kubectl scale petset kafka --replicas=4
petset "kafka" scaled

#  kubectl get pod -l app=kafka -o wide 
NAME      READY     STATUS    RESTARTS   AGE       IP           NODE
kafka-0   1/1       Running   0          1h        172.16.0.7   10.0.0.33
kafka-1   1/1       Running   0          1h        172.16.2.5   10.0.0.40
kafka-2   1/1       Running   0          1h        172.16.1.5   10.0.0.45
kafka-3   1/1       Running   0          1m        172.16.0.8   10.0.0.33

总结

本文详细的介绍了基于Statefulset(PetSet)+Persistent Volume的方式,部署有状态的集群服务zookeeper和kafka。总体上看,基于Statefulset+PV的方式很好的解决了kafka和zookeeper服务需要存储盘来保存信息,同时每个实例需要特定的Id标记的问题。但目前PV的创建过程,以及Statefulset的启动过程,相对于来说还是比较复杂。另外基于StatefulSet(PetSet)的扩容和升级对于实例的变更顺序依然有比较大的依赖,kubernetes社区也在做进一步探索。

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

1 条评论
登录 后参与评论

相关文章

来自专栏浪淘沙

ZooKeeper学习

2.zookeeper本身就是一个分布式程序,(只要有半数节点存活,就能正常服务。适合奇数节点)

494
来自专栏pangguoming

在springboot项目中使用mybatis 集成 Sharding-JDBC

前段时间写了篇如何使用Sharding-JDBC进行分库分表的例子,相信能够感受到Sharding-JDBC的强大了,而且使用配置都非常干净。官方支持的功能还包...

1082
来自专栏菩提树下的杨过

Spring Security笔记:使用数据库进行用户认证(form login using database)

在前一节,学习了如何自定义登录页,但是用户名、密码仍然是配置在xml中的,这样显然太非主流,本节将学习如何把用户名/密码/角色存储在db中,通过db来实现用户认...

731
来自专栏「3306 Pai」社区

深入理解MySQL 5.7 GTID系列(九):实际案例一

https://mp.weixin.qq.com/s/XSnFkuYzIlGWMaXIl-oPeQ

500
来自专栏.net core新时代

使用TaskManager爬取2万条代理IP实现自动投票功能

  话说某天心血来潮想到一个问题,朋友圈里面经常有人发投票链接,让帮忙给XX投票,以前呢会很自觉打开链接帮忙投一票。可是这种事做多了就会考虑能不能使用工具来进行...

18910
来自专栏SDNLAB

OpenStack Neutron中的DVR简介与OVS流表分析

本文主要介绍DVR的概念,比较了DVR和非DVR情况下,数据在network节点上的流量变化。同时也介绍了在OpenStack里面如何配置DVR,比较详细地介绍...

34210
来自专栏dalaoyang

SpringBoot集成Druid监控

druid是开源的数据库连接池,提供了优秀的对数据库操作的监控功能,本文要讲解一下springboot项目怎么集成druid。 本文在基于jpa的项目下开发,首...

3537
来自专栏杨建荣的学习笔记

关于dblink锁定带来的问题(r3笔记第20天)

可能在一些分布式环境中,有一些数据访问都需要用到db link。从某种程度上来说dblink是很方便,但是从性能上来说还是有一些的隐患。如果两个环境之间的网络情...

2635
来自专栏.NET开发者社区

一步一步创建ASP.NET MVC5程序[Repository+Autofac+Automapper+SqlSugar](十一)

前言 小伙伴们, 大家好,我是Rector。 最近Rector忙于换工作,没有太多时间来更新我们的ASP.NET MVC 5系列文章 [一步一步创建ASP....

3096
来自专栏Java成神之路

分布式_事务_02_2PC框架raincat源码解析

上一节已经将raincat demo工程运行起来了,这一节来分析下raincat的源码

681

扫码关注云+社区