前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spring-cloud-sleuth源码学习三

spring-cloud-sleuth源码学习三

原创
作者头像
eeaters
修改2021-10-11 10:43:46
1.9K0
修改2021-10-11 10:43:46
举报
文章被收录于专栏:阿杰阿杰

文章分三部分:

- spring-cloud-sleuth快速上手(https://cloud.tencent.com/developer/article/1884423)

- zipkin-brave的demo及源码(https://cloud.tencent.com/developer/article/1884429)

- spring-cloud整合zipkin源码

序言

十一放个假,每天都在疯狂的补觉,现在上班还处于想睡觉的状态...哈哈

springcloud整合zipkin源码

本次看源码的目标

因为不同的交互方式无法统一一个方法来携带信息(如http/udp的不同,与redis/mq/rest交互时), https://github.com/openzipkin/brave/tree/master/brave是brave自己针对不同的方式进行的整合

本次源码主要是梳理springcloud中mvc请求/openfeign/rabbitmq/redis中的切入方式

自动装配入口

通过Maven Helper很容易的在spring-cloud-starter-sleuth中看到spring-cloud-sleuth-autoconfigure,一看就猜到这个八成就是自动装配的入口了,进去找下spring.factory文件,

代码语言:javascript
复制
# Auto Configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncCustomAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncDefaultAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.circuitbreaker.TraceCircuitBreakerAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.rxjava.TraceRxJavaAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.quartz.TraceQuartzAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.TraceWebAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.feign.TraceFeignClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceWebAsyncClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.scheduling.TraceSchedulingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.reactor.TraceReactorAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceFunctionAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceSpringIntegrationAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceSpringMessagingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.instrument.messaging.TraceWebSocketAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.BraveAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.web.client.BraveWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.rpc.BraveRpcAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.grpc.BraveGrpcAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.messaging.BraveKafkaStreamsAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.messaging.BraveMessagingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.opentracing.BraveOpentracingAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.redis.BraveRedisAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.brave.instrument.mongodb.BraveMongoDbAutoConfiguration,\
org.springframework.cloud.sleuth.autoconfig.zipkin2.ZipkinAutoConfiguration
# Environment Post Processor
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor,\
org.springframework.cloud.sleuth.autoconfig.instrument.web.client.TraceGatewayEnvironmentPostProcessor

首次看可能会有些多; 但是我们在了解了brave源码后,我们就可以根据自己的感觉来挑选出是和我们的装配类

挑选阅读的配置类

首先可以随便找几个想看的配置类,先进行猜测一波,

  • TraceWebAutoConfiguration/TraceWebClientAutoConfiguration/TraceFeignClientAutoConfiguration可能能看到web请求和feign请求的处理方式
  • TraceWebSocketAutoConfiguration可能可以获取到websocket交互时的处理方式
  • BraveRpcAutoConfiguration/BraveGrpcAutoConfiguration可能可以获取到rpc/grpc的处理方式
  • BraveKafkaStreamsAutoConfiguration/BraveMessagingAutoConfiguration可能获取到jms的处理方式
  • BraveRedisAutoConfiguration/BraveMongoDbAutoConfiguration可能获取到缓存服务的处理方式
  • BraveAutoConfiguration可能获取到我们看brave源码中涉及的核心Tracer相关的信息
  • ZipkinAutoConfiguration是用于装配与zipkin服务器进行交互的配置类

甭管对不对,看看也就知道了

日志调整:- TraceEnvironmentPostProcessor

在学习brave中,日志并没有打印出traceId和spanId, 因为我们根本没有规定日志打印的内容, 而在springboot的项目的日志中.并没有做什么事情,日志可以打印出这些信息,一定是在某一步对日志进行了调整; config类应该不会做; 而配置中下面的两个EnvironmentPostProcessor就可以做这个事情;

代码语言:javascript
复制
    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        Map<String, Object> map = new HashMap<String, Object>();
        // This doesn't work with all logging systems but it's a useful default so you see
        // traces in logs without having to configure it.
        if (Boolean.parseBoolean(environment.getProperty("spring.sleuth.enabled", "true"))) {
            map.put("logging.pattern.level",
                    "%5p [${spring.zipkin.service.name:" + "${spring.application.name:}},%X{traceId:-},%X{spanId:-}]");
        }
        addOrReplace(environment.getPropertySources(), map);
    }

BraveAutoConfiguration

这个类确实和猜测的一样,创建Tracing/Tracer/CurrentTraceContext等

ZipkinAutoConfiguration

内部定义了Reporter,是否注册到注册中心中去,是否统计计量等; 这点和预期一致,没有细看; 因为我是梳理在项目中使用到链路追踪的流程

TraceWebClientAutoConfiguration

内部包含了RestTemplateConfig/HttpHeadersFilterConfig/NettyConfiguration/WebClientConfig/TraceOAuthConfiguration几个配置 通过类名及内部的bean可以得知,这是处理web请求相关的; RestTemplate是我常用的类; 那么就那这个类的代码进行阅读下

TracingClientHttpRequestInterceptor

RestTemplateConfig内的bean; 用的是典型的拦截器模式; 在execution.execute方法前后进行相应的逻辑 ; handleSend从request中获取传入的traceId和spanId; 然后通过tracer调用nextSpan来创建属于当前的span handleReceive则是调用span.finish()或者error()方法进行收尾 我们在brave的demo中代码虽然; 但是demo写完看这套代码轻松很多

代码语言:javascript
复制
public final class TracingClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
​
    @Override
    public ClientHttpResponse intercept(HttpRequest req, byte[] body, ClientHttpRequestExecution execution)
            throws IOException {
        HttpRequestWrapper request = new HttpRequestWrapper(req);
        Span span = handler.handleSend(request);
        if (log.isDebugEnabled()) {
            log.debug("Wrapping an outbound http call with span [" + span + "]");
        }
        ClientHttpResponse response = null;
        Throwable error = null;
        try (CurrentTraceContext.Scope ws = currentTraceContext.newScope(span.context())) {
            response = execution.execute(req, body);
            return response;
        }
        catch (Throwable e) {
            error = e;
            throw e;
        }
        finally {
            handler.handleReceive(new ClientHttpResponseWrapper(request, response, error), span);
        }
    }
}

