首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >Kafka MirrorMaker2 -未镜像消费者组偏移量

Kafka MirrorMaker2 -未镜像消费者组偏移量
EN

Stack Overflow用户
提问于 2020-02-16 15:58:05
回答 4查看 4.4K关注 0票数 6

我已经设置了在两个DC之间复制数据的MirrorMaker2。

我的mm2.properties,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# mm2.properties
name=source->dest
clusters=source, dest

source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092

source->dest.enabled=true

offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1

查看下面关于MM2启动的内容。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
    admin.timeout.ms = 60000
    checkpoints.topic.replication.factor = 3
    config.action.reload = restart
    config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
    config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    consumer.poll.timeout.ms = 1000
    emit.checkpoints.enabled = true
    emit.checkpoints.interval.seconds = 60
    emit.heartbeats.enabled = true
    emit.heartbeats.interval.seconds = 1
    enabled = true
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
    groups = [.*]
    groups.blacklist = [console-consumer-.*, connect-.*, __.*]
    header.converter = null
    heartbeats.topic.replication.factor = 3
    key.converter = null
    metric.reporters = null
    name = source->dest
    offset-syncs.topic.replication.factor = 3
    offset.lag.max = 100
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 600
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 600
    replication.factor = 2
    replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    replication.policy.separator = .
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    source.cluster.alias = source
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    sync.topic.acls.enabled = true
    sync.topic.acls.interval.seconds = 600
    sync.topic.configs.enabled = true
    sync.topic.configs.interval.seconds = 600
    target.cluster.alias = dest
    task.assigned.groups = null
    task.assigned.partitions = null
    tasks.max = 1
    topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
    topics = [.*]
    topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)

我的数据正在按预期进行复制。源主题在目标集群中创建为源。但是,消费者组偏移量没有被复制。

已在源群集中启动消费组。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group

使用了很少的消息并阻止了它。在此主题中发布了新消息,并且mirror maker还将数据镜像到目标群集。

我尝试使用来自目标集群的消息,如下所示。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group

因为我使用相同的消费者组,所以我希望我的偏移量也是同步的,并且不会使用与我在cluster1中使用的相同的消息。但是,仍然使用所有的消息。我还有什么遗漏的吗。

EN

回答 4

Stack Overflow用户

发布于 2021-01-27 05:17:34

Kafka 2.7引入了“自动消费者偏移同步”。默认情况下,消费者偏移量不会在群集之间同步。您应该显式启用此功能。

support automated consumer offset sync across clusters in MM 2.0

票数 4
EN

Stack Overflow用户

发布于 2020-02-16 19:41:08

有几个基本原因可以解释为什么复制偏移量不是那么简单:

  1. kafka是一个至少一次的系统(忽略炒作)。这意味着mirror maker,因为它构建在kafka消费者和生产者之上,可以每次超时/断开连接,这将导致一定程度的重复记录被传递到目的地。这意味着偏移量不会在源和目标之间1:1映射。即使您尝试并使用“只有一次”支持( MM2 KIP明确表示不使用),它所做的只是跳过部分交付的批次,但如果您在源主题开始过期记录很久之后设置镜像,这些批次仍然会占用目标
  2. 的偏移量,您的目标主题将从偏移量0开始,而源主题将具有更高的“最旧”偏移量。有人试图解决这个问题(请参阅KIP-391),但它从未被接受,
  3. 通常不能保证您的镜像拓扑从单个源镜像到单个目的地。例如,the linkedin topology将多个源群集镜像到“聚合”层群集中。对于此类拓扑,映射偏移量是没有意义的

看看MM2的KIP,有一个提到的“偏移同步主题”。在您的代码中,您可以使用类RemoteClusterUtils在集群之间转换检查点:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Map<TopicPartition, Long> newOffsets = RemoteClusterUtils.translateOffsets(
   newClusterProperties, oldClusterName, consumerGroupId
);
consumer.seek(newOffsets);

这是从以下演示文稿中摘录的- https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019

