注:本文篇幅非常长,有将近10万字,所以建议各位下载源码学习。(如需要请收藏!转载请声明来源,谢谢!)
本文承接上文:spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)
netty相关介绍
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。---百度百科
介绍地址:https://baike.baidu.com/item/Netty/10061624?fr=aladdin
官网:https://netty.io/
netty基本使用
基于java简单实现
版本信息
jdk 1.8
spring 4.x
netty 4.x
com.hong.spring.netty.NettyClientHandler
package com.hong.spring.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * Client-side inbound handler: sends a greeting to the server as soon as the
 * connection becomes active and prints every message received from the server.
 *
 * @author csh
 * @date 2021/1/15 16:02
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Sends the greeting once the channel is connected.
     *
     * @param ctx context of the freshly activated channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello nettyServer,i'm hong!", CharsetUtil.UTF_8));
    }

    /**
     * Prints the server's message and the server address.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg inbound message; cast to {@link ByteBuf} because no decoder is
     *            installed in front of this handler
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("server msg " + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("server address " + ctx.channel().remoteAddress());
        } finally {
            // The message is consumed here and not passed further down the
            // pipeline, so this handler must release the reference-counted buffer.
            byteBuf.release();
        }
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause before closing; otherwise the failure is invisible.
        cause.printStackTrace();
        ctx.close();
    }
}
com.hong.spring.netty.NettyServerHandler
package com.hong.spring.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * Server-side inbound handler: prints every message received from a client
 * and answers with a greeting once the current read batch is complete.
 *
 * @author csh
 * @date 2021/1/15 16:05
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the client's message and the client address.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg inbound message; cast to {@link ByteBuf} because no decoder is
     *            installed in front of this handler
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("client msg " + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("client address " + ctx.channel().remoteAddress());
        } finally {
            // The message is consumed here and not passed further down the
            // pipeline, so this handler must release the reference-counted buffer.
            byteBuf.release();
        }
    }

    /**
     * Sends the greeting back to the client after the read has completed.
     *
     * @param ctx context of the channel whose read completed
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client i'm hong server!", CharsetUtil.UTF_8));
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause before closing; otherwise the failure is invisible.
        cause.printStackTrace();
        ctx.close();
    }
}
com.hong.spring.NettyServerTest
package com.hong.spring;
import com.hong.spring.netty.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty server entry point: listens on port 6668 and handles every accepted
 * connection with a {@link NettyServerHandler}.
 *
 * @author csh
 * @date 2021/1/15 15:56
 */
public class NettyServerTest {

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Event loop group that accepts incoming client connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Event loop group that performs the I/O of the accepted connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // Length of the queue of connections waiting to be accepted
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Receive buffer size (kept on the server channel as before)
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                    // Send buffer size: a per-connection option, so it must be a
                    // childOption; the listening channel does not support it
                    .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                    // Keep accepted connections alive
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("server ready");
            // Bind the listening port and wait for the bind to finish
            ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
com.hong.spring.NettyClientTest
package com.hong.spring;
import com.hong.spring.netty.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Netty client entry point: connects to 127.0.0.1:6668 and handles the
 * connection with a {@link NettyClientHandler}.
 *
 * @author csh
 * @date 2021/1/15 15:56
 */
public class NettyClientTest {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the connect or the close to complete
     */
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            // Installs the client handler on every channel the bootstrap creates
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new NettyClientHandler());
                }
            };

            Bootstrap client = new Bootstrap();
            client.group(group);
            client.channel(NioSocketChannel.class);
            client.handler(initializer);

            System.out.println("client ready");

            // Wait for the connection to be established, then for it to close
            ChannelFuture connected = client.connect("127.0.0.1", 6668).sync();
            connected.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
