导 读
承接上文,本文主要介绍业务代码升级相关内容。业务代码升级,主要是从旧的基于Zookeeper的kakfa消费方式,切到新的消费者API上。
代码调整
1
调整1
使用新版storm-kafka-clien包代替老版本使用的storm-kafka(此处需要特别注意,网上很多demo都是基于老版本的storm写的,所以引用的包还是storm-kafka)。
即使用
代替
2
调整2
storm-kafka包升级以后,消费kafka的配置方式也发生了变化。
1.0.2版本的配置方式
当前的版本storm会将offset写入到zookeeper中,根据offset从broker中获取数据。每个Spout中包含一个consumer消费者,通过SpoutConfig完成相应的配置。代码如下:
相关参数见下表:
1.2.2版本的配置方式
当前版本中会将 kafka消息的 topic, partition, offset, key, value作为一个 ConsumerRecord 发送,当我们仅需要 value时,需要通过 SimpleRecordTranslator 对数据进行过滤。
当前版本中摒弃了之前版本中通过startOffsetTime, scheme, fetchSizeBytes, bufferSizeBytes, fetchMaxWait, clientId 等配置字段,通过setProp的方式作为kafka的配置参数,配置方式如下图所示。
相关参数见下表:
要点总结与归纳
1
使用新版的storm-kafka-client
storm-kafka 是老版本的包,官方推荐使用storm-kafka-client 代替。凡是网上搜到的代码中,使用storm-kafka ,还是使用基于ZK模式消费kafka,并没有使用 kafka新消费者API。两者在使用方式上不相同,需要注意。
2
切换时如何保证数据不丢失?
在升级过程中,我们希望能够做到平滑升级,保证数据不丢失。
方案一
如果在后半夜或者周末的时候业务会暂停,没有新的kafka消息进入,数据切换就很简单。只需要新版的Consumer从最近的offset开始消费就可以。
推荐将 setFirstPollOffsetStrategy配置为 UNCOMMITTED_LATEST,当业务启动数据进入后,即开始消费。
方案二
如果业务数据是不间断的,又不希望数据丢失,需要将将zk中存储的offset,写入到为“_consumer_offsets”的topic中(在0.10.x以后的kafka版本中,将offset写入到了_consumer_offsets的topic中),storm官方提供了相应的升级方案。
升级小花絮
在整个升级过程中,我们还遇到了一个问题,即升级后,新版本storm使用kafka新消费者无法获取数据,但是切回老版本ZK模式,却没有问题。
经过相关同事排查,在kafka服务器上,我们看到有报错日志如下:
The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
即这个参数的意思是内部topic副本配置数量,如果集群数量(broker)小于副本数量,topic创建会失败。
内部topic是指什么? 老版本kafka消费者,是基于ZK的,offset数据也保存在ZK上面;新版本kafka,offset都保存在broker上面的topic,即内部topic。
早期kafka搭建时,使用版本是0.8.1,只搭建了2个broker,虽然后面升级到0.11.0,但是由于一直使用基于ZK的消费模式,导致问题一直没用报出来,直到这次使用新消费者,才发现问题。
问题清楚后,我们扩展了kafka集群节点数量,从2个扩展到3个,随后新版storm 可以正常消费数据。
至此storm从1.0.2 升级到 1.2.2全流程完结。
▼
领取专属 10元无门槛券
私享最新 技术干货