专栏首页GoUpUpGo 每日一库之 jsonrpc

Go 每日一库之 jsonrpc

简介

在上一篇文章中我们介绍了 Go 标准库net/rpc的用法。在默认情况下,rpc库内部使用gob格式传输数据。我们仿造gob的编解码器实现了一个json格式的。实际上标准库net/rpc/jsonrcp中已有实现。本文是对上一篇文章的补充。

快速使用

标准库无需安装。

首先是服务端,使用net/rpc/jsonrpc之后,我们就不用自己去编写json的编解码器了:

package main

import (
  "log"
  "net"
  "net/rpc"
  "net/rpc/jsonrpc"
)

type Args struct {
  A, B int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
  *reply = args.A * args.B
  return nil
}

func main() {
  l, err := net.Listen("tcp", ":1234")
  if err != nil {
    log.Fatal("listen error:", err)
  }

  arith := new(Arith)
  rpc.Register(arith)

  for {
    conn, err := l.Accept()
    if err != nil {
      log.Fatal("accept error:", err)
    }

    // 注意这一行
    go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
  }
}

直接调用jsonrpc.NewServerCodec(conn)创建一个服务端的codec。客户端也是类似的:

func main() {
  conn, err := net.Dial("tcp", ":1234")
  if err != nil {
    log.Fatal("dial error:", err)
  }

  // 这里,这里?
  client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))

  args := &Args{7, 8}
  var reply int
  err = client.Call("Arith.Multiply", args, &reply)
  if err != nil {
    log.Fatal("Multiply error:", err)
  }
  fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

先运行服务端程序:

$ go run main.go

然后在一个新的控制台中运行客户端程序:

$ go run client.go
Multiply: 7*8=56

下面这段代码基本上每个使用jsonrpc的程序都要编写:

conn, err := net.Dial("tcp", ":1234")
if err != nil {
  log.Fatal("dial error:", err)
}

client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))

因此jsonrpc为了方便直接提供了一个Dial方法。使用Dial简化上面的客户端程序:

