kafka0.9.0 新特性(对比0.8)

1、引入新的Consumer API

0.9.0相比0.8.2,引入了一个新的Consumer API,这个API不再使用high level和low level的基于zookeeper的client;不过仍然支持0.8.0的client。

新的API通过如下方式引入依赖:

   <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>

consumer的offset

kafka0.8.0 的 consumer客户端需要不断与kafka集群的zookeeper交互,以获取最新的offset。而新的consumer的offset则是交给kafka来管理,kafka通过创建专用的topic进行管理不同partition的offset。kafka自己维护了partition的offset,以供同一个partition的不同consumer使用。(图中last commit offset 就是已经确认消费的offset)

image.png

假设现在某一个consumer消费到current position时,未来得及确认已消费就挂掉了,那么下次其他consumer来拉数据时,就从last commit offset开始,重复消费1~6.Consumer的commit。

如果配置了enable.auto.commit为true和auto.commit.interval.ms=xxx,那么就按照这个频率进行commit; 为false时,就需要手动进行commit,可以使用同步方式commitSync,也可以使用 commitAsync 进行异步commit,对于异步确认的话,会返回一个hook,可以利用这个hook进行一定的业务逻辑处理。

consumer通过subscribe方法来订阅它感兴趣的topic,每次订阅之后kafka又有新的consumer加进来的话,那么就要对该topic的position进行重新分配(consumer和partition的比例最好是一个1:1)。

一般而言这个过程是consumer不感兴趣的,因此无需知道;但是如果consumer愿意感知这个事情,那么就可以使用 ConsumerRebalanceListener这个类来进行监听。

另外Consumer可以订阅特殊的partition,实现指定消费partition的功能。适用于一些特殊的场景,比如:消费者所要消费的partition与消费者具有某种联系;或者消费者本身具有高可用性,如果消费者挂掉了,没有必要让kafka来重新分配partition。使用TopicPartition来表示某一个topic的指定partition。

在kafka外部存储offset

允许在kafka外部存储offset,也就是consumer和kafka同时维护一个offset,消费者程序不一定要使用kafka内置的offset存储,而是可以自主选择offset的存储方式。如果能够实现offset和result的原子性保存,将会实现exactly once的事务性保证,要比kafka的offset提交机制所提供的at-least once更加强壮。比如使用外部数据库的事务来保存数据处理结果和offset的一致性,要么共同成功并存储,要么失败回滚。

使用方法:首先将auto.commit提交设置为false,然后使用 ConsumerRecord 来存储offset,需要定位时,使用seek即可。

支持多线程

通过引入wakeupException实现,原理类似于多线程中的InterruptException(通过WakeupException就可以对Consumer进行优雅的控制。而且多个线程公用一个Consumer,Consumer本身非线程安全,因此如果不加外部控制,会导致跑出ConcurrentModificationException。多线程很可能导致非顺序消费数据的问题,但是将消费和业务处理分离,耦合性降低

2、引入了安全管理机制:

    1. 客户端(producer和consumer)连接broker时,可以使用SSL或者SASL进行验证。
    1. 验证从broker到zookeeper的连接
    1. 使用SSL对broker和client之间,broker之间以及使用SSL的工具进行数据编码(这有可能导致性能恶化,取决于CPU和JVM实现)
    1. 验证客户端的读写操作
    1. 验证是一个可插拔式的服务,并且支持统一整个验证服务。

SSL或者SASL都是可选择项,如果需要使用,那么就需要进行额外的配置。

3、引入了Kafka Connect:

kafka connect是一个支持Scala的可靠工具。使用它来定义一个数据导入与导出的connector很容易。具有时延低,API操作简单的特征,支持分布式或单机模式。


个人介绍: 高广超:多年一线互联网研发与架构设计经验,擅长设计与落地高可用、高性能、可扩展的互联网架构。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏FreeBuf

无线安全审计工具FruityWifi初体验

FruityWIfi是一款有名的无线安全审计的开源工具,其灵感来自于wifipineapple,目前该工具已经更新到2.4。它能够让用户通过web界面来控制和管...

2867
来自专栏北京马哥教育

Python框架:Django写图书管理系统(LMS)

今天我会带大家真正写一个Django项目,对于入门来说是有点难度的,因为逻辑比较复杂,但是真正的知识就是函数与面向对象,这也是培养用Django思维写项目的开始

2031
来自专栏Java工程师日常干货

RocketMQ实战(一)What is RocketMQ?初步理解Producer/Consumer Group install RocketMQ

阿里巴巴有2大核心的分布式技术,一个是OceanBase,另一个就是RocketMQ。在实际项目中已经领教过RocketMQ的强大,本人计划写一个RocketM...

1063
来自专栏架构之美

如何打造高可靠高性能的消息队列(ZZMQ)

互联网公司使用最频繁的服务调用组件是RPC框架,RPC同步调用有些场景并不是很适用,而这些场景刚好是一个可靠MQ的适用场景。

4374
来自专栏JAVA烂猪皮

RabbitMQ技术详解

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、A...

1321
来自专栏Pythonista

centos7安装rabbitmq

RabbitMQ在全球范围内在小型初创公司和大型企业中进行了超过35,000次RabbitMQ生产部署,是最受欢迎的开源消息代理。

1922
来自专栏idealclover的填坑日常

Ubuntu 18.04/16.04系统安装网易云音乐无法启动或安装解决方案

由于netease-cloud-music_1.1.0_amd64_ubuntu.deb打包可能有问题,在Ubuntu 16.04/18.04版本中虽然可以安装...

6463
来自专栏地方网络工作室的专栏

Vue2+VueRouter2+Webpack+Axios 构建项目实战2017重制版(十三)集成 UEditor 百度富文本编辑器

Vue2+VueRouter2+Webpack+Axios 构建项目实战2017重制版(十三)集成 UEditor 百度富文本编辑器 前情回顾 通过前面系统的学...

3168
来自专栏FreeBuf

如何设置自己的Dionaea蜜罐来收集恶意软件样本

许多安全人员都热衷于恶意软件的逆向工程。在本文中我将教大家设置一个自己的Dionaea蜜罐,来协助我们恶意软件样本的收集工作。

1434
来自专栏匠心独运的博客

消息中间件—RocketMQ消息存储(一)一、MQ消息队列的一般存储方式二、RocketMQ消息存储整体架构三、RocketMQ文件存储模型层次结构四、总结

文章摘要:MQ分布式消息队列大致流程在于消息的一发一收一存,本篇将为大家主要介绍下RocketMQ存储部分的架构 消息存储是MQ消息队列中最为复杂和最为重要的...

5022

扫码关注云+社区

领取腾讯云代金券