kafka的编程模型

1.kafka消费者编程模型

分区消费模型

组(group)消费模型

1.1.分区消费模型

1.1.1.分区消费架构图,每个分区对应一个消费者。

1.1.2.分区消费模型伪代码描述

指定偏移量,用于从上次消费的地方开始消费.

提交offset ,java客户端会自动提交的集群,所以这一步可选。

1.2.组(group)消费模型

1.2.1.组消费模型架构图

每个组都消费该topic的全量数据,一条消息会发给groupA和groupB.

1.2.2.组消费模型伪代码:

流数N:表示一个consumer组里面有几个consumer 实例,上例中组A创建2个流,组B创建4个流。

1.2.3.consumer分配算法

当kafka的分区个数大于组A里consumer实例个数时,怎么去分配,以下为分配步骤:

1.3.两种消费模型对比

Partition消费模型更加灵活但是:

(1)需要自己处理各种异常情况;

(2)需要自己管理offset(以实现消息传递的其他语义);

Group消费模型更加简单,但是不灵活:

(1)不需要自己处理异常情况,不需要自己管理offset;

(2)只能实现kafka默认的最少一次消息传递语义;

知识补充:消息传递的3中语义:

至少一次,(消息不会丢,消息者至少得到一次,但有可能会重复,生产者向消费者发送之后,会等待消费者确认,没收到确认会再发) (kafka 默认实现的语义)。

至多一次,(消息会丢)

有且只有一次。

1.4.java 客户端参数调优

fetchSize: 从服务器获取单包大小;

bufferSize: kafka客户端缓冲区大小;

group.id: 分组消费时分组名 (指定的每个组将获得全量的数据)

2.生产者消费模型

同步生产模型

异步生产模型

2.1. 同步生产模型

至少成功一次 , 发送给kafka消费者

2.2.异步生产模型

打包发送给kafka broker。

2.3.两种生产模型伪代码描述

main()

创建到kafka broker的连接:KafkaClient(host,port)

选择或者自定义生产者负载均衡算法 partitioner (算法有:hash,轮询,随机)

设置生产者参数 (缓存队列长度,发送时间,同步/异步参数设置)

根据负载均衡算法和设置的生产者参数构造Producer对象

while True

getMessage:从上游获得一条消息

按照kafka要求的消息格式构造kafka消息

根据分区算法得到分区

发送消息

处理异常

2.4.两种生产模型对比

同步生产模型:

(1)低消息丢失率;

(2)高消息重复率(由于网络原因,回复确认未收到);

(3)高延迟 (每发一条消息需要确认)

(使用在不丢消息场景)

异步生产模型:

(1)低延迟;

(2)高发送性能;(每秒一个分区发50万条)

(3)高消息丢失率(无确认机制,发送端队列满了,消息会丢掉;整个队列发送给)

(使用在允许丢消息场景,偶尔丢一条)

2.5.java客户端代码实现 (自定义分区)

//同步配置参数:

默认的序列化方式:字节序列化。

设定分区算法:默认是对key进行hash分区算法,可以自定义分区算法。

确认机制 request.require.acks: 合理设置为1; 0: 绝不等确认 1: leader的一个副本收到这条消息,并发回确认 -1: leader的所有副本都收到这条消息,并发回确认

消息是以key-value的形式发送的,key必须要设置。

2.6.java客户端参数调优

message.send.max.retries: 发送失败重试次数;

retry.backoff.ms :未接到确认,认为发送失败的时间;

producer.type: 同步发送或者异步发送;

batch.num.messages: 异步发送时,累计最大消息数;

queue.buffering.max.ms:异步发送时,累计最大时间;

本文版本主要是针对0.8.2,配套学习教程,浪尖已经分享到了知识星球。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-05-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏性能与架构

nginx 负载均衡策略

1. 轮询 轮询方式是nginx负载均衡的默认策略,根据每个server的权重值来轮流发送请求,例如: upstream backend { server...

3487
来自专栏匠心独运的博客

分布式定时任务Elastic-Job框架在SpringBoot工程中的应用实践(二)

文章摘要:在生产环境中部署Elastic-Job集群后,那么如何来运维监控线上跑着的定时任务呢? 如果在生产环境的大规模服务器集群上部署了集成Elastic-...

1732
来自专栏云计算教程系列

如何在Debian 9上安装R

R是一种开源编程语言,专门用于统计计算和图形。在R统计计算基础的支持下,它被广泛用于开发统计软件和执行数据分析。R是一种日益流行且可扩展的语言,具有活跃的社区,...

1300
来自专栏逆向技术

脱壳第三讲,UPX压缩壳,以及补充壳知识

           脱壳第三讲,UPX压缩壳,以及补充壳知识 一丶什么是压缩壳.以及壳的原理 在理解什么是压缩壳的时候,我们先了解一下什么是壳 1.什么是壳 ...

2518
来自专栏码洞

一种简单的Failover机制

在应用结构上有这样一个业务场景,机房里部署了多个物理数据库的Proxy无状态节点,业务端通过Proxy节点间接和存储DB交互。Proxy支持了分库分表的特性,管...

1092
来自专栏杨建荣的学习笔记

数据迁移工具简单分析 (r2笔记59天)

exp/imp 对于数据结构的复制和同步,还是比较理想的工具。 在数据量比较小的情况下,这个工具的性能要远远好于datapump,而且重点推荐,他对于各种常用...

2765
来自专栏编程

Python加圣诞帽

01 ? 前段时间好多人@官方微信许愿说要给自己的头像加圣诞帽,总觉得不太可靠,不晓得最后是PS的还是在微信小程序里搜索圣诞头像给自己戴的,嘿嘿~ 之前在Git...

24210
来自专栏云加头条

智能云上手指南:如何将历史数据迁移到万象优图

6 月 21 日,腾讯云在 2017「云+未来」峰会上推出了战略新品——智能云,宣布将腾讯积累近 20 年的 AI 能力向政府、企业和开发者开放,其中首批开放计...

3063
来自专栏杨建荣的学习笔记

海量数据迁移之数据抽取流程 (r4笔记第72天)

在之前的一些博文中花了大篇幅介绍了采用外部表抽取的一些细节,可能细节到了,基本原理的内容还希望再补充补充。 采用外部表抽取数据的流程图如下: ? 大体标注...

3434
来自专栏深度学习那些事儿

关于在ubuntu上源码安装TensorFLow-1.7.0-cuda9.1-cudnn7.1.2过程中问题解决方案

实验室服务器上装载的cuda版本是最新的9.1,和从官网下载下来的TensorFlow不匹配,因为官方的是通过cuda9.0进行编译的。因此,这篇文章讨论关于T...

2144

扫码关注云+社区