首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kotlin协程-特殊的阻塞协程

Kotlin协程-特殊的阻塞协程

作者头像
PhoenixZheng
发布2021-05-17 12:22:39
2.2K0
发布2021-05-17 12:22:39
举报

阻塞协程是种特殊的协程启动方式,一般是用 runBlocking{} 扩起来一段协程。

fun main() = runBlocking {
    launch {
        println("launch start")
        delay(100L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
        println("World Thread: ${Thread.currentThread().name}")
        println("World!") // 在延迟后打印输出
    }
    println("Hello!")
    println("Hello Thread: ${Thread.currentThread().name}")
    Thread.sleep(400L) // 阻塞主线程 2 秒钟来保证 JVM 存活
    println("out launch done")
}

这段代码的执行结果是

Hello! Hello Thread: main out launch done launch start World Thread: main World!

代码包含了runBlocking{}和launch{}两段coroutine,父子关系。首先是父协程得到执行,然后才是子协程。

重点是这两段协程都在同一个线程main里完成。这里就带来一个有趣的问题, runBLocking{}和平时常用的launch有什么区别?

甚至你可以把上面的launch{},改成 GlobalScope.launch{},看看结果有什么不一样。这里给出结果,改用GlobalScope.launch之后,子协程会在一个独立的线程里运行。

runBlocking

在kotlin协程官网上对于这个api的解释是桥接阻塞与非阻塞的世界。这个机翻中文让我迷惑了很久,一直不能明白它的意思。于是就去翻了源码的注释,

/**
 * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
 * This function should not be used from a coroutine. It is designed to bridge regular blocking code
 * to libraries that are written in suspending style, to be used in `main` functions and in tests.
 ...
 */
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {

blablabla一堆,意思是也跟"桥接阻塞与非阻塞的世界"差不多,只是多了一句“会阻塞当前线程直到coroutine完成”。但实际情况跟注释有点不同,如果在 runBlocking 中开一个 GlobalScope.launch,并且在里面延时很久,那么外面的线程其实是不会等待 GlobalScope 里的协程完成的。弄明白这点需要理解这个特殊的阻塞协程 runBlocking 的原理。

创建

runBlocking的创建在jvm包下的Builders.kt中,

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    val currentThread = Thread.currentThread()
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // create or use private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop //默认派发器
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if context's interceptor is an event loop that we shall use (to support TestContext)
        // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } //继承自上下文的派发器
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking() 
}

首先它会判断当前是否上下文有现成的Dispatcher,或者叫Intercepter,如果有的话就直接拿过来。没有的话就使用默认的eventloop。EventLoop是协程里对阻塞型coroutine进行调度的默认调度器。runBlocking和launch的主要区别就靠EventLoop实现。

在创建完coroutine后就进入派发流程了,这部分和Kotlin协程-一个协程的生命周期中的逻辑比较相似,下面也会讲到。

最后会调用 joinBlocking() 去执行coroutine,我们放到第三部分执行分析。

派发

EventLoop是一个特殊的调度类型。它的公用实现在 EventLoop.common.kt 中,

@ThreadLocal
internal object ThreadLocalEventLoop {
    private val ref = CommonThreadLocal<EventLoop?>()

    internal val eventLoop: EventLoop //eventloop对象
        get() = ref.get() ?: createEventLoop().also { ref.set(it) }

    internal fun currentOrNull(): EventLoop? =
        ref.get()

    internal fun resetEventLoop() {
        ref.set(null)
    }

    internal fun setEventLoop(eventLoop: EventLoop) {
        ref.set(eventLoop)
    }
}

createEventLoop()是个expect函数,用来获取平台上的实际实现。函数声明也在这个文件中,

internal expect fun createEventLoop(): EventLoop

而eventloop对象,是保存在ThreadLocal中的,意味着这个对象在每个线程里都会有一个,而且互不影响。每个线程都可以起一个独立的阻塞协程队列。

在jvm平台上的eventloop对象是在jvm包下的EventLoop.kt中,它的默认实现是 BlockingEventLoop

internal class BlockingEventLoop(
    override val thread: Thread
) : EventLoopImplBase()

internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())

按惯例最后会去执行派发器的dispatch()方法,因为有了之前的分析经验,这里直接到它的dispatch()函数,

public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) //重载 dispatch函数,调用入队函数

public fun enqueue(task: Runnable) {
    if (enqueueImpl(task)) { //入队
        // todo: we should unpark only when this delayed task became first in the queue
        unpark()
    } else {
        DefaultExecutor.enqueue(task)
    }
}

