前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)-续netty

spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)-续netty

作者头像
逍遥壮士
发布2021-02-03 16:17:30
1.4K0
发布2021-02-03 16:17:30
举报
文章被收录于专栏:技术趋势技术趋势

原文地址

注:本文篇幅非常长

有将近10万字~,所以建议各位下载源码学习。(如需要请收藏!转载请声明来源,谢谢!)

代码下载:https://gitee.com/hong99/spring/issues/I1N1DF


本文为继上文 :spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)

netty相关介绍

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。---百度百科

介绍地址:https://baike.baidu.com/item/Netty/10061624?fr=aladdin

官网:https://netty.io/

netty基于使用

基于java简单实现

版本信息

jdk 1.8

spring 4.x

netty 4.x

com.hong.spring.netty.NettyClientHandler

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 *
 * 功能描述: 客户端的监听器
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/15 16:02
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello nettyServer,i'm hong!", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("server msg " + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("server address " + ctx.channel().remoteAddress());

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

com.hong.spring.netty.NettyServerHandler

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 *
 * 功能描述: 服务端监听器
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/15 16:05
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("client msg " + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("client address " + ctx.channel().remoteAddress());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client i'm hong server!",CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

com.hong.spring.NettyServerTest

代码语言:javascript
复制
package com.hong.spring;

import com.hong.spring.netty.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author: csh
 * @Date: 2021/1/15 15:56
 * @Description:netty服务端
 */
public class NettyServerTest  {
    public static void main(String[] args) {
        //用于接收客户端连接的线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //用于实际业务处理的线程组
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //设置TCP缓冲区
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //设置发送数据缓冲大小
                    .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                    //设置接受数据缓冲大小
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                    //保持连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });

            System.out.println("server ready");
            //绑定的端口
            ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

com.hong.spring.NettyClientTest

代码语言:javascript
复制
package com.hong.spring;

import com.hong.spring.netty.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author: csh
 * @Date: 2021/1/15 15:56
 * @Description:netty客户端
 */
public class NettyClientTest {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();

        try {
            //
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("client ready");

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            eventExecutors.shutdownGracefully();
        }

    }
}

运行结果(注意先运行服务端)

服务端

代码语言:javascript
复制
"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" -Dvisualvm.id=101847092493100 "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\lib\idea_rt.jar=60406:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\ideaWorkSpace\spring\spring_rpc\spring_netty_server\target\test-classes;D:\ideaWorkSpace\spring\spring_rpc\spring_netty_server\target\classes;D:\ideaWorkSpace\spring\spring_rpc\spring_dubbo_api\target\classes;D:\mvn\io\netty\netty-all\4.1.17.Final\netty-all-4.1.17.Final.jar;D:\mvn\org\mybatis\mybatis\3.5.0\mybatis-3.5.0.jar;D:\mvn\org\mybatis\mybatis-spring\2.0.3\mybatis-spring-2.0.3.jar;D:\mvn\com\fasterxml\jackson\core\jackson-databind\2.5.0\jackson-databind-2.5.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-core\2.5.0\jackson-core-2.5.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-annotations\2.5.0\jackson-annotations-2.5.0.jar;D:\mvn\mysql\mysql-connector-java\5.1.34\mysql-connector-java-5.1.34.jar;D:\mvn\org\springframework\spring-beans\4.3.11.RELEASE\spring-beans-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-context\4.3.11.RELEASE\spring-context-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-context-support\4.3.11.RELEASE\spring-context-support-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-expression\4.3.11.RELEASE\spring-expression-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-core\4.3.11.RELEASE\spring-core-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-web\4.3.11.RELEASE\spring-web-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-webmvc\4.3.11.RELEASE\spring-webmvc-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-test\4.3.11.RELEASE\spring-test-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-tx\4.3.11.RELEASE\spring-tx-4.3.11.RELEASE.jar;D:\mvn\junit\junit\4.13\junit-4.13.jar;D:\mvn\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;D:\mvn\com\alibaba\fastjson\1.2.70\fastjson-1.2.70.jar;D:\mvn\com\alibaba\druid\1.1.23\druid-1.1.23.jar;D:\mvn\org\springframework\spring-aop\4.3.11.RELEASE\spring-aop-4.3.11.RELEASE.jar;D:\mvn\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;D:\mvn\org\apache\logging\log4j\log4j-slf4j-impl\2.11.0\log4j-slf4j-impl-2.11.0.jar;D:\mvn\org\slf4j\slf4j-api\1.8.0-alpha2\slf4j-api-1.8.0-alpha2.jar;D:\mvn\org\apache\logging\log4j\log4j-api\2.11.0\log4j-api-2.11.0.jar;D:\mvn\org\apache\logging\log4j\log4j-core\2.11.0\log4j-core-2.11.0.jar;D:\mvn\org\projectlombok\lombok\1.16.22\lombok-1.16.22.jar;D:\mvn\ch\qos\logback\logback-classic\1.2.2\logback-classic-1.2.2.jar;D:\mvn\ch\qos\logback\logback-core\1.2.2\logback-core-1.2.2.jar;D:\mvn\org\hibernate\hibernate-core\4.2.0.Final\hibernate-core-4.2.0.Final.jar;D:\mvn\antlr\antlr\2.7.7\antlr-2.7.7.jar;D:\mvn\org\jboss\logging\jboss-logging\3.1.0.GA\jboss-logging-3.1.0.GA.jar;D:\mvn\org\jboss\spec\javax\transaction\jboss-transaction-api_1.1_spec\1.0.0.Final\jboss-transaction-api_1.1_spec-1.0.0.Final.jar;D:\mvn\dom4j\dom4j\1.6.1\dom4j-1.6.1.jar;D:\mvn\org\hibernate\javax\persistence\hibernate-jpa-2.0-api\1.0.1.Final\hibernate-jpa-2.0-api-1.0.1.Final.jar;D:\mvn\org\javassist\javassist\3.15.0-GA\javassist-3.15.0-GA.jar;D:\mvn\org\hibernate\common\hibernate-commons-annotations\4.0.1.Final\hibernate-commons-annotations-4.0.1.Final.jar;D:\mvn\org\springframework\spring-orm\4.3.11.RELEASE\spring-orm-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-jdbc\4.3.11.RELEASE\spring-jdbc-4.3.11.RELEASE.jar;D:\mvn\javax\servlet\javax.servlet-api\3.0.1\javax.servlet-api-3.0.1.jar" com.hong.spring.NettyServerTest
16:06:39.894 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
16:06:39.905 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
16:06:39.929 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
16:06:39.935 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
16:06:39.936 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
16:06:39.938 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
16:06:39.939 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
16:06:39.940 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
16:06:39.941 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
16:06:39.942 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
16:06:39.943 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
16:06:39.943 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
16:06:39.943 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
16:06:39.945 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\admin\AppData\Local\Temp (java.io.tmpdir)
16:06:39.945 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
16:06:39.947 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
16:06:39.948 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3793747968 bytes
16:06:39.948 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
16:06:39.949 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
16:06:39.965 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
16:06:39.966 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
16:06:39.972 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
server ready
16:06:40.518 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 20728 (auto-detected)
16:06:40.521 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
16:06:40.521 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
16:06:40.916 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
16:06:40.917 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
16:06:41.305 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 30:9c:23:ff:fe:ca:fe:19 (auto-detected)
16:06:41.311 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
16:06:41.312 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
16:06:41.320 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
16:06:41.321 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
16:06:41.343 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
16:06:41.344 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
16:06:41.344 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:06:41.344 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:06:41.346 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:06:41.346 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:06:41.356 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:06:41.356 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
16:06:41.356 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:06:50.160 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
16:06:50.160 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:06:50.160 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:06:50.160 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
16:06:50.169 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
16:06:50.171 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7f64202e
client msg hello nettyServer,i'm hong!
client address /127.0.0.1:60549

客服端

代码语言:javascript
复制
"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" -Dvisualvm.id=101855782551700 "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\lib\idea_rt.jar=60505:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\ideaWorkSpace\spring\spring_rpc\spring_netty_server\target\test-classes;D:\ideaWorkSpace\spring\spring_rpc\spring_netty_server\target\classes;D:\ideaWorkSpace\spring\spring_rpc\spring_dubbo_api\target\classes;D:\mvn\io\netty\netty-all\4.1.17.Final\netty-all-4.1.17.Final.jar;D:\mvn\org\mybatis\mybatis\3.5.0\mybatis-3.5.0.jar;D:\mvn\org\mybatis\mybatis-spring\2.0.3\mybatis-spring-2.0.3.jar;D:\mvn\com\fasterxml\jackson\core\jackson-databind\2.5.0\jackson-databind-2.5.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-core\2.5.0\jackson-core-2.5.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-annotations\2.5.0\jackson-annotations-2.5.0.jar;D:\mvn\mysql\mysql-connector-java\5.1.34\mysql-connector-java-5.1.34.jar;D:\mvn\org\springframework\spring-beans\4.3.11.RELEASE\spring-beans-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-context\4.3.11.RELEASE\spring-context-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-context-support\4.3.11.RELEASE\spring-context-support-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-expression\4.3.11.RELEASE\spring-expression-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-core\4.3.11.RELEASE\spring-core-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-web\4.3.11.RELEASE\spring-web-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-webmvc\4.3.11.RELEASE\spring-webmvc-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-test\4.3.11.RELEASE\spring-test-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-tx\4.3.11.RELEASE\spring-tx-4.3.11.RELEASE.jar;D:\mvn\junit\junit\4.13\junit-4.13.jar;D:\mvn\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;D:\mvn\com\alibaba\fastjson\1.2.70\fastjson-1.2.70.jar;D:\mvn\com\alibaba\druid\1.1.23\druid-1.1.23.jar;D:\mvn\org\springframework\spring-aop\4.3.11.RELEASE\spring-aop-4.3.11.RELEASE.jar;D:\mvn\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;D:\mvn\org\apache\logging\log4j\log4j-slf4j-impl\2.11.0\log4j-slf4j-impl-2.11.0.jar;D:\mvn\org\slf4j\slf4j-api\1.8.0-alpha2\slf4j-api-1.8.0-alpha2.jar;D:\mvn\org\apache\logging\log4j\log4j-api\2.11.0\log4j-api-2.11.0.jar;D:\mvn\org\apache\logging\log4j\log4j-core\2.11.0\log4j-core-2.11.0.jar;D:\mvn\org\projectlombok\lombok\1.16.22\lombok-1.16.22.jar;D:\mvn\ch\qos\logback\logback-classic\1.2.2\logback-classic-1.2.2.jar;D:\mvn\ch\qos\logback\logback-core\1.2.2\logback-core-1.2.2.jar;D:\mvn\org\hibernate\hibernate-core\4.2.0.Final\hibernate-core-4.2.0.Final.jar;D:\mvn\antlr\antlr\2.7.7\antlr-2.7.7.jar;D:\mvn\org\jboss\logging\jboss-logging\3.1.0.GA\jboss-logging-3.1.0.GA.jar;D:\mvn\org\jboss\spec\javax\transaction\jboss-transaction-api_1.1_spec\1.0.0.Final\jboss-transaction-api_1.1_spec-1.0.0.Final.jar;D:\mvn\dom4j\dom4j\1.6.1\dom4j-1.6.1.jar;D:\mvn\org\hibernate\javax\persistence\hibernate-jpa-2.0-api\1.0.1.Final\hibernate-jpa-2.0-api-1.0.1.Final.jar;D:\mvn\org\javassist\javassist\3.15.0-GA\javassist-3.15.0-GA.jar;D:\mvn\org\hibernate\common\hibernate-commons-annotations\4.0.1.Final\hibernate-commons-annotations-4.0.1.Final.jar;D:\mvn\org\springframework\spring-orm\4.3.11.RELEASE\spring-orm-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-jdbc\4.3.11.RELEASE\spring-jdbc-4.3.11.RELEASE.jar;D:\mvn\javax\servlet\javax.servlet-api\3.0.1\javax.servlet-api-3.0.1.jar" com.hong.spring.NettyClientTest
16:06:48.607 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
16:06:48.616 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
16:06:48.637 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
16:06:48.643 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
16:06:48.644 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
16:06:48.645 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
16:06:48.646 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
16:06:48.646 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
16:06:48.647 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
16:06:48.648 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
16:06:48.649 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
16:06:48.649 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
16:06:48.649 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
16:06:48.650 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\admin\AppData\Local\Temp (java.io.tmpdir)
16:06:48.651 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
16:06:48.652 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
16:06:48.652 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3793747968 bytes
16:06:48.653 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
16:06:48.654 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
16:06:48.669 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
16:06:48.670 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
16:06:48.678 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
client ready
16:06:49.250 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 20672 (auto-detected)
16:06:49.254 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
16:06:49.254 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
16:06:49.664 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
16:06:49.665 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
16:06:50.037 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 30:9c:23:ff:fe:ca:fe:19 (auto-detected)
16:06:50.045 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
16:06:50.046 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
16:06:50.057 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
16:06:50.057 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
16:06:50.085 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
16:06:50.085 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
16:06:50.085 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:06:50.086 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:06:50.087 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:06:50.087 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
16:06:50.087 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:06:50.087 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:06:50.088 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:06:50.088 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:06:50.088 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:06:50.098 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:06:50.098 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
16:06:50.099 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:06:50.132 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
16:06:50.134 [nioEventLoopGroup-2-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@1fb278ae
16:06:50.142 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
16:06:50.142 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:06:50.142 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:06:50.143 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
server msg hello client i'm hong server!
server address /127.0.0.1:6668

命令请求服务端:

代码语言:javascript
复制
telnet localhost 6668

可以发现netty实现非常简单,当然还可以通过其他协议通信,这里只是简单实现。

参考:https://segmentfault.com/a/1190000021834427?utm_source=tag-newest

基于spring的实现

服务端

web.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>netty_server</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>
                classpath:applicationContext-mybatis.xml,
                classpath:netty.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>netty_server</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_rpc</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.spring</groupId>
    <artifactId>spring_netty_server</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.hong</groupId>
            <artifactId>spring_dubbo_api</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>
    </dependencies>
    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>

application.properties

代码语言:javascript
复制
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext-mybatis.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
        <property name="locations">
            <list>
                <value>classpath:jdbc.properties</value>
                <value>classpath:netty.properties</value>
            </list>
        </property>
    </bean>

   <!-- 开启注解 -->
   <context:annotation-config />
   <!--开启注解事务-->
   <tx:annotation-driven transaction-manager="transactionManager" />
   <!--放行静态资源-->
   <mvc:default-servlet-handler />

   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>

   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>


   <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
      <!-- 基础配置 -->
      <property name="url" value="${jdbc.url}"></property>
      <property name="driverClassName" value="${jdbc.driver}"></property>
      <property name="username" value="${jdbc.user}"></property>
      <property name="password" value="${jdbc.password}"></property>

      <!-- 关键配置 -->
      <!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
      <property name="initialSize" value="3" />
      <!-- 最小连接池数量 -->
      <property name="minIdle" value="2" />
      <!-- 最大连接池数量 -->
      <property name="maxActive" value="15" />
      <!-- 配置获取连接等待超时的时间 -->
      <property name="maxWait" value="10000" />

      <!-- 性能配置 -->
      <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
      <property name="poolPreparedStatements" value="true" />
      <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />

      <!-- 其他配置 -->
      <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
      <property name="timeBetweenEvictionRunsMillis" value="60000" />
      <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
      <property name="minEvictableIdleTimeMillis" value="300000" />
      <!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
                  执行validationQuery检测连接是否有效。-->
      <property name="testWhileIdle" value="true" />
      <!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
      <property name="testOnBorrow" value="true" />
      <!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
      <property name="testOnReturn" value="false" />
   </bean>

   <!--事务管理器-->
   <!-- sqlSessionFactory -->
   <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
      <!-- 加载 MyBatis 的配置文件 -->
      <property name="configLocation" value="classpath:mybatis.xml"/>
      <!-- 数据源 -->
      <property name="dataSource" ref="dataSource"/>
      <!-- 所有配置的mapper文件 -->
      <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
   </bean>

   <!-- Mapper 扫描器 -->
   <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
      <!-- 扫描 包下的组件 -->
      <property name="basePackage" value="com.hong.spring.dao" />
      <!-- 关联mapper扫描器 与 sqlsession管理器 -->
      <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
   </bean>
   <!--事务配置-->
   <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
      <property name="dataSource" ref="dataSource" />
   </bean>
</beans>

jdbc.properties

代码语言:javascript
复制
config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456

log4j2.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

logging.properties

代码语言:javascript
复制
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

mybatis.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>

    <!-- settings -->
    <settings>
        <!-- 打开延迟加载的开关 -->
        <setting name="lazyLoadingEnabled" value="true"/>
        <!-- 将积极加载改为消极加载(即按需加载) -->
        <setting name="aggressiveLazyLoading" value="false"/>
        <!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
        <setting name="cacheEnabled" value="true"/>
        <!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
        <!-- 使用列别名代替列名 默认:true seslect name as title from table -->
        <setting name="useColumnLabel" value="true"/>
        <!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
        <setting name="useGeneratedKeys" value="true"/>
    </settings>

    <!-- 别名定义 -->
    <typeAliases>
        <package name="com.hong.spring.entity"/>
    </typeAliases>

</configuration>

netty.properties

代码语言:javascript
复制
#web socket端口号
websocket.server.port=7000

websocket.server.communication.port=8081
communication.netty.nodes=
# 用于处理客户端连接请求
client.server.bossGroup.threads=1
#用于处理客户端I/O操作
client.server.workerGroup.threads=0
netty.server.bossGroup.threads=1
netty.server.workerGroup.threads=0
netty.client.eventLoopGroup.threads=1

netty.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">


    <!-- 把Netty的一些类服务器注册到Spring,方便处理和扩展 -->
    <!-- 用于处理客户端连接请求 -->
    <bean id="bossGroup" class="io.netty.channel.nio.NioEventLoopGroup">
        <constructor-arg value="${client.server.bossGroup.threads}" />
    </bean>
    <!-- 用于处理客户端I/O操作 -->
    <bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup">
        <constructor-arg value="${client.server.workerGroup.threads}" />
    </bean>

    <!-- 服务器启动引导类 -->
    <bean id="serverBootstrap" class="io.netty.bootstrap.ServerBootstrap" scope="prototype"/>

    <!-- 自定义的Netty Websocket服务器 -->
    <bean id="webSocketServer" class="com.hong.spring.netty.WebsocketServer" init-method="start" destroy-method="close">
        <property name="port" value="${websocket.server.port}"/>
        <property name="websocketChannelInitializer" ref="websocketChannelInitializer" />
    </bean>
</beans>

com.hong.spring.dao.UserMapper

代码语言:javascript
复制
package com.hong.spring.dao;

import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {

    /**
     *
     * 功能描述:查询总条数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:31
     */
    List<User> findAllUserList();
    /**
     *
     * 功能描述:获取总数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int findAllTotal();
    /**
     *
     * 功能描述:更新
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int update(User user);
    /**
     *
     * 功能描述:添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    int save(User user);
    /**
     *
     * 功能描述:批量添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 15:46
     */
    int insertBatch(@Param("list") List <User> list);
    /**
     *
     * 功能描述:通过id查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    User findById(int id);
    /**
     *
     * 功能描述:通过分页查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 16:05
     */
    List<User> findByPage(UserAO ao);
}

com/hong/spring/mapper/UserMapper.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
    <resultMap type="com.hong.spring.entity.User" id="user">
        <id column="id" property="id" />
        <result column="user_name" property="username" />
        <result column="age" property="age" />
    </resultMap>

    <select id="findById" resultType="com.hong.spring.entity.User">
      SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
    </select>

    <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
        select * from user where 1=1 limit #{page},#{pageSize}
    </select>

    <select id="findAllUserList" resultMap="user">
      SELECT * FROM user
    </select>

    <select id="findAllTotal" resultType="int">
      SELECT count(*) FROM user
    </select>

    <insert id="save" >
         INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    </insert>

    <insert id="insertBatch">
        insert into user
        ( user_name, age)
        values
        <foreach collection="list" item="user" index="index"
                 separator=",">
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        </foreach>
    </insert>

    <update id="update" >
        update user
        <set>
            <if test="username !=null">
                user_name=#{username,jdbcType=VARCHAR},
            </if>
            <if test="age !=null">
                age =#{age,jdbcType=INTEGER}
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
</mapper>

com.hong.spring.netty.NettyClientHandler

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 *
 * 功能描述: 客户端的监听器
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/15 16:02
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     *
     * 功能描述: 客户端连接成功,会促发一次
     *
     * @param: 
     * @return: 
     * @auther: csh
     * @date: 2021/1/18 15:12
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello nettyServer,i'm hong!", CharsetUtil.UTF_8));
    }
    /**
     *
     * 功能描述: 接收消息方法
     *
     * @param: 
     * @return: 
     * @auther: csh
     * @date: 2021/1/18 15:12
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("server msg " + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("server address " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

com.hong.spring.netty.NettyHttpHandler

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

/**
 * @author: csh
 * @Date: 2021/1/19 10:54
 * @Description:
 */
public class NettyHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if(msg instanceof HttpRequest) {
            //1.打印浏览器的请求地址
            System.out.println("客户端地址:" + ctx.channel().remoteAddress());

            //2.给浏览器发送的信息,封装成ByteBuf
            ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);

            //3.构造一个http的相应,即 httpresponse
            FullHttpResponse response = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK,
                    content);
            //4.设置响应头信息-响应格式
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            //5.设置响应头信息-响应数据长度
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            //6.将构建好 response返回
            ctx.writeAndFlush(response);
        }
    }
}

