前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >助力联邦——­Pulsar在Angel PowerFL联邦学习平台中的应用

助力联邦——­Pulsar在Angel PowerFL联邦学习平台中的应用

作者头像
腾讯大数据
发布2020-08-06 10:40:14
1.2K0
发布2020-08-06 10:40:14
举报

导语

Apache Pulsar是Yahoo开源的MQ解决方案,功能上跟Kafka、RocketMQ、TubeMQ等类似,同时支持多租户、读写分离、跨地域复制等特性。联邦学习作为新一代人工智能基础技术,通过解决数据隐私与数据孤岛问题,重塑金融、医疗、城市安防等领域。本文将介绍Pulsar在Angel PowerFL 联邦学习平台中的应用,探索MQ和联邦学习的跨界合作过程。

01

背  景

Angel PowerFL联邦学习平台及其通信模块要求

Angel PowerFL联邦学习平台构建在Angel之上,利用Angel­PS支持万亿级模型训练的能力,将很多在Worker上的计算提升到PS端;Angel PowerFL为联邦学习算法提供了计算、加密、存储、状态同步等基本操作接口,通过流程调度模块协调参与方任务执行状态,而通信模块完成了任务训练 过程当中所有数据的传输。Angel PowerFL联邦学习已经在金融云、广告联合建模等业务中开始落地,并取得初步的效果。Angel PowerFL系统架构图如下:

Angel PowerFL的学习任务在训练过程当中,参与方之间会有大量的加密数据通过通信模块传输,Angel PowerFL对通信模块有以下要求:

  • 稳定可靠 Angel PowerFL的学习任务时长从几分钟到几小时的都有,算法执行对数据的质量要求很高,不同算法的数据传输峰值也不一样,这需要通信模块的服务足够的稳定,并且不能丢数据。
  • 高性能传输 Angel PowerFL底层通过Spark进行计算,Executor并发执行会产生很多待传输的中间数据,通信模块需要将这些加密后的数据及时传输到对方,这就要求通信服务延时低、吞吐量尽可能高。
  • 数据安全 虽然Angel PowerFL所有数据都通过加密模块进行了加密,但参与联邦学习的业务可能分布在不同公司,跨公网进行传输,需要通信模块足够安全,不易被攻击。

Pulsar及Pulsar Geo­Replication

Pulsar是新一代消息队列系统,架构上做了计算与存储的分离,MQ的逻辑主要放在Pulsar Broker完成,存储层使用Apache BookKeeper作为分布式一致性存储,相比于传统MQ的一些优势:

  • Broker和Bookie相互独立,可以独立的扩展,独立的容错,提升系统的可用性
  • 分区存储不受限于单个节点存储容量,数据分布更加均匀
  • Bookkeeper提供可靠的存储,保证消息不丢失

相比较传统的MQ解决方案,针对跨越多个数据中心的多Pulsar集群,Pulsar提供了地域复制功能,即Pulsar Geo­Replication。Geo­Replication同时支持同步地域复制异步地域复制,甚至可以在Message级别通过 setReplicationClusters控制消息复制到哪些集群。

在上图中,无论Producer P1、P2和P3在什么时候分别将消息发布给Cluster A、Cluster B和Cluster C中的topic T1,这些消息均会立刻复制到整个集群。一旦完成复制,Consumer C1和C2即可从自己所在的集群消费这些消息。

02

基于Pulsar的Angel PowerFL通信模块实现

参与联邦学习的各个业务(Angel PowerFL称之为Party,每个Party有不同的ID,比方说 10000/20000),可能分布在同个公司的不同部门(无网络隔离),也可能分布在不同公司(跨公网),各个Party之间通过Pulsar Geo­Replication进行同步复制,总的设计方案如下:

联邦学习的每个训练任务,通过消息生产者(Prouder)和消费者(Consumer)连接所在Party的Pulsar集群,集群名以fl-pulsar-[partyID] 进行区分,训练任务产生需要传输的中间数据后,生产者负责将这些数据发送给本地Pulsar集群。Pulsar集群收到数据后,通过Pulsar Proxy建立的同步复制网络通道,将数据发送给使用方Party。而使用方Party的消费者,会一直监听该训练任务对应的Topic,当有数据达到后,直接消费数据进行下一步的计算。

Angel PowerFL支持多方联邦,意味着存在大于2个Pulsar集群开启了同步复制。每个联邦学习任务,通过各自的parties任务参数指定了参与方,生产者在发送消息时调用 setReplicationClusters接口,保证了数据只在参与party之间传输。

