前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》

netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》

原创
作者头像
小傅哥
修改2020-03-04 09:43:51
1K0
修改2020-03-04 09:43:51
举报

小傅哥 | https://bugstack.cn 沉淀、分享、成长,让自己和他人都能有所收获。专注于原创专题案例编写,目前已完成的专题有;Netty4.x实战专题案例、用Java实现JVM、基于JavaAgent的全链路监控、手写RPC框架、架构设计专题案例、源码分析等。你用剑🗡、我用刀🔪,好的代码都很烧,望你不吝出招!

一、前言介绍

在我们使用netty中,需要监测服务是否稳定以及在网络异常链接断开时候可以自动重连。需要实现监听;f.addListener(new MyChannelFutureListener())

二、环境准备

  1. jdk1.8【jdk1.7以下只能部分支持netty】
  2. Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】

三、代码示例

代码语言:txt
复制
itstack-demo-rpc-2-08
└── src
    └── main
    │    └── java
    │        └── org.itstack.demo.netty
    │             ├── client
    │             │   ├── MyChannelFutureListener.java
    │             │   ├── MyChannelInitializer.java
    │             │   ├── MyClientHandler.java
    │             │   └── NettyClient.java
    │             └── server
    │             	  ├── MyChannelInitializer.java
    │             	  ├── MyServerHandler.java
    │             	  └── NettyServer.java
    └── test
         └── java
             └── org.itstack.demo.test
                 ├── StartClient.java
                 └── StartServer.java

展示部分重要代码块,完整代码可以关注公众号获取;bugstack虫洞栈

client/MyChannelFutureListener.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MyChannelFutureListener implements ChannelFutureListener {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}");
            return;
        }
        final EventLoop loop = channelFuture.channel().eventLoop();
        loop.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    new NettyClient().connect("127.0.0.1", 7397);
                    System.out.println("itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}");
                    Thread.sleep(500);
                } catch (Exception e){
                    System.out.println("itstack-demo-netty client start error go reconnect ... {关注公众号:bugstack虫洞栈,获取源码}");
                }
            }
        }, 1L, TimeUnit.SECONDS);
    }
}

client/MyClientHandler.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("链接报告开始");
        System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());
        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
        System.out.println("链接报告Port:" + channel.localAddress().getPort());
        System.out.println("链接报告完毕");
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("断开链接重连" + ctx.channel().localAddress().toString());
        //使用过程中断线重连
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new NettyClient().connect("127.0.0.1", 7397);
                    System.out.println("itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}");
                    Thread.sleep(500);
                } catch (Exception e){
                    System.out.println("itstack-demo-netty client start error go reconnect ... {关注公众号:bugstack虫洞栈,获取源码}");
                }
            }
        }).start();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常信息,断开重连:\r\n" + cause.getMessage());
        ctx.close();
    }

}

client/NettyClient.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class NettyClient {

    public static void main(String[] args) {
        new NettyClient().connect("127.0.0.1", 7397);
    }

    public void connect(String inetHost, int inetPort) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new MyChannelInitializer());
            ChannelFuture f = b.connect(inetHost, inetPort).sync();
            f.addListener(new MyChannelFutureListener()); //添加监听,处理重连
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

server/MyServerHandler.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                System.out.println("bugstack虫洞栈提醒=> Reader Idle");
                ctx.writeAndFlush("读取等待:公众号bugstack虫洞栈,客户端你在吗[ctx.close()]{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
                ctx.close();
            } else if (e.state() == IdleState.WRITER_IDLE) {
                System.out.println("bugstack虫洞栈提醒=> Write Idle");
                ctx.writeAndFlush("写入等待:公众号bugstack虫洞栈,客户端你在吗{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
            } else if (e.state() == IdleState.ALL_IDLE) {
                System.out.println("bugstack虫洞栈提醒=> All_IDLE");
                ctx.writeAndFlush("全部时间:公众号bugstack虫洞栈,客户端你在吗{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
            }
        }
        ctx.flush();
    }

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("链接报告开始");
        System.out.println("链接报告信息:有一客户端链接到本服务端");
        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
        System.out.println("链接报告Port:" + channel.localAddress().getPort());
        System.out.println("链接报告完毕");
        //通知客户端链接建立成功
        String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
        ctx.writeAndFlush(str);
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
        //通知客户端链消息发送成功
        String str = "服务端收到:" + new Date() + " " + msg + "\r\n";
        ctx.writeAndFlush(str);
    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("异常信息:\r\n" + cause.getMessage());
    }

}

