前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Reactor的Publisher与Subscriber

Reactor的Publisher与Subscriber

作者头像
关忆北.
发布2022-11-12 14:12:46
5280
发布2022-11-12 14:12:46
举报
文章被收录于专栏:关忆北.关忆北.

Project Reactor介绍

在计算机中,响应式变成或者反应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地变大静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

作用

Reactor希望用少量、有限个数的线程来满足高负载的需要。 IO阻塞浪费系统性能,只有纯异步处理才能发挥系统的全部性能。JDK的异步API较为难用,成为异步编程的瓶颈。

响应式编程特性
  • Responsive(响应式)
  • Resilient(弹性)
  • Message Driven(消息驱动)
  • asynchronous request(异步请求)
  • non-blocking(非阻塞)
  • Backpressure(背压)
数据处理流程
image-20221030094515383
image-20221030094515383
测试代码

Subscriber增强类:

代码语言:javascript
复制
public class LoggingSubscriber<T> implements Subscriber<T> {
    private static final Logger log = LoggerFactory.getLogger(LoggingSubscriber.class);

    private Subscription subscription;
    private long requested;
    private long received;
    private CountDownLatch finished = new CountDownLatch(1);

    @Override
    public void onComplete() {
        log.info("onComplete: sub={}", subscription.hashCode());
        finished.countDown();
    }

    @Override
    public void onError(Throwable t) {
        log.error("Error: sub={}, message={}", subscription.hashCode(), t.getMessage(),t);
        finished.countDown();
    }

    @Override
    public void onNext(T value) {
        log.info("onNext: sub={}, value={}", subscription.hashCode(), value);
        this.received++;
        this.requested++;
        subscription.request(1);
    }

    @Override
    public void onSubscribe(Subscription sub) {
        log.info("onSubscribe: sub={}", sub.hashCode());
        this.subscription = sub;
        this.received = 0;
        this.requested = 1;
        sub.request(1);
    }
    
    
    public long getRequested() {
        return requested;
    }
    
    public long getReceived() {
        return received;
    }

    /**
     * 阻塞调用者,直到发布者发出所有对象或产生错误
     */
    public void block() {
        try {
            finished.await(10, TimeUnit.SECONDS);
        }
        catch(InterruptedException iex) {
            throw new RuntimeException(iex);
        }
    }

}
使用Streams处理数据
代码语言:javascript
复制
public class SteamTest {
    private static Logger log = LoggerFactory.getLogger(SteamTest.class);
    public static void main(String[] args) {

        Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
        LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
        pub.subscribe(sub);
        sub.block();

    }
}
限制请求数量
代码语言:javascript
复制
public class YieldTest {
    private static Logger log = LoggerFactory.getLogger(SteamTest.class);

    public static void main(String[] args) {
        /**
         * 限制对象创建数量
         * 接收yieldRequest对象并返回下一个要发出的对象的Function参数
         */
        Publisher<String> pub = Streams.yield((t) -> {
            System.out.println(t.getRequestNum());
            return t.getRequestNum() < 5 ? "hello" : null;
        });

        LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
        pub.subscribe(sub);
        sub.block();
        assertEquals(5, sub.getReceived());
    }
}
周期性请求
代码语言:javascript
复制
public class PeriodicTest {
    private static Logger log = LoggerFactory.getLogger(SteamTest.class);

    public static void main(String[] args) {
        /**
         * 周期性做请求
         */
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
            return t < 5 ? String.format("hello %d", t) : null;
        });

        LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
        pub.subscribe(sub);
        sub.block();
        assertEquals(5, sub.getReceived());
    }
}

一些核心概念

Operators-Publisher/Subscriber
  • Flux<T>是一个标准的Reactive Streams规范中的Publisher<T>,它代表一个包含了[0…N]个元素的异步序列流。在Reactive Streams规范中,针对流中每个元素,订阅者将会监听这三个事件:onNextonCompleteonError
  • Mono<T>是一个特殊的Flux<T>,它代表一个仅包含1个元素的异步序列流。因为只有一个元素,所以订阅者只需要监听onCompleteonError
Backpressure
  • Subscription
  • onRequest()、onCancel()、onDispose()
线程调度Schedulers
  • immediate()/single()/newSingle()
  • Elastic()/parallel()/newParallel()
错误处理
  • onError/onErrorReturn/onErrorResume
  • doOnError/doFinally
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-10-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Project Reactor介绍
    • 作用
      • 响应式编程特性
        • 数据处理流程
          • 测试代码
            • 一些核心概念
              • Operators-Publisher/Subscriber
              • Backpressure
              • 线程调度Schedulers
              • 错误处理
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档