首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Leader丢失导致的Consumer挂起故障解决

现象

最近发现线上的Kafka Consumer Client频繁出现无法消费的情况,导致offset积压。但是在重启Kafka Broker之后又正常了。 而Cloudera Manager在重启之前,我们发现三台broker中并没有KakfaController。让人很是不解。

排查步骤

检查Topic的状态

目前已经没办法复现当时的场景,我们简单描述下,通过命令

kafka-topics --zookeeper hadoop02 --desc

检查Topic的状态如下:

这张图是后续补的,当时的现象是Leader的值均为-1,Isr的值也均为-1

至此我们猜测是因为KakfaController丢失导致的partition leader为-1,进而导致的Consumer端无法正常消费。

查看Leader的选举方式

在这里我们需要先看下KafkaController,KafkaServer.startup()的时候会新建KafkaController,而KafkaController在启动时,启动了Controller的Elector

defstartup() = {

inLock(controllerContext.controllerLock) {

info("Controller starting up");

registerSessionExpirationListener()

isRunning=true

controllerElector.startup

info("Controller startup complete")

}

}

我们再点进去查看下Controller是怎么elect出来的:

defelect:Boolean= {

valtimestamp = SystemTime.milliseconds.toString

valelectString = Json.encode(Map("version"->1,"brokerid"-> brokerId,"timestamp"-> timestamp))

leaderId= getControllerID

/*

* We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,

* it's possible that the controller has already been elected when we get here. This check will prevent the following

* createEphemeralPath method from getting into an infinite loop if this broker is already the controller.

*/

if(leaderId!= -1) {

debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))

returnamILeader

}

try{

createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,electionPath,electString,brokerId,

(controllerString :String,leaderId :Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],

controllerContext.zkSessionTimeout)

info(brokerId +" successfully elected as leader")

leaderId= brokerId

onBecomingLeader()

}catch{

casee: ZkNodeExistsException =>

// If someone else has written the path, then

leaderId= getControllerID

if(leaderId!= -1)

debug("Broker %d was elected as leader instead of broker %d".format(leaderId,brokerId))

else

warn("A leader has been elected but just resigned, this will result in another round of election")

casee2:Throwable=>

error("Error while electing or becoming leader on broker %d".format(brokerId),e2)

resign()

}

amILeader

}

从上面的选举代码中我们可以看出

inLock(controllerContext.controllerLock) {

debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"

.format(brokerId,dataPath))

if(amILeader)

onResigningAsLeader()

elect

}

我们知道kafka消费的时候需要和Leader通信,而Leader不存在导致的没办法消费很容易理解,那么为什么Controller丢失会导致partition的leader不正常呢?我们来看下面一张图片(图片来自CSDN博主:happy19870612):

valreplicaStateMachine =newReplicaStateMachine(this)

我们可以发现Replica的状态机管理是在KafkaController中完成的,也就是说Controller丢失的情况下,也就失去了与Zookeeper交互的能力。默认情况下Leader必须从ISR列表中选择,我们发现列表为空(经过排查发现是Kafka的bug,在Controller和Zookeeper通信过程中出现问题时,可能导致leader丢失而无法通信的情况,这个可能性是很大的,因为zookeeper在高并发环境是容易超时,这就是为什么在kafka 0.8.2.1之后更建议我们使用kafka topic的方式存储offset而不是存储在zookeeper中。

检查系统日志

当然一般情况下我们会先检查系统日志是否有报异常,这种定位问题效率最高。我们来看看kafka的server log是不是有和zookeeper相关的异常

从图上可以发现,的确存在zookeeper连接失败的情况,另外我们发现一个比较诡异的事情:

2018-04-10 00:30:11,149INFO kafka.controller.KafkaController: [Controller218]: Currently active brokers in the cluster: Set()

所有的broker都临时下线了,然后我查看了其他broker的server log发现所有机器在同一时间均出现了zookeeper连接超时的情况,导致了后续一连串的ERROR:

解决方式

那么问题已经很明了了,我们又检查了下凌晨的网络IO:

PS:当zookeeper服务器端和客户端版本不一致的时候也会导致连接超时的情况。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180829G184XU00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券