本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent
在前面一节,我们梳理了实现 Feign 断路器以及线程隔离的思路,这一节,我们先不看如何源码实现(因为源码中会包含负载均衡算法的改进部分),先来讨论下如何优化目前的负载均衡算法。
其中请求包含 traceId 是来自于我们使用了 spring-cloud-sleuth 链路追踪,基于这种机制我们能保证请求不会重试到之前已经调用过的实例。源码是:
//一定必须是实现ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因为注册的时候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final ServiceInstanceListSupplier serviceInstanceListSupplier;
//每次请求算上重试不会超过1分钟
//对于超过1分钟的,这种请求肯定比较重,不应该重试
private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
//随机初始值,防止每次都是从第一个开始调用
.build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
private final String serviceId;
private final Tracer tracer;
public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
this.serviceInstanceListSupplier = serviceInstanceListSupplier;
this.serviceId = serviceId;
this.tracer = tracer;
}
//每次重试,其实都会调用这个 choose 方法重新获取一个实例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
return getInstanceResponseByRoundRobin(serviceInstances);
}
private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
//为了解决原始算法不同调用并发可能导致一个请求重试相同的实例
//从 sleuth 的 Tracer 中获取当前请求的上下文
Span currentSpan = tracer.currentSpan();
//如果上下文不存在,则可能不是前端用户请求,而是其他某些机制触发,我们就创建一个新的上下文
if (currentSpan == null) {
currentSpan = tracer.newTrace();
}
//从请求上下文中获取请求的 traceId,用来唯一标识一个请求
long l = currentSpan.context().traceId();
AtomicInteger seed = positionCache.get(l);
int s = seed.getAndIncrement();
int pos = s % serviceInstances.size();
log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
return new DefaultResponse(serviceInstances.stream()
//实例返回列表顺序可能不同,为了保持一致,先排序再取
.sorted(Comparator.comparing(ServiceInstance::getInstanceId))
.collect(Collectors.toList()).get(pos));
}
}
但是在这次请求突增很多的时候,这种负载均衡算法还是给我们带来了问题。
首先,本次突增,我们并没有采取扩容,导致本次的性能压力对于压力的均衡分布非常敏感。举个例子是,假设微服务 A 有 9 个实例,在业务高峰点来的时候,最理想的情况是保证无论何时这 9 个负载压力都完全均衡,但是由于我们使用了初始值为随机数的原子变量 position,虽然从一天的总量上来看,负责均衡压力肯定是均衡,但是在某一小段时间内,很可能压力全都跑到了某几个实例上,导致这几个实例被压垮,熔断,然后又都跑到了另外的几个实例上,又被压垮,熔断,如此恶性循环。
然后,我们部署采用的是 k8s 部署,同一个虚拟机上面可能会跑很多微服务的 pod。在某些情况下,同一个微服务的多个 pod 可能会跑到同一个虚拟机 Node 上,这个可以从pod 的 ip 网段上看出来:例如某个微服务有如下 7 个实例:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那么 10.238.13.12:8181 与 10.238.13.24:8181 很可能在同一个 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一个 Node 上。我们重试,需要优先重试与之前重试过的实例尽量不在同一个 Node 上的实例,因为同一个 Node 上的实例只要有一个有问题或者压力过大,其他的基本上也有问题或者压力过大。
最后,如果调用某个实例一直失败,那么这个实例的调用优先级需要排在其他正常的实例后面。这个对于减少快速刷新发布(一下子启动很多实例之后停掉多个老实例,实例个数大于重试次数配置)对于用户的影响,以及某个可用区突然发生异常导致多个实例下线对用户的影响,以及业务压力已经过去,压力变小后,需要关掉不再需要的实例,导致大量实例发生迁移的时候对用户的影响,有很大的作用。
我们针对上面三个问题,提出了一种优化后的解决方案:
具体实现是:以下的代码来自于:https://github.com/JoJoTec/spring-cloud-parent
我们使用了依赖:
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
记录实例数据的缓存类:
@Log4j2
public class ServiceInstanceMetrics {
private static final String CALLING = "-Calling";
private static final String FAILED = "-Failed";
private MetricRegistry metricRegistry;
ServiceInstanceMetrics() {
}
public ServiceInstanceMetrics(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
}
/**
* 记录调用实例
* @param serviceInstance
*/
public void recordServiceInstanceCall(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
metricRegistry.counter(key + CALLING).inc();
}
/**
* 记录调用实例结束
* @param serviceInstance
* @param isSuccess 是否成功
*/
public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
metricRegistry.counter(key + CALLING).dec();
if (!isSuccess) {
//不成功则记录失败
metricRegistry.meter(key + FAILED).mark();
}
}
/**
* 获取正在运行的调用次数
* @param serviceInstance
* @return
*/
public long getCalling(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
long count = metricRegistry.counter(key + CALLING).getCount();
log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
return count;
}
/**
* 获取最近一分钟调用失败次数分钟速率,其实是滑动平均数
* @param serviceInstance
* @return
*/
public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
return rate;
}
}
负载均衡核心代码:
private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
.expireAfterAccess(3, TimeUnit.MINUTES)
.build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;
//每次重试,其实都会调用这个 choose 方法重新获取一个实例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
Span span = tracer.currentSpan();
return serviceInstanceListSupplier.get().next()
.map(serviceInstances -> {
//保持 span 和调用 choose 的 span 一样
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
return getInstanceResponse(serviceInstances);
}
});
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
//读取 spring-cloud-sleuth 的对于当前请求的链路追踪上下文,获取对应的 traceId
Span currentSpan = tracer.currentSpan();
if (currentSpan == null) {
currentSpan = tracer.newTrace();
}
long l = currentSpan.context().traceId();
return getInstanceResponseByRoundRobin(l, serviceInstances);
}
@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
//首先随机打乱列表中实例的顺序
Collections.shuffle(serviceInstances);
//需要先将所有参数缓存起来,否则 comparator 会调用多次,并且可能在排序过程中参数发生改变(针对实例的请求统计数据一直在并发改变)
Map<ServiceInstance, Integer> used = Maps.newHashMap();
Map<ServiceInstance, Long> callings = Maps.newHashMap();
Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
serviceInstances = serviceInstances.stream().sorted(
Comparator
//之前已经调用过的网段,这里排后面
.<ServiceInstance>comparingInt(serviceInstance -> {
return used.computeIfAbsent(serviceInstance, k -> {
return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {
return serviceInstance.getHost().contains(prefix);
}) ? 1 : 0;
});
})
//当前错误率最少的
.thenComparingDouble(serviceInstance -> {
return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {
double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
//由于使用的是移动平均值(EMA),需要忽略过小的差异(保留两位小数,不是四舍五入,而是直接舍弃)
return ((int) (value * 100)) / 100.0;
});
})
//当前负载请求最少的
.thenComparingLong(serviceInstance -> {
return callings.computeIfAbsent(serviceInstance, k ->
serviceInstanceMetrics.getCalling(serviceInstance)
);
})
).collect(Collectors.toList());
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
ServiceInstance serviceInstance = serviceInstances.get(0);
//记录本次返回的网段
calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
//目前记录这个只为了兼容之前的单元测试(调用次数测试)
positionCache.get(traceId).getAndIncrement();
return new DefaultResponse(serviceInstance);
}
对于记录实例数据的缓存何时更新,是在 FeignClient 粘合重试,断路以及线程隔离的代码中的,这个我们下一节就会看到。
1. 为何没有使用所有微服务共享的缓存来保存调用数据,来让这些数据更加准确?
共享缓存的可选方案包括将这些数据记录放入 Redis,或者是 Apache Ignite 这样的内存网格中。但是有两个问题:
每个微服务使用本地缓存,记录自己调用其他实例的数据,在我们这里看来,不仅是更容易实现,也是更准确的做法。
2. 采用 EMA 的方式而不是请求窗口的方式统计最近错误率
采用请求窗口的方式统计,肯定是最准确的,例如我们统计最近一分钟的错误率,就将最近一分钟的请求缓存起来,读取的时候,将缓存起来的请求数据加在一起取平均数即可。但是这种方式在请求突增的时候,可能会占用很多很多内存来缓存这些请求。同时计算错误率的时候,随着缓存请求数的增多也会消耗更大量的 CPU 进行计算。这样做很不值得。
EMA 这种滑动平均值的计算方式,常见于各种性能监控统计场景,例如 JVM 中 TLAB 大小的动态计算,G1 GC Region 大小的伸缩以及其他很多 JVM 需要动态得出合适值的地方,都用这种计算方式。他不用将请求缓存起来,而是直接用最新值乘以一个比例之后加上老值乘以 (1 - 这个比例),这个比例一般高于 0.5,表示 EMA 和当前最新值更加相关。
但是 EMA 也带来另一个问题,我们会发现随着程序运行小数点位数会非常多,会看到类似于如下的值:0.00000000123, 0.120000001, 0.120000003, 为了忽略过于细致差异的影响(其实这些影响也来自于很久之前的错误请求),我们只保留两位小数进行排序。