前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >gRPC服务端启动流程走查

gRPC服务端启动流程走查

作者头像
瓜农老梁
发布2019-11-28 16:28:24
1.8K0
发布2019-11-28 16:28:24
举报
文章被收录于专栏:瓜农老梁

1、服务端启动示例

代码语言:javascript
复制
server = ServerBuilder.forPort(port) // @1
.addService(new GreeterImpl()) // @2
.build() // @3
.start(); // @4

小结: @1 构建监听地址SocketAddress @2 将service注册到缓存 @3 Server构建 @4 服务端启动

小结:服务端启动只有一行代码,设计简洁。

2、构建监听地址SocketAddress
2.1 SPI加载NettyServerProvider

代码坐标:io.grpc.ServerProvider

代码语言:javascript
复制
ServerBuilder.forPort(port)

public static ServerBuilder<?> forPort(int port) {
    return ServerProvider.provider().builderForPort(port);
}

private static final ServerProvider provider = ServiceProviders.load(
  ServerProvider.class,
  Collections.<Class<?>>emptyList(),
  ServerProvider.class.getClassLoader(),
  new PriorityAccessor<ServerProvider>() {
    @Override
    public boolean isAvailable(ServerProvider provider) {
      return provider.isAvailable();
    }

    @Override
    public int getPriority(ServerProvider provider) {
      return provider.priority();
    }
  }); // @1

@1 通过SPI对实例化NettyServerProvider String PREFIX = "META-INF/services/"; String fullName = PREFIX + service.getName(); c = Class.forName(cn, false, loader); 具体目录为:grpc-netty工程 META-INF/services/io.grpc.ServerProvider配置文件提供的配置io.grpc.netty.NettyServerProvider

2.2 根据指定端口创建监听地址

代码坐标1:io.grpc.netty.NettyServerProvider 代码坐标2:io.grpc.netty.NettyServerBuilder

代码语言:javascript
复制
protected NettyServerBuilder builderForPort(int port) {
   return NettyServerBuilder.forPort(port);
}

private final List<SocketAddress> listenAddresses = new ArrayList<>();

private NettyServerBuilder(int port) {
    this.listenAddresses.add(new InetSocketAddress(port)); // @1
}

@1 添加指定端口的SocketAddress监听地址并存入List容器缓存

小结:在服务端启动时指定监听端口,SPI加载NettyServerProvider,构建SocketAddress监听地址并存入List容器缓存。

3、将service注册到缓存

代码坐标:io.grpc.internal.AbstractServerImplBuilder

代码语言:javascript
复制
.addService(new GreeterImpl()) // @1

public final T addService(BindableService bindableService) {
    ...
    return addService(checkNotNull(bindableService, "bindableService").bindService()); // @2
  }
  
 Builder addService(ServerServiceDefinition service) {
  services.put(service.getServiceDescriptor().getName(), service); // @3
  return this;
 }

@1 GreeterImpl自定义提供的服务实现类需继承插件生产的抽象类GreeterGrpc.GreeterImplBase同时实现了BindableService接口 @2 bindService()不需要自己实现,插件自动生成代码时会自动生成其实现类 @3 将服务注册到缓存services中(LinkedHashMap)

小结:将自定义的服务提供实现类注册到缓存中。

4、Server构建
代码语言:javascript
复制
.build()
public final Server build() {
ServerImpl server = new ServerImpl(
    this,
    buildTransportServers(getTracerFactories()), // @1
    Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
  notifyTarget.notifyOnBuild(server);
}
return server;
}

protected List<NettyServer> buildTransportServers(){
    ...
   for (SocketAddress listenAddress : listenAddresses) {
     NettyServer transportServer = new NettyServer(...) // @2
   }
}

@1 创建NettyServer @2 为注册的每个SocketAddress创建NettyServer

4.1 NettyServer构造函数参数

