本文利用java自带的socket编程实现了一个简单的rpc调用框架,由两个工程组成分别名为battercake-provider(服务提供者)、battercake-consumer(服务调用者)。
设计思路如下: 1、在battercake-provider中,写一个服务叫BatterCakeService
2、在battercake-provider中,启动RpcProvider,发布该服务
3、在battercake-consumer中,启动测试类RpcTest
4、在battercake-consumer中,利用jdk动态代理,获得BatterCakeService的动态代理类BatterCakeService$Proxy0
5、在battercake-consumer中,动态代理类BatterCakeService$Proxy0,与battercake-provider建立socket连接,battercake-provider针对每一个连接,都会启动一个ServerThread处理请求,代理类则发送服务参数等相关信息
6、在battercake-consumer中,接收battercake-provider的ServerThread请求返回的结果。
上述过程时序图如下所示
接下来上代码!!
本部分的工程为battercake-provider,项目结构图如下图所示
先上使用的部分的代码 先创建一个微服务,接口如下
package com.rjzheng.service;public interface BatterCakeService { /** * 卖煎饼的服务 * @param name * @return */
public String sellBatterCake(String name);
}
实现类如下
package com.rjzheng.service.impl;import com.rjzheng.service.BatterCakeService;public class BatterCakeServiceImpl implements BatterCakeService { @Override
public String sellBatterCake(String name) { // TODO Auto-generated method stub
return name+"煎饼,卖的特别好";
}
}
接下来就是发布服务
package com.rjzheng.start;import com.rjzheng.rpc.RpcProvider;import com.rjzheng.service.BatterCakeService;import com.rjzheng.service.impl.BatterCakeServiceImpl;public class RpcBootStrap { public static void main(String[] args) throws Exception {
BatterCakeService batterCakeService =new BatterCakeServiceImpl(); //发布卖煎饼的服务,注册在20006端口
RpcProvider.export(20006,batterCakeService);
}
}
接下来是rpc框架调用部分的代码,RpcProvider,该部分代码可以总结为两步
package com.rjzheng.rpc;import java.net.ServerSocket;import java.net.Socket;import java.util.ArrayList;import java.util.Arrays;import java.util.List;/** * RPC服务提供器 * @author zhengrongjun * */public class RpcProvider {
//存储注册的服务列表
private static List<Object> serviceList;
/** * 发布rpc服务 * @param object * @param port * @throws Exception */
public static void export(int port,Object... services) throws Exception {
serviceList=Arrays.asList(services);
ServerSocket server = new ServerSocket(port);
Socket client = null; while (true) { //阻塞等待输入
client = server.accept(); //每一个请求,启动一个线程处理
new Thread(new ServerThread(client,serviceList)).start();
}
}
}
接下来ServerThread线程处理类的代码,ServerThread主要做以下几个步骤
package com.rjzheng.rpc;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.Socket;import java.util.List;public class ServerThread implements Runnable { private Socket client = null; private List<Object> serviceList = null; public ServerThread(Socket client, List<Object> service) { this.client = client; this.serviceList = service;
} @Override
public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null; try {
input = new ObjectInputStream(client.getInputStream());
output = new ObjectOutputStream(client.getOutputStream()); // 读取客户端要访问那个service
Class serviceClass = (Class) input.readObject(); // 找到该服务类
Object obj = findService(serviceClass); if (obj == null) {
output.writeObject(serviceClass.getName() + "服务未发现");
} else { //利用反射调用该方法,返回结果
try {
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
Method method = obj.getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(obj, arguments);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally { try {
client.close();
input.close();
output.close();
} catch (IOException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
}
} private Object findService(Class serviceClass) { // TODO Auto-generated method stub
for (Object obj : serviceList) { boolean isFather = serviceClass.isAssignableFrom(obj.getClass()); if (isFather) { return obj;
}
} return null;
}
}
本部分的工程为battercake-consumer,项目结构图如下图所示
先上rpc框架调用部分的代码RpcConsumer,步骤分两步
package com.rjzheng.rpc;import java.lang.reflect.Proxy;public class RpcConsumer {
public static <T> T getService(Class<T> clazz,String ip,int port) {
ProxyHandler proxyHandler =new ProxyHandler(ip,port); return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
}
}
接下来上代理类处理器的代码,代理类处理步骤分以下几步
package com.rjzheng.rpc;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.net.Socket;import com.rjzheng.service.BatterCakeService;public class ProxyHandler implements InvocationHandler { private String ip; private int port; public ProxyHandler(String ip, int port) { // TODO Auto-generated constructor stub
this.ip = ip; this.port = port;
} @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // TODO Auto-generated method stub
Socket socket = new Socket(this.ip, this.port);
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try {
output.writeObject(proxy.getClass().getInterfaces()[0]);
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
output.flush();
Object result = input.readObject(); if(result instanceof Throwable) { throw (Throwable) result;
} return result;
} finally {
socket.shutdownOutput();
}
}
}
接下来建立一个测试类RpcTest如下(跑该测试类前,记得运行在battercake-provider端的RpcBootstrap类发布BatterCakeService服务)
package com.rjzheng.start;import com.rjzheng.rpc.RpcConsumer;import com.rjzheng.service.BatterCakeService;public class RpcTest { public static void main(String[] args) {
BatterCakeService batterCakeService=RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);
String result=batterCakeService.sellBatterCake("双蛋");
System.out.println(result);
}
}
输出结果如下
双蛋煎饼,卖的特别好
至此,我们就实现了一个简易的rpc服务调用框架