使用Pulsar作为Angel PowerFL的通信模块,在方案的实现的过程当中,使用了Pulsar很多其它特性,同时也做了些优化,包括Geo­Replication去掉Global Zookeeper依赖、Client增加Token认证、多集群Topic自动回收等内容,下面将一一具体介绍。

Geo­Replication去掉Global Zookeeper依赖

一个完整的Pulsar的部署,依赖两个ZooKeeper集群,分别是Local ZooKeeper和Global ZooKeeper。Local ZooKeeper和Kafka中的ZooKeeper作用类似,用来存储元数据。而 Global ZooKeeper则是用于Pulsar多集群,便于多个集群间的配置信息共享。

在Angel PowerFL的场景中,每个Party在加入前,都要先部署一个Global ZooKeeper的子节点,或者共用一套跨公司或跨地域的公共ZooKeeper,这样不仅会增加部署的难度,也会增加被攻击的风险,在推广上不利于说服新Party的加入。

Global ZooKeeper中存储的元数据,主要是集群名/服务地址/namespace权限等信息,并且Pulsar支持新集群的创建和加入。我们通过以下两个步骤注册联邦Pulsar集群的信息到Local ZooKeeper,就去除了对Global Zookeeper的依赖。

  • 注册新加入Party Pulsar集群
代码语言:javascript
复制
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} \    --url http://${OTHER_CLUSTER_HTTP_URL} \    --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}
  • 授予namespace权限
代码语言:javascript
复制
./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} \     -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}

对于新加入的Party,只用提供对应Pulsar的集群名/服务地址即可完成注册,Geo­Replication就可以通过注册信息进行同步复制。

Client增加Token认证

