好久没写东西了,今年实在太忙,基本都在搞业务开发,晚上来补一篇,作为今年的收官博客。gRPC(Google 开源的 RPC 框架)正式发布以来,受到了不少人的关注,这么知名的rpc框架,不集成到dubbox中有点说不过去。
但是grpc的思路与其它rpc(比如:avro/thrift)有些不一样,并非直接采用 "接口定义+服务实现"的套路,而是采用了"抽象类派生"的做法,见下面的示例:
1 syntax = "proto3";
2
3 option java_multiple_files = true;
4 option java_package = "com.cnblogs.yjmyzz.demo.service.api.grpc"; // package of the generated Java classes
5 option java_outer_classname = "GrpcHelloServiceProto";
6
7 package hello; // proto package; the service is addressed as hello.GrpcHelloService
8
9 service GrpcHelloService {
10 rpc ping (PingRequest) returns (PingResponse) {} // no "stream" keyword: one request, one response
11 }
12
13 message PingRequest{} // declares no fields
14
15 message PingResponse {
16 string message = 1; // the only field, tag number 1
17 }
这是一段protobuf的定义文件,最终生成的java代码为:
1 package com.cnblogs.yjmyzz.demo.service.api.grpc;
2
3 import static io.grpc.stub.ClientCalls.asyncUnaryCall;
4 import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
5 import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
6 import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
7 import static io.grpc.stub.ClientCalls.blockingUnaryCall;
8 import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
9 import static io.grpc.stub.ClientCalls.futureUnaryCall;
10 import static io.grpc.MethodDescriptor.generateFullMethodName;
11 import static io.grpc.stub.ServerCalls.asyncUnaryCall;
12 import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
13 import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
14 import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
15 import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
16 import static io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall;
17
18 /** Generated gRPC bindings for hello.GrpcHelloService: method descriptor, server base class and three client stubs.
19 */
20 @javax.annotation.Generated(
21 value = "by gRPC proto compiler (version 1.0.1)",
22 comments = "Source: hello.proto")
23 public class GrpcHelloServiceGrpc {
24
25 private GrpcHelloServiceGrpc() {}
26
27 public static final String SERVICE_NAME = "hello.GrpcHelloService";
28
29 // Static method descriptors that strictly reflect the proto.
30 @io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/1901")
31 public static final io.grpc.MethodDescriptor<com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
32 com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> METHOD_PING =
33 io.grpc.MethodDescriptor.create(
34 io.grpc.MethodDescriptor.MethodType.UNARY,
35 generateFullMethodName(
36 "hello.GrpcHelloService", "ping"),
37 io.grpc.protobuf.ProtoUtils.marshaller(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest.getDefaultInstance()),
38 io.grpc.protobuf.ProtoUtils.marshaller(com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse.getDefaultInstance()));
39
40 /**
41 * Creates a new async stub that supports all call types for the service
42 */
43 public static GrpcHelloServiceStub newStub(io.grpc.Channel channel) {
44 return new GrpcHelloServiceStub(channel);
45 }
46
47 /**
48 * Creates a new blocking-style stub that supports unary and streaming output calls on the service
49 */
50 public static GrpcHelloServiceBlockingStub newBlockingStub(
51 io.grpc.Channel channel) {
52 return new GrpcHelloServiceBlockingStub(channel);
53 }
54
55 /**
56 * Creates a new ListenableFuture-style stub that supports unary and streaming output calls on the service
57 */
58 public static GrpcHelloServiceFutureStub newFutureStub(
59 io.grpc.Channel channel) {
60 return new GrpcHelloServiceFutureStub(channel);
61 }
62
63 /** Server-side base class: ping() delegates to asyncUnimplementedUnaryCall unless a subclass overrides it.
64 */
65 public static abstract class GrpcHelloServiceImplBase implements io.grpc.BindableService {
66
67 /** Default ping handler: passes METHOD_PING and responseObserver to asyncUnimplementedUnaryCall.
68 */
69 public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
70 io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
71 asyncUnimplementedUnaryCall(METHOD_PING, responseObserver);
72 }
73
74 @java.lang.Override public io.grpc.ServerServiceDefinition bindService() {
75 return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
76 .addMethod(
77 METHOD_PING,
78 asyncUnaryCall(
79 new MethodHandlers<
80 com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
81 com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>(
82 this, METHODID_PING)))
83 .build();
84 }
85 }
86
87 /** Stub whose ping() takes a StreamObserver for the response (built on asyncUnaryCall).
88 */
89 public static final class GrpcHelloServiceStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceStub> {
90 private GrpcHelloServiceStub(io.grpc.Channel channel) {
91 super(channel);
92 }
93
94 private GrpcHelloServiceStub(io.grpc.Channel channel,
95 io.grpc.CallOptions callOptions) {
96 super(channel, callOptions);
97 }
98
99 @java.lang.Override
100 protected GrpcHelloServiceStub build(io.grpc.Channel channel,
101 io.grpc.CallOptions callOptions) {
102 return new GrpcHelloServiceStub(channel, callOptions);
103 }
104
105 /** Opens a new METHOD_PING call on the channel and hands it, the request and responseObserver to asyncUnaryCall.
106 */
107 public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
108 io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
109 asyncUnaryCall(
110 getChannel().newCall(METHOD_PING, getCallOptions()), request, responseObserver);
111 }
112 }
113
114 /** Stub whose ping() returns the PingResponse directly (built on blockingUnaryCall).
115 */
116 public static final class GrpcHelloServiceBlockingStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceBlockingStub> {
117 private GrpcHelloServiceBlockingStub(io.grpc.Channel channel) {
118 super(channel);
119 }
120
121 private GrpcHelloServiceBlockingStub(io.grpc.Channel channel,
122 io.grpc.CallOptions callOptions) {
123 super(channel, callOptions);
124 }
125
126 @java.lang.Override
127 protected GrpcHelloServiceBlockingStub build(io.grpc.Channel channel,
128 io.grpc.CallOptions callOptions) {
129 return new GrpcHelloServiceBlockingStub(channel, callOptions);
130 }
131
132 /** Invokes METHOD_PING through blockingUnaryCall and returns its result.
133 */
134 public com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request) {
135 return blockingUnaryCall(
136 getChannel(), METHOD_PING, getCallOptions(), request);
137 }
138 }
139
140 /** Stub whose ping() returns a ListenableFuture of the response (built on futureUnaryCall).
141 */
142 public static final class GrpcHelloServiceFutureStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceFutureStub> {
143 private GrpcHelloServiceFutureStub(io.grpc.Channel channel) {
144 super(channel);
145 }
146
147 private GrpcHelloServiceFutureStub(io.grpc.Channel channel,
148 io.grpc.CallOptions callOptions) {
149 super(channel, callOptions);
150 }
151
152 @java.lang.Override
153 protected GrpcHelloServiceFutureStub build(io.grpc.Channel channel,
154 io.grpc.CallOptions callOptions) {
155 return new GrpcHelloServiceFutureStub(channel, callOptions);
156 }
157
158 /** Opens a new METHOD_PING call on the channel and returns the future produced by futureUnaryCall.
159 */
160 public com.google.common.util.concurrent.ListenableFuture<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> ping(
161 com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request) {
162 return futureUnaryCall(
163 getChannel().newCall(METHOD_PING, getCallOptions()), request);
164 }
165 }
166
167 private static final int METHODID_PING = 0; // method id that MethodHandlers.invoke switches on
168
169 private static class MethodHandlers<Req, Resp> implements
170 io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
171 io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
172 io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
173 io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
174 private final GrpcHelloServiceImplBase serviceImpl;
175 private final int methodId;
176
177 public MethodHandlers(GrpcHelloServiceImplBase serviceImpl, int methodId) {
178 this.serviceImpl = serviceImpl;
179 this.methodId = methodId;
180 }
181
182 @java.lang.Override
183 @java.lang.SuppressWarnings("unchecked")
184 public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
185 switch (methodId) {
186 case METHODID_PING:
187 serviceImpl.ping((com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest) request,
188 (io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>) responseObserver);
189 break;
190 default:
191 throw new AssertionError();
192 }
193 }
194
195 @java.lang.Override
196 @java.lang.SuppressWarnings("unchecked")
197 public io.grpc.stub.StreamObserver<Req> invoke(
198 io.grpc.stub.StreamObserver<Resp> responseObserver) {
199 switch (methodId) {
200 default:
201 throw new AssertionError(); // this switch has no case labels, so every methodId ends here
202 }
203 }
204 }
205
206 public static io.grpc.ServiceDescriptor getServiceDescriptor() {
207 return new io.grpc.ServiceDescriptor(SERVICE_NAME,
208 METHOD_PING);
209 }
210
211 }
其中:
/**
 * Server-side base class for GrpcHelloService implementations; bindService() registers
 * METHOD_PING with a MethodHandlers instance that wraps this object.
 */
