概述
腾讯云也有商业版的 Pulsar 产品,即 TDMQ for Pulsar。本文举例介绍配置基于 TDMQ for Pulsar 消息队列中未消费的消息数量进行水平伸缩。当然,如果您使用的是自建的开源 Apache Pulsar,配置方法也是类似的。
操作步骤
下面使用 pulsar-demo 来模拟 Pulsar 生产者和消费者,再结合 KEDA 配置实现 Pulsar 消费者基于 Pulsar 消息数量的水平伸缩,在实际使用中,可根据自己的情况进行相应替换。
1. 获取 Pulsar API 调用地址
1. 在 Pulsar 集群管理页面 找到需要使用的 Pulsar 集群,单击接入地址可获取 Pulsar 的 URL,通常使用 VPC 内网接入地址(解析出来是169保留网段的 IP,在任意 VPC 都可用)。如下图所示:


2. 复制并记录 API 调用地址。
获取 Pulsar Topic


注意:
只支持持久化类型的 Topic,配置所需的 Topic 是在已复制的 Topic 名称前面加
persistent://
。获取 Pulsar JWT Token
1. 确保在 Pulsar 角色管理 创建了所需的角色,并在 Pulsar 命名空间 中配置了相应的权限,确保所需角色有相应的生产消息或消费消息的权限。
2. 复制密钥,即 Pulsar 客户端所需的 JWT Token。如下图所示:


获取订阅名称
在 Topic 管理的消费者页面,根据需要,查看已有的订阅,或者新建订阅,并记录下需要使用的订阅名称。如下图所示:


部署生产者
1. 准备生产者配置,根据之前获取的 Pulsar 相关信息替换配置。示例如下:
apiVersion: v1kind: Secrettype: Opaquemetadata:name: producer-secretstringData:URL: http://pulsar-xxxxxxxxxxxx.tdmq.ap-cd.qcloud.tencenttdmq.com:5005 # 替换 API 调用地址TOPIC: persistent://pulsar-xxxxxxxxxxxx/test-ns/test-topic # 替换 TopicTOKEN: xxx # 替换角色密钥 (JWT Token)
2. 部署生产者持续发送新消息:
apiVersion: apps/v1kind: Deploymentmetadata:name: producerspec:replicas: 1selector:matchLabels:app: producertemplate:metadata:labels:app: producerspec:containers:- name: producerimage: imroc/pulsar-demo:mainimagePullPolicy: Alwaysargs:- producer- --produce-duration- 2s # 每 2s 生产一条消息envFrom:- secretRef:name: producer-secretterminationGracePeriodSeconds: 1
部署消费者
1. 准备消费者配置,根据前面获取的 Pulsar 相关信息替换配置。示例如下:
apiVersion: v1kind: Secrettype: Opaquemetadata:name: consumer-secretstringData:URL: http://pulsar-xxxxxxxxxxxx.tdmq.ap-cd.qcloud.tencenttdmq.com:5005 # 替换 API 调用地址TOPIC: persistent://pulsar-xxxxxxxxxxxx/test-ns/test-topic # 替换 TopicTOKEN: xxx # 替换角色密钥 (JWT Token)SUBSCRIPTION: xxx # 替换订阅名称
2. 通过 Deployment 部署消费者,持续消费消息:
apiVersion: apps/v1kind: Deploymentmetadata:name: consumerspec:replicas: 1selector:matchLabels:app: consumertemplate:metadata:labels:app: consumerspec:containers:- args:- consumer- --consume-duration- 10s # 单个消费者每 10s 处理完一条消息envFrom:- secretRef:name: consumer-secretimage: imroc/pulsar-demo:mainimagePullPolicy: Alwaysname: consumerterminationGracePeriodSeconds: 1
配置 ScaledObject
1. 先创建
TriggerAuthentication
并引用 consumer-secret
中的 TOKEN:apiVersion: keda.sh/v1alpha1kind: TriggerAuthenticationmetadata:name: consumer-authspec:secretTargetRef:- parameter: bearerTokenname: consumer-secretkey: TOKEN
2. 创建 ScaledObject(替换高亮行配置):
apiVersion: keda.sh/v1alpha1kind: ScaledObjectmetadata:name: consumer-scaledobjectspec:scaleTargetRef:apiVersion: apps/v1kind: Deploymentname: consumerpollingInterval: 15idleReplicaCount: 0 # 没有消息时缩到 0minReplicaCount: 1maxReplicaCount: 100triggers:- type: pulsarmetadata:adminURL: http://pulsar-xxxxxxxxxxxx.tdmq.ap-cd.qcloud.tencenttdmq.com:5005 # 替换 API 调用地址topic: persistent://pulsar-xxxxxxxxxxxx/test/persist-topic # 替换 Topicsubscription: my-sub # 替换订阅名称isPartitionedTopic: "true" # 如果分区数大于 1,这里就置为 truemsgBacklogThreshold: "5" # 伸缩阈值,副本数=CEIL(消息堆积数/msgBacklogThreshold)activationMsgBacklogThreshold: "1" # 如果当前副本数为 0,只要队列里来新消息了,就将副本置为 1 并启用伸缩authModes: bearer # 角色密钥(JWT Token)本质上是 bearer 的认证模式authenticationRef:name: consumer-auth # 引用前面创建的 TriggerAuthentication
查看 HPA
如果配置正确,会自动创建出对应的 HPA 资源。执行如下命令,查看 HPA。
$ kubectl get hpaNAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGEkeda-hpa-consumer-scaledobject Deployment/consumer 4600m/5 (avg) 1 10 5 31m
说明:
在上述输出中,可以通过 "TARGETS" 字段反推当前消息堆积数量。以上述输出为例,堆积消息数为4.6 * 5 = 23。
ScaledJob + 超级节点
如果单条消息处理耗时较大,但又需要尽量及时获取处理结果,可以配置 ScaledJob,每当队列中有新消息时,将自动创建一个 Job 来消费,让 Job 的 Pod 调度到超级节点,以实现按需使用计算资源和按量计费。
触发器的配置对于 ScaledObject 与 ScaledJob 完全一致,如需配置 ScaledJob,可参考 ScaledObject 的配置。