首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >用kotlin协同实现线程安全数据结构

用kotlin协同实现线程安全数据结构
EN

Stack Overflow用户
提问于 2022-08-26 18:21:10
回答 1查看 200关注 0票数 1

我正在尝试实现一个线程安全的密钥值磁盘存储与协同。存储应该能够以FCFS顺序处理put和获取请求。在阅读文档时,我在共享的可变状态和并发性上找到了这个文章,并遇到了参与者协同构建器。

文档上的示例说明了如何从并行协同中安全地增加计数器变量。

假设参与者构建器非常适合,我尝试使用相同的构造来使用一些预定义的值并行更新一个整数变量。但是,在执行所有更新后向参与者发送get消息并不返回预期的最后更新值。

代码语言:javascript
运行
复制
    @Test
    fun `updating actor state from multiple coroutines should hold the last updated value`() = runBlocking {

        val counter = actor<CounterMsg> {
            var counter = 0 // actor state
            for (msg in channel) { // iterate over incoming messages
                when (msg) {
                    is GetCounter -> msg.response.complete(counter)
                    is SetCounter -> counter = msg.value
                }
            }
        }
        val numbers = arrayOf(1, 3, 5, 8, 2, 10)
        withContext(Dispatchers.Default) {
            numbers.forEach {
                launch {
                    counter.send(SetCounter(it))
                }
            }
        }
        // send a message to get a counter value from an actor
        val response = CompletableDeferred<Int>()
        counter.send(GetCounter(response))
        val actualValue = response.await()
        counter.close() // shutdown the actor
        assertEquals(numbers.last(), actualValue) //fails, expected to get 10
    }

    // Message types for counterActor
    sealed class CounterMsg
    class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
    class SetCounter(val value: Int) : CounterMsg()

我知道测试失败了,因为启动、异步或任何挂起函数都不能保证执行顺序。

但用户可能会使用上述结构调用'get‘或'put’,并期望读取他们更新的最后一个值。

如何按特定顺序更新整数,并确保它始终保存最后更新的值?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-09-01 12:39:29

FCFS存储确实可以用参与者构建器实现。我检验订单的方法是错误的。在启动、异步或调用任何挂起函数(包括使用CompletableDeferred对参与者响应挂起)时,无法保证排序。下面是对带有参与者的并发队列的更好的测试--此测试断言,来自基于参与者的队列的并发轮询以提供的顺序返回head元素。

代码语言:javascript
运行
复制
    @RepeatedTest(5)
    fun `actor queue test`() = runTest {

        val sendQueue = LinkedList<Int>()
        val receiveQueue = ConcurrentLinkedQueue<Int>()

        val queueActor = actor<QueueMsg>(capacity = Channel.UNLIMITED) {
            val queue = LinkedList<Int>()
            for (msg in channel) {
                when (msg) {
                    is OfferQueue -> run {
                        queue.offer(msg.value)
                        sendQueue.offer(msg.value)
                    }
                    is PollQueue -> run {
                        val polledInt = queue.poll()
                        msg.response.complete(polledInt)
                        receiveQueue.offer(polledInt)
                    }
                }
            }
        }
        withContext(Dispatchers.Default) {
            repeat(10) {
                launch {
                    queueActor.trySend(OfferQueue(it))
                }
            }
        }
        withContext(Dispatchers.Default) {
            repeat(10) {
                launch {
                    val response = CompletableDeferred<Int>()
                    queueActor.trySend(PollQueue(response))
                }
            }
        }
        queueActor.close()
        assertIterableEquals(sendQueue, receiveQueue)
    }

    // Message types for queueActor
    sealed class QueueMsg
    class PollQueue(val response: CompletableDeferred<Int>) : QueueMsg() // a request with reply
    class OfferQueue(val value: Int) : QueueMsg()

除非使用单个线程调度程序,否则无法从多个协同器中按特定顺序更新参与者的状态。但是,可以保证参与者将按照接收更新的顺序更新其状态。

类似地,当响应通过等待等待延迟收集时,无法保证参与者响应排序。

对于FCFS存储,所有操作都可以按顺序处理,方法是通过参与者发送操作,并确保在操作本身中不调用其他挂起函数。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73505100

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档