前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并发编程之Master-Worker模式

并发编程之Master-Worker模式

作者头像
烟雨星空
发布2020-06-16 16:03:36
4540
发布2020-06-16 16:03:36
举报
文章被收录于专栏:Java进阶指南Java进阶指南

我们知道,单个线程计算是串行的,只有等上一个任务结束之后,才能执行下一个任务,所以执行效率是比较低的。

那么,如果用多线程执行任务,就可以在单位时间内执行更多的任务,而Master-Worker就是多线程并行计算的一种实现方式。

它的思想是,启动两个进程协同工作:Master和Worker进程。

Master负责任务的接收和分配,Worker负责具体的子任务执行。每个Worker执行完任务之后把结果返回给Master,最后由Master汇总结果。(其实也是一种分而治之的思想,和forkjoin计算框架有相似之处,参看:forkjoin框架及其性能分析

Master-Worker工作示意图如下:

下面用Master-Worker实现计算1-100的平方和,思路如下:

  1. 定义一个Task类用于存储每个任务的数据。
  2. Master生产固定个数的Worker,把所有worker存放在workers变量(map)中,Master需要存储所有任务的队列workqueue(ConcurrentLinkedQueue)和所有子任务返回的结果集resultMap(ConcurrentHashMap)。
  3. 每个Worker执行自己的子任务,然后把结果存放在resultMap中。
  4. Master汇总resultMap中的数据,然后返回给Client客户端。
  5. 为了扩展Worker的功能,用一个MyWorker继承Worker重写任务处理的具体方法。

Task类:

代码语言:javascript
复制
package com.thread.masterworker;
public class Task {
    private int id;
    private String name;
    private int num;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }
}

Master实现:

代码语言:javascript
复制
package com.thread.masterworker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
    //所有任务的队列
    private ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue<Task>();

    //所有worker
    private HashMap<String,Thread> workers = new HashMap<String,Thread>();

    //共享变量,worker返回的结果
    private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>();

    //构造方法,初始化所有worker
    public Master(Worker worker,int workerCount){
        worker.setWorkerQueue(this.workerQueue);
        worker.setResultMap(this.resultMap);

        for (int i = 0; i < workerCount; i++) {
            Thread t = new Thread(worker);
            this.workers.put("worker-"+i,t);
        }
    }

    //任务的提交
    public void submit(Task task){
        this.workerQueue.add(task);
    }

    //执行任务
    public int execute(){
        for (Map.Entry<String, Thread> entry : workers.entrySet()) {
            entry.getValue().start();
        }

        //一直循环,直到结果返回
        while (true){
            if(isComplete()){
                return getResult();
            }
        }

    }

    //判断是否所有线程都已经执行完毕
    public boolean isComplete(){
        for (Map.Entry<String, Thread> entry : workers.entrySet()) {
            //只要有任意一个线程没有结束,就返回false
            if(entry.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    //处理结果集返回最终结果
    public int getResult(){
        int res = 0;
        for (Map.Entry<String,Object> entry : resultMap.entrySet()) {
            res += (Integer) entry.getValue();
        }
        return res;
    }

}

父类Worker:

代码语言:javascript
复制
package com.thread.masterworker;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable {

    private ConcurrentLinkedQueue<Task> workerQueue;

    private ConcurrentHashMap<String,Object> resultMap;

    public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueue) {
        this.workerQueue = workerQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while(true){
            //从任务队列中取出一个任务
            Task task = workerQueue.poll();
            if(task == null) break;
            //处理具体的任务
            Object res = doTask(task);
            //把每次处理的结果放到结果集里面,此处直接把num值作为结果
            resultMap.put(String.valueOf(task.getId()),res);
        }

    }

    public Object doTask(Task task) {
        return null;
    }
}

子类MyWorker继承父类Worker,重写doTask方法实现具体的逻辑:

代码语言:javascript
复制
package com.thread.masterworker;

public class MyWorker extends Worker {
    @Override
    public Object doTask(Task task) {
        //暂停0.5秒,模拟任务处理
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //计算数字的平方
        int num = task.getNum();
        return num * num;
    }
}

客户端Client:

代码语言:javascript
复制
package com.thread.masterworker;

import java.util.Random;

public class Client {
    public static void main(String[] args) {

        Master master = new Master(new MyWorker(), 10);

        //提交n个任务到任务队列里
        for (int i = 0; i < 100; i++) {
            Task task = new Task();
            task.setId(i);
            task.setName("任务"+i);
            task.setNum(i+1);
            master.submit(task);
        }

        //执行任务
        long start = System.currentTimeMillis();
        int res = master.execute();
        long time = System.currentTimeMillis() - start;
        System.out.println("结果:"+res+",耗时:"+time);
    }
}

以上,我们用10个线程去执行子任务,最终由Master做计算求和(1-100的平方和)。每个线程暂停500ms,计算数字的平方值。

总共100个任务,分10个线程并行计算,相当于每个线程均分10个任务,一个任务的时间大概为500ms,故10个任务为5000ms,再加上计算平方值的时间,故稍大于5000ms。结果如下,

代码语言:javascript
复制
结果:338350,耗时:5084
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 烟雨星空 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
GPU 云服务器
GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档