
A Look at SkyWalking's HTTPAccessLog

Author: code4it
Last modified: 2020-04-01 10:14:08
Column: 码匠的流水账

This article walks through SkyWalking's HTTPAccessLog source and the path it takes to storage.

HTTPAccessLog

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/HTTPAccessLog.java

@ScopeDeclaration(id = HTTP_ACCESS_LOG, name = "HTTPAccessLog")
public class HTTPAccessLog extends AbstractLog {
    @Override public int scope() {
        return HTTP_ACCESS_LOG;
    }
}
  • HTTPAccessLog extends AbstractLog, and its scope method returns HTTP_ACCESS_LOG, the same constant used as the id in its @ScopeDeclaration
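
The relationship between the annotation id and the value returned by scope() can be illustrated with a small, self-contained sketch. Everything here is a hypothetical stand-in: SketchScopes, SketchScopeDeclaration, SketchSource and the placeholder value 1001 are not SkyWalking's, and reading the annotation reflectively is an assumption about how such a declaration could be consumed, not a description of what the OAP server does.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for the scope-id constants; 1001 is a placeholder value.
final class SketchScopes {
    static final int HTTP_ACCESS_LOG = 1001;

    private SketchScopes() { }
}

// Hypothetical stand-in for @ScopeDeclaration, retained at runtime so it can be read reflectively.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchScopeDeclaration {
    int id();

    String name();
}

// Hypothetical stand-in for the Source base class.
abstract class SketchSource {
    public abstract int scope();
}

@SketchScopeDeclaration(id = SketchScopes.HTTP_ACCESS_LOG, name = "HTTPAccessLog")
class SketchHttpAccessLog extends SketchSource {
    @Override public int scope() {
        return SketchScopes.HTTP_ACCESS_LOG;
    }
}

class ScopeSketchDemo {
    // Reads the id declared on a source class; throws if the annotation is missing.
    static int declaredScopeId(Class<?> sourceType) {
        SketchScopeDeclaration declaration = sourceType.getAnnotation(SketchScopeDeclaration.class);
        if (declaration == null) {
            throw new IllegalArgumentException(sourceType.getName() + " has no scope declaration");
        }
        return declaration.id();
    }

    public static void main(String[] args) {
        int declared = declaredScopeId(SketchHttpAccessLog.class);
        int returned = new SketchHttpAccessLog().scope();
        // Both paths resolve to the same constant.
        System.out.println(declared == returned);
    }
}
```

Using one constant for both the annotation and the method keeps the two from drifting apart, which appears to be the intent of the original class.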

AbstractLog

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/AbstractLog.java

// Lombok generates a setter and a getter for every field below.
@Setter
@Getter
public abstract class AbstractLog extends Source {
    private long timeBucket;
    private long timestamp;
    private int serviceId;
    private int serviceInstanceId;
    private int endpointId;
    private String traceId;
    private int isError;
    private String statusCode;
    private ContentType contentType = ContentType.NONE;
    private String content;

    // getEntityId is not supported for log sources, so calling it always throws.
    @Override public String getEntityId() {
        throw new UnexpectedException("getEntityId is not supported in AbstractLog source");
    }
}
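  • AbstractLog extends Source and defines the properties timeBucket, timestamp, serviceId, serviceInstanceId, endpointId, traceId, isError, statusCode, contentType, and content; contentType defaults to ContentType.NONE, and getEntityId throws UnexpectedException because it is not supported for log sources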
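
To make the effect of @Setter/@Getter and the unsupported getEntityId concrete, here is a minimal hand-written sketch without Lombok. It models only two of the ten fields. SketchContentType, SketchAbstractLog and SketchAccessLog are hypothetical names, the JSON constant is invented for illustration, and UnsupportedOperationException stands in for SkyWalking's UnexpectedException.

```java
// Hypothetical stand-in for ContentType; only NONE appears in the excerpt above.
enum SketchContentType { NONE, JSON }

// Hand-written equivalent of what Lombok's @Getter/@Setter would generate, for two fields.
abstract class SketchAbstractLog {
    private String statusCode;
    private SketchContentType contentType = SketchContentType.NONE;

    public String getStatusCode() {
        return statusCode;
    }

    public void setStatusCode(String statusCode) {
        this.statusCode = statusCode;
    }

    public SketchContentType getContentType() {
        return contentType;
    }

    public void setContentType(SketchContentType contentType) {
        this.contentType = contentType;
    }

    // UnsupportedOperationException stands in for SkyWalking's UnexpectedException.
    public String getEntityId() {
        throw new UnsupportedOperationException("getEntityId is not supported in AbstractLog source");
    }
}

class SketchAccessLog extends SketchAbstractLog { }

class AbstractLogSketchDemo {
    // Returns true when getEntityId() refuses to produce an id.
    static boolean entityIdUnsupported(SketchAbstractLog log) {
        try {
            log.getEntityId();
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        SketchAccessLog log = new SketchAccessLog();
        System.out.println(log.getContentType());   // default set by the field initializer
        log.setStatusCode("404");
        System.out.println(log.getStatusCode());
        System.out.println(entityIdUnsupported(log));
    }
}
```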

HTTPAccessLogDispatcher

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogDispatcher.java

public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog> {

    @Override public void dispatch(HTTPAccessLog source) {
        // Copy every field of the source onto a new record.
        HTTPAccessLogRecord record = new HTTPAccessLogRecord();
        record.setTimestamp(source.getTimestamp());
        record.setTimeBucket(source.getTimeBucket());
        record.setServiceId(source.getServiceId());
        record.setServiceInstanceId(source.getServiceInstanceId());
        record.setEndpointId(source.getEndpointId());
        record.setTraceId(source.getTraceId());
        record.setIsError(source.getIsError());
        record.setStatusCode(source.getStatusCode());
        // The ContentType enum is stored as the value returned by value().
        record.setContentType(source.getContentType().value());
        record.setContent(source.getContent());

        // Hand the record to the shared record stream processor.
        RecordStreamProcessor.getInstance().in(record);
    }
}
  • HTTPAccessLogDispatcher implements the SourceDispatcher interface; its dispatch method copies the fields of an HTTPAccessLog into a new HTTPAccessLogRecord and then calls RecordStreamProcessor.getInstance().in(record)
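
The dispatcher pattern described above (copy the source into a record, then hand the record on) can be sketched in isolation. The types below are hypothetical stand-ins that model three of the ten fields. The int type of the record's contentType and the numeric enum values are assumptions, and a plain list takes the place of RecordStreamProcessor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enum; the numeric values are placeholders, not SkyWalking's.
enum DispatchContentType {
    NONE(0), JSON(1);

    private final int value;

    DispatchContentType(int value) {
        this.value = value;
    }

    int value() {
        return value;
    }
}

// Trimmed-down stand-in for the source object.
class DispatchSource {
    long timestamp;
    String statusCode;
    DispatchContentType contentType = DispatchContentType.NONE;
}

// Trimmed-down stand-in for the record; storing contentType as int is an assumption.
class DispatchRecord {
    long timestamp;
    String statusCode;
    int contentType;
}

// Same shape as SourceDispatcher<T>: a single dispatch method.
interface SketchDispatcher<S> {
    void dispatch(S source);
}

class SketchAccessLogDispatcher implements SketchDispatcher<DispatchSource> {
    // Stands in for RecordStreamProcessor.getInstance(); here it only collects records.
    final List<DispatchRecord> sink = new ArrayList<>();

    @Override public void dispatch(DispatchSource source) {
        DispatchRecord record = new DispatchRecord();
        record.timestamp = source.timestamp;
        record.statusCode = source.statusCode;
        // The enum is flattened to the value returned by value().
        record.contentType = source.contentType.value();
        sink.add(record);
    }
}

class DispatcherSketchDemo {
    public static void main(String[] args) {
        SketchAccessLogDispatcher dispatcher = new SketchAccessLogDispatcher();
        DispatchSource source = new DispatchSource();
        source.timestamp = 1585707248000L;
        source.statusCode = "200";
        dispatcher.dispatch(source);
        System.out.println(dispatcher.sink.size());
    }
}
```

Keeping the source and the record as separate types means the storage layer only ever sees the record, which seems to be the reason for the field-by-field copy.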

RecordStreamProcessor

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java

public class RecordStreamProcessor implements StreamProcessor<Record> {

    // Singleton instance returned by getInstance().
    private final static RecordStreamProcessor PROCESSOR = new RecordStreamProcessor();

    // One persistent worker per record class.
    private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();

    public static RecordStreamProcessor getInstance() {
        return PROCESSOR;
    }

    // Routes a record to the worker registered for its class; records without a worker are ignored.
    public void in(Record record) {
        RecordPersistentWorker worker = workers.get(record.getClass());
        if (worker != null) {
            worker.in(record);
        }
    }

    @SuppressWarnings("unchecked")
    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) {
        // Streams reported by DisableRegister get no worker.
        if (DisableRegister.INSTANCE.include(stream.name())) {
            return;
        }

        // Obtain a record DAO from the storage module, using the stream's builder.
        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
        IRecordDAO recordDAO;
        try {
            recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
        } catch (InstantiationException | IllegalAccessException e) {
            throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
        }

        // Register the model, then bind a persistent worker to the record class.
        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
        Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);

        workers.put(recordClass, persistentWorker);
    }
}
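  • RecordStreamProcessor implements the StreamProcessor interface; its in method looks up the RecordPersistentWorker registered in workers for record.getClass() and, if one exists, calls that worker's in method. Its create method registers such a worker for a record class, unless DisableRegister reports the stream as disabled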
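
The class-keyed lookup in the in method has two consequences worth seeing in isolation: a record whose class has no registered worker is silently skipped, and because the key is the exact runtime class, a subclass of a registered record class does not match either. The sketch below uses hypothetical types (RoutedRecord, CollectingWorker, SketchRecordProcessor), and its in method returns a boolean purely so the outcome can be observed; the original returns void.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record hierarchy used only to exercise the lookup.
class RoutedRecord { }

class AccessLogRoutedRecord extends RoutedRecord { }

class SpecialAccessLogRoutedRecord extends AccessLogRoutedRecord { }

// Hypothetical worker that just remembers what it received.
class CollectingWorker {
    final List<RoutedRecord> received = new ArrayList<>();

    void in(RoutedRecord record) {
        received.add(record);
    }
}

class SketchRecordProcessor {
    private final Map<Class<? extends RoutedRecord>, CollectingWorker> workers = new HashMap<>();

    void register(Class<? extends RoutedRecord> recordClass, CollectingWorker worker) {
        workers.put(recordClass, worker);
    }

    // Returns whether a worker was found; the original method returns void.
    boolean in(RoutedRecord record) {
        CollectingWorker worker = workers.get(record.getClass());
        if (worker == null) {
            return false;
        }
        worker.in(record);
        return true;
    }
}

class ProcessorSketchDemo {
    public static void main(String[] args) {
        SketchRecordProcessor processor = new SketchRecordProcessor();
        CollectingWorker worker = new CollectingWorker();
        processor.register(AccessLogRoutedRecord.class, worker);

        System.out.println(processor.in(new AccessLogRoutedRecord()));        // registered class
        System.out.println(processor.in(new RoutedRecord()));                 // no worker registered
        System.out.println(processor.in(new SpecialAccessLogRoutedRecord())); // subclass, exact-class lookup misses
    }
}
```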

RecordPersistentWorker

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java

public class RecordPersistentWorker extends AbstractWorker<Record> {

    private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);

    private final Model model;
    private final IRecordDAO recordDAO;
    private final IBatchDAO batchDAO;

    RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
        super(moduleDefineHolder);
        this.model = model;
        this.recordDAO = recordDAO;
        // The batch DAO is looked up from the storage module.
        this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
    }

    @Override public void in(Record record) {
        try {
            // Build the insert request, then submit it to the batch DAO asynchronously.
            InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
            batchDAO.asynchronous(insertRequest);
        } catch (IOException e) {
            // A failed insert preparation is logged, not rethrown.
            logger.error(e.getMessage(), e);
        }
    }
}
  • RecordPersistentWorker extends AbstractWorker; its in method calls recordDAO.prepareBatchInsert(model, record) and passes the returned insertRequest to batchDAO.asynchronous(insertRequest); an IOException is logged rather than rethrown
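
The two-step "prepare a request, then submit it asynchronously" flow can be sketched as follows. This is one possible shape under stated assumptions, not the behaviour of any particular SkyWalking storage plugin: InsertRequestSketch, QueueingBatchDao and PersistentWorkerSketch are hypothetical, the queue-plus-flush design is an assumption, and flush() is called explicitly here to keep the example deterministic where a real implementation would more likely drain on a background thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical prepared insert: a target model name plus the content to write.
class InsertRequestSketch {
    final String modelName;
    final String content;

    InsertRequestSketch(String modelName, String content) {
        this.modelName = modelName;
        this.content = content;
    }
}

// Hypothetical batch DAO: asynchronous() only enqueues; flush() performs the batched write.
class QueueingBatchDao {
    private final BlockingQueue<InsertRequestSketch> pending = new LinkedBlockingQueue<>();
    final List<InsertRequestSketch> written = new ArrayList<>();

    void asynchronous(InsertRequestSketch request) {
        pending.add(request);
    }

    // Drains everything queued so far and returns the batch size.
    int flush() {
        List<InsertRequestSketch> batch = new ArrayList<>();
        pending.drainTo(batch);
        written.addAll(batch);
        return batch.size();
    }
}

class PersistentWorkerSketch {
    private final String modelName;
    private final QueueingBatchDao batchDao;

    PersistentWorkerSketch(String modelName, QueueingBatchDao batchDao) {
        this.modelName = modelName;
        this.batchDao = batchDao;
    }

    // Prepares the request and returns as soon as it is queued.
    void in(String content) {
        InsertRequestSketch request = new InsertRequestSketch(modelName, content);
        batchDao.asynchronous(request);
    }
}

class PersistentWorkerSketchDemo {
    public static void main(String[] args) {
        QueueingBatchDao dao = new QueueingBatchDao();
        PersistentWorkerSketch worker = new PersistentWorkerSketch("http_access_log", dao);
        worker.in("GET /a 200");
        worker.in("GET /b 404");
        System.out.println(dao.written.size()); // nothing written until a flush happens
        System.out.println(dao.flush());
        System.out.println(dao.written.size());
    }
}
```

Separating preparation from submission lets the caller return quickly while writes are grouped, which is the usual motivation for an asynchronous batch interface.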

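Summary

HTTPAccessLog extends AbstractLog, and its scope method returns HTTP_ACCESS_LOG. HTTPAccessLogDispatcher implements the SourceDispatcher interface; its dispatch method converts an HTTPAccessLog into an HTTPAccessLogRecord and then calls RecordStreamProcessor.getInstance().in(record), which forwards the record to the RecordPersistentWorker registered for its class.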

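Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to have it removed.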
