首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在异常后恢复流

如何在异常后恢复流
EN

Stack Overflow用户
提问于 2020-02-03 12:29:03
回答 3查看 3.4K关注 0票数 10

我有以下代码:

代码语言:javascript
运行
复制
val channel = BroadcastChannel<Event>(10)

fun setup() {
    scope.launch {
        channel.asFlow().
            .flatMapLatest { fetchSomeData() }
            .catch { emit(DefaultData()) }
            .onEach { handleData() }
            .collect()

    }
}

fun load() {
    channel.offer(Event.Load)      
}

如果fetchSomeData失败,异常情况下,它将被catch捕获并传递一些默认数据。问题是流本身被取消,并且正在从通道的订阅者中被删除。这意味着向信道提供的任何新事件都将被忽略,因为不再有任何订阅者。

是否有办法确保流在异常情况下不会被取消?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-09-21 09:08:05

您应该捕捉到fetchSomeData()的异常,因此将catch从主流移动到fetchSomeData():

代码语言:javascript
运行
复制
    scope.launch {
        channel.asFlow().
            .flatMapLatest { fetchSomeData().catch { emit(DefaultData()} }
            .onEach { handleData() }
            .collect()

    }
票数 2
EN

Stack Overflow用户

发布于 2020-09-14 14:14:02

我也面临着同样的问题。我的解决办法是这样的:

代码语言:javascript
运行
复制
/* Custom onEach extension function */
fun <T> Flow<T>.onEachCatching(block: suspend (T) -> Unit) = OnEachCatching(this, block)

class OnEachCatching<T>(private val src: Flow<T>, private val block: suspend (T) -> Unit, bufferCapacity: Int = Channel.CONFLATED) {

    private val okValue = Channel<T>(bufferCapacity)

    private var failBlock: (suspend (Throwable) -> Unit)? = null

    init {
        GlobalScope.launch {
            src.collect { value ->
                runCatching { block(value) }
                    .onFailure { failBlock?.invoke(it) }
                    .onSuccess { okValue.send(value) }
            }

            okValue.close()
        }
    }

    fun onFailure(block: suspend (Throwable) -> Unit) = this.also {
        failBlock = block
    }

    fun resumeFlow() = okValue.consumeAsFlow()
}

用法:

代码语言:javascript
运行
复制
someData
    .onEachCatching { handleData() }
    .onFailure { emit(DefaultData()) }
    .resumeFlow()
    .collect()
票数 -1
EN

Stack Overflow用户

发布于 2021-01-02 20:18:05

我的任务是简单地重新启动收集流

代码语言:javascript
运行
复制
private fun startParsingMessages() {
    coroutineScope?.launch {
        sessionController.subscribeToMessages()
            .onEach { /*code block*/ }
            .catch {
                it.cause
                    ?.let { error -> Timber.e(error) }
                    ?: Timber.e("startSession(): ${it.message}")

                startParsingMessages() //here
            }
            .collect()
    }
}
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60039256

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档