public static abstract class GrpcHelloServiceImplBase implements io.grpc.BindableService {
/**
 * Default ping handler: passes METHOD_PING and responseObserver to
 * asyncUnimplementedUnaryCall. Concrete services override this method.
 */
public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_PING, responseObserver);
}
@java.lang.Override public io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_PING,
asyncUnaryCall(
new MethodHandlers<
com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>(
this, METHODID_PING)))
.build();
}
}
就是一个抽象类,而且调用时要借助stub来实现,而stub的生成,又要借助channel,所以在集成到dubbox中时,要花点心思。
先定义一个辅助接口:
package com.alibaba.dubbo.rpc.protocol.grpc;
import io.grpc.BindableService;
import io.grpc.Channel;
/**
 * A {@link BindableService} that additionally carries a gRPC {@link Channel}, so that
 * code holding the service can obtain the channel and build a stub from it.
 *
 * Created by yangjunming on 16/10/7.
 */
public interface GrpcBindableService extends BindableService {

    /**
     * Stores the channel on this service.
     *
     * @param channel the channel to keep
     */
    void setChannel(Channel channel);

    /**
     * @return the channel previously stored with {@link #setChannel(Channel)}
     */
    Channel getChannel();
}
这个接口的目的,是为了最终调用时,能拿到channel,进而生成stub.
然后在实现具体grpc服务时,实现这个接口:
package com.cnblogs.yjmyzz.demo.service.impl.grpc;
import com.alibaba.dubbo.rpc.protocol.grpc.GrpcBindableService;
import com.cnblogs.yjmyzz.demo.service.api.grpc.GrpcHelloServiceGrpc;
import com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest;
import com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import org.springframework.stereotype.Service;
/**
 * Sample gRPC service: extends the generated {@code GrpcHelloServiceImplBase} and also
 * implements {@link GrpcBindableService} so it can carry a {@link Channel}.
 *
 * Created by yangjunming on 2016/11/3.
 */
