前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kotlin协程实现原理:CoroutineScope&Job

Kotlin协程实现原理:CoroutineScope&Job

作者头像
Rouse
发布2020-11-17 15:24:34
1.5K0
发布2020-11-17 15:24:34
举报
文章被收录于专栏:Android补给站Android补给站

今天我们来聊聊Kotlin的协程Coroutine

如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine?

如果你已经接触过协程,但对协程的原理存在疑惑,那么在阅读本篇文章之前推荐你先阅读下面的文章,这样能让你更全面更顺畅的理解这篇文章。

Kotlin协程实现原理:Suspend&CoroutineContext

如果你已经接触过协程,相信你都有过以下几个疑问:

  1. 协程到底是个什么东西?
  2. 协程的suspend有什么作用,工作原理是怎样的?
  3. 协程中的一些关键名称(例如:JobCoroutineDispatcherCoroutineContextCoroutineScope)它们之间到底是怎么样的关系?
  4. 协程的所谓非阻塞式挂起与恢复又是什么?
  5. 协程的内部实现原理是怎么样的?
  6. ...

接下来的一些文章试着来分析一下这些疑问,也欢迎大家一起加入来讨论。

CoroutineScope

CoroutineScope是什么?如果你觉得陌生,那么GlobalScopelifecycleScopeviewModelScope相信就很熟悉了吧(当然这个是针对于Android开发者)。它们都实现了CoroutineScope接口。

代码语言:javascript
复制
public interface CoroutineScope {
    /**
     * The context of this scope.
     * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
     * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
     *
     * By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
     */
    public val coroutineContext: CoroutineContext
}

CoroutineScope中只包含一个待实现的变量CoroutineContext,至于CoroutineContext之前的文章已经分析了它的内部结构,这里就不再累赘了。

通过它的结构,我们可以认为它是提供CoroutineContext的容器,保证CoroutineContext能在整个协程运行中传递下去,约束CoroutineContext的作用边界。

例如,在Android中使用协程来请求数据,当接口还没有请求完成时Activity就已经退出了,这时如果不停止正在运行的协程将会造成不可预期的后果。所以在Activity中我们都推荐使用lifecycleScope来启动协程,lifecycleScope可以让协程具有与Activity一样的生命周期意识。

下面是lifecycleScope源码:

代码语言:javascript
复制
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope
    
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }

它创建了一个LifecycleCoroutineScopeImpl实例,它实现了CoroutineScope接口,同时传入SupervisorJob() + Dispatchers.Main作为它的CoroutineContext

我们再来看它的register()方法

代码语言:javascript
复制
internal class LifecycleCoroutineScopeImpl(
    override val lifecycle: Lifecycle,
    override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
    init {
        // in case we are initialized on a non-main thread, make a best effort check before
        // we return the scope. This is not sync but if developer is launching on a non-main
        // dispatcher, they cannot be 100% sure anyways.
        if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
            coroutineContext.cancel()
        }
    }
 
    fun register() {
        // TODO use Main.Immediate once it is graduated out of experimental.
        launch(Dispatchers.Main) {
            if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
                lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
            } else {
                coroutineContext.cancel()
            }
        }
    }
 
    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
            lifecycle.removeObserver(this)
            coroutineContext.cancel()
        }
    }

register方法中通过经典的launch来创建一个协程,而launch使用到的CoroutineContext就是CoroutineSope中的CoroutineContext。然后在协程中结合JetpackLifecycle特性来监听Activiyt的生命周期。

如果对Lifecycle的使用与特性还不是很了解的,推荐阅读这篇入门级文章Android Architecture Components Part3:Lifecycle

意思就是说在Activity销毁的时候会调用下面的方法取消协程的运行。

代码语言:javascript
复制
coroutineContext.cancel()

这里就使用到了CoroutineContext,经过上篇文章的分析我们很容易知道CoroutineContext自身是没有cancel方法的,所以这个cancel方法是CoroutineContext的扩展方法。

代码语言:javascript
复制
public fun CoroutineContext.cancel(): Unit {
    this[Job]?.cancel()
}