@Suppress("UNCHECKED_CAST")
private fun enqueueImpl(task: Runnable): Boolean { //真正入队
    _queue.loop { queue ->
        if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
        when (queue) {
            null -> if (_queue.compareAndSet(null, task)) return true //在这里入队
            is Queue<*> -> {
                when ((queue as Queue<Runnable>).addLast(task)) {
                    Queue.ADD_SUCCESS -> return true
                    Queue.ADD_CLOSED -> return false
                    Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                }
            }
            else -> when {
                queue === CLOSED_EMPTY -> return false
                else -> {
                    // update to full-blown queue to add one more
                    val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                    newQueue.addLast(queue as Runnable)
                    newQueue.addLast(task)
                    if (_queue.compareAndSet(queue, newQueue)) return true
                }
            }
        }
    }
}

BlockingEventLoop 的入队函数 enqueueImpl 逻辑比较简单,通过when判断queue的类型走不同的逻辑。实际上这段逻辑还不稳定,仔细分析会发现,queue 在blocking eventloop 的场景下,只会有 null一种可能。所以它的入队,实际上最后都会走这段代码。

null -> if (_queue.compareAndSet(null, task)) return true

执行

回到上面的创建阶段,最后会执行 joinBlocking

   fun joinBlocking(): T {
        registerTimeLoopThread()
        try {
            eventLoop?.incrementUseCount()
            try {
                while (true) {
                    @Suppress("DEPRECATION")
                    if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                    val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE //执行队列里的下一个任务
                    // note: process next even may loose unpark flag, so check if completed before parking
                    if (isCompleted) break
                    parkNanos(this, parkNanos)
                }
            } finally { // paranoia
                eventLoop?.decrementUseCount()
            }
        } finally { // paranoia
            unregisterTimeLoopThread()
        }
        // now return result
        val state = this.state.unboxState()
        (state as? CompletedExceptionally)?.let { throw it.cause }
        return state as T
    }

processNextEvent()会从上面的queue中取出任务并且执行。因为eventloop在jvm上的实现是BlockingEventLoop,它的父类是 EventLoopImplBase,在 EventLoop.common.kt 中,

override fun processNextEvent(): Long {
    // unconfined events take priority
    if (processUnconfinedEvent()) return nextTime
    // queue all delayed tasks that are due to be executed
    val delayed = _delayed.value
    if (delayed != null && !delayed.isEmpty) { //判断是否到延时时间,否则重新入队
        val now = nanoTime()
        while (true) {
            // make sure that moving from delayed to queue removes from delayed only after it is added to queue
            // to make sure that 'isEmpty' and `nextTime` that check both of them
            // do not transiently report that both delayed and queue are empty during move
            delayed.removeFirstIf {
                if (it.timeToExecute(now)) {//重新入队
                    enqueueImpl(it)
                } else
                    false
            } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
        }
    }
    // then process one event from queue
    dequeue()?.run() //出队并执行
    return nextTime
}

dequeue()的实现也相对简单,跟入队的逻辑差不多

@Suppress("UNCHECKED_CAST")
private fun dequeue(): Runnable? {
    _queue.loop { queue ->
        when (queue) {
            null -> return null
            is Queue<*> -> {
                val result = (queue as Queue<Runnable>).removeFirstOrNull()
                if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                _queue.compareAndSet(queue, queue.next())
            }
            else -> when {
                queue === CLOSED_EMPTY -> return null
                else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable //出队并把当前queue设为null
            }
        }
    }
}

上面说过,在BlockingEventLoop场景下,queue的入队只会有null一种可能。而这里也是一样,只会从else进去。

虽然queue名义上是个队列,它也支持队列的逻辑,比如在 is Queue<*> 这个分支上,它的实现是个队列。但现在可以把它当做个容量为1的队列。

之后就是task.run的流程了,和之前的分析没什么区别。

BlockingEventLoop的特殊性

上面的分析可以看出一个问题,queue不是个队列,而且每次它都只会在 null->task 之间转换。也就是说,不管什么时候,queue的长度只会是1或者0.

这个问说明,runBLocking{}这种协程,它的运行逻辑是先把父协程放队列里,然后取出来执行,执行完毕再把子协程入队,再出队子协程,用同样的方式递归。虽然这种方式能保证整体是个阻塞流程,但是设计上不够优雅。猜测是为了避免协程嵌套太多,导致stack over flow的问题出现。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Android每日一讲 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • runBlocking
  • 创建
  • 派发
  • 执行
  • BlockingEventLoop的特殊性
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档