Akka(20): Stream:异步运算,压力缓冲-Async, batching backpressure and buffering

   akka-stream原则上是一种推式(push-model)的数据流。push-model和pull-model的区别在于它们解决问题倾向性:push模式面向高效的数据流下游(fast-downstream-subscriber),pull model倾向高效的上游(fast-upstream-publisher)。现实中速度同等的上下游并不多见,不匹配的上下游速度最终造成数据丢失。如果下游的subscriber无法及时接收由publisher向下游推送的全部数据,那么无论有多大的缓冲区,最终会造成溢出丢失数据。如果上游的publisher无法及时满足下游subscriber的数据读取需求会加长下游的等待状态造成超时甚至会使遗失下游请求遗失。对于akka-stream这种push模式的数据流,因为超速推送数据会造成数据丢失,所以必须想办法控制publisher产生数据的速度。因为akka-stream已经在上下游环节全部实现了Reactive-Streams-Specification,所以上下游之间可以进行互动,这样就可以在akka-stream里由下游通知上游自身可接收数据的状态来控制上游数据流速,即所谓的压力缓冲backpressure了。akka-stream的backpressure使用了缓冲区buffer来成批预存及补充数据,这样可以提高数据传输效率。另外,如果用async进行数据流的并行运算的话上游就不必理会下游反应,可以把数据推进buffer然后立即继续处理下一个数据元素。所以async运算模式的buffering就不可或缺了。akka-stream可以通过以下几种方式来设定异步运算使用的缓冲大小:

1、在配置文件中设定默认buffer:

akka.stream.materializer.max-input-buffer-size = 16

2、在ActorMaterializerSetting中宏观层面上设定:

val materializer = ActorMaterializer(
  ActorMaterializerSettings(system)
    .withInputBuffer(
      initialSize = 64,
      maxSize = 64))

3、通过Attribute属性设定。因为Atrribute保持了层级关系,所以通过Attribute设定的inputbuffer也延续了属性继承:

import Attributes._
val nestedSource =
  Source.single(0)
    .map(_ + 1)
    .named("nestedSource") // Wrap, no inputBuffer set

val nestedFlow =
  Flow[Int].filter(_ != 0)
    .via(Flow[Int].map(_ - 2).withAttributes(inputBuffer(4, 4))) // override
    .named("nestedFlow") // Wrap, no inputBuffer set

val nestedSink =
  nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
    .withAttributes(name("nestedSink") and inputBuffer(3, 3)) // override

在上面的示例里nestdSource继承了Materializer全局inputBuffer属性;nestedSink重写了属性;nestedFlow先是继承了nestedSink的设定然后又重写了自己的inputBuffer属性。我们可以用addAttribute来新添加Attribute:

  val flow = Flow[Int].map(_ * 2).async.addAttributes(Attributes.inputBuffer(16,16))
  val (_,fut) = flow.runWith(Source(1 to 10),Sink.foreach(println))
  fut.andThen{case _ => sys.terminate()}

上面定义这些inputBuffer包括了起始值和最大值,主要应用在backpressure。所以,理论上inputBuffer可以设成一个字节(initial=1,max=1),因为有了backpressure就不用担心数据溢出,但这样会影响数据流传输效率。所以akka-stream默认的缓冲区长度为16字节。所以aka-stream的backpressure是batching backpressure。

由于akka-stream是push模式的,我们还可以用buffer来控制包括Source,Flow这些上游环节推送的数据:

  val source = Source(1 to 10).buffer(16,OverflowStrategy.dropTail)
  val sum = source.runFold(0)((acc,i) => i + acc)
  sum.map(println)  //.andThen{case _ => sys.terminate()}

  val flow = Flow[Int].map(_ * 3).buffer(16,OverflowStrategy.dropNew)
  val (_,fut) = flow.runWith(Source(1 to 10),Sink.fold(0){(acc,a) => acc + a})
  fut.map(println).andThen{case _ => sys.terminate()}

上游所设buffer对publisher过快产生的数据可以采用溢出处理策略OverflowStrategy。上面用Attribute添加的inputBuffer默认了OverflowStrategy.backpressure,其它OverflowStrategy选项如下:

object OverflowStrategy {
  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
   * the new element.
   */
  def dropHead: OverflowStrategy = DropHead

  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
   * the new element.
   */
  def dropTail: OverflowStrategy = DropTail

  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer

  /**
   * If the buffer is full when a new element arrives, drops the new element.
   */
  def dropNew: OverflowStrategy = DropNew

  /**
   * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
   * space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure

  /**
   * If the buffer is full when a new element is available this strategy completes the stream with failure.
   */
  def fail: OverflowStrategy = Fail
}

当akka-stream需要与外界系统进行数据交换时就无法避免数据流上下游速率不匹配的问题了。如果外界系统不支持Reactive-Stream标准,就会发生数据丢失现象。对此akka-stream提供了具体的解决方法:如果外界系统是在上游过快产生数据可以用conflate函数用Seq这样的集合把数据传到下游。如果下游能及时读取则Seq(Item)中的Item正是上游推送的数据元素,否则Seq(i1,i2,i3...)就代表上游在下游再次读取时间段内产生的数据。因为Seq可以是无限大,所以理论上可以避免数据丢失。下面是这个函数的定义:

 /**
   * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
   * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
   * upstream publisher is faster.
   *
   * This version of conflate allows to derive a seed from the first element and change the aggregated type to be
   * different than the input type. See [[FlowOps.conflate]] for a simpler version that does not change types.
   *
   * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
   * duplicate elements.
   *
   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
   *
   * '''Emits when''' downstream stops backpressuring and there is a conflated element available
   *
   * '''Backpressures when''' never
   *
   * '''Completes when''' upstream completes
   *
   * '''Cancels when''' downstream cancels
   *
   * @param seed Provides the first state for a conflated value using the first unconsumed element as a start
   * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
   *
   * See also [[FlowOps.conflate]], [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]]
   */
  def conflateWithSeed[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] =
    via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate))

