前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 9 新特性:Reactive Streams

Java 9 新特性:Reactive Streams

作者头像
dys
发布2018-06-04 16:35:54
1.4K0
发布2018-06-04 16:35:54
举报

Reactive Streams

Reactive Streams 是一个使用非阻塞背压机制的异步流处理标准。

back pressure(背压)是其中的关键概念。在异步模式中,消费者订阅生产者,从生产者那里获取数据,需要提供回调方法,当生产者产生新的可用数据后,就调用回调方法。当生产者发送数据的速度大于消费者处理的速度时,消费者就会抢占更多的资源来处理,并且有崩溃的可能。为了防止这种问题,需要一种机制,能让消费者通知生产者:生产速度太快了需要减速,然后生产者可以进行相应调整。这个机制就叫做背压

背压可以分为阻塞非阻塞

阻塞比较简单,例如生产者和消费者运行在同一个线程中,一个执行、另一个阻塞,意味着当消费者执行时,生产者不会发送新的数据。

非阻塞的方式是把 推模式 改为了 拉模式推模式是生产者来决定,生产者尽快的把数据发给消费者,拉模式是消费者来决定,消费者向生产者请求一定数量的数据,生产者会按照这个数量发送,在下次请求到来之前就是等待。

API 中的重要类型

Publisher

生产数据,供订阅者消费,只有一个方法 subscribe(Subscriber)

Subscriber

订阅生产者,接收数据(通过 onNext(T) 方法)、错误信息( onError(Throwable) 方法)、没有更多数据的信号( onComplete() ),在这些动作之前,publisher 会调用 onSubscription(Subscription)

Subscription

是发布者和订阅者之间的连接,订阅者会通过它来请求更多的数据( request(long) ),或者中断连接( cancel() )。

整体流程

  • 创建一个 Publisher 和一个 Subscriber
  • 通过 Publisher::subscribe 关联订阅者
  • 发布者创建一个 Subscription 然后调用 Subscriber::onSubscription,这样订阅连接就建立起来了
  • 订阅者调用 Subscription::request 请求一定数量的数据
  • 发布者调用 Subscriber::onNext 向订阅者传递数据,数据量不会超过订阅者指定的数量
  • 当发布者没有更多数据时会调用 Subscriber::onComplete,如果出错就调用 Subscriber::onError
  • 订阅者可以继续请求更多的数据,或者通过 Subscription::cancel 关闭连接

可以看到,订阅者调用 Subscription::request 主动请求,这就是对非阻塞背压的实现。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-05-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JAVA高性能架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Reactive Streams
  • API 中的重要类型
    • Publisher
      • Subscriber
        • Subscription
          • 整体流程
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档