所以真正的逻辑是从CoroutineContex集合中取出KeyJob的实例,这个对应的就是上面创建LifecycleCoroutineScopeImpl实例时传入的SupervisorJob,它是CoroutineContext的其中一个子类。

这时再来看lifecycleScope相关的一些方法

代码语言:javascript
复制
lifecycleScope.launchWhenCreated {  }
lifecycleScope.launchWhenStarted {  }
lifecycleScope.launchWhenResumed {  }

这些方法的内部逻辑就很明显了,也就是通过Lifecycle来追踪Activity的生命周期,从而约束协程运行的时机。

我们也可以不使用lifecycleScope,自己实现一个CoroutineScope,让它在Activity达到同样的效果。

代码语言:javascript
复制
 class MyActivity : AppCompatActivity(), CoroutineScope {
     lateinit var job: Job
     override val coroutineContext: CoroutineContext
         get() = Dispatchers.Main + job

     override fun onCreate(savedInstanceState: Bundle?) {
         super.onCreate(savedInstanceState)
         job = Job()
     }
 
     override fun onDestroy() {
         super.onDestroy()
         job.cancel() // Cancel job on activity destroy. After destroy all children jobs will be cancelled automatically
     }
 
     /*
      * Note how coroutine builders are scoped: if activity is destroyed or any of the launched coroutines
      * in this method throws an exception, then all nested coroutines are cancelled.
      */
     fun loadDataFromUI() = launch { // <- extension on current activity, launched in the main thread
        val ioData = async(Dispatchers.IO) { // <- extension on launch scope, launched in IO dispatcher
            // blocking I/O operation
        }
        // do something else concurrently with I/O
        val data = ioData.await() // wait for result of I/O
        draw(data) // can draw in the main thread
     }
 }

上面的实现也能够保证当前Activiyt中的协程在Activity销毁的时候终止协程的运行。

到这里CoroutineScope的作用就呼之欲出了,它就是用来约束协程的边界,能够很好的提供对应的协程取消功能,保证协程的运行范围。

当然这又引申出另外一个话题

Job是什么?

Job

基本上每启动一个协程就会产生对应的Job,例如

代码语言:javascript
复制
lifecycleScope.launch {
}

launch返回的就是一个Job,它可以用来管理协程,一个Job中可以关联多个子Job,同时它也提供了通过外部传入parent的实现

代码语言:javascript
复制
public fun Job(parent: Job? = null): Job = JobImpl(parent)

这个很好理解,当传入parent时,此时的Job将会作为parent的子Job

既然Job是来管理协程的,那么它提供了六种状态来表示协程的运行状态。

  1. New: 创建
  2. Active: 运行
  3. Completing: 已经完成等待自身的子协程
  4. Completed: 完成
  5. Cancelling: 正在进行取消或者失败
  6. Cancelled: 取消或失败

这六种状态Job对外暴露了三种状态,它们随时可以通过Job来获取

代码语言:javascript
复制
public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean

所以如果你需要自己来手动管理协程,可以通过下面的方式来判断当前协程是否在运行。

代码语言:javascript
复制
while (job.isActive) {
// 协程运行中            
}

一般来说,协程创建的时候就处在Active状态,但也有特例。

例如我们通过launch启动协程的时候可以传递一个start参数

代码语言:javascript
复制
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
 ...
}

如果这个start传递的是CoroutineStart.LAZY,那么它将处于New状态。可以通过调用start或者join来唤起协程进入Active状态。

下面我们来看一张简图,就能很清晰的了解Job中的六个状态间的转化过程。

代码语言:javascript
复制
                                        wait children
  +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
  | New | -----> | Active | ---------> | Completing  | -------> | Completed |
  +-----+        +--------+            +-------------+          +-----------+
                   |  cancel / fail       |
                   |     +----------------+
                   |     |
                   V     V
               +------------+                           finish  +-----------+
               | Cancelling | --------------------------------> | Cancelled |
               +------------+                                   +-----------+

上面已经提及到一个Job可以有多个子Job,所以一个Job的完成都必须等待它内部所有的子Job完成;对应的cancel也是一样的。