TraceFeignClientAutoConfiguration

见名知意, 这个类就是处理feign的, 先对结论进行描述: feign进行交互时抽象了一个Client,比如默认的Default,okhttp的OkHttpClient和apache的ApacheHttpClient;通过对这个client包装了一层实现的

FeignContextBeanPostProcessor

FeignBeanPostProcessorConfiguration配置下的bean;强行将所有FeignContext进行了封装,而所有Feign的创建都需要从FeignContext中获取组件 通过代码可以看到核心的关键还是TraceFeignObjectWrapper类;

关于FeignContext这个类,简单的描述下,能帮忙理解的话最好,如果让你更迷惑的话,就无视这一段描述 FeignContext: Feign应用上下文, 所有装配的组件都是从该上下文中获取的; Feign里面有一段很偷懒的代码; 个人的简单理解是,是每一个FeignClient都可能有不同的配置,比如自定义Decoder/Contract等, springcloudopenfeign是直接为每一个FeignClient都搞一个应用上下文;这个上下文的是应用上下文的子上下文, 然后因为每一个Feign都对应了一个属于自己的应用上下文,如果有自定义组件就用自己的; 没有就用父上下文(会有默认的)组件

代码语言:javascript
复制
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof FeignContext && !(bean instanceof TraceFeignContext)) {
            return new TraceFeignContext(traceFeignObjectWrapper(), (FeignContext) bean);
        }
        return bean;
    }
​
    private TraceFeignObjectWrapper traceFeignObjectWrapper() {
        return new TraceFeignObjectWrapper(this.beanFactory);
    }
​

TraceFeignObjectWrapper

这个类就是将原本FeignContext返回的,经过一个warp方法进行包装; 代码只是对client进行包装, 我们深入到client的实现类看下

