学习
实践
活动
工具
TVP
写文章
专栏首页DDD再识RPC-thrift

再识RPC-thrift

RPC

原理

什么是Stub?

Stub是一段代码,用来转换RPC过程中传递的参数。处理内容包括不同OS之间的大小端问题。另外,Client端一般叫Stub,Server端一般叫Skeleton。

生产方式:

  1. 手动生成,比较麻烦;
  2. 自动生成,使用IDL(InterfaceDescriptionLanguate),定义C/S的接口

RPC的套路:

自古深情留不住 唯有套路留人心

RPC最本质的就是通过socket把方法信息传输到远程服务器并执行相应method

在java界的rpc框架的实现手法:

  • 服务端:socket + 反射
  • 客户端:动态代理 + socket

之前也解析过motain框架,《motain客服端分析》、《motain服务端分析》

thrift

由于我司框架是通过thrift改造,发现这个框架没有按java套路出牌,可能这是跨语言类RPC的套路,有必要了解一下

thrift最初由facebook开发用做系统内各语言之间的RPC通信 。2007年由facebook贡献到apache基金 ,08年5月进入apache孵化器,支持多种语言之间的RPC方式的通信:php语言client可以构造一个对象,调用相应的服务方法来调用java语言的服务 ,跨越语言的C/S RPC调用   

示例

IDL文件

//HelloService.thrfit
namespace java com.jack.thrift
service HelloService{
    string helloString(1:string what)
}

生成代码

运行  thrift -gen HelloService.thrfit

会生成一个HelloService类

实现服务端与客服端

让服务端打印出客户端传入的参数

服务端

public class ThriftServer {

    /**
     * 启动thrift服务器
     * @param args
     */
    public static void main(String[] args) throws Exception {
        try {
            System.out.println("服务端开启....");
            TProcessor tprocessor = new HelloService.Processor<HelloService.Iface>(new HelloServiceImpl());
            // 简单的单线程服务模型
            TServerSocket serverTransport = new TServerSocket(9898);
            TServer.Args tArgs = new TServer.Args(serverTransport);
            tArgs.processor(tprocessor);
            tArgs.protocolFactory(new TBinaryProtocol.Factory());
            TServer server = new TSimpleServer(tArgs);
            server.serve();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

}

客户端

public class ThriftClient {

    public static void main(String[] args) {
        System.out.println("客户端启动....");
        TTransport transport = null;
        try {
            transport = new TSocket("localhost", 9898, 30000);
            // 协议要和服务端一致
            TProtocol protocol = new TBinaryProtocol(transport);
            HelloService.Client client = new HelloService.Client(protocol);
            transport.open();
            String result = client.helloString("哈哈");
            System.out.println(result);
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        } finally {
            if (null != transport) {
                transport.close();
            }
        }
    }
}

解析

可以看出server,client代码相对很简单,主要看看生成的HelloService类,这个类就是stub代码

来看一下,这个类是如何封装,把method和args传输到远程的

client

HelloService.Client client = new HelloService.Client(protocol);
String result = client.helloString("哈哈");

关键点在HelloService.Client.helloString()方法

public String helloString(String what) throws org.apache.thrift.TException
    {
      send_helloString(what);
      return recv_helloString();
    }

发送消息

public void send_helloString(String what) throws org.apache.thrift.TException
    {
      helloString_args args = new helloString_args();
      args.setWhat(what);
      sendBase("helloString", args);
    }
  1. 把args抽象成了一个类
  2. 属性赋值
  3. 发送

主要看下sendBase()方法

private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }
  • 1.oprot_.writeMessageBegin 根据Protocol写数据,比如这儿使用的TBinaryProtocol,以二进制写数据
public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }

再深入看看怎么写二进制数据的

int类型

public void writeI32(int i32) throws TException {
    inoutTemp[0] = (byte)(0xff & (i32 >> 24));
    inoutTemp[1] = (byte)(0xff & (i32 >> 16));
    inoutTemp[2] = (byte)(0xff & (i32 >> 8));
    inoutTemp[3] = (byte)(0xff & (i32));
    trans_.write(inoutTemp, 0, 4);
  }

string类型,先写长度,再写bytes

public void writeString(String str) throws TException {
    try {
      byte[] dat = str.getBytes("UTF-8");
      writeI32(dat.length);
      trans_.write(dat, 0, dat.length);
    } catch (UnsupportedEncodingException uex) {
      throw new TException("JVM DOES NOT SUPPORT UTF-8");
    }
  }

