前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )

【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )

作者头像
韩曙亮
发布2023-03-30 18:31:30
1.2K0
发布2023-03-30 18:31:30
举报
文章被收录于专栏:韩曙亮的移动开发专栏

文章目录

一、Flow 流展平


Flow 流在 接收元素 时 , 可能需要 另一个 流的元素 , 两个流之间进行 交互的操作 就是 展平 , 常见的 展平模式有 :

  • 连接模式 flatMapConcat : m 个元素的流 与 n 个元素的流 连接后 , 元素个数为 m x n 个 ;
  • 合并模式 flatMapMerge : m 个元素的流 与 n 个元素的流 合并后 , 元素个数为 n x m 个 ;
  • 最新展平模式 flatMapLatest : 前面的看时间间隔进行结合 , 中间的可能跳过某些元素 , 不要中间值 , 只重视最新的数据 ;

1、连接模式 flatMapConcat 代码示例

连接模式 flatMapConcat : m 个元素的流 与 n 个元素的流 连接后 , 元素个数为 m x n 个 ;

flatMapConcat 函数原型 :

代码语言:javascript
复制
/**
 * 通过应用[transform]转换原始流发出的元素,它返回另一个流,
 * 然后连接并压平这些流。
 *
 * 该方法是' map(transform).flattenConcat() '的快捷方式。看到[flattenConcat]。
 *
 * 请注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
 * 最有可能的是,暂停[map]操作符中的操作就足够了,线性转换更容易推理。
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

调用 FlowA.flatMapConcat(FlowB) 代码 , 先拿到 FlowA , 然后让 FlowA 每个元素 与 FlowB 进行连接 , 以 FlowA 的元素顺序为主导 ;

代码示例 : 注意 两个 流 连接后的间隔 , (0…2) 流之间的发射间隔 100ms , stringFlow 流元素发射间隔 200ms , 连接后的流要结合上述两个间隔 , 在 (0…2) 流 的元素之间间隔为 100ms , 在 (0…2) 流单个元素与所有的 stringFlow 流元素连接的间隔为 200ms ;

代码语言:javascript
复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach { delay(100) }
                        // 该 Flow 流与  stringFlow 进行连接
                        .flatMapConcat { stringFlow(it) }
                        .collect {
                            println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
                        }
        }
    }

    suspend fun stringFlow(num: Int) = flow<String> {
        emit("$num flatMapContact Hello First")
        delay(200)
        emit("$num flatMapContact Hello Second")
    }
}

执行结果 :

代码语言:javascript
复制
I/System.out: 收集到元素 0 flatMapContact Hello First, 时间 201
I/System.out: 收集到元素 0 flatMapContact Hello Second, 时间 448
I/System.out: 收集到元素 1 flatMapContact Hello First, 时间 608
I/System.out: 收集到元素 1 flatMapContact Hello Second, 时间 837
I/System.out: 收集到元素 2 flatMapContact Hello First, 时间 954
I/System.out: 收集到元素 2 flatMapContact Hello Second, 时间 1196
在这里插入图片描述
在这里插入图片描述

2、合并模式 flatMapMerge 代码示例

合并模式 flatMapMerge : m 个元素的流 与 n 个元素的流 合并后 , 元素个数为 n x m 个 ;

flatMapMerge 函数原型 :

代码语言:javascript
复制
/**
 * 通过应用[transform]转换原始流发出的元素,它返回另一个流,
 * 然后合并并压平这些气流。
 *
 * 此操作符按顺序调用[transform],然后将结果流与[concurrency]合并
 * 对并发收集流的数量的限制。
 * 它是' map(transform).flattenMerge(concurrency)'的快捷方式。
 * 详见[flattenMerge]。
 *
 * 请注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
 * 最有可能的是,暂停[map]操作符中的操作就足够了,线性转换更容易推理。
 *
 * ###算子融合
 *
 * [flowOn]、[buffer]和[produceIn] __after_此操作符的应用被融合
 * 它是并发合并,因此只有一个正确配置的通道用于执行合并逻辑。
 *
 * @param并发控制运行中的流的数量,最多收集[concurrency]个流
 * 同时。默认情况下,它等于[DEFAULT_CONCURRENCY]。
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)

调用 FlowA.flatMapMerge(FlowB) 代码 , 先拿到 FlowB , 然后让 FlowB 每个元素 与 FlowA 进行结合 , 以 FlowB 的元素顺序为主导 ;

代码示例 :

代码语言:javascript
复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach { delay(100) }
                        // 该 Flow 流与  stringFlow 进行合并
                        .flatMapMerge { stringFlow(it) }
                        .collect {
                            println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
                        }
        }
    }

    suspend fun stringFlow(num: Int) = flow<String> {
        emit("$num flatMapMerge Hello First")
        delay(500)
        emit("$num flatMapMerge Hello Second")
    }
}

执行结果 :

代码语言:javascript
复制
I/System.out: 收集到元素 0 flatMapMerge Hello First, 时间 192
I/System.out: 收集到元素 1 flatMapMerge Hello First, 时间 328
I/System.out: 收集到元素 2 flatMapMerge Hello First, 时间 451
I/System.out: 收集到元素 0 flatMapMerge Hello Second, 时间 698
I/System.out: 收集到元素 1 flatMapMerge Hello Second, 时间 866
I/System.out: 收集到元素 2 flatMapMerge Hello Second, 时间 993
在这里插入图片描述
在这里插入图片描述

3、最新展平模式 flatMapLatest 代码示例

最新展平模式 flatMapLatest : 前面的看时间间隔进行结合 , 中间的可能跳过某些元素 , 不要中间值 , 只重视最新的数据 ;

flatMapLatest 函数原型 :

代码语言:javascript
复制
/**
 * 返回一个流,每当原始流发出一个值时,该流切换到[transform]函数生成的新流。
 * 当原始流产生一个新值时,由' transform '块产生的前一个流将被取消。
 *
 * 例如,以下流程:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.flatMapLatest { value ->
 *     flow {
 *         emit(value)
 *         delay(200)
 *         emit(value + "_last")
 *     }
 * }
 * ```
 * produces `a b b_last`
 *
 * 该操作符默认为[buffered][buffer],其输出缓冲区的大小可以通过应用后续的[buffer]操作符来改变。
 */
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
    transformLatest { emitAll(transform(it)) }

代码示例 :

代码语言:javascript
复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach { delay(100) }
                        // 该 Flow 流与  stringFlow 进行合并
                        .flatMapLatest { stringFlow(it) }
                        .collect {
                            println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
                        }
        }
    }

    suspend fun stringFlow(num: Int) = flow<String> {
        emit("$num flatMapLatest Hello First")
        delay(500)
        emit("$num flatMapLatest Hello Second")
    }
}

执行结果 :

代码语言:javascript
复制
I/System.out: 收集到元素 0 flatMapLatest Hello First, 时间 233
I/System.out: 收集到元素 1 flatMapLatest Hello First, 时间 381
I/System.out: 收集到元素 2 flatMapLatest Hello First, 时间 547
I/System.out: 收集到元素 2 flatMapLatest Hello Second, 时间 1079
在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-12-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 一、Flow 流展平
    • 1、连接模式 flatMapConcat 代码示例
      • 2、合并模式 flatMapMerge 代码示例
        • 3、最新展平模式 flatMapLatest 代码示例
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档