server/NettyServer.java

代码语言:txt
复制
/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {获取学习源码}
 * Create by fuzhengwei on 2019
 */
public class NettyServer {

    public static void main(String[] args) {
        new NettyServer().bing(7397);
    }

    private void bing(int port) {
        //配置服务端NIO线程组
        EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)    //非阻塞模式
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new MyChannelInitializer());
            ChannelFuture f = b.bind(port).sync();
            System.out.println("itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}");
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }

    }

}

四、测试结果

启动NettyServer *在心跳中设置ctx.close();模拟断开链接,等待重连

代码语言:txt
复制
itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}
链接报告开始
链接报告信息:有一客户端链接到本服务端
链接报告IP:127.0.0.1
链接报告Port:7397
链接报告完毕
bugstack虫洞栈提醒=> Reader Idle
客户端断开链接/127.0.0.1:7397
链接报告开始
链接报告信息:有一客户端链接到本服务端
链接报告IP:127.0.0.1
链接报告Port:7397
链接报告完毕
bugstack虫洞栈提醒=> Reader Idle
客户端断开链接/127.0.0.1:7397
链接报告开始
链接报告信息:有一客户端链接到本服务端
链接报告IP:127.0.0.1
链接报告Port:7397
链接报告完毕
bugstack虫洞栈提醒=> Reader Idle
客户端断开链接/127.0.0.1:7397
链接报告开始
链接报告信息:有一客户端链接到本服务端
链接报告IP:127.0.0.1
链接报告Port:7397
链接报告完毕
异常信息:
远程主机强迫关闭了一个现有的连接。
客户端断开链接/127.0.0.1:7397

Process finished with exit code -1

启动NettyClient

代码语言:txt
复制
链接报告开始
链接报告信息:本客户端链接到服务端。channelId:d9f3f045
链接报告IP:127.0.0.1
链接报告Port:53009
链接报告完毕
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}
2019-08-18 16:49:28 接收到消息:通知客户端链接建立成功 Sun Aug 18 16:49:28 CST 2019 127.0.0.1
2019-08-18 16:49:30 接收到消息:读取等待:公众号bugstack虫洞栈,客户端你在吗[ctx.close()]{我结尾是一个换行符用于处理半包粘包}... ...
断开链接重连/127.0.0.1:53009
链接报告开始
链接报告信息:本客户端链接到服务端。channelId:23dc9235
链接报告IP:127.0.0.1
链接报告Port:53035
链接报告完毕
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}
2019-08-18 16:49:30 接收到消息:通知客户端链接建立成功 Sun Aug 18 16:49:30 CST 2019 127.0.0.1
2019-08-18 16:49:32 接收到消息:读取等待:公众号bugstack虫洞栈,客户端你在吗[ctx.close()]{我结尾是一个换行符用于处理半包粘包}... ...
断开链接重连/127.0.0.1:53035
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}
链接报告开始
链接报告信息:本客户端链接到服务端。channelId:9c186f92
链接报告IP:127.0.0.1
链接报告Port:53052
链接报告完毕
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}
2019-08-18 16:49:32 接收到消息:通知客户端链接建立成功 Sun Aug 18 16:49:32 CST 2019 127.0.0.1
2019-08-18 16:49:34 接收到消息:读取等待:公众号bugstack虫洞栈,客户端你在吗[ctx.close()]{我结尾是一个换行符用于处理半包粘包}... ...
断开链接重连/127.0.0.1:53052
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}
链接报告开始
链接报告信息:本客户端链接到服务端。channelId:46b1d56a
链接报告IP:127.0.0.1
链接报告Port:53069
链接报告完毕
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}
2019-08-18 16:49:34 接收到消息:通知客户端链接建立成功 Sun Aug 18 16:49:34 CST 2019 127.0.0.1

Process finished with exit code -1

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言介绍
  • 二、环境准备
  • 三、代码示例
  • 四、测试结果
相关产品与服务
应用性能监控
应用性能监控(Application Performance Management,APM)是一款应用性能管理平台,基于实时多语言应用探针全量采集技术,为您提供分布式性能分析和故障自检能力。APM 协助您在复杂的业务系统里快速定位性能问题,降低 MTTR(平均故障恢复时间),实时了解并追踪应用性能,提升用户体验。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档