首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如果接收到信号,重复操作

如果接收到信号,重复操作
EN

Code Review用户
提问于 2017-02-17 08:20:40
回答 1查看 102关注 0票数 2

我有一个数据同步操作,正在观察一个数据库。对于系统来说,这是一段相当关键的代码,所以我想确保正确处理并发性。

问题

  • 该操作操作数据库中的同步状态标志,并且应该具有零并发性。
  • 操作必须是可取消的。
  • 如果在运行时接收到信号,则该操作应重复一次额外的时间。

行为

  • 服务是响应对基础数据的更改而启动的。
  • 服务是为了响应连接性的变化而启动的。
  • 服务定期启动。

电流设计

代码语言:javascript
运行
复制
    bool syncRequired = false;

    private async void StartSynchronisation()
    {
        // the cancellation token operates as the flag to indicate if the operation is running
        if (this.cancellationTokenSource != null)
        {
            syncRequired = true;
            return;
        }

        try
        {
            this.cancellationTokenSource = new System.Threading.CancellationTokenSource();

            // stop the time that periodically prompt the operation
            this.synchronisationTimer.Stop();

            bool dataWasSynchronised = false;

            // Repeated synchronisation and replication until no updates are made
            do
            {
                syncRequired = false;
                // Only attempt synchronisation if the service is actually available
                if (this.connectivityMonitor.IsServiceAvailable)
                {
                    dataWasSynchronised = await DoSynchronisation().ConfiguredAwait(false);
                }
            }
            while (dataWasSynchronised || syncRequired );
        }
        catch (Exception ex)
        {
            // handle exception using injected policy handler
        }
        finally
        {
            this.synchronisationTimer.Start();
            this.cancellationTokenSource.Dispose();
            this.cancellationTokenSource = null;
        }
    }

我给您带来了这段代码,因为它缺少任何线程同步,而且它可能需要它。目前,它依赖于对引用(cancellationTokenSource)和bool (syncRequired)的读/写的原子性,但这并没有真正涵盖任何内容。作为一段关键的代码,我不想错过一个事件,需要等待下一个计时器间隔来启动操作。它会继续工作,但是我们的测试部门不会喜欢它。我的问题是,线程并发不是我最强大的领域,我已经在这个功能上疲惫不堪了--我在不把头绑成结的情况下很难对它进行推理。

我需要哪些同步原语来确保所有事件都按预期的方式处理?是否有更好的模式使用并发队列或反应性扩展,我可以在这里加入?我一直在考虑使用反应性扩展的BehaviorSubject<T>

EN

回答 1

Code Review用户

发布于 2017-02-17 14:16:59

我的理解是:

第一个if语句是通过检查cancellationTokenSource是否为null来检查线程是否已经在执行循环;如果它不是null,则通过将syncRequired设置为true并返回来发出需要同步的信号。由于while (dataWasSynchronised || syncRequired )条件,执行该循环的线程将运行另一个循环。

如果cancellationTokenSource是空的,那么同步循环就不会被执行,所以这个线程就会继续这样做。

私有异步空StartSynchronisation() { //取消令牌作为标志操作,指示操作是否正在运行(this.cancellationTokenSource != null) { syncRequired = true;} try { this.cancellationTokenSource =新System.Threading.CancellationTokenSource();

因此,第一个问题是,该检查(第一个if语句)没有同步。一个线程可以看到cancellationTokenSource为null,然后继续执行同步循环,首先将cancellationTokenSource设置为一个新对象。在分配新对象(例如,在将syncRequired设置为true之后)之前,有一个上下文开关,另一个线程执行相同的代码。它也看到cancellationTokenSource为null,将syncRequired设置为真,为cancellationTokenSource分配一个新对象,并开始运行循环。然后是另一个上下文开关,我们的原始线程又回来了,为cancellationTokenSource分配了一个新的对象,并开始执行循环。瞧,你现在违反了你的第一个要求,即零并发。

修复很简单:检查以及任何被修改以影响检查结果的对象都必须同步。

实现这一目标的基本途径是:

代码语言:javascript
运行
复制
class MyClass
{
    private readonly object syncLock = new object();
    private volatile bool syncRequired = false;
    private volatile bool syncRunning = false;

    private void MySyncFunc()
    {
        lock (syncLock)
        {
            if (syncRunning)
            {
                syncRequired = true;
                return;
            }

            syncRunning = true;
        }

        while (true)
        {
            syncRequired = false;

            // your sync operations here

            lock (syncLock)
            {
                if (!syncRequired)
                {
                    syncRunning = false;
                    break;
                }
            }
        }
    }
}

我很快就会指出,这并不是最有效、最漂亮的方法,但这是一种简单的(希望是准确的)展示和实现的方法。

首先,请注意,如果没有锁,您甚至无法检查同步循环是否正在运行。所以一次只有一个线程可以检查。如果线程确定它没有运行,它会将值设置为true (以表示它现在正在运行)并退出锁。在前面的示例中到达的第二个线程,如果在这段时间内切换到上下文,就无法继续执行,因为它无法获得锁。一旦第一个线程退出锁,第二个线程就会获得锁。现在它看到syncRunning是true,所以它将syncRequired设置为true并返回。那里没有同步问题。

现在让我们来看看这个循环。尽管这是一个无限循环,我只是想从循环条件中得到检查结果。所以它首先要做的是将syncRequired设置为false。这不需要同步,因为即使另一个线程已经将其设置为真,您现在还是要去做实际的同步。

因此,在您自己的同步工作完成后,是时候检查您是否可以退出循环,还是需要重复它。(我没有检查同步操作的结果,因为它与此无关。)再说一次你得去拿锁。为什么?因为线程可以检查syncRequired并看到它是false,然后继续设置syncRunning = false。但是在设置它之前,上下文切换之前,另一个线程检查并看到syncRunning == true,因此将syncRequired设置为true并返回。同时,第一个线程继续将syncRunning设置为false并返回。现在您有了syncRequired == true,但是没有运行循环的线程。喔,你错过了一个活动!看看您的原始代码,您就会发现存在这个问题,如果在循环退出后,在syncRequired块中将cancellationTokenSource设置为null,那么finally就会被设置为true。

因此,上面的代码所做的是再次获得同一个对象的锁,并检查是否需要同步。如果是,好的,锁就退出,循环重新开始。但如果不是,则在退出锁之前将syncRunning设置为false。由于希望检查syncRunning == true的任何其他线程都必须等到循环中的锁退出时才会看到正确的值,因此当它们最终获得锁时,它们将看到正确的值,因为如果第一个线程将退出该循环,则它将在拥有锁时和退出锁之前将syncRunning设置为false。下一个线程是否获得锁并进行检查,它将看到syncRunning ==为false,并继续运行循环本身。别再输了!

其他几个问题:

如果要在循环中使用成员变量,但不修改其在循环中的值(或仅在每次迭代中将其设置为单个值,这样做就没有明显的意义,即使您知道其他线程可以修改该值),那么将其标记为volatile。这将导致每次从内存中读取值,因此您将从其他线程获取更改。否则,编译器可以优化已读出的循环;毕竟,在循环的生存期内,它永远不会改变。

你的密码是不可取消的。当然,您创建了一个CancellationTokenSource,但实际上您从未使用过它--因此,在我的示例中,我只是简单地用一个bool替换它。您需要做的是检查每个迭代中的令牌(或者可能将其传递给被调用的函数,如果这是应该允许取消其任务的部分,尽管随后您必须更改函数以接受并知道如何使用令牌),并查看是否已请求取消;如果是,则抛出一个OperationCanceledException (或者您希望中止当前正在运行的任务)。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/155575

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档