运行结果(注意先运行服务端)
服务端
"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" -Dvisualvm.id=101847092493100 "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\lib\idea_rt.jar=60406:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\ideaWorkSpace\spring\spring_rpc\spring_netty_server\target\test-classes;D:\ideaWorkSpace\spring\spring_rpc\spring_netty_server\target\classes;D:\ideaWorkSpace\spring\spring_rpc\spring_dubbo_api\target\classes;D:\mvn\io\netty\netty-all\4.1.17.Final\netty-all-4.1.17.Final.jar;D:\mvn\org\mybatis\mybatis\3.5.0\mybatis-3.5.0.jar;D:\mvn\org\mybatis\mybatis-spring\2.0.3\mybatis-spring-2.0.3.jar;D:\mvn\com\fasterxml\jackson\core\jackson-databind\2.5.0\jackson-databind-2.5.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-core\2.5.0\jackson-core-2.5.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-annotations\2.5.0\jackson-annotations-2.5.0.jar;D:\mvn\mysql\mysql-connector-java\5.1.34\mysql-connector-java-5.1.34.jar;D:\mvn\org\springframework\spring-beans\4.3.11.RELEASE\spring-beans-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-context\4.3.11.RELEASE\spring-context-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-context-support\4.3.11.RELEASE\spring-context-support-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-expression\4.3.11.RELEASE\spring-expression-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-core\4.3.11.RELEASE\spring-core-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-web\4.3.11.RELEASE\spring-web-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-webmvc\4.3.11.RELEASE\spring-webmvc-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-test\4.3.11.RELEASE\spring-test-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-tx\4.3.11.RELEASE\spring-tx-4.3.11.RELEASE.jar;D:\mvn\junit\junit\4.13\junit-4.13.jar;D:\mvn\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;D:\mvn\com\alibaba\fastjson\1.2.70\fastjson-1.2.70.jar;D:\mvn\com\alibaba\druid\1.1.23\druid-1.1.23.jar;D:\mvn\org\springframework\spring-aop\4.3.11.RELEASE\spring-aop-4.3.11.RELEASE.jar;D:\mvn\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;D:\mvn\org\apache\logging\log4j\log4j-slf4j-impl\2.11.0\log4j-slf4j-impl-2.11.0.jar;D:
\mvn\org\slf4j\slf4j-api\1.8.0-alpha2\slf4j-api-1.8.0-alpha2.jar;D:\mvn\org\apache\logging\log4j\log4j-api\2.11.0\log4j-api-2.11.0.jar;D:\mvn\org\apache\logging\log4j\log4j-core\2.11.0\log4j-core-2.11.0.jar;D:\mvn\org\projectlombok\lombok\1.16.22\lombok-1.16.22.jar;D:\mvn\ch\qos\logback\logback-classic\1.2.2\logback-classic-1.2.2.jar;D:\mvn\ch\qos\logback\logback-core\1.2.2\logback-core-1.2.2.jar;D:\mvn\org\hibernate\hibernate-core\4.2.0.Final\hibernate-core-4.2.0.Final.jar;D:\mvn\antlr\antlr\2.7.7\antlr-2.7.7.jar;D:\mvn\org\jboss\logging\jboss-logging\3.1.0.GA\jboss-logging-3.1.0.GA.jar;D:\mvn\org\jboss\spec\javax\transaction\jboss-transaction-api_1.1_spec\1.0.0.Final\jboss-transaction-api_1.1_spec-1.0.0.Final.jar;D:\mvn\dom4j\dom4j\1.6.1\dom4j-1.6.1.jar;D:\mvn\org\hibernate\javax\persistence\hibernate-jpa-2.0-api\1.0.1.Final\hibernate-jpa-2.0-api-1.0.1.Final.jar;D:\mvn\org\javassist\javassist\3.15.0-GA\javassist-3.15.0-GA.jar;D:\mvn\org\hibernate\common\hibernate-commons-annotations\4.0.1.Final\hibernate-commons-annotations-4.0.1.Final.jar;D:\mvn\org\springframework\spring-orm\4.3.11.RELEASE\spring-orm-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-jdbc\4.3.11.RELEASE\spring-jdbc-4.3.11.RELEASE.jar;D:\mvn\javax\servlet\javax.servlet-api\3.0.1\javax.servlet-api-3.0.1.jar" com.hong.spring.NettyServerTest
16:06:39.894 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
16:06:39.905 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
16:06:39.929 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
16:06:39.935 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
16:06:39.936 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
16:06:39.938 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
16:06:39.939 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
16:06:39.940 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
16:06:39.941 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
16:06:39.942 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
16:06:39.943 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
16:06:39.943 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
16:06:39.943 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
16:06:39.945 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\admin\AppData\Local\Temp (java.io.tmpdir)
16:06:39.945 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
16:06:39.947 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
16:06:39.948 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3793747968 bytes
16:06:39.948 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
16:06:39.949 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
16:06:39.965 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
16:06:39.966 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
16:06:39.972 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
server ready
16:06:40.518 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 20728 (auto-detected)
16:06:40.521 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
16:06:40.521 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
16:06:40.916 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
16:06:40.917 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
16:06:41.305 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 30:9c:23:ff:fe:ca:fe:19 (auto-detected)
16:06:41.311 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
16:06:41.312 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
16:06:41.320 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
16:06:41.321 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
16:06:41.343 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
16:06:41.344 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
16:06:41.344 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:06:41.344 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:06:41.345 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:06:41.346 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:06:41.346 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:06:41.356 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:06:41.356 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
16:06:41.356 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:06:50.160 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
16:06:50.160 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:06:50.160 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:06:50.160 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
16:06:50.169 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
16:06:50.171 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7f64202e
client msg hello nettyServer,i'm hong!
client address /127.0.0.1:60549
客户端
"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" -Dvisualvm.id=101855782551700 "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\lib\idea_rt.jar=60505:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\ideaWorkSpace\spring\spring_rpc\spring_netty_server\target\test-classes;D:\ideaWorkSpace\spring\spring_rpc\spring_netty_server\target\classes;D:\ideaWorkSpace\spring\spring_rpc\spring_dubbo_api\target\classes;D:\mvn\io\netty\netty-all\4.1.17.Final\netty-all-4.1.17.Final.jar;D:\mvn\org\mybatis\mybatis\3.5.0\mybatis-3.5.0.jar;D:\mvn\org\mybatis\mybatis-spring\2.0.3\mybatis-spring-2.0.3.jar;D:\mvn\com\fasterxml\jackson\core\jackson-databind\2.5.0\jackson-databind-2.5.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-core\2.5.0\jackson-core-2.5.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-annotations\2.5.0\jackson-annotations-2.5.0.jar;D:\mvn\mysql\mysql-connector-java\5.1.34\mysql-connector-java-5.1.34.jar;D:\mvn\org\springframework\spring-beans\4.3.11.RELEASE\spring-beans-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-context\4.3.11.RELEASE\spring-context-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-context-support\4.3.11.RELEASE\spring-context-support-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-expression\4.3.11.RELEASE\spring-expression-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-core\4.3.11.RELEASE\spring-core-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-web\4.3.11.RELEASE\spring-web-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-webmvc\4.3.11.RELEASE\spring-webmvc-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-test\4.3.11.RELEASE\spring-test-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-tx\4.3.11.RELEASE\spring-tx-4.3.11.RELEASE.jar;D:\mvn\junit\junit\4.13\junit-4.13.jar;D:\mvn\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;D:\mvn\com\alibaba\fastjson\1.2.70\fastjson-1.2.70.jar;D:\mvn\com\alibaba\druid\1.1.23\druid-1.1.23.jar;D:\mvn\org\springframework\spring-aop\4.3.11.RELEASE\spring-aop-4.3.11.RELEASE.jar;D:\mvn\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;D:\mvn\org\apache\logging\log4j\log4j-slf4j-impl\2.11.0\log4j-slf4j-impl-2.11.0.jar;D:
\mvn\org\slf4j\slf4j-api\1.8.0-alpha2\slf4j-api-1.8.0-alpha2.jar;D:\mvn\org\apache\logging\log4j\log4j-api\2.11.0\log4j-api-2.11.0.jar;D:\mvn\org\apache\logging\log4j\log4j-core\2.11.0\log4j-core-2.11.0.jar;D:\mvn\org\projectlombok\lombok\1.16.22\lombok-1.16.22.jar;D:\mvn\ch\qos\logback\logback-classic\1.2.2\logback-classic-1.2.2.jar;D:\mvn\ch\qos\logback\logback-core\1.2.2\logback-core-1.2.2.jar;D:\mvn\org\hibernate\hibernate-core\4.2.0.Final\hibernate-core-4.2.0.Final.jar;D:\mvn\antlr\antlr\2.7.7\antlr-2.7.7.jar;D:\mvn\org\jboss\logging\jboss-logging\3.1.0.GA\jboss-logging-3.1.0.GA.jar;D:\mvn\org\jboss\spec\javax\transaction\jboss-transaction-api_1.1_spec\1.0.0.Final\jboss-transaction-api_1.1_spec-1.0.0.Final.jar;D:\mvn\dom4j\dom4j\1.6.1\dom4j-1.6.1.jar;D:\mvn\org\hibernate\javax\persistence\hibernate-jpa-2.0-api\1.0.1.Final\hibernate-jpa-2.0-api-1.0.1.Final.jar;D:\mvn\org\javassist\javassist\3.15.0-GA\javassist-3.15.0-GA.jar;D:\mvn\org\hibernate\common\hibernate-commons-annotations\4.0.1.Final\hibernate-commons-annotations-4.0.1.Final.jar;D:\mvn\org\springframework\spring-orm\4.3.11.RELEASE\spring-orm-4.3.11.RELEASE.jar;D:\mvn\org\springframework\spring-jdbc\4.3.11.RELEASE\spring-jdbc-4.3.11.RELEASE.jar;D:\mvn\javax\servlet\javax.servlet-api\3.0.1\javax.servlet-api-3.0.1.jar" com.hong.spring.NettyClientTest
16:06:48.607 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
16:06:48.616 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
16:06:48.637 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
16:06:48.643 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
16:06:48.644 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
16:06:48.645 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
16:06:48.646 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
16:06:48.646 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
16:06:48.647 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
16:06:48.648 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
16:06:48.649 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
16:06:48.649 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
16:06:48.649 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
16:06:48.650 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\admin\AppData\Local\Temp (java.io.tmpdir)
16:06:48.651 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
16:06:48.652 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
16:06:48.652 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3793747968 bytes
16:06:48.653 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
16:06:48.654 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
16:06:48.669 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
16:06:48.670 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
16:06:48.678 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
client ready
16:06:49.250 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 20672 (auto-detected)
16:06:49.254 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
16:06:49.254 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
16:06:49.664 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
16:06:49.665 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
16:06:50.037 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 30:9c:23:ff:fe:ca:fe:19 (auto-detected)
16:06:50.045 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
16:06:50.046 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
16:06:50.057 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
16:06:50.057 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
16:06:50.085 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
16:06:50.085 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
16:06:50.085 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:06:50.086 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:06:50.087 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:06:50.087 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
16:06:50.087 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:06:50.087 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:06:50.088 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:06:50.088 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:06:50.088 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:06:50.098 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:06:50.098 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
16:06:50.099 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:06:50.132 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
16:06:50.134 [nioEventLoopGroup-2-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@1fb278ae
16:06:50.142 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
16:06:50.142 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:06:50.142 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:06:50.143 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
server msg hello client i'm hong server!
server address /127.0.0.1:6668
命令请求服务端:
telnet localhost 6668
可以发现netty实现非常简单,当然还可以通过其他协议通信,这里只是简单实现。
参考:https://segmentfault.com/a/1190000021834427?utm_source=tag-newest
基于spring的实现
服务端
web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
<servlet>
<servlet-name>netty_server</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>
classpath:applicationContext-mybatis.xml,
classpath:netty.xml
</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<servlet-mapping>
<servlet-name>netty_server</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_rpc</artifactId>
<groupId>com.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.hong.spring</groupId>
<artifactId>spring_netty_server</artifactId>
<dependencies>
<dependency>
<groupId>com.hong</groupId>
<artifactId>spring_dubbo_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
</dependencies>
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
application.properties
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
applicationContext-mybatis.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<!-- 配置组件扫描 -->
<context:component-scan base-package="com.hong.spring"></context:component-scan>
<!--加载配置文件-->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
<value>classpath:netty.properties</value>
</list>
</property>
</bean>
<!-- 开启注解 -->
<context:annotation-config />
<!--开启注解事务-->
<tx:annotation-driven transaction-manager="transactionManager" />
<!--放行静态资源-->
<mvc:default-servlet-handler />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
id="internalResourceViewResolver">
<!-- 前缀 -->
<property name="prefix" value="/WEB-INF/pages/" />
<!-- 后缀 -->
<property name="suffix" value=".html" />
<property name="contentType" value="text/html"/>
</bean>
<!--开启mvc注解驱动-->
<!-- 定义注解驱动 -->
<mvc:annotation-driven>
<mvc:message-converters>
<!-- 设置支持中文 -->
<bean class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<value>text/plain;charset=UTF-8</value>
<value>text/html;charset=UTF-8</value>
</list>
</property>
</bean>
<bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
</mvc:message-converters>
</mvc:annotation-driven>
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
<!-- 基础配置 -->
<property name="url" value="${jdbc.url}"></property>
<property name="driverClassName" value="${jdbc.driver}"></property>
<property name="username" value="${jdbc.user}"></property>
<property name="password" value="${jdbc.password}"></property>
<!-- 关键配置 -->
<!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
<property name="initialSize" value="3" />
<!-- 最小连接池数量 -->
<property name="minIdle" value="2" />
<!-- 最大连接池数量 -->
<property name="maxActive" value="15" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="10000" />
<!-- 性能配置 -->
<!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="20" />
<!-- 其他配置 -->
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
执行validationQuery检测连接是否有效。-->
<property name="testWhileIdle" value="true" />
<!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
<property name="testOnBorrow" value="true" />
<!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
<property name="testOnReturn" value="false" />
</bean>
<!--事务管理器-->
<!-- sqlSessionFactory -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<!-- 加载 MyBatis 的配置文件 -->
<property name="configLocation" value="classpath:mybatis.xml"/>
<!-- 数据源 -->
<property name="dataSource" ref="dataSource"/>
<!-- 所有配置的mapper文件 -->
<property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
</bean>
<!-- Mapper 扫描器 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<!-- 扫描 包下的组件 -->
<property name="basePackage" value="com.hong.spring.dao" />
<!-- 关联mapper扫描器 与 sqlsession管理器 -->
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean>
<!--事务配置-->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
</beans>
jdbc.properties
config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/app.log"
filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
<PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="5 MB"/>
</RollingFile>
</appenders>
<loggers>
<root level="DEBUG">
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</loggers>
</configuration>
logging.properties
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################
org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.
java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
mybatis.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<!-- settings -->
<settings>
<!-- 打开延迟加载的开关 -->
<setting name="lazyLoadingEnabled" value="true"/>
<!-- 将积极加载改为消极加载(即按需加载) -->
<setting name="aggressiveLazyLoading" value="false"/>
<!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
<setting name="cacheEnabled" value="true"/>
<!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
<setting name="mapUnderscoreToCamelCase" value="true"/>
<!-- 使用列别名代替列名 默认:true select name as title from table -->
<setting name="useColumnLabel" value="true"/>
<!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
<setting name="useGeneratedKeys" value="true"/>
</settings>
<!-- 别名定义 -->
<typeAliases>
<package name="com.hong.spring.entity"/>
</typeAliases>
</configuration>
netty.properties
#web socket端口号
websocket.server.port=7000
websocket.server.communication.port=8081
communication.netty.nodes=
# 用于处理客户端连接请求
client.server.bossGroup.threads=1
#用于处理客户端I/O操作
client.server.workerGroup.threads=0
netty.server.bossGroup.threads=1
netty.server.workerGroup.threads=0
netty.client.eventLoopGroup.threads=1
netty.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- 把Netty的一些类服务器注册到Spring,方便处理和扩展 -->
<!-- 用于处理客户端连接请求 -->
<bean id="bossGroup" class="io.netty.channel.nio.NioEventLoopGroup">
<constructor-arg value="${client.server.bossGroup.threads}" />
</bean>
<!-- 用于处理客户端I/O操作 -->
<bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup">
<constructor-arg value="${client.server.workerGroup.threads}" />
</bean>
<!-- 服务器启动引导类 -->
<bean id="serverBootstrap" class="io.netty.bootstrap.ServerBootstrap" scope="prototype"/>
<!-- 自定义的Netty Websocket服务器 -->
<bean id="webSocketServer" class="com.hong.spring.netty.WebsocketServer" init-method="start" destroy-method="close">
<property name="port" value="${websocket.server.port}"/>
<property name="websocketChannelInitializer" ref="websocketChannelInitializer" />
</bean>
</beans>
com.hong.spring.dao.UserMapper
package com.hong.spring.dao;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* MyBatis mapper (DAO layer) for the {@code user} table.
* The SQL for each method lives in com/hong/spring/mapper/UserMapper.xml under the
* statement id that matches the method name.
*
* @author csh
* @since 2020/8/18
*/
public interface UserMapper {
/**
* Loads every row of the {@code user} table ({@code SELECT * FROM user}).
*
* @return all users, mapped through the {@code user} result map
*/
List<User> findAllUserList();
/**
* Counts the rows of the {@code user} table ({@code SELECT count(*) FROM user}).
*
* @return the total number of users
*/
int findAllTotal();
/**
* Updates the row whose id equals {@code user.id}. Only the non-null
* {@code username} / {@code age} properties are written.
*
* @param user carrier of the id to match and the new values
* @return the int result MyBatis reports for the statement; the service layer
*         treats a value greater than 0 as success
*/
int update(User user);
/**
* Inserts a single user ({@code user_name}, {@code age}).
*
* @param user the user to insert
* @return the int result MyBatis reports for the statement; the service layer
*         treats a value greater than 0 as success
*/
int save(User user);
/**
* Inserts several users with one multi-row {@code INSERT ... VALUES (...),(...)} statement.
*
* @param list the users to insert, exposed to the XML as {@code list}
* @return the int result MyBatis reports for the statement; the service layer
*         treats a value greater than 0 as success
*/
int insertBatch(@Param("list") List <User> list);
/**
* Looks up one user by primary key.
*
* @param id the id to match
* @return the matching user; presumably {@code null} when no row matches (MyBatis default, verify)
*/
User findById(int id);
/**
* Loads one page of users via {@code LIMIT #{page},#{pageSize}}.
* Note that {@code page} is used directly as the row offset, not as a page index.
*
* @param ao paging parameters ({@code page} = offset, {@code pageSize} = row count)
* @return the users inside the requested window
*/
List<User> findByPage(UserAO ao);
}
com/hong/spring/mapper/UserMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
    <!-- Explicit column-to-property mapping shared by every select in this mapper. -->
    <resultMap type="com.hong.spring.entity.User" id="user">
        <id column="id" property="id" />
        <result column="user_name" property="username" />
        <result column="age" property="age" />
    </resultMap>
    <!-- Uses the shared result map (like findByPage / findAllUserList) so that the
         user_name -> username mapping does not depend on the global
         mapUnderscoreToCamelCase setting. -->
    <select id="findById" resultMap="user">
        SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
    </select>
    <!-- #{page} is the row offset, #{pageSize} the number of rows. -->
    <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
        select * from user where 1=1 limit #{page},#{pageSize}
    </select>
    <select id="findAllUserList" resultMap="user">
        SELECT * FROM user
    </select>
    <select id="findAllTotal" resultType="int">
        SELECT count(*) FROM user
    </select>
    <insert id="save" >
        INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    </insert>
    <!-- Multi-row insert; the collection must not be empty or the statement is invalid SQL. -->
    <insert id="insertBatch">
        insert into user
        ( user_name, age)
        values
        <foreach collection="list" item="user" index="index"
                 separator=",">
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        </foreach>
    </insert>
    <!-- Only non-null properties are written; <set> strips the trailing comma. -->
    <update id="update" >
        update user
        <set>
            <if test="username !=null">
                user_name=#{username,jdbcType=VARCHAR},
            </if>
            <if test="age !=null">
                age =#{age,jdbcType=INTEGER}
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
</mapper>
com.hong.spring.netty.NettyClientHandler
package com.hong.spring.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * Client-side inbound handler: greets the server once the connection is established
 * and prints every message the server sends back.
 *
 * @author csh
 * @since 2021/1/15
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Invoked once when the connection to the server becomes active; sends the greeting.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello nettyServer,i'm hong!", CharsetUtil.UTF_8));
    }

    /**
     * Prints the payload and the remote address of a message received from the server.
     *
     * @param ctx context of this handler in the pipeline
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("server msg " + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("server address " + ctx.channel().remoteAddress());
        } finally {
            // The message is consumed here and not forwarded, so this handler owns the
            // buffer and must release it; ChannelInboundHandlerAdapter does not do it.
            byteBuf.release();
        }
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of this handler in the pipeline
     * @param cause the error raised while processing the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the reason instead of closing silently.
        System.err.println("client exception: " + cause);
        ctx.close();
    }
}
com.hong.spring.netty.NettyHttpHandler
package com.hong.spring.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
 * Minimal HTTP handler that answers every request head with a plain-text
 * {@code hello world} response.
 *
 * @author csh
 * @since 2021/1/19
 */
