Thrift 服务模型和序列化机制深入学习

http://www.liuqianfei.com/article/065b0f1ee59a4cf0b94a84c4e33af127

一、 Thrift 网络服务模型

Thrift 提供网络模型:单线程、多线程、事件驱动。从另一个角度划分为:阻塞服务模型、非阻塞服务模型。

  • 阻塞服务

TSimpleServer

TThreadPoolServer

  • 非阻塞服务模型

TNonblockingServer

THsHaServer

TThreadedSelectorServer

1、TSimpleServer

TSimpleServer 的实现非常的简单,循环监听新请求的到来并完成对请求的处理,是个单线程阻塞模型。由于一次只能接收和处理一个 socket 连接,效率比较低,在实际开发过程中很少用到它。

2、ThreadPoolServer

TThreadPoolServer 为解决了 TSimpleServer 不支持并发和多连接的问题,引入了线程池。但仍然是多线程阻塞模式即实现的模型是 One Thread Per Connection。

线程池采用线程数可伸缩的模式,线程池中的队列采用同步队列(SynchronousQueue)。

TThreadPoolServer 拆分了监听线程(accept)和处理客户端连接的工作线程(worker), 监听线程每接到一个客户端, 就投给线程池去处理。

服务端 ThriftTThreadPoolServer.java

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;

/**
 * 注册服务端
 *     线程池服务模型,使用标准的阻塞式 IO,预先创建一组线程处理请求
 */
public class ThriftTThreadPoolServer
{
    // 注册端口
    public static final int SERVER_PORT = 9981;

    public static void main(String[] args) throws TException
    {
        TProcessor tprocessor = new HelloWorld.Processor<HelloWorld.Iface>(new HelloWorldImpl());
        // 阻塞IO
        TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
        // 多线程服务模型
        TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport);
        tArgs.processor(tprocessor);
        // 客户端协议要一致
        tArgs.protocolFactory(new TBinaryProtocol.Factory());
         // 线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。
        TServer server = new TThreadPoolServer(tArgs);
        System.out.println("Hello TThreadPoolServer....");
        server.serve(); // 启动服务
    }
}

客户端 BlockClient.java

/**
 * 客户端调用
 * 阻塞
 */
public class BlockClient
{
    public static final String SERVER_IP = "127.0.0.1";
    public static final int SERVER_PORT = 9981;
    public static final int TIMEOUT = 30000;

    public static void main(String[] args) throws TException
    {
        // 设置传输通道
        TTransport transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
        // 协议要和服务端一致
        // 使用二进制协议 
        TProtocol protocol = new TBinaryProtocol(transport);
        // 创建Client
        HelloWorld.Client client = new HelloWorld.Client(protocol);
        transport.open();
        String result = client.sayHello("thrift");
        System.out.println("result : " + result);
        // 关闭资源
        transport.close();
    }
}

TThreadPoolServer模式优点:

线程池模式中,数据读取和业务处理都交由线程池完成,主线程只负责监听新连接,因此在并发量较大时新连接也能够被及时接受。线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。

TThreadPoolServer模式缺点:

线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。

3、TNonblockingServer

TNonblockingServer 采用单线程非阻塞(NIO)的模式, 借助 Channel/Selector 机制, 采用 IO 事件模型来处理。所有的 socket 都被注册到 selector 中,在一个线程中通过 seletor 循环监控所有的 socket,每次 selector 结束时,处理所有的处于就绪状态的 socket,对于有数据到来的 socket 进行数据读取操作,对于有数据发送的 socket 则进行数据发送,对于监听 socket 则产生一个新业务 socket 并将其注册到 selector 中。

private void select()
{
    try
    {
        selector.select();  // wait for io events.
        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext())
        {
            SelectionKey key = selectedKeys.next();
            selectedKeys.remove();
            if (key.isAcceptable())
            {
                handleAccept(); // deal with accept
            }
            else if (key.isReadable())
            {
                handleRead(key);    // deal with reads
            }
            else if (key.isWritable())
            {
                handleWrite(key); // deal with writes
            }
       }
    }
    catch (IOException e)
    {
    }
}

select 代码里对 accept/read/write 等 IO 事件进行监控和处理, 唯一可惜的这个单线程处理。当遇到 handler 里有阻塞的操作时, 会导致整个服务被阻塞住。

服务端 ThriftTNonblockingServer.java

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

/**
 * 注册服务端
 *     使用非阻塞式IO,服务端和客户端需要指定 TFramedTransport 数据传输的方式  
 */
