前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >NIO如何多线程操作 顶

NIO如何多线程操作 顶

作者头像
算法之名
发布2019-08-20 11:03:44
4670
发布2019-08-20 11:03:44
举报
文章被收录于专栏:算法之名算法之名

因为NIO本身是非阻塞的,所以他的消息选择器Selector可以在单线程下连接多台客户端的访问。

为了加强NIO的性能,我们加入多线程的操作,当然NIO并不能简单的把Selector.select()放入Executor.execute(Runnable)的run方法中。

为完成NIO的多线程,我们应该有一个调度类,一个服务类。

调度类的目的是初始化一定数量的线程,以及线程交接。

代码语言:javascript
复制
package com.netty.nionetty.pool;

import com.netty.nionetty.NioServerBoss;
import com.netty.nionetty.NioServerWorker;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2018-05-17.
 */
public class NioSelectorRunnablePool {
    private final AtomicInteger bossIndex = new AtomicInteger();
    //欢迎线程数组
    private Boss[] bosses;
    private final AtomicInteger workerIndex = new AtomicInteger();
    //工作线程数组
    private Worker[] workers;
    public NioSelectorRunnablePool(Executor boss,Executor worker) {
        initBoss(boss,1);
        initWorker(worker,Runtime.getRuntime().availableProcessors() * 2);
    }
    //初始化1个欢迎线程
    private void initBoss(Executor boss,int count) {
        this.bosses = new NioServerBoss[count];
        for (int i = 0;i < bosses.length;i++) {
            bosses[i] = new NioServerBoss(boss,"boss thread " + (i + 1),this);
        }
    }
    //初始化2倍计算机核数的工作线程
    private void initWorker(Executor worker,int count) {
        this.workers = new NioServerWorker[count];
        for (int i = 0; i < workers.length;i++) {
            workers[i] = new NioServerWorker(worker,"worker thread" + (i + 1),this);
        }
    }
    //交接工作线程(从工作线程数组中挑出)
    public Worker nextWorker() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }
    //交接欢迎线程(从欢迎线程数组中挑出)
    public Boss nextBoss() {
        return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
    }
}

另外带一个欢迎线程接口,一个工作线程接口

代码语言:javascript
复制
package com.netty.nionetty.pool;

import java.nio.channels.ServerSocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 */
public interface Boss {
    void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
代码语言:javascript
复制
package com.netty.nionetty.pool;

import java.nio.channels.SocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 */
public interface Worker {
    void registerNewChannelTask(SocketChannel channel);
}

有两种线程(欢迎线程和工作线程),所以我们有一个抽象线程类

代码语言:javascript
复制
package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Created by Administrator on 2018-05-17.
 */
public abstract class AbstractNioSelector implements Runnable {
    //线程池
    private final Executor executor;
    //NIO消息选择器
    protected Selector selector;
    protected final AtomicBoolean wakeUp = new AtomicBoolean();
    //线程任务队列
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
    //线程名
    private String threadName;
    //线程调度器
    protected NioSelectorRunnablePool selectorRunnablePool;
    AbstractNioSelector(Executor executor,String threadName,NioSelectorRunnablePool selectorRunnablePool) {
        this.executor = executor;
        this.threadName = threadName;
        this.selectorRunnablePool = selectorRunnablePool;
        openSelector();
    }
    private void openSelector() {
        try {
            this.selector = Selector.open(); //打开消息选择器
        } catch (IOException e) {
            e.printStackTrace();
        }
        //把线程放入线程池,开始执行run方法
        executor.execute(this);
    }

    public void run() {
        Thread.currentThread().setName(this.threadName);
        while (true) {
            try {
                wakeUp.set(false);  //把消息选择器的状态定为未唤醒状态
                select(selector);   //消息选择器选择消息方式
                processTaskQueue(); //因为在主程序中绑定端口的时候已经注册了接收通道任务线程,所以这里是读出任务。
                process(selector);  //任务处理,欢迎线程跟工作线程各不相同
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    //欢迎线程跟工作线程各自添加不同的线程,再把消息选择器唤醒
    protected final void registerTask(Runnable task) {
        taskQueue.add(task);
        Selector selector = this.selector;
        if (selector != null) {
            if (wakeUp.compareAndSet(false,true)) {
                selector.wakeup();
            }
        }else {
            taskQueue.remove(task);
        }
    }
    public NioSelectorRunnablePool getSelectorRunnablePool() {
        return selectorRunnablePool;
    }

    private void processTaskQueue() {
        for (;;) {
            final Runnable task = taskQueue.poll();
            if (task == null) {
                break;
            }
            task.run();
        }
    }
    protected abstract int select(Selector selector) throws IOException;
    protected abstract void process(Selector selector) throws IOException;
}

欢迎线程跟工作线程的具体实现

代码语言:javascript
复制
package com.netty.nionetty;

import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import com.netty.nionetty.pool.Worker;

import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * Created by Administrator on 2018-05-17.
 */
public class NioServerBoss extends AbstractNioSelector implements Boss {
    public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor,threadName,selectorRunnablePool);
    }
    //注册接收任务,会先调用抽象类,把任务线程先添加到任务队列,再注册接收消息类型
    public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            public void run() {
                try {
                    serverChannel.register(selector,SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select();
    }
    //NIO操作,开始接收,接收后再启用工作线程,接收线程依然存在,而且工作线程也不断给到线程池未使用线程
    //具体看初始化的时候初始了多少工作线程,但是是几个连接对应一个工作线程。
    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator();i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            Worker nextWorker = getSelectorRunnablePool().nextWorker();
            nextWorker.registerNewChannelTask(channel);
            System.out.println("新客户端连接");
        }
    }
}
代码语言:javascript
复制
package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;
import com.netty.nionetty.pool.Worker;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * Created by Administrator on 2018-05-17.
 */
public class NioServerWorker extends AbstractNioSelector implements Worker {
    public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor,threadName,selectorRunnablePool);
    }

    public void registerNewChannelTask(final SocketChannel channel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            public void run() {
                try {
                    channel.register(selector,SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select(500);
    }

    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
        while (ite.hasNext()) {
            SelectionKey key = (SelectionKey)ite.next();
            ite.remove();
            SocketChannel channel = (SocketChannel)key.channel();
            int ret = 0;
            boolean failure = true;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                ret = channel.read(buffer);
                failure = false;
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (ret <= 0 || failure) {
                key.cancel();
                System.out.println("客户端断开连接");
            }else {
                System.out.println("收到数据:" + new String(buffer.array()));
                ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
                channel.write(outBuffer);
            }
        }
    }
}

服务类

代码语言:javascript
复制
package com.netty.nionetty;

import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 */
public class ServerBootstap {
    private NioSelectorRunnablePool selectorRunnablePool;
    public ServerBootstap(NioSelectorRunnablePool selectorRunnablePool) {
        this.selectorRunnablePool = selectorRunnablePool;
    }
    public void bind(final SocketAddress localAddress) {
        try {
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(localAddress);
            Boss nextBoss = selectorRunnablePool.nextBoss();
            nextBoss.registerAcceptChannelTask(serverChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

主程序

代码语言:javascript
复制
package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2018-05-17.
 */
public class Start {
    public static void main(String[] args) {
        NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
        ServerBootstap bootstrap = new ServerBootstap(nioSelectorRunnablePool);
        bootstrap.bind(new InetSocketAddress(10101));
        System.out.println("start");
    }
}

其实最主要的就是在线程调度器中,各种线程已经被初始化存在于线程池内存中了,所以后面只是把这些线程拿出来,并注册消息类型,进行处理,这就是NIO的多线程处理了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档