public class NettyHttpHandler extends SimpleChannelInboundHandler<HttpObject> {

    /**
     * Replies to an {@link HttpRequest}; any other {@link HttpObject} is ignored.
     *
     * @param ctx context of this handler in the pipeline
     * @param msg the decoded HTTP object
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        // Only the request head triggers a reply.
        if (!(msg instanceof HttpRequest)) {
            return;
        }

        // Log who is calling.
        System.out.println("客户端地址:" + ctx.channel().remoteAddress());

        // Response payload.
        ByteBuf body = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);

        // 200 OK response carrying the payload.
        FullHttpResponse reply =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body);

        // Content type and length headers.
        HttpHeaders headers = reply.headers();
        headers.set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        headers.set(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());

        // Send it back to the caller.
        ctx.writeAndFlush(reply);
    }
}
com.hong.spring.netty.NettyServerHandler
package com.hong.spring.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * Server-side inbound handler: prints what a client sends, echoes it back and
 * follows up with a fixed greeting.
 *
 * <p>Lifecycle callbacks available on an inbound handler:
 * <ul>
 *   <li>{@code handlerAdded} - the handler was added to a pipeline (fired once)</li>
 *   <li>{@code channelRegistered} - the channel was registered</li>
 *   <li>{@code channelActive} - the connection is ready</li>
 *   <li>{@code channelRead} - data is available to read</li>
 *   <li>{@code channelReadComplete} - the current read pass has finished</li>
 *   <li>{@code channelInactive} - the connection was closed</li>
 *   <li>{@code channelUnregistered} - the channel was unregistered</li>
 *   <li>{@code handlerRemoved} - the handler was removed from the pipeline</li>
 * </ul>
 *
 * @author csh
 * @since 2021/1/15
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the received payload and the client address, then echoes the payload back.
     *
     * @param ctx context of this handler in the pipeline
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            String text = byteBuf.toString(CharsetUtil.UTF_8);
            System.out.println("client msg " + text);
            System.out.println("client address " + ctx.channel().remoteAddress());
            // Echo as a ByteBuf: a String only reaches the socket when an encoder sits
            // in the pipeline, whereas a ByteBuf can be written either way.
            ctx.writeAndFlush(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8));
        } finally {
            // The inbound buffer is consumed here and not forwarded, so release it.
            byteBuf.release();
        }
    }

    /**
     * Sends a fixed greeting once the current read pass has finished.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client i'm hong server!",CharsetUtil.UTF_8));
    }

    /**
     * Prints the error message and closes the connection.
     *
     * @param ctx   context of this handler in the pipeline
     * @param cause the error raised while processing the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }

    /**
     * Logs the channel id when the connection goes away and forwards the event.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道断开:"+ctx.channel().id());
        super.channelInactive(ctx);
    }
}
com.hong.spring.netty.NettyWebSocketHandler
package com.hong.spring.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * WebSocket handler that pushes the current time to every connected client at a
 * fixed interval and prints the text frames it receives.
 *
 * <p>The handler is {@code @Sharable}: one instance may serve many channels, so it
 * keeps no per-connection state in fields.
 *
 * @author csh
 * @since 2021/1/19
 */
@Component
@ChannelHandler.Sharable
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** Timestamp pattern used in the pushed messages. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Starts the periodic time push for the channel that just became active.
     * The first push happens after 1 second, subsequent ones 5 seconds after the
     * previous push finished. As before, the event is not forwarded down the pipeline.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final Channel channel = ctx.channel();
        // SimpleDateFormat is not thread-safe and this handler is shared between
        // channels, so each connection gets its own formatter.
        final SimpleDateFormat format = new SimpleDateFormat(TIME_PATTERN);
        // Schedule on the channel's own executor instead of creating a dedicated
        // thread pool per connection that nobody ever shuts down.
        ScheduledExecutorService loop = ctx.executor();
        final java.util.concurrent.Future<?> pushTask = loop.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                String sendTime = format.format(new Date());
                channel.writeAndFlush(new TextWebSocketFrame("推送时间=" + sendTime));
            }
        }, 1, 5, TimeUnit.SECONDS);
        // Stop pushing once the connection is gone.
        channel.closeFuture().addListener(closed -> pushTask.cancel(false));
    }

    /**
     * Prints a text frame received from the browser.
     *
     * @param ctx context of this handler in the pipeline
     * @param msg the received text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("收到消息 " + msg.text());
    }

    /**
     * Logs the channel id when this handler is added to a client's pipeline.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用户连接:"+ctx.channel().id());
    }

    /**
     * Logs the channel id when this handler is removed from a client's pipeline.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用户断开:"+ctx.channel().id());
    }

    /**
     * Logs that a user event was fired on the channel and forwards it.
     *
     * @param ctx context of this handler in the pipeline
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println(ctx.channel().id()+"促发事件");
        super.userEventTriggered(ctx, evt);
    }
}
com.hong.spring.netty.WebsocketChannelInitializer
package com.hong.spring.netty;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Builds the server-side pipeline for every accepted connection:
 * HTTP codec, chunked writes, HTTP aggregation, WebSocket upgrade on {@code /hong},
 * and finally the business handler.
 */
