Kotlin---使用协程的异步

协程间的通信

协程与协程间不能直接通过变量来访问数据,会导致数据原子性的问题,所以协程提供了一套Channel机制来在协程间传递数据。

Channel 是一个和 BlockingQueue 非常相似的概念。其中一个不同是它代替了阻塞的 put 操作并提供了挂起的 send,还替代了阻塞的 take 操作并提供了挂起的 receive

Channel发送和接收操作是 公平的 并且尊重调用它们的多个协程。它们遵守先进先出原则,可以看到第一个协程调用 receive 并得到了元素

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // 这里可能是消耗大量CPU运算的异步逻辑,我们将仅仅做5次整数的平方并发送
        for (x in 1..5) channel.send(x * x)
        channel.close()
    }
    // 这里我们打印了5次被接收的整数:
    // channel.receive()是阻塞的,等待发送数据发送完毕
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

当发送完毕后,记得调用channel.close()close()操作就像向通道发送了一个特殊的关闭指令。 这个迭代停止就说明关闭指令已经被接收了。所以这里保证所有先前发送出去的元素都在通道关闭前被接收到。

基于协程的生产者\消费者

在协程中,可以通过produce来模拟生产者生产数据。并且通过consume来模拟消费者情况。

目前,在1.3.11版本的Kotlin中,produceconsume都还只是实验性的功能,没有正式release,使用时记得使用@ExperimentalCoroutinesApi标记使用的函数

runBlocking {
      val receiveChannel: ReceiveChannel<Int> = produce {
          for (x in 1..5) send(x * x)
      }
      receiveChannel.consumeEach {
          println(it)
      }
 }

扇入

扇入的概念与文件系统中的IO多路复用差不多。

扇入允许多个协程可以发送到同一个通道。

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // 接收前六个
        println("${channel.receive()}...${Thread.currentThread().name}")
    }
    coroutineContext.cancelChildren() // 取消所有子协程来让主协程结束
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

运行结果:

foo...main @coroutine#1
foo...main @coroutine#1
BAR!...main @coroutine#1
foo...main @coroutine#1
foo...main @coroutine#1
foo...main @coroutine#1

使用 async 并发

async就类似于 launch。它启动了一个单独的协程,这是一个轻量级的线程并与其它所有的协程一起并发的工作。不同之处在于 launch 返回一个 Job并且不附带任何结果值,而 async 返回一个 Deferred—— 一个轻量级的非阻塞 future, 这代表了一个将会在稍后提供结果的 promise。你可以使用 .await() 在一个延期的值上得到它的最终结果, 但是 Deferred 也是一个 Job,所以如果需要的话,你可以取消它。

其实区别就是,async可以获取执行结果,而launch只是简单的执行任务。

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")    
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假设我们在这里做了些有用的事
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假设我们在这里也做了些有用的事
    return 29
}

执行结果:

The answer is 42
Completed in 1017 ms

async{}会直接启动协程,如果需要等待某个事件启动的话,则需要使用CoroutineStart.LAZY

val time = measureTimeMillis {
    val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
    val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
    // 执行一些计算
    one.start() // 启动第一个
    two.start() // 启动第二个
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

协程安全

协程与线程一样,对于数据的操作无法保持原子性,所以在协程中,需要使用原子性的数据结构,例如AotimicInteger等,或者使用mutex.withLock,来处理数据的原子性

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")    
}

val mutex = Mutex()
var counter = 0

fun main() = runBlocking<Unit> {
    GlobalScope.massiveRun {
        mutex.withLock {
            counter++        
        }
    }
    println("Counter = $counter")    
}

输出结果:

Completed 100000 actions in 104 ms
Counter = 100000

Actor

Actor 是由协程、被限制并封装到该协程中的状态以及一个与其它协程通信的 通道 组合而成的一个实体。一个简单的 actor 可以简单的写成一个函数, 但是一个拥有复杂状态的 actor 更适合由类来表示。

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同个动作的次数
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")    
}

// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求

// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 状态
    for (msg in channel) { // 即将到来消息的迭代器
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main() = runBlocking<Unit> {
    val counter = counterActor() // 创建该 actor
    GlobalScope.massiveRun {
        counter.send(IncCounter)
    }
    // 发送一条消息以用来从一个 actor 中获取计数值
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // 关闭该actor    
}

actor 本身执行时所处上下文(就正确性而言)无关紧要。一个 actor 是一个协程,而一个协程是按顺序执行的,因此将状态限制到特定协程可以解决共享可变状态的问题。实际上,actor 可以修改自己的私有状态,但只能通过消息互相影响(避免任何锁定)。

actor 在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券