Adding google-grpc/protobuf support to dubbox

Author: 菩提树下的杨过
Published: 2018-01-18 16:49:49

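It has been a while since I last wrote anything. This year has been very busy, mostly with business development, so tonight I am catching up with one more post to close out the year. Since its official release, gRPC has attracted a lot of attention, and it would be a shame not to integrate such a well-known RPC framework into dubbox.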

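However, gRPC takes a somewhat different approach from other RPC frameworks such as Avro and Thrift. Instead of the usual "interface definition + service implementation" pattern, it has you derive from a generated abstract class, as the following example shows: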

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.cnblogs.yjmyzz.demo.service.api.grpc";
option java_outer_classname = "GrpcHelloServiceProto";

package hello;

service GrpcHelloService {
    rpc ping (PingRequest) returns (PingResponse) {}
}

message PingRequest{}

message PingResponse {
    string message = 1;
}

This is a protobuf definition file. The Java code generated from it is:

package com.cnblogs.yjmyzz.demo.service.api.grpc;

import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.futureUnaryCall;
import static io.grpc.MethodDescriptor.generateFullMethodName;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
import static io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall;

/**
 */
@javax.annotation.Generated(
    value = "by gRPC proto compiler (version 1.0.1)",
    comments = "Source: hello.proto")
public class GrpcHelloServiceGrpc {

  private GrpcHelloServiceGrpc() {}

  public static final String SERVICE_NAME = "hello.GrpcHelloService";

  // Static method descriptors that strictly reflect the proto.
  @io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/1901")
  public static final io.grpc.MethodDescriptor<com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
      com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> METHOD_PING =
      io.grpc.MethodDescriptor.create(
          io.grpc.MethodDescriptor.MethodType.UNARY,
          generateFullMethodName(
              "hello.GrpcHelloService", "ping"),
          io.grpc.protobuf.ProtoUtils.marshaller(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest.getDefaultInstance()),
          io.grpc.protobuf.ProtoUtils.marshaller(com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse.getDefaultInstance()));

  /**
   * Creates a new async stub that supports all call types for the service
   */
  public static GrpcHelloServiceStub newStub(io.grpc.Channel channel) {
    return new GrpcHelloServiceStub(channel);
  }

  /**
   * Creates a new blocking-style stub that supports unary and streaming output calls on the service
   */
  public static GrpcHelloServiceBlockingStub newBlockingStub(
      io.grpc.Channel channel) {
    return new GrpcHelloServiceBlockingStub(channel);
  }

  /**
   * Creates a new ListenableFuture-style stub that supports unary and streaming output calls on the service
   */
  public static GrpcHelloServiceFutureStub newFutureStub(
      io.grpc.Channel channel) {
    return new GrpcHelloServiceFutureStub(channel);
  }

  /**
   */
  public static abstract class GrpcHelloServiceImplBase implements io.grpc.BindableService {

    /**
     */
    public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
        io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_PING, responseObserver);
    }

    @java.lang.Override public io.grpc.ServerServiceDefinition bindService() {
      return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
          .addMethod(
            METHOD_PING,
            asyncUnaryCall(
              new MethodHandlers<
                com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
                com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>(
                  this, METHODID_PING)))
          .build();
    }
  }

  /**
   */
  public static final class GrpcHelloServiceStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceStub> {
    private GrpcHelloServiceStub(io.grpc.Channel channel) {
      super(channel);
    }

    private GrpcHelloServiceStub(io.grpc.Channel channel,
        io.grpc.CallOptions callOptions) {
      super(channel, callOptions);
    }

    @java.lang.Override
    protected GrpcHelloServiceStub build(io.grpc.Channel channel,
        io.grpc.CallOptions callOptions) {
      return new GrpcHelloServiceStub(channel, callOptions);
    }

    /**
     */
    public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
        io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
      asyncUnaryCall(
          getChannel().newCall(METHOD_PING, getCallOptions()), request, responseObserver);
    }
  }

  /**
   */
  public static final class GrpcHelloServiceBlockingStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceBlockingStub> {
    private GrpcHelloServiceBlockingStub(io.grpc.Channel channel) {
      super(channel);
    }

    private GrpcHelloServiceBlockingStub(io.grpc.Channel channel,
        io.grpc.CallOptions callOptions) {
      super(channel, callOptions);
    }

    @java.lang.Override
    protected GrpcHelloServiceBlockingStub build(io.grpc.Channel channel,
        io.grpc.CallOptions callOptions) {
      return new GrpcHelloServiceBlockingStub(channel, callOptions);
    }

    /**
     */
    public com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request) {
      return blockingUnaryCall(
          getChannel(), METHOD_PING, getCallOptions(), request);
    }
  }

  /**
   */
  public static final class GrpcHelloServiceFutureStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceFutureStub> {
    private GrpcHelloServiceFutureStub(io.grpc.Channel channel) {
      super(channel);
    }

    private GrpcHelloServiceFutureStub(io.grpc.Channel channel,
        io.grpc.CallOptions callOptions) {
      super(channel, callOptions);
    }

    @java.lang.Override
    protected GrpcHelloServiceFutureStub build(io.grpc.Channel channel,
        io.grpc.CallOptions callOptions) {
      return new GrpcHelloServiceFutureStub(channel, callOptions);
    }

    /**
     */
    public com.google.common.util.concurrent.ListenableFuture<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> ping(
        com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request) {
      return futureUnaryCall(
          getChannel().newCall(METHOD_PING, getCallOptions()), request);
    }
  }

  private static final int METHODID_PING = 0;

  private static class MethodHandlers<Req, Resp> implements
      io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
      io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
      io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
      io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
    private final GrpcHelloServiceImplBase serviceImpl;
    private final int methodId;

    public MethodHandlers(GrpcHelloServiceImplBase serviceImpl, int methodId) {
      this.serviceImpl = serviceImpl;
      this.methodId = methodId;
    }

    @java.lang.Override
    @java.lang.SuppressWarnings("unchecked")
    public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
      switch (methodId) {
        case METHODID_PING:
          serviceImpl.ping((com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest) request,
              (io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>) responseObserver);
          break;
        default:
          throw new AssertionError();
      }
    }

    @java.lang.Override
    @java.lang.SuppressWarnings("unchecked")
    public io.grpc.stub.StreamObserver<Req> invoke(
        io.grpc.stub.StreamObserver<Resp> responseObserver) {
      switch (methodId) {
        default:
          throw new AssertionError();
      }
    }
  }

  public static io.grpc.ServiceDescriptor getServiceDescriptor() {
    return new io.grpc.ServiceDescriptor(SERVICE_NAME,
        METHOD_PING);
  }

}

In particular, note this part:

  public static abstract class GrpcHelloServiceImplBase implements io.grpc.BindableService {

    /**
     */
    public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
        io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_PING, responseObserver);
    }

    @java.lang.Override public io.grpc.ServerServiceDefinition bindService() {
      return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
          .addMethod(
            METHOD_PING,
            asyncUnaryCall(
              new MethodHandlers<
                com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
                com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>(
                  this, METHODID_PING)))
          .build();
    }
  }

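GrpcHelloServiceImplBase is just an abstract class. Calls also have to go through a stub, and creating a stub in turn requires a channel, so integrating this into dubbox takes a bit of thought.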
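To make the shape of the problem concrete, here is a minimal, dependency-free sketch in plain Java. Every type in it (`Channel`, `PingServiceImplBase`, `PingServiceImpl`, `PingBlockingStub`) is a hypothetical stand-in invented for illustration, not the real gRPC API. It only models the two points above: the server side derives from a base class whose methods default to "unimplemented", and the client side cannot obtain a stub without a channel.

```java
import java.util.function.Function;

class AbstractBaseSketch {

    /** Hypothetical stand-in for a transport channel: takes a request, returns a reply. */
    interface Channel extends Function<String, String> {}

    /** Server side: a base class whose methods default to "unimplemented". */
    static abstract class PingServiceImplBase {
        public String ping(String request) {
            throw new UnsupportedOperationException("ping is not implemented");
        }
    }

    /** A concrete service derives from the base and overrides what it supports. */
    static class PingServiceImpl extends PingServiceImplBase {
        @Override
        public String ping(String request) {
            return "grpc is running";
        }
    }

    /** Client side: a stub can only be created from a channel. */
    static final class PingBlockingStub {
        private final Channel channel;

        private PingBlockingStub(Channel channel) {
            this.channel = channel;
        }

        static PingBlockingStub newBlockingStub(Channel channel) {
            return new PingBlockingStub(channel);
        }

        String ping(String request) {
            return channel.apply(request);
        }
    }

    /** Wires an in-process channel straight to the service and calls it through a stub. */
    static String callThroughStub() {
        PingServiceImplBase service = new PingServiceImpl();
        Channel inProcess = service::ping; // no network here, just a method reference
        return PingBlockingStub.newBlockingStub(inProcess).ping("");
    }

    public static void main(String[] args) {
        System.out.println(callThroughStub()); // prints "grpc is running"
    }
}
```

The consequence for dubbox is that whatever the consumer side receives from the framework has to give it access to a channel, since the stub is built by the caller rather than by the framework.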

先定义一个辅助接口:

package com.alibaba.dubbo.rpc.protocol.grpc;

import io.grpc.BindableService;
import io.grpc.Channel;

/**
 * Created by yangjunming on 16/10/7.
 */
public interface GrpcBindableService extends BindableService {

    Channel getChannel();

    void setChannel(Channel channel);
}

The purpose of this interface is to make the channel available at call time, so that a stub can be created from it.

Then, when implementing a concrete gRPC service, implement this interface as well:

package com.cnblogs.yjmyzz.demo.service.impl.grpc;

import com.alibaba.dubbo.rpc.protocol.grpc.GrpcBindableService;
import com.cnblogs.yjmyzz.demo.service.api.grpc.GrpcHelloServiceGrpc;
import com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest;
import com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import org.springframework.stereotype.Service;

/**
 * Created by yangjunming on 2016/11/3.
 */
@Service("grpcService")
public class HelloServiceImpl extends GrpcHelloServiceGrpc.GrpcHelloServiceImplBase implements GrpcBindableService {

    private Channel channel;

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void ping(PingRequest request,
                     StreamObserver<PingResponse> responseObserver) {
        PingResponse reply = PingResponse.newBuilder().setMessage("grpc is running").build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}

With this in place, adding a gRPC protocol to dubbox becomes straightforward:

package com.alibaba.dubbo.rpc.protocol.grpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

/**
 * Adds "google-gRPC" support to dubbo-rpc
 * by 杨俊明 (http://yjmyzz.cnblogs.com/)
 */
public class GrpcProtocol extends AbstractProxyProtocol {
    public static final int DEFAULT_PORT = 50051;
    private static final Logger logger = LoggerFactory.getLogger(GrpcProtocol.class);

    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    public GrpcProtocol() {
        super(IOException.class, RpcException.class);
    }

    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {

        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        try {
            String clsName = url.getParameter("class");
            Class<?> cls = Class.forName(clsName);
            GrpcBindableService service = (GrpcBindableService) cls.newInstance();
            final Server grpcServer = ServerBuilder.forPort(url.getPort())
                    .addService(service)
                    .build()
                    .start();
            logger.info("grpc server started !");
            return new Runnable() {
                public void run() {
                    try {
                        logger.info("Close gRPC Server");
                        grpcServer.shutdown();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            };
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException(e.getMessage(), e);
        }
    }

    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {
        logger.info("type => " + type.getName());
        logger.info("url => " + url);
        final ManagedChannel channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort())
                .usePlaintext(true)
                .build();
        try {
            DefaultBindableService service = new DefaultBindableService();
            service.setChannel(channel);
            return (T) service;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException(e.getMessage(), e);
        }
    }

}

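When doExport exposes the gRPC service, it casts the instance to the GrpcBindableService interface defined above. Because that interface extends BindableService, the instance can be passed straight to the server builder, which solves the problem of starting the gRPC server.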
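The reflective step in doExport (read a class name, load it, instantiate it, cast it to the helper interface) can be sketched on its own in plain Java. The `Bindable` interface and `HelloBindable` class below are hypothetical stand-ins, not part of dubbox or gRPC. The sketch uses `getDeclaredConstructor().newInstance()` because `Class.newInstance()` has been deprecated since Java 9, and `asSubclass` so that a class of the wrong type is rejected before it is instantiated. Note also that this approach creates a fresh instance from the class name rather than using the `impl` object passed to doExport.

```java
class ReflectiveExportSketch {

    /** Hypothetical stand-in for the role GrpcBindableService plays in doExport. */
    interface Bindable {
        String describe();
    }

    /** Hypothetical service implementation with a public no-arg constructor. */
    static class HelloBindable implements Bindable {
        public HelloBindable() {}

        @Override
        public String describe() {
            return "hello service";
        }
    }

    /** Loads a class by name, checks that it implements Bindable, and instantiates it. */
    static Bindable instantiate(String className) {
        try {
            // asSubclass throws ClassCastException if the class is not a Bindable.
            Class<? extends Bindable> cls = Class.forName(className).asSubclass(Bindable.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing constructor, or a constructor failure.
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Bindable service = instantiate(HelloBindable.class.getName());
        System.out.println(service.describe()); // prints "hello service"
    }
}
```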

Next, let's look at how to reference this service. For that, one more helper class is needed:

package com.alibaba.dubbo.rpc.protocol.grpc;

import io.grpc.Channel;
import io.grpc.ServerServiceDefinition;

/**
 * Created by yangjunming on 16/10/7.
 */
public class DefaultBindableService implements GrpcBindableService {

    private Channel channel;

    @Override
    public Channel getChannel() {
        return channel;
    }

    @Override
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    @Override
    public ServerServiceDefinition bindService() {
        return null;
    }
}

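This class is the default implementation of the GrpcBindableService interface defined above. Its only job is to hold on to the generated channel through the setter. doRefer uses this class to capture the channel and ultimately hand it to the caller of the gRPC service.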
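The carrier idea can be reduced to a few lines of plain Java. In the sketch below, `Channel`, `ChannelCarrier`, `DefaultCarrier`, and `refer` are hypothetical names that only mirror the roles of the gRPC channel, GrpcBindableService, DefaultBindableService, and doRefer. One difference from the doRefer shown earlier is deliberate: the sketch uses `Class.cast` instead of an unchecked `(T)` cast, so a mismatched reference type fails at the point of referral instead of somewhere later in the caller.

```java
class ChannelCarrierSketch {

    /** Hypothetical stand-in for a channel: takes a request, returns a reply. */
    interface Channel {
        String call(String request);
    }

    /** Mirrors the role of GrpcBindableService: an object that carries a channel. */
    interface ChannelCarrier {
        Channel getChannel();

        void setChannel(Channel channel);
    }

    /** Mirrors the role of DefaultBindableService: a plain holder and nothing more. */
    static class DefaultCarrier implements ChannelCarrier {
        private Channel channel;

        @Override
        public Channel getChannel() {
            return channel;
        }

        @Override
        public void setChannel(Channel channel) {
            this.channel = channel;
        }
    }

    /** Simplified referral: wrap the channel in a carrier and return it as the requested type. */
    static <T> T refer(Class<T> type, Channel channel) {
        DefaultCarrier carrier = new DefaultCarrier();
        carrier.setChannel(channel);
        // Class.cast throws ClassCastException right here if type is not a carrier type.
        return type.cast(carrier);
    }

    public static void main(String[] args) {
        Channel echo = request -> "echo:" + request;
        ChannelCarrier service = refer(ChannelCarrier.class, echo);
        // The caller pulls the channel back out and uses it directly.
        System.out.println(service.getChannel().call("ping")); // prints "echo:ping"
    }
}
```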

Client call example:

    private static void testGrpc(ConfigurableApplicationContext ctx) throws InterruptedException {
        // Note: Spring's getBean(Class, Object...) treats the extra arguments as
        // constructor/factory-method arguments rather than a bean name, so this lookup is by type.
        GrpcBindableService service = ctx.getBean(GrpcBindableService.class, "grpcService");
        // Build a blocking stub from the channel carried by the service reference.
        GrpcHelloServiceGrpc.GrpcHelloServiceBlockingStub stub =
                GrpcHelloServiceGrpc.newBlockingStub(service.getChannel());
        PingRequest request = PingRequest.newBuilder().build();
        logger.info("\n---------gRPC protocol test start---------");
        logger.info(stub.getClass().toString());
        PingResponse response = stub.ping(request);
        logger.info("\tping=>" + response.getMessage());
        // Shut the channel down and wait up to 5 seconds for it to terminate.
        ((ManagedChannel) stub.getChannel()).shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }
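The last line of the client example follows a shutdown-then-wait pattern. gRPC's ManagedChannel offers `shutdown()`, `shutdownNow()`, and `awaitTermination(timeout, unit)`, the same lifecycle shape as `java.util.concurrent.ExecutorService`, so the pattern can be tried without any gRPC dependency. The sketch below uses an ExecutorService purely as a stand-in for the channel; the class and method names are hypothetical. It adds two things the one-liner leaves out: a forced shutdown when the wait times out, and a `finally` block so cleanup happens even if submitting the work fails.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulShutdownSketch {

    /** Submits one task, then shuts the executor down and waits for it to finish. */
    static boolean runThenShutdown(ExecutorService executor, Runnable task) {
        try {
            executor.submit(task);
        } finally {
            executor.shutdown(); // stop accepting new work; submitted work still runs
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // give up waiting and cancel what is left
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
            }
        }
        return executor.isTerminated();
    }

    public static void main(String[] args) {
        boolean terminated = runThenShutdown(Executors.newSingleThreadExecutor(), () -> {});
        System.out.println("terminated = " + terminated);
    }
}
```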

For the complete sample code, see the dubbo-demo example I rewrote on GitHub.

Finally, Merry Christmas, everyone!

Originally published: 2016-12-25
