前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例

Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例

作者头像
修之竹
发布2023-03-24 14:40:11
1.1K0
发布2023-03-24 14:40:11
举报

“最近好像没啥热点,还是说太忙了没空摸鱼看新闻了?人大又要召开了,真心希望这一届的委员们能够提一些靠谱的提案,也不枉我上周网购的商品这周还没到北京了···

前一节(Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow)介绍完了两种热流的构造方法以及它们的特点,那有没有方法可以将冷流转化为热流呢?当然是有的。那为什么需要将冷流转化为热流呢?

假如有这么一个场景:一开始有一个冷流 coldFlow 和它对应的消费者,后来下游又有几个新来的消费者需要使用这个 coldFlow,并且还需要之前已发送过的数据。而冷流的生产者与消费者是一对一的关系,且没有 replay 缓存机制,为新的消费者再创建一个冷流开销较大,这种情况下将冷流转为热流就显得事半功倍了。

1. shareIn 操作符

Flow 中的 shareIn 操作符就可以将冷流转为热流,它的方法声明是:

// code 1
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

首先看返回值,最终确实会转化为一个热流 SharedFlow 实例。方法参数先来看最简单的 replay 参数,就是设置回播到每个新增消费者的数据个数,默认为 0。所以默认情况下,新增的消费者只能收到从它开始收集的时间点之后,生产者发送的数据。

再来看第一个 scope 参数,用于设置一个 CoroutineScope 作用域,注意其生命周期的长度需要比任何消费者都要长,保证被转化成的热流能在所有消费者收集数据进行消费时,都能处于活跃状态。新被转化的热流其实就是一个共享数据流,可以被所有的消费者共享使用。

第二个参数 started 复杂一些,它是用于设置被转化为共享数据流的启动方式,官方提供有 3 种方式,下面一个个说:

SharingStarted.Eagerly 勤快式启动方式。不等第一个消费者出现就会立即启动,需要注意的是,这种方式只会保留启动时数据流发送的前 replay 个数据,再之前的数据会立即丢弃。即不对数据流缓存区以外的数据负责,所以 replay 缓存区大小设置很重要。

SharingStarted.Lazily 懒汉式启动方式。需要等第一个消费者出现才会启动,第一个消费者可以接收到数据流所有发送的数据;但其他后面的消费者只能接收到最近的 replay 个数据。这种方式启动的数据流会一直保持活跃状态,甚至所有的的消费者都退出观察不再接收了,数据流仍然会缓存最近的 replay 个数据。

SharingStarted.WhileSubscribed() 灵活式启动方式。默认情况下就是有消费者来它就立即启动,没消费者接收了它就立即停止。所以在第一个消费者出现数据流就启动,当最后一个消费者退出它就立即停止,但它仍会永久缓存最近的 replay 个数据。此外,这种启动方式还可以根据需求自定义设置参数:

// code 2
public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
    StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)

stopTimeoutMillis:设置最后一个消费者退出后,多长时间后再关闭数据流。默认是 0,即立即关闭。replayExpirationMillis:设置关闭流之后等待多长时间后,再重置清空缓存区 replay cache 的数据。默认是 Long.MAX_VALUE,即永远保存。

自定义 SharingStarted 其实还可以自定义启动方式,自己实现 SharingStarted 接口即可。如果看了前三种启动方式的源码,不难会发现,其实启动方式都是使用固定的几个 SharingCommand 实现的。SharingCommand 有三种:

// code 3
public enum class SharingCommand {
    /**
     * 开始启动,并开始收集上游数据流.
     * 多次发送这个命令并没有什么用(支持防抖),如果先发送 STOP 再发送 START 则是重启一个上游数据流。
     */
    START,

    /**
     * 停止数据流, 取消上游数据流的收集所在协程。
     */
    STOP,

    /**
     * 停止数据流, 取消上游数据流的收集所在协程。并且将 replayCache 缓冲区的值重置为初始状态。
     * 如果是 shareIn 操作符,则会调用 [MutableSharedFlow.resetReplayCache] 方法;
     * 如果是 stateIn 操作符,则会将缓冲数据重置为最初设置的初始值.
     */
    STOP_AND_RESET_REPLAY_CACHE
}

感兴趣的同学可以看看 SharingStarted.WhileSubscribed() 的具体实现类 StartedWhileSubscribed 里面的源码。如果需要自定义启动方式,照着葫芦画瓢即可。

既然有 shareIn,那自然就少不了 stateIn 了。

2. stateIn 操作符

方法声明:

// code 4
public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

首先可以看出返回值是一个热流 StateFlow 实例,那么自然而然就需要一个参数给它设置一个初始值,即第三个参数 initialValue。前两个参数与 shareIn 一样,这里就不再赘述。

3. shareIn 与 stateIn 使用指北

3.1 SharingStarted.WhileSubscribed() 实际使用

