前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >gRPC背压流控、压缩及JSON通信【知识笔记】

gRPC背压流控、压缩及JSON通信【知识笔记】

作者头像
瓜农老梁
发布2020-05-08 18:02:07
2.9K0
发布2020-05-08 18:02:07
举报
文章被收录于专栏:瓜农老梁瓜农老梁
目录
代码语言:javascript
复制
一、压缩
    1.Server端所有方法压缩
    2.Server单独方法压缩
    3.Client请求内容压缩
二、使用JSON通信
    1.方法描述使用JSON编译
    2.JSON编译具体过程
三、手动流量控制
    1.Consuming Side
    2.Producing Side
四、系列文章

本文继续整理gRPC的使用,走查解读官方给出的压缩示例、使用JSON通信以及手动流量控制。

一、压缩
1.Server端所有方法压缩
代码语言:javascript
复制
server = ServerBuilder.forPort(port)
.intercept(new ServerInterceptor() {
  @Override
  public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
      ServerCallHandler<ReqT, RespT> next) {
    // @1 在拦截器中设置压缩算法
    call.setCompression("gzip");
    return next.startCall(call, headers);
  }
})
.addService(new GreeterImpl())
.build()
.start();

备注:如果需要在Server端所有方法进行压缩,可以在ServerInterceptor拦击器中通过setCompression进行设置。

2.Server单独方法压缩

如果不想对所有的方法传输内容压缩,gPRC提供了单独方法的压缩。

代码语言:javascript
复制
int port = 50051;
server = ServerBuilder.forPort(port)
    .addService(new GreeterImpl())
    .build()
    .start();
static classGreeterImplextendsGreeterGrpc.GreeterImplBase{

@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> plainResponseObserver) {
  ServerCallStreamObserver<HelloReply> responseObserver =
      (ServerCallStreamObserver<HelloReply>) plainResponseObserver;
  // @1 对单个方法传输内容进行压缩
  responseObserver.setCompression("gzip");
  HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
  responseObserver.onNext(reply);
  responseObserver.onCompleted();
}
}

备注:单个方法的压缩通过ServerCallStreamObserver的setCompression进行单独设置。

3.Client请求内容压缩

客户端对请求内容进行压缩,下面示例通过gzip进行压缩。

