前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于Netty实现的简单RPC调用

基于Netty实现的简单RPC调用

作者头像
用户1215919
发布2021-12-28 12:43:27
3450
发布2021-12-28 12:43:27
举报
文章被收录于专栏:大大的微笑

模块

rpc-api

rpc-consumer

rpc-provider

依赖:
代码语言:xml
复制
<!-- Netty (the netty-all aggregate artifact), pinned to 4.1.25.Final; the provider and consumer code use its io.netty classes. -->
<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.25.Final</version>
 </dependency>
rpc-api代码
代码语言:java
复制
/**
 * Service interface of the api module.
 * The consumer and the provider each add a dependency on the api module.
 */
public interface ISayHelloService {
    /**
     * The single operation exposed by this service.
     *
     * @param name the name passed by the caller
     * @return the string produced by the implementation
     */
    String say(String name);
}

/**
 * Request payload describing one remote invocation: which class, which method,
 * and with which arguments.
 *
 * <p>Implements {@link java.io.Serializable} because instances are sent as Java-serialized
 * objects (Netty {@code ObjectEncoder}/{@code ObjectDecoder}); every element of
 * {@link #getValues()} therefore has to be serializable as well.
 */
public class Request implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    /** Name of the method to invoke. */
    private String methodName;
    /** Fully-qualified class name the method is looked up under. */
    private String className;
    /** Actual arguments of the call; may be {@code null} for a no-arg method. */
    private Object[] values;
    /** Declared parameter types of the method being invoked. */
    private Class<?>[] parameters;

    /** @return the name of the method to invoke */
    public String getMethodName() {
        return methodName;
    }

    /** @param methodName the name of the method to invoke */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the fully-qualified class name the method is looked up under */
    public String getClassName() {
        return className;
    }

    /** @param className the fully-qualified class name the method is looked up under */
    public void setClassName(String className) {
        this.className = className;
    }

    /** @return the actual arguments of the call, possibly {@code null} */
    public Object[] getValues() {
        return values;
    }

    /** @param values the actual arguments of the call, possibly {@code null} */
    public void setValues(Object[] values) {
        this.values = values;
    }

    /** @return the declared parameter types of the method, possibly {@code null} */
    public Class<?>[] getParameters() {
        return parameters;
    }

    /** @param parameters the declared parameter types of the method */
    public void setParameters(Class<?>[] parameters) {
        this.parameters = parameters;
    }
}
rpc-provider代码:
代码语言:java
复制
public class Server {
    private void start() throws InterruptedException {
       // 初始化
        ServerHandler h = new ServerHandler();
        h.init();
        
        EventLoopGroup boosGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.channel(NioServerSocketChannel.class)
                    .group(boosGroup, workGroup)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,
                                            0, 4))
                                    .addLast(new LengthFieldPrepender(4))
                                    .addLast("encoder", new ObjectEncoder())
                                    .addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                                    .addLast(h);
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 10)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel()
                    .closeFuture()
                    .sync();
            System.out.println("server running, listener port : 8080 !");
        } finally {
            boosGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        try {
            new Server().start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
   // 具体业务处理
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static String bashScanPackage = "com.data.service.impl";
    private ClassLoader classLoader = this.getClass().getClassLoader();
    private static final Map<String, Mapping> SERVICES = new HashMap<>();


    /**
     * 初始化
     *
     * @throws ClassNotFoundException
     */
    public void init() {
        URL url = classLoader.getResource(bashScanPackage.replaceAll("\\.", "/"));
        String filePath = url.getFile();
        File file = new File(filePath);
        for (String s : file.list()) {
            s = s.substring(0, s.indexOf("."));
            Class clazz = null;
            try {
                clazz = Class.forName(bashScanPackage + "." + s);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            Method[] methods = clazz.getDeclaredMethods();
            String interfaceName = clazz.getInterfaces()[0].getName();
            for (Method m : methods) {
                Mapping mapping = new Mapping();
                mapping.setMethod(m);
                mapping.setParameters(m.getParameterTypes());
                try {
                    mapping.setTarget(clazz.newInstance());
                } catch (InstantiationException e) {
                    e.printStackTrace();
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
                SERVICES.putIfAbsent(interfaceName + "." + m.getName(), mapping);
            }

        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        Object result;
        String key = request.getClassName() + "." + request.getMethodName();
        if (!SERVICES.containsKey(key)) {
            return;
        }
        Mapping clazz = SERVICES.get(key);
        result = clazz.getMethod().invoke(clazz.getTarget(), request.getValues());
        ctx.write(result);
        ctx.flush();
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

/**
 * Provider-side registry entry: the service instance to call, the method to invoke on it,
 * and that method's parameter types.
 */
public class Mapping {
    /** Instance the method is invoked on. */
    private Object target;
    /** Method to invoke. */
    private Method method;
    /** Parameter types of {@link #method}. */
    private Class<?>[] parameters;

    /** @return the instance the method is invoked on */
    public Object getTarget() {
        return target;
    }

    /** @param target the instance the method is invoked on */
    public void setTarget(Object target) {
        this.target = target;
    }

    /** @return the method to invoke */
    public Method getMethod() {
        return method;
    }

    /** @param method the method to invoke */
    public void setMethod(Method method) {
        this.method = method;
    }

    /** @return the parameter types of the method */
    public Class<?>[] getParameters() {
        return parameters;
    }

    /** @param parameters the parameter types of the method */
    public void setParameters(Class<?>[] parameters) {
        this.parameters = parameters;
    }
}
rpc-consumer代码
代码语言:java
复制
// 动态代理类
public class ProxyHandler {
    public static <T> T create(Class<?> clazz) {
        MethodProxy proxy = new MethodProxy(clazz);
        Class<?>[] interfaces = clazz.isInterface() ?
                new Class[]{clazz} :
                clazz.getInterfaces();
        T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), interfaces, proxy);
        return result;
    }

    private static class MethodProxy implements InvocationHandler {
        private Class<?> clazz;

        public MethodProxy(Class<?> clazz) {
            this.clazz = clazz;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            return invoke(method, args);
        }

        public Object invoke(Method method, Object[] args) {

            Request msg = new Request();
            msg.setClassName(this.clazz.getName());
            msg.setMethodName(method.getName());
            msg.setValues(args);
            msg.setParameters(method.getParameterTypes());
            EventLoopGroup group = new NioEventLoopGroup();
            RPCBusinessHandler handler = new RPCBusinessHandler();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
                                        4, 0, 4));
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                pipeline.addLast("encoder", new ObjectEncoder());
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast("handler", handler);
                            }
                        });

                ChannelFuture future = b.connect("localhost", 8080).sync();
                future.channel().writeAndFlush(msg).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
            return handler.getResult();
        }
    }
    
// 客户端逻辑处理
public class RPCBusinessHandler extends ChannelInboundHandlerAdapter {
    private Object result;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.result = msg;
    }



    public Object getResult(){
        return this.result;
    }    
    
    
// Test: create a proxy for the service interface and call it through the proxy.
// create() is static, so it is called on the class rather than on a throwaway instance.
ISayHelloService service = ProxyHandler.create(ISayHelloService.class);
System.out.println(service.say("tony"));
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 依赖:
  • rpc-api代码
  • rpc-provider代码:
  • rpc-consumer代码
领券
问题归档 | 专栏文章 | 快讯文章归档 | 关键词归档 | 开发者手册归档 | 开发者手册 Section 归档