netty中组件主要包括Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipeline等。
Channel---Socket
EventLoop---控制流、多线程处理、并发
定义了netty的核心抽象,用于处理连接的生命周期中所发生的事件
ChannelFuture---异步通知
ChannelHandler是处理入站和出站的事件、数据的应用程序逻辑容器。其中ChannelInBoundHandler相当于接收入站事件和数据。当你要给连接的客户端响应时,也可以从ChannelInBoundHandler中冲刷数据。
ChannelPipeline为ChannelHandler链提供容器,当Channel被穿件时,会自动的被分配到它的专属ChannelPipeline中。其中ChannelHandler的执行顺序是由它们被添加的顺序所决定的。
实现代码
public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
//创建EventLoopGroup,异步通知消息
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
//指定nio-Channel(Socket)来传输小心
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(serverHandler);
}
});
//异步绑定服务器,调用sync知道sync方法直到成功绑定
ChannelFuture future = bootstrap.bind().sync();
//获取Channel的CloseFuture,并且阻塞当前线程知道他完成
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public void start()
throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
//连接到远程节点,阻塞等待直到连接完成
ChannelFuture f = b.connect().sync();
//阻塞直到channel关闭
f.channel().closeFuture().sync();
} finally {
//关闭连接并且释放所有资源
group.shutdownGracefully().sync();
}
}
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//通知channel是活跃状态时发送一条消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
//记录已经接受消息的转发
System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常记录并且关闭channel
cause.printStackTrace();
ctx.close();
}
}
/**
* @ChannelHandler.Sharable 一个Channel-Handler可以被多个Channel共享
*/
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 对于每个传入的消息都要处理调用,将受到的消息写给发送者,而不冲刷出站消息,从socket中读取消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("Server received" + byteBuf.toString(CharsetUtil.UTF_8));
ctx.write(byteBuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}