专栏首页行者悟空利用动态代理&反射&socket实现简单的RPC通信

利用动态代理&反射&socket实现简单的RPC通信

摘 要

利用动态代理&反射&socket实现简单的RPC通信

概述

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC跨越了传输层和应用层,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

现在RPC通信在内部分布式集群环境中已经很常见了。现在的开源的分布式框架已经提供了相应的实现,但仅停留在用的层面是远远不够的,这不符合一只码畜的追求。所以为了弄清RPC到底是个啥玩意,就查阅了部分资料,并针对其所述实现了一版最基础的RPC。

实现思路

大体思路是这样的:

首先Consumer通过JDK动态代理的机制去创建socket,让socket连接Producer的SocketServer,内部利用ObjectOutputStream将请求信息(接口信息,方法,参数)封装,通过socket传输。

其次Producer接到ObjectInputStream,将信息拆包(接口信息,方法,参数)。利用反射将接口实现类实例化(这就是为什么RPC框架客户端和服务端都需要有一致的接口类)。

最后Producer利用反射将业务处理完毕后,用ObjectOutputStream将结果封装,通过socket返回数据。Consumer接收到返回数据。

具体代码实现

//RPCServer实现
RPCServer.java
package com.itunic.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 
 * RPC服务端<br>
 * 基于SocketServer&反射实现
 * 
 * @ClassName RPCServer
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:50
 * @version 1.0.0
 */
public class RPCServer {
 private static final Map<String, Class<?>> serviceMap = new ConcurrentHashMap<String, Class<?>>();
 private static final ExecutorService exec = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 private Integer port;
 private Boolean isRunning = true;

 public RPCServer(int port) {
 this.port = port;
    }

 public void register(Class<?> serviceInterface, Class<?> impl) {
        serviceMap.put(serviceInterface.getName(), impl);
    }

 public void start() throws IOException {
        ServerSocket ss = new ServerSocket();
 try {
            ss.bind(new InetSocketAddress(port));
 while (isRunning) {
                exec.execute(new ServiceTask(ss.accept()));
            }
        } finally {
            ss.close();
        }
    }

 private class ServiceTask implements Runnable {
        Socket socket = null;

 public ServiceTask(Socket socket) {
 this.socket = socket;
        }

 @Override
 public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
 try {
                input = new ObjectInputStream(socket.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class<?> serviceClass = serviceMap.get(serviceName);
 if (serviceClass == null) {
 throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);
                output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
                output.flush();
            } catch (Exception e) {
 // TODO 自动生成的 catch 块
                e.printStackTrace();
            } finally {
 if (null != output) {
 try {
                        output.close();
                    } catch (IOException e) {
 // TODO 自动生成的 catch 块
                        e.printStackTrace();
                    }
                }
 if (input != null) {
 try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
 if (socket != null) {
 try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }
}
//RPCClient实现
RPCClient.java
package com.itunic.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * 
 * RPC客户端 基于动态代理&socket实现
 * 
 * @ClassName RPCClient
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:04
 * @version 1.0.0
 */
public class RPCClient {
 @SuppressWarnings("unchecked")
 public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
 return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },
 new InvocationHandler() {

 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
 try {
                            socket = new Socket();
                            socket.connect(addr);
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);
                            input = new ObjectInputStream(socket.getInputStream());

 return input.readObject();
                        } finally {
 if (socket != null)
                                socket.close();
 if (output != null)
                                output.close();
 if (input != null)
                                input.close();
                        }
                    }
                });
    }
}
package com.itunic.rpc;

import java.io.IOException;

/**
 * 
 *  RPC服务端启动
 * @ClassName RPCServerAction
 * @author c
 * @Date 2017年6月23日 下午5:12:27
 * @version 1.0.0
 */
public class RPCServerAction {
 public static void main(String[] args) throws IOException {
        RPCServer server = new RPCServer(8888);
 /**
         * 注册相关的接口实现类
         */
        server.register(TestHello.class, TestHelloImpl.class);
        server.start();
    }
}

总结

RPC原理还是挺简单的,但可以有更好的实现。例如:socket用高并发框架netty替代,支持更多协议等。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 以编程方式执行Spark SQL查询的两种实现方式

    天策
  • Spark DAG调度

    天策
  • Hadoop之HDFS概念与体系结构

    天策
  • 使用nodejs和Java访问远程服务器的服务

    既然这篇文章用的是nodejs和Java访问远程服务器的服务,那么咱们先用另一门编程语言,SAP的ABAP(我日常工作使用得最多的编程语言)来开发一个服务吧。

    Jerry Wang
  • Apache CollectionUtils使用指南

    对集合判定‘空’的操作使用Apache的commons-collection的工具包。

    白凡
  • 8.4 Spring Boot集成Kotlin混合Java开发

    本章介绍Spring Boot集成Kotlin混合Java开发一个完整的spring boot应用:Restfeel,一个企业级的Rest API接口测试平台(...

    一个会写诗的程序员
  • 手写RPC框架第二章《netty通信》

    案例介绍 在我们实现rpc框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我...

    小傅哥
  • 1 秒杀系统模拟基础实现,使用DB实现

    业务隔离:将秒杀业务独立出来,尽量不与其他业务关联,以减少对其他业务的依赖性。譬如秒杀业务只保留用户id,商品id,数量等重要属性,通过中间件发送给业务系统,完...

    天涯泪小武
  • android socket实现文件导出功能

    该功能主要描述如下:将SD卡中的文件通过socket导出到window文件夹中。 首先我要先介绍一个客户端和服务器端共有的一个文件类: UploadFi...

    提莫队长
  • Spring Boot 1.X和2.X优雅重启实战

    在重启之前首先发送重启命令到endpoint,或者用kill 进程ID的方式,千万不要用kill -9。

    猿天地

扫码关注云+社区

领取腾讯云代金券