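Code for this series: https://github.com/JoJoTec/spring-cloud-parent
In the previous two sections we worked out the approach for adding circuit breaking and thread isolation to Feign, and explained how to improve the current load-balancing algorithm. We have not yet covered how the load balancer's cached data is updated, or the source code that implements retry, circuit breaking, and thread isolation. This section goes through both in detail.
First, we register our custom OpenFeign configuration through spring.factories so that it gets loaded: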
# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.jojotech.spring.cloud.webmvc.auto.OpenFeignAutoConfiguration
The auto-configuration class is OpenFeignAutoConfiguration, with the following content:
OpenFeignAutoConfiguration.java
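//Set @Configuration(proxyBeanMethods = false): no @Bean method here calls another one and expects the same bean back each time, so proxying is unnecessary; turning it off speeds up startup
@Configuration(proxyBeanMethods = false)
//Load the shared configuration, CommonOpenFeignConfiguration
@Import(CommonOpenFeignConfiguration.class)
//Enable scanning and configuration of OpenFeign annotations. The default configuration is DefaultOpenFeignConfiguration; that is, it is the default configuration class of Feign's NamedContextFactory (FeignContext)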
@EnableFeignClients(value = "com.github.jojotech", defaultConfiguration = DefaultOpenFeignConfiguration.class)
public class OpenFeignAutoConfiguration {
}
Why add this layer instead of using the imported CommonOpenFeignConfiguration directly? Because it lets us use @AutoConfigureBefore and @AutoConfigureAfter to control the loading order relative to other auto-configurations. Both are spring-boot annotations, and they only take effect on auto-configurations loaded through spring.factories. The extra layer is a deliberate design choice in case we need these annotations later.
CommonOpenFeignConfiguration contains the beans common to OpenFeign. They are singletons shared by every FeignClient, and include:
CommonOpenFeignConfiguration.java
@Configuration(proxyBeanMethods = false)
public class CommonOpenFeignConfiguration {
    //Create the Apache HttpClient with some custom settings
    @Bean
    public HttpClient getHttpClient() {
        // Keep persistent connections alive for 5 minutes
        PoolingHttpClientConnectionManager poolingConnectionManager = new PoolingHttpClientConnectionManager(5, TimeUnit.MINUTES);
        // Maximum total number of connections
        poolingConnectionManager.setMaxTotal(1000);
        // Maximum number of concurrent connections per route
        poolingConnectionManager.setDefaultMaxPerRoute(1000);
        HttpClientBuilder httpClientBuilder = HttpClients.custom();
        httpClientBuilder.setConnectionManager(poolingConnectionManager);
        // Keep-alive strategy for persistent connections; relies on the Keep-Alive header
        httpClientBuilder.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy());
        return httpClientBuilder.build();
    }
    //Create the bean that implements OpenFeign's Client interface on top of HttpClient
    @Bean
    public ApacheHttpClient apacheHttpClient(HttpClient httpClient) {
        return new ApacheHttpClient(httpClient);
    }
    //Delegate around FeignBlockingLoadBalancerClient; this bean also implements OpenFeign's Client interface
    @Bean
    //Mark it @Primary so that FeignBlockingLoadBalancerClientDelegate is the bean every FeignClient actually uses
    @Primary
    public FeignBlockingLoadBalancerClientDelegate feignBlockingLoadBalancerCircuitBreakableClient(
            ServiceInstanceMetrics serviceInstanceMetrics,
            //The ApacheHttpClient bean created above
            ApacheHttpClient apacheHttpClient,
            //For why ObjectProvider is used, see the comment in the FeignBlockingLoadBalancerClientDelegate source
            ObjectProvider<LoadBalancerClient> loadBalancerClientProvider,
            //resilience4j thread isolation
            ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
            //resilience4j circuit breaker
            CircuitBreakerRegistry circuitBreakerRegistry,
            //Sleuth Tracer, used to obtain the request context
            Tracer tracer,
            //Load balancer properties
            LoadBalancerProperties properties,
            //For why this is used instead of FeignBlockingLoadBalancerClient directly, see the comment on FeignBlockingLoadBalancerClientDelegate
            LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        return new FeignBlockingLoadBalancerClientDelegate(
                //Our own core Client implementation, which adds circuit breaking, thread isolation, and collection of load-balancing data
                new Resilience4jFeignClient(
                        serviceInstanceMetrics, apacheHttpClient,
                        threadPoolBulkheadRegistry,
                        circuitBreakerRegistry,
                        tracer
                ),
                loadBalancerClientProvider,
                properties,
                loadBalancerClientFactory
        );
    }
}
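Of these, Resilience4jFeignClient holds the core code that glues together the circuit breaker and thread isolation, and it also records the actual call data used for load balancing: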
public class Resilience4jFeignClient implements Client {
    private final ServiceInstanceMetrics serviceInstanceMetrics;
    private final ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry;
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final Tracer tracer;
    private final ApacheHttpClient apacheHttpClient;
    public Resilience4jFeignClient(
            ServiceInstanceMetrics serviceInstanceMetrics, ApacheHttpClient apacheHttpClient,
            ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
            CircuitBreakerRegistry circuitBreakerRegistry,
            Tracer tracer
    ) {
        this.serviceInstanceMetrics = serviceInstanceMetrics;
        this.apacheHttpClient = apacheHttpClient;
        this.threadPoolBulkheadRegistry = threadPoolBulkheadRegistry;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        this.tracer = tracer;
    }
    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        //Get the FeignClient annotation from the interface that declares this FeignClient
        FeignClient annotation = request.requestTemplate().methodMetadata().method().getDeclaringClass().getAnnotation(FeignClient.class);
        //Use contextId rather than the microservice name, consistent with Retry
        //contextId is the key used below to look up the circuit breaker and thread isolation configuration
        String contextId = annotation.contextId();
        //Unique id of the instance
        String serviceInstanceId = getServiceInstanceId(contextId, request);
        //Unique id of the instance + method
        String serviceInstanceMethodId = getServiceInstanceMethodId(contextId, request);
        ThreadPoolBulkhead threadPoolBulkhead;
        CircuitBreaker circuitBreaker;
        try {
            //One thread pool per instance
            threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(serviceInstanceId, contextId);
        } catch (ConfigurationNotFoundException e) {
            threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(serviceInstanceId);
        }
        try {
            //One resilience4j circuit breaker per method of each service instance, so breaking happens at the instance + method level; all of them share this service's resilience4j circuit breaker configuration
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(serviceInstanceMethodId, contextId);
        } catch (ConfigurationNotFoundException e) {
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(serviceInstanceMethodId);
        }
        //Preserve the traceId
        Span span = tracer.currentSpan();
        ThreadPoolBulkhead finalThreadPoolBulkhead = threadPoolBulkhead;
        CircuitBreaker finalCircuitBreaker = circuitBreaker;
        Supplier<CompletionStage<Response>> completionStageSupplier = ThreadPoolBulkhead.decorateSupplier(threadPoolBulkhead,
                OpenfeignUtil.decorateSupplier(circuitBreaker, () -> {
                    try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                        log.info("call url: {} -> {}, ThreadPoolStats({}): {}, CircuitBreakStats({}): {}",
                                request.httpMethod(),
                                request.url(),
                                serviceInstanceId,
                                JSON.toJSONString(finalThreadPoolBulkhead.getMetrics()),
                                serviceInstanceMethodId,
                                JSON.toJSONString(finalCircuitBreaker.getMetrics())
                        );
                        Response execute = apacheHttpClient.execute(request, options);
                        log.info("response: {} - {}", execute.status(), execute.reason());
                        return execute;
                    } catch (IOException e) {
                        throw new CompletionException(e);
                    }
                })
        );
        ServiceInstance serviceInstance = getServiceInstance(request);
        try {
            serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);
            Response response = Try.ofSupplier(completionStageSupplier).get().toCompletableFuture().join();
            serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
            return response;
        } catch (BulkheadFullException e) {
            //The thread pool bulkhead is full
            serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
            return Response.builder()
                    .request(request)
                    .status(SpecialHttpStatus.BULKHEAD_FULL.getValue())
                    .reason(e.getLocalizedMessage())
                    .requestTemplate(request.requestTemplate()).build();
        } catch (CompletionException e) {
            serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
            //Every exception thrown inside is wrapped in a CompletionException, so unwrap the underlying exception here
            Throwable cause = e.getCause();
            //If the circuit breaker is open, return the corresponding special status code
            if (cause instanceof CallNotPermittedException) {
                return Response.builder()
                        .request(request)
                        .status(SpecialHttpStatus.CIRCUIT_BREAKER_ON.getValue())
                        .reason(cause.getLocalizedMessage())
                        .requestTemplate(request.requestTemplate()).build();
            }
            //For an IOException, determine whether the request has already been sent
            //A connect timeout can be retried because the request was never sent; a read timeout cannot, because the request has already gone out
            if (cause instanceof IOException) {
                //getMessage() may be null, so guard before inspecting it
                String message = cause.getMessage();
                boolean containsRead = message != null && message.toLowerCase().contains("read");
                if (containsRead) {
                    log.info("{}-{} exception contains read, which indicates the request has been sent", e.getMessage(), cause.getMessage());
                    //A read exception means the request has been sent, so it must not be retried (unless it is a GET request or carries the RetryableMethod annotation, which DefaultErrorDecoder checks)
                    return Response.builder()
                            .request(request)
                            .status(SpecialHttpStatus.NOT_RETRYABLE_IO_EXCEPTION.getValue())
                            .reason(cause.getLocalizedMessage())
                            .requestTemplate(request.requestTemplate()).build();
                } else {
                    return Response.builder()
                            .request(request)
                            .status(SpecialHttpStatus.RETRYABLE_IO_EXCEPTION.getValue())
                            .reason(cause.getLocalizedMessage())
                            .requestTemplate(request.requestTemplate()).build();
                }
            }
            throw e;
        }
    }
    private ServiceInstance getServiceInstance(Request request) throws MalformedURLException {
        URL url = new URL(request.url());
        DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();
        defaultServiceInstance.setHost(url.getHost());
        defaultServiceInstance.setPort(url.getPort());
        return defaultServiceInstance;
    }
    //Get the microservice instance id, formatted as: FeignClient contextId:host:port, e.g. test1Client:10.238.45.78:8251
    private String getServiceInstanceId(String contextId, Request request) throws MalformedURLException {
        //Parse the URL
        URL url = new URL(request.url());
        //Build the microservice instance id
        return contextId + ":" + url.getHost() + ":" + url.getPort();
    }
    //Get the microservice instance method id, formatted as: FeignClient contextId:host:port:methodName, e.g. test1Client:10.238.45.78:8251:
    private String getServiceInstanceMethodId(String contextId, Request request) throws MalformedURLException {
        URL url = new URL(request.url());
        //Build the unique id from contextId + instance + method
        String methodName = request.requestTemplate().methodMetadata().method().toGenericString();
        return contextId + ":" + url.getHost() + ":" + url.getPort() + ":" + methodName;
    }
}
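The per-instance keying used by Resilience4jFeignClient can be illustrated without any framework. The sketch below is a stand-in under stated assumptions, not the resilience4j API: InstanceIsolationSketch and trySubmit are hypothetical names, and a plain JDK ThreadPoolExecutor plays the role of the thread pool bulkhead. It builds the same contextId:host:port key and keeps one bounded pool per key, so saturating the pool of one instance should leave other instances unaffected.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for per-instance thread isolation; not the resilience4j API.
class InstanceIsolationSketch {
    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
    private final int threads;
    private final int queueCapacity;

    InstanceIsolationSketch(int threads, int queueCapacity) {
        this.threads = threads;
        this.queueCapacity = queueCapacity;
    }

    // Same key format as getServiceInstanceId: contextId:host:port
    static String instanceId(String contextId, String url) {
        URI uri = URI.create(url);
        return contextId + ":" + uri.getHost() + ":" + uri.getPort();
    }

    // Returns true if the task was accepted, false if this instance's threads and queue are all busy.
    boolean trySubmit(String instanceId, Runnable task) {
        ThreadPoolExecutor pool = pools.computeIfAbsent(instanceId, id ->
                new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(queueCapacity),
                        r -> {
                            // Daemon threads so the sketch never keeps the JVM alive
                            Thread t = new Thread(r, "isolation-" + id);
                            t.setDaemon(true);
                            return t;
                        }));
        try {
            pool.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            // Plays the role of BulkheadFullException in the real client
            return false;
        }
    }

    void shutdown() {
        pools.values().forEach(ThreadPoolExecutor::shutdownNow);
    }
}
```

With one thread and a queue capacity of one, a third blocked call to the same instance is rejected while a call to a different instance is still accepted. In the real client, that rejection corresponds to the BulkheadFullException branch.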
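Above we defined several special HTTP status codes. The goal is to wrap certain exceptions as responses and then have the Feign error decoder, described later, decode them into a uniform RetryableException. That way the resilience4j retry configuration does not need complicated exception rules; retrying on RetryableException alone is enough.
We want spring-cloud-openfeign's core load-balancing Client, once it has called the LoadBalancer to choose an instance and rewritten the URL, to invoke our class above rather than ApacheHttpClient directly. That is why we wrap it in FeignBlockingLoadBalancerClientDelegate: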
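To make this mapping concrete, here is a minimal JDK-only sketch of how a failure thrown inside an asynchronous call can be unwrapped from its CompletionException and sorted into "safe to retry" and "not safe to retry". IoFailureClassifierSketch, IoCall, and the Outcome enum are hypothetical names; the real client returns Response objects carrying SpecialHttpStatus codes instead. The sketch treats a missing exception message as "no read involved", which is an assumption.

```java
import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch; the real client maps these cases to SpecialHttpStatus codes.
class IoFailureClassifierSketch {
    enum Outcome { OK, RETRYABLE_IO_EXCEPTION, NOT_RETRYABLE_IO_EXCEPTION }

    // A call that may fail with an IOException, like Client.execute
    interface IoCall {
        void run() throws IOException;
    }

    // "read" in the message is taken to mean the request was already sent
    static Outcome classify(IOException e) {
        String message = e.getMessage();
        boolean containsRead = message != null
                && message.toLowerCase(Locale.ROOT).contains("read");
        return containsRead ? Outcome.NOT_RETRYABLE_IO_EXCEPTION : Outcome.RETRYABLE_IO_EXCEPTION;
    }

    static Outcome execute(IoCall call) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                call.run();
            } catch (IOException e) {
                // Checked exceptions cannot cross the lambda, so wrap them
                throw new CompletionException(e);
            }
        });
        try {
            future.join();
            return Outcome.OK;
        } catch (CompletionException e) {
            // Unwrap the original exception before deciding what to do
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                return classify((IOException) cause);
            }
            throw e;
        }
    }
}
```

A substring check is a blunt heuristic, so any message that happens to contain "read" is classified as not retryable. That errs on the side of not resending a request that may already have reached the server.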
/**
 * Initializing FeignBlockingLoadBalancerClient requires a LoadBalancerClient.
 * Since Spring Cloud 2020, however, the loading of the Spring Cloud LoadBalancer BlockingClient has a forced order.
 * @see org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration
 * That auto-configuration is annotated with @AutoConfigureAfter(LoadBalancerAutoConfiguration.class),
 * so the BlockingClient is not yet available when our FeignClient is initialized.
 * We therefore wrap LoadBalancerClient in an ObjectProvider and, when the FeignClient is actually invoked, obtain the LoadBalancerClient from it to create the FeignBlockingLoadBalancerClient.
 */
public class FeignBlockingLoadBalancerClientDelegate implements Client {
    //volatile is required for the double-checked locking in execute to publish the instance safely
    private volatile FeignBlockingLoadBalancerClient feignBlockingLoadBalancerClient;
    private final Client delegate;
    private final ObjectProvider<LoadBalancerClient> loadBalancerClientObjectProvider;
    private final LoadBalancerProperties properties;
    private final LoadBalancerClientFactory loadBalancerClientFactory;
    public FeignBlockingLoadBalancerClientDelegate(
            Client delegate,
            ObjectProvider<LoadBalancerClient> loadBalancerClientObjectProvider,
            LoadBalancerProperties properties,
            LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        this.delegate = delegate;
        this.loadBalancerClientObjectProvider = loadBalancerClientObjectProvider;
        this.properties = properties;
        this.loadBalancerClientFactory = loadBalancerClientFactory;
    }
    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        if (feignBlockingLoadBalancerClient == null) {
            synchronized (this) {
                if (feignBlockingLoadBalancerClient == null) {
                    feignBlockingLoadBalancerClient = new FeignBlockingLoadBalancerClient(
                            this.delegate,
                            this.loadBalancerClientObjectProvider.getIfAvailable(),
                            this.properties,
                            this.loadBalancerClientFactory
                    );
                }
            }
        }
        return feignBlockingLoadBalancerClient.execute(request, options);
    }
}
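The delegate defers creating the real client until the first request arrives. A generic version of that lazy initialization might look like the sketch below. LazyDelegateSketch is a hypothetical name, and the field is declared volatile because double-checked locking depends on it for safe publication across threads.

```java
import java.util.function.Supplier;

// Hypothetical generic holder that builds its value on first use, at most once.
class LazyDelegateSketch<T> {
    private final Supplier<T> factory;
    // volatile so a fully constructed instance is visible to every thread
    private volatile T instance;

    LazyDelegateSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        // Read the volatile field once on the fast path
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    // Only the first caller to get here runs the factory
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

The factory here stands in for the call that resolves LoadBalancerClient from the ObjectProvider: nothing is resolved at construction time, and later calls reuse the instance built by the first one.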
DefaultOpenFeignConfiguration, the default configuration we specified for the FeignClient NamedContextFactory (FeignContext), mainly wires in the retry logic and the error decoder:
@Configuration(proxyBeanMethods = false)
public class DefaultOpenFeignConfiguration {
    @Bean
    public ErrorDecoder errorDecoder() {
        return new DefaultErrorDecoder();
    }
    @Bean
    public Feign.Builder resilience4jFeignBuilder(
            List<FeignDecoratorBuilderInterceptor> feignDecoratorBuilderInterceptors,
            FeignDecorators.Builder builder
    ) {
        feignDecoratorBuilderInterceptors.forEach(feignDecoratorBuilderInterceptor -> feignDecoratorBuilderInterceptor.intercept(builder));
        return Resilience4jFeign.builder(builder.build());
    }
    @Bean
    public FeignDecorators.Builder defaultBuilder(Environment environment, RetryRegistry retryRegistry) {
        String name = environment.getProperty("feign.client.name");
        Retry retry = null;
        try {
            retry = retryRegistry.retry(name, name);
        } catch (ConfigurationNotFoundException e) {
            retry = retryRegistry.retry(name);
        }
        //Override the exception predicate so that only feign.RetryableException is retried; every exception that should be retried has already been turned into a RetryableException by DefaultErrorDecoder and Resilience4jFeignClient
        retry = Retry.of(name, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {
            return throwable instanceof feign.RetryableException;
        }).build());
        return FeignDecorators.builder().withRetry(
                retry
        );
    }
}
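The effect of restricting retries to a single exception type can be sketched in plain Java. This is not how resilience4j implements Retry: RetrySketch and its nested RetryableException are hypothetical stand-ins (the latter for feign.RetryableException), and the sketch assumes the attempt limit counts the first call.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical retry loop; a sketch of the idea, not the resilience4j implementation.
class RetrySketch {
    // Stand-in for feign.RetryableException
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    // Calls the supplier up to maxAttempts times, retrying only when retryOn accepts the failure.
    static <T> T call(Supplier<T> supplier, int maxAttempts, Predicate<Throwable> retryOn) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return supplier.get();
            } catch (RuntimeException e) {
                if (!retryOn.test(e)) {
                    // Anything the predicate rejects propagates immediately
                    throw e;
                }
                last = e;
            }
        }
        // All attempts failed with retryable exceptions; surface the last one
        throw last;
    }
}
```

Passing `t -> t instanceof RetrySketch.RetryableException` as the predicate mirrors the retryOnException override above: a call that fails twice with the retryable type and then succeeds returns normally on the third attempt, while any other exception is thrown after a single call.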
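The error decoder wraps the retryable status codes above, along with the requests we want to retry, into RetryableException; its code is omitted here. With that we have a custom FeignClient that implements retry, circuit breaking, and thread isolation. It can be configured and used as follows:
application.yml configuration: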
################ feign configuration ################
feign:
  hystrix:
    enabled: false
  client:
    config:
      default:
        # Connect timeout
        connectTimeout: 500
        # Read timeout
        readTimeout: 8000
      test1-client:
        # Connect timeout
        connectTimeout: 500
        # Read timeout
        readTimeout: 60000
################ resilience configuration ################
resilience4j.circuitbreaker:
  configs:
    default:
      registerHealthIndicator: true
      slidingWindowSize: 10
      minimumNumberOfCalls: 5
      slidingWindowType: TIME_BASED
      permittedNumberOfCallsInHalfOpenState: 3
      automaticTransitionFromOpenToHalfOpenEnabled: true
      waitDurationInOpenState: 2s
      failureRateThreshold: 30
      eventConsumerBufferSize: 10
      recordExceptions:
        - java.lang.Exception
resilience4j.retry:
  configs:
    default:
      maxRetryAttempts: 2
    test1-client:
      maxRetryAttempts: 3
resilience4j.thread-pool-bulkhead:
  configs:
    default:
      maxThreadPoolSize: 64
      coreThreadPoolSize: 32
      queueCapacity: 32
Define the FeignClients:
//This client uses every configuration keyed by test1-client; where a configuration has no test1-client entry, default is used
@FeignClient(name = "service1", contextId = "test1-client")
public interface TestService1Client {
    @GetMapping("/anything")
    HttpBinAnythingResponse anything();
}
//This client uses every configuration keyed by test2-client; since we have no separate test2-client configuration here, it uses default throughout
@FeignClient(name = "service1", contextId = "test2-client")
public interface TestService1Client2 {
    @GetMapping("/anything")
    HttpBinAnythingResponse anything();
}
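The lookup rule described in the comments (use the entry for the contextId, otherwise fall back to default) comes down to a two-step map lookup. Here is a minimal sketch, with ConfigFallbackSketch as a hypothetical name; the registries in the real code express the same idea by catching ConfigurationNotFoundException.

```java
import java.util.Map;

// Hypothetical helper showing the contextId -> default fallback.
class ConfigFallbackSketch {
    static final String DEFAULT_KEY = "default";

    // Returns the configuration for contextId, or the default entry if none is defined.
    static <T> T resolve(Map<String, T> configs, String contextId) {
        T specific = configs.get(contextId);
        return specific != null ? specific : configs.get(DEFAULT_KEY);
    }
}
```

Applied to the retry settings in the application.yml above, resolving test1-client yields 3 attempts, while test2-client has no entry of its own and resolves to the default of 2.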
Starting in the next section, we will unit test the FeignClient wrapper implemented here to verify that it works correctly.