通过上一篇文章Kafka:MirrorMaker-V1我们已经知道了MirrorMaker-V1的基本概念,这篇文章我们来给Kafka-cluster搭建一个mirror。
操作系统: centOs7
java: OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)
zookeeper: apache-zookeeper-3.6.1
MirrorMaker-V1是一个独立的工具,可以在任何能访问到两个Kafka-cluster的机器上启动
启动命令
bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist '.*sync'
分析一下这个命令
--consumer.config
指的是消费者配置文件路径
当然这里的消费者指的是MirrorMaker-V1,消费的数据来自于source-cluster。
#config/consumer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster1:9092,kafka-cluster1:9093 # source-cluster的broker list
group.id=test-consumer-group1 # 自定义一个消费者的group id
auto.offset.reset= # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据; earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费; none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
--producer.config
指的是生产者配置文件路径
当然这里的消费者代表的也是MirrorMaker-V1,生产的数据目的地是destination-cluster
#config/producer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster2:9092,kafka-cluster2:9093 # destination-cluster的broker list
compression.type=none # 数据压缩方式none, gzip, snappy, lz4, zstd
partitioner.class= # 指定分区程序路径,默认为随机分区
request.timeout.ms= # 请求超时时间
max.block.ms= # `KafkaProducer.send` and `KafkaProducer.partitionsFor` 阻塞时间
linger.ms= # 等待指定时间后批量发送
max.request.size= # 发送消息最大字节数
batch.size= # 单次批量处理的字节数
buffer.memory= # 指定等待发送消息的缓冲区大小
--whitelist
指定的是同步topic的白名单,这是个必输项。可以用Java-style regular expressions按照正则表达式来订阅topics,--whitelist '.*sync'
代表的就是订阅所有以sync结尾的topic。
启动MirrorMaker-V1后,利用kafka-producer-perf-test.sh
向Kafka-cluster1中g_sync写入数据。
bin/kafka-producer-perf-test.sh --topic g_sync --num-records 10 --throughput 1 --producer-props bootstrap.servers=kafka-cluster1:9092,kafka-cluster1:9093 --record-size 100
7 records sent, 1.3 records/sec (0.00 MB/sec), 90.1 ms avg latency, 524.0 ms max latency.
10 records sent, 1.044714 records/sec (0.00 MB/sec), 65.80 ms avg latency, 524.00 ms max latency, 12 ms 50th, 524 ms 95th, 524 ms 99th, 524 ms 99.9th.
数据写入完成后在Kafka-cluster2中消费数据,发现数据已成功同步
bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster2:9092,kafka-cluster2:9093 --topic g_sync --from-beginning
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
注意:经过多次验证发现,当kafka-cluster1中创建topic后需要重启MirrorMaker,才能在kafka-cluster2中自动创建topic并完成数据同步。这个问题cosmozhu还没有找到解决方法,有解决了这个问题的同学,可以留言给我。