专栏首页JavaJourney整活了!结合API操作Kafka集群,理解producer&consumer&topic&partition

整活了!结合API操作Kafka集群,理解producer&consumer&topic&partition

Kafka集群搭建

本文通过实操Kafka的API来理解topic、partition等相关概念,我将通过搭建一个Kafka集群来实现它。

Kafka集群依赖于ZooKeeper对其Broker进行协调管理,所以我们也需要考虑搭建一个ZooKeeper集群。

主机规划

主机名称

角色

IP 地址

基础软件

node01

kafka 集群节点

192.168.242.118

JDK1.8

node02

kafka 集群节点、ZooKeeper 集群节点

192.168.242.117

JDK1.8

node03

kafka 集群节点、ZooKeeper 集群节点

192.168.242.116

JDK1.8

node04

ZooKeeper 集群节点

192.168.242.115

JDK1.8

其中:

  • IP 地址与主机名之间的映射关系配置好。编辑/etc/hosts,添加如下内容:
192.168.242.118 node01
192.168.242.117 node02
192.168.242.116 node03
192.168.242.115 node04
  • node01、node02、node03 为 Kafka 集群节点,node02、node03、node04 为 ZooKeeper 集群节点。
  • ZooKeeper 集群已搭建完毕。

附 ZooKeeper 集群搭建过程: 在node02:

  1. tar -zxf apache-zookeeper-3.5.8-bin.tar.gz
  2. mv apache-zookeeper-3.5.8-bin /usr/local/zookeeper
  3. cd /usr/local/zookeeper/conf
  4. cp zoo_example.cfg zoo.cfg
  5. vi zoo.cfg

设置 dataDir=/var/zookeeper 末尾添加:server.1=node02:2888:3888 server.2=node03:2888:3888 server.3=node04:2888:3888

  1. mkdir -p /var/zookeeper
  2. echo 1 > /var/zookeeper/myid
  3. vi /etc/profile

export JAVA_HOME=/usr/local/java export ZK_HOME=/usr/local/zookeeper export PATH=

PATH:

JAVA_HOME/bin:$ZK_HOME/bin

  1. source /etc/profile
  2. scp分发zk相关配置到node03、node04

scp -r /usr/local/zookeeper/ root@node03:/usr/local/ scp /usr/local/zookeeper/conf/zoo.cfg root@node03:/usr/local/zookeeper/conf/ scp /etc/profile root@node03:/etc scp -r /usr/local/zookeeper/ root@node04:/usr/local/ scp /usr/local/zookeeper/conf/zoo.cfg root@node04:/usr/local/zookeeper/conf/ scp /etc/profile root@node04:/etc

  1. 在node03

mkdir -p /var/zookeeper echo 2 > /var/zookeeper/myid source /etc/profile

  1. 在node04

mkdir -p /var/zookeeper echo 3 > /var/zookeeper/myid source /etc/profile

  1. 启动zk集群,在node02/node03/node04三个节点均执行

zkServer.sh start

ZK集群搭建完成后,可用zkServer.sh status查看ZK集群状态:

node02,follower:

node03,leader:

node04,leader:

Kafka 集群

解压完Kafka安装文件后,修改配置文件config/server.properties

broker.id=0

listeners=PLAINTEXT://node01:9092

log.dirs=/var/kafka-logs

zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka

与ZK集群搭建一样,使用SCP分发,注意修改 broker.idlisteners

这里值得注意的是ZK连接配置项要带上/kafka。

凡是使用 ZooKeeper 的技术,一般按照项目部门之类的加一个节点路径,不要在 ZK 根节点创建自己的东西,防止难以维护。

配置Kafka环境变量,方便使用Kafka命令,编辑文件/etc/profile

export JAVA_HOME=/usr/local/java
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin

启动 Kafka 集群

每台Kafka集群节点执行命令kafka-server-start.sh

# 前台启动
kafka-server-start.sh $KAFKA_HOME/config/server.properties

# 后台启动
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# 查看topic
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list

启动Kafka之后,再来看一下 ZK 节点:

[zk: localhost:2181(CONNECTED) 7] ls /
[kafka, zookeeper]
[zk: localhost:2181(CONNECTED) 5] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification]

多了个 kafka 节点,这是可以想到为什么之前的配置文件zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka这里最后要加个/kafka了,就是 kafka 启动之后生成了很多内容,如果都放到 zk 根节点将很难维护。

Kafka 生成的一些内容:

[zk: localhost:2181(CONNECTED) 8] ls /kafka/cluster
[id]
[zk: localhost:2181(CONNECTED) 9] get /kafka/cluster/id
{"version":"1","id":"7V2aCgVnQhuPdkdryBXt4w"}
[zk: localhost:2181(CONNECTED) 10] ls /kafka/con
config             consumers          controller         controller_epoch
[zk: localhost:2181(CONNECTED) 10] ls /kafka/controller
controller         controller_epoch
[zk: localhost:2181(CONNECTED) 10] ls /kafka/controller
[]
[zk: localhost:2181(CONNECTED) 11] get /kafka/controller
controller         controller_epoch
[zk: localhost:2181(CONNECTED) 11] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1608979966643"}

API

本文代码仓库:https://gitee.com/xblzer/kafka-api/tree/master/code/kafka-api

Topic的管理相关和Producer生产消息的API非常简单,这里不做特别说明了,代码中有注释,下面从Consumer相关的API开始展开说明。

Consumer

sub 订阅模式

订阅模式,必须设置消费者组,去掉消费者组

注释掉
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

执行报错

org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

也就是说,订阅模式可以用到消费者组的管理机制,在配置消费者的时候必须提供有效的group.id

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消费者组,如果不设置消费者组会报错
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅topic名称是“topic”开头的topic
consumer.subscribe(Pattern.compile("^topic.*"));

现在我们只开启一个Consumer客户端,可以看到该消费者对产生的消息全部消费了:

单消费线程

再使用线程池,构造三个消费者线程,模拟不同的消费者客户端(属于同一消费组)。

也可以在kafka服务器开几个命令终端,命令如下 kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic topic01 --group group01 --property print.key=true --property print.value=true --property key.separator=,

这个命令的参数可以用 kafka-console-consumer.sh --help查看。

线程池模拟多个消费者客户端:

/**
 * 多个线程,不同的消费者(属于同一消费组)
 */
@Test
@SneakyThrows
public void testKafkaConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    //消费者组,如果不设置消费者组会报错
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

    //开启三个线程,跑三个consumer客户端,他们属于同一消费组“group01”
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            3,
            16,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024),
            new ThreadFactoryBuilder().setNameFormat("[消费者‐%d]").build(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    for (int i = 0; i < 3; i++) {
        threadPoolExecutor.execute(() -> {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            //订阅topic名称是“topic”开头的topic
            consumer.subscribe(Pattern.compile("^topic.*"));
            //订阅topic01
//        consumer.subscribe(Arrays.asList("topic01"));

            
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                if (!consumerRecords.isEmpty()) {
                    Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
                    while (consumerRecordIterator.hasNext()) {
                        ConsumerRecord<String, String> consumerRecord = consumerRecordIterator.next();
                        String key = consumerRecord.key();
                        String value = consumerRecord.value();
                        //消息所在分区
                        int partition = consumerRecord.partition();
                        //消息在所在分区的偏移量
                        long offset = consumerRecord.offset();
                        System.out.println("线程" + Thread.currentThread().getName() + "key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
                    }
                }
            }
        });
    }
    threadPoolExecutor.shutdown();
    threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
}

运行测试类,

Kafka Consumer协调器分配消费分区

可以看到Consumer协调器(ConsumerCoordinator)分配消费分区情况:

线程名称

消费者

分区

消费者-0

consumer-1

topic01的分区0,topic02的分区0

消费者-1

consumer-2

topic01的分区1,topic02的分区1

消费者-2

consumer-3

topic01的分区2

重新生产消息,查看消息消费情况:

10 条 record 被同一个消费组的三个消费者消费,这个是消费者组的特性之一,组内平分消费分区的消费进行消费,有一个负载均衡的理念在里面。

当消息中不带key(key=null)时,将按照轮询的方式对partition中的消息进行消费:

客户端宕机

再启动一个消费者客户端测试,

重新分配消费分区

重新分配消费分区

控制台有新的日志输出,可以看到ConsumerCoordinator重新分配了消费分区:

线程名称

消费者

分区

消费者-0

consumer-1

topic01的分区1,topic02的分区1

消费者-1

consumer-2

topic01的分区2

消费者-2

consumer-3

没有分配消费分区

新开的线程

新开的消费者

topic01的分区0,topic02的分区0

执行一下生产者,看下消费情况:

消费者消费分配给自己的分区内的消息!

这个时候把新开的那个Consumer断开,模拟消费者宕机,看Kafka的重新分配:

rebalancing:有一个Consumer宕机重新分配

Kafka消费者组内分区消费负载均衡。