代码语言:javascript
复制
    Object wrap(Object bean) {
        if (bean instanceof Client && !(bean instanceof TracingFeignClient)
                && !(bean instanceof LazyTracingFeignClient)) {
            if (loadBalancerPresent && bean instanceof FeignBlockingLoadBalancerClient
                    && !(bean instanceof TraceFeignBlockingLoadBalancerClient)) {
                return instrumentedFeignLoadBalancerClient(bean);
            }
            if (loadBalancerPresent && bean instanceof RetryableFeignBlockingLoadBalancerClient
                    && !(bean instanceof TraceRetryableFeignBlockingLoadBalancerClient)) {
                return instrumentedRetryableFeignLoadBalancerClient(bean);
            }
            return new LazyTracingFeignClient(this.beanFactory, (Client) bean);
        }
        return this.traceFeignBuilderBeanPostProcessor.postProcessAfterInitialization(bean, null);
    }
​

TracingFeignClient

直接贴下核心代码; 逻辑和web的那段是一样的方式

代码语言:javascript
复制
    @Override
    public Response execute(Request req, Request.Options options) throws IOException {
        RequestWrapper request = new RequestWrapper(req);
        Span span = this.handler.handleSend(request);
        if (log.isDebugEnabled()) {
            log.debug("Handled send of " + span);
        }
        Response res = null;
        Throwable error = null;
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(span.context())) {
            res = this.delegate.execute(request.build(), options);
            if (res == null) { // possibly null on bad implementation or mocks
                res = Response.builder().request(req).build();
            }
            return res;
        }
        catch (Throwable e) {
            error = e;
            throw e;
        }
        finally {
            ResponseWrapper response = new ResponseWrapper(request, res, error);
            this.handler.handleReceive(response, span);
​
            if (log.isDebugEnabled()) {
                log.debug("Handled receive of " + span);
            }
        }
    }

BraveRedisAutoConfiguration

明显是redis的; 里面只有针对Lettuce进行后置处理的逻辑,所以可以预判,如果项目的版本太低,比如springboot1.5.4, 那么与redis交互时下面代码应该是不会生效的

代码语言:javascript
复制
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(value = "spring.sleuth.redis.enabled", matchIfMissing = true)
@ConditionalOnBean({ Tracing.class, ClientResources.class })
@AutoConfigureAfter({ BraveAutoConfiguration.class })
@EnableConfigurationProperties(TraceRedisProperties.class)
@ConditionalOnClass(BraveTracing.class)
public class BraveRedisAutoConfiguration {
    @Bean
    static TraceLettuceClientResourcesBeanPostProcessor traceLettuceClientResourcesBeanPostProcessor(
            BeanFactory beanFactory) {
        return new TraceLettuceClientResourcesBeanPostProcessor(beanFactory);
    }
}

TraceLettuceClientResourcesBeanPostProcessor

关于对redis的处理; lettuce的客户端本身就有tracing功能; 所以这里是进行了整合, 因为我还没有看过lettuce的源码,对这个不熟,不多说

代码语言:javascript
复制
public class TraceLettuceClientResourcesBeanPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof ClientResources) {
            ClientResources cr = (ClientResources) bean;
            if (!cr.tracing().isEnabled()) {
                if (log.isDebugEnabled()) {
                    log.debug("Lettuce ClientResources bean is auto-configured to enable tracing.");
                }
                return cr.mutate().tracing(new LazyTracing(this.beanFactory)).build();
            }
            else if (log.isDebugEnabled()) {
                log.debug(
                        "Lettuce ClientResources bean is skipped for auto-configuration because tracing was already enabled.");
            }
        }
        return bean;
    }
}
​
class LazyTracing implements io.lettuce.core.tracing.Tracing {
​
    private final BeanFactory beanFactory;
​
    private final io.lettuce.core.tracing.Tracing noOpTracing = NoOpTracing.INSTANCE;
​
    private BraveTracing braveTracing;
​
    LazyTracing(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }
    //...
​
}

BraveMessagingAutoConfiguration

