什么是拦截器?
拦截器是一种横切维度的功能延展。
具象说明一下,高速收费站就是一种拦截器。它可以做什么?收费,查证,交通控制等等,面向所有穿行过往的车辆。
gRPC 拦截器主要分为两种:客户端拦截器(ClientInterceptor),服务端拦截器(ServerInterceptor),顾名思义,分别于请求的两端执行相应的前拦截处理。
请求被分发出去之前。
a)、请求日志记录及监控
b)、添加请求头数据、以便代理转发使用
c)、请求或者结果重写
通常,如果要提供认证信息的话,可以使用 CallCredentials 实现,虽然,拦截器里也可以通过设置 CallOptions 来提供。
@ThreadSafe
public interface ClientInterceptor {
<ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next);
}
它只有一个方法:interceptCall,对于注册了相应拦截器的客户端调用,都要经过这个方法,
参数:
1、method:MethodDescriptor 类型,标示请求方法。包括方法全限定名称、请求服务名称、请求、结果、序列化工具、幂等等。
2、callOptions:此次请求的附带信息。
3、next:执行此次 RPC 请求的抽象链接管道(Channel)
返回结果:
ClientCall,包含请求及结果信息,并且不为null。
什么也不做:
public class MyGrpcClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return next.newCall(method, callOptions);
}
}
可以看到我们的实现里,没有实现任何逻辑,直接执行了 next.newCall 继续执行客户端的此次调用。
next.newCall 只能在当前上下文中执行,每次调用以及返回都必须是一个完整地回路,逃逸使用会导致不必要的内存泄漏问题。
通过 callOption 设置超时及认证信息:
public class MyGrpcClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return next.newCall(method, callOptions
.withDeadlineAfter(500, TimeUnit.MILLISECONDS) //设置超时
.withCallCredentials(new CallCredentials() { //设置认证信息
@Override
public void applyRequestMetadata(RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) {
Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER), "king");
applier.apply(metadata);
}
@Override
public void thisUsesUnstableApi() {}
}));
}
}
看着是不是很熟悉,stub 调用时设置,只不过在这里换了一个设置场景。
日志记录:
public class MyGrpcClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
CallOptions myCallOptions = callOptions
.withDeadlineAfter(500, TimeUnit.MILLISECONDS) //设置超时
.withCallCredentials(new CallCredentials() { //设置认证信息
@Override
public void applyRequestMetadata(RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) {
Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER), "king");
applier.apply(metadata);
}
@Override
public void thisUsesUnstableApi() {}
});
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, myCallOptions)) {
@Override
public void sendMessage(ReqT message) {
System.out.println("request method: " + method.getFullMethodName());
System.out.println("request param:" + message.toString());
super.sendMessage(message);
}
};
}
}
ForwardingClientCall:ClientCall 的一个抽象实现类,用以请求的代理转发。为什么我们这里要用这个类呢?
其实我们完全可以直接使用 ClientCall 实现,只不过作为顶级抽闲类,我们必须要实现很多方法。而使用 ForwardingClientCall,则我们只需要去重写我们需要的方法就可以。
如上代码:
sendMessage 发送消息到请求服务器,可能会执行多次。此处我们记录相应的请求参数信息。
请求被具体的Handler相应前。
a)访问认证
b)请求日志记录及监控
c)代理转发
@ThreadSafe
public interface ServerInterceptor {
<ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next);
}
参数:
call:ServerCall 对象,包含客户端请求的 MethodDescriptor
headers:请求头信息
next:处理链条上的下一个处理。
public class MyGrpcServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return next.startCall(call, headers);
}
}
ServerCallHandler:定义用以实现请求处理的接口类。
public class MyGrpcServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
//提取认证信息
String id = headers.get(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER));
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
private long startTime = 0; //处理开始时间
private ReqT request;
private boolean valid = false; //认证状态
@Override
public void onComplete() {
//记录请求参数及耗时
System.out.println("process cost: " + (System.nanoTime() - startTime));
System.out.println("process param: " + request.toString());
super.onComplete();
}
@Override
public void onMessage(ReqT message) {
startTime = System.nanoTime();
request = message;
if (StringUtils.equals("king", id)) {
super.onMessage(message);
} else {
valid = false;
}
}
@Override
public void onHalfClose() {
//验证失败则返回 Status.UNAUTHENTICATED
if (!valid) {
call.close(Status.UNAUTHENTICATED.withDescription("auth failed"), new Metadata());
} else {
super.onHalfClose();
}
}
};
}
}
onMessage:接收到请求时进行相应处理,我们这记录处理开始时间,及请求参数,同时根据提取的认证信息进行访问验证,验证通过则继续后续处理,否则设置认证状态为 false。
onHalfClose:处理认证标示及返回。
onComplete:处理结束记录请求参数及耗时。