使用storm trident消费kafka消息

storm通过保证数据至少被处理一次来保证数据的完整性,由于元祖可以重发,对于一些需要数据精确的场景,可以考虑用storm trident实现。 传统的事物型拓扑中存在几种bolt: 1.1 BasicBolt 这是最基本的Bolt,BasicBolt每次只能处理一个tuple,而且必须等前一个tuple成功处理后下一个tuple才能继续处理,显然效率不高。 1.2 BatchBolt storm的一个优势就是能够批量处理tuple,BatchBolt支持批量处理tuple,每一个batch中的tuple都会调用execute(),处理完成后调用finishBatch方法。 1.3 Committer BatchBolt 标记为Committer的BatchBolt和基本的BasicBolt的区别在于二者调用finishBatch()的时机不同,标记为Committer的BatchBolt在提交阶段就会调用finishBatch()。

二、storm trident的使用 storm目前的版本已经将事物拓扑的实现封装trident,trident目前支持3种不同的事物接口,一种是非事物型的(不介绍,因为基本不用),一种是事务性的TransactionalTridentKafkaSpout,而我们比较常用的是透明型事物OpaqueTridentKafkaSpout(事务型应用最重要的一点是要判断一批消息是新的还是已来过的)。 2.1 TransactionalTridentKafkaSpout

原理是每次在数据库中存了txid,IPartitionedTransactionalSpout的每一个tuple都会绑定在固定的批次(batch)中。 一个批次无论重发多少次,它也只有一个唯一且相同的事务ID,它所包含的内容都是完全一致的,而一个tuple无论被重发多少次只会在同一个批次里。 但貌似目前TransactionalTridentKafkaSpout有个bug,启动会报:classCastException(非代码问题)

具体可参考: Java代码 1. issue:https://issues.apache.org/jira/browse/STORM-1728 然而我们可以想到的是,IPartitionedTransactionalSpout会有一个问题,假设一批消息在被bolt消费过程中失败了,需要spout重发,此时如果正巧遇到消息发送中间件故障,例如某一个分区不可读,spout为了保证重发时每一批次包含的tuple一致,它只能等待消息中间件恢复,也就是卡在那里无法再继续发送给bolt消息了,直至消息中间件恢复(因为它必须发送一样的Batch)。 2.2 OpaqueTridentKafkaSpout IOpaquePartitionedTransactionalSpout不保证每次重发一个批次的消息所包含的tuple完全一致。也就是说某个tuple可能第一次在txid=1的批次中出现,后面有可能在txid=3的批次中出现。这种情况只出现在当某一批次消息消费失败需要重发且恰巧消息中间件故障时。这时,IOpaquePartitionedTransactionalSpout不是等待消息中间件故障恢复,而是先读取可读的partition。例如txid=1的批次在消费过程中失败了,需要重发,恰巧消息中间件的16个分区有1个分区(partition=3)因为故障不可读了。这时候IOpaquePartitionedTransactionalSpout会先读另外的15个分区,完成txid=1这个批次的发送,这时候同样的批次其实包含的tuple已经少了。假设在txid=3时消息中间件的故障恢复了,那之前在txid=1且在分区partition=3的还没有被发送的tuple会被重新发送, 包含在txid=3的批次中,所以其不保证每批次的batch包含的tuple是一样的。

http://workman666.iteye.com/blog/2348863

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏派森公园

深入理解Linux LA

经常和Linux打交道的童鞋都知道,load averages是衡量机器负载的关键指标,但是这个指标是怎样定义出来的呢?

17640
来自专栏人工智能LeadAI

配置深度学习主机与环境(TensorFlow+1080Ti) | 第四章 基于Anaconda的TensorFlow安装

配置深度学习主机与环境(TensorFlow+1080Ti): 01 概念介绍 Anaconda Anaconda(https://www.continuu...

42850
来自专栏专知

Python网络爬虫与信息抽取笔记02 requests库入门

18940
来自专栏葡萄城控件技术团队

ActiveReports 报表应用教程 (11)---交互式报表之文档目录

通过文档目录,用户可以非常清晰的查看报表数据结构,并能方便地跳转到指定的章节,最终还可以将报表导出为PDF等格式的文件。本文以2012年各月产品销售分类汇总报表...

21090
来自专栏美团技术团队

基于rsync的文件增量同步方案

背景 犀牛云盘是美团点评内部一个基于美团云的文件协作平台,核心是文件的结构化云存储以及上传和下载的体验优化。文件同步是云盘功能的重要部分(包括文件内容的同步和文...

81640
来自专栏企鹅号快讯

分布式TensorFlow入坑指南:从实例到代码带你玩转多机器深度学习

AI UNION 人工智能产业技术创新战略联盟 这里是人工智能联盟,汇聚了最新的AI新闻资讯,还有最前沿的国内外AI开源技术,最具价值的AI创新企业,最具权威的...

21970
来自专栏极客慕白的成长之路

Kraken.io – 最好用的在线图片压缩工具

Kraken.io 是一个在线图片优化和压缩服务,在保持图像原始质量的同时尽可能的缩减图像大小,支持 JPEG、PNG、GIF 动画和 SVG 格式。

7910
来自专栏Youngxj

涛先森悬浮客服插件1.1

18950
来自专栏ATYUN订阅号

腾讯开源围棋AI程序PhoenixGo,复现AlphaGo Zero

PhoenixGo是一个围棋AI程序,它执行AlphaGo Zero论文“掌握无人知识的Go游戏”。它也被称为FoxGo中的“BensonDarr”,CGOS中...

26120
来自专栏Spark学习技巧

如何理解Linux中的load averages?

经常和 Linux 打交道的童鞋都知道,load averages 是衡量机器负载的关键指标,但是这个指标是怎样定义出来的呢?

9720

扫码关注云+社区

领取腾讯云代金券