代码语言:javascript
复制
publicvoidgreet(String name) {
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
  // @1 对请求内容设置压缩类型
  response = blockingStub.withCompression("gzip").sayHello(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}
二、使用JSON通信

gRPC可以通过Json格式进行通信,虽然并不建议这么做,Json的效率要远低于ProtoBuf。看下示例是如何通过Json格式通信的。

1.方法描述使用JSON编译

对方法的出参和入参使用JSON适配器,示例中通过MethodDescriptor.toBuilder重写出入参数的解析格式。

代码语言:javascript
复制
static final MethodDescriptor<HelloRequest, HelloReply> METHOD_SAY_HELLO =
GreeterGrpc.getSayHelloMethod()
    .toBuilder(
 // @1 请求参数使用JSON编译   JsonMarshaller.jsonMarshaller(HelloRequest.getDefaultInstance()),
// @2 返回参数使用JSON编译
 JsonMarshaller.jsonMarshaller(HelloReply.getDefaultInstance()))
    .build();
2.JSON编译具体过程

既然通过对方法的出入参数编译成JSON格式,看下gRPC是如何做的呢?

代码语言:javascript
复制
public static <T extends Message> Marshaller<T> jsonMarshaller(final T defaultInstance) {
    final Parser parser = JsonFormat.parser();
    final Printer printer = JsonFormat.printer();
    return jsonMarshaller(defaultInstance, parser, printer);
}
public static <T extends Message> Marshaller<T> jsonMarshaller(
      final T defaultInstance, final Parser parser, final Printer printer) {
final Charset charset = Charset.forName("UTF-8");
return new Marshaller<T>() {
  @Override
  public InputStream stream(T value) {
    try {
      // @1 通过printer.print将出入参数转换为JSON格式
      return new ByteArrayInputStream(printer.print(value).getBytes(charset));
    } catch (InvalidProtocolBufferException e) {
      // ...
    }
  }
// ....
}

备注:在JsonFormat.print方法中进行具体的请求/返回参数转换为JSON的具体实现。

请求转换JSON格式截图

返回转换JSON格式截图

3.Client使用JSON格式的方法描述
代码语言:javascript
复制
public HelloReply sayHello(HelloRequest request) {
 // @1 使用JSON格式的方法描述METHOD_SAY_HELLO
  return blockingUnaryCall(
      getChannel(), METHOD_SAY_HELLO, getCallOptions(), request);
}
4.Server使用JSON格式的方法描述
代码语言:javascript
复制
public ServerServiceDefinition bindService() {
  return ServerServiceDefinition
      .builder(GreeterGrpc.getServiceDescriptor().getName())
       // @1 使用JSON格式的方法描述METHOD_SAY_HELLO
      .addMethod(HelloJsonClient.HelloJsonStub.METHOD_SAY_HELLO,
          asyncUnaryCall(
              new UnaryMethod<HelloRequest, HelloReply>() {
                @Override
                publicvoidinvoke(
                    HelloRequest request, StreamObserver<HelloReply> responseObserver) {
                  sayHello(request, responseObserver);
                }
              }))
      .build();
}
三、手动流量控制

gRPC的流量控制基于HTTP/2的流量控制,即背压模式。关于gRPC和HTTP/2背压模式原理和关系,请看下面摘录。

代码语言:javascript
复制
At the bottom is the HTTP/2's byte-based flow control. HTTP/2 works on streams of bytes and is completely unaware of gRPC messages or reactive streams. By default, the stream consumer allocates a budget of 65536 bytes.
The stream producer can send up to this many bytes before backpressure engages. As the consumer reads bytes, WINDOW_UPDATE messages are sent to the producer to increase its send budget.

In the middle is the gRPC-Java message-based flow control. gRPC's flow control adapts the stream-based flow control of HTTP/2 to a message-based flow control model.
Importantly, gRPC's flow control is aware of how it interacts with HTTP/2 and the network.

On producing side, an on-ready handler reads a message, serializes it into bytes using protobuf, and then queues it up for transmission over the HTTP/2 byte stream.
If there is insuficient room in the HTTP/2 flow control window to transmit, backpressure engages an no more messages are requested from the producer until space becomes available.

On the consuming side, each time a consumer calls request(x), gRPC attempts to read and deserialize x messages from the HTTP/2 stream.
Since the size of a protobuf encoded message is variable, there is not a one-to-one correlation between pulling messages from gRPC and pulling bytes over HTTP/2.
1.Consuming Side
代码语言:javascript
复制
publicstaticvoidmain(String[] args) throws InterruptedException, IOException {
// @1 Server端服务实现
StreamingGreeterGrpc.StreamingGreeterImplBase svc = new StreamingGreeterGrpc.StreamingGreeterImplBase() {
  @Override
  public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<HelloReply> responseObserver) {
    final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
        (ServerCallStreamObserver<HelloReply>) responseObserver;
    // @2 禁止自动流控模式,开启手动流控
    serverCallStreamObserver.disableAutoInboundFlowControl();
    // @3 背压模式流控,当消费端有足够空间时将会回调OnReadyHandler
    // 默认空间大小为65536字节
     classOnReadyHandlerimplementsRunnable{
      private boolean wasReady = false;

      @Override
      publicvoidrun() {
        if (serverCallStreamObserver.isReady() && !wasReady) {
          wasReady = true;
          logger.info("READY");
          // @4 向HTTP/2流请求读取并解压(x)条消息
          // 即发信号通知发送端发送继续发消息
          serverCallStreamObserver.request(1);
        }
      }
    }
    final OnReadyHandler onReadyHandler = new OnReadyHandler();
    serverCallStreamObserver.setOnReadyHandler(onReadyHandler);
    // @5 处理具体进来的请求
    return new StreamObserver<HelloRequest>() {
      @Override
      publicvoidonNext(HelloRequest request) {
        try {
          String name = request.getName();
          logger.info("--> " + name);
          Thread.sleep(100);
          String message = "Hello " + name;
          logger.info("<-- " + message);
          HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
          // @6 向Client发送请求
          responseObserver.onNext(reply);
          if (serverCallStreamObserver.isReady()) {
            // @7 向HTTP/2流请求读取并解压(x)条消息
            serverCallStreamObserver.request(1);
          } else {
            onReadyHandler.wasReady = false;
          }
        } catch (Throwable throwable) {
          //
        }
      }
      @Override
      publicvoidonError(Throwable t) {
        t.printStackTrace();
        responseObserver.onCompleted();
      }
      @Override
      publicvoidonCompleted() {
        logger.info("COMPLETED");
        responseObserver.onCompleted();
      }
    };
  }
};

final Server server = ServerBuilder
    .forPort(50051)
    .addService(svc)
    .build()
    .start();
2.Producing Side
代码语言:javascript
复制
publicstaticvoidmain(String[] args) throws InterruptedException {
final CountDownLatch done = new CountDownLatch(1);
ManagedChannel channel = ManagedChannelBuilder
    .forAddress("localhost", 50051)
    .usePlaintext()
    .build();
StreamingGreeterGrpc.StreamingGreeterStub stub = StreamingGreeterGrpc.newStub(channel);

ClientResponseObserver<HelloRequest, HelloReply> clientResponseObserver =
    new ClientResponseObserver<HelloRequest, HelloReply>() {
      ClientCallStreamObserver<HelloRequest> requestStream;
      @Override
      publicvoidbeforeStart(final ClientCallStreamObserver<HelloRequest> requestStream) {
        this.requestStream = requestStream;
        // @1设置手动流量控制
        requestStream.disableAutoInboundFlowControl();
        // @2 当Consumer端有足够空间时自动回调
        // 序列化protobuf先发送到缓存区(还未到Server端)
        // Server端需要调用request()向Client拉取消息
        requestStream.setOnReadyHandler(new Runnable() {
        Iterator<String> iterator = names().iterator();
          @Override
          publicvoidrun() {
            while (requestStream.isReady()) {
              if (iterator.hasNext()) {
                  String name = iterator.next();
                  logger.info("--> " + name);
                  HelloRequest request = HelloRequest.newBuilder().setName(name).build();
                  // @3 将消息发送到缓存区
                  requestStream.onNext(request);
              } else {
                  // @4 标记Client发送完成
                  requestStream.onCompleted();
              }
            }
          }
        });
      }

      @Override
      publicvoidonNext(HelloReply value) {
        // @5 接受Server端返回信息
        logger.info("<-- " + value.getMessage());
        // @6 通知Client继续发送
        requestStream.request(1);
      }

      @Override
      publicvoidonError(Throwable t) {
        t.printStackTrace();
        done.countDown();
      }

      @Override
      publicvoidonCompleted() {
        logger.info("All Done");
        done.countDown();
      }
    };
stream processing.
stub.sayHelloStreaming(clientResponseObserver);

done.await();

channel.shutdown();
channel.awaitTermination(1, TimeUnit.SECONDS);
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 一、压缩
    • 1.Server端所有方法压缩
      • 2.Server单独方法压缩
        • 3.Client请求内容压缩
        • 二、使用JSON通信
          • 1.方法描述使用JSON编译
            • 2.JSON编译具体过程
              • 3.Client使用JSON格式的方法描述
                • 4.Server使用JSON格式的方法描述
                • 三、手动流量控制
                  • 1.Consuming Side
                    • 2.Producing Side
                    相关产品与服务
                    文件存储
                    文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档