@Service("grpcService")
public class HelloServiceImpl extends GrpcHelloServiceGrpc.GrpcHelloServiceImplBase implements GrpcBindableService {

    private Channel channel;

    /**
     * @return the channel stored via {@link #setChannel(Channel)}, or {@code null} if none was set
     */
    @Override
    public Channel getChannel() {
        return channel;
    }

    /**
     * @param channel the channel to keep on this service
     */
    @Override
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /**
     * Answers every ping with a fixed message and then completes the response stream.
     *
     * @param request          the ping request (not inspected)
     * @param responseObserver receives exactly one {@link PingResponse} followed by completion
     */
    @Override
    public void ping(PingRequest request,
                     StreamObserver<PingResponse> responseObserver) {
        PingResponse reply = PingResponse.newBuilder().setMessage("grpc is running").build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}
这样处理后,dubbox中添加grpc的协议就方便了:
package com.alibaba.dubbo.rpc.protocol.grpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
/**
 * Adds "google-gRPC" support to dubbo-rpc.
 * by 杨俊明(http://yjmyzz.cnblogs.com/)
 *
 * <p>Exporting starts a gRPC {@link Server} on the URL's port for the class named by the
 * URL's {@code class} parameter. Referring builds a plaintext {@link ManagedChannel} to the
 * URL's host and port and returns it wrapped in a {@link DefaultBindableService}.
 */
public class GrpcProtocol extends AbstractProxyProtocol {

    public static final int DEFAULT_PORT = 50051;

    private static final Logger logger = LoggerFactory.getLogger(GrpcProtocol.class);

    public GrpcProtocol() {
        super(IOException.class, RpcException.class);
    }

    /**
     * @return {@link #DEFAULT_PORT}
     */
    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    /**
     * Instantiates the class named by the URL's {@code class} parameter and serves it
     * from a newly started gRPC server.
     *
     * @param impl only logged; the served instance is created from the {@code class} parameter
     * @param type only logged
     * @param url  supplies the port and the {@code class} parameter
     * @return a task that shuts the gRPC server down
     * @throws RpcException if the parameter is missing, the class cannot be loaded or
     *                      instantiated, is not a {@link GrpcBindableService}, or the
     *                      server fails to start
     */
    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {
        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);
        try {
            String clsName = url.getParameter("class");
            if (clsName == null || clsName.trim().length() == 0) {
                // Fail with a readable message instead of a bare NPE from Class.forName(null).
                throw new IllegalArgumentException("Missing 'class' parameter in url: " + url);
            }
            // asSubclass rejects a wrong type before any instance is created.
            Class<? extends GrpcBindableService> cls =
                    Class.forName(clsName).asSubclass(GrpcBindableService.class);
            // Class.newInstance() is deprecated; the constructor-based call replaces it.
            GrpcBindableService service = cls.getDeclaredConstructor().newInstance();
            final Server grpcServer = ServerBuilder.forPort(url.getPort())
                    .addService(service)
                    .build()
                    .start();
            logger.info("grpc server started !");
            return new Runnable() {
                public void run() {
                    try {
                        logger.info("Close gRPC Server");
                        grpcServer.shutdown();
                    } catch (Throwable e) {
                        // Best effort: a failing shutdown is logged and must not propagate.
                        logger.warn(e.getMessage(), e);
                    }
                }
            };
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException(e.getMessage(), e);
        }
    }

