前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >简单理解响应式编程

简单理解响应式编程

原创
作者头像
eeaters
修改2022-02-21 18:04:38
7680
修改2022-02-21 18:04:38
举报
文章被收录于专栏:阿杰阿杰

响应式编程

  • 参考网站
  • 编程模型
  • 基本对象
    • Flow.Publisher<T>
    • Flow.Subscription<T>
    • Flow.Subscriber<T>
    • Flow.Processor<T>
  • 简单的实战
    • 主线任务
    • 第一阶段
    • 第二阶段

参考网站

编程模型

非阻塞的异步编程,映射在代码中本质就是回调函数,与响应式编程模型对应的是传统的 指令式编程 ;

  • 指令编程模型: 同步阻塞,告诉计算机 该怎么做 ,控制的是状态
  • 响应式编程模型: 异步非阻塞, 告诉计算机 要做什么 ,控制的是目标

基本对象

以Java 9的api为例:

Flow.Publisher<T>

发布者, 数据输入的对象, T表示数据的类型

代码语言:txt
复制
    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }

Flow.Subscription<T>

中间代理, 发布者和订阅者并没有直接的联系,而是将数据的传递控制 从数据和数据的变化里分离出来,进而降低功能之间的耦合

代码语言:txt
复制
   public static interface Subscription {
        public void request(long n);
        public void cancel();
    }

Flow.Subscriber<T>

订阅者,T表述数据的类型,分别规定了四种情形下的反应:

  • 如果接收到订阅邀请该怎么办?这个行为由 onSubscribe
  • 这个方法的实现确定。如果接收到数据该怎么办?这个行为由 onNext
  • 这个方法的实现确定。如果遇到了错误该怎么办?这个行为由 onError
  • 这个方法的实现确定。如果数据传输完毕该怎么办?这个行为由 onComplete 这个方法的实现确定。
代码语言:txt
复制
 public static interface Subscriber<T> {

        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();
    }

Flow.Processor<T>

处理器,同时实现了Publisher和Subscriber,也就是做一个承上启下的作用,是流在执行过程中数据处理的中间流程;

代码语言:txt
复制
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

简单的尝试

主线任务

代码语言:txt
复制
使用Flow的api 实现: a=b+c
a: 假设需要知道一件事情结束的时间是周几
b:数据开始的时间是周几 周五周六周日不好好干活,推到周一 ,实际开始时间就是周一,
c:处理完这件事情的时间需要几天 , 周一 需要两天, 周二周三周四需要一天,

NOTE

不使用多线程,仅仅想对响应式的代码执行情况做一些了解

第一阶段

bc不拆分,快速实现业务

Publisher
代码语言:txt
复制
public class TimePublisher implements Flow.Publisher<LocalDate> {

    public final LinkedList<LocalDate> items;
    public TimePublisher(LinkedList<LocalDate> items) {
        this.items = items;
    }
    @Override
    public void subscribe(Flow.Subscriber subscriber) {
        subscriber.onSubscribe(new TimeSubscription(subscriber, items));
    }
}
Subscription
代码语言:txt
复制
public class TimeSubscription implements Flow.Subscription {

    private final Flow.Subscriber<LocalDate> subscriber;

    private LinkedList<LocalDate> list;

    public TimeSubscription(Flow.Subscriber<LocalDate> subscriber, LinkedList<LocalDate> list) {
        this.subscriber = subscriber;
        this.list = list;
    }

    @Override
    public void request(long n) {
        if (list.isEmpty()) {
            subscriber.onComplete();
            return;
        }
        for (long i = 0; i < n; i++) {
            LocalDate item = list.remove();
            subscriber.onNext(item);
        }
    }


    @Override
    public void cancel() {
        //标记为删除; 关闭资源等
    }
}
Subscriber
代码语言:txt
复制
public class TimeSubscriber implements Flow.Subscriber<LocalDate> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(LocalDate item) {
        DayOfWeek dayOfWeek = item.getDayOfWeek();
        int days = startAfterDay(dayOfWeek);
        LocalDate localDate = item.plusDays(days);
        System.out.println("完成日期为: " + localDate);

