深入理解[Master-Worker模式]原理与技术

Master-Worker模式是常用的并行模式之一。它的核心思想是,系统由两类进程协作工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当各个Worker进程将子任务处理完成后,将结果返回给Master进程,由Master进程做归纳和汇总,从而得到系统的最终结果,其处理过程如图1所示。

Master-Worker模式的好处,它能够将一个大任务分解成若干个小任务,并且执行,从而提高系统的吞吐量。而对于系统请求者Client来说,任务一旦提交,Master进程会分配任务并立即返回,并不会等待系统全部处理完成后再返回,其处理过程是异步的。因此Client不会出现等待现象。

1.Master-Worker的模式结构

Master-Worker模式是一种使用多线程进行数据结构处理的结构。

Master进程为主要进程,它维护了一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程,不停地从任务队列中提取要处理的子任务,并将子任务的处理结果写入结果集。

2.Master-Worker的代码实现

基于以上的思路实现一个简易的master-worker框架。其中Master部分的代码如下:

public class Master {
    //任务队列
    protected Queue<Object> workQuery = new ConcurrentLinkedQueue<Object>();
    //worker进程队列
    protected Map<String, Thread> threadMap = new HashMap<>();
    //子任务处理结果集
    protected Map<String, Object> resultMap = new ConcurrentHashMap<>();