com.hong.spring.netty.NettyServerHandler

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 *
 * 功能描述: 服务端监听器
 *
 * 方法 描述
 * handlerAdded Handler 被加入 Pipeline 时触发(仅仅触发一次)
 * channelRegistered channelRegistered 注册成功时触发
 * channelActive channel 连接就绪时触发
 * channelRead channel 有数据可读时触发
 * channelReadComplete channel 有数据可读,并且读完时触发
 * channelInactive channel 断开时触发
 * channelUnregistered channel 取消注册时触发
 * handlerRemoved handler 被从 Pipeline 移除时触发
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/15 16:05
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     *
     * 功能描述: 收到消息方法!
     *
     * @param: 
     * @return: 
     * @auther: csh
     * @date: 2021/1/18 15:13
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("client msg " + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("client address " + ctx.channel().remoteAddress());
        ctx.writeAndFlush(byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client i'm hong server!",CharsetUtil.UTF_8));
    }
    /**
     *
     * 功能描述: 异常触发
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/1/18 15:16
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道断开:"+ctx.channel().id());
        super.channelInactive(ctx);
    }
}

com.hong.spring.netty.NettyWebSocketHandler

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author: csh
 * @Date: 2021/1/19 11:08
 * @Description:websocket监听
 */
@Component
@ChannelHandler.Sharable
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //1.获取Channel通道
        final Channel channel=ctx.channel();

        //2.创建一个定时线程池
        ScheduledExecutorService ses= Executors.newScheduledThreadPool(1);
        //3.一秒钟之后只需,并且每隔5秒往浏览器发送数据
        ses.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                String sendTime=format.format(new Date());
                channel.writeAndFlush(new TextWebSocketFrame("推送时间=" + sendTime));
            }
        },1,5, TimeUnit.SECONDS);
    }

    //接受浏览器消息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("收到消息 " + msg.text());
    }

    //当web客户端连接后,触发方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用户连接:"+ctx.channel().id());
    }

    //当web客户端断开后,触发方法
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用户断开:"+ctx.channel().id());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println(ctx.channel().id()+"促发事件");
        super.userEventTriggered(ctx, evt);
    }
}