消费者 assign 手动指定分区模式

上面演示的是consumer主动订阅,主动订阅的情况下,消费者协调器会协调消费者进行分区消费,有一个负载均衡的理念在里面。

手动指定分区进行消费的话,就会失去组的特性,assign 方法:

//从开始位置消费
List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic01", 0));
consumer.assign(partitions);
consumer.seekToBeginning(partitions);

Kafka的分区

探究Kafka高性能之道 一文中,我已提到了Kafka是如何决定发送消息到topic的哪个分区的:

kafka架构

Kafka默认的分区策略在DefaultPartitioner中也有定义:


/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {
    //...
    
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    
    //...
}

这里说明了:

  • 如果发送消息指定了分区,那么消息全发送到指定的分区中
  • 如果消息没有指定分区但是设置了key,那么按照消息的key进行hash然后和分区数进行取模,得到一个值x,Kafka就往分区x中发送消息
  • 如果分区和key都没有指定,则默认采用轮询的方式。

上面已经使用API得到了验证。

一般情况下,这种默认的分区策略就满足生产需求了,但是如果有特殊的业务需求,还可以自定义分区策略,

public void testProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    //定义Partitioner
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
    
    //TODO
}

先看一下,ProducerConfig源码中关于分区配置的说明:

public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.";

自定义的Partitioner必须实现org.apache.kafka.clients.producer.Partitioner接口,这里自定义一个Partitioner,分区策略也按照DefaultPartitioner的策略来,只是其实现略有不同:

public class MyPartitioner implements Partitioner {

    private AtomicInteger counter = new AtomicInteger(0);

