前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )

【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )

作者头像
韩曙亮
发布2023-03-30 18:30:29
5690
发布2023-03-30 18:30:29
举报
文章被收录于专栏:韩曙亮的移动开发专栏

文章目录

一、背压概念


" 背压 " 概念 指的是 数据 受到 与 流动方向 一致的压力 ,

数据 生产者 的 生产效率 大于 数据 消费者 的 消费效率 , 就会产生 背压 ;

处理背压问题 , 有 2 种方案 :

  • 降低 数据 生产者 的生产效率 ;
  • 提高 数据 消费者 的消费效率 ;

背压代码示例 : 以 100 ms间隔发射元素 , 以 200 ms 间隔收集元素 , 发射元素的效率 高于 收集元素的效率, 此时会产生背压 ;

代码语言:javascript
复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking {
            val delta = measureTimeMillis {
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().collect {
                    delay(200)
                    println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
                }
            }
            println("收集元素耗时 $delta ms")
        }
    }

    suspend fun flowEmit() = flow<Int> {
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) {
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
        }
    }
}

执行结果 : 收集元素的耗时总共耗费了 2284 ms ;

代码语言:javascript
复制
23:37:49.496 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:37:49.496 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:37:49.878 System.out   kim.hsl.coroutine     I  收集元素 1 , 当前线程 main
23:37:49.879 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 main
23:37:50.259 System.out   kim.hsl.coroutine     I  收集元素 2 , 当前线程 main
23:37:50.259 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 main
23:37:50.600 System.out   kim.hsl.coroutine     I  收集元素 3 , 当前线程 main
23:37:50.600 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 main
23:37:50.973 System.out   kim.hsl.coroutine     I  收集元素 4 , 当前线程 main
23:37:50.973 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 main
23:37:51.352 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:37:51.353 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 main
23:37:51.353 System.out   kim.hsl.coroutine     I  收集元素耗时 2284 ms
在这里插入图片描述
在这里插入图片描述

二、使用缓冲处理背压问题


调用 Flow#buffer 函数 , 为 收集元素 添加一个缓冲 , 可以指定缓冲区个数 ;

代码示例 :

代码语言:javascript
复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking {
            val delta = measureTimeMillis {
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().buffer(10).collect {
                    delay(200)
                    println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
                }
            }
            println("收集元素耗时 $delta ms")
        }
    }

    suspend fun flowEmit() = flow<Int> {
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) {
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
        }
    }
}

执行结果 : 发射元素后 , 将发射的元素缓存起来 , 然后慢慢接收元素 ;

代码语言:javascript
复制
23:39:41.401 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:39:41.543 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 main
23:39:41.644 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:39:41.646 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 main
23:39:41.760 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 main
23:39:41.877 System.out   kim.hsl.coroutine     I  收集元素 1 , 当前线程 main
23:39:41.879 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 main
23:39:42.022 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 main
23:39:42.120 System.out   kim.hsl.coroutine     I  收集元素 2 , 当前线程 main
23:39:42.364 System.out   kim.hsl.coroutine     I  收集元素 3 , 当前线程 main
23:39:42.572 System.out   kim.hsl.coroutine     I  收集元素 4 , 当前线程 main
23:39:42.814 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:39:42.821 System.out   kim.hsl.coroutine     I  收集元素耗时 1601 ms
在这里插入图片描述
在这里插入图片描述

三、使用 flowOn 处理背压问题


上述 发射元素 和 收集元素 都是在同一个线程中执行的 , 这两个操作可以并行执行 , 即使用 flowOn 指定收集元素的线程 ;

使用 flowOn 更改了协程上下文 , 使得 发射元素 与 收集元素 在不同的线程中并行执行 ;

代码示例 :

代码语言:javascript
复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking {
            val delta = measureTimeMillis {
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().flowOn(Dispatchers.Default).collect {
                    delay(200)
                    println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
                }
            }
            println("收集元素耗时 $delta ms")
        }
    }

    suspend fun flowEmit() = flow<Int> {
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) {
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
        }
    }
}

执行结果 :

