Netty框架研究

起因

以前也用Netty做到异步网络编程,用过之后也一直没想过要把Netty拿起来重新研究一翻,直到上周工作中遇到一个棘手的问题。 在我们的项目中基于netty-socketio,我们实现了一个基于WebSocket的浏览器与服务端的请求回应机制。

这里贴一下该机制的大概的代码逻辑,真实项目比这复杂得多。

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.concurrent.TimeUnit;

/**
 * Created by jeremy on 16/6/18.
 */
public class SocketIOServer {
    public static void main(String[] args) throws InterruptedException {
        SocketIOServer server = new SocketIOServer();
        server.start();

        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
    }

    private void start() {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setReuseAddress(true);
        Configuration config = new Configuration();
        config.setHostname("0.0.0.0");
        config.setPort(8888);
        config.setSocketConfig(socketConfig);
        config.setContext("/socketio");

        com.corundumstudio.socketio.SocketIOServer server = new com.corundumstudio.socketio.SocketIOServer(config);

        server.addListeners(new RequestHandler());

        server.startAsync().addListener(new GenericFutureListener<Future<? super Void>>() {
            public void operationComplete(Future<? super Void> future) throws Exception {
                if(future.isSuccess()){
                    System.out.println("socketio server started.");
                }
            }
        });
    }

    private class RequestHandler implements ConnectListener, DataListener<String>, DisconnectListener{
        public void onConnect(SocketIOClient client) {
            System.out.println("client is connected");
        }

        public void onData(SocketIOClient client, String data, AckRequest ackSender) throws Exception {
            System.out.println("receive data");
            //这里开始解析请求的数据,然后调用后台Controller层相关接口处理请求,处理完毕之后,向客户端返回响应结果
            TimeUnit.SECONDS.sleep(20); //这里用20秒来模拟长时间的耗时操作
            client.sendEvent("data", "data response");
        }

        public void onDisconnect(SocketIOClient client) {
            System.out.println("client is disconnected");
        }
    }
}

最开始功能一切正常,但最近经过压力测试发现,当请求压力上来之后,很多请求在规则的响应时间内都出现超时失败了。

分析原因后发现有问题的关键代码如下

System.out.println("receive data");
//这里开始解析请求的数据,然后调用后台Controller层相关接口处理请求,处理完毕之后,向客户端返回响应结果
TimeUnit.SECONDS.sleep(20); //这里用20秒来模拟长时间的耗时操作
client.sendEvent("data", "data response");

这里实际上是直接使用Netty的NIO线程进行,所以如果业务处理很耗时的话,所以NIO线程都在阻塞等待业务处理完成,这个时候其它请求的IO操作就得不到及时处理。

解决这个问题先

处理这个问题也比较简单,就是将业务的处理放到业务线程池里处理。如下面的代码

private final ExecutorService businessExecutor = Executors.newFixedThreadPool(10);

private class RequestHandler implements ConnectListener, DataListener<String>, DisconnectListener{
    public void onConnect(SocketIOClient client) {
        System.out.println("client is connected");
    }

    public void onData(SocketIOClient client, String data, AckRequest ackSender) throws Exception {
        businessExecutor.submit(new ProcessRequestTask(client, data));
    }

    public void onDisconnect(SocketIOClient client) {
        System.out.println("client is disconnected");
    }
}

private class ProcessRequestTask implements Runnable {
    private final String data;
    private final SocketIOClient client;

    public ProcessRequestTask(SocketIOClient client, String data) {
        this.client = client;
        this.data = data;
    }

