前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊canal的PrometheusService

聊聊canal的PrometheusService

作者头像
code4it
发布2020-04-14 14:44:16
9630
发布2020-04-14 14:44:16
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下canal的PrometheusService

CanalMetricsService

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsService.java

代码语言:javascript
复制
public interface CanalMetricsService {

    /**
     * Initialization on canal server startup.
     */
    void initialize();

    /**
     * Clean-up at canal server stop phase.
     */
    void terminate();

    /**
     * @return {@code true} if the metrics service is running, otherwise {@code false}.
     */
    boolean isRunning();

    /**
     * Register instance level metrics for specified instance.
     * @param instance {@link CanalInstance}
     */
    void register(CanalInstance instance);

    /**
     * Unregister instance level metrics for specified instance.
     * @param instance {@link CanalInstance}
     */
    void unregister(CanalInstance instance);

    /**
     * @param port server port for pull
     */
    void setServerPort(int port);

}
  • CanalMetricsService接口定义了initialize、terminate、isRunning、register、unregister、setServerPort方法

PrometheusService

canal-1.1.4/prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

代码语言:javascript
复制
public class PrometheusService implements CanalMetricsService {

    private static final Logger          logger          = LoggerFactory.getLogger(PrometheusService.class);
    private final CanalInstanceExports   instanceExports;
    private volatile boolean             running         = false;
    private int                          port;
    private HTTPServer                   server;
    private final ClientInstanceProfiler clientProfiler;

    private PrometheusService() {
        this.instanceExports = CanalInstanceExports.instance();
        this.clientProfiler = PrometheusClientInstanceProfiler.instance();
    }

    private static class SingletonHolder {
        private static final PrometheusService SINGLETON = new PrometheusService();
    }

    public static PrometheusService getInstance() {
        return SingletonHolder.SINGLETON;
    }

    @Override
    public void initialize() {
        try {
            logger.info("Start prometheus HTTPServer on port {}.", port);
            //TODO 2.Https?
            server = new HTTPServer(port);
        } catch (IOException e) {
            logger.warn("Unable to start prometheus HTTPServer.", e);
            return;
        }
        try {
            // JVM exports
            DefaultExports.initialize();
            instanceExports.initialize();
            if (!clientProfiler.isStart()) {
                clientProfiler.start();
            }
            profiler().setInstanceProfiler(clientProfiler);
        } catch (Throwable t) {
            logger.warn("Unable to initialize server exports.", t);
        }

        running = true;
    }