@Component
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Business handler injected by Spring; expected to resolve (by field name) to the
     * shared {@code NettyWebSocketHandler} bean.
     */
    @Autowired
    private ChannelHandler nettyWebSocketHandler;

    /**
     * Populates the pipeline of a freshly accepted channel.
     *
     * @param ch the accepted client channel
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // WebSocket starts as HTTP, so decode/encode HTTP first.
        pipeline.addLast(new HttpServerCodec());
        // Support writing large content in chunks.
        pipeline.addLast(new ChunkedWriteHandler());
        // Merge the fragments of one HTTP message into a single full message.
        pipeline.addLast(new HttpObjectAggregator(8192));
        // Upgrade HTTP to WebSocket on the /hong path.
        pipeline.addLast(new WebSocketServerProtocolHandler("/hong"));
        // Register the business handler exactly once. Adding a second, freshly created
        // NettyWebSocketHandler in front of the bean made the first instance consume
        // every text frame and duplicated the connect/disconnect output.
        pipeline.addLast(nettyWebSocketHandler);
    }
}
com.hong.spring.netty.WebsocketServer
package com.hong.spring.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Netty WebSocket server managed by Spring: {@link #start()} binds the port and
 * {@link #close()} releases the listening channel and the event loop groups.
 *
 * @author csh
 * @since 2021/1/19
 */
@Log4j2
public class WebsocketServer {
    @Autowired
    private ServerBootstrap serverBootstrap;
    /** Result of the bind; stays {@code null} when {@link #start()} failed. */
    private ChannelFuture serverChannelFuture;
    @Autowired
    private EventLoopGroup bossGroup;
    @Autowired
    private EventLoopGroup workerGroup;
    /** Initializer applied to every accepted child channel. */
    private ChannelHandler websocketChannelInitializer;
    /** Port to listen on. */
    private int port;

    public ChannelHandler getWebsocketChannelInitializer() {
        return websocketChannelInitializer;
    }