public class ThriftTNonblockingServer
{
    // 注册端口
    public static final int SERVER_PORT = 9981;

    public static void main(String[] args) throws TException
    {
        // 处理器
        TProcessor tprocessor = new HelloWorld.Processor<HelloWorld.Iface>(new HelloWorldImpl());
        // 传输通道 - 非阻塞方式  
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(SERVER_PORT);
        // 异步IO,需要使用TFramedTransport,它将分块缓存读取。  
        TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverTransport);
        tArgs.processor(tprocessor);
        tArgs.transportFactory(new TFramedTransport.Factory());
        // 使用高密度二进制协议 
        tArgs.protocolFactory(new TCompactProtocol.Factory());
        // 使用非阻塞式IO,服务端和客户端需要指定TFramedTransport数据传输的方式
        TServer server = new TNonblockingServer(tArgs);
        System.out.println("Hello TNonblockingServer....");
        server.serve(); // 启动服务
    }
}

客户端 NonblockingClient.java

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/**
 * 客户端调用
 * 非阻塞
 */
public class NonblockingClient
{
    public static final String SERVER_IP = "127.0.0.1";
    public static final int SERVER_PORT = 9981;
    public static final int TIMEOUT = 30000;

    public static void main(String[] args) throws TException
    {
        // 设置传输通道,对于非阻塞服务,需要使用TFramedTransport,它将数据分块发送
        TTransport transport = new TFramedTransport(new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT));
        // 协议要和服务端一致
        //HelloTNonblockingServer
        // 使用高密度二进制协议 
        TProtocol protocol = new TCompactProtocol(transport);
        // HelloTHsHaServer
        // 使用二进制协议 
        // TProtocol protocol = new TBinaryProtocol(transport);
        HelloWorld.Client client = new HelloWorld.Client(protocol);
        transport.open();
        String result = client.sayHello("jack");
        System.out.println("result : " + result);
        // 关闭资源
        transport.close();
    }
}

TNonblockingServer模式优点:

相比于 TSimpleServer 效率提升主要体现在 IO 多路复用上,TNonblockingServer 采用非阻塞 IO,同时监控多个 socket 的状态变化;

TNonblockingServer模式缺点:

TNonblockingServer 模式在业务处理上还是采用单线程顺序来完成,在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。

4、THsHaServer

THsHaServer 类是 TNonblockingServer 类的子类,为解决 TNonblockingServer 的缺点, THsHaServer 引入了线程池去处理, 其模型把读写任务放到线程池去处理即多线程非阻塞模式。THsHaServer是: Half-sync/Half-async 的处理模式, Half-aysnc 是在处理 IO 事件上(accept/read/write io),Half-sync 用于 handler 对 rpc 的同步处理上。因此可以认为 THsHaServer 半同步半异步。

THsHaServer的优点:

与 TNonblockingServer 模式相比,THsHaServer 在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升;

THsHaServer的缺点:

主线程需要完成对所有 socket 的监听以及数据读写的工作,当并发请求数较大时,且发送数据量较多时,监听 socket 上新连接请求不能被及时接受。

5、TThreadedSelectorServer

TThreadedSelectorServer 是大家广泛采用的服务模型,其多线程服务器端使用非堵塞式 I/O 模型,是对 TNonblockingServer 的扩充, 其分离了 Accept 和 Read/Write 的 Selector 线程, 同时引入 Worker 工作线程池。

(1)一个 AcceptThread 线程对象,专门用于处理监听 socket 上的新连接;

(2)若干个 SelectorThread 对象专门用于处理业务 socket 的网络 I/O 操作,所有网络数据的读写均是有这些线程来完成;

(3)一个负载均衡器 SelectorThreadLoadBalancer 对象,主要用于 AcceptThread 线程接收到一个新 socket 连接请求时,决定将这个新连接请求分配给哪个 SelectorThread 线程。

(4)一个 ExecutorService 类型的工作线程池,在 SelectorThread 线程中,监听到有业务 socket 中有调用请求过来,则将请求读取之后,交给 ExecutorService 线程池中的线程完成此次调用的具体执行

MainReactor 就是 Accept 线程, 用于监听客户端连接, SubReactor 采用 IO 事件线程(多个),主要负责对所有客户端的 IO 读写事件进行处理。 而 Worker 工作线程主要用于处理每个 rpc 请求的 handler 回调处理(这部分是同步的)。

