专栏首页架构师进阶并发模式(二)Master-Worker模式

并发模式(二)Master-Worker模式

Master-Worker模式是一种使用多线程进行数据处理的结构。多个Worker进程协作处理用户请求,Master进程负责维护Worker进程,并整合最终处理结果。

概念

Master-Worker模式是常用的并行模式之一。系统有两类进程协作工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当所有的Worker进程将子任务完成以后,将结果返回给Master进程,由Master进程做归纳和总结。

工作示意图

模式结构图

Master维护任务队列、Worker进程队列、子任务结果集

代码实现

Master-Worker模式简易实现

Master

public class Master{
    //任务队列
    protected Queue<Object> taskQueue=new ConcurrentLinkedQueue<>();
    //子任务结果集
    protected Map<String ,Object>  resultMap=new HashMap<>();
    //worker进程队列
    protected Map<String ,Thread> threadMap=new HashMap<>();

    //判断所有的子任务是否都结束
    public boolean isComeplete(){
        for (Map.Entry<String,Thread> entry:threadMap.entrySet()){
            if (entry.getValue().getState()!= Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    public Master(Worker worker,int countWorker){
        worker.setWorkQueue(taskQueue);
        worker.setResultMap(resultMap);
        for (int i=0;i<countWorker;i++){
            threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
        }
    }

    //提交一个子任务
    public void submit(Object o){
        taskQueue.add(o);
    }

    //返回子任务结果集
    public Map<String ,Object> getResultMap(){
        return resultMap;
    }

    //开始运行所有的worker进程
    public void execute(){
        for (Map.Entry<String,Thread> entry:threadMap.entrySet()){
            entry.getValue().start();
        }
    }
}

Worker

public class Worker implements Runnable{
    //子任务队列,用于取得任务
    protected Queue<Object> workQueue;

    //子任务结果集
    protected Map<String,Object> resultMap;

    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    //子任务的处理逻辑,在子类中实现具体逻辑
    public Object handle(Object input){
        return input;
    }

    @Override
    public void run() {
        while (true){
            Object input=workQueue.poll();
            if (input==null) break;;
            Object re=handle(input);

            resultMap.put(Integer.toString(input.hashCode()),re);
        }
    }
}

实战

计算100以内的数字的立方和

CubePlusWorker

public class CubePlusWorker extends Worker {
    //子任务具体实现逻辑
    @Override
    public Object handle(Object input) {
        Integer integer=Integer.valueOf(input.toString());
        return integer*integer*integer;
    }
}

Main

public class Main {
    public static void main(String[] args) {
        long s=System.nanoTime();
        CubePlusWorker cube = new CubePlusWorker();
        Master master = new Master(cube, 10);
        //分解为100个子任务
        for (int i = 1; i <= 100; i++) {
            master.submit(i);
        }
        //执行子任务
        master.execute();
        Integer result = 0;
        System.out.println("每个子任务的执行结果是:");
        for (Map.Entry<String, Object> entry : master.getResultMap().entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
            result += Integer.valueOf(entry.getValue().toString());
        }
        System.out.println("开始汇总计算结果:");
        if (result != 0 && master.isComeplete()) {
            System.out.println("The SUM is : " + result);
        }
        System.out.println("Cost time :"+(System.nanoTime()-s)+"ns");
    }
}

执行结果:

开始汇总计算结果: The SUM is : 17236269 Cost time :3900697ns

不使用Master-Worker模式的程序执行时间要比这个快很多。

实战优化

优化计算代码。不需要等待所有Worker都执行完,即可开始计算最终结果。

public class Main {
    public static void main(String[] args) {
        long s = System.nanoTime();
        Master master=new Master(new CubePlusWorker(),10);
        for(int i=1;i<=100;i++){
            master.submit(i);
        }
        master.execute();
        Map<String,Object> resultMap=master.getResultMap();
        int result=0;
        while (resultMap.size()>0 || !master.isComeplete()){
            Set<String> keys=resultMap.keySet();
            String key=null;
            for (String k:keys){
                key=k;
                break;
            }
            Integer i=null;
            if (key!=null){
                i= (Integer) resultMap.get(key);
            }
            //最终结果
            if (i!=null){
                result+=i;
            }
            //移除已被计算过的
            if (key!=null){
                resultMap.remove(key);
            }
        }
        System.out.println("The SUM is : " + result);
        System.out.println("Cost time :" + (System.nanoTime() - s) + "ns");
    }
}

Master-Worker模式是一种将串行任务并行化的方法,被分解的子任务在系统中可以被并行处理。同时,Master进程不需要等待所有子任务都完成,就可以根据已有的部分结果集计算最终结果。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 基础服务系列-git仓库删除所有提交历史记录,成为一个干净的新仓库

    需要更换代码Git仓库,想把之前的commits全部删掉。网上找了众多方法,都不起作用。踩了2次坑,幸好查到了。

    用户2146693
  • Java文件下载中文名不显示&乱码

    Java开发中,文件上传下载是很常用的功能。实际开发中遇到了中文名不显示的问题,一开始以为是后台代码的原因,网上查了一些资料,原来是Header中只支持ASCI...

    用户2146693
  • SonarQube 安装

    SonarQube(以前叫Sonar)是一个用于代码质量管理的开源平台,用于管理源代码的质量,可以从七个维度检测代码质量通过插件形式,可以支持包括java,C#...

    用户2146693
  • 如何在12个小时,搞定http监控?

    系统监控,是做系统必须解决的一个问题。创业型公司,微服务以前,如何用半天的时间,搞定一个可扩展,通用的http监控框架,是今天要聊的话题。

    架构师之路
  • 并发编程之master-worker模式

    一、简介 Master-Worker模式是常用的并行设计模式。它的核心思想是,系统有两个进程协议工作:Master进程和Worker进程。Master进程负责接...

    lyb-geek
  • Master-Worker模式

    原文地址:http://www.wangqi94.com/to/master/blog?uuid=80

    七七分享
  • 设计模式之代理模式

           代理模式是一种注重过程的一种模式,在我们做某一具体事情之前或之后做一些辅助性的工作; 好处是将辅助工作从主要工作中抽离出来,但是又能协同工作, ...

    用户1215919
  • 动态代理(一)

    代理模式是Java的一种设计模式,开发中可能会有一种场景,某个类的方法需要补充,但是由于不想在原有的类基础上改动,该如何做呢,如下:

    LiosWong
  • 扯点儿高性能(一):CGI篇【搞附近】

    CGI是一种协议,并不是一种具体的代码程序。上古时代的PHP程序就是靠CGI协议与HTTP服务器比如Apache协作完成。最开始那会儿Web站点的出现一般都是纯...

    老李秀
  • 聊聊storm的IEventLogger

    storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/IEventLogger.java

    codecraft

扫码关注云+社区

领取腾讯云代金券