
A Look at SkyWalking's storage-zipkin-plugin

Original article by code4it. Last modified 2020-03-30 10:33:39. Collected in the column 码匠的流水账.

This article walks through the storage-zipkin-plugin of SkyWalking.

ZipkinStorageModuleElasticsearchProvider

skywalking-6.6.0/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java

public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {

    private static final Logger logger = LoggerFactory.getLogger(ZipkinStorageModuleElasticsearchProvider.class);
    private ZipkinTraceQueryEsDAO traceQueryEsDAO;

    // Name that identifies this storage provider.
    @Override
    public String name() {
        return "zipkin-elasticsearch";
    }

    // Reuse the parent's Elasticsearch setup, then register the Zipkin-specific
    // DAO as the ITraceQueryDAO implementation.
    @Override
    public void prepare() throws ServiceNotProvidedException {
        super.prepare();
        traceQueryEsDAO = new ZipkinTraceQueryEsDAO(elasticSearchClient);
        this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
    }

    // Look up ServiceInventoryCache from the core module and hand it to the DAO.
    @Override public void notifyAfterCompleted() {
        super.notifyAfterCompleted();
        traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
    }

    // This provider depends on the core module.
    @Override
    public String[] requiredModules() {
        return new String[] {CoreModule.NAME};
    }
}
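  • ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider. Its prepare method creates a ZipkinTraceQueryEsDAO and registers it as the implementation of ITraceQueryDAO; its notifyAfterCompleted method calls traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class)), which injects the ServiceInventoryCache obtained from the core module into the DAO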
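The split between prepare and notifyAfterCompleted is a two-phase wiring pattern: the DAO is registered first, and a dependency owned by another module is injected later. A minimal stdlib-only sketch follows. Every type in it (ServiceRegistry, NameCache, TraceQuery, SketchTraceQuery, SketchStorageProvider, LifecycleDemo) is a hypothetical stand-in and not SkyWalking's API, and the assumption that the cache may not be registered yet when the provider's prepare runs is mine rather than something the source states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a module manager's service lookup; not SkyWalking's API.
final class ServiceRegistry {
    private final Map<Class<?>, Object> services = new HashMap<>();

    <T> void register(Class<T> type, T impl) {
        services.put(type, impl);
    }

    <T> T get(Class<T> type) {
        Object impl = services.get(type);
        if (impl == null) {
            throw new IllegalStateException("No implementation registered for " + type.getName());
        }
        return type.cast(impl);
    }
}

// Hypothetical counterpart of ServiceInventoryCache: resolves a service id to a name.
interface NameCache {
    String nameOf(int serviceId);
}

// Hypothetical counterpart of ITraceQueryDAO.
interface TraceQuery {
    String describe(int serviceId);
}

final class SketchTraceQuery implements TraceQuery {
    private NameCache cache; // injected late, like setServiceInventoryCache(...)

    void setCache(NameCache cache) {
        this.cache = cache;
    }

    @Override
    public String describe(int serviceId) {
        if (cache == null) {
            throw new IllegalStateException("cache has not been injected yet");
        }
        return "service=" + cache.nameOf(serviceId);
    }
}

final class SketchStorageProvider {
    private final ServiceRegistry registry;
    private SketchTraceQuery dao;

    SketchStorageProvider(ServiceRegistry registry) {
        this.registry = registry;
    }

    // Phase 1: create the DAO and register it under its interface.
    void prepare() {
        dao = new SketchTraceQuery();
        registry.register(TraceQuery.class, dao);
    }

    // Phase 2: all modules are ready, so the foreign dependency can be looked up.
    void notifyAfterCompleted() {
        dao.setCache(registry.get(NameCache.class));
    }
}

final class LifecycleDemo {
    static String run() {
        ServiceRegistry registry = new ServiceRegistry();
        SketchStorageProvider storage = new SketchStorageProvider(registry);
        storage.prepare();
        // Another module registers its service after the storage provider was prepared.
        registry.register(NameCache.class, id -> "svc-" + id);
        storage.notifyAfterCompleted();
        return registry.get(TraceQuery.class).describe(7);
    }
}
```

Calling LifecycleDemo.run() goes through both phases in order; calling describe before notifyAfterCompleted would fail in this sketch because the cache has not been injected yet.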

ZipkinTraceQueryEsDAO

skywalking-6.6.0/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java

public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
    // Injected by the provider's notifyAfterCompleted(); used to resolve service names.
    @Setter
    private ServiceInventoryCache serviceInventoryCache;

    public ZipkinTraceQueryEsDAO(
        ElasticSearchClient client) {
        super(client);
    }

    @Override
    public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
        String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from,
        TraceState traceState, QueryOrder queryOrder) throws IOException {

        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        sourceBuilder.query(boolQueryBuilder);
        // must() returns the live clause list, so mustQueryList.add(...) and
        // boolQueryBuilder.must().add(...) below are equivalent.
        List<QueryBuilder> mustQueryList = boolQueryBuilder.must();

        // Each filter is optional: 0 (or an empty string) means "not set".
        if (startSecondTB != 0 && endSecondTB != 0) {
            mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
        }

        if (minDuration != 0 || maxDuration != 0) {
            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
            if (minDuration != 0) {
                rangeQueryBuilder.gte(minDuration);
            }
            if (maxDuration != 0) {
                rangeQueryBuilder.lte(maxDuration);
            }
            boolQueryBuilder.must().add(rangeQueryBuilder);
        }
        if (!Strings.isNullOrEmpty(endpointName)) {
            mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
        }
        if (serviceId != 0) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
        }
        if (serviceInstanceId != 0) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
        }
        if (endpointId != 0) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
        }
        if (!Strings.isNullOrEmpty(traceId)) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
        }
        // Any other trace state adds no IS_ERROR filter.
        switch (traceState) {
            case ERROR:
                mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
                break;
            case SUCCESS:
                mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
                break;
        }

        // One bucket per trace id, carrying the max latency and the earliest start time of its spans.
        TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID).field(TRACE_ID).size(limit)
            .subAggregation(
                AggregationBuilders.max(LATENCY).field(LATENCY)
            )
            .subAggregation(
                AggregationBuilders.min(START_TIME).field(START_TIME)
            );
        // The second argument of BucketOrder.aggregation is "asc"; false sorts descending.
        switch (queryOrder) {
            case BY_START_TIME:
                builder.order(BucketOrder.aggregation(START_TIME, false));
                break;
            case BY_DURATION:
                builder.order(BucketOrder.aggregation(LATENCY, false));
                break;
        }
        sourceBuilder.aggregation(builder);

        SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);

        TraceBrief traceBrief = new TraceBrief();

        Terms terms = response.getAggregations().get(TRACE_ID);

        for (Terms.Bucket termsBucket : terms.getBuckets()) {
            BasicTrace basicTrace = new BasicTrace();

            // The trace id doubles as the segment id here.
            basicTrace.setSegmentId(termsBucket.getKeyAsString());
            Min startTime = termsBucket.getAggregations().get(START_TIME);
            Max latency = termsBucket.getAggregations().get(LATENCY);
            basicTrace.setStart(String.valueOf((long)startTime.getValue()));
            // Endpoint name and error flag are not derived from the aggregation; placeholders are used.
            basicTrace.getEndpointNames().add("");
            basicTrace.setDuration((int)latency.getValue());
            basicTrace.setError(false);
            basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
            traceBrief.getTraces().add(basicTrace);
        }

        return traceBrief;
    }

    // Always returns an empty list in this implementation.
    @Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
        return Collections.emptyList();
    }
​
    @Override public List<org.apache.skywalking.oap.server.core.query.entity.Span> doFlexibleTraceQuery(
        String traceId) throws IOException {
        // Fetch up to 1000 spans of the trace, earliest first.
        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
        sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
        sourceBuilder.sort(START_TIME, SortOrder.ASC);
        sourceBuilder.size(1000);

        SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);

        List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();

        for (SearchHit searchHit : response.getHits().getHits()) {
            int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
            // The Zipkin span is stored as Base64-encoded PROTO3 bytes; decode it back into a zipkin2 Span.
            String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
            Span span = SpanBytesDecoder.PROTO3.decodeOne(Base64.getDecoder().decode(dataBinaryBase64));

            org.apache.skywalking.oap.server.core.query.entity.Span swSpan = new org.apache.skywalking.oap.server.core.query.entity.Span();

            swSpan.setTraceId(span.traceId());
            swSpan.setEndpointName(span.name());
            // Zipkin timestamps and durations are in microseconds; divide by 1000 to get milliseconds.
            swSpan.setStartTime(span.timestamp() / 1000);
            swSpan.setEndTime(swSpan.getStartTime() + span.durationAsLong() / 1000);
            span.tags().forEach((key, value) -> {
                swSpan.getTags().add(new KeyValue(key, value));
            });
            // Each Zipkin annotation becomes a log entry with a single "annotation" key.
            span.annotations().forEach(annotation -> {
                LogEntity entity = new LogEntity();
                entity.setTime(annotation.timestamp() / 1000);
                entity.getData().add(new KeyValue("annotation", annotation.value()));
                swSpan.getLogs().add(entity);
            });
            if (serviceId != Const.NONE) {
                swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
            }
            // Every Zipkin span is presented as a one-span segment whose id is the Zipkin span id.
            swSpan.setSpanId(0);
            swSpan.setParentSpanId(-1);
            swSpan.setSegmentSpanId(span.id());
            swSpan.setSegmentId(span.id());
            // As written in this version, CLIENT/PRODUCER map to "Entry" and SERVER/CONSUMER to "Exit".
            Span.Kind kind = span.kind();
            switch (kind) {
                case CLIENT:
                case PRODUCER:
                    swSpan.setType("Entry");
                    break;
                case SERVER:
                case CONSUMER:
                    swSpan.setType("Exit");
                    break;
                default:
                    swSpan.setType("Local");

            }

            // A span without a parent id is the root; otherwise link it to its parent through a cross-process Ref.
            if (StringUtil.isEmpty(span.parentId())) {
                swSpan.setRoot(true);
                swSpan.setSegmentParentSpanId("");
            } else {
                Ref ref = new Ref();
                ref.setTraceId(span.traceId());
                ref.setParentSegmentId(span.parentId());
                ref.setType(RefType.CROSS_PROCESS);
                ref.setParentSpanId(0);

                swSpan.getRefs().add(ref);
                swSpan.setSegmentParentSpanId(span.parentId());
            }
            spanList.add(swSpan);
        }
        return spanList;
    }
}
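The optional filters and the per-trace aggregation in queryBasicTraces above can be mimicked in memory to see what the Elasticsearch query is asking for. The following is a rough sketch using only the JDK; SpanRow, TraceSummary and BasicTraceSketch are hypothetical names, only the latency filter is modeled, and it ignores how Elasticsearch actually evaluates a terms aggregation across shards.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical flattened span document: only the fields the sketch needs.
final class SpanRow {
    final String traceId;
    final long startTime;
    final long latency;

    SpanRow(String traceId, long startTime, long latency) {
        this.traceId = traceId;
        this.startTime = startTime;
        this.latency = latency;
    }
}

// Hypothetical per-trace bucket: earliest start time and largest latency of its spans.
final class TraceSummary {
    final String traceId;
    long minStart;
    long maxLatency;

    TraceSummary(String traceId, long minStart, long maxLatency) {
        this.traceId = traceId;
        this.minStart = minStart;
        this.maxLatency = maxLatency;
    }
}

final class BasicTraceSketch {
    static List<TraceSummary> query(List<SpanRow> rows, long minDuration, long maxDuration,
                                    int limit, boolean byDuration) {
        Map<String, TraceSummary> buckets = new LinkedHashMap<>();
        for (SpanRow row : rows) {
            // A bound of 0 means "not set", mirroring the != 0 checks in the quoted method.
            if (minDuration != 0 && row.latency < minDuration) {
                continue;
            }
            if (maxDuration != 0 && row.latency > maxDuration) {
                continue;
            }
            TraceSummary bucket = buckets.get(row.traceId);
            if (bucket == null) {
                buckets.put(row.traceId, new TraceSummary(row.traceId, row.startTime, row.latency));
            } else {
                bucket.minStart = Math.min(bucket.minStart, row.startTime);
                bucket.maxLatency = Math.max(bucket.maxLatency, row.latency);
            }
        }
        List<TraceSummary> result = new ArrayList<>(buckets.values());
        Comparator<TraceSummary> order = byDuration
            ? Comparator.comparingLong((TraceSummary t) -> t.maxLatency)
            : Comparator.comparingLong((TraceSummary t) -> t.minStart);
        // Descending order, matching asc = false in BucketOrder.aggregation(...).
        result.sort(order.reversed());
        // Keep at most 'limit' buckets, like size(limit) on the terms aggregation.
        return new ArrayList<>(result.subList(0, Math.min(limit, result.size())));
    }
}
```

Note that the latency bounds are applied to individual span rows before grouping, which is also how a bool filter behaves relative to an aggregation: a trace appears in the result if at least one of its spans passes the filter.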
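  • ZipkinTraceQueryEsDAO extends EsDAO and implements ITraceQueryDAO. Its queryBasicTraces method builds a SearchSourceBuilder, runs getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder), and converts the returned aggregation buckets into a TraceBrief; its doFlexibleTraceQuery method builds a SearchSourceBuilder that filters by traceId and sorts by START_TIME in ascending order (at most 1000 hits), runs getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder), and converts the returned hits into spanList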
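The per-span conversion done by doFlexibleTraceQuery comes down to three small rules: microseconds become milliseconds, the Zipkin kind selects a SkyWalking span type, and the presence of a parent id decides between a root span and a child span. Here is a minimal sketch of just those rules using plain values instead of the zipkin2 and SkyWalking classes. SpanView and SpanConversionSketch are hypothetical names, the kind mapping simply mirrors the quoted source, and treating a missing kind as "Local" is my assumption (the quoted code switches on the kind directly).

```java
// Hypothetical, simplified view of the converted span.
final class SpanView {
    long startMillis;
    long endMillis;
    String type;
    boolean root;
    String parentSegmentId;
}

final class SpanConversionSketch {
    static SpanView convert(long timestampMicros, long durationMicros, String kind, String parentId) {
        SpanView view = new SpanView();

        // Zipkin reports epoch microseconds; both values are truncated to milliseconds separately.
        view.startMillis = timestampMicros / 1000;
        view.endMillis = view.startMillis + durationMicros / 1000;

        // Same mapping as the quoted source; a null or unknown kind falls back to "Local" (assumption).
        if ("CLIENT".equals(kind) || "PRODUCER".equals(kind)) {
            view.type = "Entry";
        } else if ("SERVER".equals(kind) || "CONSUMER".equals(kind)) {
            view.type = "Exit";
        } else {
            view.type = "Local";
        }

        // No parent id means this span is the root of the trace.
        if (parentId == null || parentId.isEmpty()) {
            view.root = true;
            view.parentSegmentId = "";
        } else {
            view.root = false;
            view.parentSegmentId = parentId;
        }
        return view;
    }
}
```

For example, convert(1585000000123456L, 2500, "CLIENT", null) produces a root span that starts at 1585000000123 ms and ends 2 ms later, because the 2500 µs duration is truncated to 2 ms by the integer division.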

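Summary

ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider. Its prepare method creates a ZipkinTraceQueryEsDAO and registers it as the implementation of ITraceQueryDAO; its notifyAfterCompleted method calls traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class)). ZipkinTraceQueryEsDAO in turn serves both trace queries by searching ZipkinSpanRecord.INDEX_NAME and mapping the results to TraceBrief and to a list of spans.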

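Original-work statement: the author has authorized the Tencent Cloud Developer Community to publish this article; it may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to have it removed.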