前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用动态代理&反射&socket实现简单的RPC通信

利用动态代理&反射&socket实现简单的RPC通信

作者头像
天策
发布2018-06-22 14:02:21
8280
发布2018-06-22 14:02:21
举报
文章被收录于专栏:行者悟空行者悟空

摘 要

利用动态代理&反射&socket实现简单的RPC通信

概述

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC跨越了传输层和应用层,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

现在RPC通信在内部分布式集群环境中已经很常见了。现在的开源的分布式框架已经提供了相应的实现,但仅停留在用的层面是远远不够的,这不符合一只码畜的追求。所以为了弄清RPC到底是个啥玩意,就查阅了部分资料,并针对其所述实现了一版最基础的RPC。

实现思路

大体思路是这样的:

首先Consumer通过JDK动态代理的机制去创建socket,让socket连接Producer的SocketServer,内部利用ObjectOutputStream将请求信息(接口信息,方法,参数)封装,通过socket传输。

其次Producer接到ObjectInputStream,将信息拆包(接口信息,方法,参数)。利用反射将接口实现类实例化(这就是为什么RPC框架客户端和服务端都需要有一致的接口类)。

最后Producer利用反射将业务处理完毕后,用ObjectOutputStream将结果封装,通过socket返回数据。Consumer接收到返回数据。

具体代码实现
//RPCServer实现
RPCServer.java
package com.itunic.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 
 * RPC服务端<br>
 * 基于SocketServer&反射实现
 * 
 * @ClassName RPCServer
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:50
 * @version 1.0.0
 */
public class RPCServer {
 private static final Map<String, Class<?>> serviceMap = new ConcurrentHashMap<String, Class<?>>();
 private static final ExecutorService exec = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 private Integer port;
 private Boolean isRunning = true;

 public RPCServer(int port) {
 this.port = port;
    }

 public void register(Class<?> serviceInterface, Class<?> impl) {
        serviceMap.put(serviceInterface.getName(), impl);
    }

 public void start() throws IOException {
        ServerSocket ss = new ServerSocket();
 try {
            ss.bind(new InetSocketAddress(port));
 while (isRunning) {
                exec.execute(new ServiceTask(ss.accept()));
            }
        } finally {
            ss.close();
        }
    }

 private class ServiceTask implements Runnable {
        Socket socket = null;

 public ServiceTask(Socket socket) {
 this.socket = socket;
        }

 @Override
 public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
 try {
                input = new ObjectInputStream(socket.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class<?> serviceClass = serviceMap.get(serviceName);
 if (serviceClass == null) {
 throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);
                output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
                output.flush();
            } catch (Exception e) {
 // TODO 自动生成的 catch 块
                e.printStackTrace();
            } finally {
 if (null != output) {
 try {
                        output.close();
                    } catch (IOException e) {
 // TODO 自动生成的 catch 块
                        e.printStackTrace();
                    }
                }
 if (input != null) {
 try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
 if (socket != null) {
 try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }
}
//RPCClient实现
RPCClient.java
package com.itunic.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * 
 * RPC客户端 基于动态代理&socket实现
 * 
 * @ClassName RPCClient
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:04
 * @version 1.0.0
 */
public class RPCClient {
 @SuppressWarnings("unchecked")
 public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
 return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },
 new InvocationHandler() {

 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
 try {
                            socket = new Socket();
                            socket.connect(addr);
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);
                            input = new ObjectInputStream(socket.getInputStream());

 return input.readObject();
                        } finally {
 if (socket != null)
                                socket.close();
 if (output != null)
                                output.close();
 if (input != null)
                                input.close();
                        }
                    }
                });
    }
}
package com.itunic.rpc;

import java.io.IOException;

/**
 * 
 *  RPC服务端启动
 * @ClassName RPCServerAction
 * @author c
 * @Date 2017年6月23日 下午5:12:27
 * @version 1.0.0
 */
public class RPCServerAction {
 public static void main(String[] args) throws IOException {
        RPCServer server = new RPCServer(8888);
 /**
         * 注册相关的接口实现类
         */
        server.register(TestHello.class, TestHelloImpl.class);
        server.start();
    }
}
总结

RPC原理还是挺简单的,但可以有更好的实现。例如:socket用高并发框架netty替代,支持更多协议等。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年06月23日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 实现思路
  • 具体代码实现
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档