专栏首页码匠的流水账聊聊skywalking的HTTPAccessLog

聊聊skywalking的HTTPAccessLog

本文主要研究一下skywalking的HTTPAccessLog

HTTPAccessLog

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/HTTPAccessLog.java

@ScopeDeclaration(id = HTTP_ACCESS_LOG, name = "HTTPAccessLog")
public class HTTPAccessLog extends AbstractLog {
    @Override public int scope() {
        return HTTP_ACCESS_LOG;
    }
}
  • HTTPAccessLog继承了AbstractLog,其scope方法返回的是HTTP_ACCESS_LOG

AbstractLog

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/AbstractLog.java

@Setter
@Getter
public abstract class AbstractLog extends Source {
    private long timeBucket;
    private long timestamp;
    private int serviceId;
    private int serviceInstanceId;
    private int endpointId;
    private String traceId;
    private int isError;
    private String statusCode;
    private ContentType contentType = ContentType.NONE;
    private String content;

    @Override public String getEntityId() {
        throw new UnexpectedException("getEntityId is not supported in AbstractLog source");
    }
}
  • AbstractLog继承了Source,它定义了timeBucket、timestamp、serviceId、serviceInstanceId、endpointId、traceId、isError、statusCode、contentType、content属性

HTTPAccessLogDispatcher

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogDispatcher.java

public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog> {

    @Override public void dispatch(HTTPAccessLog source) {
        HTTPAccessLogRecord record = new HTTPAccessLogRecord();
        record.setTimestamp(source.getTimestamp());
        record.setTimeBucket(source.getTimeBucket());
        record.setServiceId(source.getServiceId());
        record.setServiceInstanceId(source.getServiceInstanceId());
        record.setEndpointId(source.getEndpointId());
        record.setTraceId(source.getTraceId());
        record.setIsError(source.getIsError());
        record.setStatusCode(source.getStatusCode());
        record.setContentType(source.getContentType().value());
        record.setContent(source.getContent());

        RecordStreamProcessor.getInstance().in(record);
    }
}
  • HTTPAccessLogDispatcher实现了SourceDispatcher接口,其dispatch将HTTPAccessLog转换为HTTPAccessLogRecord,然后执行RecordStreamProcessor.getInstance().in(record)

RecordStreamProcessor

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java

public class RecordStreamProcessor implements StreamProcessor<Record> {

    private final static RecordStreamProcessor PROCESSOR = new RecordStreamProcessor();

    private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();

    public static RecordStreamProcessor getInstance() {
        return PROCESSOR;
    }

    public void in(Record record) {
        RecordPersistentWorker worker = workers.get(record.getClass());
        if (worker != null) {
            worker.in(record);
        }
    }

    @SuppressWarnings("unchecked")
    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) {
        if (DisableRegister.INSTANCE.include(stream.name())) {
            return;
        }

        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
        IRecordDAO recordDAO;
        try {
            recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
        } catch (InstantiationException | IllegalAccessException e) {
            throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
        }

        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
        Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);

        workers.put(recordClass, persistentWorker);
    }
}
  • RecordStreamProcessor实现了StreamProcessor接口,其in方法从workers中找出record.getClass()对应的RecordPersistentWorker,然后执行其in方法

RecordPersistentWorker

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java

public class RecordPersistentWorker extends AbstractWorker<Record> {

    private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);

    private final Model model;
    private final IRecordDAO recordDAO;
    private final IBatchDAO batchDAO;

    RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
        super(moduleDefineHolder);
        this.model = model;
        this.recordDAO = recordDAO;
        this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
    }

    @Override public void in(Record record) {
        try {
            InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
            batchDAO.asynchronous(insertRequest);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }
}
  • RecordPersistentWorker继承了AbstractWorker,其in方法执行recordDAO.prepareBatchInsert(model, record),然后用返回的insertRequest执行batchDAO.asynchronous(insertRequest)

小结

HTTPAccessLog继承了AbstractLog,其scope方法返回的是HTTP_ACCESS_LOG;HTTPAccessLogDispatcher实现了SourceDispatcher接口,其dispatch将HTTPAccessLog转换为HTTPAccessLogRecord,然后执行RecordStreamProcessor.getInstance().in(record)

doc

  • HTTPAccessLog

本文分享自微信公众号 - 码匠的流水账(geek_luandun),作者:码匠乱炖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-31

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊skywalking的HTTPAccessLog

    skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/...

    codecraft
  • 聊聊apache gossip的ActiveGossiper

    incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/Abs...

    codecraft
  • 聊聊apache gossip的ActiveGossiper

    incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/Abs...

    codecraft
  • 从0.5到1写个rpc框架 - 2:远程服务调用(grpc)

    gRPC是Google开源的跨语言远程服务调用(RPC)框架,通信协议用的HTTP/2,数据传输默认用的protocol buffers(一种轻便高效的结构化数...

    acupt
  • 扩展 Object.assign 实现深拷贝

    但深拷贝,它是基于一个原对象,完完整整拷贝一份新对象出来,假如我们的需求是要将原对象上的属性完完整整拷贝到另外一个已存在的对象上,这时候深拷贝就有点无能为力了。

    请叫我大苏
  • 基于zookeeper leader选举方式一

    一,基本介绍 Curator Framework是一个针对zookeeper做的搞层次的API,极大地简化了zookeeper的使用。它基于zookeeper构...

    Spark学习技巧
  • Spring Cloud Eureka动态扩容

    前言 本文心得包括案例基于《重新定义》,动态扩容Eureka是为了不影响已运行服务的情况下进行扩容。

    胖虎
  • 计算机通信流程

    参考地址:https://www.processon.com/view/5d784083e4b01080c73b9ca8

    Dreamy.TZK
  • 大规模群消息推送如何保证实时性?

    第一版红包功能上线后,收集到不少问题。核心问题是消息延迟,导致有些人先看到红包,有些人晚看到红包,同时导致消息顺序混乱。

    普通程序员
  • 网络编程懒人入门(九):通俗讲解,有了IP地址,为何还要用MAC地址?

    标题虽然是为了解释有了 IP 地址,为什么还要用 MAC 地址,但是本文的重点在于理解为什么要有 IP 这样的东西。本文对读者的定位是知道 MAC 地址是什么,...

    JackJiang

扫码关注云+社区

领取腾讯云代金券