com.hong.spring.netty.WebsocketChannelInitializer

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private ChannelHandler nettyWebSocketHandler;

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
// pipeline.addLast(new HttpServerCodec());
// pipeline.addLast(new WebSocketServerCompressionHandler());
// pipeline.addLast(new WebSocketServerProtocolHandler("/hong",true));
// pipeline.addLast(new HttpObjectAggregator(65536));


        //基于http协议,使用http的编码和解码器
        pipeline.addLast(new HttpServerCodec());
        //是以块方式写,添加ChunkedWriteHandler处理器
        pipeline.addLast(new ChunkedWriteHandler());
        //http在传输过程中分段
        pipeline.addLast(new HttpObjectAggregator(8192));
        //心跳
        //pipeline.addLast(new IdleStateHandler(20L, 0L, 0L, TimeUnit.SECONDS));
        //将http升级为websocket
        pipeline.addLast(new WebSocketServerProtocolHandler("/hong"));
        //pipeline.addLast(new NettyServerHandler());
        pipeline.addLast(new NettyWebSocketHandler());
        
        pipeline.addLast(nettyWebSocketHandler);
    }
}

com.hong.spring.netty.WebsocketServer

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * @author: csh
 * @Date: 2021/1/19 11:45
 * @Description:WebSocket服务器使用独立的线程启动
 */
