前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >gRPC四种类型示例分析【知识笔记】

gRPC四种类型示例分析【知识笔记】

作者头像
瓜农老梁
发布2020-04-23 09:38:17
2.1K0
发布2020-04-23 09:38:17
举报
文章被收录于专栏:瓜农老梁瓜农老梁瓜农老梁
目录
一、前言
二、ProtoBuf定义
三、代码结构
    1.服务端
    2.客户端
四、交互走查
    1.简单gRPC交互(UNARY)
    2.服务端到客户端流式交互
    3.客户端到服务端流式交互
    4.双向流式RPC
五、系列文章
一、前言

本文分析下gRPC支持类型的示例,Protobuf生成代码详见前面文章“Google Protocol Buffers三两事” 以及 Maven插件使用参见前面文章 “gRPC示例初探”;具体链接见本文结尾系列文章。gRPC提供四种服务类型,分别为:简单RPC、服务端到客户端流式RPC、客户端到服务端流式RPC、双向流式RPC。将“route_guide.proto”拷贝到工程目录,Maven编译时会生成代码。

二、ProtoBuf定义

下面Protobuf定义了gRPC提供的四种服务类型,走查下内容。

// @1 使用proto3语法
syntax = "proto3";
// @2 生成多个类
option java_multiple_files = true;
// @3 生成java类所在的包
option java_package = "io.grpc.examples.routeguide";
// @4 生成外层类类名
option java_outer_classname = "RouteGuideProto";
// @5 Objective-C类的前缀
option objc_class_prefix = "RTG";
// @6 .proto包名
package routeguide;
// @7 定义RPC服务RouteGuide
service RouteGuide {
  // @8 简单RPC接受Point参数返回Feature类型对象
  rpc GetFeature(Point) returns (Feature) {}
  // @9 服务端到客户端流式RPC,接受Rectangle对象参数,返回批量Feature数据
  rpc ListFeatures(Rectangle) returns (stream Feature) {}
  // @10 客户端到服务端流式RPC,接受批量Point数据,返回RouteSummary类型对象
  rpc RecordRoute(stream Point) returns (RouteSummary) {}
  // @11 双向流式RPC,接受批量RouteNote类型数据,返回批量RouteNote类型数据
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}
message Rectangle {
  Point lo = 1;
  Point hi = 2;
}
message Feature {
  string name = 1;
  Point location = 2;
}
message FeatureDatabase {
  repeated Feature feature = 1;
}
message RouteNote {
  Point location = 1;
  string message = 2;
}
message RouteSummary {
  int32 point_count = 1;
  int32 feature_count = 2;
  int32 distance = 3;
  int32 elapsed_time = 4;
}
三、代码结构

编译工具生成了Protobuf定义的Message对应的类和Builder类、gRPC服务端对外提供的接口、客户端调用服务端的存根。

1.服务端

启动gRPC Server

public static void main(String[] args) throws Exception {
    RouteGuideServer server = new RouteGuideServer(8980);
    server.start();
    server.blockUntilShutdown();
 } // @1 启动Server监听处理客户端请求

 public RouteGuideServer(int port, URL featureFile) throws IOException {
    this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
 } // @2 将数据从文件route_guide_db.json读出

 public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
    this.port = port;
    server = serverBuilder.addService(new RouteGuideService(features))
        .build();
 } // @3 将自定义实现的RouteGuideService注册到Server

实现gRPC服务接口

编译工具生成了gRPC服务端对外提供的接口,我们使用时需要实现该接口即可,即实际的Server端处理逻辑。

private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
}
// @1 RouteGuideGrpc.RouteGuideImplBase由编译器生成Server端接口类
// @2 RouteGuideService由用户实现的类处理Server端业务逻辑

小结:在服务端我们需要做实现生成的服务接口,并将该服务实现类注册到gRPC Server中。

2.客户端

构建通道和存根

 public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    List<Feature> features;
    // @1 读取route_guide_db.json测试数据
    features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    // @2 构建与Server的RPC通道
    ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build();
    RouteGuideClient client = new RouteGuideClient(channel);
 }

 public RouteGuideClient(Channel channel) {
   // @3 构建同步调用存根
   blockingStub = RouteGuideGrpc.newBlockingStub(channel);
   // @4 构建异步调用存根
   asyncStub = RouteGuideGrpc.newStub(channel);
 }
