Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >flink kafka消费者groupId不起作用

flink kafka消费者groupId不起作用
EN

Stack Overflow用户
提问于 2016-07-28 06:14:03
回答 2查看 5.2K关注 0票数 6

我用的是卡夫卡和flink。在一个简单的程序中,我使用flink FlinkKafkaConsumer09,将组id分配给它。

根据Kafka的行为,当我使用相同的group.Id在同一主题上运行两个消费者时,它应该像消息队列一样工作。我认为这应该是这样的:如果向Kafka发送2条消息,那么flink程序中的每条或一条将处理这2条消息完全两次(假设总共有2行输出)。

但是实际的结果是,每个程序将接收2条消息。

我尝试使用随kafka服务器下载的消费者客户端。它以记录的方式工作(处理了2条消息)。

我试图使用两个卡夫卡消费者在相同的主要功能的flink程序。共处理4条信息。

我还试着运行两个flink实例,并为每个实例分配了同样的kafka消费者程序。4条信息。

有什么想法吗?这是我期望的输出:

代码语言:javascript
运行
AI代码解释
复制
1> Kafka and Flink2 says: element-65  
2> Kafka and Flink1 says: element-66 

以下是我经常得到的错误输出:

代码语言:javascript
运行
AI代码解释
复制
1> Kafka and Flink2 says: element-65  
1> Kafka and Flink1 says: element-65  
2> Kafka and Flink2 says: element-66  
2> Kafka and Flink1 says: element-66 

下面是代码段:

代码语言:javascript
运行
AI代码解释
复制
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    messageStream.rebalance().map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Kafka and Flink1 says: " + value;
        }
    }).print();


    env.execute();
}

我尝试过两次运行它,并以另一种方式运行:在主函数中为每个数据流和env.execute()创建2个数据流。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-07-28 23:32:10

今天在Flink用户邮件列表上有一个类似的问题,但是我在这里找不到它的链接。在这里,答案的一部分是:

