前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala metrics之statestore-subscriber

Impala metrics之statestore-subscriber

作者头像
skyyws
发布2022-05-20 08:50:24
4290
发布2022-05-20 08:50:24
举报

文章目录

本文主要梳理一下Impala的“statestore-subscriber”相关的metrics,这类metrics主要是在catalog和impalad上存在。目前主要分为两种类型,下面来简单看一下。

固定的metrics

第一类就是几个固定的metrics,如下所示:

代码语言:javascript
复制
statestore-subscriber.connected
statestore-subscriber.heartbeat-interval-time
statestore-subscriber.last-recovery-duration
statestore-subscriber.last-recovery-time
statestore-subscriber.num-connection-failures

这些metrics都是在statestore的subscriber启动之后直接注册的。我们可以直接从web页面看到这些metrics的描述,也比较简单,不是本文的重点,这里不再展开:

1
1

我们这里主要来看一下第二类metrics。

Topic相关的metrics

第二类metrics主要是topic相关的,在介绍这类metrics之前,我们先简单看下目前Impala的几种metrics。

Statestore topic介绍

目前Impala一共有三类topic,subscriber可以向statestore注册,如下所示:

2
2

分别是:

  • impala-membership,成员信息同步,包括各个backends的相关信息;
  • impala-request-queue,资源队列相关的信息;
  • catalog-update,元数据相关的信息;

关于每个topic的详细信息,这里不再展开,后续有机会再详细介绍一下。当各个节点启动之后,就会注册相应的topic:

代码语言:javascript
复制
//catalog-server.cc
Status status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
    /* is_transient=*/ false, /* populate_min_subscriber_topic_version=*/ false,
    filter_prefix, cb);
//impala-server.cc
ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
   CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
   /* populate_min_subscriber_topic_version=*/ true,
   filter_prefix, catalog_cb));
    
//admission-controller.cc
Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC,
   /* is_transient=*/true, /* populate_min_subscriber_topic_version=*/false,
   /* filter_prefix=*/"", cb);

//cluster-membership-mgr.cc
Status status = statestore_subscriber_->AddTopic(
    Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
    /* populate_min_subscriber_topic_version=*/ false,
    /* filter_prefix= */"", cb);

可以看到,这里impalad(coordinator角色才会注册,executro则不会)和catalogd都注册了catalog-update,另外两个topic,只有impalad会注册。

Topic update的metrics

接下来我们就看下topic update相关的metrics。我们可以在metrics.json文件中找到这两类模板,如下所示:

代码语言:javascript
复制
statestore-subscriber.topic-$0.processing-time-s
statestore-subscriber.topic-$0.update-interval

从描述来看,就是topic update的处理时间和时间间隔。下面我们就结合代码来实际来看一下:

代码语言:javascript
复制
//statestore-subscriber.cc
// Template for metrics that measure the processing time for individual topics.
const string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processing-time-s";

// Template for metrics that measure the interval between updates for individual topics.
const string UPDATE_INTERVAL_METRIC_PATTERN = "statestore-subscriber.topic-$0.update-interval";

topic的注册流程代码如下所示:

代码语言:javascript
复制
//statestore-subscriber.cc
Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
    bool is_transient, bool populate_min_subscriber_topic_version,
    string filter_prefix, const UpdateCallback& callback) {
  //省略部分代码
  if (registration.processing_time_metric == nullptr) {
    registration.processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
        CALLBACK_METRIC_PATTERN, topic_id);
    registration.update_interval_metric = StatsMetric<double>::CreateAndRegister(metrics_,
        UPDATE_INTERVAL_METRIC_PATTERN, topic_id);
    registration.update_interval_timer.Start();
  }
  //省略部分代码
}

目前一共有三个topic,我们在上面已经介绍过了。对于coordinator节点来说,启动之后,针对这两类topic metrics模板,就会有6个具体的metrics,如下所示:

3
3
Metrics更新

主要的更新处理逻辑位于函数StatestoreSubscriber::UpdateState()中。该函数的主要功能就是当进程接受到statestore发来的topic update信息时,就调用该函数,进行实际的topic更新操作。由于该函数比较长,我们分成三个部分来看一下,如下所示:

代码语言:javascript
复制
//typedef std::map<Statestore::TopicId, TTopicDelta> TopicDeltaMap
//TopicDeltaMap& incoming_topic_deltas
  vector<const TTopicDelta*> deltas_to_process;
  for (auto& delta : incoming_topic_deltas) deltas_to_process.push_back(&delta.second);
  sort(deltas_to_process.begin(), deltas_to_process.end(),
    [](const TTopicDelta* left, const TTopicDelta* right) {
      return left->topic_name < right->topic_name;
    });
  vector<unique_lock<mutex>> topic_update_locks(deltas_to_process.size());
  //省略部分代码
  for (int i = 0; i < deltas_to_process.size(); ++i) {
    const TTopicDelta& delta = *deltas_to_process[i];
    auto it = topic_registrations_.find(delta.topic_name);
    TopicRegistration& registration = it->second;
    unique_lock<mutex> ul(registration.update_lock, std::try_to_lock);
    //省略部分代码
    double interval =
        registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
    registration.update_interval_metric->Update(interval);
    //省略部分代码
    topic_update_locks[i].swap(ul);
  }

