专栏首页码匠的流水账聊聊skywalking的storage-zipkin-plugin

聊聊skywalking的storage-zipkin-plugin

本文主要研究一下skywalking的storage-zipkin-plugin

ZipkinStorageModuleElasticsearchProvider

skywalking-6.6.0/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java

public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {

    private static final Logger logger = LoggerFactory.getLogger(ZipkinStorageModuleElasticsearchProvider.class);
    private ZipkinTraceQueryEsDAO traceQueryEsDAO;

    @Override
    public String name() {
        return "zipkin-elasticsearch";
    }

    @Override
    public void prepare() throws ServiceNotProvidedException {
        super.prepare();
        traceQueryEsDAO = new ZipkinTraceQueryEsDAO(elasticSearchClient);
        this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
    }

    @Override public void notifyAfterCompleted() {
        super.notifyAfterCompleted();
        traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
    }

    @Override
    public String[] requiredModules() {
        return new String[] {CoreModule.NAME};
    }
}
  • ZipkinStorageModuleElasticsearchProvider继承了StorageModuleElasticsearchProvider,其prepare方法创建ZipkinTraceQueryEsDAO,然后注册为ITraceQueryDAO的实现;其notifyAfterCompleted方法执行traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class))

ZipkinTraceQueryEsDAO

skywalking-6.6.0/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java

public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
    @Setter
    private ServiceInventoryCache serviceInventoryCache;

    public ZipkinTraceQueryEsDAO(
        ElasticSearchClient client) {
        super(client);
    }

    @Override
    public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
        String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from,
        TraceState traceState, QueryOrder queryOrder) throws IOException {

        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        sourceBuilder.query(boolQueryBuilder);
        List<QueryBuilder> mustQueryList = boolQueryBuilder.must();

        if (startSecondTB != 0 && endSecondTB != 0) {
            mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
        }

        if (minDuration != 0 || maxDuration != 0) {
            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
            if (minDuration != 0) {
                rangeQueryBuilder.gte(minDuration);
            }
            if (maxDuration != 0) {
                rangeQueryBuilder.lte(maxDuration);
            }
            boolQueryBuilder.must().add(rangeQueryBuilder);
        }
        if (!Strings.isNullOrEmpty(endpointName)) {
            mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
        }
        if (serviceId != 0) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
        }
        if (serviceInstanceId != 0) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
        }
        if (endpointId != 0) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
        }
        if (!Strings.isNullOrEmpty(traceId)) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
        }
        switch (traceState) {
            case ERROR:
                mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
                break;
            case SUCCESS:
                mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
                break;
        }

        TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID).field(TRACE_ID).size(limit)
            .subAggregation(
                AggregationBuilders.max(LATENCY).field(LATENCY)
            )
            .subAggregation(
                AggregationBuilders.min(START_TIME).field(START_TIME)
            );
        switch (queryOrder) {
            case BY_START_TIME:
                builder.order(BucketOrder.aggregation(START_TIME, false));
                break;
            case BY_DURATION:
                builder.order(BucketOrder.aggregation(LATENCY, false));
                break;
        }
        sourceBuilder.aggregation(builder);

        SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);

        TraceBrief traceBrief = new TraceBrief();

        Terms terms = response.getAggregations().get(TRACE_ID);

        for (Terms.Bucket termsBucket : terms.getBuckets()) {
            BasicTrace basicTrace = new BasicTrace();

            basicTrace.setSegmentId(termsBucket.getKeyAsString());
            Min startTime = termsBucket.getAggregations().get(START_TIME);
            Max latency = termsBucket.getAggregations().get(LATENCY);
            basicTrace.setStart(String.valueOf((long)startTime.getValue()));
            basicTrace.getEndpointNames().add("");
            basicTrace.setDuration((int)latency.getValue());
            basicTrace.setError(false);
            basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
            traceBrief.getTraces().add(basicTrace);
        }

        return traceBrief;
    }

    @Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
        return Collections.emptyList();
    }

    @Override public List<org.apache.skywalking.oap.server.core.query.entity.Span> doFlexibleTraceQuery(
        String traceId) throws IOException {
        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
        sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
        sourceBuilder.sort(START_TIME, SortOrder.ASC);
        sourceBuilder.size(1000);

        SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);

        List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();

        for (SearchHit searchHit : response.getHits().getHits()) {
            int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
            String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
            Span span = SpanBytesDecoder.PROTO3.decodeOne(Base64.getDecoder().decode(dataBinaryBase64));

            org.apache.skywalking.oap.server.core.query.entity.Span swSpan = new org.apache.skywalking.oap.server.core.query.entity.Span();

            swSpan.setTraceId(span.traceId());
            swSpan.setEndpointName(span.name());
            swSpan.setStartTime(span.timestamp() / 1000);
            swSpan.setEndTime(swSpan.getStartTime() + span.durationAsLong() / 1000);
            span.tags().forEach((key, value) -> {
                swSpan.getTags().add(new KeyValue(key, value));
            });
            span.annotations().forEach(annotation -> {
                LogEntity entity = new LogEntity();
                entity.setTime(annotation.timestamp() / 1000);
                entity.getData().add(new KeyValue("annotation", annotation.value()));
                swSpan.getLogs().add(entity);
            });
            if (serviceId != Const.NONE) {
                swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
            }
            swSpan.setSpanId(0);
            swSpan.setParentSpanId(-1);
            swSpan.setSegmentSpanId(span.id());
            swSpan.setSegmentId(span.id());
            Span.Kind kind = span.kind();
            switch (kind) {
                case CLIENT:
                case PRODUCER:
                    swSpan.setType("Entry");
                    break;
                case SERVER:
                case CONSUMER:
                    swSpan.setType("Exit");
                    break;
                default:
                    swSpan.setType("Local");

            }

            if (StringUtil.isEmpty(span.parentId())) {
                swSpan.setRoot(true);
                swSpan.setSegmentParentSpanId("");
            } else {
                Ref ref = new Ref();
                ref.setTraceId(span.traceId());
                ref.setParentSegmentId(span.parentId());
                ref.setType(RefType.CROSS_PROCESS);
                ref.setParentSpanId(0);

                swSpan.getRefs().add(ref);
                swSpan.setSegmentParentSpanId(span.parentId());
            }
            spanList.add(swSpan);
        }
        return spanList;
    }
}
  • ZipkinTraceQueryEsDAO继承了EsDAO并实现了ITraceQueryDAO接口,其queryBasicTraces方法构造SearchSourceBuilder然后执行getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder),最后将返还结果解析为TraceBrief;其doFlexibleTraceQuery方法根据traceId和start_time构建SearchSourceBuilder,然后执行getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder),最后将返回结果解析为spanList

