前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka(0.11.0.2版本)堆内存不能正常回收问题分析【实战笔记】

Kafka(0.11.0.2版本)堆内存不能正常回收问题分析【实战笔记】

作者头像
瓜农老梁
发布2019-08-30 21:19:39
1.2K0
发布2019-08-30 21:19:39
举报
文章被收录于专栏:瓜农老梁瓜农老梁
一、问题描述

短信报警堆内存GC后依然超过4G内存,跟上篇文章所说情况相同。只是上次情况告警短信没发出来。这次介入前,dump了该节点的堆照,方便定位引起的问题。 告警GC日志,回收后依然在4G内存,回收前后只减少了几百M。

代码语言:javascript
复制
2019-05-26T20:41:52.086+0800: 12768164.084: [GC pause (G1 Evacuation Pause) (young), 0.0296753 secs]
   [Parallel Time: 27.0 ms, GC Workers: 28]
      [GC Worker Start (ms): Min: 12768164084.0, Avg: 12768164084.2, Max: 12768164084.5, Diff: 0.5]
      [Ext Root Scanning (ms): Min: 18.1, Avg: 19.0, Max: 19.9, Diff: 1.8, Sum: 532.0]
      [Update RS (ms): Min: 1.2, Avg: 1.7, Max: 2.3, Diff: 1.1, Sum: 47.6]
         [Processed Buffers: Min: 1, Avg: 2.4, Max: 8, Diff: 7, Sum: 68]
      [Scan RS (ms): Min: 1.2, Avg: 1.8, Max: 2.1, Diff: 0.9, Sum: 49.6]
      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
      [Object Copy (ms): Min: 3.1, Avg: 3.9, Max: 4.6, Diff: 1.4, Sum: 110.6]
      [Termination (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.4]
         [Termination Attempts: Min: 1, Avg: 1.0, Max: 1, Diff: 0, Sum: 28]
      [GC Worker Other (ms): Min: 0.0, Avg: 0.1, Max: 0.1, Diff: 0.1, Sum: 1.6]
      [GC Worker Total (ms): Min: 26.2, Avg: 26.5, Max: 26.7, Diff: 0.5, Sum: 742.2]
      [GC Worker End (ms): Min: 12768164110.7, Avg: 12768164110.7, Max: 12768164110.8, Diff: 0.1]
   [Code Root Fixup: 0.1 ms]
   [Code Root Purge: 0.0 ms]
   [Clear CT: 0.7 ms]
   [Other: 1.9 ms]
      [Choose CSet: 0.0 ms]
      [Ref Proc: 0.6 ms]
      [Ref Enq: 0.0 ms]
      [Redirty Cards: 0.3 ms]
      [Humongous Register: 0.1 ms]
      [Humongous Reclaim: 0.0 ms]
      [Free CSet: 0.4 ms]
   [Eden: 400.0M(400.0M)->0.0B(400.0M) Survivors: 8192.0K->8192.0K Heap: 4426.3M(8192.0M)->4024.2M(8192.0M)]
 [Times: user=0.75 sys=0.00, real=0.03 secs]
二、堆内存分析

有个对象的内存占用高达2.5G

通过图示可以看到该类: org.apache.kafka.common.metrics.JmxReporter有个Map,持有高达近330万个的子Map对象。这些子Map中的结构都类似,只是clientId数值不同。 问题:为何消费者注册到该Reporter不删除呢?

三、代码追踪

JmxReport类分析

下面贴出JmxReporter完整的类,成员变量 private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();即该mbeans持有330万个子对象。 KafkaMbean中的两个成员变量: private final ObjectName objectName; private final Map<String, KafkaMetric> metrics; metrics中的key即为:堆分析中的kafka.server:type=Request,client-id=admin-3685211

代码语言:javascript
复制
package org.apache.kafka.common.metrics;

import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.InvalidAttributeValueException;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Register metrics in JMX as dynamic mbeans based on the metric names
 */
public class JmxReporter implements MetricsReporter {

    private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
    private static final Object LOCK = new Object();
    private String prefix;
    private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();

    public JmxReporter() {
        this("");
    }

    /**
     * Create a JMX reporter that prefixes all metrics with the given string.
     */
    public JmxReporter(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void init(List<KafkaMetric> metrics) {
        synchronized (LOCK) {
            for (KafkaMetric metric : metrics)
                addAttribute(metric);
            for (KafkaMbean mbean : mbeans.values())
                reregister(mbean);
        }
    }

    boolean containsMbean(String mbeanName) {
        return mbeans.containsKey(mbeanName);
    }
    @Override
    public void metricChange(KafkaMetric metric) {
        synchronized (LOCK) {
            KafkaMbean mbean = addAttribute(metric);
            reregister(mbean);
        }
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        synchronized (LOCK) {
            MetricName metricName = metric.metricName();
            String mBeanName = getMBeanName(prefix, metricName);
            KafkaMbean mbean = removeAttribute(metric, mBeanName);
            if (mbean != null) {
                if (mbean.metrics.isEmpty()) {
                    unregister(mbean);
                    mbeans.remove(mBeanName);
                } else
                    reregister(mbean);
            }
        }
    }

    private KafkaMbean removeAttribute(KafkaMetric metric, String mBeanName) {
        MetricName metricName = metric.metricName();
        KafkaMbean mbean = this.mbeans.get(mBeanName);
        if (mbean != null)
            mbean.removeAttribute(metricName.name());
        return mbean;
    }

    private KafkaMbean addAttribute(KafkaMetric metric) {
        try {
            MetricName metricName = metric.metricName();
            String mBeanName = getMBeanName(prefix, metricName);
            if (!this.mbeans.containsKey(mBeanName))
                mbeans.put(mBeanName, new KafkaMbean(mBeanName));
            KafkaMbean mbean = this.mbeans.get(mBeanName);
            mbean.setAttribute(metricName.name(), metric);
            return mbean;
        } catch (JMException e) {
            throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
        }
    }

    /**
     * @param metricName
     * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
     */
    static String getMBeanName(String prefix, MetricName metricName) {
        StringBuilder mBeanName = new StringBuilder();
        mBeanName.append(prefix);
        mBeanName.append(":type=");
        mBeanName.append(metricName.group());
        for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
            if (entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
                continue;
            mBeanName.append(",");
            mBeanName.append(entry.getKey());
            mBeanName.append("=");
            mBeanName.append(entry.getValue());
        }
        return mBeanName.toString();
    }

    public void close() {
        synchronized (LOCK) {
            for (KafkaMbean mbean : this.mbeans.values())
                unregister(mbean);
        }
    }

    private void unregister(KafkaMbean mbean) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            if (server.isRegistered(mbean.name()))
                server.unregisterMBean(mbean.name());
        } catch (JMException e) {
            throw new KafkaException("Error unregistering mbean", e);
        }
    }

    private void reregister(KafkaMbean mbean) {
        unregister(mbean);
        try {
            ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
        } catch (JMException e) {
            throw new KafkaException("Error registering mbean " + mbean.name(), e);
        }
    }

    private static class KafkaMbean implements DynamicMBean {
        private final ObjectName objectName;
        private final Map<String, KafkaMetric> metrics;

        public KafkaMbean(String mbeanName) throws MalformedObjectNameException {
            this.metrics = new HashMap<String, KafkaMetric>();
            this.objectName = new ObjectName(mbeanName);
        }

        public ObjectName name() {
            return objectName;
        }

        public void setAttribute(String name, KafkaMetric metric) {
            this.metrics.put(name, metric);
        }

        @Override
        public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
            if (this.metrics.containsKey(name))
                return this.metrics.get(name).value();
            else
                throw new AttributeNotFoundException("Could not find attribute " + name);
        }

        @Override
        public AttributeList getAttributes(String[] names) {
            try {
                AttributeList list = new AttributeList();
                for (String name : names)
                    list.add(new Attribute(name, getAttribute(name)));
                return list;
            } catch (Exception e) {
                log.error("Error getting JMX attribute: ", e);
                return new AttributeList();
            }
        }

        public KafkaMetric removeAttribute(String name) {
            return this.metrics.remove(name);
        }

        @Override
        public MBeanInfo getMBeanInfo() {
            MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
            int i = 0;
            for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
                String attribute = entry.getKey();
                KafkaMetric metric = entry.getValue();
                attrs[i] = new MBeanAttributeInfo(attribute,
                                                  double.class.getName(),
                                                  metric.metricName().description(),
                                                  true,
                                                  false,
                                                  false);
                i += 1;
            }
            return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
        }

        @Override
        public Object invoke(String name, Object[] params, String[] sig) throws MBeanException, ReflectionException {
            throw new UnsupportedOperationException("Set not allowed.");
        }

        @Override
        public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
                                                     InvalidAttributeValueException,
                                                     MBeanException,
                                                     ReflectionException {
            throw new UnsupportedOperationException("Set not allowed.");
        }

        @Override
        public AttributeList setAttributes(AttributeList list) {
            throw new UnsupportedOperationException("Set not allowed.");
        }

    }

}

Jconsole查看

kafka.server:type=Request

四、问题解决

刚开始觉得是我们使用的问题,是否资源没有关闭,查看源代码也未能看出哪里出了问题。 后来确定为kafka 0.11.0.2版本的Bug,在0.11.0.3版本已经修复。

ISSUE https://issues.apache.org/jira/browse/KAFKA-6199 https://issues.apache.org/jira/browse/KAFKA-6307

两个版本的源码对比 JmxReporter

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、问题描述
  • 二、堆内存分析
  • 三、代码追踪
  • 四、问题解决
相关产品与服务
短信
腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档