一个TTopicDelta对象表示的就是一个topic的相关update信息。第一部分就是将接收到的topic update数据放到一个vector当中,然后循环更新topic的update_interval_metric,这个成员也就是上面提到的update-interval。所以,这个metric的含义就是表示,从上次处理完成topic update,到本次接收到topic update,这中间的时间间隔。update_interval_timer变量就是用来追踪topic update的,该变量在上面注册topic的时候就会启动,每次处理完之后会重置,这个我们会在下面提到。更新完成后,将当前这个topic对应锁加入到topic_update_locks中,后续处理topic update的时候,还会需要对该lock进行检查是否已经持有,也就是我们第二部分要讲的内容:

代码语言:javascript
复制
  MonotonicStopWatch sw;
  sw.Start();
  for (int i = 0; i < deltas_to_process.size(); ++i) {
    if (!topic_update_locks[i].owns_lock()) continue;
    const TTopicDelta& delta = *deltas_to_process[i];
    auto it = topic_registrations_.find(delta.topic_name);
    TopicRegistration& registration = it->second;
    //省略部分代码
    MonotonicStopWatch update_callback_sw;
    update_callback_sw.Start();
    for (const UpdateCallback& callback : registration.callbacks) {
      callback(incoming_topic_deltas, subscriber_topic_updates);
    }
    update_callback_sw.Stop();
    registration.current_topic_version = delta.to_version;
    registration.processing_time_metric->Update(
        sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
  }

第二部分就是实际处理对应的topic update,主要就是调用topic对应的callback函数。例如对于coordinator的catalog-update函数,就要调用ImpalaServer::CatalogUpdateCallback()函数。这个callback也是在注册topic的时候指定的。处理完该topic对应的update时,就会更新processing_time_metric变量,也就是上面对应的update-interval。所以说,该metric记录的就是每个topic在进行update时,实际消耗的时间。接下来看下第三部分:

代码语言:javascript
复制
  for (int i = 0; i < deltas_to_process.size(); ++i) {
    if (!topic_update_locks[i].owns_lock()) continue;

    const TTopicDelta& delta = *deltas_to_process[i];
    auto it = topic_registrations_.find(delta.topic_name);
    TopicRegistration& registration = it->second;
    registration.update_interval_timer.Reset();
  }
  sw.Stop();
  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));

第三部分的逻辑比较简单,就是通过循环处理将每个topic对应的update_interval_timer变量进行重置,主要就是为了下次统计topic update的时间间隔。 也就是说,经过一次UpdateState()函数调用之后,本次从statestore发过来的topic update都会进行处理,并且每个topic对应的metrics都会进行更新。

Topic update总计metrics

最后还有两个总计的metrics,如下所示:

代码语言:javascript
复制
statestore-subscriber.topic-update-duration
statestore-subscriber.topic-update-interval-time

这两个metrics是针对所有topic的处理时间和时间间隔的更新。注册代码如下所示:

代码语言:javascript
复制
//statestore-subscriber.cc
  topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
      "statestore-subscriber.topic-update-interval-time");
  topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
      "statestore-subscriber.topic-update-duration");

注册完成之后,也会在UpdateState()函数中进行更新。对于“statestore-subscriber.topic-update-interval-time”,这个metric的更新时机如下所示:

代码语言:javascript
复制
for (int i = 0; i < deltas_to_process.size(); ++i) {
  //省略部分代码
  double interval =
      registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
  registration.update_interval_metric->Update(interval);
  topic_update_interval_metric_->Update(interval);
  //省略部分代码
}

可以看到,在对每一个topic update进行时间间隔统计的时候,也会对这个metric进行更新。也就是说,这个metric包含了三个topic的更新时间间隔信息。 关于“statestore-subscriber.topic-update-duration”可以在上一节的第三部分代码最末端看到。也就是说,每次处理完本轮所有topic update的时候,才会将该metric进行更新。

小结

到这里,关于“statestore-subscriber”相关的metrics就介绍的差不多了。这一系列的metrics相对比较简单,都是在StatestoreSubscriber::UpdateState()这个函数中进行更新的,反映的是当前subscriber在处理topic update时的一些负载情况,我们可以根据这些metrics,来排查一下topic同步慢的问题,比如元数据过期等。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-12-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 固定的metrics
  • Topic相关的metrics
    • Statestore topic介绍
      • Topic update的metrics
        • Metrics更新
        • Topic update总计metrics
    • 小结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档