@Log4j2
public class WebsocketServer {
    @Autowired
    private ServerBootstrap serverBootstrap;

    private ChannelFuture serverChannelFuture;

    @Autowired
    private EventLoopGroup bossGroup;

    @Autowired
    private EventLoopGroup workerGroup;

    private ChannelHandler websocketChannelInitializer;


    private int port;

    public ChannelHandler getWebsocketChannelInitializer() {
        return websocketChannelInitializer;
    }

    public void setWebsocketChannelInitializer(ChannelHandler websocketChannelInitializer) {
        this.websocketChannelInitializer = websocketChannelInitializer;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /**
     *
     * 功能描述: websocket服务器
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/1/19 11:45
     */
    public void start() {
        try {
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(websocketChannelInitializer);
            serverChannelFuture = serverBootstrap.bind(port).sync();

            log.info("WEBSOCKET服务启动完成,已绑定端口:" + port);
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            log.error("WEBSOCKET服务启动异常!", e);
        }
    }
    /**
     *
     * 功能描述: 关闭Netty Websocket服务器,主要是释放连接
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/1/19 11:48
     */
    public void close() {
        serverChannelFuture.channel().close();
        Future<?> bossGroupFuture = bossGroup.shutdownGracefully();
        Future<?> workerGroupFuture = workerGroup.shutdownGracefully();

        try {
            bossGroupFuture.await();
            workerGroupFuture.await();
            log.info("websocket服务停止中...");
        } catch (InterruptedException ignore) {
            ignore.printStackTrace();
        }
    }
}

com.hong.spring.provider.UserServiceImpl

代码语言:javascript
复制
package com.hong.spring.provider;

import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }
    @Override
    @Transactional
    public DataResponse <Boolean> save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save>0?true:false);
    }

    @Override
    public DataResponse <Boolean> insertBatch(List <User> list) {
        if(null==list){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
    }

    @Override
    @Transactional
    public DataResponse <Boolean> update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update>0?true:false);
    }
    @Override
    public DataResponse <User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse <List <User>> findByPage(UserAO ao) {
        if(ao==null){
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List <User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}

tomcat配置

启动

代码语言:javascript
复制
09:55:15.796 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
09:55:15.798 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
09:55:15.799 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
09:55:15.799 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
09:55:15.800 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
09:55:15.801 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
09:55:15.802 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
09:55:15.802 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
09:55:15.803 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
09:55:15.806 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
09:55:15.807 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
09:55:15.843 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
09:55:15.844 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
09:55:15.844 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
09:55:15.876 [RMI TCP Connection(3)-127.0.0.1] INFO com.hong.spring.netty.WebsocketServer - WEBSOCKET服务启动完成,已绑定端口:7000
一月 20, 2021 9:55:16 上午 org.springframework.web.servlet.DispatcherServlet initServletBean
信息: FrameworkServlet 'netty_server': initialization completed in 7110 ms
[2021-01-20 09:55:16,085] Artifact spring_netty_server:war exploded: Artifact is deployed successfully
[2021-01-20 09:55:16,086] Artifact spring_netty_server:war exploded: Deploy took 13,498 milliseconds

结果

通过:http://coolaf.com/tool/chattest 测试

客户端

有了服务端当然需要通过客户端来连接实现,当然也是可以直接用websocket js来连接,也是可以通过后端来连接发送的。这里的话就通过然后请求接口提交信息,然后将信息提交给服务端,或者调用服务端,将信息返回给用户端就OK了。

pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_rpc</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.spring</groupId>
    <artifactId>spring_netty_client</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.hong</groupId>
            <artifactId>spring_dubbo_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <exclusions>
                <exclusion>
                    <groupId>org.mybatis</groupId>
                    <artifactId>mybatis-spring</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>
    </dependencies>

    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>

netty.properties

代码语言:javascript
复制
#web socket端口号
websocket.server.port=7000
websocket.server.host=127.0.0.1
# ws的url
websocket.server.url=ws://127.0.0.1:7000/hong
# 用于处理客户端连接请求
client.server.bossGroup.threads=1

logging.properties

代码语言:javascript
复制
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

log4j2.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

applicationContext.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


    <!-- 配置组件扫描 -->
    <context:component-scan base-package="com.hong.spring"></context:component-scan>
    <!--加载配置文件-->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
        <property name="locations">
            <list>
                <value>classpath:netty.properties</value>
            </list>
        </property>
    </bean>

    <!-- 把Netty的一些类服务器注册到Spring,方便处理和扩展 -->
    <!-- 用于处理服务端连接请求 -->
    <bean id="eventExecutors" class="io.netty.channel.nio.NioEventLoopGroup">
        <constructor-arg value="${client.server.bossGroup.threads}" />
    </bean>


    <!-- 服务器启动引导类 -->
    <bean id="clientBootstrap" class="io.netty.bootstrap.Bootstrap" scope="prototype"/>

    <!-- 开启注解 -->
    <context:annotation-config />

    <mvc:default-servlet-handler />


    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
          id="internalResourceViewResolver">
        <!-- 前缀 -->
        <property name="prefix" value="/WEB-INF/pages/" />
        <!-- 后缀 -->
        <property name="suffix" value=".html" />
        <property name="contentType" value="text/html"/>
    </bean>

    <!--开启mvc注解事务-->
    <!-- 定义注解驱动 -->
    <mvc:annotation-driven>
        <mvc:message-converters>
            <!-- 设置支持中文 -->
            <bean class="org.springframework.http.converter.StringHttpMessageConverter">
                <property name="supportedMediaTypes">
                    <list>
                        <value>text/plain;charset=UTF-8</value>
                        <value>text/html;charset=UTF-8</value>
                    </list>
                </property>
            </bean>
            <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
        </mvc:message-converters>
    </mvc:annotation-driven>

    <!-- 自定义的Netty Websocket服务器 -->
    <bean id="webSocketClient" class="com.hong.spring.netty.websocketClient"  lazy-init="true" init-method="run" destroy-method="close">
        <property name="port" value="${websocket.server.port}"/>
        <property name="host" value="${websocket.server.host}"/>
        <property name="uri" value="${websocket.server.url}" />
    </bean>
</beans>

application.properties

代码语言:javascript
复制
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

com.hong.spring.netty.WebSocketClientHandler

代码语言:javascript
复制
package com.hong.spring.netty;
 
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
 
/**
 *
 * 功能描述: 监听器
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/21 11:54
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * 负责和服务器握手
     */
    private final WebSocketClientHandshaker handshaker;
    /**
     * 握手结果
     */
    private ChannelPromise handshakeFuture;
 
    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }
 
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }
 
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        handshaker.handshake(ctx.channel());
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接断开");
    }
 
    /**
     * 当前handler被添加到pipeline时,new出握手的结果示例,以备将来使用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        handshakeFuture = ctx.newPromise();
    }
 
    /**
     * 读取数据
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        //握手未完成,完成握手
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                System.out.println("完成连接");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                System.out.println("握手连接失败");
                handshakeFuture.setFailure(e);
            }
            return;
        }
        //握手完成,升级为websocket,不应该再收到http报文
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }
 
        //处理websocket报文
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
            System.out.println("收到消息:" + textWebSocketFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("客户端收到pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("客户端手动关闭");
            ch.close();
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}

com.hong.spring.netty.websocketClient

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;

import javax.net.ssl.SSLException;
import java.net.URI;

/**
 * @author: csh
 * @Date: 2021/1/20 10:38
 * @Description:客户端
 */