下面是conflateWithSeed函数用例:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._
object StreamDemo1 extends App {

  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(1,1)
  )

   case class Tick()

   RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    // this is the asynchronous stage in this graph
    val zipper = b.add(ZipWith[Tick, Seq[String], Seq[String]]((tick, count) => count).async)
    // this slows down the pipeline by 3 seconds
    Source.tick(initialDelay = 3.seconds, interval = 3.seconds, Tick()) ~> zipper.in0
    // faster producer with all elements passed inside a Seq
    Source.tick(initialDelay = 1.second, interval = 1.second, "item")
      .conflateWithSeed(Seq(_)) { (acc,elem) => acc :+ elem } ~> zipper.in1

    zipper.out ~> Sink.foreach(println)
    ClosedShape
  }).run()
  
}

在上面这个例子里我们用ZipWith其中一个低速的输入端来控制整个管道的速率。这时我们会发现输出端Seq长度代表ZipWith消耗数据的延迟间隔。注意:前面3个输出好像没有延迟,这是akka-stream 预读prefetch造成的。因为我们设定了InputBuffer(Initial=1,max=1),第一个数据被预读当作及时消耗了。

如果没有实现Reactive-Stream标准的外界系统上游producer速率过慢,有可能造成下游超时,akka-stream提供了expand函数来解决这个问题:

 /**
   * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older
   * element until new element comes from the upstream. For example an expand step might repeat the last element for
   * the subscriber until it receives an update from upstream.
   *
   * This element will never "drop" upstream elements as all elements go through at least one extrapolation step.
   * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream
   * subscriber.
   *
   * Expand does not support [[akka.stream.Supervision.Restart]] and [[akka.stream.Supervision.Resume]].
   * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure.
   *
   * '''Emits when''' downstream stops backpressuring
   *
   * '''Backpressures when''' downstream backpressures or iterator runs empty
   *
   * '''Completes when''' upstream completes
   *
   * '''Cancels when''' downstream cancels
   *
   * @param seed Provides the first state for extrapolation using the first unconsumed element
   * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
   *                    state.
   */
  def expand[U](extrapolate: Out ⇒ Iterator[U]): Repr[U] = via(new Expand(extrapolate))

当上游无法及时发送下游请求的数据时我们可以用expand推送一个固定的数据元素来临时满足下游的要求:

 val lastFlow = Flow[Double]
    .expand(Iterator.continually(_))

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏小灰灰

EventBus源码学习笔记(一)

EventBus 深入学习一 EventBus是一个消息总线,以观察者模式实现,用于简化程序的组件、线程通信,可以轻易切换线程、开辟线程; 传统上,Java...

1975
来自专栏Java编程技术

UML建模图实战笔记(全)

UML:Unified Modeling Language(统一建模语言),使用UML进行建模的作用有哪些那:

803
来自专栏ImportSource

ZGC回收器到底有多变态?

多少年来,JVM中的各种垃圾回收器都在努力追求着两个目标,暂停时间足够短,同时吞吐量也要不错。为了追求二者兼具,各种垃圾回收器可谓绞尽脑汁,但还是无法同时让两个...

2001
来自专栏微信终端开发团队的专栏

微信iOS卡顿监控系统

引子 微信 iOS 团队在值班的时候,时不时会收到这样的卡顿反馈:“用户A 刚才碰到从后台切换前台卡了一下,最近偶尔会遇到几次”、“用户B 反馈点对话框卡了五六...

4295
来自专栏吴小龙同學

Android 电量优化

为什么优化 现在智能手机基本能一天一充,如果一个应用耗电过多,肯定是有问题的,而开发中电量的优化可能是最容易被忽略的。 如何检测 1、手机设置-电池使用情况,查...

2607
来自专栏琦小虾的Binary

Node.js 异步编程基础理解

参考地址:《深入理解node.js异步编程:基础篇》 一、概述 目前开源社区最火热的技术当属 Node.js 莫属了,作为使用 Javascript 为主要开发...

22610
来自专栏吉浦迅科技

DAY31:阅读global memory

692
来自专栏SAP最佳业务实践

SAP最佳业务实践:SD–可退回包装物销售(120)-7未退回的托盘库存处理

一、 MIGO过帐未退回的托盘 对不能退回的托盘,做报废处理 在此步骤中,可以从库存移除未退回的托盘并过帐到成本中心。 1. 进行以下输入: 字段名称用户操作和...

3686
来自专栏WeTest质量开放平台团队的专栏

Android性能优化来龙去脉总结

一款app除了要有令人惊叹的功能和令人发指交互之外,在性能上也应该追求丝滑的要求,这样才能更好地提高用户体验。

70714
来自专栏QQ音乐技术团队的专栏

OpenSL ES那些事

本次分享主要是将JNI层的声音采集,传输以及播放过程做相应介绍,若是大家有更好的优化建议,欢迎指教。

1.4K7

扫码关注云+社区