首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在executor框架中同步资源

在executor框架中同步资源
EN

Stack Overflow用户
提问于 2015-03-24 03:28:24
回答 2查看 78关注 0票数 0

我正在使用executor框架来执行一个大型任务。出于进程状态的目的,我需要对已完成的数量进行计数。所以我创建了一个带有计数器的单例类来保持计数。

代码语言:javascript
运行
复制
public class ProgramInitializationTracker {

    private static Map<String, Integer> programInitializedTracker = new HashMap<>();
    private static ProgramInitializationTracker instance;

    private ProgramInitializationTracker(){

    }

    public static ProgramInitializationTracker getInstance(){
        if(instance == null){
            synchronized (ProgramInitializationTracker.class) {
                if(instance == null){
                    instance = new ProgramInitializationTracker();
                }
            }
        }
        return instance;
    }
    public Integer getProgramInitializedTracker(String key) {
        return programInitializedTracker.get(key);
    }

    public void setProgramInitializedTracker(String key, int value) {
        synchronized (ProgramInitializationTracker.class) {
            ProgramInitializationTracker.programInitializedTracker.put(key, value);
        }
    }
}

但问题是,仅仅通过同步set方法并不能真正确保我有正确的count值。就我所能得到的多线程而言。让get函数也同步会对我有帮助。如果不是,那么我应该做些什么来纠正它。

EN

回答 2

Stack Overflow用户

发布于 2015-03-24 03:51:19

当Java已经为您提供了对集合的线程安全访问时,您不应该尝试实现您自己的线程安全访问。

您应该使用ConcurrentHashMap。诸如get之类的读取不会阻塞。

但是,您应该使用AtomicInteger,而不是使用Integer类型作为存储在映射中的值,这将确保尝试修改与相同键关联的值的多个线程是线程安全的。

票数 1
EN

Stack Overflow用户

发布于 2015-03-24 04:47:31

在你发布的约束下,只需在你提交给ExecutorService的任务和你想要有指标的地方之间共享一个AtomicInteger实例即可。variant1用于具有覆盖所有任务的单个计数器,而variant2用于针对每个任务类型具有计数器。这段代码是(应该是)线程安全的。

代码语言:javascript
运行
复制
@ThreadSafe
class Test {

    private static class CountingRunnable implements Runnable {
        @Nonnull
        private final Runnable actualTask;

        @Nonnull
        private final AtomicInteger submitted;

        public CountingRunnable(@Nonnull Runnable actualTask, @Nonnull AtomicInteger submitted) {
            this.actualTask = actualTask;
            this.submitted = submitted;
        }

        @Override
        public void run() {
            actualTask.run();
            submitted.incrementAndGet();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        variant2();
    }

    private static void variant1() throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);

        AtomicInteger counter = new AtomicInteger();

        final CountDownLatch latch = new CountDownLatch(1);

        service.submit(new CountingRunnable(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    latch.countDown();
                } catch (InterruptedException e) {}
            }
        }, counter));

        latch.await();
        System.out.println(counter.get());
        service.shutdown();
    }

    private enum TaskType {
        TYPE_1,
        TYPE_2
    }

    private static void variant2() throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);

        final CountDownLatch latch = new CountDownLatch(2);
        final EnumMap<TaskType, AtomicInteger> metrics = new EnumMap<>(TaskType.class);
        metrics.put(TaskType.TYPE_1, new AtomicInteger());
        metrics.put(TaskType.TYPE_2, new AtomicInteger());

        service.submit(new CountingRunnable(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, metrics.get(TaskType.TYPE_1)));

        service.submit(new CountingRunnable(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, metrics.get(TaskType.TYPE_2)));

        latch.await();

        System.out.println("type 1: " + metrics.get(TaskType.TYPE_1));
        System.out.println("type 2: " + metrics.get(TaskType.TYPE_2));

        service.shutdown();
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29218739

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档