看下springRabbit的实现方式; SleuthRabbitBeanPostProcessor这个bean上面标注for tests; 所以核心逻辑应为SpringRabbitTracing, 不幸的是我先通过SleuthRabbitBeanPostProcessor类看实现了,好在这样子让我更快的看出实现方式

代码语言:javascript
复制
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(MessagingTracing.class)
@ConditionalOnMessagingEnabled
@ConditionalOnBean(Tracing.class)
@EnableConfigurationProperties(SleuthMessagingProperties.class)
public class BraveMessagingAutoConfiguration {
​
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnProperty(value = "spring.sleuth.messaging.rabbit.enabled", matchIfMissing = true)
    @ConditionalOnClass(RabbitTemplate.class)
    protected static class SleuthRabbitConfiguration {
​
        @Bean
        // for tests
        @ConditionalOnMissingBean
        static SleuthRabbitBeanPostProcessor sleuthRabbitBeanPostProcessor(BeanFactory beanFactory) {
            return new SleuthRabbitBeanPostProcessor(beanFactory);
        }
​
        @Bean
        @ConditionalOnMissingBean
        SpringRabbitTracing springRabbitTracing(MessagingTracing messagingTracing,
                SleuthMessagingProperties properties) {
            return SpringRabbitTracing.newBuilder(messagingTracing)
                    .remoteServiceName(properties.getRabbit().getRemoteServiceName()).build();
        }
    }
}

SpringRabbitTracing

通过包装MessagePostProcessor在header中增加需要传递的信息,看下TracingMessagePostProcessor 通过SimpleRabbitListenerContainerFactory中增加Advice来实现aop拦截, 所以需要跟踪下TracingRabbitListenerAdvice

代码语言:javascript
复制
public final class SpringRabbitTracing {
    
  /** Creates an instrumented {@linkplain RabbitTemplate} */
  public RabbitTemplate newRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    TracingMessagePostProcessor tracingMessagePostProcessor = new TracingMessagePostProcessor(this);
    rabbitTemplate.setBeforePublishPostProcessors(tracingMessagePostProcessor);
    return rabbitTemplate;
  }
​
  /** Instruments an existing {@linkplain RabbitTemplate} */
  public RabbitTemplate decorateRabbitTemplate(RabbitTemplate rabbitTemplate) {
    MessagePostProcessor[] beforePublishPostProcessors =
      appendTracingMessagePostProcessor(rabbitTemplate, beforePublishPostProcessorsField);
    if (beforePublishPostProcessors != null) {
      rabbitTemplate.setBeforePublishPostProcessors(beforePublishPostProcessors);
    }
    return rabbitTemplate;
  }
​
  /** Creates an instrumented {@linkplain SimpleRabbitListenerContainerFactory} */
  public SimpleRabbitListenerContainerFactory newSimpleRabbitListenerContainerFactory(
    ConnectionFactory connectionFactory
  ) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(new TracingRabbitListenerAdvice(this));
    factory.setBeforeSendReplyPostProcessors(new TracingMessagePostProcessor(this));
    return factory;
  }
​
  /** Instruments an existing {@linkplain SimpleRabbitListenerContainerFactory} */
  public SimpleRabbitListenerContainerFactory decorateSimpleRabbitListenerContainerFactory(
    SimpleRabbitListenerContainerFactory factory
  ) {
    Advice[] advice = prependTracingRabbitListenerAdvice(factory);
    if (advice != null) factory.setAdviceChain(advice);
​
    MessagePostProcessor[] beforeSendReplyPostProcessors =
      appendTracingMessagePostProcessor(factory, beforeSendReplyPostProcessorsField);
    if (beforeSendReplyPostProcessors != null) {
      factory.setBeforeSendReplyPostProcessors(beforeSendReplyPostProcessors);
    }
​
    return factory;
  }
}

TracingMessagePostProcessor

代码基本逻辑: 获取span, 通过injector注入到request中; 这个主要是处理发送端的,span.start(timestamp).finish(timestamp)这里说明发送mq只是记录了下; 事实上发送和完成一块操作了

