前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Combine之Backpressure

Combine之Backpressure

作者头像
CC老师
发布2022-03-15 08:16:35
5670
发布2022-03-15 08:16:35
举报

我们在使用订阅者的时候,都是用Sink或者Assign,但是大家有没有想过一个问题,这两种订阅者在第一次连接到发布者的时候,会发送一个无限大(unlimited)的需求(Demand)。

这个时候,订阅者就会一直不停的接收到发布者发过来的内容,按理说,订阅者无条件接收就可以了,但是,如果发布者发布的速度太快了,而订阅者接收的速度很慢,接收不了,那怎么办呢?又或者说,我不需要实时接收,我只需要隔一段时间接收一次,这种需求也是非常多见的。

这个时候就涉及到一个概念,背压(back pressure),或者叫回压,我们可以通过这个背压,来精确的控制发布者什么时候生成元素,我们通常理解的话,发布者应该是主动发布的,然后订阅者被动的去接收。

其实不是,而是由订阅者去连接和获取元素的时候,才进行发布,这个时候,我们就可以通过使用Subscribers.Demand这个类型来告诉发布者我可以接收多少个元素,也就是返回可以追加接收的事件数量,这样就可以做到控制发布者的发送速度,以此来定义 Backpressure 的响应行为。

Combine 在设计思路和 API 等等很多地方都参考了 ReactiveX,特别是具体到 RxSwift 的实现。如果你对响应式编程有了一定的认识的话,把你的项目从 RxSwift 迁移到 Combine 应该是非常容易的,不得不说Combine“抄袭”的非常成功。

如果非要说 RxSwift 和 Combine 的最大的不同之处,那就是 RxSwift 到现在为止都没有支持 backpressure,只有RxJava才有这个机制;但是 Combine 中原生对这个特性进行了支持。

我写了一个demo,发布者是这个定时器:

点击button的时候,就开始订阅:

这个订阅者是自定义的,他遵循Subscriber协议,然后实现协议里面的三个方法:

第一个方法里面,使用接收到的这个订阅subscription,去向发布者请求元素,这个Subscription协议就是连接发布者和订阅者的桥梁;第二个方法是用来告诉订阅者,发布者已经产生了元素。

并且可以接收到一个Date元素input,然后返回一个需求量,也就是你希望订阅者还能够接收多少个元素;第三个方法告诉订阅者,发布者已经发布完了,不管是发布正常或者是有错误,这个结果我都会告诉你。

说的再简单点,发布者会跟踪所有的订阅者,看谁的需求没有满足,就产生元素给谁,一直到满足所有的需求,发布者就不产生元素了,任务就完成了,在第一个方法里面,发布者和订阅者就都存在了。

但是需求为0,就不会产生任何元素,一直到1秒钟延时结束执行到闭包里面的request,订阅者就给了发布者一个非零的需求,现在发布者就开始发布元素,并且是每隔一秒发布一次,一共发布三个元素就会停止发布,但是也并不会执行第三个方法打印完成,因为发布者还在等待更多的需求。

所以这时候如果有需要的话,订阅者可以把这个订阅次数保存下来,然后定期去请求元素,就可以很灵活的管理这个发布过程。比如有一个非常常见的开发场景,我们可以在输入框中输入一些内容进行搜索操作,并且一旦输入框的内容改变了,我就去调用接口刷新对应的列表数据,但这个接口调用频率是一定要进行控制的,不然的话。

如果我按住一个英文字母键不放开,输入框会一直在变化,就会不停的去调用接口来刷新页面数据,就算你的代码逻辑很好,不会卡顿不会崩溃,你们的后台人员也肯定会骂你,因为平白无故增加了服务器压力,这个时候,就可以用到这个背压的方式来进行控制和处理。

而且还有更简单的方式,就是直接使用背压操作符,完全不需要自定义订阅者:

1.buffer(size:prefetch:whenFull:),保留来自上游发布者的固定数量的项目。缓冲满了之后,缓冲区会丢弃元素或抛出错误;

2.debounce(for:scheduler:options:),只在上游发布者在指定的时间间隔内停止发布时才发布;

3.throttle(for:scheduler:latest:),以给定的最大速率生成元素。如果在一个间隔内接收到多个元素,则仅发送最新的或最早的元素;

4.collect(_:) 和 collect(_:options:) 聚集元素,直到它们超过给定的数量或时间间隔,然后向订阅者发送元素数组。如果订阅者可以同时处理多个元素,这个操作符将是很好的选择。

这些操作符都可以控制订阅者接收的元素数量,所以可以放心地连接无限需求的订阅者,比如:sink(receiveValue:) 和 assign(to:on:)。

Debounce是防抖的意思,Throttle是节流,他们俩在前端开发中可能会经常用到,做iOS开发可能很多人都不知道这个概念,其实我们在工作中或多或少都遇到过需要使用背压的场景,只是大多数人接触的不多,没有具体了解到概念和原理的对应关系,就像设计模式有很多种,实际开发中我们用到了某种设计模式自己却不知道。

实际使用我就不写了,比较简单,类似这样:

另外,我们也可以通过设置 flatMap 的 maxPublishers来控制发布频率,我举个例子:

然后,在点击事件里面进行调用:

这样也同样实现了每隔一秒发布一次,但是一定要注意一个问题,如果我把publisher更换成 PassthoughSubject 或 Notification,就会出现数据遗漏的情况。因为我们限制了数据的并行处理数量,所以就导致数据的消耗时间超过了数据的生成时间。这个时候,我们就需要在 Publisher 的后面添加 buffer 来对数据进行缓冲:

最后,把Publisher转换成AsyncSequence也可以做到类似的效果,创建一个遵循AsyncSequence协议的结构体,将从 Publihser 中获取的数据通过 AsyncStream 转送出去,并将迭代器指向 AsyncStream 的迭代器即可,这里就不展开来写了,学无止境,执笔共勉。

- END -

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-01-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 HelloCoder全栈小集 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档