@Log4j2
public class websocketClient {

    @Autowired
    private Bootstrap clientBootstrap;

    @Autowired
    private NioEventLoopGroup eventExecutors;
    /**ws url */
    private URI uri;
    /**sslCtx */
    private SslContext sslCtx;
    /**handler */
    private WebSocketClientHandler handler;

    @Autowired
    private WebsocketChannelInitializer websocketChannelInitializer;

    private Channel channel;

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /**主机地址 */
    private String host;
    /**端口号 */
    private int port;


    public void close() {
        Future<?> bossGroupFuture = eventExecutors.shutdownGracefully();
        try {
            bossGroupFuture.await();
            System.out.println("websocket客户端服务停止中...");
        } catch (InterruptedException ignore) {
            ignore.printStackTrace();
        }
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public URI getUri() {
        return uri;
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public SslContext getSslCtx() {
        return sslCtx;
    }

    public void setSslCtx(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    public WebSocketClientHandler getHandler() {
        return handler;
    }

    public void setHandler(WebSocketClientHandler handler) {
        this.handler = handler;
    }

    public void run() {
        new Thread(()->{
            try {
                String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
                final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
                final int port = uri.getPort();
                if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                    System.out.println("只支持ws");
                    return;
                }

                final boolean ssl = "wss".equalsIgnoreCase(scheme);
                if (ssl) {
                    sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
                } else {
                    sslCtx = null;
                }
                handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
                clientBootstrap.group(eventExecutors)
                        .channel(NioSocketChannel.class)
                        .handler(websocketChannelInitializer);
                System.out.println("client ready");
                Channel channel = clientBootstrap.connect(host, port).sync().channel();
                this.channel = channel;
                //等待握手完成
                handler.handshakeFuture().sync();
                log.info("握手完成!");
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (SSLException e) {
                e.printStackTrace();
            } finally {
                System.out.println("触发关闭连接池!");
                eventExecutors.shutdownGracefully();
            }
        }).start();
    }
}

com.hong.spring.netty.WebsocketChannelInitializer

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 *
 * 功能描述: 通道初始化值
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/21 11:54
 */
@Component
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Autowired
    private websocketClient websocketClient;

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (websocketClient != null && websocketClient.getSslCtx()!=null) {
            p.addLast(websocketClient.getSslCtx().newHandler(ch.alloc(), websocketClient.getHost(), websocketClient.getPort()));
        }
        //http协议握手
        p.addLast(new HttpClientCodec());
        p.addLast(new HttpObjectAggregator(8192));
        //支持websocket数据压缩
        p.addLast(WebSocketClientCompressionHandler.INSTANCE);
        p.addLast(websocketClient.getHandler());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("传进来的消息:"+(TextWebSocketFrame)msg);
        super.channelRead(ctx, msg);
    }
}

com.hong.spring.controller.UserController

代码语言:javascript
复制
package com.hong.spring.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.netty.websocketClient;
import com.hong.spring.utils.DataResponse;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.CharsetUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@RequestMapping("/")
@Log4j2
public class UserController {

    @Autowired
    private websocketClient websocketClient;

    @RequestMapping(value = "",method = RequestMethod.GET)
    @ResponseBody
    public DataResponse<Boolean> sendMsg(String msg){
        try {
            log.info("请求进来了{}",msg);
            Channel channel = websocketClient.getChannel();
            log.info("获取的channel"+ JSONObject.toJSONString(channel));
            WebSocketFrame frame = new TextWebSocketFrame(msg);
            channel.writeAndFlush(frame);
            return DataResponse.BuildSuccessResponse();
        }catch (Exception e){
            log.error("发送出错!{}",e);
        }
        return DataResponse.BuildFailResponse("发送出错!");
    }
}

相关配置

请求

服务端

改造成rpc方式

服务端

com.hong.spring.netty.NettyWebSocketHandler 修改如下

代码语言:javascript
复制
//接受浏览器消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    System.out.println("收到消息 " + msg.text());
    if(msg.text().startsWith("com.hong.spring.service.IUserService.findById")){
        String[] list = msg.text().split("#");
        if(null==list || list.length<=0){
            ctx.writeAndFlush(new TextWebSocketFrame("没有id"));
        }
        User user = userMapper.findById(Integer.parseInt(list[1]));
        ctx.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(user)));
    }else{
        ctx.writeAndFlush(new TextWebSocketFrame("没有收到值啊!"));
    }
}

客户端

com.hong.spring.netty.websocketClient 修改如下

代码语言:javascript
复制
package com.hong.spring.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;

import javax.net.ssl.SSLException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author: csh
 * @Date: 2021/1/20 10:38
 * @Description:客户端
 */
@Log4j2
public class websocketClient {

    @Autowired
    private Bootstrap clientBootstrap;

    @Autowired
    private NioEventLoopGroup eventExecutors;
    /**ws url */
    private URI uri;
    /**sslCtx */
    private SslContext sslCtx;
    /**handler */
    private WebSocketClientHandler handler;

    @Autowired
    private WebsocketChannelInitializer websocketChannelInitializer;

