" 背压 " 概念 指的是 数据 受到 与 流动方向 一致的压力 ,
数据 生产者 的 生产效率 大于 数据 消费者 的 消费效率 , 就会产生 背压 ;
处理背压问题 , 有 2 种方案 :
背压代码示例 : 以 100 ms间隔发射元素 , 以 200 ms 间隔收集元素 , 发射元素的效率 高于 收集元素的效率, 此时会产生背压 ;
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().collect {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}
suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}
执行结果 : 收集元素的耗时总共耗费了 2284 ms ;
23:37:49.496 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:37:49.496 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:37:49.878 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:37:49.879 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:37:50.259 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:37:50.259 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:37:50.600 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:37:50.600 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:37:50.973 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:37:50.973 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:37:51.352 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:37:51.353 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:37:51.353 System.out kim.hsl.coroutine I 收集元素耗时 2284 ms
调用 Flow#buffer 函数 , 为 收集元素 添加一个缓冲 , 可以指定缓冲区个数 ;
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().buffer(10).collect {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}
suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}
执行结果 : 发射元素后 , 将发射的元素缓存起来 , 然后慢慢接收元素 ;
23:39:41.401 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:39:41.543 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:39:41.644 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:39:41.646 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:39:41.760 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:39:41.877 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:39:41.879 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:39:42.022 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:39:42.120 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:39:42.364 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:39:42.572 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:39:42.814 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:39:42.821 System.out kim.hsl.coroutine I 收集元素耗时 1601 ms
上述 发射元素 和 收集元素 都是在同一个线程中执行的 , 这两个操作可以并行执行 , 即使用 flowOn 指定收集元素的线程 ;
使用 flowOn 更改了协程上下文 , 使得 发射元素 与 收集元素 在不同的线程中并行执行 ;
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().flowOn(Dispatchers.Default).collect {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}
suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}
执行结果 :
23:45:19.675 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 DefaultDispatcher-worker-1
23:45:19.817 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 DefaultDispatcher-worker-1
23:45:19.918 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:45:19.921 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 DefaultDispatcher-worker-1
23:45:20.046 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 DefaultDispatcher-worker-1
23:45:20.124 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:45:20.186 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 DefaultDispatcher-worker-1
23:45:20.292 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 DefaultDispatcher-worker-1
23:45:20.333 System.out kim.hsl.coroutine I 收集元素 2 , 当前线程 main
23:45:20.548 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:45:20.790 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:45:21.000 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:45:21.007 System.out kim.hsl.coroutine I 收集元素耗时 1507 ms
从提高收集元素效率方向解决背压问题 :
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().conflate().collect {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}
suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}
执行结果 : 发射了 6 个元素 , 但是只接收到了 5 个元素 , 元素 2 被过滤掉了 ;
23:49:21.720 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:49:21.855 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:49:21.924 System.out kim.hsl.coroutine I 收集元素 0 , 当前线程 main
23:49:21.992 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:49:22.129 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:49:22.130 System.out kim.hsl.coroutine I 收集元素 1 , 当前线程 main
23:49:22.270 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:49:22.333 System.out kim.hsl.coroutine I 收集元素 3 , 当前线程 main
23:49:22.374 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:49:22.564 System.out kim.hsl.coroutine I 收集元素 4 , 当前线程 main
23:49:22.805 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:49:22.814 System.out kim.hsl.coroutine I 收集元素耗时 1277 ms
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 将主线程包装成协程
runBlocking {
val delta = measureTimeMillis {
// 以 200 ms 的间隔收集元素
// 发射元素的效率 高于 收集元素的效率, 此时会产生背压
flowEmit().collectLatest {
delay(200)
println("收集元素 $it , 当前线程 ${Thread.currentThread().name}")
}
}
println("收集元素耗时 $delta ms")
}
}
suspend fun flowEmit() = flow<Int> {
// 以 100 ms 的间隔发射元素
for (i in 0..5) {
delay(100)
emit(i)
println("发射元素 $i , 当前线程 ${Thread.currentThread().name}")
}
}
}
执行结果 : 只接收了最后一个元素 , 前几个元素没有接收 ;
23:53:01.328 System.out kim.hsl.coroutine I 发射元素 0 , 当前线程 main
23:53:01.461 System.out kim.hsl.coroutine I 发射元素 1 , 当前线程 main
23:53:01.603 System.out kim.hsl.coroutine I 发射元素 2 , 当前线程 main
23:53:01.712 System.out kim.hsl.coroutine I 发射元素 3 , 当前线程 main
23:53:01.857 System.out kim.hsl.coroutine I 发射元素 4 , 当前线程 main
23:53:02.004 System.out kim.hsl.coroutine I 发射元素 5 , 当前线程 main
23:53:02.246 System.out kim.hsl.coroutine I 收集元素 5 , 当前线程 main
23:53:02.255 System.out kim.hsl.coroutine I 收集元素耗时 1119 ms