        subscription.request(1);
    }

    private int startAfterDay(DayOfWeek dayOfWeek) {
        return switch (dayOfWeek) {
            case MONDAY -> 2;
            case TUESDAY, THURSDAY, WEDNESDAY -> 1;
            case FRIDAY -> 5;
            case SATURDAY -> 4;
            case SUNDAY -> 3;
        };
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onComplete() {

    }
}
Main方法
代码语言:txt
复制
    public static void main(String[] args) {
        //第一步, 声明一个发布者
        LinkedList<LocalDate> linkedList = new LinkedList();
        linkedList.add(LocalDate.now());

        TimePublisher timePublisher = new TimePublisher(linkedList);
        //第二部,声明过程,
        TimeSubscriber timeSubscriber = new TimeSubscriber();
        timePublisher.subscribe(timeSubscriber);
    }

第二阶段

将执行过程拆解为两个过程执行,使用Processor,

增加一个处理过程
增加一个处理过程

当天是周几是流的中间结果,即是上游的结果又是下游的数据来源; 尝试下代码的编写

计算周几的中间处理器

这里直接将第一阶段的代码进行拆分,因此Publisher和Subscription不变,而subscriber则变成了Processor,成为了中间处理器

代码语言:txt
复制
public class TimeProcessor implements Flow.Processor<LocalDate,DayOfWeek> {

    private Flow.Subscription subscription;

    public TimeProcessor() {

    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }


    @Override
    public void onNext(LocalDate item) {
        DayOfWeek dayOfWeek = item.getDayOfWeek();
        emit(dayOfWeek);
        subscription.request(1);
    }



    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onComplete() {

    }
    // -------  下一个流水线,

    private void emit(DayOfWeek dayOfWeek) {
        subscribe(daySubscriber);
        daySubscription.emit(dayOfWeek);
    }

    DaySubscription daySubscription;
    @Override
    public void subscribe(Flow.Subscriber subscriber) {
        if (daySubscription == null) {
            return;
        }
        DaySubscription subscription = new DaySubscription((DaySubscriber) subscriber);
        this.daySubscription = subscription;
        daySubscriber.onSubscribe(subscription);
    }

    DaySubscriber daySubscriber;
    public TimeProcessor downStream(DaySubscriber daySubscriber) {
        this.daySubscriber = daySubscriber;
        return this;
    }

}
DaySubscription

下游的中介

demo为了省事直接写到了发布者的构造器中; 但是中间状态这里增加一个发出数据的方法; 发射数据后中介会调用订阅者消费消息

代码语言:txt
复制
public class DaySubscription implements Flow.Subscription {


    private final DaySubscriber dayProcessor;
    private DayOfWeek dayOfWeek;

    public DaySubscription(DaySubscriber daySubscriber) {
        this.dayProcessor = daySubscriber;
    }

    public void emit(DayOfWeek dayOfWeek) {
        this.dayOfWeek = dayOfWeek;
        request(1);
    }
    @Override
    public void request(long n) {
        dayProcessor.onNext(dayOfWeek);
    }

    @Override
    public void cancel() {

    }
}
DaySubscriber

下游订阅者

这个地方为了简单,并没有使用到背压; 因为作为demo,没有使用任何异步.所以onNext仅仅执行消费数据的逻辑

代码语言:txt
复制
public class DaySubscriber<T> implements Flow.Subscriber<DayOfWeek> {

    private Flow.Subscription daySubscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.daySubscription = subscription;
    }

    @Override
    public void onNext(DayOfWeek item) {
        int day =  switch (item) {
            case MONDAY -> 2;
            case TUESDAY, THURSDAY, WEDNESDAY -> 1;
            case FRIDAY -> 5;
            case SATURDAY -> 4;
            case SUNDAY -> 3;
        };
        System.out.printf("任务开始于: %s , 将在 %d 天后开始执行  ", item, day);
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onComplete() {

    }
}
Main

考虑简单; 仅仅有一个中间处理器来熟悉响应式的基本逻辑

本质上需要指定步骤,代码类似于上面的流程图

代码语言:txt
复制
     public static void main(String[] args) {
        LinkedList<LocalDate> linkedList = new LinkedList<>();
        linkedList.add(LocalDate.now());
        TimePublisher timePublisher = new TimePublisher(linkedList);

        timePublisher.subscribe(new TimeProcessor().downStream(new DaySubscriber()));
    }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 参考网站
  • 编程模型
  • 基本对象
    • Flow.Publisher<T>
      • Flow.Subscription<T>
        • Flow.Subscriber<T>
          • Flow.Processor<T>
          • 简单的尝试
            • 主线任务
              • 第一阶段
                • Publisher
                • Subscription
                • Subscriber
                • Main方法
              • 第二阶段
                • 计算周几的中间处理器
                • DaySubscription
                • DaySubscriber
                • Main
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档