这儿写最终还是使用Transport.write,比如这儿使用的TSocket

public void write(byte[] buf, int off, int len) throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
    }
    try {
      outputStream_.write(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }

就是写到

outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
  • 2.args.write(oprot_);
public void write(org.apache.thrift.protocol.TProtocol oprot, helloString_args struct) throws org.apache.thrift.TException {
        struct.validate();

        oprot.writeStructBegin(STRUCT_DESC);
        if (struct.what != null) {
          oprot.writeFieldBegin(WHAT_FIELD_DESC);
          oprot.writeString(struct.what);
          oprot.writeFieldEnd();
        }
        oprot.writeFieldStop();
        oprot.writeStructEnd();
      }

这就是写field,也就是向输出流里写参数内容

  • 3.oprot_.writeMessageEnd(); 这表示消息写完成了,各个协议处理不同,比如二进制就是空实现,但如json就需要写个"}",以完成json格式
  • 4.oprot_.getTransport().flush(); 直接flush
/**
   * Flushes the underlying output stream if not null.
   */
  public void flush() throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
    }
    try {
      outputStream_.flush();
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }

client总结

整个发送消息就结束了,虽然没有按套路使用动态代理,而是通过生成的stub代码,把methodName,args给封装好了

server

服务端也没有通过反射的方式

主要逻辑在生成的HelloService$Processor类中

public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
    private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
    public Processor(I iface) {
      super(iface, getProcessMap(new java.util.HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
    }

    protected Processor(I iface, java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }

    private static <I extends Iface> java.util.Map<String,  org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      processMap.put("helloString", new helloString());
      return processMap;
    }

    public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> {
      public helloString() {
        super("helloString");
      }

      public helloString_args getEmptyArgsInstance() {
        return new helloString_args();
      }

      protected boolean isOneway() {
        return false;
      }

      @Override
      protected boolean handleRuntimeExceptions() {
        return false;
      }

      public helloString_result getResult(I iface, helloString_args args) throws org.apache.thrift.TException {
        helloString_result result = new helloString_result();
        result.success = iface.helloString(args.what);
        return result;
      }
    }

  }
  • 1.先看构造函数
protected Processor(I iface, java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }

    private static <I extends Iface> java.util.Map<String,  org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      processMap.put("helloString", new helloString());
      return processMap;
    }

这段把methodName与对应的处理类映射,那后面的事就简单了,当接受到消息,取得methodName,通过map获取对就的处理类回调就可以

public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> {
      public helloString() {
        super("helloString");
      }

      public helloString_args getEmptyArgsInstance() {
        return new helloString_args();
      }

      protected boolean isOneway() {
        return false;
      }

      @Override
      protected boolean handleRuntimeExceptions() {
        return false;
      }

      public helloString_result getResult(I iface, helloString_args args) throws org.apache.thrift.TException {
        helloString_result result = new helloString_result();
        result.success = iface.helloString(args.what);
        return result;
      }
    }

处理类,继承ProcessFunction类,实现getResult(),这个方法就是调用了对应service.helloString()

可以再深入看一下,在socket监听消息时

client = serverTransport_.accept();
        if (client != null) {
          processor = processorFactory_.getProcessor(client);
          inputTransport = inputTransportFactory_.getTransport(client);
          outputTransport = outputTransportFactory_.getTransport(client);
          inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
          outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
          if (eventHandler_ != null) {
            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
          }
          while (true) {
            if (eventHandler_ != null) {
              eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
            }
            if(!processor.process(inputProtocol, outputProtocol)) {
              break;
            }
          }

关键行:processor.process(inputProtocol, outputProtocol)

public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
    fn.process(msg.seqid, in, out, iface);
    return true;
  }

这就很明显了,通过methodName从map中取得ProccessFunction,再执行process方法,调用相应service的方法

总结

虽然thrift没有按以往java套路出牌,但最根本的把method发送到远程执行是一致的。可能对于多语言来讲,便于所以语言一致性,的确需要通过生成的stub代码手法来实现RPC

当然thrift并不简单,还有很多的内容需要深挖学习,但至少这个简单示例可以了解跨语言型的RPC,相关IDL,Stub的知识,有清晰认知,而不局限于概念

文章分享自微信公众号:
码农戏码

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!

