A Look at Transform Operations in Reactive Streams

Author: code4it
Published: 2018-09-17 15:45:55
Collected in the column: 码匠的流水账

This article demonstrates some of the transform operations available in reactive streams, using Reactor's Flux and Mono.

mergeWith

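    @Test
    public void testMerge(){
        // flux1 emits 0, 1, 2 at one-second intervals
        Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1))
                .take(3)
                .map(e -> "[flux1]:"+e);

        // The second flux delays each tick by one more second, then merges in flux1
        Flux<String> mergeFlux = Flux.interval(Duration.ofSeconds(1))
                .delayElements(Duration.ofSeconds(1))
                .take(3)
                .map(e -> "[flux2]:"+e)
                .mergeWith(flux1);

        // Log in doOnNext and block once, so the cold flux is subscribed a single time
        // (calling subscribe() and then blockLast() would run the pipeline twice).
        mergeFlux.doOnNext(e -> LOGGER.info("subscribe:{}", e))
                .blockLast();
    }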

Sample output:

21:18:07.583 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:18:08.618 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:0
21:18:09.619 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:1
21:18:09.645 [parallel-6] INFO com.example.demo.TransformTest - subscribe:[flux2]:0
21:18:10.619 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:2
21:18:10.649 [parallel-8] INFO com.example.demo.TransformTest - subscribe:[flux2]:1
21:18:11.654 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux2]:2

As the output shows, mergeWith interleaves the elements of the two fluxes in the order in which they are emitted.

concatWith

    @Test
    public void testConcat(){
        // flux1 emits 0, 1, 2 at one-second intervals
        Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1))
                .take(3)
                .map(e -> "[flux1]:"+e);

        // flux1 is subscribed only after the first flux has completed
        Flux<String> concatFlux = Flux.interval(Duration.ofSeconds(1))
                .delayElements(Duration.ofSeconds(1))
                .take(3)
                .map(e -> "[flux2]:"+e)
                .concatWith(flux1);
        // Log in doOnNext and block once, so the cold flux is subscribed a single time
        concatFlux.doOnNext(e -> LOGGER.info("subscribe:{}", e))
                .blockLast();
    }

Output:

21:19:00.779 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:19:02.832 [parallel-4] INFO com.example.demo.TransformTest - subscribe:[flux2]:0
21:19:03.836 [parallel-6] INFO com.example.demo.TransformTest - subscribe:[flux2]:1
21:19:04.840 [parallel-8] INFO com.example.demo.TransformTest - subscribe:[flux2]:2
21:19:05.845 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:0
21:19:06.845 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:1
21:19:07.844 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:2

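As the output shows, concatWith simply appends one flux to the other: all [flux2] elements come first, followed by all [flux1] elements, rather than being interleaved in emission order.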

zipWith

    @Test
    public void testZip(){
        List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b");
        List<String> secondList = Lists.newArrayList("1","2","3","4","5");
        Flux<Tuple2<String,String>> zipFlux =  Flux.fromIterable(firstList)
                .zipWith(Flux.fromIterable(secondList));
        zipFlux.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });
    }

The output is as follows:

21:20:59.506 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:20:59.516 [main] INFO com.example.demo.TransformTest - subscribe:[a,1]
21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[b,2]
21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[c,3]
21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[d,4]
21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[e,5]

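As the output shows, the extra elements of firstList that have no counterpart in secondList are not zipped: the zipped flux completes as soon as the shorter source completes.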
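
The truncation rule can be sketched without Reactor. The following is only a plain-JDK illustration of the pairing semantics, not how zipWith is implemented; the class name ZipSketch and the helper zip are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZipSketch {
    // Pairs elements by position and stops at the end of the shorter list,
    // so trailing elements of the longer list are never paired.
    static List<String> zip(List<String> left, List<String> right) {
        int size = Math.min(left.size(), right.size());
        List<String> pairs = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            pairs.add("[" + left.get(i) + "," + right.get(i) + "]");
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> firstList = Arrays.asList("a", "b", "c", "d", "e", "a", "b");
        List<String> secondList = Arrays.asList("1", "2", "3", "4", "5");
        // Five pairs are produced; the trailing "a" and "b" are dropped
        System.out.println(zip(firstList, secondList));
    }
}
```

With the two lists from the test above, this yields the same five pairs that the zipped flux logged.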

flatMap

    @Test
    public void testFlatMap(){
        List<String> secondList = Lists.newArrayList("1","2","3","4","5");
        // flatMap: each element becomes an inner publisher whose elements are delayed,
        // so the inner results are merged as they arrive
        Flux<String> flatMapFlux = Flux.fromIterable(secondList)
                .flatMap(str -> Mono.just(str)
                        .repeat(2)
                        .map(String::toUpperCase)
                        .delayElements(Duration.ofMillis(1)));
        // Log in doOnNext and block once, so the flux is subscribed a single time
        flatMapFlux.doOnNext(e -> LOGGER.info("subscribe:{}", e))
                .blockLast();

        // map: a synchronous one-to-one transformation over the repeated source
        Flux<String> mapFlux = Flux.fromIterable(secondList)
                .repeat(2)
                .map(String::toUpperCase);
        mapFlux.doOnNext(e -> LOGGER.info("map subscribe:{}", e))
                .blockLast();
    }

Output:

21:33:46.904 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:33:46.958 [parallel-1] INFO com.example.demo.TransformTest - subscribe:1
21:33:46.959 [parallel-1] INFO com.example.demo.TransformTest - subscribe:2
21:33:46.959 [parallel-1] INFO com.example.demo.TransformTest - subscribe:3
21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:4
21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:5
21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:2
21:33:46.960 [parallel-7] INFO com.example.demo.TransformTest - subscribe:3
21:33:46.960 [parallel-8] INFO com.example.demo.TransformTest - subscribe:4
21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:5
21:33:46.961 [parallel-6] INFO com.example.demo.TransformTest - subscribe:1
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:1
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:2
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:3
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:4
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:5
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:1
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:2
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:3
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:4
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:5

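flatMap subscribes to its inner publishers eagerly and merges their elements as they arrive. When the inner publishers are asynchronous (here delayElements moves emission onto the parallel scheduler), the results are interleaved and the source order is not preserved. map, by contrast, is a synchronous one-to-one transformation that runs on the subscribing thread and keeps the order.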
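
For contrast, here is a minimal JDK-only sketch: Stream.flatMap also maps one element to many, but a sequential stream flattens synchronously and in encounter order, so no interleaving occurs. The class name FlatMapOrderSketch and the helper duplicateEach are hypothetical, introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapOrderSketch {
    // Maps every element to two upper-cased copies and flattens the result.
    // A sequential JDK stream flattens in encounter order.
    static List<String> duplicateEach(List<String> items) {
        return items.stream()
                .flatMap(item -> Stream.of(item, item).map(String::toUpperCase))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
        System.out.println(duplicateEach(Arrays.asList("1", "2", "3", "4", "5")));
    }
}
```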

reduce

    @Test
    public void testReduce(){
        List<String> secondList = Lists.newArrayList("1","2","3","4","5");
        Mono<Integer> reduceMono = Flux.fromIterable(secondList)
                .flatMap(e -> Mono.just(e).map(item -> Integer.valueOf(item)))
                .reduce((total, e) -> total + e);
        reduceMono.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });
    }

Output:

21:36:29.978 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:36:30.014 [main] INFO com.example.demo.TransformTest - subscribe:15

groupBy

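    @Test
    public void testGroup(){
        List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b");
        // Upper-case each element, then group equal elements under the same key
        Flux<GroupedFlux<String, String>> groupFlux = Flux.fromIterable(firstList)
                .map(String::toUpperCase)
                .groupBy(key -> key);
        groupFlux.subscribe(e -> {
            // The outer log prints the Disposable returned by the inner subscribe(),
            // which is why the output shows LambdaMonoSubscriber@... lines.
            // Each group's list is emitted only once the source has completed.
            LOGGER.info("subscribe:{}",e.collectList().subscribe(item -> {
                LOGGER.info("item:{}",item);
            }));
        });
    }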

Output:

21:37:00.912 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:37:00.949 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@5faeada1
21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@1563da5
21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@2bbf4b8b
21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@30a3107a
21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@33c7e1bb
21:37:00.951 [main] INFO com.example.demo.TransformTest - item:[A, A]
21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[B, B]
21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[C]
21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[D]
21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[E]

first

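    @Test
    public void testFirst(){
        List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b");
        List<String> secondList = Lists.newArrayList("1","2","3","4","5");
        // firstFlux delays every element by 200 ms
        Flux<String> firstFlux = Flux.fromIterable(firstList)
                .delayElements(Duration.ofMillis(200));
        // secondFlux has no delay and is limited to two elements
        Flux<String> secondFlux = Flux.fromIterable(secondList)
                .take(2);

        // Flux.first relays only the publisher that signals first;
        // secondFlux has no delay, so it is expected to be the one selected.
        Flux<String> result = Flux.first(firstFlux, secondFlux);
        result.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });
    }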

toIterable

    @Test
    public void testToIterable(){
        List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b");
        Iterable<String> itr = Flux.fromIterable(firstList)
                .map(String::toUpperCase)
                .toIterable();
        itr.forEach(e -> LOGGER.info(e));
    }

Output:

21:39:35.031 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:39:35.045 [main] INFO com.example.demo.TransformTest - A
21:39:35.045 [main] INFO com.example.demo.TransformTest - B
21:39:35.045 [main] INFO com.example.demo.TransformTest - C
21:39:35.045 [main] INFO com.example.demo.TransformTest - D
21:39:35.045 [main] INFO com.example.demo.TransformTest - E
21:39:35.045 [main] INFO com.example.demo.TransformTest - A
21:39:35.045 [main] INFO com.example.demo.TransformTest - B

Summary

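Reactive streams operators are roughly the reactive counterparts of the JDK Stream operations, so the JDK Stream API is a useful reference point when learning them.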
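
As a rough illustration of that parallel, the sketch below redoes the reduce and groupBy examples with plain JDK streams. It is a comparison under stated assumptions rather than an exact equivalent: the JDK versions are synchronous and return values directly instead of a Mono or Flux, and the class name JdkStreamCounterparts and its two helpers are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JdkStreamCounterparts {
    // Counterpart of the reduce example: parse each string and sum the values.
    static int sum(List<String> numbers) {
        return numbers.stream()
                .map(Integer::valueOf)
                .reduce(0, Integer::sum);
    }

    // Counterpart of the groupBy example: upper-case each element and group
    // equal keys; LinkedHashMap keeps the keys in first-seen order.
    static Map<String, List<String>> group(List<String> letters) {
        return letters.stream()
                .map(String::toUpperCase)
                .collect(Collectors.groupingBy(key -> key, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        // prints 15
        System.out.println(sum(Arrays.asList("1", "2", "3", "4", "5")));
        // prints {A=[A, A], B=[B, B], C=[C], D=[D], E=[E]}
        System.out.println(group(Arrays.asList("a", "b", "c", "d", "e", "a", "b")));
    }
}
```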

doc

  • Reactor – How to Combine Publishers (Flux/Mono)

Originally published on 2018-01-18 on the WeChat official account 码匠的流水账.