func main() {
  client, err := jsonrpc.Dial("tcp", ":1234")
  if err != nil {
    log.Fatal("dial error:", err)
  }

  args := &Args{7, 8}
  var reply int
  err = client.Call("Arith.Multiply", args, &reply)
  if err != nil {
    log.Fatal("Multiply error:", err)
  }
  fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

效果是一样的。

JSON-RPC 标准

JSON-RPC 1.0 标准在 2005 年发布,经过数年演化,于 2010 年发布了 2.0 版本。JSON-RPC 标准的内容可在https://www.jsonrpc.org/specification查看。Go 标准库net/rpc/jsonrpc实现了 1.0 版本。关于 2.0 版本的实现可以在pkg.go.dev上搜索json-rpc+2.0。本文以 1.0 版本为基础进行介绍。

JSON-RPC 传输的是单一的对象,序列化为 JSON 格式。请求对象包含以下 3 个属性:

  • method:请求调用的方法;
  • params:一个数组表示传给方法的各个参数;
  • id:请求 ID。ID 可以是任何类型,在收到响应时根据这个属性判断对应哪个请求。

响应对象包含以下 3 个属性:

  • result:方法返回的对象,如果error非空时,该属性必须为null
  • error:表示调用是否出错;
  • id:对应请求的 ID。

另外标准还定义了一种通知类型,除了id属性为null之外,通知对象的属性与请求对象完全一样。

调用client.Call("echo", "Hello JSON-RPC", &reply)时:

请求:{ "method": "echo", "params": ["Hello JSON-RPC"], "id": 1}
响应:{ "result": "Hello JSON-RPC", "error": null, "id": 1}

使用 zookeeper 实现简单的负载均衡

下面我们使用zookeeper实现一个简单的客户端侧的负载均衡。zookeeper中记录所有的可提供服务的服务器,客户端每次请求时都随机挑选一个。我们的示例中,请求必须是无状态的。首先,我们改造一下服务端程序,将监听地址提取出来,通过flag指定:

package main

import (
  "flag"
  "log"
  "net"
  "net/rpc"
  "net/rpc/jsonrpc"
)

var (
  addr *string
)

type Args struct {
  A, B int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
  *reply = args.A * args.B
  return nil
}

func init() {
  addr = flag.String("addr", ":1111", "addr to listen")
}

func main() {
  flag.Parse()

  l, err := net.Listen("tcp", *addr)
  if err != nil {
    log.Fatal("listen error:", err)
  }

  arith := new(Arith)
  rpc.Register(arith)

  for {
    conn, err := l.Accept()
    if err != nil {
      log.Fatal("accept error:", err)
    }

    go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
  }
}

关于有哪些服务器可用,我们存储在zookeeper中。

首先要启动一个zookeeper的程序。在 Apache Zookeeper 官网可以下载能直接运行的 Windows 程序。下载之后解压,将conf文件夹中的样板配置zoo_sample.cfg复制一份,文件名改为zoo.cfg。在编辑器中打开zoo.cfg,将dataDir改为一个已存在的目录,或创建一个新目录。我在bin同级目录中创建了一个data目录,然后设置dataDir=../data。切换到bin目录下执行zkServer.batzookeeper程序就运行起来了。使用zkClient.bat连接上这个zookeeper,增加一个节点,设置数据:

$ create /rpcserver
$ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112,127.0.0.1:1113

我们用,分隔多个服务器地址。

准备工作完成后,接下来就开始编写客户端代码了。我们实现一个代理类,负责监听zookeeper的数据变化,根据zookeeper中新的地址创建到服务器的连接,删除老的连接,将调用请求随机转发到一个服务器处理:

type Proxy struct {
  zookeeper     string
  clients       map[string]*rpc.Client
  events        <-chan zk.Event
  zookeeperConn *zk.Conn
  mutex         sync.Mutex
}

func NewProxy(addr string) *Proxy {
  return &Proxy{
    zookeeper: addr,
    clients:   make(map[string]*rpc.Client),
  }
}

这里我们使用了go-zookeeper这个库,需要额外安装:

$ go get github.com/samuel/go-zookeeper/zk

程序启动时,代理对象从zookeeper中获取服务端地址,创建连接:

func (p *Proxy) Connect() {
  c, _, err := zk.Connect([]string{p.zookeeper}, time.Second) //*10)
  if err != nil {
    panic(err)
  }

  data, _, event, err := c.GetW("/rpcserver")
  if err != nil {
    panic(err)
  }

  p.events = event
  p.zookeeperConn = c

  p.CreateClients(string(data))
}

func (p *Proxy) CreateClients(server string) {
  p.mutex.Lock()
  defer p.mutex.Unlock()

  addrs := strings.Split(server, ",")
  allAddr := make(map[string]struct{})
  for _, addr := range addrs {
    allAddr[addr] = struct{}{}
    if _, exist := p.clients[addr]; exist {
      continue
    }

    client, err := jsonrpc.Dial("tcp", addr)
    if err != nil {
      log.Println("jsonrpc Dial error:", err)
      continue
    }

    p.clients[addr] = client
    log.Println("new addr:", addr)
  }

  for addr := range p.clients {
    if _, exist := allAddr[addr]; !exist {
   // 不在 zookeeper 中的地址,删除对应连接
   oldClient.Close()
   delete(p.clients, addr)

      log.Println("delete addr", addr)
    }
  }
}

同时,需要监听zookeeper中的数据变化,当新增或删除某个服务端地址时,Proxy要及时更新连接:

func (p *Proxy) Run() {
  for {
    select {
    case event := <-p.events:
      if event.Type == zk.EventNodeDataChanged {
        data, _, err := p.zookeeperConn.Get("/rpcserver")
        if err != nil {
          log.Println("get zookeeper data failed:", err)
          continue
        }

        p.CreateClients(string(data))
      }
    }
  }
}

客户端主体程序使用Proxy结构非常方便:

package main

import (
  "flag"
  "fmt"
  "math/rand"
)

var (
  zookeeperAddr *string
)

func init() {
  zookeeperAddr = flag.String("addr", ":2181", "zookeeper address")
}

type Args struct {
  A, B int
}

func main() {
  flag.Parse()

  fmt.Println(*zookeeperAddr)
  p := NewProxy(*zookeeperAddr)
  p.Connect()

  go p.Run()

  for i := 0; i < 10; i++ {
    var reply int
    args := &Args{rand.Intn(1000), rand.Intn(1000)}
    p.Call("Arith.Multiply", args, &reply)
    fmt.Printf("%d*%d=%d\n", args.A, args.B, reply)
  }

  // sleep 过程中可以修改 zookeeper 中的数据
  time.Sleep(1 * time.Minute)

  // 使用新的地址做随机
  for i := 0; i < 100; i++ {
    var reply int
    args := &Args{rand.Intn(1000), rand.Intn(1000)}
    p.Call("Arith.Multiply", args, &reply)
    fmt.Printf("%d*%d=%d\n", args.A, args.B, reply)
  }
}

创建一个代理对象,在一个新的 goroutine 中监听zookeeper事件。然后通过ProxyCall调用远程服务端的方法:

func (p *Proxy) Call(method string, args interface{}, reply interface{}) error {
  var client *rpc.Client
  var addr string
  idx := rand.Int31n(int32(len(p.clients)))
  var i int32
  p.mutex.Lock()
  for a, c := range p.clients {
    if i == idx {
      client = c
      addr = a
      break
    }
    i++
  }
  p.mutex.Unlock()

  fmt.Println("use", addr)
  return client.Call(method, args, reply)
}

首先我们要启动 3 个服务端程序,分别监听端口 1111、1112、1113,需要 3 个控制台:

控制台 1:

$ go run main.go -addr :1111

控制台 2:

$ go run main.go -addr :1112

控制台 3:

$ go run main.go -addr :1113

客户端在一个新的控制台启动,指定zookeeper地址:

$ go run . -addr=127.0.0.1:2181

在输出中,我们可以看到是怎么随机挑选服务器的。

我们可以尝试在客户端程序运行的过程中,将某个服务器地址从zookeeper中删除。我特意在程序中加了一个 1 分钟的延迟。在sleep过程中,通过zkClient.cmd127.0.0.1:1113这个地址从zookeeper中删除:

$ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112

控制台输出:

$ 2020/05/10 23:47:47 delete addr 127.0.0.1:1113

并且后续的请求不会再发到127.0.0.1:1113这个服务器了。

其实,在实际的项目中,Proxy一般是一个独立的服务器,而不是放在客户端侧。上面示例这样处理只是为了方便。

总结

RPC 底层可以使用各种协议传输数据,JSON/XML/Protobuf 都可以。对 rpc 感兴趣的建议看看rpcx这个库,https://github.com/smallnest/rpcx。非常强大!

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue?

参考

  1. jsonrpc GitHub:https://golang.org/pkg/net/rpc/jsonrpc/
  2. Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib

本文分享自微信公众号 - GoUpUp(GoUp-Up),作者:dj

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-05-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Go 每日一库之 email

    程序中时常有发送邮件的需求。有异常情况了需要通知管理员和负责人,用户下单后可能需要通知订单信息,电商平台、中国移动和联通都有每月账单,这些都可以通过邮件来推送。...

    用户7731323
  • Go 每日一库之 goquery

    goquery是用 Go 语言编写的一个类似于 jQuery 的库。它基于 HTML 解析库net/html和 CSS 库cascadia,提供与 jQuery...

    用户7731323
  • Go 每日一库之 plot

    本文介绍 Go 语言的一个非常强大、好用的绘图库——plot。plot内置了很多常用的组件,基本满足日常需求。同时,它也提供了定制化的接口,可以实现我们的个性化...

    用户7731323
  • win2008 r2 安装sqlserver 2000问题的解决方法

    最近服务器升级了win2008 r2系统,考虑到用户额需要,sqlserver使用了2000,其实个人建议安装sql2005或sql2008,但也不能考虑一些朋...

    习惯说一说
  • 如何使用流媒体接入网关实现拉RTSP流转推RTMP流到流媒体服务器?

    我们团队有一款完善的流媒体接入软件网关EasyRTMPLive,即软件编码器,可以实现将RTSP、RTMP、HTTP、HLS等各种各样的网络流媒体先拉取到本地,...

    EasyNVR
  • 打造自己最喜爱的 Windows10 —— 系统与软件配置优化篇

    快捷键 win + i 打开 设置 ,在 更新与安全 - Windows 更新 里检查更新,等待驱动安装完毕重启系统

    cnguu
  • python基础教程:内置函数(二)

    input([prompt]) 如果存在 prompt 实参,则将其写入标准输出,末尾不带换行符。接下来,该函数从输入中读取一行,将其转换为字符串(除了末尾的...

    一墨编程学习
  • 1w5000字概括ES6全部特性

    第三次阅读阮一峰老师的《ECMAScript 6 入门》了,以前阅读时不细心,很多地方都是一目十行。最近这次阅读都是逐个逐个字来读,发现很多以前都没有注意到的知...

    前端迷
  • 一万元搭建深度学习系统:硬件、软件安装教程,以及性能测试

    来源:量子位 作者:Slav Ivanov@blog.slavv.com 编译:问耕 本文长度为4600字,建议阅读6分钟 本文教你万元打造一个深度学习系统。 ...

    数据派THU
  • 一万元搭建深度学习系统:硬件、软件安装教程,以及性能测试

    Macbook这种轻薄的笔记本,是搞不了深度学习的。亚马逊P2云服务,会给堆积越来越多的账单,换个便宜的服务,训练时间又太长…… 没办法,已经十多年没用过台式机...

    小莹莹

扫码关注云+社区

领取腾讯云代金券