首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >这个java应用程序如何在不扩展线程或实现runnable的情况下运行多个线程?

这个java应用程序如何在不扩展线程或实现runnable的情况下运行多个线程?
EN

Stack Overflow用户
提问于 2012-03-14 01:15:35
回答 1查看 1K关注 0票数 2

我正在学习Java,并且能够使用runnable对我现有的应用程序进行一些多线程处理。我现在正在研究disruptor(在线程之间共享变量),但我不知道作者实际上是如何产生线程的。

我看到他在使用Executor,我用它在我的程序中提交runnable类,但在这个例子中没有submit (或runnable)。我只从Oracle教程中学到了我所知道的,它们提到了唯一的两种方法是扩展线程或实现runnable(我在这里看不到这两种方法,但他确实将executor提交给disruptor,这可能是他是如何线程化的?)是我漏掉了什么,还是这个人用了不同的方式?我的最终目标是理解这段代码(它可以完美地工作),这样我就可以将它应用于我现有的(使用runnable)代码。

下面是有问题的代码:

App.java

代码语言:javascript
复制
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class App {

    private final static int RING_SIZE = 1024 * 8;

    private static long handleCount = 0;

    private final static long ITERATIONS = 1000L * 1000L * 300L;
    private final static int NUM_EVENT_PROCESSORS = 8;

    private final static EventHandler<ValueEvent> handler =
        new EventHandler<ValueEvent>() {
        public void onEvent(final ValueEvent event,
                                final long sequence,
                                final boolean endOfBatch) throws Exception {
        handleCount++;
    }
    };

    public static void main(String[] args) {
    System.out.println("Starting disruptor app.");

    ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS);

    Disruptor<ValueEvent> disruptor =
        new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
            new SingleThreadedClaimStrategy(RING_SIZE),
            new SleepingWaitStrategy());
    disruptor.handleEventsWith(handler);
    RingBuffer<ValueEvent> ringBuffer = disruptor.start();

    long start = System.currentTimeMillis();

        long sequence;
        ValueEvent event;
    for (long x=0; x<ITERATIONS; x++) {
        sequence = ringBuffer.next();
        event = ringBuffer.get(sequence);
        event.setValue(x);
        ringBuffer.publish(sequence);
        }
    final long expectedSequence = ringBuffer.getCursor();

    while (handleCount < expectedSequence) { }

    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    System.out.printf("op/s: %d, handled: %d", opsPerSecond, handleCount);
    }
}

更新:如果Disruptor正在处理线程的产生,那么我如何向它提交现有的runnable类?或者我需要重新编写代码吗?抱歉,我有点困惑,不知道disruptor是否能与现有代码协同工作,或者我是否需要为此彻底改变我的东西。

EN

Stack Overflow用户

回答已采纳

发布于 2012-03-14 01:20:59

正如您所怀疑的,实际的线程处理(通过提交工作项)是在Disruptor中完成的。因此,您需要查看its source code (幸运的是,它是开源的),以找到以下内容:

代码语言:javascript
复制
public RingBuffer<T> start()
{
    EventProcessor[] gatingProcessors = eventProcessorRepository.getLastEventProcessorsInChain();
    ringBuffer.setGatingSequences(Util.getSequencesFor(gatingProcessors));

    checkOnlyStartedOnce();
    for (EventProcessorInfo<T> eventProcessorInfo : eventProcessorRepository)
    {
        executor.execute(eventProcessorInfo.getEventProcessor());
    }

    return ringBuffer;
}
票数 7
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/9688799

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档