    public void setWebsocketChannelInitializer(ChannelHandler websocketChannelInitializer) {
        this.websocketChannelInitializer = websocketChannelInitializer;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Configures the bootstrap and binds the server to {@link #getPort()}.
     * A failure is logged and the event loop groups are shut down; the exception is
     * not propagated to the caller.
     */
    public void start() {
        try {
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(websocketChannelInitializer);
            serverChannelFuture = serverBootstrap.bind(port).sync();
            log.info("WEBSOCKET服务启动完成,已绑定端口:" + port);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the calling thread.
                Thread.currentThread().interrupt();
            }
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            log.error("WEBSOCKET服务启动异常!", e);
        }
    }

    /**
     * Stops the server: closes the listening channel (if the bind succeeded) and
     * waits for both event loop groups to terminate.
     */
    public void close() {
        // start() may have failed before the bind completed; avoid an NPE in that case.
        if (serverChannelFuture != null) {
            serverChannelFuture.channel().close();
        }
        Future<?> bossGroupFuture = bossGroup.shutdownGracefully();
        Future<?> workerGroupFuture = workerGroup.shutdownGracefully();
        try {
            bossGroupFuture.await();
            workerGroupFuture.await();
            log.info("websocket服务停止中...");
        } catch (InterruptedException e) {
            // Restore the flag so the container can react to the interruption.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for the websocket server to shut down", e);
        }
    }
}
com.hong.spring.provider.UserServiceImpl
package com.hong.spring.provider;
import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
 * User service backed by {@link UserMapper}.
 *
 * @author csh
 * @since 2020/8/18
 */
@Service
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    /**
     * Loads all users together with the total row count.
     *
     * @return success response carrying the user list and the total
     */
    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }

    /**
     * Inserts one user.
     *
     * @param user the user to insert; {@code null} yields a failure response
     * @return response whose payload tells whether the mapper reported a value above 0
     */
    @Override
    @Transactional
    public DataResponse <Boolean> save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save > 0);
    }

    /**
     * Inserts several users with one statement.
     *
     * @param list the users to insert; {@code null} or empty yields a failure response
     * @return response whose payload tells whether the mapper reported a value above 0
     */
    @Override
    public DataResponse <Boolean> insertBatch(List <User> list) {
        // An empty list would render "insert ... values" without any row, which is
        // invalid SQL, so reject it together with null.
        if(null==list || list.isEmpty()){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave > 0);
    }

    /**
     * Updates the user identified by {@code user.getId()}.
     *
     * @param user carrier of the id and new values; a {@code null} user or id yields a failure response
     * @return response whose payload tells whether the mapper reported a value above 0
     */
    @Override
    @Transactional
    public DataResponse <Boolean> update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update > 0);
    }

    /**
     * Looks up one user by id.
     *
     * @param i the id to look up
     * @return success response carrying whatever the mapper returned
     */
    @Override
    public DataResponse <User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    /**
     * Loads one page of users together with the total row count.
     * Without paging parameters the first 10 rows are returned; otherwise the page
     * index is converted into a row offset ({@code pageSize * page}) before querying.
     *
     * @param ao paging parameters; may be {@code null}
     * @return success response carrying the page content and the total
     */
    @Override
    public DataResponse <List <User>> findByPage(UserAO ao) {
        if(ao==null){
            // The original dereferenced the null argument here. Create the default
            // request instead (assumes UserAO has a no-arg constructor, as its
            // setter-based usage suggests).
            ao = new UserAO();
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            // NOTE(review): this overwrites the caller's page with the computed offset,
            // so reusing the same UserAO for another call compounds the offset.
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List <User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}
tomcat配置
启动
09:55:15.796 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
09:55:15.798 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
09:55:15.799 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
09:55:15.799 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
09:55:15.800 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
09:55:15.801 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
09:55:15.802 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
09:55:15.802 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
09:55:15.803 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
09:55:15.806 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
09:55:15.807 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
09:55:15.843 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
09:55:15.844 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
09:55:15.844 [RMI TCP Connection(3)-127.0.0.1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
09:55:15.876 [RMI TCP Connection(3)-127.0.0.1] INFO com.hong.spring.netty.WebsocketServer - WEBSOCKET服务启动完成,已绑定端口:7000
一月 20, 2021 9:55:16 上午 org.springframework.web.servlet.DispatcherServlet initServletBean
信息: FrameworkServlet 'netty_server': initialization completed in 7110 ms
[2021-01-20 09:55:16,085] Artifact spring_netty_server:war exploded: Artifact is deployed successfully
[2021-01-20 09:55:16,086] Artifact spring_netty_server:war exploded: Deploy took 13,498 milliseconds
结果
通过:http://coolaf.com/tool/chattest 测试
客户端
有了服务端,当然还需要客户端来连接。既可以直接在前端用 WebSocket JS 连接,也可以通过后端来连接并发送消息。这里采用后端方式:用户通过请求接口提交信息,再由客户端将信息发送给服务端;或者调用服务端,再将服务端返回的信息返回给用户端。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_rpc</artifactId>
<groupId>com.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.hong.spring</groupId>
<artifactId>spring_netty_client</artifactId>
<dependencies>
<dependency>
<groupId>com.hong</groupId>
<artifactId>spring_dubbo_api</artifactId>
<version>1.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
</dependencies>
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
netty.properties
#web socket端口号
websocket.server.port=7000
websocket.server.host=127.0.0.1
# ws的url
websocket.server.url=ws://127.0.0.1:7000/hong
# 用于处理客户端连接请求
client.server.bossGroup.threads=1
logging.properties
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################
org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.
java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/app.log"
filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
<PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="5 MB"/>
</RollingFile>
</appenders>
<loggers>
<root level="DEBUG">
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</loggers>
</configuration>
applicationContext.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<!-- 配置组件扫描 -->
<context:component-scan base-package="com.hong.spring"></context:component-scan>
<!--加载配置文件-->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
<property name="locations">
<list>
<value>classpath:netty.properties</value>
</list>
</property>
</bean>
<!-- 把Netty的一些类服务器注册到Spring,方便处理和扩展 -->
<!-- 客户端事件循环线程组,用于处理与服务端的连接及I/O操作 -->
<bean id="eventExecutors" class="io.netty.channel.nio.NioEventLoopGroup">
<constructor-arg value="${client.server.bossGroup.threads}" />
</bean>
<!-- 客户端启动引导类 -->
<bean id="clientBootstrap" class="io.netty.bootstrap.Bootstrap" scope="prototype"/>
<!-- 开启注解 -->
<context:annotation-config />
<mvc:default-servlet-handler />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
id="internalResourceViewResolver">
<!-- 前缀 -->
<property name="prefix" value="/WEB-INF/pages/" />
<!-- 后缀 -->
<property name="suffix" value=".html" />
<property name="contentType" value="text/html"/>
</bean>
<!--开启mvc注解事务-->
<!-- 定义注解驱动 -->
<mvc:annotation-driven>
<mvc:message-converters>
<!-- 设置支持中文 -->
<bean class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<value>text/plain;charset=UTF-8</value>
<value>text/html;charset=UTF-8</value>
</list>
</property>
</bean>
<bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
</mvc:message-converters>
</mvc:annotation-driven>
<!-- 自定义的Netty Websocket客户端 -->
<bean id="webSocketClient" class="com.hong.spring.netty.websocketClient" lazy-init="true" init-method="run" destroy-method="close">
<property name="port" value="${websocket.server.port}"/>
<property name="host" value="${websocket.server.host}"/>
<property name="uri" value="${websocket.server.url}" />
</bean>
</beans>
application.properties
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
com.hong.spring.netty.WebSocketClientHandler
package com.hong.spring.netty;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
/**
 * Client-side handler that performs the WebSocket handshake and then prints the
 * frames received from the server.
 *
 * @author csh
 * @since 2021/1/21
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    /** Performs the handshake with the server. */
    private final WebSocketClientHandshaker handshaker;
    /** Completed when the handshake succeeds or fails; created in {@link #handlerAdded}. */
    private ChannelPromise handshakeFuture;

    /**
     * @param handshaker the handshaker used to upgrade the connection
     */
    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /**
     * @return the future that completes with the outcome of the handshake
     */
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    /**
     * Starts the handshake as soon as the TCP connection is active.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        handshaker.handshake(ctx.channel());
    }

    /**
     * Reports the disconnect and, when the handshake never finished, fails the
     * handshake future so that nobody waits on it forever.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接断开");
        if (!handshakeFuture.isDone()) {
            handshakeFuture.tryFailure(new IllegalStateException(
                    "connection closed before the WebSocket handshake completed"));
        }
    }

    /**
     * Creates the handshake promise when this handler joins the pipeline.
     *
     * @param ctx context of this handler in the pipeline
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        handshakeFuture = ctx.newPromise();
    }

    /**
     * Finishes the handshake with the first response, afterwards dispatches WebSocket frames.
     *
     * @param ctx context of this handler in the pipeline
     * @param msg the HTTP handshake response or a WebSocket frame
     * @throws IllegalStateException if an HTTP response arrives after the handshake completed
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        // Handshake still pending: this message must be the server's HTTP response.
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                System.out.println("完成连接");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                System.out.println("握手连接失败");
                handshakeFuture.setFailure(e);
            }
            return;
        }
        // After the upgrade no further HTTP messages are expected.
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }
        // Regular WebSocket traffic.
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
            System.out.println("收到消息:" + textWebSocketFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("客户端收到pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("客户端手动关闭");
            ch.close();
        }
    }

    /**
     * Prints the error, fails a still-pending handshake and closes the connection.
     *
     * @param ctx   context of this handler in the pipeline
     * @param cause the error raised while processing the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}
com.hong.spring.netty.websocketClient
package com.hong.spring.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import javax.net.ssl.SSLException;
import java.net.URI;
/**
* @author: csh
* @Date: 2021/1/20 10:38
* @Description:客户端
*/
@Log4j2
public class websocketClient {
@Autowired
private Bootstrap clientBootstrap;
@Autowired
private NioEventLoopGroup eventExecutors;
/**ws url */
private URI uri;
/**sslCtx */
private SslContext sslCtx;
/**handler */
private WebSocketClientHandler handler;
@Autowired
private WebsocketChannelInitializer websocketChannelInitializer;
private Channel channel;
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
/**主机地址 */
private String host;
/**端口号 */
private int port;
public void close() {
Future<?> bossGroupFuture = eventExecutors.shutdownGracefully();
try {
bossGroupFuture.await();
System.out.println("websocket客户端服务停止中...");
} catch (InterruptedException ignore) {
ignore.printStackTrace();
}
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public URI getUri() {
return uri;
}
public void setUri(URI uri) {
this.uri = uri;
}
public SslContext getSslCtx() {
return sslCtx;
}
public void setSslCtx(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
public WebSocketClientHandler getHandler() {
return handler;
}
public void setHandler(WebSocketClientHandler handler) {
this.handler = handler;
}
/**
 * Starts the WebSocket client on a dedicated thread.
 *
 * <p>The thread resolves scheme, host and port from {@code uri}, builds an SSL context for
 * {@code wss}, connects, waits for the WebSocket handshake and then blocks until the channel
 * is closed. The event loop group is shut down when the thread ends, whatever the outcome.
 *
 * <p>Differences from a naive implementation:
 * <ul>
 *   <li>a URI without an explicit port ({@code getPort() == -1}) falls back to 80 for
 *       {@code ws} and 443 for {@code wss} instead of failing in {@code connect};</li>
 *   <li>the resolved host/port are stored in the {@code host}/{@code port} fields, which the
 *       channel initializer reads when it creates the SSL handler;</li>
 *   <li>interruptions restore the thread's interrupt flag, and every failure is logged.</li>
 * </ul>
 */
public void run() {
    new Thread(() -> {
        try {
            if (uri == null) {
                log.error("websocket uri is not set, client not started");
                return;
            }
            final String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
            if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                System.out.println("只支持ws");
                return;
            }
            final boolean ssl = "wss".equalsIgnoreCase(scheme);
            final String targetHost = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
            // URI.getPort() returns -1 when the port is omitted; use the scheme's default port.
            final int targetPort = uri.getPort() != -1 ? uri.getPort() : (ssl ? 443 : 80);
            // Keep the fields in sync with the real target: the initializer uses them for TLS.
            this.host = targetHost;
            this.port = targetPort;

            if (ssl) {
                sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }
            handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
            clientBootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(websocketChannelInitializer);
            System.out.println("client ready");
            Channel connected = clientBootstrap.connect(targetHost, targetPort).sync().channel();
            this.channel = connected;
            // Wait for the WebSocket handshake to complete.
            handler.handshakeFuture().sync();
            log.info("握手完成!");
            connected.closeFuture().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("websocket client thread interrupted", e);
        } catch (SSLException e) {
            log.error("failed to build the SSL context for the websocket client", e);
        } catch (Exception e) {
            // sync() rethrows connect/handshake failures that are not declared in its signature.
            log.error("websocket client failed", e);
        } finally {
            System.out.println("触发关闭连接池!");
            eventExecutors.shutdownGracefully();
        }
    }, "websocket-client").start();
}
}
com.hong.spring.netty.WebsocketChannelInitializer
package com.hong.spring.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* 功能描述: 通道初始化值
*
* @param:
* @return:
* @auther: csh
* @date: 2021/1/21 11:54
*/
@Component
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private websocketClient websocketClient;
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (websocketClient != null && websocketClient.getSslCtx()!=null) {
p.addLast(websocketClient.getSslCtx().newHandler(ch.alloc(), websocketClient.getHost(), websocketClient.getPort()));
}
//http协议握手
p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(8192));
//支持websocket数据压缩
p.addLast(WebSocketClientCompressionHandler.INSTANCE);
p.addLast(websocketClient.getHandler());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("传进来的消息:"+(TextWebSocketFrame)msg);
super.channelRead(ctx, msg);
}
}
com.hong.spring.controller.UserController
package com.hong.spring.controller;
import com.alibaba.fastjson.JSONObject;
import com.hong.spring.netty.websocketClient;
import com.hong.spring.utils.DataResponse;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.CharsetUtil;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @Auther: csh
* @Date: 2020/8/18 16:11
* @Description:
*/
@RestController
@RequestMapping("/")
@Log4j2
public class UserController {
@Autowired
private websocketClient websocketClient;
@RequestMapping(value = "",method = RequestMethod.GET)
@ResponseBody
public DataResponse<Boolean> sendMsg(String msg){
try {
log.info("请求进来了{}",msg);
Channel channel = websocketClient.getChannel();
log.info("获取的channel"+ JSONObject.toJSONString(channel));
WebSocketFrame frame = new TextWebSocketFrame(msg);
channel.writeAndFlush(frame);
return DataResponse.BuildSuccessResponse();
}catch (Exception e){
log.error("发送出错!{}",e);
}
return DataResponse.BuildFailResponse("发送出错!");
}
}
相关配置
请求
服务端
改造成rpc方式
服务端
com.hong.spring.netty.NettyWebSocketHandler 修改如下
/**
 * Handles a text frame received from a client.
 *
 * <p>A message of the form {@code com.hong.spring.service.IUserService.findById#<id>} is
 * treated as a lookup request: the user with that id is loaded and returned as JSON. A
 * request without an id, or with a non-numeric id, is answered with an error text instead of
 * failing with an exception. Any other message gets a generic reply.
 *
 * @param ctx the channel context used to write the reply
 * @param msg the received text frame
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    String text = msg.text();
    System.out.println("收到消息 " + text);
    if (!text.startsWith("com.hong.spring.service.IUserService.findById")) {
        ctx.writeAndFlush(new TextWebSocketFrame("没有收到值啊!"));
        return;
    }
    // split() always yields at least one element, so the id is present only at index 1.
    String[] parts = text.split("#");
    if (parts.length < 2 || parts[1].trim().isEmpty()) {
        ctx.writeAndFlush(new TextWebSocketFrame("没有id"));
        return;
    }
    int id;
    try {
        id = Integer.parseInt(parts[1].trim());
    } catch (NumberFormatException e) {
        ctx.writeAndFlush(new TextWebSocketFrame("id格式错误"));
        return;
    }
    User user = userMapper.findById(id);
    ctx.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(user)));
}
客户端
com.hong.spring.netty.websocketClient 修改如下
package com.hong.spring.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import javax.net.ssl.SSLException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author: csh
* @Date: 2021/1/20 10:38
* @Description:客户端
*/
@Log4j2
public class websocketClient {
@Autowired
private Bootstrap clientBootstrap;
@Autowired
private NioEventLoopGroup eventExecutors;
/**ws url */
private URI uri;
/**sslCtx */
private SslContext sslCtx;
/**handler */
private WebSocketClientHandler handler;
@Autowired
private WebsocketChannelInitializer websocketChannelInitializer;
private Channel channel;
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**主机地址 */
private String host;
/**端口号 */
private int port;
public void close() {
Future<?> bossGroupFuture = eventExecutors.shutdownGracefully();
try {
bossGroupFuture.await();
System.out.println("websocket客户端服务停止中...");
} catch (InterruptedException ignore) {
ignore.printStackTrace();
}
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public URI getUri() {
return uri;
}
public void setUri(URI uri) {
this.uri = uri;
}
public SslContext getSslCtx() {
return sslCtx;
}
public void setSslCtx(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
public WebSocketClientHandler getHandler() {
return handler;
}
public void setHandler(WebSocketClientHandler handler) {
this.handler = handler;
}
public void run() {
new Thread(()->{
try {
String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
final int port = uri.getPort();
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
System.out.println("只支持ws");
return;
}
final boolean ssl = "wss".equalsIgnoreCase(scheme);
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
clientBootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(websocketChannelInitializer);
System.out.println("client ready");
Channel channel = clientBootstrap.connect(host, port).sync().channel();
this.channel = channel;
//等待握手完成
handler.handshakeFuture().sync();
log.info("握手完成!");
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (SSLException e) {
e.printStackTrace();
} finally {
System.out.println("触发关闭连接池!");
eventExecutors.shutdownGracefully();
}
}).start();
}
}
com.hong.spring.netty.WebSocketClientHandler
package com.hong.spring.netty;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import java.util.concurrent.Callable;
/**
*
* 功能描述: 监听器
*
* @param:
* @return:
* @auther: csh
* @date: 2021/1/21 11:54
*/
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> implements Callable {
/**
* 负责和服务器握手
*/
private final WebSocketClientHandshaker handshaker;
/**上下文 */
private ChannelHandlerContext context;
/**参数 */
private String param;
/**返回结果 */
private String result;
/**
* 握手结果
*/
private ChannelPromise handshakeFuture;
public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接断开");
}
/**
* 当前handler被添加到pipeline时,new出握手的结果示例,以备将来使用
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handshakeFuture = ctx.newPromise();
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead method invoke");
result = msg.toString();
System.out.println("server message:"+result);
notify();
super.channelRead(ctx, msg);
}
/**
* 读取数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
//握手未完成,完成握手
if (!handshaker.isHandshakeComplete()) {
try {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
System.out.println("完成连接");
handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException e) {
System.out.println("握手连接失败");
handshakeFuture.setFailure(e);
}
return;
}
//握手完成,升级为websocket,不应该再收到http报文
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
//处理websocket报文
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
System.out.println("客户端收到消息:" + textWebSocketFrame.text());
result = textWebSocketFrame.text();
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("客户端收到pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("客户端手动关闭");
ch.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
}
@Override
public synchronized Object call() throws Exception {
System.out.println("call method invoke");
context.writeAndFlush(new TextWebSocketFrame(param)); //发送消息给服务端
System.out.println("send message :"+ param);
wait(); //等待 channelRead 获取到数据
return result;
}
}
com.hong.spring.controller.UserController
/**
 * Looks up a user on the server through the WebSocket connection.
 *
 * <p>The request text is {@code com.hong.spring.service.IUserService.findById#<id>}. The
 * client handler is shared by all requests and keeps the request parameter in a field, so
 * setting the parameter and making the call are done under one lock to stop concurrent
 * requests from overwriting each other's parameter.
 *
 * @param id the id of the user to look up
 * @return a success response carrying the server's reply, or a failure response
 */
@RequestMapping(value = "/user", method = RequestMethod.GET)
@ResponseBody
public DataResponse<Boolean> user(Integer id) {
    if (id == null) {
        // Without an id the request text would end in "#null".
        return DataResponse.BuildFailResponse("发送出错!");
    }
    try {
        WebSocketClientHandler handler = websocketClient.getHandler();
        if (handler == null) {
            log.warn("websocket client is not connected, request not sent");
            return DataResponse.BuildFailResponse("发送出错!");
        }
        String method = "com.hong.spring.service.IUserService.findById" + "#" + id;
        String result;
        // Lock on the client bean rather than the handler: the handler releases its own
        // monitor while waiting for the response.
        synchronized (websocketClient) {
            handler.setParam(method);
            result = (String) handler.call();
        }
        log.info("获取的结果{}", result);
        return DataResponse.BuildSuccessResponse(result);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("发送出错!", e);
    } catch (Exception e) {
        log.error("发送出错!", e);
    }
    return DataResponse.BuildFailResponse("发送出错!");
}
运行结果
send message :com.hong.spring.service.IUserService.findById#1
15:42:13.285 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=47
15:42:13.295 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder - Decoding WebSocket Frame opCode=1
15:42:13.295 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder - Decoding WebSocket Frame length=35
channelRead method invoke
server message:TextWebSocketFrame(data: PooledUnsafeDirectByteBuf(ridx: 0, widx: 35, cap: 35))
客户端收到消息:{"age":100,"id":1,"username":"333"}
15:42:13.297 [http-nio-8887-exec-1] INFO com.hong.spring.controller.UserController - 获取的channel"{\"age\":100,\"id\":1,\"username\":\"333\"}"
参考:
https://blog.csdn.net/BorisCao/article/details/105895027
https://github.com/datougege/rpc/tree/master/src
总结:基于 Spring MVC 整合 Netty 的学习资料确实很少,而且很多并不可靠,所以在这一部分花了不少时间。另外,实际工作中 Netty 的客户端多为前端(浏览器),并且需要实现心跳机制来保持连接。
基于springboot的实现
版本信息:
springboot 2.x
jdk 1.8
服务端
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_all</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_netty_service</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_dubbo_api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
</dependencies>
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
application.properties
#web socket端口号
websocket.server.port=7000
websocket.server.communication.port=8081
communication.netty.nodes=
# 用于处理客户端连接请求
client.server.bossGroup.threads=1
#用于处理客户端I/O操作
client.server.workerGroup.threads=0
netty.server.bossGroup.threads=1
netty.server.workerGroup.threads=0
netty.client.eventLoopGroup.threads=1
#避免端口冲突
server.port=8085
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
#mybatis配置
mybatis.typeAliasesPackage=com.hong.springboot.entity
com.hong.springboot.config.DruidConfig
package com.hong.springboot.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
/**
 * Data source configuration.
 *
 * @author csh
 */
@Configuration
public class DruidConfig {
    /**
     * Exposes a {@link DruidDataSource} as the application's {@link DataSource} bean. The
     * bean is annotated so that properties under the {@code spring.datasource} prefix are
     * bound to it.
     *
     * @return a new Druid data source
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
        final DruidDataSource druid = new DruidDataSource();
        return druid;
    }
}
com.hong.springboot.dao.UserMapper
package com.hong.springboot.dao;
import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
* User DAO layer.
*
* @author csh
*/
public interface UserMapper {
/**
* Selects the id, user_name and age columns of every row in the user table.
* NOTE(review): the user_name column presumably maps to a property of User; how it is
* mapped depends on the MyBatis configuration, which is not shown here. Confirm.
*
* @return the selected rows mapped to User objects
*/
@Select("select id,user_name,age from user")
List<User> findAllUser();
}
com.hong.springboot.netty.NettyWebSocketHandler
package com.hong.springboot.netty;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.List;
/**
* @author: csh
* @Date: 2021/1/19 11:08
* @Description:websocket监听
*/
@Component
@ChannelHandler.Sharable
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private UserMapper userMapper;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//1.获取Channel通道
final Channel channel=ctx.channel();
channel.writeAndFlush(new TextWebSocketFrame("收到啦"));
channel.writeAndFlush("收到啦");
}
//接受浏览器消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("收到消息 " + msg.text());
if(msg.text().startsWith("com.hong.spring.service.IUserService.findById")){
String[] list = msg.text().split("#");
if(null==list || list.length<=0){
ctx.writeAndFlush(new TextWebSocketFrame("没有id"));
}
List <User> allUser = userMapper.findAllUser();
ctx.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(allUser)));
}else{
ctx.writeAndFlush(new TextWebSocketFrame("没有收到值啊!"));
}
}
//当web客户端连接后,触发方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("用户连接:"+ctx.channel().id());
}
//当web客户端断开后,触发方法
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("用户断开:"+ctx.channel().id());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println(ctx.channel().id()+"促发事件");
super.userEventTriggered(ctx, evt);
}
}
com.hong.springboot.netty.WebsocketChannelInitializer
package com.hong.springboot.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Builds the server-side pipeline: HTTP codec, chunked writes, HTTP aggregation, the
 * WebSocket upgrade on path {@code /hong}, and finally the business handler.
 *
 * <p>The business handler can be supplied by the caller, for example a Spring-managed
 * {@link NettyWebSocketHandler} whose dependencies have been injected. With the no-argument
 * constructor a fresh handler is created with {@code new} for every channel; such instances
 * are not managed by Spring, so their {@code @Autowired} fields stay unset.
 */
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    /** Handler shared by all channels, or {@code null} to create one per channel. */
    private final NettyWebSocketHandler sharedHandler;

    /** Creates an initializer that instantiates a new handler for every channel. */
    public WebsocketChannelInitializer() {
        this(null);
    }

    /**
     * Creates an initializer that installs the given handler into every channel.
     *
     * @param sharedHandler the handler to share across channels; {@code null} falls back to
     *        one new handler per channel
     */
    public WebsocketChannelInitializer(NettyWebSocketHandler sharedHandler) {
        this.sharedHandler = sharedHandler;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // WebSocket starts as HTTP, so install the HTTP encoder/decoder first.
        pipeline.addLast(new HttpServerCodec());
        // Supports writing data in chunks.
        pipeline.addLast(new ChunkedWriteHandler());
        // Aggregates the parts of an HTTP message into one full message.
        pipeline.addLast(new HttpObjectAggregator(8192));
        // A heartbeat could be added here, e.g. an IdleStateHandler with a reader-idle timeout.
        // Upgrades HTTP to WebSocket on the "/hong" path.
        pipeline.addLast(new WebSocketServerProtocolHandler("/hong"));
        pipeline.addLast(sharedHandler != null ? sharedHandler : new NettyWebSocketHandler());
    }
}
com.hong.springboot.netty.WebsocketServer
package com.hong.springboot.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author: csh
* @Date: 2021/1/19 11:45
* @Description:WebSocket服务器使用独立的线程启动
*/
@Log4j2
@Component
public class WebsocketServer {
private ChannelFuture serverChannelFuture;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@Value("${websocket.server.port}")
private int port;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
/**
*
* 功能描述: websocket服务器
*
* @param:
* @return:
* @auther: csh
* @date: 2021/1/19 11:45
*/
@PostConstruct
public void start() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
try {
System.out.println("websocket启动了!");
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketChannelInitializer());
serverChannelFuture = serverBootstrap.bind(port).sync();
log.info("springboot websockt服务启动完成,已绑定端口: " + port);
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
log.error("springboot websocket服务启动异常!", e);
}
}
/**
*
* 功能描述: 关闭Netty Websocket服务器,主要是释放连接
*
* @param:
* @return:
* @auther: csh
* @date: 2021/1/19 11:48
*/
@Deprecated
public void close() {
serverChannelFuture.channel().close();
Future<?> bossGroupFuture = bossGroup.shutdownGracefully();
Future<?> workerGroupFuture = workerGroup.shutdownGracefully();
try {
bossGroupFuture.await();
workerGroupFuture.await();
log.info("springboot websocket服务停止中...");
} catch (InterruptedException ignore) {
ignore.printStackTrace();
}
}
}
com.hong.springboot.provider.UserServiceImpl
package com.hong.springboot.provider;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * User service implementation backed by {@link UserMapper}.
 *
 * @author csh
 */