代码语言:javascript
复制
NettyServer(
  SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory, // @1
  Map<ChannelOption<?>, ?> channelOptions, // @2 
  ObjectPool<? extends EventLoopGroup/**一组EventLoop**/> bossGroupPool, // @3
  ObjectPool<? extends EventLoopGroup> workerGroupPool, // @4 
  ProtocolNegotiator protocolNegotiator, // @5
  List<? extends ServerStreamTracer.Factory> streamTracerFactories, // @6
  TransportTracer.Factory transportTracerFactory, // @7
  int maxStreamsPerConnection, // @8
  int flowControlWindow, // @9
  int maxMessageSize, // @10
  int maxHeaderListSize, // @11
  long keepAliveTimeInNanos, // @12
  long keepAliveTimeoutInNanos, // @13
  long maxConnectionIdleInNanos, // @14
  long maxConnectionAgeInNanos, // @15
  long maxConnectionAgeGraceInNanos,// @16
  boolean permitKeepAliveWithoutCalls, // @17
  long permitKeepAliveTimeInNanos, // @18
  InternalChannelz channelz
)

@1 Channel工厂类创建新的通道 @2 ChannelOption设置项 @3 用于accept客户端链接的线程池转发给workerGroupPool @4 初始化客户端连接的线程池 @5 遵循HTTP/2规范的通信协商 @6 用于创建ServerStreamTracer @7 创建TransportTracer工厂类用于统计通信流量 @8 每个连接允许的最大Streams @9 HTTP/2流控窗口大小 @10 服务端允许的最大消息体大小该属性已废弃使用maxInboundMessageSize代替 @11 接受最大Header大小已废弃使用maxInboundMetadataSize代替 @12 存活时间单位纳秒在存活时间内发送下一次keepAlive ping @13 keepalive ping requests的超时时间 @14 连接最大空闲时间超过后将优雅关闭 @15 连接最大存活时间超过后将优雅关闭 @16 当达到最大连接时RPCs有优雅关闭时间,在优雅时间内RPC请求未完成将被取消 @17 是否允许在没有RPC调用的情况下客户端发送keep-alive HTTP/2 PINGs 默认false @18 允许客户端保持连接的最大时间 默认5分钟

5、服务端启动

代码坐标:ServerImpl.start

代码语言:javascript
复制
.start()

public ServerImpl start() throws IOException {
synchronized (lock) {
  checkState(!started, "Already started");// @1
  checkState(!shutdown, "Shutting down");// @2
  ServerListenerImpl listener = new ServerListenerImpl(); // @3
  for (InternalServer ts : transportServers) {
    ts.start(listener); // @4
    activeTransportServers++; // @5
  }
  executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); // @6
  started = true; // @7
  return this;
}

@1 检查服务是否启动 @2 检查服务是否正在关闭 @3 transport创建监听器 @4 server启动 @5 活动server统计 @6 从守护线程池中获取一个线程 @7 服务启动标记

代码坐标:NettyServer.start

代码语言:javascript
复制
public void start(ServerListener serverListener) {
    ServerBootstrap b = new ServerBootstrap();
    ...
     b.childHandler(new ChannelInitializer<Channel>() {
      @Override
      public void initChannel(Channel ch) {
        NettyServerTransport transport =
            new NettyServerTransport(...); // @1
      }
      transport.start(transportListener);
      ...
    });
}

代码坐标:NettyServerTransport.start

代码语言:javascript
复制
...
grpcHandler = createHandler(listener, channelUnused); // @2 
...

代码坐标:NettyServerHandler.newHandler

代码语言:javascript
复制
 static NettyServerHandler newHandler(...){ // @3
    final Http2Connection connection = new DefaultHttp2Connection(true);
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
    DefaultHttp2RemoteFlowController controller =
        new DefaultHttp2RemoteFlowController(connection, dist);
    connection.remote().flowController(controller);
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);

    // Create the local flow controller configured to auto-refill the connection window.
    connection.local().flowController(
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
    Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
        frameReader);

    Http2Settings settings = new Http2Settings();
    settings.initialWindowSize(flowControlWindow);
    settings.maxConcurrentStreams(maxStreams);
    settings.maxHeaderListSize(maxHeaderListSize);

    return new NettyServerHandler(...)
 }

@1 创建NettyServerTransport @2 基于Netty HTTP/2构建gRPC服务端 @3 具体的Netty HTTP/2实现,具体在分析HTTP/2时再回头分析

6、小结

从一行代码启动gRPC服务端开始,从注册地址、注册服务、Server构建、Server启动流程走查。gRPC基于Netty HTTP/2协议栈封装底层通信。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2、构建监听地址SocketAddress
  • 2.1 SPI加载NettyServerProvider
  • 2.2 根据指定端口创建监听地址
  • 3、将service注册到缓存
  • 4、Server构建
  • 5、服务端启动
  • 6、小结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档