    private Channel channel;

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());


    /**主机地址 */
    private String host;
    /**端口号 */
    private int port;



    public void close() {
        Future<?> bossGroupFuture = eventExecutors.shutdownGracefully();
        try {
            bossGroupFuture.await();
            System.out.println("websocket客户端服务停止中...");
        } catch (InterruptedException ignore) {
            ignore.printStackTrace();
        }
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public URI getUri() {
        return uri;
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public SslContext getSslCtx() {
        return sslCtx;
    }

    public void setSslCtx(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    public WebSocketClientHandler getHandler() {
        return handler;
    }

    public void setHandler(WebSocketClientHandler handler) {
        this.handler = handler;
    }

    public void run() {
        new Thread(()->{
            try {
                String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
                final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
                final int port = uri.getPort();
                if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                    System.out.println("只支持ws");
                    return;
                }

                final boolean ssl = "wss".equalsIgnoreCase(scheme);
                if (ssl) {
                    sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
                } else {
                    sslCtx = null;
                }
                handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
                clientBootstrap.group(eventExecutors)
                        .channel(NioSocketChannel.class)
                        .handler(websocketChannelInitializer);
                System.out.println("client ready");
                Channel channel = clientBootstrap.connect(host, port).sync().channel();
                this.channel = channel;
                //等待握手完成
                handler.handshakeFuture().sync();
                log.info("握手完成!");
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (SSLException e) {
                e.printStackTrace();
            } finally {
                System.out.println("触发关闭连接池!");
                eventExecutors.shutdownGracefully();
            }
        }).start();
    }
}

com.hong.spring.netty.WebSocketClientHandler

代码语言:javascript
复制
package com.hong.spring.netty;
 
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import java.util.concurrent.Callable;

/**
 *
 * 功能描述: 监听器
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/21 11:54
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> implements Callable {
    /**
     * 负责和服务器握手
     */
    private final WebSocketClientHandshaker handshaker;
    /**上下文 */
    private ChannelHandlerContext context;
    /**参数 */
    private String param;

    /**返回结果 */
    private String result;
    /**
     * 握手结果
     */
    private ChannelPromise handshakeFuture;
 
    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }
 
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    public String getParam() {
        return param;
    }

    public void setParam(String param) {
        this.param = param;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
        handshaker.handshake(ctx.channel());
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接断开");
    }
 
    /**
     * 当前handler被添加到pipeline时,new出握手的结果示例,以备将来使用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        handshakeFuture = ctx.newPromise();
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead method invoke");
        result = msg.toString();
        System.out.println("server message:"+result);
        notify();
        super.channelRead(ctx, msg);
    }

    /**
     * 读取数据
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        //握手未完成,完成握手
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                System.out.println("完成连接");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                System.out.println("握手连接失败");
                handshakeFuture.setFailure(e);
            }
            return;
        }
        //握手完成,升级为websocket,不应该再收到http报文
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }
 
        //处理websocket报文
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
            System.out.println("客户端收到消息:" + textWebSocketFrame.text());
            result = textWebSocketFrame.text();
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("客户端收到pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("客户端手动关闭");
            ch.close();
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }

    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call method invoke");
        context.writeAndFlush(new TextWebSocketFrame(param)); //发送消息给服务端
        System.out.println("send message :"+ param);
        wait(); //等待 channelRead 获取到数据
        return result;
    }
}

com.hong.spring.controller.UserController

代码语言:javascript
复制
@RequestMapping(value = "/user",method = RequestMethod.GET)
@ResponseBody
public DataResponse<Boolean> user(Integer id){
    try {
        WebSocketClientHandler handler = websocketClient.getHandler();
        String method = "com.hong.spring.service.IUserService.findById"+"#"+id;
        handler.setParam(method);
        handler.call();
        log.info("获取的channel"+ JSONObject.toJSONString(handler.getResult()));
        return DataResponse.BuildSuccessResponse(handler.getResult());
    }catch (Exception e){
        log.error("发送出错!{}",e);
    }
    return DataResponse.BuildFailResponse("发送出错!");
}

运行结果

代码语言:javascript
复制
send message :com.hong.spring.service.IUserService.findById#1
15:42:13.285 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=47
15:42:13.295 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder - Decoding WebSocket Frame opCode=1
15:42:13.295 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder - Decoding WebSocket Frame length=35
channelRead method invoke
server message:TextWebSocketFrame(data: PooledUnsafeDirectByteBuf(ridx: 0, widx: 35, cap: 35))
客户端收到消息:{"age":100,"id":1,"username":"333"}
15:42:13.297 [http-nio-8887-exec-1] INFO com.hong.spring.controller.UserController - 获取的channel"{\"age\":100,\"id\":1,\"username\":\"333\"}"

参考:

https://blog.csdn.net/BorisCao/article/details/105895027

https://github.com/datougege/rpc/tree/master/src

总结:netty基于spring mvc 的确相关学习资料很少,并且很多都很不靠谱,所以在这个上面花了不少时间.....,当然工作中netty客户端还是前端为主需要做到心跳机制。

基于springboot的实现

版本信息:

springboot 2.x

jdk 1.8

服务端

pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_netty_service</artifactId>
    <version>0.0.1-SNAPSHOT</version>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_dubbo_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>


    </dependencies>

    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

application.properties

代码语言:javascript
复制
#web socket端口号
websocket.server.port=7000

websocket.server.communication.port=8081
communication.netty.nodes=
# 用于处理客户端连接请求
client.server.bossGroup.threads=1
#用于处理客户端I/O操作
client.server.workerGroup.threads=0
netty.server.bossGroup.threads=1
netty.server.workerGroup.threads=0
netty.client.eventLoopGroup.threads=1



#避免端口冲突
server.port=8085
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456

#mybatis配置
mybatis.typeAliasesPackage=com.hong.springboot.entity

com.hong.springboot.config.DruidConfig

代码语言:javascript
复制
package com.hong.springboot.config;

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * @author: csh
 * @Date: 2021/1/8 18:08
 * @Description:数据源配置
 */
@Configuration
public class DruidConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(){
        return new DruidDataSource();
    }
}

com.hong.springboot.dao.UserMapper

代码语言:javascript
复制
package com.hong.springboot.dao;

import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {
    @Select("select id,user_name,age from user")
    List<User> findAllUser();
}

com.hong.springboot.netty.NettyWebSocketHandler

代码语言:javascript
复制
package com.hong.springboot.netty;

import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.List;

/**
 * @author: csh
 * @Date: 2021/1/19 11:08
 * @Description:websocket监听
 */
@Component
@ChannelHandler.Sharable
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private UserMapper userMapper;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //1.获取Channel通道
        final Channel channel=ctx.channel();
        channel.writeAndFlush(new TextWebSocketFrame("收到啦"));
        channel.writeAndFlush("收到啦");
    }

    //接受浏览器消息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("收到消息 " + msg.text());
        if(msg.text().startsWith("com.hong.spring.service.IUserService.findById")){
            String[] list = msg.text().split("#");
            if(null==list || list.length<=0){
                ctx.writeAndFlush(new TextWebSocketFrame("没有id"));
            }
            List <User> allUser = userMapper.findAllUser();
            ctx.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(allUser)));
        }else{
            ctx.writeAndFlush(new TextWebSocketFrame("没有收到值啊!"));
        }
    }

    //当web客户端连接后,触发方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用户连接:"+ctx.channel().id());
    }

    //当web客户端断开后,触发方法
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用户断开:"+ctx.channel().id());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println(ctx.channel().id()+"促发事件");
        super.userEventTriggered(ctx, evt);
    }
}

com.hong.springboot.netty.WebsocketChannelInitializer

代码语言:javascript
复制
package com.hong.springboot.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;


public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {


    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();

        //基于http协议,使用http的编码和解码器
        pipeline.addLast(new HttpServerCodec());
        //是以块方式写,添加ChunkedWriteHandler处理器
        pipeline.addLast(new ChunkedWriteHandler());
        //http在传输过程中分段
        pipeline.addLast(new HttpObjectAggregator(8192));
        //心跳
        //pipeline.addLast(new IdleStateHandler(20L, 0L, 0L, TimeUnit.SECONDS));
        //将http升级为websocket
        pipeline.addLast(new WebSocketServerProtocolHandler("/hong"));
        //pipeline.addLast(new NettyServerHandler());
        pipeline.addLast(new NettyWebSocketHandler());
    }
}

com.hong.springboot.netty.WebsocketServer

代码语言:javascript
复制
package com.hong.springboot.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author: csh
 * @Date: 2021/1/19 11:45
 * @Description:WebSocket服务器使用独立的线程启动
 */
@Log4j2
@Component
public class WebsocketServer {


    private ChannelFuture serverChannelFuture;

    private  EventLoopGroup bossGroup;

    private  EventLoopGroup workerGroup;
    @Value("${websocket.server.port}")
    private int port;


    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /**
     *
     * 功能描述: websocket服务器
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/1/19 11:45
     */
    @PostConstruct
    public void start() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        try {
            System.out.println("websocket启动了!");
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WebsocketChannelInitializer());
            serverChannelFuture = serverBootstrap.bind(port).sync();
            log.info("springboot websockt服务启动完成,已绑定端口: " + port);
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            log.error("springboot websocket服务启动异常!", e);
        }
    }
    /**
     *
     * 功能描述: 关闭Netty Websocket服务器,主要是释放连接
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/1/19 11:48
     */
    @Deprecated
    public void close() {
        serverChannelFuture.channel().close();
        Future<?> bossGroupFuture = bossGroup.shutdownGracefully();
        Future<?> workerGroupFuture = workerGroup.shutdownGracefully();

        try {
            bossGroupFuture.await();
            workerGroupFuture.await();
            log.info("springboot websocket服务停止中...");
        } catch (InterruptedException ignore) {
            ignore.printStackTrace();
        }
    }
}