@Service
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    /**
     * Loads every user and wraps the list, together with its size, in a success response.
     *
     * @return a success response holding all users and their count
     */
    @Override
    public DataResponse<List<User>> findByAll() {
        final List<User> users = userDao.findAllUser();
        final int total = users.size();
        return DataResponse.BuildSuccessResponse(users, total);
    }
}
com.hong.springboot.Application
package com.hong.springboot;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the Netty WebSocket service.
 *
 * @author csh
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
public class Application {
    /**
     * Starts the application.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that options such as
     *        {@code --server.port=...} take effect
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
结果
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释
* "C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" -Dvisualvm.id=57004932705900 "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\lib\idea_rt.jar=51479:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\ideaWorkSpace\spring\springboot_all\springboot_netty_service\target\classes;D:\ideaWorkSpace\spring\springboot_all\springboot_dubbo_api\target\classes;D:\mvn\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.3\mybatis-spring-boot-starter-2.1.3.jar;D:\mvn\org\springframework\boot\spring-boot-starter\2.4.0\spring-boot-starter-2.4.0.jar;D:\mvn\org\springframework\boot\spring-boot\2.4.0\spring-boot-2.4.0.jar;D:\mvn\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\mvn\org\yaml\snakeyaml\1.27\snakeyaml-1.27.jar;D:\mvn\org\springframework\boot\spring-boot-starter-jdbc\2.4.0\spring-boot-starter-jdbc-2.4.0.jar;D:\mvn\com\zaxxer\HikariCP\3.4.5\HikariCP-3.4.5.jar;D:\mvn\org\springframework\spring-jdbc\5.3.1\spring-jdbc-5.3.1.jar;D:\mvn\org\springframework\spring-tx\5.3.1\spring-tx-5.3.1.jar;D:\mvn\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.3\mybatis-spring-boot-autoconfigure-2.1.3.jar;D:\mvn\org\mybatis\mybatis\3.5.5\mybatis-3.5.5.jar;D:\mvn\org\mybatis\mybatis-spring\2.0.5\mybatis-spring-2.0.5.jar;D:\mvn\com\alibaba\druid-spring-boot-starter\1.1.10\druid-spring-boot-starter-1.1.10.jar;D:\mvn\com\alibaba\druid\1.1.10\druid-1.1.10.jar;D:\mvn\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;D:\mvn\org\springframework\boot\spring-boot-autoconfigure\2.4.0\spring-boot-autoconfigure-2.4.0.jar;D:\mvn\org\springframework\boot\spring-boot-starter-web\2.4.0\spring-boot-starter-web-2.4.0.jar;D:\mvn\org\springframework\boot\spring-boot-starter-json\2.4.0\spring-boot-starter-json-2.4.0.jar;D:\mvn\com\fasterxml\jackson\core\jackson-databind\2.11.3\jackson-databind-2.11.3.jar;D:\mvn\com\fasterxml\jackson\core\jackson-annotations\2.11.3\jackson-annotations-2.11.3.jar;D:\mvn\com\fasterxml\jackson\core\jackson-core\2.11.3\jackson-core-2.11.3.jar;D:\mvn\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.11.3\jackson-datatype-jdk8-2.11.3.jar;D:\mvn\com\fasterxml\jackson\datatype\jackson-dataty
pe-jsr310\2.11.3\jackson-datatype-jsr310-2.11.3.jar;D:\mvn\com\fasterxml\jackson\module\jackson-module-parameter-names\2.11.3\jackson-module-parameter-names-2.11.3.jar;D:\mvn\org\springframework\boot\spring-boot-starter-tomcat\2.4.0\spring-boot-starter-tomcat-2.4.0.jar;D:\mvn\org\apache\tomcat\embed\tomcat-embed-core\9.0.39\tomcat-embed-core-9.0.39.jar;D:\mvn\org\glassfish\jakarta.el\3.0.3\jakarta.el-3.0.3.jar;D:\mvn\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.39\tomcat-embed-websocket-9.0.39.jar;D:\mvn\org\springframework\spring-web\5.3.1\spring-web-5.3.1.jar;D:\mvn\org\springframework\spring-beans\5.3.1\spring-beans-5.3.1.jar;D:\mvn\org\springframework\spring-webmvc\5.3.1\spring-webmvc-5.3.1.jar;D:\mvn\org\springframework\spring-aop\5.3.1\spring-aop-5.3.1.jar;D:\mvn\org\springframework\spring-context\5.3.1\spring-context-5.3.1.jar;D:\mvn\org\springframework\spring-expression\5.3.1\spring-expression-5.3.1.jar;D:\mvn\org\springframework\spring-core\5.3.1\spring-core-5.3.1.jar;D:\mvn\org\springframework\spring-jcl\5.3.1\spring-jcl-5.3.1.jar;D:\mvn\io\netty\netty-all\4.1.17.Final\netty-all-4.1.17.Final.jar;D:\mvn\org\springframework\boot\spring-boot-starter-logging\2.4.0\spring-boot-starter-logging-2.4.0.jar;D:\mvn\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;D:\mvn\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;D:\mvn\org\apache\logging\log4j\log4j-to-slf4j\2.13.3\log4j-to-slf4j-2.13.3.jar;D:\mvn\org\apache\logging\log4j\log4j-api\2.13.3\log4j-api-2.13.3.jar;D:\mvn\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;D:\mvn\com\alibaba\fastjson\1.2.62\fastjson-1.2.62.jar;D:\mvn\mysql\mysql-connector-java\8.0.22\mysql-connector-java-8.0.22.jar;D:\mvn\org\projectlombok\lombok\1.16.16\lombok-1.16.16.jar" com.hong.springboot.Application
*/
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.0)
2021-01-22 09:38:31.732 INFO 5520 --- [ main] com.hong.springboot.Application : Starting Application using Java 1.8.0_181 on DESKTOP-1VMHJGQ with PID 5520 (D:\ideaWorkSpace\spring\springboot_all\springboot_netty_service\target\classes started by admin in D:\ideaWorkSpace\spring)
2021-01-22 09:38:31.734 INFO 5520 --- [ main] com.hong.springboot.Application : No active profile set, falling back to default profiles: default
2021-01-22 09:38:33.475 INFO 5520 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8085 (http)
2021-01-22 09:38:33.487 INFO 5520 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-01-22 09:38:33.487 INFO 5520 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.39]
2021-01-22 09:38:33.634 INFO 5520 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-01-22 09:38:33.634 INFO 5520 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1851 ms
websocket启动了!
2021-01-22 09:38:34.659 INFO 5520 --- [ main] c.hong.springboot.netty.WebsocketServer : springboot websockt服务启动完成,已绑定端口: 7000
2021-01-22 09:38:34.814 INFO 5520 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-01-22 09:38:35.052 INFO 5520 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8085 (http) with context path ''
2021-01-22 09:38:35.064 INFO 5520 --- [ main] com.hong.springboot.Application : Started Application in 3.746 seconds (JVM running for 4.31)
用户连接:ace6a544
ace6a544促发事件
ace6a544促发事件
收到消息 2222222222
收到消息 333333333
收到消息 33333
客户端
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_all</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_netty_client</artifactId>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
<dependency>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_dubbo_api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!--静态资源导出问题-->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
application.properties
# WebSocket server port
websocket.server.port=7000
websocket.server.host=127.0.0.1
# WebSocket (ws) URL of the server
websocket.server.url=ws://127.0.0.1:7000/hong
# Used to handle client connection requests
# NOTE(review): no client class shown alongside this file reads this key - confirm it is still needed
client.server.bossGroup.threads=1
# Avoid a port conflict
server.port=8086
com.hong.springboot.controller.UserController
package com.hong.springboot.controller;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.netty.WebSocketClientHandler;
import com.hong.springboot.netty.WebsocketClient;
import com.hong.springboot.utils.DataResponse;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @Auther: csh
* @Date: 2020/8/18 16:11
* @Description:
*/
@RestController
@RequestMapping("/")
@Log4j2
public class UserController {
@Autowired
private WebsocketClient websocketClient;
@RequestMapping(value = "",method = RequestMethod.GET)
@ResponseBody
public DataResponse<Boolean> sendMsg(String msg){
try {
log.info("请求进来了{}",msg);
Channel channel = websocketClient.getChannel();
log.info("获取的channel"+ JSONObject.toJSONString(channel));
WebSocketFrame frame = new TextWebSocketFrame(msg);
channel.writeAndFlush(frame);
return DataResponse.BuildSuccessResponse();
}catch (Exception e){
log.error("发送出错!{}",e);
}
return DataResponse.BuildFailResponse("发送出错!");
}
@RequestMapping(value = "/user",method = RequestMethod.GET)
@ResponseBody
public DataResponse<Boolean> user(Integer id){
try {
WebSocketClientHandler handler = websocketClient.getHandler();
String method = "com.hong.spring.service.IUserService.findById"+"#"+id;
handler.setParam(method);
handler.call();
log.info("获取的channel"+ JSONObject.toJSONString(handler.getResult()));
return DataResponse.BuildSuccessResponse(handler.getResult());
}catch (Exception e){
log.error("发送出错!{}",e);
}
return DataResponse.BuildFailResponse("发送出错!");
}
}
com.hong.springboot.netty.MyWebsocketChannelInitializer
package com.hong.springboot.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Builds the pipeline of the WebSocket client channel: optional TLS, HTTP codec and
 * aggregator for the upgrade handshake, WebSocket compression, and finally the
 * {@link WebSocketClientHandler} owned by the {@link WebsocketClient}.
 *
 * @author csh
 */
