我有一个CoroutineScope
和log()
函数的实例,如下所示:
private val scope = CoroutineScope(Dispatchers.IO)
fun log(message: String) = scope.launch { // launching a coroutine
println("$message")
TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
}
我使用这个测试代码来启动协同:
repeat(5) { item ->
log("Log $item")
}
log()
函数可以从任何地方调用,也可以在任何Thread
中调用,但不能从协同线调用。
经过几次测试后,我可以看到以下不连续的结果:
Log 0
Log 2
Log 4
Log 1
Log 3
打印日志可以有不同的顺序。如果我正确理解了协同执行并不能保证是连续的。它的意思是,item 2
的协同线可以在item 0
的协同线之前启动。
我希望为每个项目依次启动协同机制,并且“一些阻塞操作”将按顺序执行,以始终实现下一个日志:
Log 0
Log 1
Log 2
Log 3
Log 4
有什么方法可以使发射协同系统连续进行吗?或者也许还有其他方法来实现我想要的?
提前感谢您的帮助!
发布于 2022-05-07 13:27:24
一种可能的策略是使用一个频道来有序地加入已启动的工作。您需要懒洋洋地启动这些作业,以便在调用join
之前不会启动它们。当频道容量无限时,trySend
总是成功的。您需要使用trySend
,以便可以从协同线之外调用它。
private val lazyJobChannel = Channel<Job>(capacity = Channel.UNLIMITED).apply {
scope.launch {
consumeEach { it.join() }
}
}
fun log(message: String) {
lazyJobChannel.trySend(
scope.launch(start = CoroutineStart.LAZY) {
println("$message")
TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
}
)
}
发布于 2022-05-21 19:00:08
由于Flow
s are sequential,我们可以使用MutableSharedFlow
顺序地收集和处理数据:
class Info {
// make sure replay(in case some jobs were emitted before sharedFlow is being collected and could be lost)
// and extraBufferCapacity are large enough to handle all the jobs.
// In case some jobs are lost try to increase either of the values.
private val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
private val scope = CoroutineScope(Dispatchers.IO)
init {
sharedFlow.onEach { message ->
println("$message")
TimeUnit.MILLISECONDS.sleep(100) // some blocking or suspend operation
}.launchIn(scope)
}
fun log(message: String) {
sharedFlow.tryEmit(message)
}
}
fun test() {
val info = Info()
repeat(10) { item ->
info.log("Log $item")
}
}
它总是按照正确的顺序打印日志:
Log 0
Log 1
Log 2
...
Log 9
它适用于所有情况,但需要确保有足够的元素设置为replay
和extraBufferCapacity
参数的MutableSharedFlow
,以处理所有项目。
另一种方法是
使用Dispatchers.IO.limitedParallelism(1)
作为CoroutineScope
的上下文。如果协同器不包含对suspend
函数的调用并从相同的线程(例如Main Thread )启动,它们就会按顺序运行。因此,该解决方案仅适用于launch
协同构建器中的阻塞(而不是launch
)操作:
private val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(1))
fun log(message: String) = scope.launch { // launching a coroutine from the same Thread, e.g. Main Thread
println("$message")
TimeUnit.MILLISECONDS.sleep(100) // only blocking operation, not `suspend` operation
}
结果表明,单线程调度程序是FIFO执行器。因此,将CoroutineScope
执行限制在一个线程上可以解决这个问题。
https://stackoverflow.com/questions/72152449
复制相似问题