本文作为,其他文章链接使用,单独参考意义不大。
Master-Worker
框架如下,首先实现的Master
线程,主要用作分配任务,和返回结果集。
/** * Created by Joker on 2015/3/9. */ public class Master { //任务队列 protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>(); //线程队列 protected Map<String, Thread> threadMap = new HashMap<String, Thread>(); //子任务处理结果集 protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); public Master(Worker worker, int workerCount) { worker.setWorkQueue(workQueue); worker.setResultMap(resultMap); if (workerCount > 5) workerCount = 5; /*根据workerCount,创建指定数量的工作线程Worker*/ for (int i = 0; i < workerCount; i++) { threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); } } /** * 检查子任务是否全部执行完毕 */ public boolean isComplete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } /** * 提交任务 */ public void submit(Object object) { workQueue.add(object); } /** * 获取子任务结果集 * * @return 结果集 */ public Map<String, Object> getResultMap() { return resultMap; } /** * 执行所有工作线程worker,处理任务。 */ public void execute() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { entry.getValue().start(); } } }
Worker
作为子线程,实现在Handle
方法中实现具体业务逻辑,并加处理结果放到结果集中。
/** * Created by Joker on 2015/3/9. */ public class Worker implements Runnable { //子任务队列,用于过去子任务 protected Queue<Object> workQueue; //结果集 protected Map<String, Object> resultMap; public void setWorkQueue(Queue<Object> workQueue) { this.workQueue = workQueue; } public void setResultMap(Map<String, Object> resultMap) { this.resultMap = resultMap; } /** * 执行具体业务逻辑 * 模拟立方和计算 * * @param object * @return */ public Object handle(Object object) { Integer i = (Integer) object; return i * i * i; } @Override public void run() { while (true) { //获取任务 Object work = workQueue.poll(); if (work == null) break; resultMap.put(Integer.toString(work.hashCode()), this.handle(work)); } } }
需要调用的地方,进行立方和计算。
/** * 计算立方和1~100的立方和 */ public void executeCubic() { //创建三个Worker线程,执行任务 Master master = new Master(new Worker(), 3); for (int i = 0; i < 100; i++) { master.submit(i); } //执行任务 master.execute(); //计算结果,初始化 int result = 0; //计算结果集 Map<String, Object> resultMap = master.getResultMap(); while (resultMap.size() > 0 || !master.isComplete()) { String key = null; Integer i = null; for (String k : resultMap.keySet()) { key = k; break; } if (key != null) { i = (Integer) resultMap.get(key); //从结果集中移除倍计算过的key resultMap.remove(key); } if (i != null) { result += i; } } }
提交100个任务后由3个Worker
线程进行计算,Master
并不等待所有Worker
线程执行完毕,就开始访问子结果集,进行相加计算,直到子结果集中的所有数据处理完毕,并且3个活跃Worker
线程全部停止,才给出最终的立方总和。result
最终结果便是1~100立方和。
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句