@Component("myWebsocketChannelInitializer")
public class MyWebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final WebsocketClient websocketClient;

    /**
     * @param websocketClient client that supplies the SSL context, host, port and handler
     * @throws IllegalArgumentException if {@code websocketClient} is {@code null}
     */
    public MyWebsocketChannelInitializer(WebsocketClient websocketClient) {
        // Fail fast: initChannel cannot work without the client, so a null here would
        // otherwise only surface later as a NullPointerException on the event loop.
        if (websocketClient == null) {
            throw new IllegalArgumentException("websocketClient must not be null");
        }
        this.websocketClient = websocketClient;
    }

    /**
     * Installs the handlers on a newly registered channel.
     *
     * @param ch the channel being initialised
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        // TLS is only added when the client built an SSL context.
        if (websocketClient.getSslCtx() != null) {
            p.addLast(websocketClient.getSslCtx().newHandler(ch.alloc(), websocketClient.getHost(), websocketClient.getPort()));
        }
        // HTTP codec + aggregator for the upgrade handshake
        p.addLast(new HttpClientCodec());
        p.addLast(new HttpObjectAggregator(8192));
        // WebSocket per-message compression support
        p.addLast(WebSocketClientCompressionHandler.INSTANCE);
        p.addLast(websocketClient.getHandler());
    }

    /**
     * Logs the inbound message and passes it on.
     *
     * @param ctx handler context
     * @param msg inbound message of any type
     * @throws Exception propagated from the superclass
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Print via toString() without casting: the message is not guaranteed to be a
        // TextWebSocketFrame, and a cast used only for logging would throw ClassCastException.
        System.out.println("传进来的消息:" + msg);
        super.channelRead(ctx, msg);
    }
}
com.hong.springboot.netty.WebsocketClient
package com.hong.springboot.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.net.ssl.SSLException;
import java.net.URI;
/**
* @author: csh
* @Date: 2021/1/20 10:38
* @Description:客户端
*/
@Log4j2
@Component("websocketClient")
public class WebsocketClient {
private Bootstrap clientBootstrap;
private NioEventLoopGroup eventExecutors;
/**ws url */
@Value("${websocket.server.url}")
private URI uri;
/**sslCtx */
private SslContext sslCtx;
/**handler */
private WebSocketClientHandler handler;
private Channel channel;
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
/**主机地址 */
@Value("${websocket.server.host}")
private String host;
/**端口号 */
@Value("${websocket.server.port}")
private int port;
@Deprecated
public void close() {
Future<?> bossGroupFuture = eventExecutors.shutdownGracefully();
try {
bossGroupFuture.await();
System.out.println("websocket客户端服务停止中...");
} catch (InterruptedException ignore) {
ignore.printStackTrace();
}
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public URI getUri() {
return uri;
}
public void setUri(URI uri) {
this.uri = uri;
}
public SslContext getSslCtx() {
return sslCtx;
}
public void setSslCtx(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
public WebSocketClientHandler getHandler() {
return handler;
}
public void setHandler(WebSocketClientHandler handler) {
this.handler = handler;
}
@PostConstruct
public void run() {
new Thread(()->{
try {
clientBootstrap = new Bootstrap();
eventExecutors = new NioEventLoopGroup();
String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
final int port = uri.getPort();
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
System.out.println("只支持ws");
return;
}
final boolean ssl = "wss".equalsIgnoreCase(scheme);
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
clientBootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new MyWebsocketChannelInitializer(this));
System.out.println("client ready");
Channel channel = clientBootstrap.connect(host, port).sync().channel();
this.channel = channel;
//等待握手完成
handler.handshakeFuture().sync();
log.info("握手完成!");
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (SSLException e) {
e.printStackTrace();
} finally {
System.out.println("触发关闭连接池!");
eventExecutors.shutdownGracefully();
}
}).start();
}
}
com.hong.springboot.netty.WebSocketClientHandler
package com.hong.springboot.netty;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import java.util.concurrent.Callable;
/**
 * Client-side WebSocket handler: drives the opening handshake, stores the text of the
 * most recent text frame as the result, and offers a blocking request/response
 * {@link #call()} on top of it.
 *
 * <p>Only one request may be in flight at a time: {@code param} and {@code result} are
 * single shared fields, so concurrent callers can receive each other's replies.
 *
 * @author csh
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> implements Callable<Object> {
    /** Upper bound on how long {@link #call()} waits for a reply. */
    private static final long CALL_TIMEOUT_MILLIS = 10_000L;