或者,您可以使用seek by timespamp将目标上的使用者组启动到将数据传送到目标(或传送到源,如果目标上的日志附加时间戳的代理设置不会覆盖这些时间)的大致时间。为了安全起见,你需要稍微倒带一下。

票数 3
EN

Stack Overflow用户

发布于 2020-08-17 11:57:10

正在按预期复制我的数据。源主题在目标集群中创建为源。但是,消费者组偏移量没有被复制。

默认情况下,MM2不会从kafka-console-consumer复制消费者组。在启动时的MM2日志中,我们可以看到该groups.blacklist = [console-consumer-.*, connect-.*, __.*]。我相信你的mm2.properties配置文件中有you can override this

因为我使用相同的消费者组,所以我希望我的偏移量也是同步的,并且不会使用与我在cluster1中使用的相同的消息。

正确镜像使用者组并启用检查点后,应该会在目标群集中自动创建一个内部主题(类似于dest.checkpoints.internal)。此检查点主题包含每个使用者组中镜像主题分区的源群集中和目标群集中的上次提交偏移量。

然后,您可以使用Kafka的RemoteClusterUtils实用程序类来转换这些偏移量,并获得source.test-1的同步偏移量,这些偏移量映射到消费者最后提交的test-1偏移量。如果您最终使用Java语言创建了消费者,则可以将RemoteClusterUtils作为依赖项添加到您的项目中:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-mirror-client</artifactId>
    <version>2.4.0</version>
</dependency>

否则,您很可能需要编写一个包装RemoteClusterUtils.java的工具来获取转换后的偏移量。此功能或something similar似乎计划作为future release for MM2的一部分。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60250330