com.hong.springboot.provider.UserServiceImpl

代码语言:javascript
复制
package com.hong.springboot.provider;


import com.hong.springboot.api.IUserService;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUser();
        return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
    }

}

com.hong.springboot.Application

代码语言:javascript
复制
package com.hong.springboot;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

结果

代码语言:javascript
复制
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释 
* "C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" -Dvisualvm.id=57004932705900 "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\lib\idea_rt.jar=51479:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\ideaWorkSpace\spring\springboot_all\springboot_netty_service\target\classes;D:\ideaWorkSpace\spring\springboot_all\springboot_dubbo_api\target\classes;D:\mvn\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.3\mybatis-spring-boot-starter-2.1.3.jar;D:\mvn\org\springframework\boot\spring-boot-starter\2.4.0\spring-boot-starter-2.4.0.jar;D:\mvn\org\springframework\boot\spring-boot\2.4.0\spring-boot-2.4.0.jar;D:\mvn\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\mvn\org\yaml\snakeyaml\1.27\snakeyaml-1.27.jar;D:\mvn\org\springframework\boot\spring-boot-starter-jdbc\2.4.0\spring-boot-starter-jdbc-2.4.0.jar;D:\mvn\com\zaxxer\HikariCP\3.4.5\HikariCP-3.4.5.jar;D:\mvn\org\springframework\spring-jdbc\5.3.1\spring-jdbc-5.3.1.jar;D:\mvn\org\springframework\spring-tx\5.3.1\spring-tx-5.3.1.jar;D:\mvn\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.3\mybatis-spring-boot-autoconfigure-2.1.3.jar;D:\mvn\org\mybatis\mybatis\3.5.5\mybatis-3.5.5.jar;D:\mvn\org\mybatis\mybatis-spring\2.0.5\mybatis-spring-2.0.5.jar;D:\mvn\com\alibaba\druid-spring-boot-starter\1.1.10\druid-spring-boot-starter-1.1.10.jar;D:\mvn\com\alibaba\druid\1.1.10\druid-1.1.10.jar;D:\mvn\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;D:\mvn\org\springframework\boot\spring-boot-autoconfigure\2.4.0\spring-boot-autoconfigure-2.4.0.jar;D:\mvn\org\springframework\boot\spring-boot-starter-web\2.4.0\spring-boot-starter-web-2.4.0.jar;D:\mvn\org\springframework\boot\spring-boot-starter-json\2.4.0\spring-boot-starter-json-2.4.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-databind\2.11.3\jackson-databind-2.11.3.jar;D:\mvn\com\fasterxml\jackson\core\jackson-annotations\2.11.3\jackson-annotations-2.11.3.jar;D:\mvn\com\fasterxml\jackson\core\jackson-core\2.11.3\jackson-core-2.11.3.jar;D:\mvn\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.11.3\jackson-datatype-jdk8-2.11.3.jar;D:\mvn\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.11.3\jackson-datatype-jsr310-2.11.3.jar;D:\mvn\com\fasterxml\jackson\module\jackson-module-parameter-names\2.11.3\jackson-module-parameter-names-2.11.3.jar;D:\mvn\org\springframework\boot\spring-boot-starter-tomcat\2.4.0\spring-boot-starter-tomcat-2.4.0.jar;D:\mvn\org\apache\tomcat\embed\tomcat-embed-core\9.0.39\tomcat-embed-core-9.0.39.jar;D:\mvn\org\glassfish\jakarta.el\3.0.3\jakarta.el-3.0.3.jar;D:\mvn\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.39\tomcat-embed-websocket-9.0.39.jar;D:\mvn\org\springframework\spring-web\5.3.1\spring-web-5.3.1.jar;D:\mvn\org\springframework\spring-beans\5.3.1\spring-beans-5.3.1.jar;D:\mvn\org\springframework\spring-webmvc\5.3.1\spring-webmvc-5.3.1.jar;D:\mvn\org\springframework\spring-aop\5.3.1\spring-aop-5.3.1.jar;D:\mvn\org\springframework\spring-context\5.3.1\spring-context-5.3.1.jar;D:\mvn\org\springframework\spring-expression\5.3.1\spring-expression-5.3.1.jar;D:\mvn\org\springframework\spring-core\5.3.1\spring-core-5.3.1.jar;D:\mvn\org\springframework\spring-jcl\5.3.1\spring-jcl-5.3.1.jar;D:\mvn\io\netty\netty-all\4.1.17.Final\netty-all-4.1.17.Final.jar;D:\mvn\org\springframework\boot\spring-boot-starter-logging\2.4.0\spring-boot-starter-logging-2.4.0.jar;D:\mvn\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;D:\mvn\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;D:\mvn\org\apache\logging\log4j\log4j-to-slf4j\2.13.3\log4j-to-slf4j-2.13.3.jar;D:\mvn\org\apache\logging\log4j\log4j-api\2.13.3\log4j-api-2.13.3.jar;D:\mvn\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;D:\mvn\com\alibaba\fastjson\1.2.62\fastjson-1.2.62.jar;D:\mvn\mysql\mysql-connector-java\8.0.22\mysql-connector-java-8.0.22.jar;D:\mvn\org\projectlombok\lombok\1.16.16\lombok-1.16.16.jar" com.hong.springboot.Application
*/

  . ____ _ __ _ _
 /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/ ___)| |_)| | | | | || (_| | ) ) ) )
  ' |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot :: (v2.4.0)

2021-01-22 09:38:31.732 INFO 5520 --- [ main] com.hong.springboot.Application : Starting Application using Java 1.8.0_181 on DESKTOP-1VMHJGQ with PID 5520 (D:\ideaWorkSpace\spring\springboot_all\springboot_netty_service\target\classes started by admin in D:\ideaWorkSpace\spring)
2021-01-22 09:38:31.734 INFO 5520 --- [ main] com.hong.springboot.Application : No active profile set, falling back to default profiles: default
2021-01-22 09:38:33.475 INFO 5520 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8085 (http)
2021-01-22 09:38:33.487 INFO 5520 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-01-22 09:38:33.487 INFO 5520 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.39]
2021-01-22 09:38:33.634 INFO 5520 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-01-22 09:38:33.634 INFO 5520 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1851 ms
websocket启动了!
2021-01-22 09:38:34.659 INFO 5520 --- [ main] c.hong.springboot.netty.WebsocketServer : springboot websockt服务启动完成,已绑定端口: 7000
2021-01-22 09:38:34.814 INFO 5520 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-01-22 09:38:35.052 INFO 5520 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8085 (http) with context path ''
2021-01-22 09:38:35.064 INFO 5520 --- [ main] com.hong.springboot.Application : Started Application in 3.746 seconds (JVM running for 4.31)
用户连接:ace6a544
ace6a544促发事件
ace6a544促发事件
收到消息 2222222222
收到消息 333333333
收到消息 33333

客户端

pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_netty_client</artifactId>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>
        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_dubbo_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <!--静态资源导出问题-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

application.properties

代码语言:javascript
复制
#web socket端口号
websocket.server.port=7000
websocket.server.host=127.0.0.1
# ws的url
websocket.server.url=ws://127.0.0.1:7000/hong
# 用于处理客户端连接请求
client.server.bossGroup.threads=1

#避免端口冲突
server.port=8086

com.hong.springboot.controller.UserController

代码语言:javascript
复制
package com.hong.springboot.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.netty.WebSocketClientHandler;
import com.hong.springboot.netty.WebsocketClient;
import com.hong.springboot.utils.DataResponse;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@RequestMapping("/")
@Log4j2
public class UserController {

    @Autowired
    private WebsocketClient websocketClient;


