首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >一个发布者和4个并行使用者的中断示例

一个发布者和4个并行使用者的中断示例
EN

Stack Overflow用户
提问于 2012-11-12 05:28:22
回答 2查看 3.2K关注 0票数 7

在本例中,https://stackoverflow.com/a/9980346/93647和这里的为什么我的干扰者的例子这么慢? (在问题的末尾)有一个发布项和一个使用者。

但在我的例子中,消费者的工作要复杂得多,而且需要一些时间。所以我想要4个用户并行地处理数据。

例如,如果生产者生产数字:1,2,3,4,5,6,7,8,9,10,11。

我要consumer1抓到1,5,9,.consumer2捕捉2,6,10,.consumer3捕捉3,7,11,.consumer4捕捉4,8,12.(不完全是这些数字,我们的想法是数据应该并行处理,我不在乎哪个特定的数字是在哪个消费者上处理的)

请记住,这需要并行完成,因为在实际应用程序中,使用者的工作非常昂贵。我期望使用者在不同的线程中执行,以使用多核系统的能力。

当然,我可以创建4个环形缓冲器,并将1个消费者附加到1个环形缓冲区。这样我就可以使用原始的例子了。但我觉得这是不对的。创建一个发布者(1个林格缓冲区)和4个消费者可能是正确的,因为这是我所需要的。

在google组中添加一个非常模拟的问题的链接:https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

所以我们有两个选择:

  • 一个铃声很多消费者(每个消费者都会在每次添加时“醒来”,所有消费者都应该拥有相同的WaitStrategy)。
  • 许多“一环一消费者”(每个消费者只会唤醒它应该处理的数据)。每个消费者都可以拥有自己的WaitStrategy)。
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2012-11-12 06:57:17

编辑:我忘了提到代码部分取自常见问题。我不知道这个办法比弗兰克的建议好还是坏。

该项目严重缺乏文件记录,这是一个耻辱,因为它看起来很好。

无论如何,尝试下面的snip (基于您的第一个链接)-在mono上进行测试,并且似乎还可以:

代码语言:javascript
运行
复制
using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be multiple of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value =  _random.Next();;
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}
票数 2
EN

Stack Overflow用户

发布于 2012-11-12 05:36:21

从环形缓冲区的规范中,您将看到每个消费者都将尝试处理您的ValueEvent。在你的情况下你不需要那个。

我就这样解决了:

将已处理的字段添加到您的ValueEvent中,当使用者在该字段上测试事件时,如果已经处理了该字段,则转移到下一个字段。

不是最漂亮的方式,但这是缓冲区的工作方式。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13338796

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档