复制
相关文章
Kafka消费者组
消费者组是 Kafka 独有的概念,消费者组是 Kafka 提供的可扩展且具有容错性的消费者机制。
程序员酷森
2020/10/18
1.9K0
Kafka消费者组是什么?
Consumer Group 是Kafka提供的可扩展且具有容错性的消费者机制。在组内多个消费者实例(Consumer Instance ),它们共享一个公共的ID即 Group ID 。组内的所有消费者协调在一起消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然一个分区只能有同一个消费者组的一个Consumer 实例消费。 Consumer Group 有三个特性:
chengcheng222e
2021/11/04
4.7K0
kafka的消费者组(下)
上一文对消费者组的一些概念,基本原理进行了简单描述,本文继续来聊聊消费者组中另外一个比较重要的内容:偏移量的存储。
陈猿解码
2023/02/28
8020
kafka的消费者组(下)
Kafka 新版消费者 API(二):提交偏移量
最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是5s。消费者每次获取新数据时都会先把上一次poll()方法返回的最大偏移量提交上去。
CoderJed
2018/09/13
5.7K1
Kafka消费者 之 如何提交消息的偏移量
由于消费者模块的知识涉及太多,所以决定先按模块来整理知识,最后再进行知识模块汇总。
create17
2019/07/12
3.8K0
Kafka消费者 之 如何提交消息的偏移量
kafka的消费者组(上)
最近在排查一个sparkstreaming在操作kafka时,rebalance触发了一个异常引起任务失败,而组内小伙伴对消费者组的一些基本知识不是很了解,所以抽了些时间进行相关原理的整理。本文就来聊聊相关内容。
陈猿解码
2023/02/28
9410
kafka的消费者组(上)
Kafka 3.0 重磅发布,有哪些值得关注的特性?
Apache Kafka 是一个分布式开源流平台,被广泛应用于各大互联网公司。Kafka 设计之初被用于消息队列,自 2011 年由 LinkedIn 开源以来,Kafka 迅速从消息队列演变为成熟的事件流处理平台。
大数据技术架构
2021/10/21
2K0
Kafka 3.0 重磅发布,有哪些值得关注的特性?
【kafka原理】消费者提交已消费的偏移量
那在上一篇文章中我们了解了 消费者偏移量__consumer_offsets_,知道了 消费者在消费了消息之后会把消费的offset 更新到以 名称为__consumer_offsets_的内置Topic中; 每个消费组都有维护一个当前消费组的offset; 那么就会有以下疑问
石臻臻的杂货铺[同名公众号]
2021/07/14
1.5K2
跨数据中心下的 Kafka 高可用架构分析
导语 本文介绍了 Kafka 跨数据中心的两种部署方式,简要分析两种方式下的不同架构以及优缺点,对这些架构可能碰到的问题也提供了一些解决思路;同时也说明了 Kafka 跨数据中心部署的社区解决方案和商业化解决方案。 背景 Kafka 作为世界上最流行的消息中间件之一,一般是客户数据链路中的核心组件,高可用性是客户很关注的因素。近期在对接云上客户时发现,客户对 Kafka 的高可用也有需求,行业架构师也想了解 Kafka 高可用的方案细节;有些客户是需要云上 Kafka 的高可用能力,有些客户需要 IDC
腾讯云中间件团队
2023/04/28
1.8K0
跨数据中心下的 Kafka 高可用架构分析
【kafka原理】 消费者偏移量__consumer_offsets_相关解析
我们在kafka的log文件中发现了还有很多以 __consumer_offsets_的文件夹;总共50个;
石臻臻的杂货铺[同名公众号]
2021/07/14
6.1K0
Kafka 为什么使用消费者组?
所有消费者一起消费所有的分区,例如 C1 和 C2 共同完成了对 P0、P1、P2、P3 的消费。
dys
2019/11/19
2K0
Kafka 为什么使用消费者组?
Kafka 3.0发布,这几个新特性非常值得关注!
Apache Kafka 是一个分布式开源流平台,被广泛应用于各大互联网公司。Kafka 设计之初被用于消息队列,自 2011 年由 LinkedIn 开源以来,Kafka 迅速从消息队列演变为成熟的事件流处理平台。
大数据老哥
2021/11/04
3.6K0
Kafka 3.0重磅发布,弃用 Java 8 的支持!
Kafka 具有四个核心 API,借助这些 API,Kafka 可以用于以下两大类应用:
Spark学习技巧
2022/01/13
2.3K0
Kafka 3.0重磅发布,弃用 Java 8 的支持!
Kafka 3.0重磅发布,都更新了些啥?
Kafka 设计之初被用于消息队列,自 2011 年由 LinkedIn 开源以来,Kafka 迅速从消息队列演变为成熟的事件流处理平台。
用户2781897
2021/10/14
2.2K0
面试系列-kafka偏移量提交
Kafka 中默认的消费位移的提交方式为自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true 。这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable.auto.commit 参数为 true;
用户4283147
2022/12/29
1.1K0
面试系列-kafka偏移量提交
8.【kafka运维】kafka-dump-log.sh消费者组管理
配置项为log.index.size.max.bytes; 来控制创建索引的大小;
石臻臻的杂货铺[同名公众号]
2021/08/19
4640
7.【kafka运维】 kafka-consumer-groups.sh消费者组管理
先调用MetadataRequest拿到所有在线Broker列表 再给每个Broker发送ListGroupsRequest请求获取 消费者组数据
石臻臻的杂货铺[同名公众号]
2021/08/16
8.5K0
怎么理解Kafka消费者与消费组之间的关系?
与生产者对应的是消费者,应用程序可以通过 KafkaConsumer 来订阅主题,并从订阅的主题中拉取消息。不过在使用 KafkaConsumer 消费消息之前需要先了解消费者和消费组的概念,否则无法理解如何使用KafkaConsumer。
码农架构
2021/02/08
2.2K0
kafka消费者
当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力,消费者组中的每个消费者只处理每个Topic的一部分的消息,每个消费者对应一个线程。
yiduwangkai
2022/01/20
9690
点击加载更多

相似问题

Kafka:描述消费者组偏移量

20

如何删除Kafka消费者组重置偏移量?

5253

删除未使用的kafka消费者组

23

KAFKA / AWS MSK,无法重置消费者组偏移量

11

为消费者组中未连接的kafka主题设置偏移量,而不停止消费者

12
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文