A Look at skywalking's TopNDatabaseStatement

Author: code4it
Last modified: 2020-04-03 10:47:25

This article looks at skywalking's TopNDatabaseStatement.

TopNDatabaseStatement

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java

@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, builder = TopNDatabaseStatement.Builder.class, processor = TopNStreamProcessor.class)
public class TopNDatabaseStatement extends TopN {

    public static final String INDEX_NAME = "top_n_database_statement";

    @Setter private String id;

    @Override public String id() {
        return id;
    }

    // Equality is based on serviceId only.
    @Override public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        TopNDatabaseStatement statement = (TopNDatabaseStatement)o;
        return getServiceId() == statement.getServiceId();
    }

    @Override public int hashCode() {
        return Objects.hash(getServiceId());
    }

    public static class Builder implements StorageBuilder<TopNDatabaseStatement> {

        // Storage row (column name -> value) to entity; numeric columns are read through Number.
        @Override public TopNDatabaseStatement map2Data(Map<String, Object> dbMap) {
            TopNDatabaseStatement statement = new TopNDatabaseStatement();
            statement.setStatement((String)dbMap.get(STATEMENT));
            statement.setTraceId((String)dbMap.get(TRACE_ID));
            statement.setLatency(((Number)dbMap.get(LATENCY)).longValue());
            statement.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
            statement.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
            return statement;
        }

        // Entity to storage row (column name -> value).
        @Override public Map<String, Object> data2Map(TopNDatabaseStatement storageData) {
            Map<String, Object> map = new HashMap<>();
            map.put(STATEMENT, storageData.getStatement());
            map.put(TRACE_ID, storageData.getTraceId());
            map.put(LATENCY, storageData.getLatency());
            map.put(SERVICE_ID, storageData.getServiceId());
            map.put(TIME_BUCKET, storageData.getTimeBucket());
            return map;
        }
    }
}
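  • TopNDatabaseStatement extends TopN and is registered through @Stream under the name top_n_database_statement, with TopNStreamProcessor as its processor; its nested Builder implements StorageBuilder and converts between the entity and a Map<String, Object>; equals and hashCode compare serviceId only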
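
The Builder is essentially a two-way mapping between the entity and a column map. The sketch below mirrors that round trip without any SkyWalking dependency. SlowStatement, SlowStatementBuilder and BuilderRoundTripDemo are hypothetical stand-ins, not SkyWalking classes, and the "time_bucket" column name is an assumption (the listing only shows the TIME_BUCKET constant, not its value). It also illustrates one likely reason for reading numbers through Number: a storage layer may return an Integer where the entity expects a long.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TopNDatabaseStatement; not a SkyWalking class.
class SlowStatement {
    String statement;
    String traceId;
    long latency;
    int serviceId;
    long timeBucket;
}

// Hypothetical stand-in for the nested Builder.
class SlowStatementBuilder {
    static final String STATEMENT = "statement";
    static final String TRACE_ID = "trace_id";
    static final String LATENCY = "latency";
    static final String SERVICE_ID = "service_id";
    static final String TIME_BUCKET = "time_bucket"; // assumed column name

    // Entity -> column map, in the spirit of data2Map.
    static Map<String, Object> data2Map(SlowStatement data) {
        Map<String, Object> map = new HashMap<>();
        map.put(STATEMENT, data.statement);
        map.put(TRACE_ID, data.traceId);
        map.put(LATENCY, data.latency);
        map.put(SERVICE_ID, data.serviceId);
        map.put(TIME_BUCKET, data.timeBucket);
        return map;
    }

    // Column map -> entity, in the spirit of map2Data.
    // Casting to Number first accepts Integer, Long, etc. for numeric columns.
    static SlowStatement map2Data(Map<String, Object> dbMap) {
        SlowStatement data = new SlowStatement();
        data.statement = (String) dbMap.get(STATEMENT);
        data.traceId = (String) dbMap.get(TRACE_ID);
        data.latency = ((Number) dbMap.get(LATENCY)).longValue();
        data.serviceId = ((Number) dbMap.get(SERVICE_ID)).intValue();
        data.timeBucket = ((Number) dbMap.get(TIME_BUCKET)).longValue();
        return data;
    }
}

class BuilderRoundTripDemo {
    public static void main(String[] args) {
        SlowStatement original = new SlowStatement();
        original.statement = "SELECT * FROM orders WHERE id = ?";
        original.traceId = "trace-1";
        original.latency = 1200L;
        original.serviceId = 7;
        original.timeBucket = 20200403104725L;

        Map<String, Object> row = SlowStatementBuilder.data2Map(original);
        // Simulate a storage layer that hands the latency back as an Integer.
        row.put(SlowStatementBuilder.LATENCY, Integer.valueOf(1200));

        SlowStatement restored = SlowStatementBuilder.map2Data(row);
        System.out.println(restored.statement + " took " + restored.latency + " ms");
    }
}
```

A direct (Long) cast on the Integer value above would throw a ClassCastException, which is why the sketch goes through Number.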

TopN

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java

public abstract class TopN extends Record implements ComparableStorageData {
    public static final String STATEMENT = "statement";
    public static final String LATENCY = "latency";
    public static final String TRACE_ID = "trace_id";
    public static final String SERVICE_ID = "service_id";

    @Getter @Setter @Column(columnName = STATEMENT, content = true) private String statement;
    @Getter @Setter @Column(columnName = LATENCY) private long latency;
    @Getter @Setter @Column(columnName = TRACE_ID) private String traceId;
    @Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId;

    // Orders by latency; the long difference is narrowed to int.
    @Override public int compareTo(Object o) {
        TopN target = (TopN)o;
        return (int)(latency - target.latency);
    }
}
  • TopN extends Record and defines the statement, latency, trace_id and service_id columns; its compareTo orders instances by latency
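
compareTo subtracts two long latencies and narrows the result to int. For realistic slow-statement latencies this is probably harmless, but once the difference no longer fits in an int the narrowed value can be zero or have the wrong sign. The sketch below only illustrates that edge case next to the overflow-safe Long.compare; LatencyCompare and its method names are hypothetical and are not part of SkyWalking.

```java
// Hypothetical helper used only to illustrate the comparison arithmetic.
class LatencyCompare {

    // Same arithmetic as TopN.compareTo: subtract, then narrow to int.
    static int bySubtraction(long a, long b) {
        return (int) (a - b);
    }

    // Overflow-safe alternative from the JDK.
    static int byLongCompare(long a, long b) {
        return Long.compare(a, b);
    }

    public static void main(String[] args) {
        // Typical case: both approaches agree on the sign.
        System.out.println(bySubtraction(200L, 50L)); // 150

        // Difference of 2^32: the low 32 bits are all zero, so narrowing yields 0.
        System.out.println(bySubtraction(1L << 32, 0L)); // 0, i.e. "equal"

        // Difference of 3,000,000,000: narrowing yields a negative int.
        System.out.println(bySubtraction(3_000_000_000L, 0L) < 0); // true

        // Long.compare reports the larger value as greater in both cases.
        System.out.println(byLongCompare(1L << 32, 0L) > 0); // true
    }
}
```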

DatabaseStatementDispatcher

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java

public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {

    @Override public void dispatch(DatabaseSlowStatement source) {
        // Copy the source fields into a new TopN record.
        TopNDatabaseStatement statement = new TopNDatabaseStatement();
        statement.setId(source.getId());
        statement.setServiceId(source.getDatabaseServiceId());
        statement.setLatency(source.getLatency());
        statement.setStatement(source.getStatement());
        statement.setTimeBucket(source.getTimeBucket());
        statement.setTraceId(source.getTraceId());

        // Hand the record to the shared TopN processor.
        TopNStreamProcessor.getInstance().in(statement);
    }
}
  • DatabaseStatementDispatcher implements the SourceDispatcher interface; its dispatch method converts a DatabaseSlowStatement into a TopNDatabaseStatement and then calls TopNStreamProcessor.getInstance().in(statement)
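
The dispatcher does two things: it copies fields from the source into a record, then hands the record to a processor. A minimal sketch of that shape follows, assuming a plain Consumer in place of TopNStreamProcessor. Dispatcher, SlowSource, SlowRecord and SlowStatementDispatcher are hypothetical names used for illustration; none of them are SkyWalking types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical counterpart of SourceDispatcher<S>.
interface Dispatcher<S> {
    void dispatch(S source);
}

// Hypothetical counterpart of DatabaseSlowStatement.
class SlowSource {
    final String id;
    final int databaseServiceId;
    final long latency;
    final String statement;

    SlowSource(String id, int databaseServiceId, long latency, String statement) {
        this.id = id;
        this.databaseServiceId = databaseServiceId;
        this.latency = latency;
        this.statement = statement;
    }
}

// Hypothetical counterpart of TopNDatabaseStatement.
class SlowRecord {
    String id;
    int serviceId;
    long latency;
    String statement;
}

class SlowStatementDispatcher implements Dispatcher<SlowSource> {
    // Plays the role of TopNStreamProcessor.getInstance().in(...).
    private final Consumer<SlowRecord> sink;

    SlowStatementDispatcher(Consumer<SlowRecord> sink) {
        this.sink = sink;
    }

    @Override
    public void dispatch(SlowSource source) {
        SlowRecord record = new SlowRecord();
        record.id = source.id;
        // The source's databaseServiceId becomes the record's serviceId.
        record.serviceId = source.databaseServiceId;
        record.latency = source.latency;
        record.statement = source.statement;
        sink.accept(record);
    }
}

class DispatcherDemo {
    public static void main(String[] args) {
        List<SlowRecord> received = new ArrayList<>();
        Dispatcher<SlowSource> dispatcher = new SlowStatementDispatcher(received::add);
        dispatcher.dispatch(new SlowSource("id-1", 7, 1200L, "SELECT 1"));
        System.out.println(received.size() + " record(s), serviceId=" + received.get(0).serviceId);
    }
}
```

Passing the sink in through the constructor is a choice made for this sketch so the flow can be exercised in isolation; the original calls the TopNStreamProcessor singleton directly.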

TopNStreamProcessor

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java

public class TopNStreamProcessor implements StreamProcessor<TopN> {

    private static final TopNStreamProcessor PROCESSOR = new TopNStreamProcessor();

    @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
    private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
    @Setter @Getter private int topNWorkerReportCycle = 10;
    @Setter @Getter private int topSize = 50;

    public static TopNStreamProcessor getInstance() {
        return PROCESSOR;
    }

    @SuppressWarnings("unchecked")
    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) {
        // Streams listed in DisableRegister get no worker.
        if (DisableRegister.INSTANCE.include(stream.name())) {
            return;
        }

        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
        IRecordDAO recordDAO;
        try {
            recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
        } catch (InstantiationException | IllegalAccessException e) {
            throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e);
        }

        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
        Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);

        // topNWorkerReportCycle is multiplied by 60 * 1000L before being passed to the worker.
        TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO);
        persistentWorkers.add(persistentWorker);
        workers.put(topNClass, persistentWorker);
    }

    // Routes the record to the worker registered for its class; records without a worker are ignored.
    public void in(TopN topN) {
        TopNWorker worker = workers.get(topN.getClass());
        if (worker != null) {
            worker.in(topN);
        }
    }
}
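  • TopNStreamProcessor implements the StreamProcessor interface; its create method builds one TopNWorker per TopN class (topSize defaults to 50, and topNWorkerReportCycle defaults to 10 and is multiplied by 60 * 1000L before being passed to the worker), and its in method looks up the TopNWorker in workers by the record's class and calls worker.in(topN) if one is registered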
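
The routing in TopNStreamProcessor comes down to a map keyed by the record's class. The following sketch reproduces just that part, under stated assumptions: RecordWorker and ClassKeyedProcessor are hypothetical stand-ins, the worker merely collects what it receives (the internals of TopNWorker are not shown in this article), and in returns a boolean here purely so the outcome can be observed, whereas the original returns void.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TopNWorker; it only collects records.
class RecordWorker {
    final long reportCycleMillis;
    final List<Object> received = new ArrayList<>();

    RecordWorker(long reportCycleMillis) {
        this.reportCycleMillis = reportCycleMillis;
    }

    void in(Object record) {
        received.add(record);
    }
}

// Hypothetical stand-in for TopNStreamProcessor's routing logic.
class ClassKeyedProcessor {
    private final Map<Class<?>, RecordWorker> workers = new HashMap<>();
    private final int reportCycle = 10; // mirrors the topNWorkerReportCycle default

    // Registers one worker per record class, as create() does.
    RecordWorker create(Class<?> recordClass) {
        // 10 * 60 * 1000L = 600000, using the same multiplication as the original.
        RecordWorker worker = new RecordWorker(reportCycle * 60 * 1000L);
        workers.put(recordClass, worker);
        return worker;
    }

    // Looks the worker up by the record's exact runtime class, as in() does.
    // Returns false when no worker is registered and the record is dropped.
    boolean in(Object record) {
        RecordWorker worker = workers.get(record.getClass());
        if (worker == null) {
            return false;
        }
        worker.in(record);
        return true;
    }
}

class RoutingDemo {
    public static void main(String[] args) {
        ClassKeyedProcessor processor = new ClassKeyedProcessor();
        RecordWorker worker = processor.create(String.class);

        System.out.println(processor.in("slow statement")); // true: a worker exists for String
        System.out.println(processor.in(42));               // false: no worker for Integer
        System.out.println(worker.reportCycleMillis);       // 600000
    }
}
```

Because the lookup uses getClass(), a record is routed only when its exact class was registered; an instance of a subclass of a registered class would not find a worker.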

Summary

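TopNDatabaseStatement extends TopN and defines a Builder. DatabaseStatementDispatcher implements the SourceDispatcher interface; its dispatch method converts a DatabaseSlowStatement into a TopNDatabaseStatement and then calls TopNStreamProcessor.getInstance().in(statement), which passes the record to the TopNWorker registered for that class.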

doc

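Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to request removal.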