默认情况下,如果内部的子Job发生异常,那么它对应的parent Job与它相关连的其它子Job都将取消运行。俗称连锁反应。

我们也可以改变这种默认机制,Kotlin提供了SupervisorJob来改变这种机制。这种情况还是很常见的,例如用协程请求两个接口,但并不想因为其中一个接口失败导致另外的接口也不请求,这时就可以使用SupervisorJob来改变协程的这种默认机制。

使用很简单,在我们创建CoroutineContext的时候加入SupervisorJob即可。例如在上面提到过的lifecycleScope,内部就使用到了SupervisorJob

代码语言:javascript
复制
val newScope = LifecycleCoroutineScopeImpl(
    this,
    SupervisorJob() + Dispatchers.Main
)

你也可以尝试运行下面的这个例子,然后将它的SupervisorJob替换成别的CoroutineContext再来看下效果。

代码语言:javascript
复制
fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // 启动第一个子作业——这个示例将会忽略它的异常(不要在实践中这么做!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("The first child is failing")
            throw AssertionError("The first child is cancelled")
        }
        // 启动第二个子作业
        val secondChild = launch {
            firstChild.join()
            // 取消了第一个子作业且没有传播给第二个子作业
            println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // 但是取消了监督的传播
                println("The second child is cancelled because the supervisor was cancelled")
            }
        }
        // 等待直到第一个子作业失败且执行完成
        firstChild.join()
        println("Cancelling the supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}

如果有些任务你并不想被手动取消,可以使用NonCancellable作为任务的CoroutineContext

如果需要Job获取协程的返回结果,可以通过Deferred来实现,它是Job的一个子类,所以也拥有Job所用功能。同时额外提供await方法来等待协程结果的返回。

Deferred可以通过CoroutineScope.async创建。

最后我们再来介绍下Job的几个方法,startcancel就不多说了,分别是启动与取消。

invokeOnCompletion

这个方法是Job的回调通知,当Job执行完后会调用这个方法

代码语言:javascript
复制
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
 
public typealias CompletionHandler = (cause: Throwable?) -> Unit

这个cause有三种情况分别为:

  1. is null: 协程正常执行完毕
  2. is CancellationException: 协程正常取消,并非异常导致的取消
  3. Otherwise: 协程发生异常

同时它的返回值DisposableHandle可以用来取消回调的监听。

join

代码语言:javascript
复制
public suspend fun join()

注意这是一个suspend函数,所以它只能在suspend或者coroutine中进行调用。

它的作用是暂停当前运行的协程任务,立刻执行自身Job的协程任务,直到自身执行完毕之后才恢复之前的协程任务继续执行。

本篇文章主要介绍了CoroutineScope的作用与Job的相关状态演化与运用。希望对学习协程的伙伴们能够有所帮助,敬请期待后续的协程分析。

项目

android_startup: https://github.com/idisfkj/android-startup

提供一种在应用启动时能够更加简单、高效的方式来初始化组件,优化启动速度。不仅支持Jetpack App Startup的全部功能,还提供额外的同步与异步等待、线程控制与多进程支持等功能。

AwesomeGithub:https://github.com/idisfkj/AwesomeGithub

基于Github客户端,纯练习项目,支持组件化开发,支持账户密码与认证登陆。使用Kotlin语言进行开发,项目架构是基于Jetpack&DataBindingMVVM;项目中使用了ArouterRetrofitCoroutineGlideDaggerHilt等流行开源技术。

flutter_github: https://github.com/idisfkj/flutter_github

基于Flutter的跨平台版本Github客户端,与AwesomeGithub相对应。

android-api-analysis: https://github.com/idisfkj/android-api-analysis

结合详细的Demo来全面解析Android相关的知识点, 帮助读者能够更快的掌握与理解所阐述的要点。

daily_algorithm: https://github.com/idisfkj/daily_algorithm

每日一算法,由浅入深,欢迎加入一起共勉。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Android补给站 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CoroutineScope
  • Job
    • invokeOnCompletion
      • join
      • 项目
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档