首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >QTT(趣头条)长链接RPC框架

QTT(趣头条)长链接RPC框架

作者头像
FunTester
发布2019-10-29 16:33:32
9510
发布2019-10-29 16:33:32
举报
文章被收录于专栏:FunTesterFunTester

qrpc借鉴了谷歌grpc的核心理念(流+双向调用),但没有http2的历史包袱,实现上更轻量化,性能上也更优化(batch+writev),大致是grpc的2~3倍,并且内存上更节省(没有常驻的写协程)。此外还提供了非常实用的OverlayNetwork特性,使得长链接既可以直接跑在tcp之上,也可以跑在ws等协议之上,而不需改任何业务代码

写(干)在(货)文(地)前(址): https://github.com/zhiqiangxu/qrpc

喜欢的亲们,点点你们的小星星,这将是作者开源更多有价值框架的动力源泉。

轻量级通用长链接框架qrpc

qrpc 提供完整的服务端及客户端功能,并支持以下4种特性使得rpc变得极为容易:

  • 阻塞非阻塞
  • 流式非流式
  • 主动推送
  • 双向调用

默认是阻塞模式,也就是同一个长链接的请求是串行处理,类似http/1.1,但是通过微小的改动就可以切换到其他模式。

此外,qrpc还提供了桥接网络的特性,该特性使得多协议支持不费吹灰之力,同样的一套代码可以同时跑在tcp、websocket及任意已有的协议之下,详情参考ws/README.md;以及语法糖功能,使得服务的注册和调用极大地便利化,详情参考测试用例中的TestSugarPerformance。(由于用到了反射,因此会损失一部分性能,后续会增加基于代码生成的方式,届时性能将零损耗,敬请期待)

协议设计

qrpc提供请求->响应以及主动推送两大类的交互能力。

请求->响应又分为「阻塞非阻塞」以及「流式非流式」。

qrpc请求响应有相同的结构:,即代码中的Frame

每个帧包括8字节的唯一标识1字节的flag3字节的命令,以及不超过可配置上限长度的负荷

客户端会为每个请求帧自动生成8字节的唯一标识,服务端对应的响应帧会有相同的唯一标识。

通过这种方式,一个长链接可以同时发起多个请求,并且精确地知道每个请求对应的响应结果。

此外,请求响应都可以由多个组成,类似http中的chunked传输模式,这就是前面提到的流式非流式

而所有关于是否阻塞、是否流式、是否主动推送的元信息,都包含在头部1字节的flag之中!

话不多说,干货开始

阻塞模式

server.go

package main
import "github.com/zhiqiangxu/qrpc"

const (
    HelloCmd qrpc.Cmd = iota
    HelloRespCmd
)
func main() {
    // handler的作用是路由,根据请求帧的命令,分发到不同的处理子函数
    handler := qrpc.NewServeMux()
    // 注册HelloCmd命令对应的子函数
    handler.HandleFunc(HelloCmd, func(writer/*用于回写响应*/ qrpc.FrameWriter, request/*当前请求的相关信息*/ *qrpc.RequestFrame) {
        // 响应帧和请求帧有相同的唯一标识,并且这里把响应帧的命令设置为HelloRespCmd,会更方便调试
        writer.StartWrite(request.RequestID, HelloRespCmd, 0)

        // 负荷部分为:hello world + 请求帧的原始负荷
        writer.WriteBytes(append([]byte("hello world "), request.Payload...))

        // 前面的StartWrite和WriteBytes其实是构建响应帧的过程
        // 构建完毕后通过EndWrite触发实际的回写
        writer.EndWrite()
    })
    // ServerBinding用于配制想监听的端口以及对应的处理函数,如果想监听多个端口,提供多个即可
    bindings := []qrpc.ServerBinding{
        qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
    // 构建server
    server := qrpc.NewServer(bindings)
    // 开始监听
    server.ListenAndServe()
}

client.go

package main
import (
    "fmt"
    "github.com/zhiqiangxu/qrpc"
)

const (
    HelloCmd qrpc.Cmd = iota
)
func main() {
    // 采用默认配置
    conf := qrpc.ConnectionConfig{}

    // 建立一个qrpc的长链接
    conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil)

    // 发起一个命令为HelloCmd的请求帧,flag空,负荷为xu
    _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu"))
    // 获取响应
    frame, _ := resp.GetFrame()
    // 打印响应负荷
    fmt.Println("resp is", string(frame.Payload))
}

上面的例子中,由于flag为空,所以服务端会采用默认的串行处理模式。

非阻塞模式

要使用该模式,只需要修改client.go的一行代码:

-    _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu"))
+    _, resp, _ := conn.Request(HelloCmd, qrpc.NBFlag, []byte("xu"))

这样服务端便会并行处理这个长链接发来的请求!

流式

要使用该模式,只需要修改client.go的一行代码:

-    _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu"))
+    _, resp, _ := conn.Request(HelloCmd, qrpc.NBFlag, []byte("xu"))

这样服务端便会并行处理这个长链接发来的请求

流式请求

streamclient.go:

package main
import (
    "fmt"
    "github.com/zhiqiangxu/qrpc"
)

