我用的是卡夫卡和flink。在一个简单的程序中,我使用flink FlinkKafkaConsumer09,将组id分配给它。
根据Kafka的行为,当我使用相同的group.Id在同一主题上运行两个消费者时,它应该像消息队列一样工作。我认为这应该是这样的:如果向Kafka发送2条消息,那么flink程序中的每条或一条将处理这2条消息完全两次(假设总共有2行输出)。
但是实际的结果是,每个程序将接收2条消息。
我尝试使用随kafka服务器下载的消费者客户端。它以记录的方式工作(处理了2条消息)。
我试图使用两个卡夫卡消费者在相同的主要功能的flink程序。共处理4条信息。
我还试着运行两个flink实例,并为每个实例分配了同样的kafka消费者程序。4条信息。
有什么想法吗?这是我期望的输出:
1> Kafka and Flink2 says: element-65
2> Kafka and Flink1 says: element-66
以下是我经常得到的错误输出:
1> Kafka and Flink2 says: element-65
1> Kafka and Flink1 says: element-65
2> Kafka and Flink2 says: element-66
2> Kafka and Flink1 says: element-66
下面是代码段:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink1 says: " + value;
}
}).print();
env.execute();
}
我尝试过两次运行它,并以另一种方式运行:在主函数中为每个数据流和env.execute()创建2个数据流。
发布于 2016-07-28 23:32:10
今天在Flink用户邮件列表上有一个类似的问题,但是我在这里找不到它的链接。在这里,答案的一部分是:
在内部,Flink Kafka连接器不使用使用者组管理功能,因为它们在每个并行实例上使用更低级别的API (SimpleConsumer在0.8,而KafkaConsumer#assign(…)在0.9)来更多地控制单个分区的消耗。因此,Flink Kafka连接器中的“group.id”设置基本上只用于将偏移提交回ZK / Kafka代理。
也许这为你澄清了一些事情。
另外,还有一篇关于与Flink和Kafka合作的博客文章,这可能对你有帮助(https://data-artisans.com/blog/kafka-flink-a-practical-how-to)。
发布于 2016-08-04 03:38:13
由于flink卡夫卡消费者对group.id的使用不多,除了向动物园管理员提供补偿外,没有更多的使用。对于flink卡夫卡的消费者来说,是否有任何抵消监控的方法。我可以看到,有一种方法是在消费者组/消费者抵消检查器的帮助下,用于控制台消费者,而不是flink kafka消费者。
我们想看看我们的flink kafka消费者是如何落后/滞后于kafka主题的--在给定的时间点上,主题中的消息总数相当多,在分区级别上使用它是可以的。
https://stackoverflow.com/questions/38639019
复制相似问题