本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent
我们继续上一节针对我们的重试进行测试
通过系列前面的源码分析,我们知道 spring-cloud-openfeign 的 FeignClient 其实是懒加载的。所以我们实现的断路器也是懒加载的,需要先调用,之后才会初始化线程隔离。所以这里如果我们要模拟线程隔离满的异常,需要先手动读取载入线程隔离,之后才能获取对应实例的线程隔离,将线程池填充满。
我们先定义一个 FeignClient:
@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
@GetMapping("/anything")
HttpBinAnythingResponse anything();
}
使用前面同样的方式,给这个微服务添加实例:
//SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
//关闭 eureka client
"eureka.client.enabled=false",
//默认请求重试次数为 3
"resilience4j.retry.configs.default.maxAttempts=3",
//增加断路器配置
"resilience4j.circuitbreaker.configs.default.failureRateThreshold=50",
"resilience4j.circuitbreaker.configs.default.slidingWindowType=COUNT_BASED",
"resilience4j.circuitbreaker.configs.default.slidingWindowSize=5",
"resilience4j.circuitbreaker.configs.default.minimumNumberOfCalls=2",
})
@Log4j2
public class OpenFeignClientTest {
@SpringBootApplication
@Configuration
public static class App {
@Bean
public DiscoveryClient discoveryClient() {
//模拟两个服务实例
ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
ServiceInstance service1Instance3 = Mockito.spy(ServiceInstance.class);
Map<String, String> zone1 = Map.ofEntries(
Map.entry("zone", "zone1")
);
when(service1Instance1.getMetadata()).thenReturn(zone1);
when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
when(service1Instance1.getHost()).thenReturn("httpbin.org");
when(service1Instance1.getPort()).thenReturn(80);
when(service1Instance3.getMetadata()).thenReturn(zone1);
when(service1Instance3.getInstanceId()).thenReturn("service1Instance3");
//这其实就是 httpbin.org ,为了和第一个实例进行区分加上 www
when(service1Instance3.getHost()).thenReturn("www.httpbin.org");
DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
//微服务 testService3 有两个实例即 service1Instance1 和 service1Instance4
Mockito.when(spy.getInstances("testService1"))
.thenReturn(List.of(service1Instance1, service1Instance3));
return spy;
}
}
}
然后,编写测试代码:
@Test
public void testRetryOnBulkheadException() {
//防止断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
this.testService1Client.anything();
ThreadPoolBulkhead threadPoolBulkhead;
try {
threadPoolBulkhead = threadPoolBulkheadRegistry
.bulkhead("testService1Client:httpbin.org:80", "testService1Client");
} catch (ConfigurationNotFoundException e) {
//找不到就用默认配置
threadPoolBulkhead = threadPoolBulkheadRegistry
.bulkhead("testService1Client:httpbin.org:80");
}
//线程队列我们配置的是 1,线程池大小是 10,这样会将线程池填充满
for (int i = 0; i < 10 + 1; i++) {
threadPoolBulkhead.submit(() -> {
try {
//这样任务永远不会结束了
Thread.currentThread().join();
}
catch (InterruptedException e) {
e.printStackTrace();
}
});
}
//调用多次,调用成功即对断路器异常重试了
for (int i = 0; i < 10; i++) {
this.testService1Client.anything();
}
}
运行测试,日志中可以看出,针对线程池满的异常进行重试了:
2021-11-13 03:35:16.371 INFO [,,] 3824 --- [ main] c.g.j.s.c.w.f.DefaultErrorDecoder : TestService1Client#anything() response: 584-Bulkhead 'testService1Client:httpbin.org:80' is full and does not permit further calls, should retry: true
我们通过使用 http.bin 的 /status/{statusCode}
接口,这个接口会根据路径参数 statusCode
返回对应状态码的响应:
@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
@GetMapping("/status/500")
String testGetRetryStatus500();
}
我们如何感知被重试三次呢?每次调用,就会从负载均衡器获取一个服务实例。在负载均衡器代码中,我们使用了根据当前 sleuth 的上下文的 traceId 的缓存,每次调用,traceId 对应的 position 值就会加 1。我们可以通过观察这个值的变化获取到究竟本次请求调用了几次负载均衡器,也就是做了几次调用。
编写测试:
@Test
public void testNon2xxRetry() {
Span span = tracer.nextSpan();
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
//防止断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
long l = span.context().traceId();
RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance
= (RoundRobinWithRequestSeparatedPositionLoadBalancer) loadBalancerClientFactory.getInstance("testService1");
AtomicInteger atomicInteger = loadBalancerClientFactoryInstance.getPositionCache().get(l);
int start = atomicInteger.get();
try {
//get 方法会重试
testService1Client.testGetRetryStatus500();
} catch (Exception e) {
}
//因为每次调用都会失败,所以会重试配置的 3 次
Assertions.assertEquals(3, atomicInteger.get() - start);
}
}
我们通过使用 http.bin 的 /status/{statusCode}
接口,这个接口会根据路径参数 statusCode
返回对应状态码的响应,并且支持各种 HTTP 请求方式:
@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
@PostMapping("/status/500")
String testPostRetryStatus500();
}
默认情况下,我们只会对 GET 方法重试,对于其他 HTTP 请求方法,是不会重试的:
@Test
public void testNon2xxRetry() {
Span span = tracer.nextSpan();
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
//防止断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
long l = span.context().traceId();
RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance
= (RoundRobinWithRequestSeparatedPositionLoadBalancer) loadBalancerClientFactory.getInstance("testService1");
AtomicInteger atomicInteger = loadBalancerClientFactoryInstance.getPositionCache().get(l);
int start = atomicInteger.get();
try {
//post 方法不会重试
testService1Client.testPostRetryStatus500();
} catch (Exception e) {
}
//不会重试,因此只会被调用 1 次
Assertions.assertEquals(1, atomicInteger.get() - start);
}
}