代码语言:javascript
复制
23:45:19.675 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 DefaultDispatcher-worker-1
23:45:19.817 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 DefaultDispatcher-worker-1
23:45:19.918 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:45:19.921 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 DefaultDispatcher-worker-1
23:45:20.046 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 DefaultDispatcher-worker-1
23:45:20.124 System.out   kim.hsl.coroutine     I  收集元素 1 , 当前线程 main
23:45:20.186 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 DefaultDispatcher-worker-1
23:45:20.292 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 DefaultDispatcher-worker-1
23:45:20.333 System.out   kim.hsl.coroutine     I  收集元素 2 , 当前线程 main
23:45:20.548 System.out   kim.hsl.coroutine     I  收集元素 3 , 当前线程 main
23:45:20.790 System.out   kim.hsl.coroutine     I  收集元素 4 , 当前线程 main
23:45:21.000 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:45:21.007 System.out   kim.hsl.coroutine     I  收集元素耗时 1507 ms
在这里插入图片描述
在这里插入图片描述

四、从提高收集元素效率方向解决背压问题


从提高收集元素效率方向解决背压问题 :

  • 调用 Flow#conflate 函数 , 合并发射元素项 , 不对每个值进行单独处理 ;
  • 调用 Flow#collectLatest 函数 , 取消并重新发射最后一个元素 , 只关心最后一个结果 , 不关心中间的过程值 ;

1、Flow#conflate 代码示例

代码示例 :

代码语言:javascript
复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking {
            val delta = measureTimeMillis {
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().conflate().collect {
                    delay(200)
                    println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
                }
            }
            println("收集元素耗时 $delta ms")
        }
    }

    suspend fun flowEmit() = flow<Int> {
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) {
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
        }
    }
}

执行结果 : 发射了 6 个元素 , 但是只接收到了 5 个元素 , 元素 2 被过滤掉了 ;

代码语言:javascript
复制
23:49:21.720 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:49:21.855 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 main
23:49:21.924 System.out   kim.hsl.coroutine     I  收集元素 0 , 当前线程 main
23:49:21.992 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 main
23:49:22.129 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 main
23:49:22.130 System.out   kim.hsl.coroutine     I  收集元素 1 , 当前线程 main
23:49:22.270 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 main
23:49:22.333 System.out   kim.hsl.coroutine     I  收集元素 3 , 当前线程 main
23:49:22.374 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 main
23:49:22.564 System.out   kim.hsl.coroutine     I  收集元素 4 , 当前线程 main
23:49:22.805 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:49:22.814 System.out   kim.hsl.coroutine     I  收集元素耗时 1277 ms
在这里插入图片描述
在这里插入图片描述

2、Flow#collectLatest 代码示例

代码示例 :

代码语言:javascript
复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 将主线程包装成协程
        runBlocking {
            val delta = measureTimeMillis {
                // 以 200 ms 的间隔收集元素
                // 发射元素的效率 高于 收集元素的效率, 此时会产生背压
                flowEmit().collectLatest {
                    delay(200)
                    println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
                }
            }
            println("收集元素耗时 $delta ms")
        }
    }

    suspend fun flowEmit() = flow<Int> {
        // 以 100 ms 的间隔发射元素
        for (i in 0..5) {
            delay(100)
            emit(i)
            println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
        }
    }
}

执行结果 : 只接收了最后一个元素 , 前几个元素没有接收 ;

代码语言:javascript
复制
23:53:01.328 System.out   kim.hsl.coroutine     I  发射元素 0 , 当前线程 main
23:53:01.461 System.out   kim.hsl.coroutine     I  发射元素 1 , 当前线程 main
23:53:01.603 System.out   kim.hsl.coroutine     I  发射元素 2 , 当前线程 main
23:53:01.712 System.out   kim.hsl.coroutine     I  发射元素 3 , 当前线程 main
23:53:01.857 System.out   kim.hsl.coroutine     I  发射元素 4 , 当前线程 main
23:53:02.004 System.out   kim.hsl.coroutine     I  发射元素 5 , 当前线程 main
23:53:02.246 System.out   kim.hsl.coroutine     I  收集元素 5 , 当前线程 main
23:53:02.255 System.out   kim.hsl.coroutine     I  收集元素耗时 1119 ms
在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-12-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 一、背压概念
  • 二、使用缓冲处理背压问题
  • 三、使用 flowOn 处理背压问题
  • 四、从提高收集元素效率方向解决背压问题
    • 1、Flow#conflate 代码示例
      • 2、Flow#collectLatest 代码示例
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档