首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >在java中并行处理数组

在java中并行处理数组
EN

Stack Overflow用户
提问于 2018-05-25 17:53:48
回答 2查看 3.1K关注 0票数 1

我正在尝试通过线程获得更快的输出。只是做一个小的POC排序。

假设我有一个问题语句来查找数组中出现奇数的所有数字。下面是我对顺序和并行的尝试。

代码语言:javascript
复制
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class Test1 {

    final static Map<Integer, Integer> mymap  = new HashMap<>();

    static Map<Integer, AtomicInteger> mymap1 = new ConcurrentHashMap<>();

    public static void generateData(final int[] arr) {
        final Random aRandom = new Random();
        for (int i = 0; i < arr.length; i++) {
            arr[i] = aRandom.nextInt(10);
        }
    }

    public static void calculateAllOddOccurrence(final int[] arr) {

        for (int i = 0; i < arr.length; i++) {
            if (mymap.containsKey(arr[i])) {
                mymap.put(arr[i], mymap.get(arr[i]) + 1);
            } else {
                mymap.put(arr[i], 1);
            }
        }

        for (final Map.Entry<Integer, Integer> entry : mymap.entrySet()) {
            if (entry.getValue() % 2 != 0) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }

        }

    }

    public static void calculateAllOddOccurrenceThread(final int[] arr) {

        final ExecutorService executor = Executors.newFixedThreadPool(10);
        final List<Future<?>> results = new ArrayList<>();
        ;
        final int range = arr.length / 10;
        for (int count = 0; count < 10; ++count) {
            final int startAt = count * range;
            final int endAt = startAt + range;
            executor.submit(() -> {
                for (int i = startAt; i < endAt; i++) {
                    if (mymap1.containsKey(arr[i])) {
                        final AtomicInteger accumulator = mymap1.get(arr[i]);
                        accumulator.incrementAndGet();
                        mymap1.put(arr[i], accumulator);
                    } else {
                        mymap1.put(arr[i], new AtomicInteger(1));
                    }
                }
            });
        }

        awaitTerminationAfterShutdown(executor);

        for (final Entry<Integer, AtomicInteger> entry : mymap1.entrySet()) {
            if (entry.getValue().get() % 2 != 0) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }

        }

    }

    public static void calculateAllOddOccurrenceStream(final int[] arr) {

        final ConcurrentMap<Integer, List<Integer>> map2 = Arrays.stream(arr).parallel().boxed().collect(Collectors.groupingByConcurrent(i -> i));
        map2.entrySet().stream().parallel().filter(e -> e.getValue().size() % 2 != 0).forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue().size()));

    }

    public static void awaitTerminationAfterShutdown(final ExecutorService threadPool) {
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (final InterruptedException ex) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(final String... doYourBest) {

        final int[] arr = new int[200000000];

        generateData(arr);
        long starttime = System.currentTimeMillis();
        calculateAllOddOccurrence(arr);

        System.out.println("Total time=" + (System.currentTimeMillis() - starttime));

        starttime = System.currentTimeMillis();
        calculateAllOddOccurrenceStream(arr);

        System.out.println("Total time Thread=" + (System.currentTimeMillis() - starttime));

    }

}

输出:

代码语言:javascript
复制
1=20003685
2=20000961
3=19991311
5=20006433
7=19995737
8=19999463
Total time=3418
5=20006433
7=19995737
1=20003685
8=19999463
2=20000961
3=19991311
Total time Thread=19640

并行执行(calculateAllOddOccurrenceStream )需要更多的时间。并行处理数组然后合并结果的最佳方法是什么?

我的目标不是找到最快的算法,而是使用任何算法,并尝试在不同的线程中运行,以便它们同时处理数组的不同部分。

EN

回答 2

Stack Overflow用户

发布于 2018-05-25 17:56:34

您现在看到的是Java8中引入的STREAMS API:http://www.baeldung.com/java-8-streams

示例:

代码语言:javascript
复制
// sequential processes
myArray.stream().filter( ... ).map( ... ).collect(Collectors.toList()):

// parallel processes
myArray.parallelStream().filter( ... ).map( ... ).collect(Collectors.toList());
票数 1
EN

Stack Overflow用户

发布于 2018-05-25 18:11:38

查看您的代码,您会发现下面这行代码出错了:

代码语言:javascript
复制
mymap1.put(arr[i], mymap1.get(arr[i]) + 1);

您正在并行覆盖这些值,例如:

代码语言:javascript
复制
Thread 1 'get' = 0
Thread 2 'get' = 0
Thread 1 'put 1' 
Thread 2 'put 1'

将您的地图更改为:

代码语言:javascript
复制
static Map<Integer, AtomicInteger>       mymap1 = new ConcurrentHashMap<>();
static {
    //initialize to avoid null values and non-synchronized puts from different Threads
    for(int i=0;i<10;i++) {
        mymap1.put(i, new AtomicInteger());
    }
}
....
    //in your loop
    for (int i = 0; i < arr.length; i++) {
        AtomicInteger accumulator = mymap1.get(arr[i]);
        accumulator.incrementAndGet();
    }

编辑:上述方法的问题当然是mymap1的初始化。为了避免落入相同的陷阱(在循环中创建AtomicInteger并再次覆盖彼此),它需要预先填充值。

既然我觉得自己很慷慨,下面是Streams API可能起作用的地方:

代码语言:javascript
复制
int totalEvenCount = Arrays.stream(arr).parallel().filter(i->i%2==0).reduce(0, Integer::sum);
int totalOddCount = Arrays.stream(arr).parallel().filter(i->i%2!=0).reduce(0, Integer::sum);

//or this to count by individual numbers:
ConcurrentMap<Integer,List<Integer>> map1 = Arrays.stream(arr).parallel().boxed().collect(Collectors.groupingByConcurrent(i->i));
map1.entrySet().stream().filter(e -> e.getKey()%2!=0).forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue().size()));

作为读者的练习,也许您可以研究一下各种Collector的工作原理,以便编写自己的countingBy(i->i%2!=0)来输出一个只包含计数而不是值列表的映射。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50526395

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档