四、交互走查
1 简单gRPC交互(UNARY)

客户端调用

RouteGuideClient client = new RouteGuideClient(channel);
client.getFeature(409146138, -746188906);

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
// @1 使用同步调用存根向服务端发起请求,入参为普通对象
feature = blockingStub.getFeature(request);

System.out.printf("Found feature called %s, at %s, %s!%n",
                feature.getName(),
                RouteGuideUtil.getLatitude(feature.getLocation()),
                RouteGuideUtil.getLongitude(feature.getLocation()));

服务端响应

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
      // @2 通过responseObserver.onNext向客户端发送消息
      responseObserver.onNext(checkFeature(request));
      // @3 标记服务端响应完成
      responseObserver.onCompleted();
}

输出内容

Found feature called Berkshire Valley Management Area Trail, Jefferson, NJ, USA, at 40.9146138, -74.6188906!

备注:使用观察者模式响应客户端请求。

2.服务端到客户端流式交互

客户端调用

 RouteGuideClient client = new RouteGuideClient(channel);
 client.listFeatures(400000000, -750000000, 420000000, -730000000);
 public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
    Iterator<Feature> features;
      // @1 客户端调用同步存根向服务端发起调用
      features = blockingStub.listFeatures(request);
      for (int i = 1; features.hasNext(); i++) {
        Feature feature = features.next();
        info("Result #" + i + ": {0}", feature);
      }
 }

服务端响应

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  // ...
  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }
    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      // @2 循环调用responseObserver.onNext向客户端回调发送数据
      responseObserver.onNext(feature);
    }
  }
  // @3 标记服务端响应完成
  responseObserver.onCompleted();
}

输出内容

Result #62: name: "3387 Richmond Terrace, Staten Island, NY 10303, USA"
location {
  latitude: 406411633
  longitude: -741722051
}
Result #63: name: "261 Van Sickle Road, Goshen, NY 10924, USA"
location {
  latitude: 413069058
  longitude: -744597778
}
//...

备注:服务端到客户端流式交互。即:SERVER_STREAMING。客户端通过存根发起RPC调用,由服务端多次调用onNext回调客户端完成响应。

3.客户端到服务端流式交互

客户端调用

RouteGuideClient client = new RouteGuideClient(channel);
client.recordRoute(features, 10);
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
    info("*** RecordRoute");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    // @1 创建responseObserver用于服务端回调客户端
    StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
      @Override
      // @2 响应服务端responseObserver.onNext回调
      public void onNext(RouteSummary summary) {
       // print
      }
      @Override
      // @3 响应服务端responseObserver.onError回调
      public void onError(Throwable t) {
       // print
      }
      @Override
      // @4 响应服务端responseObserver.onCompleted的回调
      public void onCompleted() {
        // print
        finishLatch.countDown();
      }
    };
// @5 通过异步存根发起调用,参数为响应观察者responseObserver
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
    try {
      for (int i = 0; i < numPoints; ++i) {
        int index = random.nextInt(features.size());
        Point point = features.get(index).getLocation();
        // print
        // @6 多次调用requestObserver.onNext向服务端写入数据
        requestObserver.onNext(point);
        Thread.sleep(random.nextInt(1000) + 500);
        if (finishLatch.getCount() == 0) {
          return;
        }
      }
    } catch (RuntimeException e) {
      requestObserver.onError(e);
      throw e;
    }
    // @7 标记客户端写入结束
    requestObserver.onCompleted();
  }

服务端响应

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  // @1 构造观察者与客户端交互
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    final long startTime = System.nanoTime();

    @Override
    // @2 响应客户端
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "recordRoute cancelled");
    }

    @Override
    // @3 在客户端调用requestObserver.onCompleted()时触发,标记服务端处理完成
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      // @4 回调客户端responseObserver.onNext
      responseObserver.onNext(RouteSummary.newBuilder()
      .setPointCount(pointCount)
      .setFeatureCount(featureCount).setDistance(distance)
      .setElapsedTime((int) seconds).build());
      // @5 回调客户端responseObserver.onCompleted标记完成
      responseObserver.onCompleted();
    }
  };
}

输出内容

