Zipkin Client-Side Tracing: A Source Code Walkthrough

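As we know, Zipkin helps us collect the call relationships between the services of a distributed system. Besides Servlet requests, it can also trace calls that pass through MQ, thread pools, WebSocket, Feign, Hystrix, RxJava, WebFlux and other components. This article looks at how Zipkin does this.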

We start with the most common case: a Servlet receiving a request.

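spring-cloud-sleuth registers many classes in its spring.factories file. One of them is TraceWebServletAutoConfiguration, and as the name suggests, it is an auto-configuration class built specifically for the Servlet environment.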

This class creates a Filter. The Filter intercepts web requests, and it is the tool that collects trace data for Servlet requests:

    @Bean
    @ConditionalOnMissingBean
    public TracingFilter tracingFilter(HttpTracing tracing) {
        return (TracingFilter) TracingFilter.create(tracing);
    }

Let's go straight to what this filter does:

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest)request;
        HttpServletResponse httpResponse = this.servlet.httpResponse(response);
        TraceContext context = (TraceContext)request.getAttribute(TraceContext.class.getName());
        if (context != null) {
            Scope scope = this.currentTraceContext.maybeScope(context);

            try {
                chain.doFilter(request, response);
            } finally {
                scope.close();
            }

        } else {
            Span span = this.handler.handleReceive(this.extractor, httpRequest);
            request.setAttribute(SpanCustomizer.class.getName(), span.customizer());
            request.setAttribute(TraceContext.class.getName(), span.context());
            Throwable error = null;
            Scope scope = this.currentTraceContext.newScope(span.context());

            try {
                chain.doFilter(httpRequest, httpResponse);
            } catch (ServletException | RuntimeException | Error | IOException e) {
                error = e;
                throw e;
            } finally {
                scope.close();
                if (this.servlet.isAsync(httpRequest)) {
                    this.servlet.handleAsync(this.handler, httpRequest, span);
                } else {
                    this.handler.handleSend(ADAPTER.adaptResponse(httpRequest, httpResponse), error, span);
                }

            }

        }
    }
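The control flow of doFilter can be reduced to a small sketch that does not depend on any library. Everything in it is hypothetical:

- a ThreadLocal stands in for CurrentTraceContext;
- a Map stands in for the request attributes;
- a List stands in for the reporter.

The sketch shows the two branches. If a context is already stored on the request, the filter only restores the scope. Otherwise it creates a span and keeps it in scope for the rest of the chain. In the finally block it closes the scope first and then finishes the span.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, library-free model of TracingFilter.doFilter (not Brave's API).
class FilterFlowSketch {
    // Stand-in for CurrentTraceContext: the span id currently "in scope" on this thread.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();
    // Stand-in for the reporter: finished spans end up here.
    static final List<String> FINISHED = new ArrayList<>();
    static final String CONTEXT_ATTR = "TraceContext"; // hypothetical attribute key
    static int nextId = 1;

    static void doFilter(Map<String, Object> requestAttributes, Runnable chain) {
        String previous = CURRENT.get();
        String existing = (String) requestAttributes.get(CONTEXT_ATTR);
        if (existing != null) {
            // Context already stored on this request: only put it back in scope.
            CURRENT.set(existing);
            try {
                chain.run();
            } finally {
                CURRENT.set(previous); // scope.close()
            }
            return;
        }
        // First pass: create a span and remember its context on the request.
        String span = "span-" + nextId++;
        requestAttributes.put(CONTEXT_ATTR, span);
        Throwable error = null;
        CURRENT.set(span); // newScope(span.context())
        try {
            chain.run();
        } catch (RuntimeException | Error e) {
            error = e;
            throw e;
        } finally {
            CURRENT.set(previous); // close the scope first...
            FINISHED.add(error == null ? span : span + " (error)"); // ...then finish the span
        }
    }
}
```

If the same request passes through the filter twice, it reuses the stored context and finishes only one span.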

Creating the Span

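Step one: look for a TraceContext among the request attributes. A TraceContext carries the trace information for the current request. The filter stores this attribute itself (see the setAttribute calls in the else branch). It is therefore already present when the same request passes through the filter again, for example during an async or error dispatch, and in that case the filter only restores the scope. Trace information sent by an upstream service does not come from this attribute. It arrives in the HTTP headers, and the extractor reads it inside handleReceive.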

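The main path is the else branch, where no TraceContext has been stored on the request yet. The first thing to do there is create a span. Span and trace were explained in the previous article, so they are not covered again here.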
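Upstream trace information travels in HTTP headers. The code below is a rough sketch of what an extractor does with it. It reads headers named after the B3 multi-header convention: X-B3-TraceId, X-B3-SpanId and X-B3-Sampled.

The class and method names are hypothetical. Real extractors may also handle cases this sketch ignores:

- other formats, such as the single b3 header;
- case-insensitive header names;
- additional flags.

```java
import java.util.Map;

// Hypothetical sketch of extracting upstream trace ids from B3-style HTTP headers.
class B3ExtractSketch {
    static final class Extracted {
        final long traceId;
        final long spanId;
        final Boolean sampled; // null means the caller sent no sampling decision

        Extracted(long traceId, long spanId, Boolean sampled) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.sampled = sampled;
        }
    }

    // Returns null when the caller sent no trace headers (the "no upstream" case).
    // Assumes header names are already normalized to exactly this case.
    static Extracted extract(Map<String, String> headers) {
        String traceId = headers.get("X-B3-TraceId");
        String spanId = headers.get("X-B3-SpanId");
        if (traceId == null || spanId == null) return null;
        // 128-bit trace ids are 32 hex chars; this sketch keeps only the low 64 bits.
        if (traceId.length() > 16) traceId = traceId.substring(traceId.length() - 16);
        String flag = headers.get("X-B3-Sampled");
        Boolean sampled = flag == null ? null : ("1".equals(flag) || "true".equalsIgnoreCase(flag));
        return new Extracted(Long.parseUnsignedLong(traceId, 16),
                Long.parseUnsignedLong(spanId, 16), sampled);
    }
}
```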

public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
    Span span = nextSpan(extractor.extract(carrier), request);
    span.kind(Span.Kind.SERVER);
    return handleStart(request, span);
  }
  Span nextSpan(TraceContextOrSamplingFlags extracted, Req request) {
    if (extracted.sampled() == null) { // Otherwise, try to make a new decision
      extracted = extracted.sampled(sampler.trySample(adapter, request));
    }
    return extracted.context() != null
        ? tracer.joinSpan(extracted.context())
        : tracer.nextSpan(extracted);
  }

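First, if the sampling decision is still open (sampled() == null), the sampler makes it for this request. The ternary then checks whether extraction produced a complete TraceContext, that is, whether the upstream caller sent trace headers. If it did, the server joins that span through joinSpan. Otherwise execution continues into nextSpan, which creates a new span.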
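The decision in nextSpan(extracted, request) can be modeled in two steps:

1. Settle the sampling decision if the caller left it open.
2. Either join the caller's context or start a new trace.

This is an illustrative sketch, not Brave's code. The following are all assumptions:

- the Context class;
- the trySample rule that skips /health paths;
- reusing one random id as both the trace id and the span id of a root.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical model of nextSpan(extracted, request): decide sampling, then join or start a trace.
class JoinOrNewSketch {
    static final class Context {
        final long traceId;
        final long spanId;
        final boolean sampled;

        Context(long traceId, long spanId, boolean sampled) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.sampled = sampled;
        }
    }

    // Stand-in for the HTTP sampler; the rule is made up for illustration.
    static boolean trySample(String path) {
        return !path.startsWith("/health");
    }

    // upstream: context sent by the caller (null if none); upstreamSampled: its decision, if any.
    static Context nextSpan(Context upstream, Boolean upstreamSampled, String path) {
        boolean sampled = upstreamSampled != null ? upstreamSampled : trySample(path);
        if (upstream != null) {
            // Join: reuse the caller's trace id and span id (a shared client/server span).
            return new Context(upstream.traceId, upstream.spanId, sampled);
        }
        long id;
        do {
            id = ThreadLocalRandom.current().nextLong();
        } while (id == 0L); // this sketch treats zero as "no id"
        return new Context(id, id, sampled); // new root trace
    }
}
```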

public Span nextSpan(TraceContextOrSamplingFlags extracted) {
    TraceContext parent = extracted.context();
    if (extracted.samplingFlags() != null) {
      TraceContext implicitParent = currentTraceContext.get();
      if (implicitParent == null) {
        return toSpan(newContextBuilder(null, extracted.samplingFlags())
            .extra(extracted.extra()).build());
      }
      // fall through, with an implicit parent, not an extracted one
      parent = appendExtra(implicitParent, extracted.extra());
    }
    if (parent != null) {
      TraceContext.Builder builder;
      if (extracted.samplingFlags() != null) {
        builder = newContextBuilder(parent, extracted.samplingFlags());
      } else {
        builder = newContextBuilder(parent, sampler);
      }
      return toSpan(builder.build());
    }
    TraceIdContext traceIdContext = extracted.traceIdContext();
    if (extracted.traceIdContext() != null) {
      Boolean sampled = traceIdContext.sampled();
      if (sampled == null) sampled = sampler.isSampled(traceIdContext.traceId());
      return toSpan(TraceContext.newBuilder()
          .sampled(sampled)
          .debug(traceIdContext.debug())
          .traceIdHigh(traceIdContext.traceIdHigh()).traceId(traceIdContext.traceId())
          .spanId(nextId())
          .extra(extracted.extra()).build());
    }
    // TraceContextOrSamplingFlags is a union of 3 types, we've checked all three
    throw new AssertionError("should not reach here");
  }

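nextSpan first looks for a parent: the extracted context or, failing that, the context currently in scope (implicitParent). This is the first request, so neither exists. A new root context is built and passed straight to toSpan: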

public Span toSpan(TraceContext context) {
    if (context == null) throw new NullPointerException("context == null");
    TraceContext decorated = propagationFactory.decorate(context);
    if (!noop.get() && Boolean.TRUE.equals(decorated.sampled())) {
      return RealSpan.create(decorated, recorder, errorParser);
    }
    return NoopSpan.create(decorated);
  }

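If tracing has not been switched off (noop) and the context is sampled, toSpan creates a RealSpan. At runtime its concrete class is AutoValue_RealSpan, which AutoValue generates. Otherwise it returns a NoopSpan, which records nothing.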
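toSpan's choice between RealSpan and NoopSpan comes down to one condition. Here is a minimal sketch with hypothetical types. The no-op variant still implements the full interface, so calling code can use it without null checks.

```java
// Hypothetical model of toSpan: record only when tracing is on and the context is sampled.
class ToSpanSketch {
    interface Span {
        boolean isNoop();
    }

    static final class RealSpan implements Span {
        public boolean isNoop() { return false; } // records and reports data
    }

    static final class NoopSpan implements Span {
        public boolean isNoop() { return true; } // accepts calls, records nothing
    }

    static Span toSpan(boolean noop, Boolean sampled) {
        // Boolean.TRUE.equals(...) treats both false and null (undecided) as "do not record".
        if (!noop && Boolean.TRUE.equals(sampled)) return new RealSpan();
        return new NoopSpan();
    }
}
```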

Now back to the handleReceive method we started with:

  public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
    Span span = nextSpan(extractor.extract(carrier), request);
    span.kind(Span.Kind.SERVER);
    return handleStart(request, span);
  }

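Once the span has been created, its kind is set. The kind describes the span's role in the call. Here it is SERVER, which marks this span as the server side of the request.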

Next, the span is started and begins recording trace information:

Span handleStart(Req request, Span span) {
    if (span.isNoop()) return span;
    Scope ws = currentTraceContext.maybeScope(span.context());
    try {
      parser.request(adapter, request, span.customizer());

      Endpoint.Builder remoteEndpoint = Endpoint.newBuilder();
      if (parseRemoteEndpoint(request, remoteEndpoint)) {
        span.remoteEndpoint(remoteEndpoint.build());
      }
    } finally {
      ws.close();
    }

    return span.start();
  }

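Starting the span records a few pieces of information: the span name, the http.method and http.path tags, and the start timestamp in microseconds: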

  public <Req> void request(HttpAdapter<Req, ?> adapter, Req req, SpanCustomizer customizer) {
    customizer.name(spanName(adapter, req));
    String method = adapter.method(req);
    if (method != null) customizer.tag("http.method", method);
    String path = adapter.path(req);
    if (path != null) customizer.tag("http.path", path);
  }

  public Span start() {
      return start(clock.currentTimeMicroseconds());
  }
  synchronized MutableSpan start(long timestamp) {
    span.timestamp(this.timestamp = timestamp);
    return this;
  }
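A hypothetical mutable stand-in can show what handleStart leaves on the span:

- a name;
- the http.method and http.path tags;
- a start timestamp in epoch microseconds.

Using the HTTP method as the span name is an assumption about the default parser, and it is marked as such in the code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the span that handleStart customizes and starts.
class HandleStartSketch {
    String name;
    final Map<String, String> tags = new LinkedHashMap<>();
    long startMicros;

    // Mirrors parser.request(...): a name plus http.method / http.path tags, skipping nulls.
    void request(String method, String path) {
        name = method; // assumption: the method doubles as the default span name
        if (method != null) tags.put("http.method", method);
        if (path != null) tags.put("http.path", path);
    }

    // Mirrors start(): the timestamp is in epoch microseconds.
    HandleStartSketch start() {
        startMicros = System.currentTimeMillis() * 1000L;
        return this;
    }
}
```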

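Now back to the Filter from the beginning of the article.

Once the span has been created, the filter stores its SpanCustomizer and TraceContext as request attributes.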

Creating the Scope

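Next a scope is created. The scope is closely tied to the logging framework. Put simply, it places traceId, parentId and spanId into the logging MDC for as long as it is open, so the application can print them on every log line. When it closes, it restores the previous values.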

public Scope newScope(@Nullable TraceContext currentSpan) {
        final String previousTraceId = MDC.get("traceId");
        final String previousParentId = MDC.get("parentId");
        final String previousSpanId = MDC.get("spanId");
        final String spanExportable = MDC.get("spanExportable");
        final String legacyPreviousTraceId = MDC.get(LEGACY_TRACE_ID_NAME);
        final String legacyPreviousParentId = MDC.get(LEGACY_PARENT_ID_NAME);
        final String legacyPreviousSpanId = MDC.get(LEGACY_SPAN_ID_NAME);
        final String legacySpanExportable = MDC.get(LEGACY_EXPORTABLE_NAME);

        if (currentSpan != null) {
            String traceIdString = currentSpan.traceIdString();
            MDC.put("traceId", traceIdString);
            MDC.put(LEGACY_TRACE_ID_NAME, traceIdString);
            String parentId = currentSpan.parentId() != null ?
                    HexCodec.toLowerHex(currentSpan.parentId()) :
                    null;
            replace("parentId", parentId);
            replace(LEGACY_PARENT_ID_NAME, parentId);
            String spanId = HexCodec.toLowerHex(currentSpan.spanId());
            MDC.put("spanId", spanId);
            MDC.put(LEGACY_SPAN_ID_NAME, spanId);
            String sampled = String.valueOf(currentSpan.sampled());
            MDC.put("spanExportable", sampled);
            MDC.put(LEGACY_EXPORTABLE_NAME, sampled);
            log("Starting scope for span: {}", currentSpan);
            if (currentSpan.parentId() != null) {
                if (log.isTraceEnabled()) {
                    log.trace("With parent: {}", currentSpan.parentId());
                }
            }
        }
        else {
            MDC.remove("traceId");
            MDC.remove("parentId");
            MDC.remove("spanId");
            MDC.remove("spanExportable");
            MDC.remove(LEGACY_TRACE_ID_NAME);
            MDC.remove(LEGACY_PARENT_ID_NAME);
            MDC.remove(LEGACY_SPAN_ID_NAME);
            MDC.remove(LEGACY_EXPORTABLE_NAME);
        }

        Scope scope = this.delegate.newScope(currentSpan);

        class ThreadContextCurrentTraceContextScope implements Scope {
            @Override public void close() {
                log("Closing scope for span: {}", currentSpan);
                scope.close();
                replace("traceId", previousTraceId);
                replace("parentId", previousParentId);
                replace("spanId", previousSpanId);
                replace("spanExportable", spanExportable);
                replace(LEGACY_TRACE_ID_NAME, legacyPreviousTraceId);
                replace(LEGACY_PARENT_ID_NAME, legacyPreviousParentId);
                replace(LEGACY_SPAN_ID_NAME, legacyPreviousSpanId);
                replace(LEGACY_EXPORTABLE_NAME, legacySpanExportable);
            }
        }
        return new ThreadContextCurrentTraceContextScope();
    }
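The core pattern in newScope has three steps: save the old values, publish the new ones, restore the old ones on close. The sketch below reproduces it without SLF4J. A per-thread Map stands in for MDC; this assumption lets the example run on its own. It handles only the traceId, parentId and spanId keys.

Restoring on close is what makes nested scopes behave correctly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, library-free model of the MDC save/restore done by newScope.
class MdcScopeSketch {
    // Stand-in for SLF4J's MDC: one map per thread.
    static final ThreadLocal<Map<String, String>> MDC = ThreadLocal.withInitial(HashMap::new);

    interface Scope extends AutoCloseable {
        @Override
        void close(); // no checked exception, so it works cleanly in try-with-resources
    }

    // Put or remove, like the replace(...) helper in the original code.
    static void replace(String key, String value) {
        if (value != null) MDC.get().put(key, value);
        else MDC.get().remove(key);
    }

    static Scope newScope(String traceId, String parentId, String spanId) {
        // 1. Remember what was there before.
        final String prevTrace = MDC.get().get("traceId");
        final String prevParent = MDC.get().get("parentId");
        final String prevSpan = MDC.get().get("spanId");
        // 2. Publish the new ids so every log line can include them.
        replace("traceId", traceId);
        replace("parentId", parentId);
        replace("spanId", spanId);
        // 3. Closing the scope restores the previous values.
        return () -> {
            replace("traceId", prevTrace);
            replace("parentId", prevParent);
            replace("spanId", prevSpan);
        };
    }
}
```

With a real logger, a layout pattern containing %X{traceId} would then print the id on each line logged while the scope is open.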

Reporting the Span

Once the rest of the filter chain has finished, the request is over. At that point the span is reported to the Zipkin server:

  public void handleSend(@Nullable Resp response, @Nullable Throwable error, Span span) {
    handleFinish(response, error, span);
  }

  void handleFinish(@Nullable Resp response, @Nullable Throwable error, Span span) {
    if (span.isNoop()) return;
    try {
      Scope ws = currentTraceContext.maybeScope(span.context());
      try {
        parser.response(adapter, response, error, span.customizer());
      } finally {
        ws.close(); // close the scope before finishing the span
      }
    } finally {
      finishInNullScope(span);
    }
  }

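First, the span records the response information for this call: the status code, a name taken from the matched route if there is one, and any error: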

public <Resp> void response(HttpAdapter<?, Resp> adapter, @Nullable Resp res,
      @Nullable Throwable error, SpanCustomizer customizer) {
    int statusCode = 0;
    if (res != null) {
      statusCode = adapter.statusCodeAsInt(res);
      String nameFromRoute = spanNameFromRoute(adapter, res, statusCode);
      if (nameFromRoute != null) customizer.name(nameFromRoute);
      String maybeStatus = maybeStatusAsString(statusCode, 299);
      if (maybeStatus != null) customizer.tag("http.status_code", maybeStatus);
    }
    error(statusCode, error, customizer);
  }
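The call maybeStatusAsString(statusCode, 299) decides whether the span gets an http.status_code tag. A reasonable reading of the upper bound 299 is that successful 2xx responses stay untagged and every other code is recorded. The re-implementation below encodes that reading. It is an assumption, not code copied from the library.

```java
// Hypothetical re-implementation of the status-code tagging rule (assumed, not copied).
class StatusTagSketch {
    // Returns the code as a string when it should be tagged, or null for "no tag".
    // 0 means "no status available" and is never tagged.
    static String maybeStatusAsString(int statusCode, int upperRange) {
        if (statusCode != 0 && (statusCode < 200 || statusCode > upperRange)) {
            return String.valueOf(statusCode);
        }
        return null;
    }
}
```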

Next, the span is finished while the current context is cleared (a null scope):

void finishInNullScope(Span span) {
    Scope ws = currentTraceContext.maybeScope(null);
    try {
      span.finish();
    } finally {
      ws.close();
    }
  }

Finally, the span is reported:

public void finish(TraceContext context) {
    MutableSpan span = spanMap.remove(context);
    if (span == null || noop.get()) return;
    synchronized (span) {
      span.finish(span.clock.currentTimeMicroseconds());
      reporter.report(span.toSpan());
    }
  }
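finish comes down to stamping the end time and handing the span to a reporter. In this hypothetical sketch the reporter is a plain Consumer. In a real Sleuth setup the Reporter is typically zipkin-reporter's AsyncReporter, which buffers spans and passes them to a Sender in the background. The duration is simply the difference between the two microsecond timestamps.

```java
import java.util.function.Consumer;

// Hypothetical model of finish(): record the end time, derive the duration, hand off to a reporter.
class FinishSketch {
    static final class MutableSpan {
        final String name;
        final long startMicros;
        long finishMicros;

        MutableSpan(String name, long startMicros) {
            this.name = name;
            this.startMicros = startMicros;
        }

        long durationMicros() {
            return finishMicros - startMicros;
        }
    }

    // reporter stands in for Reporter<Span>.
    static void finish(MutableSpan span, long finishMicros, Consumer<MutableSpan> reporter) {
        synchronized (span) { // same idea as the synchronized block in the original
            span.finishMicros = finishMicros;
            reporter.accept(span);
        }
    }
}
```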

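The upload itself is carried out by an implementation of the Sender interface. By default there are three such implementations (listed in a screenshot in the original post).

A reported span's content was likewise shown as a screenshot in the original post.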

Tracing RabbitMQ

Once you have seen how Spring MVC requests are traced, the other integrations are easy to follow. Let's take RabbitMQ as an example.

First, look in spring-cloud-sleuth's spring.factories file. The configuration class for tracing messaging middleware is TraceMessagingAutoConfiguration.

Here is the RabbitMQ part of that class:

    @Configuration
    @ConditionalOnProperty(value = "spring.sleuth.messaging.rabbit.enabled", matchIfMissing = true)
    @ConditionalOnClass(RabbitTemplate.class)
    protected static class SleuthRabbitConfiguration {
        @Bean
        @ConditionalOnMissingBean
        SpringRabbitTracing springRabbitTracing(Tracing tracing,
                SleuthMessagingProperties properties) {
            return SpringRabbitTracing.newBuilder(tracing)
                    .remoteServiceName(properties.getMessaging().getRabbit().getRemoteServiceName())
                    .build();
        }

        @Bean
        @ConditionalOnMissingBean
        static SleuthRabbitBeanPostProcessor sleuthRabbitBeanPostProcessor(BeanFactory beanFactory) {
            return new SleuthRabbitBeanPostProcessor(beanFactory);
        }
    }

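From this we can make a fair guess. SleuthRabbitBeanPostProcessor probably modifies the RabbitTemplate when it is constructed, for example by adding an interceptor. Every message sent through RabbitTemplate would then carry trace headers automatically, which completes the flow.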
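To make that guess concrete, here is what "adding headers before sending" amounts to. The sketch writes to a plain header Map rather than Spring AMQP's MessageProperties.

The header names follow the B3 multi-header convention. Sleuth's Rabbit instrumentation may write exactly these headers or the single b3 header, depending on the version, so the names are an assumption here. The class and method names are hypothetical too. Trace and span ids are encoded as 16-character lowercase hex.

```java
import java.util.Map;

// Hypothetical sketch of injecting trace ids into outgoing message headers (B3 naming assumed).
class RabbitHeaderSketch {
    static void inject(long traceId, long spanId, Long parentId, boolean sampled,
                       Map<String, Object> headers) {
        headers.put("X-B3-TraceId", toLowerHex(traceId));
        headers.put("X-B3-SpanId", toLowerHex(spanId));
        if (parentId != null) headers.put("X-B3-ParentSpanId", toLowerHex(parentId));
        headers.put("X-B3-Sampled", sampled ? "1" : "0");
    }

    // 16-character lowercase hex; negative longs are printed as their unsigned bit pattern.
    static String toLowerHex(long value) {
        return String.format("%016x", value);
    }
}
```

On the consumer side, the same headers would be read back, much like the HTTP extraction on the server side, to continue the trace.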

This article was shared from the WeChat public account Java学习录 (Javaxuexilu). Author: Java学习录.

Originally published: 2019-11-19
