我有以下代码:
val channel = BroadcastChannel<Event>(10)
fun setup() {
scope.launch {
channel.asFlow().
.flatMapLatest { fetchSomeData() }
.catch { emit(DefaultData()) }
.onEach { handleData() }
.collect()
}
}
fun load() {
channel.offer(Event.Load)
}如果fetchSomeData失败,异常情况下,它将被catch捕获并传递一些默认数据。问题是流本身被取消,并且正在从通道的订阅者中被删除。这意味着向信道提供的任何新事件都将被忽略,因为不再有任何订阅者。
是否有办法确保流在异常情况下不会被取消?
发布于 2020-09-21 09:08:05
您应该捕捉到fetchSomeData()的异常,因此将catch从主流移动到fetchSomeData():
scope.launch {
channel.asFlow().
.flatMapLatest { fetchSomeData().catch { emit(DefaultData()} }
.onEach { handleData() }
.collect()
}发布于 2020-09-14 14:14:02
我也面临着同样的问题。我的解决办法是这样的:
/* Custom onEach extension function */
fun <T> Flow<T>.onEachCatching(block: suspend (T) -> Unit) = OnEachCatching(this, block)
class OnEachCatching<T>(private val src: Flow<T>, private val block: suspend (T) -> Unit, bufferCapacity: Int = Channel.CONFLATED) {
private val okValue = Channel<T>(bufferCapacity)
private var failBlock: (suspend (Throwable) -> Unit)? = null
init {
GlobalScope.launch {
src.collect { value ->
runCatching { block(value) }
.onFailure { failBlock?.invoke(it) }
.onSuccess { okValue.send(value) }
}
okValue.close()
}
}
fun onFailure(block: suspend (Throwable) -> Unit) = this.also {
failBlock = block
}
fun resumeFlow() = okValue.consumeAsFlow()
}用法:
someData
.onEachCatching { handleData() }
.onFailure { emit(DefaultData()) }
.resumeFlow()
.collect()发布于 2021-01-02 20:18:05
我的任务是简单地重新启动收集流
private fun startParsingMessages() {
coroutineScope?.launch {
sessionController.subscribeToMessages()
.onEach { /*code block*/ }
.catch {
it.cause
?.let { error -> Timber.e(error) }
?: Timber.e("startSession(): ${it.message}")
startParsingMessages() //here
}
.collect()
}
}https://stackoverflow.com/questions/60039256
复制相似问题