*** RecordRoute
Visiting point 40.213, -74.361
Visiting point 41.478, -74.062
Visiting point 40.915, -74.619
Visiting point 40.213, -74.361
Visiting point 40.408, -74.612
Visiting point 40.431, -74.028
Visiting point 40.964, -74.602
Visiting point 40.007, -74.679
Visiting point 41.268, -74.261
Visiting point 40.812, -74.4
Finished trip with 10 points. Passed 5 features. Travelled 761,415 meters. It took 110 seconds.
Finished RecordRoute

小结:客户端到服务端流式交互,即:CLIENT_STREAMING。客户端由异步存根asyncStub发起调用,参数为“responseObserver”;服务端通过onNext响应客户端请求,在客户端触发写入结束响应onCompleted后,服务端onCompleted被触发,调用响应观察者“responseObserver”回调到客户端完成结束操作。

4.双向流式RPC

客户端调用

RouteGuideClient client = new RouteGuideClient(channel);
CountDownLatch finishLatch = client.routeChat();
public CountDownLatch routeChat() {
final CountDownLatch finishLatch = new CountDownLatch(1);

StreamObserver<RouteNote> requestObserver =
    // @1 使用异步存根asyncStub向服务端发起调用    // 调用参数为响应观察者即:responseObserver    asyncStub.routeChat(new StreamObserver<RouteNote>() {
      @Override
      // @2 响应服务端responseObserver回调onNext
      public void onNext(RouteNote note) {
        // print...
      }
      @Override
      public void onError(Throwable t) {
        // print...
        finishLatch.countDown();
      }
      @Override
      // @3 响应服务端responseObserver回调onCompleted
      public void onCompleted() {
        info("Finished RouteChat");
        finishLatch.countDown();
      }
    });

RouteNote[] requests =
   {newNote("First message", 0, 0), newNote("Second message", 0, 1),
          newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

for (RouteNote request : requests) {
    // @4 循环调用requestObserver.onNext向服务端写入消息
    requestObserver.onNext(request);
}

// @5 标记客户端写入完成
requestObserver.onCompleted();

return finishLatch;
}

服务端响应

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    // @1 当客户端调用requestObserver.onNext触发接受数据
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        // @2 循环调用responseObserver.onNext回调客户端
        responseObserver.onNext(prevNote);
      }
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "routeChat cancelled");
    }

    @Override
    // @3 客户端调用requestObserver.onCompleted()时触发
    public void onCompleted() {
      // @4 回调客户端标记完成
      responseObserver.onCompleted();
    }
  };
}

输出内容

信息: *** RouteChat
信息: Sending message "First message" at 0, 0
信息: Sending message "Second message" at 0, 1
信息: Sending message "Third message" at 1, 0
信息: Sending message "Fourth message" at 1, 1
信息: Got message "First message" at 0, 0
信息: Got message "First message" at 0, 0
信息: Got message "First message" at 0, 0
信息: Got message "First message" at 0, 0
信息: Got message "First message" at 0, 0
信息: Got message "Second message" at 0, 1
信息: Got message "Second message" at 0, 1
信息: Got message "Second message" at 0, 1
信息: Got message "Second message" at 0, 1
信息: Got message "Second message" at 0, 1
信息: Got message "Third message" at 1, 0
信息: Got message "Third message" at 1, 0
信息: Got message "Third message" at 1, 0
信息: Got message "Third message" at 1, 0
信息: Got message "Third message" at 1, 0
信息: Got message "Fourth message" at 1, 1
信息: Got message "Fourth message" at 1, 1
信息: Got message "Fourth message" at 1, 1
信息: Got message "Fourth message" at 1, 1
信息: Got message "Fourth message" at 1, 1
信息: Finished RouteChat

小结:双向流式RPC,即:BIDI_STREAMING。客户端和服务端均通过StreamObserver来交互,客户端发起时传入responseObserver,服务端可以通过responseObserver对客户端进行回调。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 一、前言
  • 二、ProtoBuf定义
  • 三、代码结构
    • 1.服务端
      • 2.客户端
        • 四、交互走查
          • 1 简单gRPC交互(UNARY)
            • 2.服务端到客户端流式交互
              • 3.客户端到服务端流式交互
                • 4.双向流式RPC
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档