    public void run() {
        try {
            System.out.println("receive data");
            //这里开始解析请求的数据,然后调用后台Controller层相关接口处理请求,处理完毕之后,向客户端返回响应结果
            TimeUnit.SECONDS.sleep(20); //这里用20秒来模拟长时间的耗时操作
            client.sendEvent("data", "data response");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这样Netty的NIO线程就不会由于业务逻辑的处理而阻塞了。

Netty的线程模型

就着这个问题重新复习一下Netty的线程模型

总的来说Netty采用的是Reactor主从多线程模型,工作原理是服务端用于接收客户端连接的是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(sub reactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于对接收到的连接作有限的处理,处理完毕后如果链路建立成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操作。

它的线程模型如下图所示:

netty线程模型

这里结合代码解释一下这里说到的概念

先看一下普通的服务端启动代码

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .handler(new CheckAcceptHandler())
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch)
                    throws Exception {
                ch.pipeline().addLast("decoder", new LineBasedFrameDecoder(4096));
                ch.pipeline().addLast("stringDecoder", new StringDecoder(StandardCharsets.UTF_8));
                ch.pipeline().addLast("lineEncoder", new LineEncoder(StandardCharsets.UTF_8));
                ch.pipeline().addLast("logicHandler", new ServerLogicalHandler());
            }
        });

final int listenPort = 8888;

b.bind(listenPort).sync();

这里bossGroup就是上面说到的负责接收客户端连接的线程池,workGroup就是subReactor线程池,handler(new ChannelInitializer(){...})就是在指定masterReactor线程池里对接收的连接如何处理的逻辑,childHandler(new ChannelInitializer(){...})就是在指定subReactor线程池里对连接后续的IO操作如何处理的逻辑。workGroup的类型是NioEventLoopGroup,而一个NioEventLoopGroup其实是nNioEventLoop的组合,经过大量经验测算,这个n设置为CPU核心数的两倍整体IO效率较高,所以在Netty里这个n默认就是CPU核心数的两倍。NioEventLoopGroupNioEventLoop的数目是有限,而Netty本身同时要处理成千上万连接的IO操作。很容易得出结论在Netty里一个NioEventLoop是同时处理多个连接的IO操作的。

一个NioEventLoop聚合了一个多路复用器Selector,因此可以处理成百上千的客户端连接,Netty的处理策略是每当有一个新的客户端接入,则从NioEventLoop线程组中顺序获取一个可用的NioEventLoop,当到达数组上限之后,重新返回到0,通过这种方式,可以基本保证各个NioEventLoop的负载均衡。一个客户端连接只注册到一个NioEventLoop上,这样就避免了多个IO线程去并发操作它。

Netty通过串行化设计理念降低了用户的开发难度,提升了处理性能。利用线程组实现了多个串行化线程水平并行执行,线程之间并没有交集,这样既可以充分利用多核提升并行处理能力,同时避免了线程上下文的切换和并发保护带来的额外性能损耗。

业务操作耗费太多CPU怎么办

Netty本身是通过串行化设计理念来处理万千上万连接的IO操作的。这里就带来一个问题,有一些业务操作就是要耗费太多CPU时间,如何保证处理这样的业务操作时不阻塞Netty的NIO线程?

Netty本身提供了EventExecutorGroup作为业务操作线程池,使用示例如下:

private final EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(4);

private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();

private void start() throws InterruptedException {

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .handler(new CheckAcceptHandler())
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)
                        throws Exception {
                    ch.pipeline().addLast("decoder", new LineBasedFrameDecoder(4096));
                    ch.pipeline().addLast("stringDecoder", new StringDecoder(StandardCharsets.UTF_8));
                    ch.pipeline().addLast("lineEncoder", new LineEncoder(StandardCharsets.UTF_8));
                    ch.pipeline().addLast(businessGroup, "logicHandler", new ServerLogicalHandler());
                }
            });

    final int listenPort = 8888;

    b.bind(listenPort).sync();
}

可以看到ChannelPipelineadd相关方法均提供了一个重载方法,用来指定Handler将在哪个线程池里运行,如果不指定默认就是在Netty的NIO线程池里。

问题又来了,前面说过Netty是采用串行化设计理念处理连接上的IO操作的,这样做可以避免线程竞争,线程上下文切换带来的额外性能损耗。那如果NIO线程与业务线程同时往channel里写数据,这个不是破坏了Netty的串行化理念。刚开始我也这么想,可后来看到Netty底层的write方法才发现并不存在这个问题。

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

这里会判断当前EventExecutor是否是NIO线程,如果是则执行Write操作,否则仅仅是创建一个异步Task,执行该异步Task,该异步Task会到该连接对应的NIO线程里去执行,最终保证还是一个NIO线程串行化地处理该连接上的IO操作。

当然如果不想用EventExecutorGroup,也可以像最上面的示例一样,将长时间的任务丢进自己的业务线程池里处理。

Netty源码研究

