Flow 流 收集元素 完成后 , 需要进行收尾工作 , 如释放资源等 ;
Flow 流 在执行时可能出现以下两种情况 :
Flow 流收尾工作可以借助以下方案执行 :
在 onCompletion 代码块中进行收尾 时 , 如果是 因为异常导致 Flow 流收集元素失败 , 则可以 在 onCompletion 代码块中拿到异常信息 ;
onCompletion 函数原型如下 :
/**
* 在**流完成或取消后,返回一个调用给定[action] **的流
* 作为[action]原因参数的取消异常或失败。
*
* 从概念上讲,' onCompletion '类似于将流集合包装成' finally '块,
* 例如下面的命令代码片段:
*
* ```
* try {
* myFlow.collect { value ->
* println(value)
* }
* } finally {
* println("Done")
* }
* ```
*
* 可以使用' onCompletion '替换为声明性的:
*
* ```
* myFlow
* .onEach { println(it) }
* .onCompletion { println("Done") }
* .collect()
* ```
*
* 与[catch]不同,此操作符报告上游和下游都发生的异常
* 并观察为取消流而抛出的异常。异常为空当且仅当
* 流程已经完全成功地完成了。从概念上讲,以下代码:
*
* ```
* myFlow.collect { value ->
* println(value)
* }
* println("Completed successfully")
* ```
*
* can be replaced with:
*
* ```
* myFlow
* .onEach { println(it) }
* .onCompletion { if (it == null) println("Completed successfully") }
* .collect()
* ```
*
* [action]的接收者是[FlowCollector],这个操作符可以用来发出附加操作
* 元素在**结束,如果它成功完成**。例如:
*
* ```
* flowOf("a", "b", "c")
* .onCompletion { emit("Done") }
* .collect { println(it) } // prints a, b, c, Done
* ```
*
* 在失败或取消的情况下,任何发出额外元素的尝试都会引发相应的异常。
* 如果需要抑制失败并将其替换为元素的发射,则使用[catch]。
*/
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // 注意:这里使用的是不安全流,但安全收集器用于调用完成操作
try {
collect(this)
} catch (e: Throwable) {
/*
* 使用抛掷收集器防止任何排放物从
* 完成顺序时,下游已失败,否则可能
* 使用“finally”导致不可能的非顺序行为
*/
ThrowingCollector(e).invokeSafely(action, e)
throw e
}
// Normal completion
val sc = SafeCollector(this, currentCoroutineContext())
try {
sc.action(null)
} finally {
sc.releaseIntercepted()
}
}
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
try {
flowDemo().collect { println("收集元素 $it") }
} finally {
println("finally 代码块, 收集元素完毕")
}
}
}
suspend fun flowDemo() = (0..3).asFlow()
}
执行结果 :
I/System.out: 收集元素 0
I/System.out: 收集元素 1
I/System.out: 收集元素 2
I/System.out: 收集元素 3
I/System.out: finally 代码块, 收集元素完毕
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
flowDemo()
.onCompletion {
println("onCompletion 代码块, 收集元素完毕")
}
.collect {
println("收集元素 $it")
}
}
}
suspend fun flowDemo() = (0..3).asFlow()
}
执行结果 :
I/System.out: 收集元素 0
I/System.out: 收集元素 1
I/System.out: 收集元素 2
I/System.out: 收集元素 3
I/System.out: onCompletion 代码块, 收集元素完毕
在 onCompletion 代码块中进行收尾 时 , 如果是因为异常导致 Flow 流收集元素失败 , 则可以在 onCompletion 代码块中拿到异常信息 ;
注意 : 在 onCompletion 只是能获取到异常信息 , 并不能捕获该异常 , 程序该崩溃还是崩溃 ;
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking
import java.io.IOException
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
flowDemo()
.onCompletion { exception ->
if (exception != null) {
println("onCompletion 代码块, 收集元素完出现异常 ${exception}")
}
}
.collect {
println("收集元素 $it")
}
}
}
suspend fun flowDemo() = flow<Int> {
for (i in 0..3) {
emit(i)
if (i == 3) {
throw IOException("发射元素 IO 异常")
}
}
}
}
执行结果 :
2022-12-27 10:01:46.472 I/System.out: 收集元素 0
2022-12-27 10:01:46.473 I/System.out: 收集元素 1
2022-12-27 10:01:46.473 I/System.out: 收集元素 2
2022-12-27 10:01:46.473 I/System.out: 收集元素 3
2022-12-27 10:01:46.474 I/System.out: onCompletion 代码块, 收集元素完出现异常 java.io.IOException: 发射元素 IO 异常
2022-12-27 10:01:46.477 D/AndroidRuntime: Shutting down VM
--------- beginning of crash
2022-12-27 10:01:46.483 E/AndroidRuntime: FATAL EXCEPTION: main
Process: kim.hsl.coroutine, PID: 29378
java.lang.RuntimeException: Unable to start activity ComponentInfo{kim.hsl.coroutine/kim.hsl.coroutine.MainActivity}: java.io.IOException: 发射元素 IO 异常
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:193)
at android.app.ActivityThread.main(ActivityThread.java:6718)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
Caused by: java.io.IOException: 发射元素 IO 异常
at kim.hsl.coroutine.MainActivity$flowDemo$2.invokeSuspend(MainActivity.kt:33)
at kim.hsl.coroutine.MainActivity$flowDemo$2.invoke(Unknown Source:8)
at kim.hsl.coroutine.MainActivity$flowDemo$2.invoke(Unknown Source:4)
at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61)
at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:212)
at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:114)
at kim.hsl.coroutine.MainActivity$onCreate$1.invokeSuspend(MainActivity.kt:38)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:87)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:61)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source:1)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:40)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source:1)
at kim.hsl.coroutine.MainActivity.onCreate(MainActivity.kt:16)
at android.app.Activity.performCreate(Activity.java:7144)
at android.app.Activity.performCreate(Activity.java:7135)
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1271)
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2931)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:193)
at android.app.ActivityThread.main(ActivityThread.java:6718)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
2022-12-27 10:01:46.509 I/Process: Sending signal. PID: 29378 SIG: 9
上面章节中介绍了 在 Flow#onCompletion 中可以执行收尾 , 同时可以查看出现的异常 , 但是无法捕获处理异常 ;
在 Flow#catch 代码块中 , 可以直接捕获异常并进行处理 ;
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking
import java.io.IOException
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
flowDemo()
.onCompletion { exception ->
if (exception != null) {
println("onCompletion 代码块, 收集元素完出现异常 ${exception}")
}
}
.catch { exception ->
println("catch 代码块, 捕获到异常 ${exception}")
}
.collect {
println("收集元素 $it")
}
}
}
suspend fun flowDemo() = flow<Int> {
for (i in 0..3) {
emit(i)
if (i == 3) {
throw IOException("发射元素 IO 异常")
}
}
}
}
执行结果 :
2022-12-27 10:06:37.168 I/System.out: 收集元素 0
2022-12-27 10:06:37.168 I/System.out: 收集元素 1
2022-12-27 10:06:37.168 I/System.out: 收集元素 2
2022-12-27 10:06:37.168 I/System.out: 收集元素 3
2022-12-27 10:06:37.169 I/System.out: onCompletion 代码块, 收集元素完出现异常 java.io.IOException: 发射元素 IO 异常
2022-12-27 10:06:37.170 I/System.out: catch 代码块, 捕获到异常 java.io.IOException: 发射元素 IO 异常