    /** Performs the handshake with the server. */
    private final WebSocketClientHandshaker handshaker;
    /** Context captured in channelActive; read by caller threads, hence volatile. */
    private volatile ChannelHandlerContext context;
    /** Request text sent by {@link #call()}. */
    private volatile String param;
    /** Text of the latest reply; {@code null} while a call is waiting. */
    private volatile String result;
    /** Completes when the handshake succeeds or fails. */
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    public String getParam() {
        return param;
    }

    public void setParam(String param) {
        this.param = param;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    /** Remembers the context and starts the WebSocket handshake once the channel is connected. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
        handshaker.handshake(ctx.channel());
    }

    /**
     * Called when the connection is lost. Fails a still-pending handshake and wakes any
     * thread blocked in {@link #call()} so neither waits on a dead channel.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接断开");
        if (handshakeFuture != null) {
            // tryFailure is a no-op when the promise is already complete.
            handshakeFuture.tryFailure(new IllegalStateException("connection closed before handshake completed"));
        }
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Creates the handshake promise when this handler is added to the pipeline, so it is
     * available before any channel event arrives.
     *
     * @param ctx handler context
     * @throws Exception never thrown here; declared by the overridden method
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        handshakeFuture = ctx.newPromise();
    }

    /**
     * Logs every inbound message and delegates to the superclass, which dispatches to
     * {@link #channelRead0}. The result is published there, for text frames only, so the
     * handshake response or a pong can no longer be mistaken for a reply.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead method invoke");
        System.out.println("server message:" + msg);
        super.channelRead(ctx, msg);
    }

    /**
     * Handles one inbound message: finishes the handshake if it is still pending,
     * otherwise processes the WebSocket frame.
     *
     * @param ctx handler context
     * @param msg inbound message
     * @throws Exception on an unexpected HTTP response after the upgrade
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        // Handshake not finished yet: this message must be the upgrade response
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                System.out.println("完成连接");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                System.out.println("握手连接失败");
                handshakeFuture.setFailure(e);
            }
            return;
        }
        // After the upgrade no further HTTP responses are expected
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }
        // WebSocket frames
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) frame).text();
            System.out.println("客户端收到消息:" + text);
            publishResult(text);
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("客户端收到pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("客户端手动关闭");
            ch.close();
        }
    }

    /** Stores a reply and wakes the thread waiting in {@link #call()}. */
    private synchronized void publishResult(String text) {
        result = text;
        notifyAll();
    }

    /** Prints the error, fails a pending handshake and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (handshakeFuture != null) {
            // tryFailure avoids the check-then-act race of isDone() followed by setFailure().
            handshakeFuture.tryFailure(cause);
        }
        ctx.close();
    }

    /**
     * Sends {@link #getParam()} as a text frame and blocks until a text reply arrives.
     *
     * @return the reply text
     * @throws IllegalStateException if the channel is not connected, the handshake has not
     *         finished, or the connection closes while waiting
     * @throws java.util.concurrent.TimeoutException if no reply arrives within the timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call method invoke");
        ChannelHandlerContext ctx = context;
        if (ctx == null || !ctx.channel().isActive()) {
            throw new IllegalStateException("WebSocket channel is not active");
        }
        if (!handshaker.isHandshakeComplete()) {
            throw new IllegalStateException("WebSocket handshake has not completed");
        }
        // Drop any stale reply so the loop below only accepts a new one.
        result = null;
        ctx.writeAndFlush(new TextWebSocketFrame(param)); // send the request to the server
        System.out.println("send message :" + param);
        long deadlineNanos = System.nanoTime() + CALL_TIMEOUT_MILLIS * 1_000_000L;
        // Loop on the condition: wait() may wake spuriously, and a bare wait() would block
        // forever if the server never answers.
        while (result == null && ctx.channel().isActive()) {
            long remainingMillis = (deadlineNanos - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                break;
            }
            wait(remainingMillis);
        }
        if (result == null) {
            if (!ctx.channel().isActive()) {
                throw new IllegalStateException("WebSocket connection closed before a reply arrived");
            }
            throw new java.util.concurrent.TimeoutException(
                    "No reply from server within " + CALL_TIMEOUT_MILLIS + " ms");
        }
        return result;
    }
}
com.hong.springboot.Application
package com.hong.springboot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the Netty WebSocket client application.
 *
 * @author csh
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
public class Application {
    /**
     * Boots the Spring context.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Forward args; without them command-line overrides given at launch are not passed to Spring Boot.
        SpringApplication.run(Application.class, args);
    }
}
请求:http://localhost:8086/?msg=1
客户端收到如下:
服务端收到如下:
尝试RPC调用
参考
https://www.cnblogs.com/cxyxiaobao/p/11946015.html
发现springboot真香~~,相对于mvc来说真的是极大简化和快速....
最后
netty本身并不是一个rpc框架,只是一个集成通讯框架,并且各大rpc框架的底层大部分都是基于netty来实现的,所以学习netty是一件相当重要的事情,它能让我们更清晰地了解dubbo或其他基于netty实现的rpc框架是如何实现的。不过本文相对来说比较浅,推荐阅读:《netty权威指南》《netty实战》《Netty、Redis、Zookeeper高并发实战》。当然,如果想彻底深入了解,建议从io、bio、nio、aio这样一层一层了解上来。毕竟东西有点多,而且有意向往高并发、高可用方向发展的同学,还需要了解线程相关的知识,netty只是在这些基础上实现而已。
参考文章:
https://tool.oschina.net/apidocs/apidoc?api=netty
https://netty.io/
(本文单是编辑就花了1.5个小时,写了一个多星期,如果觉得不错请关注以下二维码~)
本人工作之余,长期在线答疑解惑(仅针对新手,高手请略过...)