继之前《Kafka运维篇之初识Streams Messaging Manager》、《Kafka运维篇之使用SMM监控Kafka集群》和《Kafka运维篇之使用SMM预警策略管理Kafka预警》之后。我们今天介绍使用SMM来监控Kafka端到端的延迟。
Streams MessagingManager(SMM)是一种操作监视和管理工具,可在企业ApacheKafka®环境中提供端到端的可见性。SMM还提供了Kafka的端到端延迟监控。
端到端延迟概述
延迟是消费者消耗Topic中产生的消息所花费的时间。 您可以使用SMM UI监视Topic中的端到端延迟。使用延迟功能可以实现以下目标:
• 验证是否满足端到端处理时间SLA。
• 确定缓慢或落后的消费者。
• 验证消息是否消费过多或不足。
您可以在SMM UI的以下两个图中找到有关在Topic中生成的消息数,从Topic消耗的消息数以及使用消息期间的延迟详细信息的详细信息:
• 已消耗消息。该图为您提供了所选时间范围内某个Topic的所有消费者组的总体已产生消息数和已消耗消息数。产生和消耗的消息计数中的任何差异都以红色突出显示。
在前面的图像中,线性形式表示最近一小时产生的消息数,填充区域表示最近一小时消耗的消息数(粒度为30秒)。蓝色区域表示已消耗所有产生的消息。红色区域表示产生和消耗的消息计数之间的差异,并且可能意味着消息消耗过多或消耗不足。
在图像中,有两个红色区域。左侧的第一个红色区域表示已使用消息的数量大于已生成消息的数量。这表示消息的过度消耗,当消费者组偏移量重置为较旧的偏移量以重新处理消息时,或者当生产者或消费者以不干净的方式关闭时,可能会发生消息的过度消耗。
最后一个红色区域表示已使用消息的数量少于已产生消息的数量。这表示消息消耗不足,当消费者组偏移量设置为较新的偏移量时,会导致消息不足,从而导致消费者组跳过某些消息的处理。
图的最右边部分显示了当前的处理窗口,在此窗口中,消费者仍在使用生成的消息。因此,该区域应标记为红色,并表示消息不足。
图像中的所有其他区域均为蓝色,表示所有产生的消息都已耗尽。
• 端到端延迟。“端到端延迟”图为您提供了在特定时间范围内在特定时间范围内以毫秒为单位的特定消息中产生的延迟范围和使用消息的平均延迟的详细信息。
在上图中,垂直线表示等待时间范围,虚线表示在最近一小时内以30秒的粒度使用生成的消息时的平均等待时间。您可以看到,在6月26日星期三的12:29:00,延迟范围在4到218毫秒之间,平均延迟为69毫秒。
注意
您还可以根据您在预警策略中配置的条件创建预警以接收通知,以监视系统中的延迟。
指标的粒度
SMM使用REST API来显示指标。所有度量均可以两种不同的粒度查询:30秒和15分钟。指标针对已定义的存储桶进行了预汇总。根据查询数据的时间,Topic的粒度和分区,分区,消费者组ID和客户端ID的不同维度,计算数据并将其呈现为JSON。在开始使用SMM监视延迟之前,请仔细阅读以下详细信息:
• 当您选择的时间比当前时间晚24小时时,将从REST服务器以30秒的度量粒度检索数据。
• 如果您选择的时间比当前时间早24小时,则会从REST服务器以15分钟的度量粒度检索数据。
• SMM UI会定期轮询API以进行更新(如果所选时间比当前时间晚24小时,则每30秒轮询一次,否则每15分钟一次)。
• 默认情况下,30秒粒度度量标准存储24小时,而15分钟粒度度量标准存储2周。
启用拦截器
拦截器会定期将度量标准发布到Kafka。指标包括生产者方的计数,以及消费者方的计数,平均延迟,最小和最大延迟。
您需要为消费者,生产者和KafkaStreams应用程序启用拦截器,以使SMM能够获取指标。如果未启用拦截器,则无法在SMM中看到任何度量标准。
在应用程序中启用拦截器的步骤
将以下jar添加到应用程序的类路径或作为应用程序中的依赖项:
<dependency>
<groupId>com.hortonworks.smm</groupId>
<artifactId>monitoring-interceptors</artifactId>
</dependency>
启用消费者拦截器的步骤
执行以下步骤以启用消费者拦截器:
1) 将interceptor.classes属性添加到传递给KafkaConsumer构造函数的消费者配置。
2) client.id如下配置属性:
KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers, String groupId, String clientIdentifier) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientIdentifier);
//Add ConsumerInterceptor like this
properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");
return new KafkaConsumer<Integer, String>(properties);
}
注意
我们建议您配置client.id属性。它有助于识别消费者实例。如果未配置,则延迟度量将获取默认消费者ID。
启用生产者拦截器的步骤
将该interceptor.classes属性添加到生产者配置中,该配置信息将传递给KafkaProducer构造函数,如下所示:
KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//Add ProducerInterceptor like this
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");
return new KafkaProducer<Integer, String>(properties);
}
在KafkaStreams应用程序中启用拦截器的步骤
将producer.interceptor.classes和 consumer.interceptor.classes属性添加到Kafka Streams配置中,如下所示:
void startKafkaStreams(StreamsBuilder builder) {
KafkaStreams kstreams = new KafkaStreams(builder.build(), getKafkaStreamsConfiguration());
kstreams.start();
}
Properties getKafkaStreamsConfiguration() {
Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
config.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
//Add producer interceptor like this
config.put(
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringProducerInterceptor");
//Add consumer interceptor like this
config.put(
StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.hortonworks.smm.kafka.monitoring.interceptors.MonitoringConsumerInterceptor");
return config;
}
监控端到端延迟
执行以下步骤,以监视SMM UI中的端到端延迟:
1) 转到SMM UI中的Topic。
2) 选择要验证其详细信息的Topic。
3) 单击所选Topic旁边的配置文件图标
。
这将带您到“ 指标”页面,您可以在其中找到“ 消耗的消息”和“端到端延迟”图以及其他Topic详细信息。在“ 指标”页面上,这两个图为您提供了所有消费者组之间的延迟和已消耗消息计数的汇总结果。
4) 要按单个消费者组,客户端和分区验证详细信息,请转到“ 延迟”选项卡。
出现“ 延迟”页面,如下图所示:
延迟视图为您提供了端到端延迟情况的强大快照:Topic的消费者组数量,特定消费者组内的客户端数量以及Topic中的分区数量以及Messages Consumed和End-to-end Latency图。
5) 从“ 消费者组” 下拉列表中选择任何消费者组,如下图所示:
在该图像中,选择了group10消费者组。该 Latency选项卡显示group10消费组中有3个客户端,并且该Topic中有10个分区。
6) 从“ 客户端”下拉列表中选择任何客户端,如下图所示:
在该图像中,选择了host-1客户端。在这种情况下,“消耗的消息”和“端到端延迟”图仅显示主机1客户端的数据。在这里,您可以监视仅主机1产生的消息数,消耗的消息数,延迟范围和平均延迟。将鼠标悬停在图形上并在选定的时间范围内的任何时间点获取数据。您可以在“已消耗的消息”图中看到host-1消耗了所有生成的消息,并在最近的时间活动消耗了数据。您可以在“端到端延迟”图表中看到,延迟范围和平均延迟小于250毫秒。
7) 要获取有关host-1正在使用数据的分区的详细信息,请单击 Partitions。
出现Topic中的分区列表,如下图所示:
在该图中,您可以看到host-1正在使用3个分区中的数据:P1,P2和P3。其他分区对于主机1无效。
8) 从列表中选择任何活动分区。
的等待时间标签显示主机1和所选择的分区(例如,P1)之间的交易详情,如下面的图像中:
现在,您已获得主机1客户端的详细信息。同样,您可以获取其他客户端的详细信息。
9) 请按照步骤6到8来获取所有其他客户端的数据。
10) 请按照步骤5到8来获取所有其他消费者组的数据。
要一次清除所有选择,请单击 页面右上角的“ 清除”按钮。
• 要清除消费者组,客户端或分区的选择,请单击每个下拉菜单上的删除图标。
• 要选择其他时间范围,请使用页面右上角的“ 时间范围”和“ 快速范围”选项,如下图所示:
端到端延迟用例
让我们逐步了解SMM如何为Platform Operations用户回答一些问题:
用例1:验证是否满足端到端处理时间SLA。
服务级别协议(SLA)是服务提供商与服务用户之间的一项承诺。服务的特定方面在服务提供商和服务用户之间达成一致。SLA的最常见组成部分是,应按合同约定向用户提供服务。例如,您同意Cloudera使用消息的平均延迟值和最大延迟值。因此,在生产者产生消息之后,如果消息花费了约定的时间以供消费者使用,则将满足SLA。
1) 转到SMM UI中的Topic。
2) 选择要验证其详细信息的Topic。
3) 单击所选Topic旁边的配置文件图标。
4) 检查等待时间图,看看平均和最大等待时间是否符合预期。
5) 如果延迟不符合预期,请转到“ 延迟”选项卡。
6) 检查客户端数量是否符合预期。如果不是,那么您可能要检查丢失的客户端实例。
7) 如果客户端数量符合预期,请检查消息计数中是否存在峰值。在“时间范围”窗格中选择一个1周的时间,然后查看传入消息是否激增,可以解释时间违反SLA。
8) 如果即使所有检查都变为肯定之后,仍然违反时间SLA,请转到用例2。
用例2:确定缓慢或落后的消费者。
基于流的应用程序流程包括对消息进行应用程序轮询,获取和处理消息,执行可选的阻止操作(例如与数据库或本地文件系统进行交互),然后再次对消息进行应用程序轮询。但是,在某些情况下,由于消息处理,系统瓶颈或外部瓶颈引起的延迟可能会变得很长,您可能想了解哪个流程实例面临问题。
1) 转到SMM UI中的Topic。
2) 选择要验证其详细信息的Topic。
3) 单击所选Topic旁边的配置文件图标。
4) 转到“ 延迟”选项卡。
5) 选择一个组后,检查每个客户端的等待时间和消息计数。
这可能会导致您的消费缓慢。
让我们来看一个例子。
在上图中,选择group10消费者组以检查每个客户端的延迟和消息计数。
在上图中,您看到组10的活动客户端列表:host1,host2,host3和busy-host。现在,您需要选择每个客户端并检查延迟和消息计数。
在上图中,您可以看到host1消耗了所有产生的消息,并且平均延迟时间和延迟范围都处于良好范围内。
在上图中,您可以看到host2消耗了所有生成的消息。同样,延迟范围内几乎没有峰值,但是平均延迟范围内比较大。
在上图中,您可以看到host3消耗了所有产生的消息。此外,延迟期范围偶尔会出现峰值,但平均延迟期在良好范围内。
在上图中,您可以看到busy-host占用了所有已生成的消息。延迟范围经常出现尖峰,并且延迟范围和平均延迟都很高。因此,繁忙主机是耗时缓慢的客户端。
所有客户端的等待时间可能更长。这可能意味着存在共同的瓶颈。例如,客户端正在通过网络与外部存储进行交互,并且由于网络问题而在消费消息方面存在延迟。
如果只有一个客户端运行缓慢,则必须检查其他客户端的消息计数以及系统参数(如CPU和内存)。
这满足了您识别耗时缓慢的应用程序的需求。
用例3:验证消息是否消耗过多或不足。
消息可能会过度消耗。可能由于以下原因而发生:
• 如果生产者和消费者以不清洁的方式关闭或生产者和消费者以意外的方式关闭了。例如,Kafka生产者产生了一些消息,但是在生产者收到Broker的任何确认之前就关闭了。同样,Kafka消费者消耗了一些消息,但是在此最后一点提交补偿之前被关闭了。
• 如果消费者被重置为较早的偏移量(后处理方案)。
如果使用方重置为新的偏移量(实时应用程序要求),则消息可能会消耗不足。如果集群处于不正常状态,则消息使用量可能过多或不足。
1) 转到SMM UI中的Topic。
2) 选择要验证其详细信息的Topic。
3) 单击所选Topic旁边的配置文件图标。
4) 转到“ 延迟”选项卡。
5) 选择一个组后,在“ 已消耗消息”图中检查每个客户端的已产生消息和已使用消息计数。
这可以帮助您验证消费者是否正在使用Topic中产生的所有消息。您也可能会发现消息中任何过度消费或消费不足的情况。
在该图像中,您可以看到对于group10消费者组,图中有三个红色尖峰Messages Consumed 。
左侧的第一个尖峰表示已使用消息的数量大于已生成消息的数量。因此,这是消息的过度消耗。
第二次和第三个峰值表明消息消耗过多,然后消息消耗不足。
来源:https://docs.cloudera.com/csp/2.0.1/monitoring-end-to-end-latency/topics/smm-end-to-end-latency-overview.html