我正在尝试实现一个线程安全的密钥值磁盘存储与协同。存储应该能够以FCFS顺序处理put和获取请求。在阅读文档时,我在共享的可变状态和并发性上找到了这个文章,并遇到了参与者协同构建器。
文档上的示例说明了如何从并行协同中安全地增加计数器变量。
假设参与者构建器非常适合,我尝试使用相同的构造来使用一些预定义的值并行更新一个整数变量。但是,在执行所有更新后向参与者发送get消息并不返回预期的最后更新值。
@Test
fun `updating actor state from multiple coroutines should hold the last updated value`() = runBlocking {
val counter = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is GetCounter -> msg.response.complete(counter)
is SetCounter -> counter = msg.value
}
}
}
val numbers = arrayOf(1, 3, 5, 8, 2, 10)
withContext(Dispatchers.Default) {
numbers.forEach {
launch {
counter.send(SetCounter(it))
}
}
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
val actualValue = response.await()
counter.close() // shutdown the actor
assertEquals(numbers.last(), actualValue) //fails, expected to get 10
}
// Message types for counterActor
sealed class CounterMsg
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
class SetCounter(val value: Int) : CounterMsg()
我知道测试失败了,因为启动、异步或任何挂起函数都不能保证执行顺序。
但用户可能会使用上述结构调用'get‘或'put’,并期望读取他们更新的最后一个值。
如何按特定顺序更新整数,并确保它始终保存最后更新的值?
发布于 2022-09-01 12:39:29
FCFS存储确实可以用参与者构建器实现。我检验订单的方法是错误的。在启动、异步或调用任何挂起函数(包括使用CompletableDeferred对参与者响应挂起)时,无法保证排序。下面是对带有参与者的并发队列的更好的测试--此测试断言,来自基于参与者的队列的并发轮询以提供的顺序返回head元素。
@RepeatedTest(5)
fun `actor queue test`() = runTest {
val sendQueue = LinkedList<Int>()
val receiveQueue = ConcurrentLinkedQueue<Int>()
val queueActor = actor<QueueMsg>(capacity = Channel.UNLIMITED) {
val queue = LinkedList<Int>()
for (msg in channel) {
when (msg) {
is OfferQueue -> run {
queue.offer(msg.value)
sendQueue.offer(msg.value)
}
is PollQueue -> run {
val polledInt = queue.poll()
msg.response.complete(polledInt)
receiveQueue.offer(polledInt)
}
}
}
}
withContext(Dispatchers.Default) {
repeat(10) {
launch {
queueActor.trySend(OfferQueue(it))
}
}
}
withContext(Dispatchers.Default) {
repeat(10) {
launch {
val response = CompletableDeferred<Int>()
queueActor.trySend(PollQueue(response))
}
}
}
queueActor.close()
assertIterableEquals(sendQueue, receiveQueue)
}
// Message types for queueActor
sealed class QueueMsg
class PollQueue(val response: CompletableDeferred<Int>) : QueueMsg() // a request with reply
class OfferQueue(val value: Int) : QueueMsg()
除非使用单个线程调度程序,否则无法从多个协同器中按特定顺序更新参与者的状态。但是,可以保证参与者将按照接收更新的顺序更新其状态。
类似地,当响应通过等待等待延迟收集时,无法保证参与者响应排序。
对于FCFS存储,所有操作都可以按顺序处理,方法是通过参与者发送操作,并确保在操作本身中不调用其他挂起函数。
https://stackoverflow.com/questions/73505100
复制相似问题