Coroutine是kotlin官方文档上推荐的,个人理解,其实就是一个轻量级的线程库 使用前加依赖
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.1'
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> { // start main coroutine
GlobalScope.launch { // launch a new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
GlobalScope.launch是最高等级的coroutine(类似于守护进程)。 但如果delay太久呢?久到runBlocking已经结束了还没走完。由此可见,GlobalScope.launch不方便管理,可以用launch取代。 只有launch里面全部执行完,整个runBlocking才能结束,因此,用了launch以后delay也就不需要了
fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
为什么第一个可以取消,第二个不可以,因为第一个有delay,delay是suspend函数(注意,所有的suspend函数都可以cancel) 如何让第二个可以取消。第一种,手动调用yield方法。第二种,用isActive方法 取消suspend函数会抛出CancellationException,记得 try {...} finally {...}
launch返回一个job,无其他信息。async返回Deferred,有点类似于Future。使用 .await()来获取最终数据
import kotlinx.coroutines.*
import kotlin.system.*
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
当然,也可以lazy start
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
// some computation
one.start() // start the first one
two.start() // start the second one
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
launch 后面一共可以拥有3个
MainScope其实是专门为UI创建,默认Dispatchers.Main
用withContext
import kotlinx.coroutines.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main() {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
实体类
class Repo : Serializable {
var id: Long? = null
var node_id: String? = null
var name: String? = null
}
定义接口
interface GitHubService {
@GET("users/{user}/repos")
fun listRepos(@Path("user") user: String): Observable<List<Repo>>
}
最后是具体实现
val scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
val retrofit = Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(scheduler))//这里设置了scheduler在后面的网络请求中就不用每个设置了
.build()
val service = retrofit.create(GitHubService::class.java)
service.listRepos("octocat").observeOn(AndroidSchedulers.mainThread()).subscribe({ result ->
if(isFinishing) {//处理生命周期,或者CompositeDisposable
return@subscribe
}
result.forEach {
Log.e("MainActivity", it.name)
}
}, {
Log.e("MainActivity", it.message ?: "null")
})
这些都比较熟。生命周期除了简单的isFinishing以外,还有CompositeDisposable。线程池专门拿出来主要是后面有用,后面再说
首先,还记得上面1.3提到的么?async返回Deferred,有点类似于Future。使用 .await()来获取最终数据。所以请求的接口就需要改一下
interface GitHubService {
@GET("users/{user}/repos")
fun listRepos(@Path("user") user: String): Observable<List<Repo>>
@GET("users/{user}/repos")
fun listReposV2(@Path("user") user: String): Deferred<List<Repo>>
}
改成Deferred的返回值。同时,retrofit的构造函数中addCallAdapterFactory也有所变化
val retrofit = Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(CoroutineCallAdapterFactory())
.build()
val service = retrofit.create(GitHubService::class.java)
为了让CoroutineCallAdapterFactory能运行起来需要添加两个依赖库
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1"
implementation 'com.jakewharton.retrofit:retrofit2-kotlin-coroutines-adapter:0.9.2'
接下来看最重要的网络请求那一段
try {
launch(MyDispatcher()) {
val data = service.listReposV2("octocat").await()
withContext(Dispatchers.Main) {
if (isFinishing) {//生命周期
return@withContext
}
data.forEach {
Log.e("SecondActivity", it.name)
}
}
}
} catch (e: Exception) {
// CancellationException
Log.e("SecondActivity", e.message ?: "null")
}
重点解释一下生命周期的处理和线程池的共用问题 生命周期的处理除了isFinishing外,Coroutine还提供了CoroutineScope(Dispatchers.Main)和MainScope。其实两者都是把整个activity视为最大的Coroutine,只需要在onDestroy的时候把最大的cancel,那么内部所有的子Coroutine也会先自行cancel,也就达到了生命周期管理的功能,当然,也可以准备一个list,每次launch以后把job加进去,onDestroy的时候循环遍历去取消 接下来解释一下线程池共用的问题。如果要用到项目中去,那么必然会涉及到和原来的retrofit共用同一个线程池。Dispatchers.Default里面分为DefaultScheduler和CommonPool。我就仿照这两个自己写了一个MyDispatcher,里面包了一层Executors.newFixedThreadPool 最终的详细代码如下
class SecondActivity : AppCompatActivity() , CoroutineScope by CoroutineScope(Dispatchers.Main){
override fun onDestroy() {
cancel()
super.onDestroy()
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val retrofit = Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(CoroutineCallAdapterFactory())
.build()
val service = retrofit.create(GitHubService::class.java)
try {
launch(MyDispatcher()) {
val data = service.listReposV2("octocat").await()
withContext(Dispatchers.Main) {
if (isFinishing) {//生命周期
return@withContext
}
data.forEach {
Log.e("SecondActivity", it.name)
}
}
}
} catch (e: Exception) {
// CancellationException
Log.e("SecondActivity", e.message ?: "null")
}
}
class MyDispatcher: CoroutineDispatcher() {
private var pool = Executors.newFixedThreadPool(10)//这里可以考虑和以前老的retrofit共用同一个线程池
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
pool.execute(block)//模仿CommonPool的实现方法
} catch (e: Exception) {
}
}
}
}