从上面的介绍可知,这种启动方式可以在没有消费者时自动取消上游数据流,从而避免资源的浪费。但在实际使用中,建议使用 SharingStarted.WhileSubscribed(5000),即在最后一个消费者停止后再保持数据流 5 秒钟的活跃状态。避免在某些特定情况下(如配置改变——最常见就是横竖屏切换、暗夜模式切换)重启上游的数据流。

3.2 shareIn、stateIn 适用于属性声明而非方法返回值

shareInstateIn 都会创建一个新的数据流,具体说就是 shareIn 会构建一个 ReadonlySharedFlow 实例;stateIn 则会构建一个 ReadonlyStateFlow 实例。而新创建的数据流会一直保存在内存中,直到传入数据流的作用域被取消或者没有任何引用时才会被 GC 回收。

所以下面代码中,前一部分代码是禁止使用的,正确的使用应该是如后一部分的代码,即在属性中使用。

// code 5
//错误示例:每次调用方法都会构建新的数据流
fun getUser(): Flow<User> =
    userLocalDataSource.getUser()
            .shareIn(externalScope, WhileSubscribed())    

//正确示例:在属性中使用 shareIn 或 stateIn 
 val user: Flow<User> = 
     userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())

3.3 MutableSharedFlow 的 subscriptionCount 参数

这个参数表示的是 MutableSharedFlow 中活跃的消费者数目,即订阅者的个数。可用于监听消费者的数目变更,下面就是一个例子:

// code 6
sharedFlow.subscriptionCount
    .map { count -> count > 0 } // count > 0 说明有消费者,返回 true;= 0 说明没有消费者了,返回 false
    .distinctUntilChanged() // only react to true<->false changes
    .onEach { isActive -> // configure an action
        if (isActive) { // do something } else { // do something }
    }
    .launchIn(scope) // launch it

这个例子可以在有消费者收集数据流时,做一些自己的操作;当所有消费者都停止收集时,再处理另外的一些操作,比如资源回收等。

distinctUntilChanged 操作符比较面生,它就是过滤掉前面接收到的重复值,从而使得后面只会接收到发生了变化的新值,和 StateFlow 特性一样。

onEach 操作符也比较常见,可以在流上新增一些处理操作,再发给下游。

3.4 与操作符的搭配使用

如果在实际使用中,需要得知上游数据流的一些状态,比如开始、完成等,则需要在上游数据流转为热流之前添加一些操作符起到监听的作用。

onStart、onCompletion 操作符监听启动和完成

// code 7
private fun shareInOnStartDemo() {
    val testFlow = flow {
        println("++++emit before")
        emit(4)
        delay(1000)
        emit(5)
        delay(1000)
        emit(6)
    }.onStart {
        emit(-1)
        println("++++ onStart")
    }.onCompletion {
        emit(-100)
        println("++++ onCompletion")
    }.shareIn(
        lifecycleScope,
        SharingStarted.WhileSubscribed(),
        8
    )
    lifecycleScope.launch {
        testFlow.collect {
            println("++++ collector receive $it")
        }
    }
}

从打印的 log 可以看到,确实可以监听状态。当然也可以在相同的位置添加 catch 操作符用于监听异常的发生,感兴趣的同学可以试试看。

4. StateFlow 代码实战

说了这么多 Flow 的东西,最后以一个实际的例子结束这一章节的学习笔记吧!

下面我将用一个应用实例来讲解 StateFlow 的实际应用。这个例子将会用到 debouncedistinctUnitChangedflatMapLatest 等操作符,用这些操作符去实现一个文本输入中实时查询的例子。

假设有个需求,要实现一个浏览器搜索的功能,根据用户不断输入的字符去查询相关的内容。如果不做任何处理,用户对键入的字符串做的任何修改,都会去请求一次接口,那后端服务器肯定是吃不消的;对于用户而言,在不断输入的过程中返回的结果用户并不会很关心,他只会关心最终输入完成之后请求的数据。那么,如何减少后端的接口请求次数是关键所在。

先来看看核心的代码:

// code 8   ViewModel.kt 文件
val queryStateFlow = MutableStateFlow("")

fun getQueryResult(): Flow<String> {
    return queryStateFlow
        .debounce(300L)
        .distinctUntilChanged()
        .flatMapLatest {
            if (it.isNullOrBlank()) {
                flow { emit("") }
            } else {
                dataFromNetwork(it).catch {
                    emitAll( flow { emit("") } )
                }
            }
        }
        .flowOn(Dispatchers.IO)
}

// 模拟网络请求的耗时操作
private fun dataFromNetwork(query: String): Flow<String> {
    return flow {
        delay(2000)
        emit(query) // 返回请求的结果
    }
}

首先可以直观地感受到,使用 Flow 去处理这一逻辑较为简单,代码量较少,这也是 Flow 的魅力所在。我们按顺序介绍一下所使用到的 Flow 操作符:

debounce 操作符 具体的操作符方法声明:

// code 9
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>

用于过滤掉最新的发射值之前 timeoutMillis 时间内发射的值,返回一个过滤后的 Flow。官方栗子非常清楚:

// code 10
flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000)
最终会发射出下面的三个值:
3, 4, 5