代码语言:javascript
复制
final class TracingMessagePostProcessor implements MessagePostProcessor {
​
  @Override public Message postProcessMessage(Message message) {
    MessageProducerRequest request = new MessageProducerRequest(message);
​
    TraceContext maybeParent = currentTraceContext.get();
    // Unlike message consumers, we try current span before trying extraction. This is the proper
    // order because the span in scope should take precedence over a potentially stale header entry.
    //
    // NOTE: Brave instrumentation used properly does not result in stale header entries, as we
    // always clear message headers after reading.
    Span span;
    if (maybeParent == null) {
      TraceContextOrSamplingFlags extracted =
        springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message);
      span = springRabbitTracing.nextMessagingSpan(sampler, request, extracted);
    } else { // If we have a span in scope assume headers were cleared before
      span = tracer.newChild(maybeParent);
    }
​
    if (!span.isNoop()) {
      span.kind(PRODUCER).name("publish");
      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
      // incur timestamp overhead only once
      long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
      span.start(timestamp).finish(timestamp);
    }
​
    injector.inject(span.context(), request);
    return message;
  }
}

TracingRabbitListenerAdvice

这个类主要应对消费端; 这个类是我之前想找没找到的

之前有需要对mq消费前后都进行处理的需求,当时debug没有发现合适的扩展点,所以最后是自己写了一个aop切所有consumer来实现, 看这里代码后发现consumer有相应的扩展

代码语言:javascript
复制
final class TracingRabbitListenerAdvice implements MethodInterceptor {
​
  /**
   * MethodInterceptor for {@link SimpleMessageListenerContainer.ContainerDelegate#invokeListener(Channel,
   * Message)}
   */
  @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable {
    Message message = null;
    if (methodInvocation.getArguments()[1] instanceof List) {
      message = ((List<? extends Message>) methodInvocation.getArguments()[1]).get(0);
    } else {
      message = (Message) methodInvocation.getArguments()[1];
    }
    MessageConsumerRequest request = new MessageConsumerRequest(message);
​
    TraceContextOrSamplingFlags extracted =
      springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message);
​
    // named for BlockingQueueConsumer.nextMessage, which we can't currently see
    Span consumerSpan = springRabbitTracing.nextMessagingSpan(sampler, request, extracted);
    Span listenerSpan = tracer.newChild(consumerSpan.context());
​
    if (!consumerSpan.isNoop()) {
      setConsumerSpan(consumerSpan, message.getMessageProperties());
​
      // incur timestamp overhead only once
      long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
      consumerSpan.start(timestamp);
      long consumerFinish = timestamp + 1L; // save a clock reading
      consumerSpan.finish(consumerFinish);
​
      // not using scoped span as we want to start with a pre-configured time
      listenerSpan.name("on-message").start(consumerFinish);
    }
​
    Tracer.SpanInScope ws = tracer.withSpanInScope(listenerSpan);
    Throwable error = null;
    try {
      return methodInvocation.proceed();
    } catch (Throwable t) {
      error = t;
      throw t;
    } finally {
      if (error != null) listenerSpan.error(error);
      listenerSpan.finish();
      ws.close();
    }
  }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章分三部分:
  • 序言
  • springcloud整合zipkin源码
    • 本次看源码的目标
      • 自动装配入口
        • 挑选阅读的配置类
          • 日志调整:- TraceEnvironmentPostProcessor
          • BraveAutoConfiguration
          • ZipkinAutoConfiguration
          • TraceWebClientAutoConfiguration
          • TracingClientHttpRequestInterceptor
          • TraceFeignClientAutoConfiguration
            • FeignContextBeanPostProcessor
              • TraceFeignObjectWrapper
                • TracingFeignClient
                • BraveRedisAutoConfiguration
                  • TraceLettuceClientResourcesBeanPostProcessor
                  • BraveMessagingAutoConfiguration
                    • SpringRabbitTracing
                      • TracingMessagePostProcessor
                        • TracingRabbitListenerAdvice
                        相关产品与服务
                        微服务引擎 TSE
                        微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档