我已经设置了在两个DC之间复制数据的MirrorMaker2。
我的mm2.properties,
# mm2.properties
name=source->dest
clusters=source, dest
source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092
source->dest.enabled=true
offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1
查看下面关于MM2启动的内容。
[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values:
admin.timeout.ms = 60000
checkpoints.topic.replication.factor = 3
config.action.reload = restart
config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
consumer.poll.timeout.ms = 1000
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
enabled = true
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
groups = [.*]
groups.blacklist = [console-consumer-.*, connect-.*, __.*]
header.converter = null
heartbeats.topic.replication.factor = 3
key.converter = null
metric.reporters = null
name = source->dest
offset-syncs.topic.replication.factor = 3
offset.lag.max = 100
refresh.groups.enabled = true
refresh.groups.interval.seconds = 600
refresh.topics.enabled = true
refresh.topics.interval.seconds = 600
replication.factor = 2
replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator = .
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
source.cluster.alias = source
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
sync.topic.acls.enabled = true
sync.topic.acls.interval.seconds = 600
sync.topic.configs.enabled = true
sync.topic.configs.interval.seconds = 600
target.cluster.alias = dest
task.assigned.groups = null
task.assigned.partitions = null
tasks.max = 1
topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
topics = [.*]
topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
transforms = []
value.converter = null
(org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)
我的数据正在按预期进行复制。源主题在目标集群中创建为源。但是,消费者组偏移量没有被复制。
已在源群集中启动消费组。
./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group
使用了很少的消息并阻止了它。在此主题中发布了新消息,并且mirror maker还将数据镜像到目标群集。
我尝试使用来自目标集群的消息,如下所示。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group
因为我使用相同的消费者组,所以我希望我的偏移量也是同步的,并且不会使用与我在cluster1中使用的相同的消息。但是,仍然使用所有的消息。我还有什么遗漏的吗。
发布于 2021-01-27 05:17:34
Kafka 2.7引入了“自动消费者偏移同步”。默认情况下,消费者偏移量不会在群集之间同步。您应该显式启用此功能。
support automated consumer offset sync across clusters in MM 2.0
发布于 2020-02-16 19:41:08
有几个基本原因可以解释为什么复制偏移量不是那么简单:
看看MM2的KIP,有一个提到的“偏移同步主题”。在您的代码中,您可以使用类RemoteClusterUtils在集群之间转换检查点:
Map<TopicPartition, Long> newOffsets = RemoteClusterUtils.translateOffsets(
newClusterProperties, oldClusterName, consumerGroupId
);
consumer.seek(newOffsets);
这是从以下演示文稿中摘录的- https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019
或者,您可以使用seek by timespamp将目标上的使用者组启动到将数据传送到目标(或传送到源,如果目标上的日志附加时间戳的代理设置不会覆盖这些时间)的大致时间。为了安全起见,你需要稍微倒带一下。
发布于 2020-08-17 11:57:10
正在按预期复制我的数据。源主题在目标集群中创建为源。但是,消费者组偏移量没有被复制。
默认情况下,MM2不会从kafka-console-consumer
复制消费者组。在启动时的MM2日志中,我们可以看到该groups.blacklist = [console-consumer-.*, connect-.*, __.*]
。我相信你的mm2.properties
配置文件中有you can override this。
因为我使用相同的消费者组,所以我希望我的偏移量也是同步的,并且不会使用与我在cluster1中使用的相同的消息。
正确镜像使用者组并启用检查点后,应该会在目标群集中自动创建一个内部主题(类似于dest.checkpoints.internal
)。此检查点主题包含每个使用者组中镜像主题分区的源群集中和目标群集中的上次提交偏移量。
然后,您可以使用Kafka的RemoteClusterUtils实用程序类来转换这些偏移量,并获得source.test-1
的同步偏移量,这些偏移量映射到消费者最后提交的test-1
偏移量。如果您最终使用Java语言创建了消费者,则可以将RemoteClusterUtils
作为依赖项添加到您的项目中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-mirror-client</artifactId>
<version>2.4.0</version>
</dependency>
否则,您很可能需要编写一个包装RemoteClusterUtils.java
的工具来获取转换后的偏移量。此功能或something similar似乎计划作为future release for MM2的一部分。
https://stackoverflow.com/questions/60250330
复制相似问题