    /**
     * 返回分区号
     * @param topic topic
     * @param key key
     * @param keyBytes key的字节数
     * @param value value
     * @param valueBytes value的字节数
     * @param cluster 集群信息
     * @return 分区号
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //先获取集群的分区数
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            int increment = counter.getAndIncrement();
            //拿这个值模上分区数
            // increment & Integer.MAX_VALUE 保证是个正数
            return (increment & Integer.MAX_VALUE) % numPartitions;
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
        System.out.println("MyPartitioner#close.");
    }

    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("MyPartitioner#configure.");
    }
}

Producer这里:

//定义Partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

//创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

运行生产这实例,这里生产topic02的消息:

可以看到分区策略走的是我们自定义的分区策略,消费者:

前面API创建topic02的时候只设置了两个分区,所以这里是两个分区的轮询。同理可以验证消息带key的分区消费策略。

序列化

前面API演示的时候,生产者和消费者有两个重要的配置,

ProducerConfig

  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

ConsumerConfig

  • KEY_DESERIALIZER_CLASS_CONFIG
  • VALUE_DESERIALIZER_CLASS_CONFIG

这个是生产者生产消息是需要对key和value进行序列化,消费者消费消息需要对其进行反序列化,前面序列化和反序列化类是StringSerializerStringDeserializer,跟一下源码,可以看到他们都实现了规定好的接口(Serializer<String>Deserializer<String>):

StringSerializer

StringDeserializer

生产环境中,我们发送的消息有时是对象,此时我们可以自定义对象序列化类,这样可以完成对象消息的传输,自定义序列化实现SerializerDeserializer接口即可。

这里借助于commons-lang3包下的SerializationUtils来进行序列化和反序列化:

//序列化
@Override
public byte[] serialize(String topic, Object data) {
//        return new byte[0];
    return SerializationUtils.serialize((Serializable) data);
}
//反序列化
@Override
public Object deserialize(String topic, byte[] data) {
    System.out.println("自定义反序列化 topic:" + topic);
    return SerializationUtils.deserialize(data);
}

生产消息,key是String类型,value是Order对象:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());

//创建生产者
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);

消费消息:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);

启动生产者:

自定义的序列化生效,再启动消费者,控制台打印:

成功将Order信息打印出来,自定义反序列化也生效了。

拦截器

发送数据的时候,可以通过拦截器拿到数据的一些消息,然后可以任意摆布这些数据了(对数据做一些装饰),比如发送失败了,我们可以通过拦截器把错误信息拿到进行分析。

只要在ProducerConfig中配置INTERCEPTOR_CLASSES_CONFIG这个配置项就可以设置拦截器了,和前面的PartitionerSerializer同理,看一下这个配置项的源码描述:

/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
                                                    + "Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
                                                    + "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";

这里说明了默认是没有拦截器的,自定义拦截器需要实现ProducerInterceptor接口。

public class MyProducerInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        return new ProducerRecord(record.topic(), record.key(), record.value() + " --- 拦截了。");
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("metadata:" + metadata + ",exception:" + exception);
    }

    @Override
    public void close() {
        System.out.println("MyProducerInterceptor#close");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("MyProducerInterceptor#configure");
    }
}

运行生产者消费者即可观察到消息成功拦截。

小结

  1. Kafka集群需要ZooKeeper对其Broker进行协调管理,搭建Kafka集群前需要搭建ZK集群,搭建ZK集群需要注意配置每台节点的myid
  2. Kafka集群的每个节点的配置文件中,需要注意的配置项(KAFKA_HOME/config/server.properties文件broker.idlistenerslog.dirszookeeper.connect
  3. Kafka基础API对topic进行管理,实现Producer生产消息,Consumer消费消息,并通过运行情况理解topic的分区,以及消费者组内消费消息的负载均衡。
  4. 利用Kafka相关API实现自定义的分区策略、自定义序列化、以及自定义Producer拦截器。
文章分享自微信公众号:
行百里er

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!

作者:行百里er
原始发表时间:2021-05-11
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • 案例分享 | Yelp 如何在 Kubernetes 上运行 Kafka(第 1 部分 - 架构)

    在 Yelp,Kafka 每天接收数百亿条消息来推进数据驱动并为关键业务管道和服务提供支持。我们最近通过在 PaaSTA (Yelp 自己的平台即服务)上运行集...

    灵雀云
  • 【大家的项目】Rust Web开发框架 Poem 0.3发布!

    在和社区的朋友聊axum的过程中,发现大家都不太玩得明白,我突然就想做一个用起来简单点的。

    MikeLoveRust
  • Kafka及周边深度了解

    文章有点长,但是写的都挺直白的,慢慢看下来还是比较容易看懂,从Kafka的大体简介到Kafka的周边产品比较,再到Kafka与Zookeeper的关系,进一步理...

    别打名名
  • Flink 1.10 升级 Flink 1.12 预期收益评估

    Flink 1.12 版本在 20 年 12 月已经正式 Release,目前我们的 Flink SQL 作业的 Flink 引擎版本还是 1.10,本文主要用...

    LakeShen
  • 【深度知识】Kafka原理入门和详解

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网...

    辉哥
  • 大数据技术学习路线

    加米谷大数据
  • kafka性能技术分析

    1.通过磁盘顺序读写,效率高,appendLog,对比raid-5 7200rpm的磁盘

    挖掘大数据
  • 如何成为大数据Spark高手

    Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库、流处理和...

    企鹅号小编
  • Kafka 2.8.0 正式发布,与ZooKeeper正式分手!

    导读:目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。目前越来越多的开源分布式处理系...

    码农架构
  • 大数据经典学习路线(及供参考)不容错过

    熟练使用Linux,熟练安装Linux上的软件,了解熟悉负载均衡、高可靠等集群相关概念,搭建互联网高并发、高可靠的服务架构;

    用户2292346
  • 2019精炼的大数据技术学习路线

    近年来大数据BigData、人工智能AI、物联网Iot等行业发展迅猛,很多人都想要从事大数据技术开发工作,但是,请问要怎么做,路线是什么?从哪里开始学?学哪些?...

    用户2292346
  • 干货 | 如何成为大数据Spark高手

    Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库、流处理和...

    Spark学习技巧
  • 作为一个Java架构师程序员 你应该会什么

    一,JAVA架构师 1、语法:Java 程序员必须比较熟悉语法,在写代码的时候IDE 的编辑器对 某一行报错应该能够根据报错信息 知道是什么样的语法错误并且知道...

    Java高级架构
  • Kafka异地双活深度讲解 - Mirrormaker V2

    总结:Apache Kafka Mirrormaker V1的解决方案在提供企业管理的灾难恢复方面存在局限性。MM V2(KIP-382)针对MM V1 进行了...

    Fayson
  • Kafka原理篇:图解kakfa架构原理

    这是[码哥]Kafka 系列文章的第二篇,码哥将从原理、实践和源码角度为大家深入剖析并实践 Kafka。此系列包括[原理篇]、[实践篇]和[源码篇]。这篇是[原...

    码哥字节
  • 数据订阅案例

    我们会通过模拟从库向主库获取对应 binlog 内容进行分析,大概架构图如下,我们会通过解析 binlog ,按照订阅通道配置的库表进行分析,所以几乎对主库没有...

    宗文
  • JAVA程序员如何提升自己?

    加米谷大数据

扫码关注腾讯云开发者

领取腾讯云代金券