作者:朱兴生
原始发表时间:2018-10-12
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • JS魔法堂:再识instanceof

    一、Breif                                  大家都知道instanceof一般就是用来检查A对象是否为B类或子类的实例。那...

    ^_^肥仔John
  • JS魔法堂:再识Number type

    Brief                                   本来只打算理解JS中0.1 + 0.2 == 0.300000000000000...

    ^_^肥仔John
  • 再识云计算前世今生来世

    云计算,当我第一次听说这个词的时候,是在2015年吧。可以说直到现在对于这个概念都不是十分理解。直到上个月看了这本书《大话云计算》。

    葆宁
  • JS魔法堂:再识IE的内存泄露

    一、前言                               IE6~8除了不遵守W3C标准和各种诡异外,我想最让人诟病的应该是内存泄露的问题了。这阵子...

    ^_^肥仔John
  • JS魔法堂:再识Bitwise Operation & Bitwise Shift

    Brief                                 linkFly的《JavaScript-如果...没有方法》中提及如何手写Math....

    ^_^肥仔John
  • RPC-Thrift(一)

      注意点:1)Thrift客户端和服务端使用的I/O模型必须一致,上例中都是使用阻塞式同步I/O模型。

    在周末
  • RPC-Thrift(四)

        同步客户端比较简单,以RPC-Thrift(一)中的的例子为基础进行研究源码,先看一下类图。

    在周末
  • RPC-Thrift(二)

        阻塞Server使用TServerSocket,它封装了ServerSocket实例,ServerSocket实例监听到客户端的请求会创建一个Socke...

    在周末
  • RPC-Thrift(三)

        TCompactProtocol:高效率,密集的二进制编码格式,使用了zigzag压缩算法,使用了类似于ProtocolBuffer的Variable-...

    在周末
  • Netty4.x 的逆袭之路 —— 再识 Netty

    在netty数据传输过程中可以有很多选择,比如;字符串、json、xml、java对象,但为了保证传输的数据具备;良好的通用性、方便的操作性和传输的高性能,我们...

    星尘的一个朋友
  • JS魔法堂:再识ASCII实体、符号实体和字符实体

    一、前言                                            相信大家都熟悉通过字符实体 &nbsp; 来实现多个连续空格的输...

    ^_^肥仔John
  • Spark通识

    在说Spark之前,笔者在这里向对Spark感兴趣的小伙伴们建议,想要了解、学习、使用好Spark,Spark的官网是一个很好的工具,几乎能满足你大部分需求。

    大数据学习与分享
  • (二)深入浅出TCPIP之再识TCP,理解TCP三次握手(上)

    TCP作为一种可靠传输控制协议,其核心思想:既要保证数据可靠传输,又要提高传输的效率,而用三次握手恰恰可以满足以上两方面的需求!

    用户3479834
  • (三)深入浅出TCPIP之再识TCP,理解TCP四次挥手(上)

    上篇文章对于"三次握手”做了说明。本节我们对不常听见的“四次挥手”为大家详尽, 直观,完整地绍“挥手”的过程。 所谓的四次挥手即tcp连接的释放(解除)。连接的...

    用户3479834
  • Spark通识

    在说Spark之前,笔者在这里向对Spark感兴趣的小伙伴们建议,想要了解、学习、使用好Spark,Spark的官网是一个很好的工具,几乎能满足你大部分需求。...

    大数据学习与分享
  • AI听曲识歌!哼曲、口哨吹,都能秒识! ⛵

    本文讲解音频检索技术及其广泛的应用场景。以『听曲识歌』为例,技术流程为具对已知歌曲抽取特征并构建特征向量库,而对于待检索的歌曲音频,同样做特征抽取后进行比对和快...

    ShowMeAI
  • 状态模式通识篇

    状态模式也是行为型模式中的一种,顾名思义状态模式主要是基于对象有不同的状态,从而导致具有与其对应状态的行为。

    RobinsonZhang
  • 初识JMM_一!,识J

    JMM:(Java Memory Model的缩写) 作用:缓存一致性协议,用于定义数据读写的规则。

    全栈程序员站长
  • Java集合 | 重识HashMap

    在Java中,Map接口主要定义了映射容器的一些基本属性,包括长度(size)、是否为空(isEmpty)、获取(get)、存放(put)、移除(remove)...

    搬砖俱乐部

扫码关注腾讯云开发者

领取腾讯云代金券