前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >模仿Tomcat的BIO,NIO线程模型 顶

模仿Tomcat的BIO,NIO线程模型 顶

作者头像
算法之名
发布2019-08-20 11:10:24
2870
发布2019-08-20 11:10:24
举报
文章被收录于专栏:算法之名算法之名

模仿Tomcat的BIO模型,来一个消息,分配一个线程处理.

则主线程池代码如下

代码语言:javascript
复制
package com.guanjian;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2018/7/10.
 */
public class ThreadPool {
    private ExecutorService service;
    private List<MessageTask> tasks;
    private int fixedThreadNum = 0;
    private List<String> messages;
    private MessageHandler messageHandler;
    public ThreadPool(int fixedThreadNum,List<String> messages,MessageHandler messageHandler) {
        this.fixedThreadNum = fixedThreadNum;
        this.messages = messages;
        this.messageHandler = messageHandler;
        service = Executors.newFixedThreadPool(fixedThreadNum);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                shutdownGracefully(service);
            }
        });
    }
    public void shutdownGracefully(ExecutorService ThreadPool) {
        ShutdownPool.shutdownThreadPool(ThreadPool, "main-pool");
    }

    public void startup() {
        tasks = new ArrayList<>();
        MessageTask messageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(messageHandler,messages) : new ConcurrentMessageTask(messageHandler,messages));
        for (String message:messages) {
            tasks.add(messageTask);
            service.execute(messageTask);
        }
    }

}

它是通过线程数fixedThreadNum来区分使用哪种线程模型.

代码语言:javascript
复制
package com.guanjian;

/**
 * Created by Administrator on 2018/7/10.
 */
public interface MessageHandler {
    public void execute(String message);
}
代码语言:javascript
复制
package com.guanjian;

/**
 * Created by Administrator on 2018/7/10.
 */
public class MessageHandlerImpl implements MessageHandler {
    @Override
    public void execute(String message) {
        System.out.println(message);
    }
}

以上是消息处理器的接口和实现类

代码语言:javascript
复制
package com.guanjian;

import java.util.List;

/**
 * Created by Administrator on 2018/7/10.
 */
public abstract class MessageTask implements Runnable {
    protected MessageHandler messageHandler;
    protected  List<String> messages;

    MessageTask(MessageHandler messageHandler,List<String> messages) {
        this.messageHandler = messageHandler;
        this.messages = messages;
    }
    @Override
    public void run() {
        for (String message:messages) {
            handlerMessage(message);
        }
    }
    protected abstract void handlerMessage(String message);
}

消息任务抽象类实现了Runnable线程接口,以不同的子类来实现BIO,NIO线程模型,具体在抽象方法handlerMessage中实现.

代码语言:javascript
复制
package com.guanjian;

import java.util.List;

/**
 * Created by Administrator on 2018/7/10.
 */
public class SequentialMessageTask extends MessageTask {
    SequentialMessageTask(MessageHandler messageHandler, List<String> messages) {
        super(messageHandler, messages);
    }

    @Override
    protected void handlerMessage(String message) {
        messageHandler.execute(message);
    }
}

BIO线程模型子类,通过主线程池来分配线程处理.

代码语言:javascript
复制
package com.guanjian;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2018/7/10.
 */
public class ConcurrentMessageTask extends MessageTask {
    private ExecutorService asyncService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    ConcurrentMessageTask(MessageHandler messageHandler, List<String> messages) {
        super(messageHandler, messages);
    }

    @Override
    protected void handlerMessage(String message) {
        asyncService.submit(new Runnable() {
            @Override
            public void run() {
                messageHandler.execute(message);
            }
        });
    }
    protected void shutdown() {
        ShutdownPool.shutdownThreadPool(asyncService,"async-pool-" + Thread.currentThread().getId());
    }
}

NIO线程模型,不再使用主线程池来分配线程,而是异步线程池,类比于Netty中的Worker线程池,从BOSS线程池中接管消息处理.

代码语言:javascript
复制
package com.guanjian;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by Administrator on 2018/7/10.
 */
public class ShutdownPool {
    private static Logger log = LoggerFactory.getLogger(ThreadPool.class);
    /**
     * 优雅关闭线程池
     * @param threadPool
     * @param alias
     */
    public static void shutdownThreadPool(ExecutorService threadPool, String alias) {
        log.info("Start to shutdown the thead pool: {}", alias);

        threadPool.shutdown(); // 使新任务无法提交.
        try {
            // 等待未完成任务结束
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                threadPool.shutdownNow(); // 取消当前执行的任务
                log.warn("Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");

                // 等待任务取消的响应
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
                    log.error("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
            }
        } catch (InterruptedException ie) {
            // 重新取消当前线程进行中断
            threadPool.shutdownNow();
            log.error("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconcistent state. Please check the biz logs.");

            // 保留中断状态
            Thread.currentThread().interrupt();
        }

        log.info("Finally shutdown the thead pool: {}", alias);
    }
}

最后是线程池的优雅关闭,无论是主线程池还是异步线程池皆调用该方法实现优雅关闭.

以上只是模型代码,具体可替换成具体需要的业务代码来达到业务性能的提升.

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档