【导读】我在两年前的时候就购买了《Netty权威指南》看了一下,不过没看懂哈哈哈哈,工作中也用不到,很快就忘了,直到前段时间在dy那边需要我重构一个TCP连接通信工具(半个月时间只给它搭了一个架子),所以最近也重新翻开书进行学习。Netty是最流行的NIO框架之一,其健壮性、功能、性能、可定制性和可扩展性都是很优秀的。Netty是Hadoop生态中RPC框架Avro以及Dubbo底层的通信框架。今儿就来聊一下其的简单使用。
一、Netty逻辑架构
采用三层网络架构进行设计和开发,架构如图:
1、Reactor通信调度层
监听网络的读写和连接请求,负责将网络层的数据读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读写事件等,然后将这些事件触发到Pipeline中,由Pipeline管理的职责链进行后续处理。
什么是反应堆Reactor:
BIO模型在并发高的情况下,会创建大量线程,性能很低并可能导致服务宕机。
NIO工作原理:
(1)由一个专门的线程处理所有的IO事件,并负责分发,也就是Selector
(2)事件驱动机制:事件到的时候触发,而不是同步的去监视事件
(3)线程通信:线程之间通过wait/notify等方式进行通信,减少无谓的线程切换
2、职责链Pipeline
负责事件在职责链之间的有序传播,动态的编排职责链。可选择监听和处理特定的事件,拦截处理事件,比如对消息的编解码。使用的是设计模式中的责任链模式。
3、业务逻辑编排层Service ChannelHandler
分成两类:一类是纯粹的业务逻辑编排,另一类是应用层协议插件,用于特定协议的会话和链路管理。比如对消息进行编解码,协议不同,编解码步骤也不一样,例如Http和WebSocket
二、用Netty实现简易聊天窗口
在上一篇用JavaNIO写了一个例子,Java原生的NIO开发需要手动判断事件的类型才能做下一步处理,相比于原生Java NIO的开发,Netty的开发会简单很多,不需要开发者关注当前事件是什么类型的事件,只需在相应的方法里关注业务逻辑的处理等。
1、环境准备:引入Netty依赖
2、服务端开发
public class Server { private int port; public Server(int port) { this.port = port; } public void start(){ //配置服务端NIO线程 //接收客户端连接,事件分发线程池 NioEventLoopGroup boss = null; //处理读写事件线程池 NioEventLoopGroup workder = null; try { boss = new NioEventLoopGroup(); workder = new NioEventLoopGroup();
//创建服务端,相当于NIO的ServerSocketChannel ServerBootstrap server = new ServerBootstrap(); server.group(boss,workder) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //绑定IO事件处理类 pipeline.addLast(new ServerHandler()); } }) //最大客户端连接数 .option(ChannelOption.SO_BACKLOG, 128) //与客户端保持长链接 .childOption(ChannelOption.SO_KEEPALIVE, true); //绑定端口,同步等待 ChannelFuture future = server.bind(this.port).sync(); System.out.println("服务端已启动,端口为:"+port);
//等待服务器监听端口关闭 future.channel().closeFuture().sync();
} catch (Exception e) {
} finally { //优雅退出,释放线程池资源 boss.shutdownGracefully(); workder.shutdownGracefully(); } } public static void main(String[] args) { new Server(8080).start(); }}
其中在childHandler中是绑定了IO事件的处理类,作用类似于Reactor模式中的Handler类,主要用于处理网络IO事件,例如消息编解码等,我这边主要是用于演示Netty使用,所以未做编解码的处理。
在这边主要用于对聊天室信息发送的处理,看一下其编码:
public class ServerHandler extends ChannelHandlerAdapter {
//存放所有Channel private static final ChannelGroup onLineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//定义channel的私有名称属性,用于区分channel AttributeKey<String> name = AttributeKey.valueOf("name");
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //将消息转换成ByteBuf,对应于JavaNIO的ByteBuffer ByteBuf buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); String body = new String(bytes, "UTF-8"); Channel channel = ctx.channel();
String message = null; //如果是新建连接 if (!onLineUsers.contains(channel)) { channel.attr(name).getAndSet(body); onLineUsers.add(channel); message = body+"在"+getCurrentTime()+"进入聊天室,当前聊天室人数为:"+onLineUsers.size(); System.out.println(message); for (Channel user : onLineUsers) { if (user!=channel) { //发送给聊天室的其他人 ByteBuf resp = Unpooled.copiedBuffer(message.getBytes()); user.writeAndFlush(resp); } else { //发送给自己 ByteBuf resp = Unpooled.copiedBuffer("您已进入聊天室".getBytes()); user.writeAndFlush(resp); } } } else { //将发送的消息发给其他人 message = "在"+getCurrentTime()+"发送了一条消息,内容:" + body; for (Channel user : onLineUsers) { if (user!=channel) { ByteBuf resp = Unpooled.copiedBuffer((channel.attr(name).get()+message).getBytes()); user.writeAndFlush(resp); } } } }
public static String getCurrentTime(){ SimpleDateFormat format = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); String time = format.format(new Date()); return time; }
}
需要继承ChannelHandlerAdapter类重写ChannelRead方法,用于读取客户端消息并进行处理,利用ChannelGroup存放所有channel,也就是聊天室的人,利用AttributeKey定义channel的私有名称属性,用于区分channel。
3、客户端开发
public class Client {
public static void main(String[] args) { //监听控制台输入,将输入的字符用于在服务端定义当前连接的名字 Scanner scanner = new Scanner(System.in); new Client().start(scanner.nextLine()); }
private void start(final String name) { //用于IO读写的线程池 NioEventLoopGroup group = new NioEventLoopGroup(); try { //相当于NIO的SocketChannel Bootstrap client = new Bootstrap(); client.group(group) .option(ChannelOption.TCP_NODELAY,true) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //绑定IO读写事件的处理类 ch.pipeline().addLast(new ClientHandler(name)); } }); //发起连接 ChannelFuture cf = client.connect(new InetSocketAddress(8080)).sync(); //等待关闭 cf.channel().closeFuture().sync(); } catch (Exception e) { } finally { group.shutdownGracefully(); } }
}
I/O事件处理类:
public class ClientHandler extends ChannelHandlerAdapter{
private String name;
public ClientHandler(String name) { this.name = name; }
/** * 连接成功后触发此方法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //将第一次输入的名称发送给客户端 String order = name; ByteBuf buf = Unpooled.buffer(order.length()); buf.writeBytes(order.getBytes()); ctx.writeAndFlush(buf); //另起一个线程监听控制台输入,如果不添加线程,此客户端会阻塞在此 new Thread(new Runnable() { @Override public void run() { Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); ByteBuf buffer = Unpooled.buffer(s.length()); buffer.writeBytes(s.getBytes()); ctx.writeAndFlush(buffer); } } }).start();
}
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收服务端返回的消息 ByteBuf buf=(ByteBuf) msg; byte[] bytes=new byte[buf.readableBytes()]; buf.readBytes(bytes); String txt=new String(bytes,"UTF-8"); System.out.println("【"+name+"】接收到信息:"+txt); }
}
4、启动
(1)启动服务端
(2)启动两个客户端,名称分别是张三、李四
张三:
此时服务端会输出信息:
李四:
此时服务端会输出信息:
用张三的窗口发送一条消息给李四:
李四会受到消息:
当一个客户端发消息的时候,其他在群里面的人都会受到消息。
=============================
以上就是Netty的简略使用,更多敬请关注之后几篇,一起学习。