小结

ZipkinStorageModuleElasticsearchProvider继承了StorageModuleElasticsearchProvider,其prepare方法创建ZipkinTraceQueryEsDAO,然后注册为ITraceQueryDAO的实现;其notifyAfterCompleted方法执行traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class))

doc

  • ZipkinStorageModuleElasticsearchProvider

本文分享自微信公众号 - 码匠的流水账(geek_luandun),作者:码匠乱炖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊skywalking的storage-zipkin-plugin

    skywalking-6.6.0/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main...

    codecraft
  • leetcode之山羊拉丁文

    这里先将字符串按空格分割为单词,然后遍历每个单词,判断首字母是否为元音,是的话在后面添加ma,不是的话将首字母移到后面再拼接ma,最后再根据单词在句子中的ind...

    codecraft
  • 聊聊nacos的ServerListManager

    nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListMana...

    codecraft
  • 聊聊skywalking的storage-zipkin-plugin

    skywalking-6.6.0/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main...

    codecraft
  • 凯撒加密算法(最简单的对称加密)

    java404
  • group by..with rollup学习实例

    首先打开题目,发现 index.php 是登录界面,寻找 register.php,表单内容如下

    安恒网络空间安全讲武堂
  • 阶段01Java基础day03JAVA基础

    声明:本文为原创,作者为 对弈,转载时请保留本声明及附带文章链接:http://www.duiyi.xyz/c%e5%ae%9e%e7%8e%b0%e9%9b%...

    对弈
  • Nginx配置upstream实现负载均衡

    如今负载均衡初步完毕了。upstream依照轮询(默认)方式进行负载,每一个请求按时间顺序逐一分配到不同的后端服务器。假设后端服务器down掉。能自己主动剔除。...

    程序员小明
  • Nginx配置upstream实现负载均衡

    如今负载均衡初步完毕了。upstream依照轮询(默认)方式进行负载,每一个请求按时间顺序逐一分配到不同的后端服务器。假设后端服务器down掉。能自己主动剔除。...

    乱敲代码
  • iOS中UITableView和UICollectionView的默认空态页

    王大锤

扫码关注云+社区

领取腾讯云代金券