    //是否所有的子任务都结束了
    public boolean isComplete() {
        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
            if (entry.getValue().getState()!=Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    //Master 的构造,需要一个Worker 进程逻辑,和需要的Worker进程数量
    public Master(Worker worker,int countWorker){
        worker.setWorkQueue(workQuery);
        worker.setResultMap(resultMap);
        for (int i = 0; i < countWorker; i++) {
            threadMap.put(Integer.toString(i),new Thread(worker));
        }
    }

    //提交一个任务
    public void submit(Object job){
        workQuery.add(job);
    }

    //返回子任务结果集
    public Map<String,Object> getResultMap(){
        return resultMap;
    }

    //开始运行所有的worker进程,进行处理
    public void  execute(){
        for (Map.Entry<String,Thread> entry : threadMap.entrySet()){
            entry.getValue().start();
        }
    }

}

对应的Worker进程的代码实现:

public class Worker implements Runnable {
    //任务队列,用于取得子任务
    protected Queue<Object> workQueue;
    //子任务处理结果集
    protected Map<String, Object> resultMap;

    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    //子任务处理的逻辑,在子类中实现具体逻辑
    public Object handle(Object input) {
        return input;
    }

    @Override
    public void run() {
        while (true) {
            //获取子任务,poll()方法取出(并删除)队首的对象
            Object input = workQueue.poll();
            if (input == null) {
                break;
            }
            //处理子任务
            Object re = handle(input);
            //将处理结果写入结果集
            resultMap.put(Integer.toString(input.hashCode()), re);
        }
    }
}

以上两段代码已经展示了Master-Worker框架的全貌。应用程序通过重载 Worker.handle() 方法实现应用层逻辑。

例如,要实现计算1+2+..+100的结果,代码如下:

public class PlusWorker extends Worker {

    @Override
    public Object handle(Object input) {
        Integer i = (Integer) input;
        return i+1;
    }

    public static void main(String[] args) {
        Master master = new Master(new PlusWorker(), 5);
        for (int i = 0; i < 100; i++) {
            master.submit(i); //提交一百个子任务
        }
        master.execute(); //开始计算
        int re = 0;
        Map<String, Object> resultMap = master.getResultMap();
        while (resultMap.size() > 0 || !master.isComplete()) {
            Set<String> keys = resultMap.keySet();
            String key = null;
            for (String k : keys) {
                key = k;
                break;
            }
            Integer i = null;
            if (key != null) {
                i = (Integer) resultMap.get(key);   //从结果集中获取结果
            }
            if (i != null) {
                re += i;        //最终结果
            }
            if (key != null) {
                resultMap.remove(key);      //移除已经被计算过的项
            }
        }
        System.out.println("result: " + re);
    }

}

运行结果:

result: 5050

在应用层代码中,创建了5个Worker工作进程和Worker工作实例PlusWorker。在提交了100个子任务后,便开始子任务的计算。这些子任务中,由生成的5个Worker进程共同完成。Master并不等待所有的Worker执行完毕,就开始访问子结果集进行最终结果的计算,直到子结果集中所有的数据都被处理,并且5个活跃的Worker进程全部终止,才给出最终计算结果。

Master-Worker模式是一种将串行任务并行化的方法,被分解的子任务在系统中可以被并行处理。同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果。

3.Amino框架提供的Master-Worker模式

在Amino框架中为Master-Worker模式提供了较为完善的实现和便捷的操作接口。Amino实现了两套Master-Worker实现:一种是静态的Master-Worker实现,另一种是动态实现。

静态实现不允许在任务开始时添加新的子任务,而动态的Master-Worker允许在任务执行过程中,由Master或Worker添加新的子任务。

在Amino框架中,MasterWorkerFactory.newStatic(new Pow3(),20)用于创建静态的Master-Worker模式,

第二个参数为Worker线程数,第一个参数为执行的任务类,该类需实现Doable<Integer,Integer>接口,该接口泛型的第一个类型为任务方法的参数类型,第二个类型为方法返回类型。MasterWorkerFactory.newDynamic(new Pow3Dyn())用于创建动态的Master-Worker模式,其中参数为实现DynamicWorker接口的实例。

submit()方法用于提交应用层任务,execute()方法将执行所有任务。

Amino框架需要自行下载,下载地址:https://sourceforge.net/projects/amino-cbbs/files/cbbs/0.5.3/,找到cbbs-java-bin-0.5.3.tar.gz 下载即可。

下面用Amino框架演示1+2+..+100的完整示例。

public class Pow3 implements Doable<Integer,Integer> {
    @Override
    public Integer run(Integer input) {
        //业务逻辑
        return input;
    }
}
public class Pow3Dyn implements DynamicWorker<Integer,Integer> {
    @Override
    public Integer run(Integer integer, WorkQueue<Integer> workQueue) {
        //业务逻辑
        return integer;
    }
}
public class AminoDemo {

    /
     * Amino 框架提供开箱即用的Master-Worker模式
     * 其它用法参考API文档
     */
    public static void main(String[] args) {
        new AminoDemo().testDynamic();
        new AminoDemo().testStatic();
    }

    /
     * 静态模式,不允许在任务开始后添加新的任务
     */
    public void testStatic(){
        MasterWorker<Integer,Integer> mw = MasterWorkerFactory.newStatic(new Pow3(),20);//静态模式,可指定线程数
        List<MasterWorker.ResultKey> keyList = new Vector<>();
        for (int i = 1; i <= 100; i++) {
            keyList.add(mw.submit(i)); //传参并调度任务,key用于取得任务结果
        }
        mw.execute();//执行所有任务
        int re = 0;
        while (keyList.size()> 0){ //不等待全部执行完成,就开始求和
            MasterWorker.ResultKey k = keyList.get(0);
            Integer i = mw.result(k); //由Key取得一个任务结果
            if (i!=null){
                re+=i;
                keyList.remove(0); //累加完成后
            }
        }
        System.out.println("result:"+re);
        mw.shutdown();//关闭master-worker,释放资源
    }

    /
     * 动态模式,可在开始执行任务后继续添加任务
     */
    public void testDynamic(){
        MasterWorker<Integer,Integer> mw = MasterWorkerFactory.newDynamic(new Pow3Dyn());//动态模式,可指定线程数
        List<MasterWorker.ResultKey> keyList = new Vector<>();
        for (int i = 1; i < 50; i++) {
            keyList.add(mw.submit(i)); //传参并调度任务,key用于取得任务结果
        }
        mw.execute();
        for (int i = 50; i <= 100; i++) {
            keyList.add(mw.submit(i)); //传参并调度任务,key用于取得任务结果
        }
        int re = 0;
        while (keyList.size()> 0){
            MasterWorker.ResultKey k = keyList.get(0);
            Integer i = mw.result(k); //由Key取得一个任务结果
            if (i!=null){
                re+=i;
                keyList.remove(0); //累加完成后
            }
        }
        System.out.println("result:"+re);
        mw.shutdown();
    }

}

运行结果:

result:5050
result:5050

MasterWorker类的方法摘要,其它请自行下载API文档。cbbs-java-apidocs-0.5.3.tar.gz

方法摘要

boolean

execute() Begin processing of the work items submitted.

boolean

execute(long timeout, java.util.concurrent.TimeUnit unit) Begin processing of the work items submitted.

void

finished() Indicate to the master/worker that there is not more work coming.

java.util.Collection<T>

getAllResults() Obtain all of the results from the processing work items.

boolean

isCompleted() Poll an executing master/worker for completion.

boolean

isStatic() Determine if a master/worker is static.

int

numWorkers() Get the number of active workers.

T

result(MasterWorker.ResultKey k) Obtain the results from the processing of a work item.

void

shutdown() Shutdown the master/worker.

MasterWorker.ResultKey

submit(S w) Submit a work item for processing.

MasterWorker.ResultKey

submit(S w, long timeout, java.util.concurrent.TimeUnit unit) Submit a work item for processing and block until it is either submitted successfully or the specified timeout period has expired.

boolean

waitForCompletion() Wait until all workers have completed.

boolean

waitForCompletion(long timeout, java.util.concurrent.TimeUnit unit) Wait until all workers have completed or the specified timeout period expires.

参考

《Java程序性能优化》葛一鸣著

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏积累沉淀

Java批处理

批处理 JDBC对批处理的操作,首先简单说一下JDBC操作sql语句的简单机制。 JDBC执行数据库操作语句,首先需要将sql语句打包成为网络字...

4445
来自专栏开发与安全

muduo网络库学习之Exception类、Thread 类封装中的知识点(重点讲pthread_atfork())

一、Exception类封装 class Exception : public std::exception ?  #include <execinfo.h>...

2380
来自专栏何俊林

阿里、华为、腾讯Java技术面试题精选

1695
来自专栏Python自动化测试

测试驱动之xml文件的处理

Xml是可扩展标记语言,关于xml的技术本人这里不在介绍,感兴趣的同学可以去w3c看看详细的资料,这里,我仅仅介绍的是如何获取xml文档结构中的...

1423
来自专栏Ryan Miao

java并发编程实战学习(3)--基础构建模块

转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和p...

3127
来自专栏王清培的专栏

log4net 自定义Layout日志字段

最近在使用log4net的时候有一个简单的需求,就是自定义个格式化输出符。这个输出符是专门用来帮我记录下业务ID、业务类型的。比如,“businessID:32...

850
来自专栏智能大石头

实体处理模块IEntityModule

在2015年7月16日,XCode新增了实体处理模块IEntityModule,用于拦截实体对象添删改操作。

1160
来自专栏JackieZheng

RabbitMQ入门-Routing直连模式

Hello World模式,告诉我们如何一对一发送和接收消息; Work模式,告诉我们如何多管齐下高效的消费消息; Publish/Subscribe模式,告...

27310
来自专栏Android相关

处理器结构--ReorderBuffer

Reorder Buffer用来保存在乱序执行之前的(OOOE)指令执行顺序,当指令集合在乱序执行后按照原有指令顺序将结果提交。

1384
来自专栏Java编程技术

使用数据库悲观锁实现不可重入的分布式锁

在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源...

751

扫码关注云+社区

领取腾讯云代金券