    /**
     * Builds a plaintext channel to the URL's host and port and returns it wrapped in a
     * {@link DefaultBindableService}.
     *
     * @param type the interface being referred; the returned holder must be an instance of it
     * @param url  supplies host and port
     * @return the channel holder, cast to {@code type}
     * @throws RpcException if the channel cannot be built or the holder is not an instance
     *                      of {@code type}; the channel is shut down in that case
     */
    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {
        logger.info("type => " + type.getName());
        logger.info("url => " + url);
        ManagedChannel channel = null;
        try {
            channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort())
                    .usePlaintext(true)
                    .build();
            DefaultBindableService service = new DefaultBindableService();
            service.setChannel(channel);
            // Checked cast: a mismatched type fails here rather than later at the call site.
            return type.cast(service);
        } catch (Exception e) {
            if (channel != null) {
                // Do not leave a channel behind when the reference cannot be handed out.
                channel.shutdown();
            }
            logger.error(e.getMessage(), e);
            throw new RpcException(e.getMessage(), e);
        }
    }
}
在doExport暴露grpc服务时,通过类型转换成我们刚才定义的接口GrpcBindableService,解决了grpc服务的启动问题。
再来看如何引用这个服务,此外还要再定义一个辅助类:
package com.alibaba.dubbo.rpc.protocol.grpc;
import io.grpc.Channel;
import io.grpc.ServerServiceDefinition;
/**
 * Default implementation of {@link GrpcBindableService} that only holds a {@link Channel};
 * its {@link #bindService()} returns {@code null}.
 *
 * Created by yangjunming on 16/10/7.
 */
public class DefaultBindableService implements GrpcBindableService {

    /** Channel handed in through {@link #setChannel(Channel)}; {@code null} until then. */
    private Channel channel;

    /**
     * @return always {@code null}; this holder defines no service methods
     */
    @Override
    public ServerServiceDefinition bindService() {
        return null;
    }

    /**
     * @param channel the channel to keep
     */
    @Override
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /**
     * @return the stored channel, or {@code null} if none was set
     */
    @Override
    public Channel getChannel() {
        return this.channel;
    }
}
这个类就是刚才定义的新接口GrpcBindableService的默认实现,目的是为了能将生成的channel通过setter方法保存下来。doRefer方法利用这个类,拿到了channel,最终给到grpc服务的调用方。
客户端调用示例:
/**
 * Looks up the gRPC service bean, builds a blocking stub from its channel, sends one ping
 * and logs the reply. The channel is shut down afterwards even if the ping fails.
 *
 * @param ctx the application context that provides the {@link GrpcBindableService} bean
 * @throws InterruptedException if interrupted while waiting for the channel to terminate
 */
private static void testGrpc(ConfigurableApplicationContext ctx) throws InterruptedException {
    // NOTE(review): in Spring's getBean(Class, Object...) overload the second argument is
    // an explicit constructor/factory argument, not a bean name. If a lookup by name was
    // intended, getBean("grpcService", GrpcBindableService.class) is the matching overload
    // -- confirm against the consumer-side bean id before changing.
    GrpcBindableService service = ctx.getBean(GrpcBindableService.class, "grpcService");
    // Typed as the concrete stub so no raw type or downcast is needed to call ping().
    GrpcHelloServiceGrpc.GrpcHelloServiceBlockingStub stub =
            GrpcHelloServiceGrpc.newBlockingStub(service.getChannel());
    try {
        PingRequest request = PingRequest.newBuilder().build();
        logger.info("\n---------gprc协议测试开始---------");
        logger.info(stub.getClass().toString());
        PingResponse response = stub.ping(request);
        logger.info("\tping=>" + response.getMessage());
    } finally {
        // Release the channel on both the success and the failure path.
        ((ManagedChannel) stub.getChannel()).shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }
}
完整示例代码请参考github上我重写的dubbo-demo示例
最后:祝大家圣诞快乐!