lz_rec_push_kafka_consume 该项目通过kafka与算法进行交互,通过push推荐平台(lz_rec_push_platform)预生成消息体。
发现项目的k8s容器会出现重启现象,重启时间刚好是push扩量,每小时push数据量扩大5倍左右。 发生问题时,容器配置:CPU:4个,内存:堆内3G,堆外1G。
注:容器重启机制:k8s监控发现“实例”内存使用超过申请时,会对容器进行重启。该动作是直接使用kill -9的,而非通过jvm指令对虚拟机进行重启,所以此处别想dump堆。
一开始怀疑是内存,但是内存不足的话,应该是出现oom的情况。所以先排除堆内内存不足的问题。将实例内存扩大至:6G,堆内5G,堆外1G。发现重启现象没有丝毫改善。
byte[] input = log.getBytes();
try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(input.length)) {
final Deflater compressor = new Deflater();
compressor.setInput(input);
compressor.finish();
byte[] buffer = new byte[1024];
int offset = 0;
for (int length = compressor.deflate(buffer, offset, buffer.length); length > 0; length = compressor.deflate(buffer, offset, buffer.length)) {
outputStream.write(buffer, 0, length);
outputStream.flush();
}
//compressor.end();
return Base64Utils.encodeToString(outputStream.toByteArray());
}
}
public static String zipDecompress(final String str) throws Exception {
byte[] input = Base64Utils.decodeFromString(str);
try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(input.length)) {
final Inflater decompressor = new Inflater();
decompressor.setInput(input);
byte[] buffer = new byte[1024];
for (int length = decompressor.inflate(buffer); length > 0 || !decompressor.finished(); length = decompressor.inflate(buffer)) {
byteArrayOutputStream.write(buffer, 0, length);
}
//decompressor.end();
return new String(byteArrayOutputStream.toByteArray());
}
}
/**
* Closes the compressor and discards any unprocessed input.
* This method should be called when the compressor is no longer
* being used, but will also be called automatically by the
* finalize() method. Once this method is called, the behavior
* of the Deflater object is undefined.
*/
/**
* Closes the compressor and discards any unprocessed input.
* This method should be called when the compressor is no longer
* being used, but will also be called automatically by the
* finalize() method. Once this method is called, the behavior
* of the Deflater object is undefined.
*/
public void end() {
synchronized (zsRef) {
long addr = zsRef.address();
zsRef.clear();
if (addr != 0) {
end(addr);
buf = null;
}
}
}
/**
* Closes the compressor when garbage is collected.
*/
protected void finalize() {
end();
}
思考:项目发生重启是在kafka数据扩量后才出现的,那为何扩量前没有这个问题的出现呢?其实问题一直是存在的,只是数据量小的情况下,引用都在垃圾回收后能正常释放堆外内存。但是扩量后,瞬间的流量增高,产生大量的堆外内存使用引用。在下一次垃圾回收之前ReferenceQueue队列已经堆积了大量的引用,将容器内的堆外内存撑爆。
去除压缩与解压缩动作后,发版观察。项目的k8s实例资源监控处在合理范围。
至此,堆外内存问题已经解决了。
问题:使用资源时,保持着资源使用后及时释放的习惯。该问题便是由压缩使用有误引起的,应该也算是低级错误了。
由于第一次排查堆外内存泄漏的问题,没有丰富的经验去锁定问题点达到快速排查,走了不着弯路。该文章略显啰嗦,但是主要目的还是想记录下排查问题的过程。第一次发博客,写作思路上有点紊乱,请多多包涵。如果有什么措辞不当的,还望指出。有什么好的建议也希望能指点一二。