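The simplest and most appropriate way to install a highly available application that brings its own clustering mechanism is really an operator. Here I just try to deploy Kafka in a fairly transparent way, so you can see how k8s and Kafka work together. Without further ado, here is the YAML layout:
There are four main parts, covering the deployment of ZooKeeper and Kafka and how they communicate with the outside world.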
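ZooKeeper runs on the Bitnami zookeeper image, whose documentation is fairly complete. Each node is its own single-replica Deployment, so that it gets a fixed ZOO_SERVER_ID and, through its Service, a stable name; in ZOO_SERVERS every node lists itself as 0.0.0.0 and its peers by Service name. The Deployment files: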
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-1
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
    spec:
      containers:
        - name: zookeeper-1
          image: bitnami/zookeeper:3.6.2
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_SERVER_ID
              value: "1"
            - name: ZOO_SERVERS
              value: 0.0.0.0:2888:3888,zookeeper-2:2888:3888,zookeeper-3:2888:3888
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-2
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
    spec:
      containers:
        - name: zookeeper-2
          image: bitnami/zookeeper:3.6.2
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_SERVER_ID
              value: "2"
            - name: ZOO_SERVERS
              value: zookeeper-1:2888:3888,0.0.0.0:2888:3888,zookeeper-3:2888:3888
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-3
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
    spec:
      containers:
        - name: zookeeper-3
          image: bitnami/zookeeper:3.6.2
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_SERVER_ID
              value: "3"
            - name: ZOO_SERVERS
              value: zookeeper-1:2888:3888,zookeeper-2:2888:3888,0.0.0.0:2888:3888
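The three Deployments differ only in the ID and in which ZOO_SERVERS entry becomes 0.0.0.0. The node's own entry is 0.0.0.0 because a pod cannot bind to its Service's virtual ClusterIP, so it listens on all interfaces instead. A minimal sketch that generates these values rather than writing them by hand, assuming the zookeeper-N Service naming and ports 2888/3888 used above (the helper zoo_servers is hypothetical, not part of the Bitnami image):

```python
def zoo_servers(my_id: int, count: int = 3,
                quorum_port: int = 2888, election_port: int = 3888) -> str:
    """Build the ZOO_SERVERS value for node `my_id` (1-based).

    The node itself is listed as 0.0.0.0 so it binds on all interfaces;
    peers are listed by their Kubernetes Service names (zookeeper-N).
    """
    entries = []
    for n in range(1, count + 1):
        host = "0.0.0.0" if n == my_id else f"zookeeper-{n}"
        entries.append(f"{host}:{quorum_port}:{election_port}")
    return ",".join(entries)


if __name__ == "__main__":
    # Print the env values for each of the three Deployments
    for i in range(1, 4):
        print(f"ZOO_SERVER_ID={i}  ZOO_SERVERS={zoo_servers(i)}")
```

Running it reproduces the three ZOO_SERVERS values above, which makes it harder for the manifests to drift apart when nodes are added.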
Services (one per ZooKeeper node, giving each node a stable DNS name):
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-1
  namespace: rcmd
  labels:
    app: zookeeper-1
spec:
  ports:
    - name: client
      port: 2181
      protocol: TCP
      targetPort: 2181
    - name: follower
      port: 2888
      protocol: TCP
      targetPort: 2888
    - name: leader
      port: 3888
      protocol: TCP
      targetPort: 3888
  selector:
    app: zookeeper-1
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-2
  namespace: rcmd
  labels:
    app: zookeeper-2
spec:
  ports:
    - name: client
      port: 2181
      protocol: TCP
      targetPort: 2181
    - name: follower
      port: 2888
      protocol: TCP
      targetPort: 2888
    - name: leader
      port: 3888
      protocol: TCP
      targetPort: 3888
  selector:
    app: zookeeper-2
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-3
  namespace: rcmd
  labels:
    app: zookeeper-3
spec:
  ports:
    - name: client
      port: 2181
      protocol: TCP
      targetPort: 2181
    - name: follower
      port: 2888
      protocol: TCP
      targetPort: 2888
    - name: leader
      port: 3888
      protocol: TCP
      targetPort: 3888
  selector:
    app: zookeeper-3
For Kafka we use the wurstmeister/kafka image.
Deployment:
kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-broker0
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
      id: "kafka-broker0"
  template:
    metadata:
      labels:
        app: kafka
        id: "kafka-broker0"
    spec:
      containers:
        - name: kafka
          image: "wurstmeister/kafka:2.12-2.5.0"
          imagePullPolicy: "IfNotPresent"
          env:
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "INSIDE://kafka-broker0:9092,OUTSIDE://kafka-broker0.rcmd.testing.mpengine:10000"
            - name: KAFKA_LISTENERS
              value: "INSIDE://:9092,OUTSIDE://:10000"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INSIDE"
            # list every ZooKeeper node so the broker can fail over if one of them is down
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
            - name: KAFKA_BROKER_ID
              value: "0"
            # format: topic:partitions:replicas
            - name: KAFKA_CREATE_TOPICS
              value: mp_post_slog:1:1
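The advertised settings deserve a closer look. KAFKA_ADVERTISED_LISTENERS declares the addresses a broker publishes to clients: a client uses its bootstrap address only for the first connection, then reconnects to the address the broker returns in its metadata, so that address must be reachable from wherever the client runs. Two are declared here: INSIDE (kafka-broker0:9092) for clients inside the cluster and for inter-broker traffic, and OUTSIDE (kafka-broker0.rcmd.testing.mpengine:10000) for clients outside the cluster, reached through the gateway. Note that the short name kafka-broker0 resolves only from pods in the rcmd namespace; clients in other namespaces would need the advertised name to be kafka-broker0.rcmd or the full kafka-broker0.rcmd.svc.cluster.local.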
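The four listener variables have to agree with each other by name. The sketch below checks the naming rules that, to my understanding, the broker also enforces at startup: every advertised listener must be defined in KAFKA_LISTENERS, every listener needs an entry in the security protocol map, and the inter-broker listener must be advertised. The function names are hypothetical and the check says nothing about whether the addresses are actually reachable.

```python
from typing import Dict, List


def parse_listeners(value: str) -> Dict[str, str]:
    """Parse 'NAME://host:port,...' into {NAME: 'host:port'}."""
    result = {}
    for item in value.split(","):
        name, _, addr = item.partition("://")
        result[name] = addr
    return result


def parse_protocol_map(value: str) -> Dict[str, str]:
    """Parse 'NAME:PROTOCOL,...' into {NAME: PROTOCOL}."""
    return dict(item.split(":", 1) for item in value.split(","))


def check_listeners(listeners: str, advertised: str,
                    protocol_map: str, inter_broker: str) -> List[str]:
    """Return naming problems; an empty list means the names are consistent."""
    bound = parse_listeners(listeners)
    adv = parse_listeners(advertised)
    protocols = parse_protocol_map(protocol_map)
    problems = []
    for name in adv:
        if name not in bound:
            problems.append(f"{name} is advertised but not in KAFKA_LISTENERS")
        if name not in protocols:
            problems.append(f"{name} has no entry in KAFKA_LISTENER_SECURITY_PROTOCOL_MAP")
    if inter_broker not in adv:
        problems.append(f"inter-broker listener {inter_broker} is not advertised")
    return problems


if __name__ == "__main__":
    # Values copied from the kafka-broker0 Deployment
    print(check_listeners(
        "INSIDE://:9092,OUTSIDE://:10000",
        "INSIDE://kafka-broker0:9092,OUTSIDE://kafka-broker0.rcmd.testing.mpengine:10000",
        "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT",
        "INSIDE",
    ))
```

For the values in the Deployment the list is empty; dropping OUTSIDE from KAFKA_LISTENERS or from the protocol map would make it report the mismatch.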
Kafka broker Service:
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker0
  labels:
    name: kafka
  namespace: rcmd
spec:
  ports:
    - port: 9092
      name: kafka-port
      protocol: TCP
      targetPort: 9092
    - port: 10000
      name: external-port
      protocol: TCP
      targetPort: 10000
  selector:
    app: kafka
    id: "kafka-broker0"
  type: ClusterIP
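The Service has to forward both listener ports: 9092 so in-cluster clients reach the INSIDE listener, and 10000 so the VirtualService can hand gateway traffic to the OUTSIDE listener. A small hypothetical check (helper names are illustrative only) that every port bound in KAFKA_LISTENERS is covered by some Service targetPort:

```python
from typing import List, Set


def listener_ports(listeners: str) -> Set[int]:
    """Extract bind ports from a value such as 'INSIDE://:9092,OUTSIDE://:10000'."""
    return {int(item.rsplit(":", 1)[1]) for item in listeners.split(",")}


def missing_target_ports(listeners: str, service_target_ports: List[int]) -> Set[int]:
    """Return listener ports that no Service port forwards to (empty set = all covered)."""
    return listener_ports(listeners) - set(service_target_ports)


if __name__ == "__main__":
    # targetPorts taken from the kafka-broker0 Service
    print(missing_target_ports("INSIDE://:9092,OUTSIDE://:10000", [9092, 10000]))
```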
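External access goes through an Istio Gateway with a TCP server. The Gateway resource only configures the listener inside the ingress gateway's Envoy; the ingress gateway selected by istio: ingressgateway-external must also expose port 10000 on its own Kubernetes Service, otherwise external traffic never reaches that listener.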
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: external-gateway
  namespace: rcmd
spec:
  selector:
    istio: ingressgateway-external
  servers:
    - hosts:
        - '*.rcmd.testing.mpengine'
      port:
        name: kafka
        number: 10000
        protocol: TCP
VirtualService:
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: kafka-router
  namespace: rcmd
spec:
  gateways:
    - external-gateway
  hosts:
    - "kafka-broker0.rcmd.testing.mpengine"
  tcp:
    - match:
        - port: 10000
      route:
        - destination:
            host: kafka-broker0.rcmd.svc.cluster.local
            port:
              number: 10000
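Because this is plain TCP without TLS, there is no hostname in the byte stream for the gateway to match on, so in practice the route is selected by port, and the hosts entries do not tell brokers apart. Adding brokers would therefore mean giving each one its own external port. The sketch below shows one such layout; the scheme "broker N is exposed on 10000 + N" and the helper names are assumptions for illustration, not part of the setup above. Broker 0 reproduces the original values.

```python
from typing import Dict

BASE_EXTERNAL_PORT = 10000  # assumption: broker N is exposed on BASE_EXTERNAL_PORT + N
DOMAIN = "rcmd.testing.mpengine"
NAMESPACE = "rcmd"


def broker_listeners(broker_id: int) -> Dict[str, str]:
    """Env values for a hypothetical broker kafka-broker<ID> under the port-per-broker scheme."""
    name = f"kafka-broker{broker_id}"
    ext_port = BASE_EXTERNAL_PORT + broker_id
    return {
        "KAFKA_BROKER_ID": str(broker_id),
        "KAFKA_LISTENERS": f"INSIDE://:9092,OUTSIDE://:{ext_port}",
        "KAFKA_ADVERTISED_LISTENERS": (
            f"INSIDE://{name}:9092,OUTSIDE://{name}.{DOMAIN}:{ext_port}"
        ),
    }


def gateway_routes(broker_count: int) -> Dict[int, str]:
    """Map each gateway port to the in-cluster Service a tcp route would target."""
    return {
        BASE_EXTERNAL_PORT + i: f"kafka-broker{i}.{NAMESPACE}.svc.cluster.local"
        for i in range(broker_count)
    }


if __name__ == "__main__":
    for i in range(3):
        print(broker_listeners(i))
    print(gateway_routes(3))
```

Each extra port would also have to be added to the Gateway's server list, to that broker's Service, and to the ingress gateway's own Service.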
Test from outside the cluster with kafka-python (DNS for the domain has already been set up here). Producer:
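from kafka import KafkaProducer

# Bootstrap through the external address published by the OUTSIDE listener
producer = KafkaProducer(bootstrap_servers='kafka-broker0.rcmd.testing.mpengine:10000')
for _ in range(10):
    producer.send('mp_post_slog', key=b'ping', value=b'bar')
# Block until all buffered messages have been delivered
producer.flush()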
Consumer:
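from kafka import KafkaConsumer
# auto_offset_reset='earliest' lets a new consumer group also read messages produced before it started
consumer = KafkaConsumer('mp_post_slog', bootstrap_servers='kafka-broker0.rcmd.testing.mpengine:10000',
                         group_id='my_favorite_group', auto_offset_reset='earliest')
for msg in consumer:
    print(msg)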