导语
Apache Pulsar是Yahoo开源的MQ解决方案,功能上跟Kafka、RocketMQ、TubeMQ等类似,同时支持多租户、读写分离、跨地域复制等特性。联邦学习作为新一代人工智能基础技术,通过解决数据隐私与数据孤岛问题,重塑金融、医疗、城市安防等领域。本文将介绍Pulsar在Angel PowerFL 联邦学习平台中的应用,探索MQ和联邦学习的跨界合作过程。
01
背 景
Angel PowerFL联邦学习平台及其通信模块要求
Angel PowerFL联邦学习平台构建在Angel之上,利用AngelPS支持万亿级模型训练的能力,将很多在Worker上的计算提升到PS端;Angel PowerFL为联邦学习算法提供了计算、加密、存储、状态同步等基本操作接口,通过流程调度模块协调参与方任务执行状态,而通信模块完成了任务训练 过程当中所有数据的传输。Angel PowerFL联邦学习已经在金融云、广告联合建模等业务中开始落地,并取得初步的效果。Angel PowerFL系统架构图如下:
Angel PowerFL的学习任务在训练过程当中,参与方之间会有大量的加密数据通过通信模块传输,Angel PowerFL对通信模块有以下要求:
Pulsar及Pulsar GeoReplication
Pulsar是新一代消息队列系统,架构上做了计算与存储的分离,MQ的逻辑主要放在Pulsar Broker完成,存储层使用Apache BookKeeper作为分布式一致性存储,相比于传统MQ的一些优势:
相比较传统的MQ解决方案,针对跨越多个数据中心的多Pulsar集群,Pulsar提供了地域复制功能,即Pulsar GeoReplication。GeoReplication同时支持同步地域复制和异步地域复制,甚至可以在Message级别通过 setReplicationClusters控制消息复制到哪些集群。
在上图中,无论Producer P1、P2和P3在什么时候分别将消息发布给Cluster A、Cluster B和Cluster C中的topic T1,这些消息均会立刻复制到整个集群。一旦完成复制,Consumer C1和C2即可从自己所在的集群消费这些消息。
02
基于Pulsar的Angel PowerFL通信模块实现
参与联邦学习的各个业务(Angel PowerFL称之为Party,每个Party有不同的ID,比方说 10000/20000),可能分布在同个公司的不同部门(无网络隔离),也可能分布在不同公司(跨公网),各个Party之间通过Pulsar GeoReplication进行同步复制,总的设计方案如下:
联邦学习的每个训练任务,通过消息生产者(Prouder)和消费者(Consumer)连接所在Party的Pulsar集群,集群名以fl-pulsar-[partyID] 进行区分,训练任务产生需要传输的中间数据后,生产者负责将这些数据发送给本地Pulsar集群。Pulsar集群收到数据后,通过Pulsar Proxy建立的同步复制网络通道,将数据发送给使用方Party。而使用方Party的消费者,会一直监听该训练任务对应的Topic,当有数据达到后,直接消费数据进行下一步的计算。
Angel PowerFL支持多方联邦,意味着存在大于2个Pulsar集群开启了同步复制。每个联邦学习任务,通过各自的parties任务参数指定了参与方,生产者在发送消息时调用 setReplicationClusters接口,保证了数据只在参与party之间传输。
使用Pulsar作为Angel PowerFL的通信模块,在方案的实现的过程当中,使用了Pulsar很多其它特性,同时也做了些优化,包括GeoReplication去掉Global Zookeeper依赖、Client增加Token认证、多集群Topic自动回收等内容,下面将一一具体介绍。
GeoReplication去掉Global Zookeeper依赖
一个完整的Pulsar的部署,依赖两个ZooKeeper集群,分别是Local ZooKeeper和Global ZooKeeper。Local ZooKeeper和Kafka中的ZooKeeper作用类似,用来存储元数据。而 Global ZooKeeper则是用于Pulsar多集群,便于多个集群间的配置信息共享。
在Angel PowerFL的场景中,每个Party在加入前,都要先部署一个Global ZooKeeper的子节点,或者共用一套跨公司或跨地域的公共ZooKeeper,这样不仅会增加部署的难度,也会增加被攻击的风险,在推广上不利于说服新Party的加入。
Global ZooKeeper中存储的元数据,主要是集群名/服务地址/namespace权限等信息,并且Pulsar支持新集群的创建和加入。我们通过以下两个步骤注册联邦Pulsar集群的信息到Local ZooKeeper,就去除了对Global Zookeeper的依赖。
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} \ --url http://${OTHER_CLUSTER_HTTP_URL} \ --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}
./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} \ -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}
对于新加入的Party,只用提供对应Pulsar的集群名/服务地址即可完成注册,GeoReplication就可以通过注册信息进行同步复制。
Client增加Token认证
Pulsar作为Angel PowerFL的通信模块,没有加入用户级别的权限控制,但为了进一步保证Client生产和消费数据的安全,参考Pulsar Token authentication(请复制链接到浏览器查看:https://pulsar.apache.org/docs/en/security-token-admin/)增加了Token认证,Angel PowerFL的训练任务除了配置当前party使用的服务地址外,还需要配置admin token。由于Angel PowerFL整套系统是有kubernetes部署的,我们通过容器生成Pulsar集群需要的Public/Private keys:
# fl-private.key and fl-public.keydocker run --rm -v "$(pwd)":/tmp \ apachepulsar/pulsar-all:2.5.2 \ /pulsar/bin/pulsar tokens create-key-pair --output-private-key \ /tmp/fl-private.key --output-public-key /tmp/fl-public.key
# admin-token.txt token fileecho -n `docker run --rm -v \ "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 \ /pulsar/bin/pulsar tokens create --private-key \ file:///tmp/fl-private.key --subject admin` # prepare files for pulsar podskubectl create secret generic token-symmetric-key \ --from-file=TOKEN=admin-token.txt \ --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}
多集群Topic自动回收
由于Pulsar集群开启了GeoReplication功能后,无法通过命令直接删除用过的Topic,而Angel PowerFL训练任务每次使用过的task是一次性的,任务结束这些Topic就没用了,如果不及时删除就会出现大量累积。
Pulsar对于GeoReplication的Topic,可以通过 brokerDeleteInactiveTopicsEnabled配置开启 Topic自动回收,自动回收无用的Topic,需满足以下几个条件:
Angel PowerFL部署的Pulsar集群,通过 brokerDeleteInactiveTopicsEnabled开启了Topic自动回收,在训练任务执行过程当中,每个Topic在使用完后就按回收条件进行了处置。同时,我们增加
brokerDeleteInactiveTopicsFrequencySeconds配置将回收的频率设置为3小时。
开启Topic限流
Angel PowerFL中的训练任务,在不同的数据集/算法/执行阶段,生产数据的流量峰值也不同(生产环境目前观察到,单个任务最大的数据量超过200G/小时),如果训练过程中出现Pulsar断连或者生产消费的异常,整个训练任务都要重新跑。
为了让Pulsar集群不被单个训练任务冲垮,我们使用了Pulsar的限流功能。Pulsar支持 messagerate和byte-rate两种生产限流策略,前者是限制每秒生产message的条数,后者是限制每秒生产的message大小。由于Angel PowerFL会将数据切分成4M大小的Message,最好的选择是通过messagerate限制生产Message的条数。在Angel PowerFL中,我们将namespace的message限制为30条(小于<30*4=120M/s):
./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30
刚开始测试message-rate的限流功能时,出现了限不住的情况(限流设置失效),经过TDbank 团队负责Pulsar的同事一起排查,优化了限流功能并贡献到了社区(topic publish rate limit not take effect,请复制链接到浏览器查看详情:
https://github.com/apache/pulsar/issues/6975)。
优化Topic Unloading配置
Pulsar基于Broker集群负载状况,可以将Topic 动态分配到Broker上。如果拥有该Topic的Broker宕机,或者拥有该Topic的Broker负载过大,则该Topic将立即重新分配给另一个Broker ,而重新分配的过程就是Topic的Unloading,该操作意味着关闭Topic,释放所有者。
理论上,Topic Unloading是种正常的负载均衡调整,客户端将经历极小的延迟抖动,通常耗时 10ms左右。但Angel PowerFL初期版本在跑训练任务时,日志爆出大量因为Unloading Topic导致的连接异常,日志显示在不断的重试,但都不成功:
[sub] Could not get connection to broker: Topic is temporarily unavailabl e -- Will try again in 0.1 s
先来看Broker/Namespace/Bundle/Topic三者的关系。Bundle是Pulsar Namespace的一个分片机制,Namespace被分片为Bundles列表,每个Bundle包含Namespace的整个哈希范围的一部分。Topic不是直接分配给broker的,而是通过计算Topic的哈希码来确定把Topic分配给特定的Bundle,每个Bundle都是互相独立,再被分配到不同的Broker上。
Angel PowerFL早期版本的任务Topic没有复用,一个LR算法训练任务创建超过2000个Topic,并且每个topic生产的数据负载也不同,我们判断短时间大量创建topic并且负载不均衡导致频繁Topic Unloading。为了降低Topic Unloading的频率,对Pulsar的Bundle相关参数做了调整:
# 增加broker可分配最大Topic数量loadBalancerBrokerMaxTopics=500000# 启用自动拆分namespace bundleloadBalancerAutoBundleSplitEnabled=true# 增加触发拆分bundle的topic数量loadBalancerNamespaceBundleMaxTopics=10000# 增加触发拆分bundle的消息数loadBalancerNamespaceBundleMaxMsgRate=10000
同时,在创建namespace设置默认bundles数量为64:
./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64
经过以上调整,Angel PowerFL在任务执行期间没有再出现过由于Topic Unloading导致的断连情况。
Pulsar On Kubernetes
Angel PowerFL的所有服务均通过Helm部署在Kubernetes上,Pulsar作为其中的一个Chart,可以很好的利用k8s的资源隔离、快速扩缩容等特性。在Angel PowerFL使用Helm部署Pulsar的实践中,我们总结了以下几个经验:
以上就是本文的全部内容了,大家如果遇到了相关的技术问题,欢迎在文章下方留言。
以“#你问我答#+提问内容”的形式留言提问,就有机会得到专家回复,还将获得腾讯视频VIP月卡一张哦!
扫码关注 | 即刻了解腾讯大数据技术动态