    @Override
    public void terminate() {
        running = false;
        try {
            instanceExports.terminate();
            if (clientProfiler.isStart()) {
                clientProfiler.stop();
            }
            profiler().setInstanceProfiler(NOP);
            if (server != null) {
                server.stop();
            }
        } catch (Throwable t) {
            logger.warn("Something happened while terminating.", t);
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public void register(CanalInstance instance) {
        if (instance.isStart()) {
            logger.warn("Cannot register metrics for destination {} that is running.", instance.getDestination());
            return;
        }
        try {
            instanceExports.register(instance);
        } catch (Throwable t) {
            logger.warn("Unable to register instance exports for {}.", instance.getDestination(), t);
        }
        logger.info("Register metrics for destination {}.", instance.getDestination());
    }

    @Override
    public void unregister(CanalInstance instance) {
        if (instance.isStart()) {
            logger.warn("Try unregister metrics after destination {} is stopped.", instance.getDestination());
        }
        try {
            instanceExports.unregister(instance);
        } catch (Throwable t) {
            logger.warn("Unable to unregister instance exports for {}.", instance.getDestination(), t);
        }
        logger.info("Unregister metrics for destination {}.", instance.getDestination());
    }

    @Override
    public void setServerPort(int port) {
        this.port = port;
    }

}
  • PrometheusService实现了CanalMetricsService接口,其构造器单例构造CanalInstanceExports及ClientInstanceProfiler;其initialize创建HTTPServer,然后执行DefaultExports.initialize()、instanceExports.initialize()及clientProfiler.start();其terminate方法执行instanceExports.terminate()及clientProfiler.stop();其register方法执行instanceExports.register(instance);其unregister方法执行instanceExports.unregister(instance)

DefaultExports

simpleclient_hotspot-0.4.0-sources.jar!/io/prometheus/client/hotspot/DefaultExports.java

代码语言:javascript
复制
public class DefaultExports {
  private static boolean initialized = false;
  /**
   * Register the default Hotspot collectors.
   */
  public static synchronized void initialize() {
    if (!initialized) {
      new StandardExports().register();
      new MemoryPoolsExports().register();
      new BufferPoolsExports().register();
      new GarbageCollectorExports().register();
      new ThreadExports().register();
      new ClassLoadingExports().register();
      new VersionInfoExports().register();
      initialized = true;
    }
  }

}
  • DefaultExports的initialize方法注册了StandardExports、MemoryPoolsExports、BufferPoolsExports、GarbageCollectorExports、ThreadExports、ClassLoadingExports、VersionInfoExports

CanalInstanceExports

canal-1.1.4/prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java

代码语言:javascript
复制
public class CanalInstanceExports {

    private static final Logger      logger           = LoggerFactory.getLogger(CanalInstanceExports.class);
    public static final String       DEST             = "destination";
    public static final String[]     DEST_LABELS      = {DEST};
    public static final List<String> DEST_LABELS_LIST = Collections.singletonList(DEST);
    private final Collector          storeCollector;
    private final Collector          entryCollector;
    private final Collector          metaCollector;
    private final Collector          sinkCollector;
    private final Collector          parserCollector;

    private CanalInstanceExports() {
        this.storeCollector = StoreCollector.instance();
        this.entryCollector = EntryCollector.instance();
        this.metaCollector = MetaCollector.instance();
        this.sinkCollector = SinkCollector.instance();
        this.parserCollector = ParserCollector.instance();
    }

    private static class SingletonHolder {
        private static final CanalInstanceExports SINGLETON = new CanalInstanceExports();
    }

    public static CanalInstanceExports instance() {
        return SingletonHolder.SINGLETON;
    }

    public void initialize() {
        storeCollector.register();
        entryCollector.register();
        metaCollector.register();
        sinkCollector.register();
        parserCollector.register();
    }

    public void terminate() {
        CollectorRegistry.defaultRegistry.unregister(storeCollector);
        CollectorRegistry.defaultRegistry.unregister(entryCollector);
        CollectorRegistry.defaultRegistry.unregister(metaCollector);
        CollectorRegistry.defaultRegistry.unregister(sinkCollector);
        CollectorRegistry.defaultRegistry.unregister(parserCollector);
    }

    void register(CanalInstance instance) {
        requiredInstanceRegistry(storeCollector).register(instance);
        requiredInstanceRegistry(entryCollector).register(instance);
        requiredInstanceRegistry(metaCollector).register(instance);
        requiredInstanceRegistry(sinkCollector).register(instance);
        requiredInstanceRegistry(parserCollector).register(instance);
        logger.info("Successfully register metrics for instance {}.", instance.getDestination());
    }

    void unregister(CanalInstance instance) {
        requiredInstanceRegistry(storeCollector).unregister(instance);
        requiredInstanceRegistry(entryCollector).unregister(instance);
        requiredInstanceRegistry(metaCollector).unregister(instance);
        requiredInstanceRegistry(sinkCollector).unregister(instance);
        requiredInstanceRegistry(parserCollector).unregister(instance);
        logger.info("Successfully unregister metrics for instance {}.", instance.getDestination());
    }

    private InstanceRegistry requiredInstanceRegistry(Collector collector) {
        if (!(collector instanceof InstanceRegistry)) {
            throw new IllegalArgumentException("Canal prometheus collector need to implement InstanceRegistry.");
        }
        return (InstanceRegistry) collector;
    }

}
  • CanalInstanceExports的构造器单例创建了storeCollector、entryCollector、metaCollector、sinkCollector、parserCollector;其initialize方法执行这几个collector的register方法;其register方法将instance注册到这几个collector;其unregister方法将instance从这几个collector注销掉;其terminate方法将这几个collector从CollectorRegistry.defaultRegistry中注销掉

PrometheusClientInstanceProfiler

canal-1.1.4/prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java

代码语言:javascript
复制
public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler {

    private static final long   NANO_PER_MILLI = 1000 * 1000L;
    private static final String PACKET_TYPE    = "canal_instance_client_packets";
    private static final String OUTBOUND_BYTES = "canal_instance_client_bytes";
    private static final String EMPTY_BATCHES  = "canal_instance_client_empty_batches";
    private static final String ERRORS         = "canal_instance_client_request_error";
    private static final String LATENCY        = "canal_instance_client_request_latency";
    private final Counter       outboundCounter;
    private final Counter       packetsCounter;
    private final Counter       emptyBatchesCounter;
    private final Counter       errorsCounter;
    private final Histogram     responseLatency;
    private volatile boolean    running        = false;

    private static class SingletonHolder {
        private static final PrometheusClientInstanceProfiler SINGLETON = new PrometheusClientInstanceProfiler();
    }

    public static PrometheusClientInstanceProfiler instance() {
        return SingletonHolder.SINGLETON;
    }

    private PrometheusClientInstanceProfiler() {
        this.outboundCounter = Counter.build()
                .labelNames(DEST_LABELS)
                .name(OUTBOUND_BYTES)
                .help("Total bytes sent to client.")
                .create();
        this.packetsCounter = Counter.build()
                .labelNames(new String[]{DEST, "packetType"})
                .name(PACKET_TYPE)
                .help("Total packets sent to client.")
                .create();
        this.emptyBatchesCounter = Counter.build()
                .labelNames(DEST_LABELS)
                .name(EMPTY_BATCHES)
                .help("Total empty batches sent to client.")
                .create();
        this.errorsCounter = Counter.build()
                .labelNames(new String[]{DEST, "errorCode"})
                .name(ERRORS)
                .help("Total client request errors.")
                .create();
        this.responseLatency = Histogram.build()
                .labelNames(DEST_LABELS)
                .name(LATENCY)
                .help("Client request latency.")
                // buckets in milliseconds
                .buckets(2.5, 10.0, 25.0, 100.0)
                .create();
    }

    @Override
    public void profiling(ClientRequestResult result) {
        String destination = result.getDestination();
        PacketType type = result.getType();
        outboundCounter.labels(destination).inc(result.getAmount());
        short errorCode = result.getErrorCode();
        if (errorCode > 0) {
            errorsCounter.labels(destination, Short.toString(errorCode)).inc();
        }
        long latency = result.getLatency();
        responseLatency.labels(destination).observe(((double) latency) / NANO_PER_MILLI);
        switch (type) {
            case GET:
                boolean empty = result.getEmpty();
                // 区分一下空包
                if (empty) {
                    emptyBatchesCounter.labels(destination).inc();
                } else {
                    packetsCounter.labels(destination, type.name()).inc();
                }
                break;
            // reserve for others
            default:
                packetsCounter.labels(destination, type.name()).inc();
                break;
        }
    }

    @Override
    public void start() {
        if (outboundCounter != null) {
            outboundCounter.register();
        }
        if (packetsCounter != null) {
            packetsCounter.register();
        }
        if (emptyBatchesCounter != null) {
            emptyBatchesCounter.register();
        }
        if (errorsCounter != null) {
            errorsCounter.register();
        }
        if (responseLatency != null) {
            responseLatency.register();
        }
        running = true;
    }

    @Override
    public void stop() {
        running = false;
        if (outboundCounter != null) {
            CollectorRegistry.defaultRegistry.unregister(outboundCounter);
        }
        if (packetsCounter != null) {
            CollectorRegistry.defaultRegistry.unregister(packetsCounter);
        }
        if (emptyBatchesCounter != null) {
            CollectorRegistry.defaultRegistry.unregister(emptyBatchesCounter);
        }
        if (errorsCounter != null) {
            CollectorRegistry.defaultRegistry.unregister(errorsCounter);
        }
        if (responseLatency != null) {
            CollectorRegistry.defaultRegistry.unregister(responseLatency);
        }
    }

    @Override
    public boolean isStart() {
        return running;
    }
}
  • PrometheusClientInstanceProfiler的构造器创建了outboundCounter、packetsCounter、emptyBatchesCounter、errorsCounter、responseLatency这几个metrics;其start方法分别注册这几个metrics;其stop方法将这几个metrics从CollectorRegistry.defaultRegistry注销掉;其profiling方法根据ClientRequestResult来更新这些metrics

小结

PrometheusService实现了CanalMetricsService接口,其构造器单例构造CanalInstanceExports及ClientInstanceProfiler;其initialize创建HTTPServer,然后执行DefaultExports.initialize()、instanceExports.initialize()及clientProfiler.start();其terminate方法执行instanceExports.terminate()及clientProfiler.stop();其register方法执行instanceExports.register(instance);其unregister方法执行instanceExports.unregister(instance)

doc

  • PrometheusService
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CanalMetricsService
  • PrometheusService
  • DefaultExports
  • CanalInstanceExports
  • PrometheusClientInstanceProfiler
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档