花了几天时间结合李林峰的帖子自己翻阅Netty的源码,有了不少收获。Netty框架的源代码解读,李林峰的帖子已经把比较重要写了一遍,这里就不赘述了。这里将我发现的几处亮点记录下来以备忘。

  • MultithreadEventExecutorGroup是一个多线程的线程池,它提供了一个next方法供外部获取一个EventExecutor来提交Task。而为了MultithreadEventExecutorGroup内部的EventExecutor能比较均衡地干活,每次调用next方法实际上是使用EventExecutorChooser帮助选择一个EventExecutor的,下面是EventExecutorChooser的实现代码
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTowEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}

private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

这里针对executors的数目是2的倍数的情况,采用了位操作替换了普通的取模操作。

  • Netty内部实现中有很多实例对象内部的状态变更比较频繁,而且这些变更也有可能是由多个线程造成的,为了最小化多线程修改状态所生产的性能开销,同时保证线程安全,采用了大量非阻塞同步的CAS指令实现,这种乐观锁的策略比互斥同步的悲观锁性能高不少。如下面的代码
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor {
    ...
    private volatile int state = ST_NOT_STARTED;
    ...
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
    static{
        AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
                PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
        if (updater == null) {
            updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
        }
        STATE_UPDATER = updater;
    }

    ...
    public boolean isShuttingDown() {
        return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN;
    }
    ...
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
    ...
}

  • NioEventLoop线程除了对多个连接的IO操作进行处理外,本身还需要处理一些定时任务,因而会有以下的代码
protected void run() {
    for (;;) {
        ...
        final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        ...
    }
}

/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 */
protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    this.lastExecutionTime = lastExecutionTime;
    return true;
}

run方法里那段代码逻辑很简单,就是处理一些SelectionKey后,按比例抽出一些CPU时间进行队列中任务的处理。runAllTasks方法里从队列里取出任务处理,而且为了在指定的时间内跳出处理任务的逻辑,而且考虑到ScheduledFutureTask.nanoTime()方法比较耗时,每处理64个任务后检查一次,这里为了数字比较高效,又采用了位操作if ((runTasks & 0x3F) == 0)

  • NioEventLoop的run方法里每次loop都需要处理一些SelectionKey,正常办法是使用selector.selectedKeys()拿到SelectionKeySet,然后依次处理就好了,但Netty为了高效是拿到SelectionKey的数组进行处理的
 ...
    private SelectedSelectionKeySet selectedKeys;
    ...

            try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());

            // Ensure the current selector implementation is what we can instrument.
            if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    package io.netty.channel.nio;

import java.nio.channels.SelectionKey;
import java.util.AbstractSet;
import java.util.Iterator;

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;

    SelectedSelectionKeySet() {
        keysA = new SelectionKey[1024];
        keysB = keysA.clone();
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        if (isA) {
            int size = keysASize;
            keysA[size ++] = o;
            keysASize = size;
            if (size == keysA.length) {
                doubleCapacityA();
            }
        } else {
            int size = keysBSize;
            keysB[size ++] = o;
            keysBSize = size;
            if (size == keysB.length) {
                doubleCapacityB();
            }
        }

        return true;
    }

    private void doubleCapacityA() {
        SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
        System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
        keysA = newKeysA;
    }

    private void doubleCapacityB() {
        SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
        System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
        keysB = newKeysB;
    }

    SelectionKey[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }

    @Override
    public int size() {
        if (isA) {
            return keysASize;
        } else {
            return keysBSize;
        }
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }
}

看到这里我都惊了,看来Netty团队开发人员连数组Set两者之间遍历的性能差别都注意到了。

使用Netty进行网络编程注意事项

读了下述的帖子后,发现使用Netty进行网络编程有不少注意事项,这里列举出来以备忘。

  • 操作系统的最大句柄数修改
vim /etc/security/limits.conf
*  soft  nofile  1000000
*  hard  nofile  1000000

  • 尽量不要在Netty的I/O线程上处理业务,业务处理就扔到业务线程池里
  • IdleStateHandler,ReadTimeoutHandler,WriteTimeoutHandler处理时延要可控,防止时延不可控导致的NioEventLoop被意外阻塞。
  • 合理的心跳周期,具体的心跳周期并没有统一的标准,180S也许是个不错的选择
  • 合理设置接收和发送缓冲区容量,用好AdaptiveRecvByteBufAllocator
  • 使用Netty的内存池,同时注意完成ByteBuf的解码工作之后必须显式的调用ReferenceCountUtil.release(msg)对接收缓冲区ByteBuf进行内存释放