    @RequestMapping(value = "",method = RequestMethod.GET)
    @ResponseBody
    public DataResponse<Boolean> sendMsg(String msg){
        try {
            log.info("请求进来了{}",msg);
            Channel channel = websocketClient.getChannel();
            log.info("获取的channel"+ JSONObject.toJSONString(channel));
            WebSocketFrame frame = new TextWebSocketFrame(msg);
            channel.writeAndFlush(frame);
            return DataResponse.BuildSuccessResponse();
        }catch (Exception e){
            log.error("发送出错!{}",e);
        }
        return DataResponse.BuildFailResponse("发送出错!");
    }

    @RequestMapping(value = "/user",method = RequestMethod.GET)
    @ResponseBody
    public DataResponse<Boolean> user(Integer id){
        try {
            WebSocketClientHandler handler = websocketClient.getHandler();
            String method = "com.hong.spring.service.IUserService.findById"+"#"+id;
            handler.setParam(method);
            handler.call();
            log.info("获取的channel"+ JSONObject.toJSONString(handler.getResult()));
            return DataResponse.BuildSuccessResponse(handler.getResult());
        }catch (Exception e){
            log.error("发送出错!{}",e);
        }
        return DataResponse.BuildFailResponse("发送出错!");
    }
}

com.hong.springboot.netty.MyWebsocketChannelInitializer

代码语言:javascript
复制
package com.hong.springboot.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 *
 * 功能描述: 通道初始化值
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/21 11:54
 */
@Component("myWebsocketChannelInitializer")
public class MyWebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    private WebsocketClient websocketClient;



    public MyWebsocketChannelInitializer(WebsocketClient websocketClient) {
        this.websocketClient = websocketClient;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (websocketClient != null && websocketClient.getSslCtx()!=null) {
            p.addLast(websocketClient.getSslCtx().newHandler(ch.alloc(), websocketClient.getHost(), websocketClient.getPort()));
        }
        //http协议握手
        p.addLast(new HttpClientCodec());
        p.addLast(new HttpObjectAggregator(8192));
        //支持websocket数据压缩
        p.addLast(WebSocketClientCompressionHandler.INSTANCE);
        p.addLast(websocketClient.getHandler());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("传进来的消息:"+(TextWebSocketFrame)msg);
        super.channelRead(ctx, msg);
    }
}

com.hong.springboot.netty.WebsocketClient

代码语言:javascript
复制
package com.hong.springboot.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.net.ssl.SSLException;
import java.net.URI;

/**
 * @author: csh
 * @Date: 2021/1/20 10:38
 * @Description:客户端
 */
@Log4j2
@Component("websocketClient")
public class WebsocketClient {

    private Bootstrap clientBootstrap;

    private NioEventLoopGroup eventExecutors;
    /**ws url */
    @Value("${websocket.server.url}")
    private URI uri;
    /**sslCtx */
    private SslContext sslCtx;
    /**handler */
    private WebSocketClientHandler handler;

    private Channel channel;

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /**主机地址 */
    @Value("${websocket.server.host}")
    private String host;
    /**端口号 */
    @Value("${websocket.server.port}")
    private int port;


    @Deprecated
    public void close() {
        Future<?> bossGroupFuture = eventExecutors.shutdownGracefully();
        try {
            bossGroupFuture.await();
            System.out.println("websocket客户端服务停止中...");
        } catch (InterruptedException ignore) {
            ignore.printStackTrace();
        }
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public URI getUri() {
        return uri;
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public SslContext getSslCtx() {
        return sslCtx;
    }

    public void setSslCtx(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    public WebSocketClientHandler getHandler() {
        return handler;
    }

    public void setHandler(WebSocketClientHandler handler) {
        this.handler = handler;
    }

    @PostConstruct
    public void run() {
        new Thread(()->{
            try {
                clientBootstrap = new Bootstrap();
                eventExecutors = new NioEventLoopGroup();
                String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
                final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
                final int port = uri.getPort();
                if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                    System.out.println("只支持ws");
                    return;
                }

                final boolean ssl = "wss".equalsIgnoreCase(scheme);
                if (ssl) {
                    sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
                } else {
                    sslCtx = null;
                }
                handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
                clientBootstrap.group(eventExecutors)
                        .channel(NioSocketChannel.class)
                        .handler(new MyWebsocketChannelInitializer(this));
                System.out.println("client ready");
                Channel channel = clientBootstrap.connect(host, port).sync().channel();
                this.channel = channel;
                //等待握手完成
                handler.handshakeFuture().sync();
                log.info("握手完成!");
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (SSLException e) {
                e.printStackTrace();
            } finally {
                System.out.println("触发关闭连接池!");
                eventExecutors.shutdownGracefully();
            }
        }).start();
    }
}

com.hong.springboot.netty.WebSocketClientHandler

代码语言:javascript
复制
package com.hong.springboot.netty;
 
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import java.util.concurrent.Callable;

/**
 *
 * 功能描述: 监听器
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/1/21 11:54
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> implements Callable {
    /**
     * 负责和服务器握手
     */
    private final WebSocketClientHandshaker handshaker;
    /**上下文 */
    private ChannelHandlerContext context;
    /**参数 */
    private String param;

    /**返回结果 */
    private String result;
    /**
     * 握手结果
     */
    private ChannelPromise handshakeFuture;
 
    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }
 
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    public String getParam() {
        return param;
    }

    public void setParam(String param) {
        this.param = param;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
        handshaker.handshake(ctx.channel());
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接断开");
    }
 
    /**
     * 当前handler被添加到pipeline时,new出握手的结果示例,以备将来使用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        handshakeFuture = ctx.newPromise();
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead method invoke");
        result = msg.toString();
        System.out.println("server message:"+result);
        notify();
        super.channelRead(ctx, msg);
    }

    /**
     * 读取数据
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        //握手未完成,完成握手
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                System.out.println("完成连接");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                System.out.println("握手连接失败");
                handshakeFuture.setFailure(e);
            }
            return;
        }
        //握手完成,升级为websocket,不应该再收到http报文
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }
 
        //处理websocket报文
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
            System.out.println("客户端收到消息:" + textWebSocketFrame.text());
            result = textWebSocketFrame.text();
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("客户端收到pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("客户端手动关闭");
            ch.close();
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }

    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call method invoke");
        context.writeAndFlush(new TextWebSocketFrame(param)); //发送消息给服务端
        System.out.println("send message :"+ param);
        wait(); //等待 channelRead 获取到数据
        return result;
    }
}

com.hong.springboot.Application

代码语言:javascript
复制
package com.hong.springboot;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:springboot dubbo消费端
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

请求:http://localhost:8086/?msg=1

客服端收到如下:

服务端收到如下:

尝试RPC调用

参考

https://www.cnblogs.com/cxyxiaobao/p/11946015.html

发现springboot真香~~,相对于mvc来说真的是极大简化和快速....

最后

netty本身就不是一个rpc框架,只是一个集成通讯框架,并且各大rpc其实大部分底层都是基于netty来实现的,所以学习netty是一个相当重要的事情,让我们更清晰明白了解dubbo或其他基于netty实现的rpc框架,是如何实现的,但是本文相对来说比较浅,推荐阅读:《netty权威指南》《netty实战》《Netty、Redis、Zookeeper高并发实战》,当然如何想彻底深入了解建议从io、bio、nio、aio这样一层一层了解上来。毕竟东西有点多,而且有意向高并发高可用方向发展的同学,还需要了解线程相关的知识,netty只是在这些基础上实现而以..

参考文章:

https://tool.oschina.net/apidocs/apidoc?api=netty

https://netty.io/

本文单编辑就花了1.5个小时,写了1个来星期

,如果觉得不错请关注以下二维码~)

本人工作之余,长期在线答疑解惑(仅针对新手,高手请略过...)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-01-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 技术趋势 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 代码下载:https://gitee.com/hong99/spring/issues/I1N1DF
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档