Pulsar作为Angel PowerFL的通信模块,没有加入用户级别的权限控制,但为了进一步保证Client生产和消费数据的安全,参考Pulsar Token authentication(请复制链接到浏览器查看:https://pulsar.apache.org/docs/en/security-token-admin/)增加了Token认证,Angel PowerFL的训练任务除了配置当前party使用的服务地址外,还需要配置admin token。由于Angel PowerFL整套系统是有kubernetes部署的,我们通过容器生成Pulsar集群需要的Public/Private keys:

代码语言:javascript
复制
# fl-private.key and fl-public.keydocker run --rm -v "$(pwd)":/tmp \     apachepulsar/pulsar-all:2.5.2 \     /pulsar/bin/pulsar tokens create-key-pair --output-private-key \     /tmp/fl-private.key --output-public-key /tmp/fl-public.key 
# admin-token.txt token fileecho -n `docker run --rm -v \     "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 \     /pulsar/bin/pulsar tokens create --private-key \     file:///tmp/fl-private.key --subject admin` # prepare files for pulsar podskubectl create secret generic token-symmetric-key \     --from-file=TOKEN=admin-token.txt \     --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}

多集群Topic自动回收

由于Pulsar集群开启了Geo­Replication功能后,无法通过命令直接删除用过的Topic,而Angel PowerFL训练任务每次使用过的task是一次性的,任务结束这些Topic就没用了,如果不及时删除就会出现大量累积。

Pulsar对于Geo­Replication的Topic,可以通过 brokerDeleteInactiveTopicsEnabled配置开启 Topic自动回收,自动回收无用的Topic,需满足以下几个条件:

  • 当前Topic没有连接的生产者或者消费者
  • 当前Topic没有被订阅
  • 当前Topic没有需要保留的Message

Angel PowerFL部署的Pulsar集群,通过 brokerDeleteInactiveTopicsEnabled开启了Topic自动回收,在训练任务执行过程当中,每个Topic在使用完后就按回收条件进行了处置。同时,我们增加

brokerDeleteInactiveTopicsFrequencySeconds配置将回收的频率设置为3小时。

开启Topic限流

Angel PowerFL中的训练任务,在不同的数据集/算法/执行阶段,生产数据的流量峰值也不同(生产环境目前观察到,单个任务最大的数据量超过200G/小时),如果训练过程中出现Pulsar断连或者生产消费的异常,整个训练任务都要重新跑。

为了让Pulsar集群不被单个训练任务冲垮,我们使用了Pulsar的限流功能。Pulsar支持 messagerate和byte-rate两种生产限流策略,前者是限制每秒生产message的条数,后者是限制每秒生产的message大小。由于Angel PowerFL会将数据切分成4M大小的Message,最好的选择是通过message­rate限制生产Message的条数。在Angel PowerFL中,我们将namespace的message限制为30条(小于<30*4=120M/s):

代码语言:javascript
复制
./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30

刚开始测试message-rate的限流功能时,出现了限不住的情况(限流设置失效),经过TDbank 团队负责Pulsar的同事一起排查,优化了限流功能并贡献到了社区(topic publish rate limit not take effect,请复制链接到浏览器查看详情:

https://github.com/apache/pulsar/issues/6975)。

优化Topic Unloading配置

Pulsar基于Broker集群负载状况,可以将Topic 动态分配到Broker上。如果拥有该Topic的Broker宕机,或者拥有该Topic的Broker负载过大,则该Topic将立即重新分配给另一个Broker ,而重新分配的过程就是Topic的Unloading,该操作意味着关闭Topic,释放所有者。

理论上,Topic Unloading是种正常的负载均衡调整,客户端将经历极小的延迟抖动,通常耗时 10ms左右。但Angel PowerFL初期版本在跑训练任务时,日志爆出大量因为Unloading Topic导致的连接异常,日志显示在不断的重试,但都不成功:

代码语言:javascript
复制
[sub] Could not get connection to broker: Topic is temporarily unavailabl e -- Will try again in 0.1 s

先来看Broker/Namespace/Bundle/Topic三者的关系。Bundle是Pulsar Namespace的一个分片机制,Namespace被分片为Bundles列表,每个Bundle包含Namespace的整个哈希范围的一部分。Topic不是直接分配给broker的,而是通过计算Topic的哈希码来确定把Topic分配给特定的Bundle,每个Bundle都是互相独立,再被分配到不同的Broker上。

Angel PowerFL早期版本的任务Topic没有复用,一个LR算法训练任务创建超过2000个Topic,并且每个topic生产的数据负载也不同,我们判断短时间大量创建topic并且负载不均衡导致频繁Topic Unloading。为了降低Topic Unloading的频率,对Pulsar的Bundle相关参数做了调整:

代码语言:javascript
复制
# 增加broker可分配最大Topic数量loadBalancerBrokerMaxTopics=500000# 启用自动拆分namespace bundleloadBalancerAutoBundleSplitEnabled=true# 增加触发拆分bundle的topic数量loadBalancerNamespaceBundleMaxTopics=10000# 增加触发拆分bundle的消息数loadBalancerNamespaceBundleMaxMsgRate=10000 

同时,在创建namespace设置默认bundles数量为64:

代码语言:javascript
复制
./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64

经过以上调整,Angel PowerFL在任务执行期间没有再出现过由于Topic Unloading导致的断连情况。

Pulsar On Kubernetes

Angel PowerFL的所有服务均通过Helm部署在Kubernetes上,Pulsar作为其中的一个Chart,可以很好的利用k8s的资源隔离、快速扩缩容等特性。在Angel PowerFL使用Helm部署Pulsar的实践中,我们总结了以下几个经验:

  • 使用Local Persistent Volume作为存储 Pulsar是IO敏感的的服务,尤其bookie组件,在生产环境中建议使用SSD或独立的磁盘。Angel PowerFL在跑一些大数据集任务时,Pulsar经常出现No Bookies Available的异常,而这期间磁盘的util_max很高。我们通过Local Persistent Volume将bookie和zookeeper等其它组件挂载到单独的磁盘,减缓了磁盘IO的竞争。我们也测试过将pulsar的PV存储换成Ceph和NFS,性能都没有直接使用Local Persistent Volume好。
  • 使用NodeSelector Geo­Replication在做同步复制期间,Broker需要能够访问对方的pulsar proxy容器。Angel PowerFL将网关机单独打了标签,通过NodeSelector将Broker在可访问外网的网关机上。
  • 配置useHostNameAsBookieID bookie是有状态的组件,为了让bookie pod重建后服务正常,需要配置 useHostNameAsBookieID,以保证向zookeeper注册的ID是pod的hostname。

以上就是本文的全部内容了,大家如果遇到了相关的技术问题,欢迎在文章下方留言。

以“#你问我答#+提问内容”的形式留言提问,就有机会得到专家回复,还将获得腾讯视频VIP月卡一张哦!

扫码关注 | 即刻了解腾讯大数据技术动态

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
联邦学习
联邦学习(Federated Learning,FELE)是一种打破数据孤岛、释放 AI 应用潜能的分布式机器学习技术,能够让联邦学习各参与方在不披露底层数据和底层数据加密(混淆)形态的前提下,通过交换加密的机器学习中间结果实现联合建模。该产品兼顾AI应用与隐私保护,开放合作,协同性高,充分释放大数据生产力,广泛适用于金融、消费互联网等行业的业务创新场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档