在内部,Flink Kafka连接器不使用使用者组管理功能,因为它们在每个并行实例上使用更低级别的API (SimpleConsumer在0.8,而KafkaConsumer#assign(…)在0.9)来更多地控制单个分区的消耗。因此,Flink Kafka连接器中的“group.id”设置基本上只用于将偏移提交回ZK / Kafka代理。

也许这为你澄清了一些事情。

另外,还有一篇关于与Flink和Kafka合作的博客文章,这可能对你有帮助(https://data-artisans.com/blog/kafka-flink-a-practical-how-to)。

票数 6
EN

Stack Overflow用户

发布于 2016-08-04 03:38:13

由于flink卡夫卡消费者对group.id的使用不多,除了向动物园管理员提供补偿外,没有更多的使用。对于flink卡夫卡的消费者来说,是否有任何抵消监控的方法。我可以看到,有一种方法是在消费者组/消费者抵消检查器的帮助下,用于控制台消费者,而不是flink kafka消费者。

我们想看看我们的flink kafka消费者是如何落后/滞后于kafka主题的--在给定的时间点上,主题中的消息总数相当多,在分区级别上使用它是可以的。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38639019

复制
相关文章
python for循环多个参数处理_python for循环嵌套
实际上,“使用for循环遍历数组的最简单方法”(Python类型被命名为“list”BTW)是第二种方法,即for item in somelist:
全栈程序员站长
2022/09/22
2.3K0
PythonWebServer如何同时处理多个请求
源于知乎上一个问题:https://www.zhihu.com/question/56472691/answer/293292349
the5fire
2019/03/01
1.9K0
【Python】函数进阶 ① ( 函数返回多个返回值 | 函数参数传递类型简介 | 位置参数 | 关键字参数 )
在函数中 , 如果要 返回 多个返回值 , 可以 在 return 语句中 , 设置多个返回值 , 这些返回值之间使用 逗号 隔开 , 这些返回值的类型是 元组 tuple 类型的 ;
韩曙亮
2023/10/11
1.6K0
【说站】python关键字如何传递参数
1、关键词传递以“形参变量名=实参”的形式参与实参关联,根据形参的名称进行参数传递,使实参和形参的顺序不一致。
很酷的站长
2022/11/23
9460
Python函数参数总结(位置参数、默认参数、可变参数、关键字参数和命名关键字参数)
Python函数的参数多达5种,不像Java那样参数只有一种,而是像C++那样提供默认参数,除此之外,还提供可变参数、关键字参数、命名关键字参数,这样就使得Python函数的参数变得十分复杂。但复杂意味着灵活便捷,Python语言之所以流行起来,与起本身巨大的灵活性是分不开的。可以说Python是最方便使用的语言。 Python参数类型: - 位置参数(positional arguments,官方定义,就是其他语言所说的参数) - 默认参数(类似C++的默认参数) - 可
Steve Wang
2018/02/05
22.3K0
Python函数参数总结(位置参数、默认参数、可变参数、关键字参数和命名关键字参数)
Golang 获取GET、POST参数及处理上传多个文件
http.request的三个属性Form、PostForm、MultipartForm:
IT工作者
2022/06/30
2.1K0
Python 的 map 函数也可以处理多个参数的函数
比起 map 带给程序员的理解负担,大多数 Python 程序员更喜欢列表推导来解决问题。
somenzz
2022/10/25
2.9K0
Python 的 map 函数也可以处理多个参数的函数
python返回多个参数
比如在游戏中经常需要从一个点移动到另一个点,给出坐标、位移和角度,就可以计算出新的新的坐标:
狼啸风云
2020/07/16
2.7K0
Python 关键字参数和可变参数
如果你有一些具有许多参数的函数,而你又希望只对其中的一些进行指定,那么你可以通过命名它们来给这些参数赋值——这就是python关键字参数(Keyword Arguments)——我们使用命名(关键字)而非位置(一直以来我们所使用的方式)来指定函数中的参数。
py3study
2020/01/16
1.2K0
JS中如何处理多个ajax并发请求?
通常 为了减少页面加载时间,先把核心内容显示处理,页面加载完成后再发送ajax请求获取其他数据 这时就可能产生多个ajax请求,为了用户体验,最好是发送并行请求,这就产生了并发问题,应该如何处理? (1)并行改串行 如果业务逻辑和用户体验允许的情况下,可以改为串行,处理起来最简单 function async1(){ //do sth... async2(); } function async2(){ //do sth... } async1(); (2)回调计数 function
dys
2018/04/02
5.5K0
函数的关键字参数
def func1(name, age, sex, *args): ''' 打印姓名,年龄,性别 ''' print(name) print(age) print(sex) # func1('python', 28,sex='man','s','23') ## 工作经验:不定长参数都是放到最后 func1(name='python', age=30, sex='nv') help(func1) # def func1(a,b, c=100,*args,**
汪凡
2018/05/29
5470
Seata如何处理跨多个请求的事务?
Seata 是一种开源的分布式事务解决方案,能够处理跨多个请求的事务,适用于各种容器、语言和数据访问类型。在微服务架构下,依赖多个服务的操作可能导致分布式事务的问题。Seata 提供了完整的解决方案以确保数据的一致性和可靠性。
用户1289394
2023/09/11
2940
Seata如何处理跨多个请求的事务?
如何将多个参数传递给 React 中的 onChange?
在 React 中,一些 HTML 元素,比如 input 和 textarea,具有 onChange 事件。onChange 事件是一个非常有用、非常常见的事件,用于捕获输入框中的文本变化。有时候,我们需要将多个参数同时传递给 onChange 事件处理函数,在本文中,我们将介绍如何实现这一目标。
网络技术联盟站
2023/06/07
2.8K0
Logstash:处理多个 input
这里的 input 可以支持多个 input,同时多个 worker 可以处理 filter 及 output:
腾讯云大数据
2020/07/31
2.8K0
Logstash:处理多个 input
如何优雅地处理命令行参数?
我们在Linux用到的命令常常支持很多参数,那么如何写一个程序,也像Linux命令一样支持很多参数呢?有什么什么优雅的处理方法?
编程珠玑
2019/08/21
1.1K0
如何给PHP添加多个错误处理函数
一些常规的PHP框架都会对PHP的错误、异常进行异常处理封装,方便框架日志记录,开发的时候方便处理。我们先看看几个框架错误处理:
写PHP的老王
2019/08/12
1.9K0
如何给PHP添加多个错误处理函数
图像处理中C++如何同时返回多个值
1 . pair<T,T> 返回两个值 //返回两个值的情况 pair<vector<double>,int> R_R(Mat& img) { int n=img.rows; vector<double> a; unsigned int m=img.cols; unsigned int k=img.rows; for(unsigned int i=0;i<m;i++) { for(unsigned int j=0;j<k;j++)
用户9831583
2022/06/16
8000
mybatis 使用tips - 使用多个参数
执行如下命令: mvn -Dmybatis.generator.overwrite=true mybatis-generator:generate 可以使用mybatis generator mybatis 使用多个参数 自定义方法需要根据多个查询条件去查询: SELECT * FROM `db_demo`.`hot_topic` WHERE lang='english' AND category='017' AND topic_type='video' ORDER BY score DESC; 推荐使用
千往
2018/01/24
1.7K0
方法关键字WebMethod,参数关键字Abstract,Constraint,Deprecated
指定此方法是否为web method。仅适用于定义为web service或web客户端的类。
用户7741497
2022/07/07
4210
Swift 使用lexicographicallyprecedes 多个参数排序
版权声明:转载请标明出处 https://blog.csdn.net/ZY_FlyWay/article/details/89184264
星宇大前端
2019/04/18
1.2K0
Swift  使用lexicographicallyprecedes 多个参数排序

相似问题

如何解决ubuntu中unicode显示错误(不同)字符的问题?

10

需要哪种字体才能显示特定的Unicode字符?

10

WSL中缺少的字体

30

在bash提示符中更改unicode字符的字体大小

10

缺少Ubuntu字体

10
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文