1

childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

  • NIO线程里尽量异步打日志
  • 调整TCP层面的接收和发送缓冲区大小设置,在Netty中分别对应ChannelOption的SO_SNDBUF和SO_RCVBUF,通常建议值为128K或者256K
  • 对于时延敏感的应用场景需关闭SO_TCPNODELAY
  • Linux内核版本如果大于2.6.35,可考虑开启RPS以实现软中断均衡在多个cpu上,提升网络并行处理性能
  • 尽量使用“零拷贝”
  • 采用高性能的序列化框架,比如Protobuf、thrift
  • 正确地使用互斥同步
    • 始终使用wait循环来调用wait方法,永远不要在循环之外调用wait方法。原因是尽管条件并不满足被唤醒条件,但是由于其它线程意外调用notifyAll()方法会导致被阻塞线程意外唤醒,此时执行条件并不满足,它将破坏被锁保护的约定关系,导致约束失效,引起意想不到的结果;
    • 唤醒线程,应该使用notify还是notifyAll,当你不知道究竟该调用哪个方法时,保守的做法是调用notifyAll唤醒所有等待的线程。从优化的角度看,如果处于等待的所有线程都在等待同一个条件,而每次只有一个线程可以从这个条件中被唤醒,那么就应该选择调用notify
  • 如果一个变量虽然被多线程访问,但只是一个线程写、其它线程读,可考虑只用volatile关键字来保证线程安全

总结

通过这几天阅读帖子及Netty的源码,对Netty框架的理解深入了许多,现在回头一想以前使用Netty做的网络程序还有不少优化空间。另外这里十分感谢李林峰写的一系列Netty相关的帖子,质量相当高。

参考

http://www.infoq.com/cn/articles/netty-threading-model

http://www.infoq.com/cn/articles/netty-server-create

http://www.infoq.com/cn/articles/netty-concurrent-programming-analysis

http://www.infoq.com/cn/articles/the-multithreading-of-netty-cases

http://www.infoq.com/cn/articles/the-multithreading-of-netty-cases-part02

http://www.infoq.com/cn/articles/netty-elegant-exit-mechanism-and-principles

http://www.infoq.com/cn/articles/netty-high-performance

http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • JVM的Finalization Delay引起的OOM

    jeremyxu
  • 快速创建KVM虚拟机

    jeremyxu
  • Java开发小技巧_02

    jeremyxu
  • 多线程真的会使用CPU所有的核吗?

    原文链接:http://www.jianshu.com/p/352caffd6366

    业余草
  • 多线程真的会使用CPU所有的内核吗?

    学习多线程的时候,我们都知道如果多个线程分配到CPU多个内核是可以并发的执行。但真的是这样的吗?

    用户5224393
  • 把cef3集成到MFC中的一些坑

    cef3变化太大了。要注意版本。笔者所用版本为:cef_binary_3.3626.1895.g7001d56_windows32 版本。在将cef3集成到MF...

    xiny120
  • SpringBoot2集成redis,使用lettuce客户端

    Springboot集成redis大家都会用,主要就是使用RedisTemplate类来进行各种操作。可能很多人并没有注意,在Springboot2以后,底层访...

    天涯泪小武
  • 探索 JUC 之美---Future 与 FutureTaskFuture介绍FutureTask数据结构及核心参数源码解析小结

    JavaEdge
  • Dav1d移动端解码高清视频达30FPS

    LiveVideoStack正在招募编辑/记者/运营,与全球顶尖多媒及技术专家和LiveVideoStack年轻的伙伴一起,推动多媒体技术生态发展。了解岗位信息...

    LiveVideoStack
  • ThreadPoolExecutor 线程池的源码解析

      上一篇从整体上介绍了Executor接口,从上一篇我们知道了Executor框架的最顶层实现是ThreadPoolExecutor类,Executors工厂...

    小勇DW3

扫码关注云+社区

领取腾讯云代金券