前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >简单实现通过netty通信,后续提供基于protobuf传输协议的rpc框架

简单实现通过netty通信,后续提供基于protobuf传输协议的rpc框架

作者头像
gfu
发布2019-09-29 16:35:34
5750
发布2019-09-29 16:35:34
举报
文章被收录于专栏:gfugfu

后续也会提供 service-mesh 的简单代码实现。netty 通信和 socket 通信大致类似:netty 是在 socket 的基础上对其进行了封装。当然,你也可以自己实现 netty 的功能,但是我给你一句话。

package org.gfu.base.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Date;


/**
 * Minimal Netty client: connects to {@code host:port}, installs UTF-8 string
 * codecs plus a {@link NettyClientHandler} carrying the configured message,
 * and then blocks until the channel is closed.
 *
 * @author 719383495@qq.com |719383495qq@gmail.com |gfu
 * @date 2019/9/27
 */
class NettyClient {

    /** Remote host to connect to. */
    private final String host;
    /** Remote port to connect to. */
    private final int port;
    /** Message handed to the {@link NettyClientHandler} of each new channel. */
    private String jsonStr;


    /**
     * @param host remote host to connect to
     * @param port remote port to connect to
     */
    NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Sets the message passed to the channel handler.
     *
     * @param jsonStr message text
     * @return this client, to allow call chaining
     */
    NettyClient setMessage(String jsonStr) {
        this.jsonStr = jsonStr;
        return this;
    }

    /**
     * Connects to the remote endpoint and blocks the calling thread until the
     * channel's close future completes. The event loop group is always shut
     * down before this method returns. Failures are printed, not rethrown.
     */
    void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new NettyClientHandler(jsonStr));
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(
                    f -> {
                        if (f.isSuccess()) {
                            System.out.println("连接成功:" + host + ":" + port);
                        } else {
                            System.out.println(new Date() + "-- 连接失败:" + host + ":" + port);
                        }
                    }
            ).sync();
            // Block here until the connection is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption
            // instead of having it silently swallowed.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}
package org.gfu.base.netty;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Scanner;

/**
 * Netty client handler: writes the configured message to the channel as soon
 * as the channel becomes active.
 *
 * @author 719383495@qq.com |719383495qq@gmail.com |gfu
 * @date 2019/9/27
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /** Message written to the channel once it becomes active. */
    private final String jsonStr;

    /**
     * @param jsonStr message to send when the channel becomes active
     */
    public NettyClientHandler(String jsonStr) {
        this.jsonStr = jsonStr;
    }

    /**
     * Writes and flushes the configured message on the newly active channel.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // The context is used directly; nothing else in this handler needs it later.
        ctx.writeAndFlush(jsonStr);
    }
}
package org.gfu.base.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.sctp.nio.NioSctpServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Date;

/**
 * Minimal Netty server: binds to {@code host:port}, installs UTF-8 string
 * codecs plus a {@link NettyServerHandler} on every accepted channel, and then
 * blocks until the server channel is closed.
 *
 * @author 719383495@qq.com |719383495qq@gmail.com |gfu
 * @date 2019/9/27
 */
class NettyServer {

    /** Local host/address to bind to. */
    private final String host;
    /** Local port to bind to. */
    private final int port;

    /**
     * @param host local host/address to bind to
     * @param port local port to bind to
     */
    NettyServer(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Binds the server socket and blocks the calling thread until the server
     * channel's close future completes. Both event loop groups are always shut
     * down before this method returns. Failures are printed, not rethrown.
     */
    void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture channelFuture = serverBootstrap.bind(host, port).
                    addListener(f -> {
                        if (f.isSuccess()) {
                            System.out.println("绑定成功:" + host + ":" + port);
                        } else {
                            System.out.println(new Date() + "--绑定失败:" + host + ":" + port);
                        }
                    }).sync();
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption
            // instead of having it silently swallowed.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package org.gfu.base.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.*;

/**
 * Netty server handler: prints every inbound message and answers it with the
 * message text followed by an acknowledgement suffix.
 *
 * @author 719383495@qq.com |719383495qq@gmail.com |gfu
 * @date 2019/9/27
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** Suffix appended to each received message to form the reply. */
    private static final String ACK_SUFFIX = "server accept success";

    /**
     * Prints the received message once and replies with the message followed
     * by the acknowledgement suffix.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg inbound message as delivered by the preceding pipeline handlers
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
        // Reply from here rather than through channelActive, so no per-handler
        // state is needed and the message is not printed a second time.
        ctx.writeAndFlush(msg + ACK_SUFFIX);
    }

    /**
     * Prints the failure and closes the channel so a broken connection is not
     * left open.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Called when a connection becomes active. Nothing has been received at
     * this point, so no reply is written; the event is only passed on.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.09.28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档