前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Zipkin服务端源码解析

Zipkin服务端源码解析

作者头像
Java学习录
发布2019-11-21 15:04:43
7850
发布2019-11-21 15:04:43
举报
文章被收录于专栏:Java学习录Java学习录

还记得这个我们zipkin系列的第一篇文章中提到过的架构图么,服务端组件就这么几个,很简单。

其中稍微有点内涵的也就是collector和storage分别提供了多种不同的实现,collector支持http、rabbitMq、kafka而storage支持内存、mysql、es

现在我们来了解一下服务端的实现。服务端开启自动装配使用的是@EnableZipkinServer注解

代码语言:javascript
复制
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(InternalZipkinConfiguration.class)public @interface EnableZipkinServer {
}

接着深入InternalZipkinConfiguration这个类

代码语言:javascript
复制
@Configuration@Import({  ZipkinServerConfiguration.class,  TracingConfiguration.class,  ZipkinQueryApiV2.class,  ZipkinHttpCollector.class,  MetricsHealthController.class})public class InternalZipkinConfiguration {}

可以看到这个类一共又引入了这么多的装配类,一个一个来分析吧

ZipkinServerConfiguration

这个类中,主要是配置了Zipkin的健康检查、web容器以及、默认的存储的方式的配置

其中健康检查的bean包括以下几个:

  1. ZipkinHealthIndicator 关于SpringBoot中健康检查的原理可以参考这篇文章:SpringBoot健康检查实现原理
  2. 健康检查的端口在这个类里MetricsHealthController,其中暴露了两个端口health和metrics

Web容器主要是由这个类负责的:UndertowServletWebServerFactory

InMemoryConfiguration 这个类则是存储的自动装配类,如果没有选择使用MySQL或者ES的话则会使用内存进行存储

TracingConfiguration

Zipkin服务端其实也会存储trace信息,这个时候就需要一些Client的配置。这个类里面就是这些配置 这里就不详细展开了

ZipkinQueryApiV2

Zipkin V2版本的查询API,这个API是供Zipkin的UI界面使用的,其中包含这几个接口

  1. dependencies
  2. services
  3. spans
  4. traces
  5. /trace/{traceIdHex}
ZipkinHttpCollector

Zipkin默认的Collector使用http协议里收集Trace信息,这里就是本文的重点

核心接受请求的方法是handleRequest

代码语言:javascript
复制
  public void handleRequest(HttpServerExchange exchange) throws Exception {    boolean v2 = exchange.getRelativePath().equals("/api/v2/spans");    boolean v1 = !v2 && exchange.getRelativePath().equals("/api/v1/spans");    if (!v2 && !v1) {      next.handleRequest(exchange);      return;    }
    if (!POST.equals(exchange.getRequestMethod())) {      next.handleRequest(exchange);      return;    }
    String contentTypeValue = exchange.getRequestHeaders().getFirst(CONTENT_TYPE);    boolean json = contentTypeValue == null || contentTypeValue.startsWith("application/json");    boolean thrift = !json && contentTypeValue.startsWith("application/x-thrift");    boolean proto = v2 && !json && contentTypeValue.startsWith("application/x-protobuf");    if (!json && !thrift && !proto) {      exchange          .setStatusCode(400)          .getResponseSender()          .send("unsupported content type " + contentTypeValue + "\n");      return;    }
    HttpCollector collector = v2 ? (json ? JSON_V2 : PROTO3) : thrift ? THRIFT : JSON_V1;    metrics.incrementMessages();    exchange.getRequestReceiver().receiveFullBytes(collector, errorCallback);  }

这里一共有这么几个逻辑:

  1. 首先判断接口版本,现在是v2版本
  2. 然后判断数据格式,现在是json版本
  3. 这里的collector的格式就是JSON_V2
  4. 添加监控数据
  5. 数据处理

默认情况下使用的是异步方式处理数据的,这个处理的实现类是这个AsyncReceiverImplreceiveFullBytes方法中首先会对这次请求的参数进行处理,处理完成之后的json格式如下:

代码语言:javascript
复制
[{    "traceId": "e840c83e65d10358",    "id": "e840c83e65d10358",    "kind": "SERVER",    "name": "get /user/getuser/{id}",    "timestamp": 1574174031876439,    "duration": 5911123,    "localEndpoint": {        "serviceName": "consumer-demo-feign",        "ipv4": "192.168.0.15"    },    "remoteEndpoint": {        "ipv6": "::1",        "port": 60716    },    "tags": {        "http.method": "GET",        "http.path": "/user/getUser/3",        "mvc.controller.class": "UserController",        "mvc.controller.method": "getUser"    }}]

当json数据格式化之后,接下来就是具体的处理

代码语言:javascript
复制
 public synchronized Call<Void> accept(List<Span> spans) {        int delta = spans.size();        int spansToRecover = this.spansByTraceIdTimeStamp.size() + delta - this.maxSpanCount;        this.evictToRecoverSpans(spansToRecover);        Iterator var4 = spans.iterator();
        while(var4.hasNext()) {            Span span = (Span)var4.next();            long timestamp = span.timestampAsLong();            String lowTraceId = lowTraceId(span.traceId());            InMemoryStorage.TraceIdTimestamp traceIdTimeStamp = new InMemoryStorage.TraceIdTimestamp(lowTraceId, timestamp);            this.spansByTraceIdTimeStamp.put(traceIdTimeStamp, span);            this.traceIdToTraceIdTimeStamps.put(lowTraceId, traceIdTimeStamp);            ++this.acceptedSpanCount;            if (this.searchEnabled) {                String spanName = span.name();                if (span.localServiceName() != null) {                    this.serviceToTraceIds.put(span.localServiceName(), lowTraceId);                    if (spanName != null) {                        this.serviceToSpanNames.put(span.localServiceName(), spanName);                    }                }
                if (span.remoteServiceName() != null) {                    this.serviceToTraceIds.put(span.remoteServiceName(), lowTraceId);                    if (spanName != null) {                        this.serviceToSpanNames.put(span.remoteServiceName(), spanName);                    }                }            }        }
        return Call.create((Object)null);    }

实际上,最终的span数据都是在这么几个对象中存在的

代码语言:javascript
复制
private final InMemoryStorage.SortedMultimap<InMemoryStorage.TraceIdTimestamp, Span> spansByTraceIdTimeStamp;private final InMemoryStorage.SortedMultimap<String, InMemoryStorage.TraceIdTimestamp> traceIdToTraceIdTimeStamps;private final InMemoryStorage.ServiceNameToTraceIds serviceToTraceIds;private final InMemoryStorage.SortedMultimap<String, String> serviceToSpanNames;
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java学习录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ZipkinServerConfiguration
  • TracingConfiguration
  • ZipkinQueryApiV2
  • ZipkinHttpCollector
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档