TThreadedSelectorServer 模式对于大部分应用场景性能都不会差,因为其有一个专门的线程 AcceptThread 用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络 I/O 操作分散到多个 SelectorThread 线程中来完成,因此能够快速对网络 I/O 进行读写操作,能够很好地应对网络 I/O 较多的情况。

服务端 ThriftTThreadedSelectorServer.java

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

/**
 * 多线程Half-sync/Half-async的服务模型.
 * 需要指定为: TFramedTransport 数据传输的方式。
 */
public class ThriftTThreadedSelectorServer
{

    // 注册端口
    public static final int SERVER_PORT = 9981;

    public static void main(String[] args) throws TException
    {
        TProcessor tprocessor = new HelloWorld.Processor<HelloWorld.Iface>(new HelloWorldImpl());
        // 传输通道 - 非阻塞方式  
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(SERVER_PORT);

        // 多线程半同步半异步
        TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
        tArgs.processor(tprocessor);
        tArgs.transportFactory(new TFramedTransport.Factory());
        // 二进制协议
        tArgs.protocolFactory(new TBinaryProtocol.Factory());
        // 多线程半同步半异步的服务模型
        TServer server = new TThreadedSelectorServer(tArgs);
        System.out.println("Hello TThreadedSelectorServer....");
        // 启动服务
        server.serve();
    }
}

二、Thrift 序列化机制

Thrift 提供了可扩展序列化机制, 不但兼容性好而且压缩率高。

thrift 数据格式描述

thrift 的向后兼容性(Version)借助属性标识(数字编号id + 属性类型type)来实现, 可以理解为在序列化后(属性数据存储由 field_name:field_value => id+type:field_value)

我们定义IDL文件形如

namespace java stu.thrift;

struct User {
  1: required string name
  2: required string address
}

是不是和我们使用序列化的数据 xml/json 有了很大的差别,那么我们来比较是常见的数据传输格式:

| 数据传输格式 | 类型 | 优点 | 缺点 |  | ————- | ————- | —– | —– |  | Xml | 文本 | 1、良好的可读性  2、序列化的数据包含完整的结构  3、调整不同属性的顺序对序列化/反序列化不影响 | 1、数据传输量大  2、不支持二进制数据类型 |  | Json | 文本 | 1、良好的可读性  2、调整不同属性的顺序对序列化/反序列化不影响 | 1、丢弃了类型信息, 比如"price”:100, 对price类型是int/double解析有二义性  2、不支持二进制数据类型 |  | Thrift | 二进制 | 高效 | 1、不宜读  2、向后兼容有一定的约定限制,采用id递增的方式标识并以optional修饰来添加 |  | Google Protobuf | 二进制 | 高效 | 1、不宜读  2、向后兼容有一定的约定限制 |

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏微信公众号:Java团长

Java Web基础面试题整理

a、API:有一个接口servlet,它是servlet规范中定义的用来处理客户端请求的程序需要实现的顶级接口。

55730
来自专栏软件开发 -- 分享 互助 成长

linux下进程相关操作

一、定义和理解 狭义定义:进程是正在运行的程序的实例。 广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。 进程的概念主要有两点: 第一...

27650
来自专栏博岩Java大讲堂

Java日志体系(log4j)

893110
来自专栏程序员互动联盟

【高级编程】linux进程间通信总结

1. 概览 本文记录经典的IPC:pipes, FIFOs, message queues, semaphores, and shared memory。 2....

57370
来自专栏乐百川的学习频道

Python 日志输出

打印日志是很多程序的重要需求,良好的日志输出可以帮我们更方便的检测程序运行状态。Python标准库提供了logging模块,让我们也可以方便的在Python中打...

48990
来自专栏Java架构师学习

Java程序员必知的并发编程艺术——并发机制的底层原理实现

Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量。 volatile借助Java内存模型...

391110
来自专栏编程软文

快速上手友盟推送前后端

38250
来自专栏开发与安全

linux网络编程之POSIX 消息队列 和 系列函数

一、在前面介绍了system v 消息队列的相关知识,现在来稍微看看posix 消息队列。 posix消息队列的一个可能实现如下图: ? 其实消息队列就是一个可...

30700
来自专栏Python

flask celery 使用方法

由于celery4.0不支持window,如果在window上安装celery4.0将会出现下面的错误

58000
来自专栏互联网高可用架构

初识消息队列处理机模板KClientKafka客户端(KClient)

20130

扫码关注云+社区

领取腾讯云代金券