前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >GO实现高可用高并发分布式系统:使用gRPC实现一对多和多对多交互

GO实现高可用高并发分布式系统:使用gRPC实现一对多和多对多交互

作者头像
望月从良
发布2022-03-28 15:38:49
1K0
发布2022-03-28 15:38:49
举报
文章被收录于专栏:Coding迪斯尼Coding迪斯尼

在上一节我们使用gRPC实现了客户端和服务端的一对一通讯,也就是客户端向服务端发出一个请求,服务端返回一个结果。但是在很多场景下可能需要客户端向服务端连续发送多个请求后,服务端才能进行处理然后返回一个结果,例如客户端向服务端发送多个订单号,让服务端对订单号进行记录,然后服务端把所有订单号记录后返回结果;或者是客户端发送一个订单号查询所有大于给定订单号的交易记录,然后服务端返回满足条件的十几条记录等。

我们首先看看服务端给客户端返回多条记录的情形。在gRPC中,可以连续发送多条数据的对象叫stream,该对象支持异步发送,假设客户端要查询所有订单号大于10的交易记录,假设在服务端存储了满足条件的记录有20条,那么服务端可以先返回5条,等5分钟后再返回10条,然后等20分钟后再返回5条,因此客户端在接收记录时需要做相应的异步处理。

我们首先修改proto文件如下:

代码语言:javascript
复制
ervice OrderManagement {
    rpc getOrder(google.protobuf.StringValue) returns(Order);
    rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
}

上面代码中的stream表明,当客户端通过searchOrders接口向服务器发出请求时,它需要通过stream对象来获取一系列从服务器返回的Order数据。按照上一节的方法再次编译proto文件后,我们看看它内容的改变,使用searchOrders作为关键字在生成的pb.go文件中查询我们可以看到如下内容:

代码语言:javascript
复制
type OrderManagementClient interface {
    GetOrder(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (*Order, error)

    SearchOrders(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (OrderManagement_SearchOrdersClient, error)

}
。。。

type OrderManagement_SearchOrdersClient interface {
    Recv() (*Order, error)
    grpc.ClientStream
}

type orderManagementSearchOrdersClient struct {
    grpc.ClientStream
}

func (x *orderManagementSearchOrdersClient) Recv() (*Order, error) {
    m := new(Order)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

这段代码表明客户端在调用searchOrders接口时它会获得一个名为OrderManagement_SearchOrdersClient的对象,这个对象实现了一个接口叫Recv,我们不难猜测到时候客户端需要调用Recv()来接收服务端返回的一系列Order数据。继续往下查可以看到如下代码:

代码语言:javascript
复制
// OrderManagementServer is the server API for OrderManagement service.
type OrderManagementServer interface {
    GetOrder(context.Context, *wrappers.StringValue) (*Order, error)

    SearchOrders(*wrappers.StringValue, OrderManagement_SearchOrdersServer) error

}

。。。。

type OrderManagement_SearchOrdersServer interface {
    Send(*Order) error
    grpc.ServerStream
}

type orderManagementSearchOrdersServer struct {
    grpc.ServerStream
}

func (x *orderManagementSearchOrdersServer) Send(m *Order) error {
    return x.ServerStream.SendMsg(m)
}

上面代码代码表明,服务端在实现searchOrders接口时需要使用一个名为OrderManagement_SearchOrdersServer的对象,它用于一个接口叫Send,我们不难猜测服务端将调用这个接口给客户端发送一系列Order数据,我们首先看服务端代码的实现,在server/main.go中增加代码如下:

代码语言:javascript
复制
func (s *server) SearchOrders(searchQuery *wrappers.StringValue, 
    stream pb.OrderManagement_SearchOrdersServer) error {
        for key, order := range orderMap {
            log.Print(key, order)
            for _, itemStr := range order.Items {
                log.Print(itemStr)
                if strings.Contains(itemStr, searchQuery.Value) {
                    err := stream.Send(&order)
                    if err != nil {
                        return fmt.Errorf("error sending message to stream: %v", err)
                    }
                    log.Print("Matching Order Found: " + key)
                    break
                }

            }
        }
        return nil //返回nil,gRPC会关闭服务器发往客户端的数据管道
    }

服务端通过实现SearchOrders接口来执行业务逻辑,其中stream的类型为OrderManagement_SearchOrdersServer,它有gRPC框架传给我们,通过前面的分析我们知道它有接口Send, 函数的输入参数searchQuery其实就是客户端发送过来的订单号字符串,代码从该数据结构拿到订单号后,从数据存储中进行查询,把所有查到的满足条件的Order数据通过Send发送给客户端。这里需要注意的是,客户端在接收数据过程中可能由于多种原因中断连接,这时服务端调用Send就会返回错误,同时还需要注意的是当服务端发送完所有数据后,一定要return nil,这样gRPC才会把发送管道给关闭调。

同理我们看客户端的实现,在client/main.go的main函数中添加如下代码:

代码语言:javascript
复制

    searchStream, _ := client.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
    //如果server 使用stream传输结果,客户端需要使用Recv()接收多个返回
    for {
        searchOrder, err := searchStream.Recv()
        if err == io.EOF {
            log.Print("EOF")
            break
        }
        if err == nil {
            log.Print("Search result: ", searchOrder)
        }
    }

从前面代码查询可以看到,客户端调用SearchOrder时会返回一个orderManagementSearchOrdersClient对象,它实现了接口Recv()用来接收服务端发送来的一连串数据,所以在上面代码实现中,我们在for循环中调用Recv()接口不断接收服务端发送的数据,如果数据发送完了,前面服务端通过return nil断掉连接后,客户端就会在调用Recv时得到io.EOF错误,这是就可以中断对Recv()的调用。

以上是客户端发送一个请求,服务端返回一系列结果,我们看看反过来,客户端发送一系列请求,服务端返回一个结果,首先还是修改proto文件,增加一个接口定义:

代码语言:javascript
复制
service OrderManagement {
    rpc getOrder(google.protobuf.StringValue) returns(Order);

    rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
    rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
}

updateOrders就是新增加的接口,注意到它对应的输入参数使用了stream来修饰,也就是说客户端会给服务端连续发送一系列Order数据,服务端处理后只返回一个StringValue结构,我们可以使用前面的搜索方法在新编译后的pb.go文件里查询新增加的接口,同样道理,服务端在实现该接口是,也是在一个for循环中使用Recv接口来获取客户端发送的一系列数据,在server/main.go中添加代码如下:

代码语言:javascript
复制
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
    ordersStr := "Updated Order IDs: "
    for {
        order, err := stream.Recv()
        if err == io.EOF {
            //通知客户端不用继续发送
            return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed" + ordersStr})
        }

        orderMap[order.Id] = *order 
        log.Printf("Order ID ", order.Id, ": Updated")
        ordersStr += order.Id + ", "
    }
}

代码的实现逻辑跟前面客户端实现的服务请求逻辑一样,相当于服务端和客户端的角色颠倒了一下。这里需要注意的是服务端如何给客户端返回结果,代码中调用了SendAndClose,它把返回结果传输给客户端的同时将连接关闭,于是客户端就不能继续再给服务端发送数据。我们看看客户端的实现,在client/main.go中添加代码如下:

代码语言:javascript
复制
updOrder1 := pb.Order{Id: "102", Items:[]string{"Google Pixel 3A", "Google Pixel Book"}, Destination:"Mountain View, CA", Price:1100.00}
    updOrder2 := pb.Order{Id: "103", Items:[]string{"Apple Watch S4", "Mac Book Pro", "iPad Pro"}, Destination:"San Jose, CA", Price:2800.00}
    updOrder3 := pb.Order{Id: "104", Items:[]string{"Google Home Mini", "Google Nest Hub", "iPad Mini"}, Destination:"Mountain View, CA", Price:2200.00}

    updateStream, err := client.UpdateOrders(ctx)
    if err != nil {
        log.Fatalf("%v.UpdateOrders(_) = , %v", client, err)
    }

    if err := updateStream.Send(&updOrder1); err != nil {
        log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
    }

    if err := updateStream.Send(&updOrder2); err != nil {
        log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
    }

    if err := updateStream.Send(&updOrder3); err != nil {
        log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
    }

    updateRes, err := updateStream.CloseAndRecv()
    if err != nil {
        log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
    }
    log.Printf("Update orders res: %s", updateRes)

客户端先是构造一系列Order数据然后分别调用多次Send传递给服务端,如果客户端没有多余数据要传输后,它调用CloseAndRecv(),这个函数会让服务端的Recv()返回io.EOF错误,然后客户端阻塞等待服务端将处理结果返回。

最后我们看客户端给服务端发送一系列数据,然后服务端返回一系列结果给客户端的情况。假设客户端给服务端发送了一系列订单信息,服务端收到订单信息后,把收货地址相同的货物信息合在一起发送给客户端,我们用shipment表示收货地址相同的货物信息组合。如果客户端发送order1, order2,order3, order4 等4个订单号给服务端,其中order1 ,order3 对应货物的收货地址一样, order2, order4对应的收货地址一样,于是服务端就返回两个shipment结构,第一个对应order1, order3, 第二个对应order2, order4,我们先看proto文件的修改:

代码语言:javascript
复制
service OrderManagement {
    rpc getOrder(google.protobuf.StringValue) returns(Order);

    rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
    rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
    rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
}

message Order {
    string id = 1;
    repeated string items = 2;
    string description = 3;
    float price = 4;
    string destination = 5;
}

message CombinedShipment {
    string id = 1;
    string status = 2;
    repeated Order orderList = 3;
}

我们先看服务端的实现,在server/main.go中添加如下代码:

代码语言:javascript
复制
func (s *server) ProcessOrder(stream pb.OrderManagement_ProcessOrdersServer) error {
    batchMarker := 1
    var combinedShipmentMap = make(map[string]pb.CombinedShipment)
    for {
        orderId, err := stream.Recv()
        log.Printf("Reading Proc order: %s", orderId)
        if err == io.EOF {
            log.Printf("EOF: %s", orderId)
            for _, shipment := range combinedShipmentMap {
                if err := stream.Send(&shipment); err != nil {
                    return err 
                }
            }
            return nil //返回nil,gRPC框架会关闭调server发送给客户端的管道
        }
        if err != nil {
            log.Println(err)
            return err 
        }
        destination := orderMap[orderId.GetValue()].Destination 
        shipment, found := combinedShipmentMap[destination]
        if found {
            ord := orderMap[orderId.GetValue()]
            shipment.OrdersList = append(shipment.OrderList, &ord)
            combinedShipmentMap[destination] = shipment 
        } else {
            comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!",}
            ord := orderMap[orderId.GetValue()]
            comShip.OrdersList = append(shipment.OrdersList, &ord)
            combinedShipmentMap[destination] = comShip 
            log.Print(len(comShip.OrdersList), comShip.GetId())
        }

        if batchMarker == orderBatchSize {
            for _, comb := range combinedShipmentMap {
                log.Printf("Shipping: %v -> %v", comb.Id, len(comb.OrdersList))
                if err := stream.Send(&comb); err != nil {
                    return err 
                }
            }
            batchMarker = 0
            combinedShipmentMap = make(map[string]pb.CombinedShipment)
        } else {
            batchMarker++
        }
    }
}

上面代码实现我们只需要注意几点,首先它使用一个stream对象来完成两个功能,一个功能是调用Recv()来接收客户端发送的多个数据,然后同样是这个对象,继续调用它的Send接口给客户端发送多个数据,也就是一个stream对象既负责接收客户端发送的一系列数据,又负责将服务端的一系列处理结果发送给客户端,把握这一点就行,其他那些业务逻辑无关紧要。

我们再看看客户端的实现,在client/main.go中添加如下代码:

代码语言:javascript
复制
func main() {
。。。
    channel := make(chan struct{})
    go asncClientBidirectionalRPC(streamProcOrder, channel)
    time.Sleep(time.Milliscond * 1000)

    if err := streamProcOrder.Send(&wrapper.StringValue{Value: "101"}); err != nil {
        log.Fatalf("%v.Send(%v) = %v", client, "101", err)
    }

    if err := streamProcOrder.CloseSend(); err != nil {
        log.Fatal(err)
    }
    channel <- struct{}{} 

}

func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
    for {
        combinedShipment, errorProcOrder := streamProcOrder.Recv()
        if errProcOrder == io.EOF {
            break 
        }
        log.Printf("Combined shipment: ", combinedShipment.OrdersList)
    }
    <-c
}

上面代码实现中有一个关键点需要把握,客户端也是通过一个stream对象来完成数据的发送和接收,同时我们要特别注意到,同一个stream对象发送和接收完全可以在异步的条件下同时进行,所有上面代码在主函数main里通过Send发送请求,然后扔出一个goroutine异步接收服务端发送回来的数据,虽然发送和接收同时进行但客户端不用加锁,也就是gRPC框架保证了发送和接收在异步情况下业务逻辑依然不会出错。

相关代码从上一节的github路径可以获取。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-01-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Coding迪斯尼 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档