前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从0.5到1写个rpc框架 - 3:远程服务调用(thrift)

从0.5到1写个rpc框架 - 3:远程服务调用(thrift)

作者头像
acupt
发布2019-08-26 15:04:50
1.1K0
发布2019-08-26 15:04:50
举报
文章被收录于专栏:一杯82年的JAVA一杯82年的JAVA

这和上一篇差不多,只是换了种远程调用的框架,有兴趣也可以实现更多种方式,这里只做一种尝试。

thrift是Facebook开源的rpc框架,基于TPC,默认使用二进制。

需要先掌握thrift的基本用法: thrift-Java 示例

项目结构

代码语言:javascript
复制
- acuprpc
    + acuprpc-core  //server/client核心处理逻辑
    + acuprpc-protocol-thrift  //基于thrift实现远程调用
    + acuprpc-spring-boot-starter  //server端服务扫描,client端动态代理,服务注册/发现

thrift 通信

接口定义

定义服务提供者(server)和服务调用者(client)交流所用的数据结构,client需要告诉server要调用的类名、方法名以及参数(json格式的字符串,在server端再反序列化)。

resources/service.thrift

代码语言:javascript
复制
namespace java com.acupt.acuprpc.protocol.thrift.proto
service ThriftService{
    InvokeResponse invokeMethod(1:InvokeRequest invokeRequest)
}

struct InvokeRequest{
1: required string appName;
2: required string serviceName;
3: required string methodName;
4: required list<string> orderedParameter;
5: required map<string,string> namedParameter;
}

struct InvokeResponse{
1: required i32 code;
2: optional string message;
3: optional string result;
}

thrift-service

这个类负责接收 thrift-client 发过来的请求,取出请求中的参数,转换成通用的结构,交给core层的RpcServer去执行对应方法,然后将返回值序列化成json返回给 thrift-client。

代码语言:javascript
复制
public class ThriftService implements com.acupt.acuprpc.protocol.thrift.proto.ThriftService.Iface {

    private RpcServer rpcServer;

    public ThriftService(RpcServer rpcServer) {
        this.rpcServer = rpcServer;
    }

    @Override
    public InvokeResponse invokeMethod(InvokeRequest invokeRequest) {
        RpcRequest rpcRequest = new RpcRequest(
                invokeRequest.getAppName(),
                invokeRequest.getServiceName(),
                invokeRequest.getMethodName(),
                invokeRequest.getOrderedParameter(),
                invokeRequest.getNamedParameter());
        RpcResponse rpcResponse = rpcServer.execute(rpcRequest);
        InvokeResponse response = new InvokeResponse();
        response.setCode(rpcResponse.getCode());
        response.setMessage(rpcResponse.getMessage());
        response.setResult(rpcResponse.getResultString());
        return response;
    }
}

thrift-server

作物服务提供者的具体实现类,只需要实现两个方法:启动服务和关闭服务,其他的交给core层的父类即可。

由于thrift server 调用serve()方法后会阻塞线程,因此需要另外启动一个线程去开启服务。

代码语言:javascript
复制
public class ThriftServer extends RpcServer {
    private static final int nThreads = 100;
    private TServer server;
    public ThriftServer(RpcInstance rpcInstance) {
        super(rpcInstance);
    }

    @Override
    protected void startRpc() {
        new Thread(() -> {
            TProcessor tprocessor = new com.acupt.acuprpc.protocol.thrift.proto.ThriftService.
                    Processor<com.acupt.acuprpc.protocol.thrift.proto.ThriftService.Iface>(new ThriftService(this));
            TServerTransport serverTransport = null;
            try {
                serverTransport = new TServerSocket(getRpcInstance().getRpcConf().getPort());
            } catch (TTransportException e) {
                throw new RpcException(e);
            }
            TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport);
            tArgs.processor(tprocessor);
            tArgs.executorService(Executors.newFixedThreadPool(nThreads));
            server = new TThreadPoolServer(tArgs);
            server.serve();//阻塞
        }).start();
    }

    @Override
    protected void shutdownRpc() {
        if (server != null) {
            server.setShouldStop(true);
        }
    }
}

thrift-client

作为服务调用者,需要把动态代理类传来的请求信息包装成thrift支持的结构,并调用thrift的请求方法,再把远程服务返回的结果返回给代理类。

thrift client 是线程不安全的,从它提供的方法就能够看出来。

代码语言:javascript
复制
public void send_invokeMethod(InvokeRequest invokeRequest){
    //...
}

public InvokeResponse recv_invokeMethod(){
    //...
}

public InvokeResponse invokeMethod(InvokeRequest invokeRequest) throws org.apache.thrift.TException
{
    send_invokeMethod(invokeRequest);
    return recv_invokeMethod();
}

为了简单直接在把方法设为 synchronized ,后续再使用对象池

代码语言:javascript
复制
public class ThriftClient extends RpcClient implements RpcCode {

    private AtomicReference<ThriftService.Client> clientRef;

    public ThriftClient(NodeInfo nodeInfo) {
        super(nodeInfo);
        clientRef = new AtomicReference<>(getClient(nodeInfo));
    }

    //todo client线程不安全,使用连接池管理
    @Override
    @SneakyThrows
    protected synchronized String remoteInvoke(RpcRequest rpcRequest) {
        InvokeRequest request = new InvokeRequest();
        request.setAppName(rpcRequest.getAppName());
        request.setServiceName(rpcRequest.getServiceName());
        request.setMethodName(rpcRequest.getMethodName());
        request.setOrderedParameter(rpcRequest.getOrderedParameter());
        InvokeResponse response = clientRef.get().invokeMethod(request);
        if (response.getCode() != SUCCESS) {
            throw new HttpStatusException(response.getCode(), response.getMessage());
        }
        return response.getResult();
    }

    @Override
    protected NodeInfo reconnectRpc(NodeInfo nodeInfo) {
        //...
    }

    @Override
    public void shutdownRpc() {
        close(clientRef.get());
    }

    private ThriftService.Client getClient(NodeInfo nodeInfo) {
        //...
    }

    private void close(ThriftService.Client client) {
        //...
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一杯82年的JAVA 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 项目结构
  • thrift 通信
    • 接口定义
      • thrift-service
        • thrift-server
          • thrift-client
          相关产品与服务
          文件存储
          文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档