首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Netty线程模型与长期运行的taks

Netty线程模型与长期运行的taks
EN

Stack Overflow用户
提问于 2020-12-17 17:15:39
回答 1查看 921关注 0票数 2

更新: OK,因为这是我在这里的第一篇文章,我可能过于冗长地查看我收到的零反馈。因此,我决定添加一个图表,以更好地可视化我的问题。也许有人会看一下(我在看你,诺曼·毛雷尔;o)

其中一个能证实我的发现,并解释我如何防止长时间运行的任务阻止其他短请求?更普遍地说,如果我必须在netty服务器上处理长时间运行的作业,建议的不锁定的解决方案是什么?

在过去的2周里,我一直在学习netty 4/5,我读了很多关于它的线程模型(包括"Netty in Action")的文章,然后我用一个简单的测试应用程序检查了我的理解,我希望看到netty服务器对多个TCP简单并发客户端的响应,其中一些客户机请求服务器上长时间运行的作业。我的结果不像预期的那样。

我的设置:

  • netty 4.1.45
  • windows 10上的jdk 1.8
  • 向服务器打开15个并发连接。
  • 1客户机请求长时间运行作业(模拟为服务器端的睡眠)

我的服务器初始化如下:

代码语言:javascript
运行
复制
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();

try {
    bootstrap
        .group(bossGroup, workGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new MyChannelInitializer(new DefaultEventExecutorGroup(5)));
            
    ChannelFuture future = bootstrap.bind(new InetSocketAddress(SERVER_PORT));
    future.addListener(new MyChannelFutureListener());

    future.channel().closeFuture().sync();
            
} catch (InterruptedException e) {
    logger.error("Error: ", e);
} finally {
    bossGroup.shutdownGracefully().sync();
}

在通道的初始化器中,我有以下代码:

代码语言:javascript
运行
复制
public class MyChannelInitializer extends ChannelInitializer<Channel> {

    private EventExecutorGroup handlerGroup = null;

    public MyChannelInitializer() {
    }

    public MyChannelInitializer(EventExecutorGroup handlerGroup) {
        this.handlerGroup = handlerGroup;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
        ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());
    }
}

如您所见,我希望在单独的eventExecutionGroup中执行入站处理程序,这样就不会阻塞NIO事件循环。入站处理程序中的逻辑非常简单:

代码语言:javascript
运行
复制
public class MyTestInboudHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger logger = LogManager.getLogger((MyTestInboudHandler.class));

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        if (msg.isReadable()) {
            String content = msg.toString(CharsetUtil.UTF_8);
            int len = content.length();
            if (len > 120) {
                content = content.substring(0, 119);
            }

            String clientId = content.substring(0, content.indexOf(':'));
            String payload = content.substring(content.indexOf(':') + 1);

            logger.info(ctx.channel().hashCode() + ": received message of length: " + len + " -- " + content + "<<<");

            if (payload.equals("LONG_TASK_REQ")) {
                // simulate load run
                Thread.sleep(50_000L);
                ctx.writeAndFlush(Unpooled.copiedBuffer("DoneBIG", CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE);
            } else {
                ctx.writeAndFlush(Unpooled.copiedBuffer("Done", CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE);
            }

        } else {
            logger.info("ByteBuf not readable...");
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        logger.error("{} - Exception in input handler: {} ", ctx.channel().hashCode(), cause.getMessage());
        ctx.close();

    }
}

因此,如果我在请求有效负载中找到的客户机id等于13,这意味着需要完成一项漫长的工作(例如,JDBC对数据库的调用),我用50秒的睡眠进行模拟。所有其他请求都可以通过简单地回复“完成”并关闭连接立即处理。

我预期的结果是看到所有客户端立即完成他们的服务器调用,除了客户机13,它需要50秒才能完成。下面是客户端实现,也使用netty:

代码语言:javascript
运行
复制
public class MyNettyClient extends Thread {

    private static final Logger logger = LogManager.getLogger(MyNettyClient.class);

    private int id;

    private MyNettyClient() {
    }

    public MyNettyClient(int id) {
        this();
        this.id = id;
    }

    public void run() {

        logger.info("Netty client starting with id: " + this.id);

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(group)
                .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("127.0.0.1", 8778))
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyNettyClientHandler(id));
                        }
                    });

            ChannelFuture future = bootstrap.connect().sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("Error from client " + id + " : ", e);
        } finally {
            logger.info("Exiting client: " + id);
            try {
                group.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                logger.error("Error from client " + id, e);
            }
        }

    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<?>> futureList = new ArrayList<Future<?>>();

        for (int i = 1; i < 15; ++i) {
            futureList.add(executor.submit(new MyNettyClient(i)));
        }

        for (Future<?> f : futureList) {
            f.get();
            if (f.isCancelled()) {
                logger.info("One future was cancelled: " + f.toString());
            }
        }

        logger.info("Calling shutdown on executor");
        executor.shutdown();

        logger.info("MAIN THREAD DONE");
    }
}

编写对Netty服务器调用的处理程序的逻辑如下:

代码语言:javascript
运行
复制
public class MyNettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger logger = LogManager.getLogger(MyNettyClientHandler.class);

    private int clientId;

    public MyNettyClientHandler(int id) {
        clientId = id;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        logger.info("@@@@@Message received: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf b = (ByteBuf) msg;
        String serverMsg = b.toString(CharsetUtil.UTF_8);
        logger.info("Read from server by client " + clientId + ": " + serverMsg);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        String content = null;

        if (clientId % 13 == 0) {
            content = clientId + ":LONG_TASK_REQ";
        } else {
            content = clientId + ":SMALL_TASK_REQ";
        }
        ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)).sync();

        if (f.isDone()) {
             logger.info("Send for client {} completed successfully.", clientId);
        } else {
            logger.error("Future not done for client {}. Reason: {}", clientId, f.cause().getMessage());
        }

        logger.info("Finish sending from client " + clientId);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        logger.error("Exception in client {}, type: {} ", clientId, cause.getMessage());

        ctx.close();
    }
}

我得到的结果表明,当服务器在50秒内接收到长请求并进行处理时,总是阻止不同客户端在不同通道上发出的其他请求。通道似乎在激活时从EventExecutionGroup绑定到某个线程,之后在该通道上进行的所有活动都将由同一个线程处理,不管该线程有多忙,池中有多少空闲线程。这是我真的不明白的,因为EventExecutionGroup的全部目的是处理长时间的同步作业,而不对其他客户端进行任何干扰。

我知道我可以实现自己的工作线程池来处理和同步长作业,但问题仍然存在:如果结果如此有限,为什么我们可以编写ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());

请给我建议。

EN

回答 1

Stack Overflow用户

发布于 2020-12-23 20:18:15

EventLoopGroup只是EventLoops的集合。每个EventLoop都是一个线程,分配给一个或多个Channels,并在其生命周期内属于该通道。这种关系是1:许多单个EventLoop可以处理多个Channels的地方。

当信道在EventLoopGroup中注册时,它将把它们分配给一个新的EventLoop (如果容量允许的话)或一个现有的EventLoop (我相信通过循环)。这意味着,一个长时间运行的任务完全有可能在通道的EventLoop上运行,而这个Channels恰好处理来自其他运行短时间运行任务的客户端的Channels,从而导致您所经历的阻塞。

您可以做的一件事是将这些长期运行的任务从您的ExecutorService中卸载到单独的MyTestInboundHandler中。这样,长时间运行的任务就不会阻塞,而只是将处理分派给另一个线程。

但是请注意,当您将一个任务卸载到一个单独的ExecutorService时,它将在一个单独的线程中运行,并且要写入通道,写操作Channel.writeAndFlush(data)实际上会将任务排队到ChannelEventLoop队列中,并在下一个迭代中运行它,它不会立即运行它。

例如:

代码语言:javascript
运行
复制
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    myExecutor.submit(() -> {
        byte[] response = doSomeLongTask(msg);
        ctx.channel().writeAndFlush(response); // Submits to channel's EventLoop queue to be written
    }
}

现在要回答主要问题:,这和向每个处理程序添加一个EventExecutor有什么区别?

通过执行以下操作,几乎可以获得相同的结果。

代码语言:javascript
运行
复制
ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());

主要的区别是[1],在将自己的handlerGroup添加到每个处理程序的情况下,每个套接字事件都将在该组中运行(强调我的):

用户在向EventExecutor添加处理程序时可以指定ChannelPipeline。

  • 如果指定,ChannelHandler的ChannelHandler处理程序方法总是由指定的EventExecutor调用。
  • 如果未指定,则处理程序方法始终由注册到其关联通道的EventLoop调用。

分配给处理程序或通道的EventExecutor和EventLoop总是单线程的.

  • 处理程序方法将始终由同一个线程调用。
  • --如果指定了多线程EventExecutor或EventLoop,将首先选择其中一个线程,然后使用所选线程直到取消注册
  • 如果同一管道中的两个处理程序被分配给不同的>EventExecutor,则同时调用它们。如果多个处理程序访问共享数据,即使共享数据仅由同一管道中的处理程序访问,用户也必须注意线程安全性。

这意味着,对于发生在管道上的每个处理程序事件,以及对于分配给自己的执行器的处理程序,这些整个方法(如channelRead0())将在它们自己的线程中从指定的执行器中运行。

如果不想为处理程序指定执行器,也可以通过重写每个处理程序方法并立即卸载对自定义执行程序的调用来实现这一点。注意:您需要覆盖每个方法,任何未被覆盖的方法仍将在netty EventLoop线程中运行。

因此,从本质上说,它们几乎是完全相同的。通过使用分配给处理程序的执行器,您仍然受到单个线程的限制,因为Netty将对每个处理程序重用相同的线程。但好处是每个处理程序方法将在一个单独的线程中自动运行,而不是通道的指定EventLoop

通过使用您自己的执行器并将任务卸载到它,EventLoop仍然是在同一个线程中调用每个处理程序方法的那个人,但是您可以将完成的实际工作卸载到您自己的执行器上。通过这种方式,执行器还可以使用不止一个线程并发地处理这些任务。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65345371

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档