在上一篇文章讲到Dotnetty的基本认识,本文这次会讲解dotnetty非常核心的模块是属于比较硬核的干货了,然后继续往下讲解如何根据自己的需求或者自己的喜好去配置Dotnetty而不是生搬硬套官网的示例源码。如果看了本文有收获的话麻烦关注一下文章尾部的公众号和技术讨论群。各位的支持是对我莫大的帮助。
主要讲解一下几个知识点:
Group :设置线程组模型,Reactor线程模型对比EventLoopGroup
Channel:设置channel通道类型NioServerSocketChannel、OioServerSocketChannel
Option: 作用于每个新建立的channel,设置TCP连接中的一些参数,如下:
Channel: 客户端和服务端建立的一个连接通道(可以理解为一个channel就是一个socket连接) ChannelHandler:负责Channel的逻辑处理 ChannelPipeline: 负责管理ChannelHandler的有序容器
关系: 一个Channel包含一个ChannelPipeline,所有ChannelHandler都会顺序加入到ChannelPipeline中 创建 Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久 性的Channel当状态出现变化,就会触发对应的事件。
生命周期:
ChannelHandlerContext是连接ChannelHandler和ChannelPipeline的桥梁,ChannelHandlerContext部分方法和Channel及ChannelPipeline重合。
一般的项目中,inboundHandler和outboundHandler有多个,在Pipeline中的执行顺序?
InboundHandler顺序执行,OutboundHandler逆序执行
以上概念性的东西介绍完了之后开始编写本章实战代码(完整的案例代码将在qq群文件共享里上传,文章末尾有QQ群二维码和联系方式)。接下来我们先看一下项目结构。
Handlers - 主要存放所有处理相关类。
Initializer - 存放初始化tcp服务的相关内容。
appsetting.json - 主要存放的内容为,服务端的相关配置例如:ip地址、端口号等。
dotnetty - 安全证书
Program - 启动类
项目结构介绍完毕之后,我大致将这个demo分为5个部分来实现具体根据自己需求去设计搭建结构都是可以的,这里的内容仅供参考。
1 //主要工作组,设置为2个线程
2 private static readonly IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(4);
3 //子工作组,默认为内核数*2的线程数
4 private static readonly IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();
5
6 static async Task RunAsync() {
7 /*
8 *初始化服务端引导对象。
9 *声明一个服务端Bootstrap,每个Netty服务端程序,都由ServerBootstrap控制,
10 *通过链式的方式组装需要的参数
11 */
12 ServerBootstrap bootstrap = new ServerBootstrap();
13 //添加工作组
14 bootstrap.Group(bossGroup, workerGroup);
15 //初始化工作频道
16 bootstrap.Channel<TcpServerSocketChannel>();
17 bootstrap
18 //存放已完成三次握手的请求的等待队列的最大长度;
19 .Option(ChannelOption.SoBacklog, 1024)
20 //ByteBuf的分配器(重用缓冲区)大小
21 .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
22 //接收字符的长度
23 .Option(ChannelOption.RcvbufAllocator, new FixedRecvByteBufAllocator(1024 * 8))
24 //保持长连接
25 .ChildOption(ChannelOption.SoKeepalive, true)
26 //取消延迟发送
27 .ChildOption(ChannelOption.TcpNodelay, true)
28 //端口复用
29 .ChildOption(ChannelOption.SoReuseport, true)
30 //初始化日志拦截器,可以不加
31 .Handler(new LoggingHandler("SRV-LSTN"))
32 //自定义初始化Tcp服务
33 .ChildHandler(new EchoServerInitializer());
34
35 //绑定服务端,端口号。IP地址默认读取项目配置文件。
36 await bootstrap.BindAsync(ServerSettings.Port);
37 }
1 /// <summary>
2 /// 初始化
3 /// </summary>
4 public class EchoServerInitializer : ChannelInitializer<ISocketChannel>
5 {
6 /// <summary>
7 /// No interaction time.300s
8 /// </summary>
9 public const int AllTimeOut = 60 * 5;
10
11 /// <summary>
12 /// Read Time Out.60s
13 /// </summary>
14 public const int ReadTimeOut = 60;
15
16 /// <summary>
17 /// Recive Time Out.60s
18 /// </summary>
19 public const int WriterTimeOut = 60;
20
21 protected override void InitChannel(ISocketChannel channel)
22 {
23 /*
24 * 工作线程连接器是设置了一个频道,服务端主线程所有接收到的信息都会通过这个管道一层层往下传输
25 * 同时所有出栈的消息 也要这个频道的所有处理器进行一步步处理
26 */
27 IChannelPipeline pipeline = channel.Pipeline;
28 //初始化Dotnetty日志拦截器
29 pipeline.AddLast(new LoggingHandler("SRV-CONN"));
30 //心跳超时时间配置
31 pipeline.AddLast(new IdleStateHandler(
32 ReadTimeOut,
33 WriterTimeOut,
34 AllTimeOut));
35 //消息内容编码逻辑处理类
36 pipeline.AddLast("encoder", new EncoderHandler());
37 //解码逻辑处理类
38 pipeline.AddLast("decoder", new DecoderHandler());
39 //心跳逻辑处理
40 pipeline.AddLast(new HeartBeatHandler());
41 //每个频道请求消息处理类
42 pipeline.AddLast(new ServerHandler());
43 }
44 }
1 public class HeartBeatHandler : ChannelHandlerAdapter
2 {
3 /// <summary>
4 /// 每个频道都有自己的心跳管理,如果频道长时间不操作踢掉线的逻辑可以写在这里
5 /// </summary>
6 /// <param name="context"></param>
7 /// <param name="evt"></param>
8 public override void UserEventTriggered(IChannelHandlerContext context, object evt)
9 {
10 var eventState = evt as IdleStateEvent;
11 if (eventState != null)
12 {
13 String type = string.Empty;
14 if (eventState.State == IdleState.ReaderIdle)
15 {
16 type = "read idle";//没有任何接受
17 }
18 else if (eventState.State == IdleState.WriterIdle)
19 {
20 type = "write idle";//没有任何写入
21 }
22 else if (eventState.State == IdleState.AllIdle)
23 {
24 type = "all idle";
25 context.CloseAsync();//5分钟内无任何交互则断开该客户端连接
26 }
27 }
28 else
29 {
30 base.UserEventTriggered(context, evt);
31 }
32 }
33 }
1 /// <summary>
2 /// 解码
3 /// </summary>
4 public class DecoderHandler : ByteToMessageDecoder
5 {
6 protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
7 {
8 throw new NotImplementedException();
9 }
10 }
11
12
13
14 public class EncoderHandler : MessageToByteEncoder<byte[]>
15 {
16 /// <summary>
17 /// 编码
18 /// </summary>
19 /// <param name="context"></param>
20 /// <param name="message"></param>
21 /// <param name="output"></param>
22 protected override void Encode(IChannelHandlerContext context, byte[] message, IByteBuffer output)
23 {
24 throw new NotImplementedException();
25 }
26 }
1 public class ServerHandler : ChannelHandlerAdapter
2 {
3 /*
4 * Channel的生命周期
5 * 1.ChannelRegistered 先注册
6 * 2.ChannelActive 再被激活
7 * 3.ChannelRead 客户端与服务端建立连接之后的会话(数据交互)
8 * 4.ChannelReadComplete 读取客户端发送的消息完成之后
9 * error. ExceptionCaught 如果在会话过程当中出现dotnetty框架内部异常都会通过Caught方法返回给开发者
10 * 5.ChannelInactive 使当前频道处于未激活状态
11 * 6.ChannelUnregistered 取消注册
12 */
13
14 /// <summary>
15 /// 频道注册
16 /// </summary>
17 /// <param name="context"></param>
18 public override void ChannelRegistered(IChannelHandlerContext context)
19 {
20 base.ChannelRegistered(context);
21 }
22
23 /// <summary>
24 /// socket client 连接到服务端的时候channel被激活的回调函数
25 /// </summary>
26 /// <param name="context"></param>
27 public override void ChannelActive(IChannelHandlerContext context)
28 {
29 //一般可用来记录连接对象信息
30 base.ChannelActive(context);
31 }
32
33 /// <summary>
34 /// socket接收消息方法具体的实现
35 /// </summary>
36 /// <param name="context">当前频道的句柄,可使用发送和接收方法</param>
37 /// <param name="message">接收到的客户端发送的内容</param>
38 public override void ChannelRead(IChannelHandlerContext context, object message)
39 {
40 var buffer = message as IByteBuffer;
41 if (buffer != null)
42 {
43 Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8));
44 }
45 context.WriteAsync(message);//发送给客户端方法
46 }
47
48 /// <summary>
49 /// 该次会话读取完成后回调函数
50 /// </summary>
51 /// <param name="context"></param>
52 public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();//
53
54 /// <summary>
55 /// 异常捕获
56 /// </summary>
57 /// <param name="context"></param>
58 /// <param name="exception"></param>
59 public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
60 {
61 Console.WriteLine("Exception: " + exception);
62 context.CloseAsync();
63 }
64
65 /// <summary>
66 /// 当前频道未激活状态
67 /// </summary>
68 /// <param name="context"></param>
69 public override void ChannelInactive(IChannelHandlerContext context)
70 {
71 base.ChannelInactive(context);
72 }
73
74 /// <summary>
75 /// 取消注册当前频道,可理解为销毁当前频道
76 /// </summary>
77 /// <param name="context"></param>
78 public override void ChannelUnregistered(IChannelHandlerContext context)
79 {
80 base.ChannelUnregistered(context);
81 }
82 }