前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >mono subscribe源码分析

mono subscribe源码分析

作者头像
johnhuster的分享
发布2022-03-28 20:17:38
8240
发布2022-03-28 20:17:38
举报
文章被收录于专栏:johnhuster

https://blog.csdn.net/john1337/article/details/101028908这篇文章已经对整个project reactor工作流程做了概述,本文对mono的subscribe源码进行更细的描述:

本文将以实际代码来分析下project reactor常见的subscribe工作原理

代码语言:javascript
复制
        Mono.just("hello")
         .filter(t->t.startsWith("h"))
         .map(String::toUpperCase)
         .subscribe(System.out::println);

上面是一个简单的反应式编程的代码,mono.just创建数据源,然后经过filter经过过滤处理,然后经过map进行处理,熟悉jdk stream的对map这个操作一定不会陌生,map及其以前的操作仅仅是创建了一个publisher,上面仅仅是声明阶段,并没有产生实际效果,只有经过了subscribe之后才开始工作,下面就用上面的代码来分析下整个工作流程。

代码语言:javascript
复制
    /***
    **该方法会根据声明部分创建完整发布、订阅关系链
    *本例子中涉及到下面几个订阅者类:LambdaMonoSubscriber、MapFuseableSubscriber以及 
    *FilterFuseableSubscriber
    ***/
	public final void subscribe(Subscriber<? super T> actual) {
		CorePublisher publisher = Operators.onLastAssembly(this);
		CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

        //本例中的filter、map节点都是实现了OptimizableOperator
		if (publisher instanceof OptimizableOperator) {
			OptimizableOperator operator = (OptimizableOperator) publisher;
            //下面创建发布、订阅链,创建过程跟声明阶段相比,即map-->filter
			while (true) {
				subscriber = operator.subscribeOrReturn(subscriber);
				if (subscriber == null) {
					// null means "I will subscribe myself", returning...
					return;
				}
                
                //map上一节点为filter,filter不是数据源,filter节点的上一节点
                //为MonoJust,为数据源,所以filter节点nextOptimizableSource返回null
				OptimizableOperator newSource = operator.nextOptimizableSource();
				if (newSource == null) {
                    //最后在这里更新发布者为最终的发布者,例如该例子中的MonoJust
					publisher = operator.source();
					break;
				}
				operator = newSource;
			}
		}
        //对于本文来说,publisher为MonoJust对象,subscriber为FilterFuseableSubscriber
        //MonoJust对象调用subscribe方法会调用其subscriber(即FilterFuseableSubscriber)的onSubscribe方法,然后进入onSubscribe阶段,onSubscribe调用顺序跟声明阶段相同,当onSubscribe传到subscribe方法的订阅者时将进入request阶段,request阶段执行顺序跟声明阶段相反,当request阶段执行到数据源端又会触发调用阶段的执行,常见的为subscriber.onNext方法
		publisher.subscribe(subscriber);
	}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/02/01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档