const (
    HelloCmd qrpc.Cmd = iota
)
func main() {
     // 采用默认配置
    conf := qrpc.ConnectionConfig{}

    // 建立一个qrpc的长链接
    conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil)

    // 采用流式发送HelloCmd请求,第一个请求帧的负荷是first frame
    writer, resp, _ := conn.StreamRequest(HelloCmd, 0, []byte("first frame"))
    // 构建第二个请求帧,负荷是last frame
    writer.StartWrite(HelloCmd)
    writer.WriteBytes([]byte("last frame"))
    // 发送请求,并标记流式结束
    writer.EndWrite(true) // will attach StreamEndFlag
    // 获取响应
    frame, _ := resp.GetFrame()
    // 打印响应负荷
    fmt.Println("resp is", string(frame.Payload))
}

streamserver.go:

package main
import (
    "github.com/zhiqiangxu/qrpc"
    "fmt"
)

const (
    HelloCmd qrpc.Cmd = iota
    HelloRespCmd
)
func main() {
    handler := qrpc.NewServeMux()
    handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
        // 首帧的处理类似非流式,只是EndWrite最后才会调用
        writer.StartWrite(request.RequestID, HelloRespCmd, 0)

        writer.WriteBytes(append([]byte("first frame "), request.Payload...))

        // 循环取出流式请求中的剩余帧
        for {
            continueFrames := <-request.FrameCh()
            // continueFrames为nil表示该请求的所有帧获取完毕
            if continueFrames == nil {
                break
            }
            // 将后续帧的负荷追加到响应中去
            writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...))
        }
        // 响应帧构建完毕,回写给客户端
        writer.EndWrite()
    })
    bindings := []qrpc.ServerBinding{
        qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
    server := qrpc.NewServer(bindings)
    err := server.ListenAndServe()
    if err != nil {
        panic(err)
    }
}

流式响应

package main
import (
    "github.com/zhiqiangxu/qrpc"
    "fmt"
)

const (
    HelloCmd qrpc.Cmd = iota
    HelloRespCmd
)
func main() {
    handler := qrpc.NewServeMux()
    handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
        // 构建流式响应的第一帧
        writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag)
        writer.WriteBytes(append([]byte("first frame "), request.Payload...))
        // 第一帧构建完毕,发送
        writer.EndWrite()

        for {
            // 获取流式请求的后续帧
            continueFrames := <-request.FrameCh()
            if continueFrames == nil {
                break
            }

            fmt.Printf("%s\n", continueFrames.Payload)
            // 构建流式响应的后续帧
            if continueFrames.Flags.IsDone() {
                // 最后一帧,flag标记为qrpc.StreamEndFlag
                writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamEndFlag)
            } else {
                // 不是最后一帧,标记为qrpc.StreamFlag
                writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag)
            }
            
            writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...))
            // 后续帧构建完毕,发送
            writer.EndWrite()
        }
    })
    bindings := []qrpc.ServerBinding{
        qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
    server := qrpc.NewServer(bindings)
    err := server.ListenAndServe()
    if err != nil {
        panic(err)
    }
}

关键是qrpc.StreamFlag!

推送模式

package main
import (
    "github.com/zhiqiangxu/qrpc"
    "sync"
)

const (
    HelloCmd qrpc.Cmd = iota
    HelloRespCmd
)
func main() {
    handler := qrpc.NewServeMux()
    handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
        var (
            wg    sync.WaitGroup
        )
        qserver := request.ConnectionInfo().SC.Server()
        pushID := qserver.GetPushID()
        // 遍历所有长链接
        qserver.WalkConn(0, func(writer qrpc.FrameWriter, ci *qrpc.ConnectionInfo) bool {
            qrpc.GoFunc(&wg, func() {
                // PushFlag表示主动推送
                writer.StartWrite(pushID, HelloCmd, qrpc.PushFlag)
                writer.WriteBytes([]byte("pushed msg"))
                writer.EndWrite()
            })
            return true
        })
        wg.Wait()

        writer.StartWrite(request.RequestID, HelloRespCmd, 0)
        writer.WriteBytes(append([]byte("push done"), request.Payload...))
        writer.EndWrite()
    })
    bindings := []qrpc.ServerBinding{
        qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
    server := qrpc.NewServer(bindings)
    err := server.ListenAndServe()
    if err != nil {
        panic(err)
    }
}

上述代码中,HelloCmd的处理子函数将给每个长链接推送一条消息!

客户端处理推送消息的方式如下:

-    conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil)
+    conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, func(conn *qrpc.Connection, pushedFrame *qrpc.Frame) {
+        fmt.Println(pushedFrame)
+    })

双向调用

前面的demo都是client调用server,其实client也可以注册回调让server调,参考测试用例中的TestClientHandler

Performance

性能大概是http的 4 倍!

适用场景&成果

1场景

推送、IM、微服务rpc、中间件

2设计原则

抽象、高性能、易用

3推送&IM系统使用成果

目前急于qrpc已经实现了长链接推送和IM系统:

长链接推送已经过日活800W、同时在线峰值200W的App的长期大流量验证;

IM系统经过多个300-500W日活App的大流量验证

两套系统均非常稳定,后期也可能会开源。(敬请期待)

4微服务场景产出

对于微服务场景,qrpc提供了强大的语法糖功能,RegisterService即可注册服务,UseService即可调用服务。

5中间件的使用

qrpc的流式特性非常适合多次交互的transaction场景,可用于中间件。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • streamclient.go:
  • streamserver.go:
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档