发射 1 之后不到 1000ms 又发射了 2,所以 1 就会被过滤掉不会发射了,以此类推。所以最后发射的值是一定可以发射成功的。通过这个操作符,我们就可以有效减少频繁请求接口的问题,这里设置的 timeout 为 300ms,即在用户连续输入过程中每间隔 300ms 才去请求一次数据。

distinctUntilChanged 操作符 具体操作符声明为:

// code 11
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

用于过滤掉重复的发射值。虽然 StateFlow 本身就可过滤掉没有变化的发射值,但在这里还是需要的,因为用户可能会删除刚输入的字符,这一操作符可进一步减少不必要的接口请求。

flatMapLatest 操作符 我看的代码版本这个操作符还是实验性api,后续可能被移除。具体操作符声明为:

// code 12
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R>

这个操作符可以在原流的基础上生成一个新流,当原流依次发出 a、b 两值时,新流都会接收,但如果新流 a 值的相关操作还未结束,则会取消 a 值的相关操作,并用 b 值进行操作。简单说就是,丢弃旧值操作,换用新值操作。下面是一个例子:

// code 13
    fun flatMapLatestDemo() {
        val testFlow = flow {
            emit("a")
            delay(100)
            emit("b")
        }.flatMapLatest {
            flow {
                emit("receive $it")
                delay(200)
                emit("send $it")
            }
        }

        lifecycleScope.launch {
            testFlow.collect {
                println("----$it")
            }
        }
    }

通过打印的 log 可以看出,a,b 都被 flatMapLatest 操作符接收到了,只有 b 最终通过。这是因为 a 先到达,等待了 100ms 后新的值 b 也到了,但 a 还在等待中,这时 flatMapLatest 就会取消掉 a 后续的操作。如果把 delay(200) 改成 delay(50),那最终 a,b 都能被打印出来。

所以这个操作符在 code 8 中的作用就是进一步减少接口请求的次数。当输入的新字符串到来时,就会将之前旧字符串还未结束的请求操作取消掉,用新的字符串去请求数据。

ViewModel.kt 的代码终于说完了,其他的代码就比较常规了,直接上码:

// code 14  MainActivity.kt
binding.editText.addTextChangedListener(object : TextWatcher{
    override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) { }

    override fun onTextChanged(input: CharSequence?, start: Int, before: Int, count: Int) {
        viewModel.queryStateFlow.value = input.toString()
    }

    override fun afterTextChanged(s: Editable?) { }
})

lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.getQueryResult()
            .collect {
                binding.tvText.text = it
            }
    }
}

有关 Flow 的相关知识就到此结束了,来个简单总结吧~

总结

1)shareInstateIn 都可将冷流转化为热流,将数据共享给多个消费者,无需为每个消费者创建同一个数据流的新实例。两者通常用于提升性能,在没有消费者时缓存数据; 2)SharingStarted 启动方式有 EagerlyLazilyWhileSubscribed 三种,最常用的还是 WhileSubscribed,有消费者就启动,没有就停止,还能设置停止延时时长和缓存过期时长;3)注意 shareInstateIn 都会新建一个 Flow,不要用于方法的返回值,建议赋值给属性;4)shareInstateInonStartonCompletion 等搭配可监听转成的热流的状态;5)distinctUntilChanged 操作符可过滤重复数据,一般用于 SharedFlow;debounce 可用于在某一时间段内防抖;flatMapLatest 操作符可以用最新值替换旧值发送给下游,旧值直接被取消作废。

参考文献

  1. StateFlow 和 SharedFlow 官方文档 https://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn
  2. Flow 操作符 shareIn 和 stateIn 使用须知;Android开发者;https://mp.weixin.qq.com/s/PbqF-vzDrttYq-cSR6NDmQ
  3. Kotlin协程:冷流转换热流的使用与原理;LeeDuo;https://blog.csdn.net/LeeDuoZuiShuai/article/details/127145092
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-03-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 修之竹 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. shareIn 操作符
  • 2. stateIn 操作符
  • 3. shareIn 与 stateIn 使用指北
    • 3.1 SharingStarted.WhileSubscribed() 实际使用
      • 3.2 shareIn、stateIn 适用于属性声明而非方法返回值
        • 3.3 MutableSharedFlow 的 subscriptionCount 参数
          • 3.4 与操作符的搭配使用
          • 4. StateFlow 代码实战
          • 总结
          • 参考文献
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档