本篇文章,我们一起来实现一个基于Netty的简单多人聊天示例程序,目的是帮助大家了解Netty在Socket通信方面的典型应用场景。
先来简单介绍一下本示例服务端的基本功能点和执行流程:
1.服务端负责记录客户端的上线、下线信息;
2.服务端负责接收多个客户端的消息内容,保存与客户端的链接信息,并将消息内容广播到所有客户端;
3.多个客户端可以同时接收到某一客户端发出的消息内容(类似QQ群);
4.每个客户端可以接收到其他客户端的加入、离开的信息;
了解了程序的大体功能点之后,接下来就需要考虑具体的实现了。还记得上一篇文章中我们编写的HttpServerHandler类吗?它实现了ChannelHandler接口,这个接口是Netty中极为重要的组件,我们的业务逻辑往往要写在ChannelHandler接口的实现类中,本示例中我们的服务端声明一个ChatServerHandler类,用来实现之前提到的基本功能点。
首先,我们定义ChatServerHandler类,此类同样需要继承SimpleChannelInboundHandler(注意范型String表示服务端接收到的消息类型),由于服务端在接收到某一客户端发送过来的消息后,需要将消息向其他在线的客户端广播,因此ChatServerHandler类中需要包含一个实例变量用于存放所有的链接信息,这个变量需要用到Netty提供的组件ChannelGroup,ChannelGroup是一个接口,创建其实例的代码如下:
//服务端用于存放与多个客户端之间的socket链接
private staticChannelGroupchannelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
还记得上篇文章中我们提到的“事件驱动”吗,当客户端链接到服务端时,会产生handlerAdded事件,我们在ChatServerHandler类中添加handlerAdded的事件回调方法:
//客户端与服务端建立好链接时会触发此方法
@Override
public voidhandlerAdded(ChannelHandlerContext ctx)throwsException {
Channel channel = ctx.channel();
//channelGroup的writeAndFlush方法会调用其所包含的所有的channel的writeAndFlush方法
channelGroup.writeAndFlush("【服务器】- "+ channel.remoteAddress() +" 加入\n");
channelGroup.add(channel);//将客户端链接存放到channelGroup中
}
这段代码的作用是服务端将新接入的链接存放到channelGroup中,向所有客户端广播新链接到的客户端信息并在控制台打印链接数。
当客户端与服务端的链接处于活动状态时,会触发channelActive事件,我们在ChatServerHandler添加channelActive事件的回调方法:
//客户端与服务端建立好链接,链接处于活动状态时触发此方法
@Override
public voidchannelActive(ChannelHandlerContext ctx)throwsException {
Channel channel = ctx.channel();
}
此方法表示当有客户端链接到服务端时,服务端会在控制台打印出远程客户端的ip地址:端口 + "上线"(需要强调的是,回调方法的执行时机是handlerAdded方法早于channelActive方法)。
当服务端接收到客户端发来的聊天消息后,会触发channelRead0方法的执行,我们在这个回调方法中处理接收到的消息:
//参数msg表示客户端的消息内容,此类型由SimpleChannelInboundHandler的范型决定
@Override
protected voidchannelRead0(ChannelHandlerContext ctx, String msg)throwsException {
Channel channel = ctx.channel();
Iterator iterator =channelGroup.iterator();
while(iterator.hasNext()) {
Channel ch = iterator.next();
if(channel != ch) {
ch.writeAndFlush(channel.remoteAddress() +" 发送的消息:"+ msg +"\n");
}else{
ch.writeAndFlush("【自己】"+ msg +"\n");
}
}
}
此方法为本示例的重点方法,请注意,当有多个客户端链接到服务端时,handlerAdded方法将会被调用多次,每次调用都会分别将链接存放在channelGroup中,通过迭代channelGroup可以获取到所有客户端的链接。例如:有一个客户端向服务端发送了“你好”这样一个消息,服务端会读到此消息,并通过迭代channelGroup获取到其他客户端的链接,并通过channel的writeAndFlush方法将“你好”这条消息消息分别写到不同的客户端,从而实现群聊的效果。
客户端与服务端的链接失效时,会触发channelInactive事件,我们在ChatServerHandler中添加channelInactive事件的回调方法,当客户端与服务端的链接断开时,服务端在控制台上打印ip地址:端口 + "下线"。
@Override
public voidchannelInactive(ChannelHandlerContext ctx)throwsException {
Channel channel = ctx.channel();
}
当客户端与服务端的链接断开后,会触发handlerRemoved事件,我们在ChatServerHandler类中添加handlerRemoved事件的回调方法:
//链接彻底断开时触发此方法
@Override
public voidhandlerRemoved(ChannelHandlerContext ctx)throwsException {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("【服务器】- "+ channel.remoteAddress() +" 离开\n");
//channelGroup自动移除断掉的channel
}
另外还有一个事件回调方法需要注意,那就是异常捕获方法,当ChatServerHandler类中的方法产生异常时,exceptionCaught事件将被触发:
@Override
public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throwsException {
cause.printStackTrace();
ctx.close();
}
通常情况下这个方法需要通过ctx.close()方法关闭网络连接。至此,我们ChatServerHandler类就编写完成了(写法可以参考上一篇文章中的HttpServerHandler类的实现方式)。
接下来我们定义ChannelHandler的初始化容器ChannelInitializer类的子类ChatServerInitializer。如果记不清ChannelInitializer的作用,请阅读上一篇文章。
public classChatServerInitializerextendsChannelInitializer {
@Override
protected voidinitChannel(SocketChannel ch)throwsException {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(newDelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(newStringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(newStringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(newChatServerHandler());
}
}
与我们上一篇定义的HttpServerInitializer不同之处仅在于我们使用了不同的编解码器,DelimiterBasedFrameDecoder解码器会按照指定的分隔符如"\r"或"\n"来判断是否接收了一条完整消息,StringDecoder和StringEncoder理解起来相对容易些,本示例中通过指定utf-8字符集来对消息内容进行编解码(channelRead0方法的第二个参数类型是String,是实际消息内容,这些内容经过了StringDecoder解码器处理,当消息从服务端广播到客户端时,会通过StringEncoder编码器处理),最后将我们定义的ChatServerHandler类也加入到ChannelPipeline中。
public classChatServer {
public static voidmain(String[] args)throwsException {
EventLoopGroup bossGroup =newNioEventLoopGroup();
EventLoopGroup workerGroup =newNioEventLoopGroup();
try{
ServerBootstrap serverBootstrap =newServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
childHandler(newChatServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
至此,服务端的代码全部编写完毕,总结起来您可能会发现,服务端的启动类ChatServer和ChannelHandler的初始化器ChatServerInitializer同上一篇文章中的对应组件几乎没有变化,变化最大的是我们自定义的处理器类,本示例我们在处理器类ChatServerHandler中引入了若干个回调方法,用于响应不同的网络事件,将这些事件进行有机结合,进而实现了聊天的业务逻辑,在下一篇文章中我们将介绍如何编写聊天程序客户端的功能。也许此刻您依然对Channel,ChannelHandler,ChannelInitializer这些组件的逻辑关系存在迷惑,先不要着急,慢慢来,我们下篇文章见。
领取专属 10元无门槛券
私享最新 技术干货