Kafka Core APIs

Author: 瑞新 · Published 2020-12-07 10:35:51

References

Official documentation: kafka.apache.org/

Course overview

Hands-on project: a WeChat mini-program survey, integrated with microservices

Mind map

Survey

Prerequisites

Java basics + Spring Boot

Target audience

Development environment setup

Host machine

Connection tool (SSH client)

Domain name

Recommended virtualization software

VMware

https://www.cnblogs.com/PrayzzZ/p/11330937.html

VirtualBox

Linux: CentOS 7 (minimal ISO)

http://mirror.bit.edu.cn/centos/7.8.2003/isos/x86_64/CentOS-7-x86_64-Minimal-2003.iso

VM networking

Reference

https://blog.csdn.net/wujiele/article/details/92803655

First set the IPv4 address on the VMnet8 adapter, then share the host's LAN connection to VMnet8; in VMware's Virtual Network Editor select NAT mode and adjust the gateway and subnet range, then give the CentOS guest an address in that subnet.

Verify by pinging host and guest from each other (remember to tick the corresponding sharing option).

Kafka setup

Kafka is a distributed streaming platform.

Kafka is written in Scala and runs on the JVM, so configure a JDK first.

ZooKeeper is written in Java and needs a JDK as well.

Upload and extract the archives

(Create two directories: software and install.)

Extract into the target directory:
 tar -zxf ./software/apache-zookeeper-3.5.8-bin.tar.gz -C ./install/

Configure the Java environment variables
[root@hadoop01 jdk1.8.0_261]# vi /etc/profile
[root@hadoop01 jdk1.8.0_261]# source /etc/profile
[root@hadoop01 jdk1.8.0_261]# java -version
java version "1.8.0_261"


# PATH
export JAVA_HOME=/opt/install/jdk1.8.0_261
export PATH=$PATH:$JAVA_HOME/bin

ZooKeeper configuration

1. Edit the configuration file

In the conf directory, cp the sample file to zoo.cfg. The parameters to watch are dataDir (where ZooKeeper writes its data; this VM has no separate data disk, so no change is needed) and the client port.

The defaults are fine as-is, just be aware of these settings:
/opt/install/apache-zookeeper-3.5.8-bin/conf

[root@hadoop01 conf]# cp zoo_sample.cfg zoo.cfg 

dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

2. Start ZooKeeper
[root@hadoop01 bin]# ./zkServer.sh start
// 法1:检验链接成功
[root@hadoop01 bin]# ./zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] 
// 法2:监控关键字
[root@hadoop01 bin]# ps -ef | grep zookeeper
// 法3:自动生成log日志在zookeeper下

Kafka configuration

1. Edit the files under config

In server.properties set listeners

and advertised.listeners,

Kafka's log.dirs (leave the default for now; no separate data disk has been allocated),

and the ZooKeeper address (everything runs on one machine for now, so the default is fine).
[root@hadoop01 config]# pwd
/opt/install/kafka_2.13-2.6.0/config
[root@hadoop01 config]# vi server.properties 


#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.137.121:9092

advertised.listeners=PLAINTEXT://192.168.137.121:9092
// 日志位置,暂时默认
log.dirs=/tmp/kafka-logs

//链接zookeeper相关,可能为集群
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

Basic start/stop commands

Start: run from the Kafka directory, start it in the background; it writes its logs under logs/
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-server-start.sh config/server.properties &
[1] 9134
//检验1 日志输出
Connecting to zookeeper on localhost:2181 
// 检验2 捕获进程日志
[root@hadoop01 kafka_2.13-2.6.0]# ps -ef|grep kafka
// 3 直接看日志
[root@hadoop01 kafka_2.13-2.6.0]# cd logs/
[root@hadoop01 logs]# ll

Stop
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-server-stop.sh
// 进程日志就没了
[root@hadoop01 kafka_2.13-2.6.0]# ps -ef|grep kafka
root       9558   1287  0 17:06 pts/0    00:00:00 grep --color=auto kafka

Create a topic
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-topics.sh --create --zookeeper 192.168.137.121:2181 --replication-factor 1 --partitions 1 --topic rxguo-topic
Created topic rxguo-topic.

List the topics that have been created
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-topics.sh --list --zookeeper 192.168.137.121:2181
rxguo-topic

The topic name is whatever you choose.

Produce messages to the topic
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-console-producer.sh --broker-list 192.168.137.121:9092 --topic rxguo-topic
>hello world
>rxguo

Consume messages from the topic
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.121:9092 --topic rxguo-topic --from-beginning
hello world
rxguo
welcome on

Kafka basic concepts

Kafka client operations

Administrative, management-level operations are exposed as a separate API, the AdminClient.

Differences between the client APIs

Java client

1. Create a Spring Boot project

Delete the test package, the test Maven dependencies, and the auto-generated folders you do not need.

Add the Kafka client dependency to Maven:
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

Alternatively, look up the dependency coordinates on the Kafka website.

2. Collect and organize the documentation

We use kafka-clients 2.4 here, together with the official API docs.

Javadoc

From the Kafka website, find the Admin API and its JavaDoc:

http://kafka.apache.org/26/documentation.html#api

The JavaDoc covers the whole Kafka client API.

AdminClient API

It helps to keep your own notes on the configuration options.

3. Write the Java code
package com.bennyrhys.kafka_study1.admin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

import java.util.*;

public class AdminSample {

    private final static String TOPIC_NAME = "rxguo-topic";

    private final static AdminClient adminClient = AdminSample.adminClient();

    public static void main(String[] args)throws Exception {
//      创建连接
        System.out.println("adminClient:==================================" + adminClient);
//        创建Topic实例
//        createTopic();
//        删除Topic实例
//        delTopic();
//        获取Topic列表
//        topicList();
//        描述Topic
//        desccribeTopic();
//        修改Config 旧版Api->新版API(对单体支持不太友好,集群可以)
//        alterConfig();
//        alterConfig2();
//        查询Config
//        describeConfig();
//        增加partition数量
        incrPartitions(2);

        while(true) {

        }
    }

    /**
     * 增加partition数量
     * 之所以是incr,是因为在kafka中partitions默认只能增加,不能减少
     *
     * 检验:
     * 法1. 调用函数 描述Topic
     * name:rxguo-topic, desc:(name=rxguo-topic, internal=false, partitions=(
     * partition=0, leader=192.168.137.121:9092 (id: 0 rack: null), replicas=192.168.137.121:9092 (id: 0 rack: null), isr=192.168.137.121:9092 (id: 0 rack: null)),(
     * partition=1, leader=192.168.137.121:9092 (id: 0 rack: null), replicas=192.168.137.121:9092 (id: 0 rack: null), isr=192.168.137.121:9092 (id: 0 rack: null)), authorizedOperations=null)
     * 法2. 命令行查看日志
     * drwxr-xr-x. 2 root root  178 10月 20 09:19 rxguo-topic-0
     * drwxr-xr-x. 2 root root  141 10月 20 10:23 rxguo-topic-1
     * [root@hadoop01 kafka-logs]# pwd
     * /tmp/kafka-logs
     */
    public static void incrPartitions(int partition_num) throws Exception{
        Map<String, NewPartitions> partitionsMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(partition_num);
        partitionsMap.put(TOPIC_NAME, newPartitions);

        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
        createPartitionsResult.all().get();
    }

    /**
     * 修改Topic (版2 新版API incrementalAlterConfigs())
     * ConfigEntry(name=preallocate, value=false
     * @throws Exception
     */
    public static void alterConfig2() throws Exception {
//        组织两个参数
//        alterConfigs过期了,但是暂时比incrementalAlterConfigs()好用。用法一样
//        adminClient.incrementalAlterConfigs()
        HashMap<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
//      preallocate defaults to false; alterConfig() set it to true, here we SET it back to false so the change can be verified
        AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET);
        configMaps.put(configResource, Arrays.asList(alterConfigOp));

        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
        alterConfigsResult.all().get();
    }
    /**
     * 修改 配置信息(版1 alterConfigs过期了)
     * ConfigEntry(name=preallocate, value=true, source=DYNAMIC_TOPIC_CONFIG
     * @throws Exception
     */
    public static void alterConfig() throws Exception {
//        组织两个参数
//        alterConfigs过期了,但是暂时比incrementalAlterConfigs()好用。用法一样
//        adminClient.incrementalAlterConfigs()
        HashMap<ConfigResource, Config> configMaps = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
//      preallocate defaults to false; set it to true here so the change can be verified
        Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
        configMaps.put(configResource, config);

        AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMaps);
        alterConfigsResult.all().get();
    }


    /**
     * 查询Config
     * configResource:ConfigResource(type=TOPIC, name='rxguo-topic'),
     * Config:Config(entries=[ConfigEntry(name=compression.type,
     *                                    value=producer,
     *                                    source=DEFAULT_CONFIG,
     *                                    isSensitive=false,
     *                                    isReadOnly=false, synonyms=[]),
*                             ConfigEntry(name=leader.replication.throttled.replicas,
     *                                    value=, source=DEFAULT_CONFIG,
     *                                    isSensitive=false, isReadOnly=false, synonyms=[]),
 *                            ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *                            ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *                            ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *                            ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *                            ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.format.version, value=2.6-IV0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *                            ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
     */
    public static void describeConfig() throws Exception {
        //        TODO 这里做预留,集群会处理
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
        configResourceConfigMap.entrySet().stream().forEach((entry)->{
            System.out.println("configResource:" + entry.getKey() + ",Config:" + entry.getValue());
        });
    }


    /**
     * 描述topic
     * name:rxguo-topic,
     * desc:(name=rxguo-topic,
     *      internal=false,
     *      partitions=(partition=0,
     *                  leader=192.168.137.121:9092 (id: 0 rack: null),
     *                  replicas=192.168.137.121:9092 (id: 0 rack: null),
     *                  isr=192.168.137.121:9092 (id: 0 rack: null)),
*           authorizedOperations=null)
     */
    public static void desccribeTopic() throws Exception {
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        entries.stream().forEach((entry)->{
            System.out.println("name:" + entry.getKey() + ", desc:" + entry.getValue());
        });
    }

    /**
     * 删除topic
     */
    public static void delTopic() throws Exception {
        DeleteTopicsResult deleteTopicsResult = adminClient().deleteTopics(Arrays.asList(TOPIC_NAME));
        deleteTopicsResult.all().get();
    }

    /**
     * 获取topic列表
     * @throws Exception
     */
    public static void topicList() throws Exception {
        // 开启 可打印内部使用,__consumer_offsets(查看是否选择internal)
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);

        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
//        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> names = listTopicsResult.names().get();
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        Map<String, TopicListing> stringTopicListingMap = listTopicsResult.namesToListings().get();

        // 打印names
        names.stream().forEach(System.out::println);
        // 打印topicListings
        topicListings.stream().forEach((listing)->{
            System.out.println(listing);
        });

    }

    /**
     * 创建Topic实例
     */
    public static void createTopic() {
        // 副本因子,暂时测试的单节点
        Short rs = 1;
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, rs);
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        System.out.println("==CreateTopicsResult==:" + topics);
    }

    /**
     * 设置AdminClient
     *注意AdminClient为kafka包下的
     */
    public static AdminClient adminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");

        return AdminClient.create(properties);
    }
}
Output:

[AdminClient clientId=adminclient-4219] Kafka admin client initialized

Troubleshooting: "Connection refused: no further information" when connecting from Java

Kafka's server configuration must use the machine's concrete IP, not localhost.

1. Edit config/server.properties

Replace localhost with 192.168.137.121,

i.e. change localhost to the concrete IP, e.g. zookeeper.connect=192.168.137.121:2181

2. Restart the Kafka service

Note: when testing with Kafka's bundled console producer and consumer you also cannot use localhost; point them at the concrete host address.
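For reference, a minimal sketch of the three server.properties lines involved (the address matches the host used throughout these notes; substitute your own):

listeners=PLAINTEXT://192.168.137.121:9092
advertised.listeners=PLAINTEXT://192.168.137.121:9092
zookeeper.connect=192.168.137.121:2181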

Kafka core API: Producer

Connection configuration

Load balancing; synchronous vs. asynchronous sending

Working backwards: these are the levers behind Kafka's high throughput.

API

Send modes

Synchronous (strictly speaking Kafka has no truly synchronous send; it is an asynchronous send that you block on)

Asynchronous

Asynchronous with a callback

Note: an offset is unique only within a partition, not across the whole topic.
package com.bennyrhys.kafka_study1.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ProducerSample {

    private final static String TOPIC_NAME = "rxguo-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        producer异步发送
//        producerSend();
//        producer异步阻塞发送(同步发送)
//        producerSyncSend();
//        producer异步发送带回调函数
        producerSendWithCallBack();
//        Producer异步发送带回调函数和负载均衡Partition
//        producerSendWithCallBackAndPartition();
    }

    /**
     * Producer异步发送
     *
     * 验证:命令行打印输出
     * key-0values-0
     */
    public static void producerSend() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0"); // 重试,流程图说了
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");// 空间计量,批量写入
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); // 1s为记时,批量写入
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");// 缓存最大

        // 序列化k,v(因为要放到通道里传输)扩展类,kafka->org包->comm->serialization
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Producer的主对象
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 消息对象
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i + "values-" + i);
            producer.send(record);
        }

// 所有打开的通道都需要关闭
        producer.close();
    }

    /**
     * Producer异步阻塞发送(同步发送)
     * 验证:命令行打印输出
     * partition:0, offset:27
     * partition:1, offset:15
     * partition 0, 1个数基本一致,offset不太一致
     */
    public static void producerSyncSend() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        // 序列化k,v(因为要放到通道里传输)扩展类,kafka->org包->comm->serialization
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Producer的主对象
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 消息对象
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i + "values-" + i);
            Future<RecordMetadata> send = producer.send(record);
            // future是发送出去就不管了,但是每次get()就相当于阻塞在这里
            RecordMetadata recordMetadata = send.get();
            System.out.println("partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());
        }

// 所有打开的通道都需要关闭
        producer.close();
    }

    /**
     * Producer异步发送带回调函数
     * send方法,添加参数CallBack
     *
     */
    public static void producerSendWithCallBack() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        // 序列化k,v(因为要放到通道里传输)扩展类,kafka->org包->comm->serialization
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Producer的主对象
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 消息对象
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i , "values-" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());
                }
            });
        }

// 所有打开的通道都需要关闭
        producer.close();
    }

    /**
     * Producer异步发送带回调函数和负载均衡Partition
     * 添加定制化Partition的类,注意类中的截取要和字符格式一致
     * partition:0, offset:52
     * partition:0, offset:53
     * partition:0, offset:54
     * partition:0, offset:55
     * partition:1, offset:60
     * partition:1, offset:61
     * partition:1, offset:62
     * partition:1, offset:63
     * partition:1, offset:64
     * partition:1, offset:65
     */
    public static void producerSendWithCallBackAndPartition() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        // 序列化k,v(因为要放到通道里传输)扩展类,kafka->org包->comm->serialization
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 制定partition的类
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.bennyrhys.kafka_study1.producer.SamplePartition");

        // Producer的主对象
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 消息对象
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i ,"values-" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());
                }
            });
        }

// 所有打开的通道都需要关闭
        producer.close();
    }
}

Source-code walkthrough

TODO

Understanding the producer

  • Direct send: whether the cluster has one node or many, a record is always sent to the leader of its partition.
  • Load balancing: a partitioner is loaded to compute the target partition (the default behaves roughly like a random/round-robin assignment).
  • Asynchronous send: send() returns a Future, and records are sent in batches to cut down the number of I/O round trips.

The part you can hook into is the partitioner.

Producer with a custom Partitioner for load balancing

ProducerSample.java
       /**
     * Producer异步发送带回调函数和负载均衡Partition
     * 添加定制化Partition的类,注意类中的截取要和字符格式一致
     *
     * 问题:负载均衡,自定义的partition接受不到key的数据
     */
    public static void producerSendWithCallBackAndPartition() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        // 序列化k,v(因为要放到通道里传输)扩展类,kafka->org包->comm->serialization
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 制定partition的类
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.bennyrhys.kafka_study1.producer.SamplePartition");

        // Producer的主对象
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 消息对象
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i ,"values-" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());
                }
            });
        }

// 所有打开的通道都需要关闭
        producer.close();
    }

SamplePartition.java
package com.bennyrhys.kafka_study1.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 自定义Producer的Partition
 */
public class SamplePartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /**
         * key-1
         * key-2
         */
        System.out.println(key);
        System.out.println((String) key);

        String keyStr = key + "";
        String keyInt = keyStr.substring(4);
        System.out.println("keyStr:" + keyStr + "keyInt:" + keyInt);

        int i = Integer.parseInt(keyInt);
        return i%2;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

Message delivery guarantees

Delivery semantics are implemented by producer and consumer together,

but mostly by the producer.

With idempotence and transactions the producer is identified by an id (and, for transactions, a transactional.id), and the broker uses that id plus sequence numbers to de-duplicate retried writes.

acks=all by itself gives at-least-once; combined with idempotence (or transactions) it becomes effectively exactly-once.

At-most-once is the fastest mode, because the producer fires the record and never checks whether it arrived.
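A minimal sketch of how these guarantees map onto producer settings (my illustration, not from the course; the class name, topic and values are assumptions):

package com.bennyrhys.kafka_study1.producer;   // hypothetical class next to ProducerSample

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class DeliverySemanticsSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // At-most-once: acks=0, fire-and-forget; fastest, but a send may be lost.
        // At-least-once: acks=all plus retries; a retried batch may be duplicated.
        // Effectively exactly-once (within Kafka): acks=all plus idempotence, so the
        // broker de-duplicates retried batches by producer id + sequence number.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("rxguo-topic", "key-demo", "value-demo"));
        }
    }
}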

[Hands-on] Kafka Producer in practice: the survey project

https://cloud.tencent.com/developer/article/1757007

Kafka core API: Consumer

Official Javadoc for the consumer

http://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Hello world (the naive version, not recommended)

Auto-commit of offsets; the classic boilerplate configuration, usable as-is:
package com.bennyrhys.kafka_study1.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerSample {

    private final static String TOPIC_NAME = "rxguo-topic";

    public static void main(String[] args) {
        helloworld();
    }

    private static void helloworld() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.137.121:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 消费订阅哪一个topic或者哪几个topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

Problems with this approach

The offset records how far consumption has progressed, so after an interruption the consumer can continue from where it left off.

With auto-commit the consumer commits periodically in batches; in unlucky cases (a batch is committed before it has actually been processed) this can lose data.

So switch from automatic to manual commits.

Manual commit
 /**
     * 手动提交
     */
    private static void commitedOffset() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.137.121:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 消费订阅哪一个topic或者哪几个topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                // 想把数据保存到数据库,成功就成功,不成功则在当前时间戳回滚
                // TODO record 2 db
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                // 如果失败,则回滚,不要提交offset
            }
            // 手动提交
            consumer.commitAsync();
        }
    }

Understanding consumer groups: manually controlling one or more partitions

A consumer group is a set of independent consumers.

A consumer can map to partitions one-to-one or one-to-many:

one consumer per partition, or

one consumer handling several partitions.

Within a group, one partition cannot be consumed by more than one consumer.

Keep this constraint in mind.

Committing offsets per partition
    /**
     * 手动提交(并且手动控制partition)
     * 场景:适用于多线程时,一个topic下的partition,失败的partition单独处理,而不需要所有的重新来
     * 缺点:全部拉取下来,只是节省之后处理逻辑。应该尝试单个partition的拉取
     */
    private static void commitedOffsetWithPartition() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.137.121:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 消费订阅哪一个topic或者哪几个topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                // 单个partition中的offset,并且提交
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                // offset的记录的取出起始点,必须+1
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                // 提交offset
                consumer.commitSync(offset);
            }
        }
    }

Manually committing offsets (subscribing to specific partitions of a topic)

Create multiple consumers.

// commitedOffsetWithPartition2();
    /**
     * 手动提交offset(并且手动控制partition)
     *
     */
    private static void commitedOffsetWithPartition2() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.137.121:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // rxguo-topic 0 1 有两个partition
        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);


        // 消费订阅哪一个topic或者哪几个topic
//        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 消费订阅某个topic的某个分区
        consumer.assign(Arrays.asList(p0));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                // 单个partition中的offset,并且提交
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                // offset的记录的取出起始点,必须+1
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                // 提交offset
                consumer.commitSync(offset);
            }
        }
    }

Multi-threaded consumption

Scenario: non-transactional, streaming-style systems where "delivered means done", to keep the consumers' resource usage low.

Examples: GPS data feeds, monitoring business peak metrics.

Unlike the producer, the Kafka consumer is not thread-safe.

The official guidance is that thread safety is your own responsibility if you share a consumer.

Two approaches:

  • one consumer (and thread) per partition, or
  • a single consumer that hands records to worker threads.

The classic one-consumer-per-thread pattern:
package com.bennyrhys.kafka_study1.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;


public class ConsumerThreadSample {
    private final static String TOPIC_NAME="rxguo-topic";

    /**
     *  这种类型是经典模式、每次创建一个consumer。
     *  每个consumer消费一个partition的分区
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);

        t1.start();

        Thread.sleep(15000);

        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable{
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.137.121:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

            consumer.assign(Arrays.asList(p0,p1));
        }


        public void run() {
            try {
                while(!closed.get()) {
                    //处理消息
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // 处理每个分区的消息
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                                    record.partition(),record.offset(), record.key(), record.value());
                        }

                        // 返回去告诉kafka新的offset
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        // 注意加1
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }

                }
            }catch(WakeupException e) {
                if(!closed.get()) {
                    throw e;
                }
            }finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }

}

A single consumer shared by worker threads
package com.bennyrhys.kafka_study1.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 类似生产环境(只创建一个consumer)
 * 创建一个线程池,只有一个kafka的consumer
 * 每个consumer一个单独的线程来处理
 */
public class ConsumerRecordThreadSample {
    private final static String TOPIC_NAME = "rxguo-topic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.137.121:9092";
        String groupId = "test";
        int workerNum = 5;

        CunsumerExecutor consumers = new CunsumerExecutor(brokerList, groupId, TOPIC_NAME);
        consumers.execute(workerNum);

        Thread.sleep(1000000);

        consumers.shutdown();

    }

    // Consumer处理
    public static class CunsumerExecutor{
        private final KafkaConsumer<String, String> consumer;
        private ExecutorService executors;

        public CunsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (final ConsumerRecord record : records) {
                    executors.submit(new ConsumerRecordWorker(record));
                }
            }
        }

        public void shutdown() {
            if (consumer != null) {
                consumer.close();
            }
            if (executors != null) {
                executors.shutdown();
            }
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }


    }

    // 记录处理
    public static class ConsumerRecordWorker implements Runnable {

        private ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord record) {
            this.record = record;
        }

        @Override
        public void run() {
            System.out.println("Thread - "+ Thread.currentThread().getName());
            System.err.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }

    }
}

Controlling offsets manually

Why control offsets by hand when auto-commit exists?

To reposition the offset for re-consumption,

and because the "single consumer + worker threads" streaming variant above gives you no control over commits.

Old Kafka versions stored each client's consumed offsets in ZooKeeper.

Newer versions moved offset storage into Kafka itself, as the internal topic __consumer_offsets.

Remaining problems:

if processing fails you may not notice, and you cannot easily re-consume;

and offset storage is tied to Kafka's own capacity, so some teams keep the starting offsets in an external store such as Redis instead (a sketch of that variant follows the code below).

Manually choosing the starting offset and committing by hand:
    /**
     * 手动指定offset起始位置,手动提交offset
     */
    private static void controlOffset() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.137.121:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // rxguo-topic 0 1 有两个partition
        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);




        // 消费订阅某个topic的某个分区
        consumer.assign(Arrays.asList(p0));

        while (true) {
            // 手动指定offset起始位置
            consumer.seek(p0, 500);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                // 单个partition中的offset,并且提交
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                // offset的记录的取出起始点,必须+1
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                // 提交offset
                consumer.commitSync(offset);
            }
        }
    }
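If the starting offsets live outside Kafka (Redis etc., as suggested above), the pattern is: read the saved offset once on startup, seek to it, and write the new offset back after each record or batch is safely processed. A rough sketch, with the store abstracted away (loadOffset/saveOffset are hypothetical stand-ins for Redis/DB calls):

    /**
     * Sketch: drive the starting position from an external offset store.
     */
    private static void controlOffsetExternally(java.util.function.LongSupplier loadOffset,
                                                java.util.function.LongConsumer saveOffset) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.137.121:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        consumer.assign(Arrays.asList(p0));

        // Seek once, to wherever the external store says we stopped last time.
        consumer.seek(p0, loadOffset.getAsLong());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records.records(p0)) {
                // TODO process the record (e.g. write it to the database)
                System.out.printf("partition = %d, offset = %d, value = %s%n",
                        record.partition(), record.offset(), record.value());
                // Persist "next offset to read" only after the record is safely handled.
                saveOffset.accept(record.offset() + 1);
            }
        }
    }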

Interview topic: consumer rate limiting

Typical approach: a token bucket.

Rate limiting is usually applied per topic (here, per partition via pause/resume); a token-bucket sketch follows the course code below.
    /**
     * 流量控制-限流
     */
    private static void controlPause() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.137.121:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // rxguo-topic 0 1 有两个partition
        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

        // 消费订阅某个topic的某个分区
        consumer.assign(Arrays.asList(p0));

        long totalNum = 40; // 阈值
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                long num = 0;
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                    num++;
                    if (record.partition() == 0) {
                        if (num >= totalNum) {
                            consumer.pause(Arrays.asList(p0)); // 停掉
                        }
                    }
                    if (record.partition() == 1) {
                        if (num == 40) {
                            consumer.resume(Arrays.asList(p0)); // 启动
                        }
                    }

                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                // 单个partition中的offset,并且提交
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                // offset的记录的取出起始点,必须+1
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                // 提交offset
                consumer.commitSync(offset);
            }
        }
    }
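The code above throttles by counting records; a token bucket (my sketch, not from the course) does the same pacing more smoothly. Inside the poll loop you would call tryAcquire() once per record, pause(Arrays.asList(p0)) when it returns false, and resume once it succeeds again:

package com.bennyrhys.kafka_study1.consumer;   // hypothetical helper class

/**
 * Tiny token bucket: refills ratePerSecond tokens per second up to capacity.
 */
public class TokenBucket {
    private final long capacity;
    private final double refillPerMilli;
    private double tokens;
    private long lastRefillTs;

    public TokenBucket(long capacity, long ratePerSecond) {
        this.capacity = capacity;
        this.refillPerMilli = ratePerSecond / 1000.0;
        this.tokens = capacity;
        this.lastRefillTs = System.currentTimeMillis();
    }

    /** Take one token if available; returns false when the caller should pause. */
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        tokens = Math.min(capacity, tokens + (now - lastRefillTs) * refillPerMilli);
        lastRefillTs = now;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }
}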

Interview topic: consumer rebalance

Each rebalance issues a new generation id that members carry with their requests (it works a bit like an optimistic lock).

Members send heartbeat requests;

when a new member asks to join, the group rebalances.

A crashed member and a member that leaves voluntarily are handled much the same way, one passively and one actively.

A rebalance is similar to a JVM stop-the-world pause: while new members join and partitions are redistributed, consumption stops.

Offset commits should land before the rebalance; a commit carrying a stale generation id is rejected.
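A sketch of hooking into that moment with a ConsumerRebalanceListener (my example; the group id and topic follow the ones used in these notes), committing synchronously just before partitions are revoked:

package com.bennyrhys.kafka_study1.consumer;   // hypothetical class next to ConsumerSample

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class RebalanceListenerSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.137.121:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("rxguo-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before the rebalance takes our partitions away:
                // last chance to commit under the current generation.
                System.out.println("revoked: " + partitions);
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after the rebalance hands us a (possibly different) set of partitions.
                System.out.println("assigned: " + partitions);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, value = %s%n",
                        record.partition(), record.offset(), record.value());
            }
            consumer.commitSync();
        }
    }
}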

Kafka core API: Streams
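The notes leave this section empty; as a placeholder, here is a minimal Kafka Streams sketch (an assumption on my part: it needs the kafka-streams dependency next to kafka-clients, and the output topic rxguo-topic-out is made up) that upper-cases values from rxguo-topic:

package com.bennyrhys.kafka_study1.stream;   // hypothetical package

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rxguo-stream-sample");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read from the course topic, upper-case each value, write to an output topic.
        builder.stream("rxguo-topic")
               .mapValues(value -> value.toString().toUpperCase())
               .to("rxguo-topic-out");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}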

Kafka core API: Connect

Introduction

Background, architecture, usage.

Streams only processes data that is already inside Kafka;

Connect is for loading data into Kafka, or moving data out of Kafka.

The advantage of Connect is that it makes hooking up third-party data sources easy, though connectors still need some customization.

Third-party connectors can be downloaded from https://www.confluent.io

How to connect and how to read the data.

Local database environment

https://www.cnblogs.com/cnbp/p/12620825.html

Other migration options

ELK's Logstash,

e.g. database to Elasticsearch, Elasticsearch to elsewhere.

MySQL database plus the Navicat database client:
create table users_bak(
`uuid` INT PRIMARY KEY AUTO_INCREMENT,
`name` VARCHAR(20),
`age` INT
)

Setting up Connect

Connector package: https://www.confluent.io/connector/kafka-connect-jdbc/

Configure the server
// 解压
[root@hadoop01 plugins]# pwd
/opt/plugins
[root@hadoop01 plugins]# rz

[root@hadoop01 plugins]# ll
总用量 11592
-rw-r--r--. 1 root root 8531741 6月   2 23:09 confluentinc-kafka-connect-jdbc-5.4.1.zip
-rw-r--r--. 1 root root 1006956 6月   2 23:09 mysql-connector-java-5.1.48.jar
-rw-r--r--. 1 root root 2330539 6月   2 23:09 mysql-connector-java-8.0.18.jar
// 下载unzip解压工具
[root@hadoop01 plugins]# yum install -y unzip
// 解压
unzip 文件名

// 拷贝到解压包的lib目录下
[root@hadoop01 lib]# pwd
/opt/plugins/confluentinc-kafka-connect-jdbc-5.4.1/lib

[root@hadoop01 lib]# cp /opt/plugins/mysql-connector-java-*.jar ./
[root@hadoop01 lib]# ll
总用量 11668
-rw-r--r--. 1 root root   17561 3月   5 2020 common-utils-5.4.1.jar
-rw-r--r--. 1 root root  317816 3月   5 2020 jtds-1.3.1.jar
-rw-r--r--. 1 root root  228877 3月   5 2020 kafka-connect-jdbc-5.4.1.jar
-rw-r--r--. 1 root root 1006956 10月 23 03:00 mysql-connector-java-5.1.48.jar
-rw-r--r--. 1 root root 2330539 10月 23 03:00 mysql-connector-java-8.0.18.jar
-rw-r--r--. 1 root root  927447 3月   5 2020 postgresql-42.2.10.jar
-rw-r--r--. 1 root root   41139 3月   5 2020 slf4j-api-1.7.26.jar
-rw-r--r--. 1 root root 7064881 3月   5 2020 sqlite-jdbc-3.25.2.jar

Edit the Kafka Connect configuration

A source connector brings data in; a sink connector writes data out.

The standalone config is for a single worker, the distributed config for a cluster.

Configuration file:

use the JSON converters,

open the REST port 8083,

and point plugin.path at the external directory where the connector files were stored.
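Roughly the lines touched in config/connect-distributed.properties (the plugin directory follows the /opt/plugins path used in these notes; treat the exact values as assumptions):

bootstrap.servers=192.168.137.121:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# REST endpoint used by the verification URLs below
rest.port=8083
# directory holding the extracted connector plugins
plugin.path=/opt/plugins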

Startup commands
## connect启动命令
## 后台启动在 kafka目录下
bin/connect-distributed.sh -daemon config/connect-distributed.properties
## 前台启动
bin/connect-distributed.sh config/connect-distributed.properties


## 验证启动成功
http://192.168.137.121:8083/connector-plugins

http://192.168.137.121:8083/connectors

## 数据源里传数据(windows gitbash执行)
curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.137.121:8083/connectors' \
--data \
'{"name":"imooc-upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://10.7.107.11:3306/kafka_study?user=root&password=",
"table.whitelist":"users",
"incrementing.column.name": "uuid",
"mode":"incrementing",
"topic.prefix": "imooc-mysql-"}}'

## 打印存入数据
bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.121:9092 --topic imooc-mysql-users --from-beginning

## 从kafka转出数据
curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.137.121:8083/connectors' \
--data \
'{"name":"imooc-download-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://10.7.107.11:3306/kafka_study?user=root&password=",
"topics":"imooc-mysql-users",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"uuid",
"table.name.format": "users_bak"}}'


bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic test-mysql-jdbc-users --from-beginning


curl -X DELETE -i 'http://192.168.220.128:8083/connectors/load-mysql-data'

Verification: the response was a 404, but the operation had in fact already succeeded.

Pushing MySQL data into Kafka via Confluent Connect

Make the local data source reachable from the server:

first run ifconfig/ipconfig locally to find the host address, here

10.7.107.11

then run from Git Bash:
curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.137.121:8083/connectors' \
--data \
'{"name":"imooc-upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://192.168.152.1:3306/kafka_study?user=root&password=",
"table.whitelist":"users",
"incrementing.column.name": "uuid",
"mode":"incrementing",
"topic.prefix": "imooc-mysql-"}}'

Fixing "connection refused" from the local MySQL database

https://blog.csdn.net/m0_37460012/article/details/90290148
//虚拟机安装测试网络连接
yum install telnet

Database configuration

https://www.cnblogs.com/ziroro/p/9479869.html

Next, check my.cnf; it really does contain this parameter:

bind-address = 127.0.0.1

Comment it out and restart MySQL, and the connection works.

Create the connector and verify the connection

Insert data and verify it can be read
kafka-console-consumer

Pulling the data back out through Kafka

Inspect the tasks

Verify that the database data is kept in sync

Connect key concepts

Recommendation: always use the distributed mode.

The connector manages the job;

tasks do the actual copying,

and tasks are rebalanced across workers.

Data migration

Alternatively, tools like ELK's Logstash can do this kind of data movement.

Connect with Debezium: MySQL

Debezium documentation

Docker website

https://docs.docker.com/

Images: see the Docker tutorial on runoob

https://www.runoob.com/docker/centos-docker-install.html

Download the MySQL connector plugin
tar -zxf debezium-connector-mysql-0.9.5.Final-plugin.tar.gz -C ../plugins/
tar -zxf debezium-connector-mongodb-0.9.5.Final-plugin.tar.gz -C ../plugins/

Configure and start Connect, same as above
## connect启动命令
## 后台启动在 kafka目录下
bin/connect-distributed.sh -daemon config/connect-distributed.properties
## 前台启动
bin/connect-distributed.sh config/connect-distributed.properties


## 验证启动成功
http://192.168.137.121:8083/connector-plugins

http://192.168.137.121:8083/connectors

Register the connector with this JSON

Note the whitelist naming rule:

the table whitelist uses database.table
{
  "name": "rxguo-upload-mysql4",  
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
    "database.hostname": "10.7.107.11", 
    "database.port": "3306", 
    "database.user": "root", 
    "database.password": "", 
    "database.server.id": "1008611", 
    "database.server.name": "mysql-connect", 
    "table.whitelist": "kafka_study.users", 
    "database.history.kafka.bootstrap.servers": "192.168.137.121:9092", 
    "database.history.kafka.topic": "dbhistory.mysql-connect", 
    "database.serverTimezone":"Asia/Shanghai" ,
    "include.schema.changes": "true" 
  }
}

Verify with a GET in Postman (append /config or /status)

url: http://192.168.137.121:8083/connectors/rxguo-upload-mysql3

Check the output in Kafka from the CentOS console

Note how the topic name is built from database.server.name:

fulfillment.inventory.products == servername.databaseName.tableName
// 查看创建的topic信息
bin/kafka-topics.sh --list --zookeeper 192.168.137.121:2181
// 输出kafka对应的连接器内容
bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.121:9092 --topic mysql-connect.kafka_study.users  --from-beginning

Syncing the Kafka data back to the local database; an existing connector can be removed with:
curl -X DELETE -i 'http://192.168.220.128:8083/connectors/load-mysql-data'

Connect with Debezium: MongoDB

The Debezium MongoDB connector requires MongoDB to run as a replica set (a cluster).

[MongoDB] Installation and configuration (pseudo-distributed cluster): Replica Set
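The notes stop here; for reference, registering a Debezium MongoDB source connector looks roughly like the MySQL JSON above (field names are from the Debezium 0.9 documentation as I recall them; the replica-set address and names are assumptions):

{
  "name": "rxguo-upload-mongodb",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/192.168.137.121:27017",
    "mongodb.name": "mongo-connect",
    "collection.whitelist": "kafka_study.users"
  }
}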

Kafka cluster

Deploy and start the cluster

Deploying a cluster on a single machine:

copy the Kafka directory three times, e.g.

cp -r ka/ ka1/

Edit the configuration files;

in each copy change the port, the log directory and the ZooKeeper connection info (see the sketch after the verification step).

Start the brokers

only after all three config files have been edited, one broker per Kafka directory.

Verify

by checking the ZooKeeper process and then kafka_1, kafka_2 and kafka_3 in turn:

ps -ef | grep kafka_1
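The per-broker differences look roughly like this (the ports follow the 9192/9292/9392 seen in the topic description below; the log paths are assumptions):

# kafka_1/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.137.121:9192
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.137.121:2181

# kafka_2 / kafka_3: broker.id=2 / 3, ports 9292 / 9392, log.dirs /tmp/kafka-logs-2 / -3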

Replica sets

A Kafka process is called a broker; here we simulate broker 1, 2 and 3.

The partitions of a topic are normally spread across different nodes, and each partition's replicas live on other machines as well.

Trying replicas from code

Code: first create the topic, then print its description.

Three partitions, replication factor 3.

The first attempt may throw an error while the creation propagates; read the description again afterwards.
    /**
     * 创建Topic实例
     */
    public static void createTopic() {
        // 副本因子,暂时测试的单节点
        Short rs = 3;
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 3, rs);
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        System.out.println("==CreateTopicsResult==:" + topics);
    }


name:rxguo-topic3, 
desc:(name=rxguo-topic3, internal=false, 
partitions=
(partition=0, leader=192.168.137.121:9292 
	(id: 2 rack: null), replicas=192.168.137.121:9292 
	(id: 2 rack: null), 192.168.137.121:9192 
	(id: 1 rack: null), 192.168.137.121:9392 
	(id: 3 rack: null), isr=192.168.137.121:9292 
	(id: 2 rack: null), 192.168.137.121:9192 
	(id: 1 rack: null), 192.168.137.121:9392 
	(id: 3 rack: null)),
(partition=1, leader=192.168.137.121:9392 
	(id: 3 rack: null), replicas=192.168.137.121:9392 
	(id: 3 rack: null), 192.168.137.121:9292 
	(id: 2 rack: null), 192.168.137.121:9192 
	(id: 1 rack: null), isr=192.168.137.121:9392 
	(id: 3 rack: null), 192.168.137.121:9292 
	(id: 2 rack: null), 192.168.137.121:9192 
	(id: 1 rack: null)),
(partition=2, leader=192.168.137.121:9192 
	(id: 1 rack: null), replicas=192.168.137.121:9192 
	(id: 1 rack: null), 192.168.137.121:9392 
	(id: 3 rack: null), 192.168.137.121:9292 
	(id: 2 rack: null), isr=192.168.137.121:9192 
	(id: 1 rack: null), 192.168.137.121:9392
	(id: 3 rack: null), 192.168.137.121:9292 
	(id: 2 rack: null)), authorizedOperations=null)

Key points

Understanding replicas

Kafka node failure

When a Kafka node fails, data is generally not lost, because of

1. the replica set, and

2. the delivery guarantees: the ISR (in-sync replicas) stay in sync with the leader.

Interview topic: Kafka leader election

Kafka does not use majority voting to elect a partition leader.

A new leader is chosen quickly by the controller, based on the ISR.

The controller itself is decided quickly because the brokers race to register a node in ZooKeeper; whichever broker registers first becomes the controller.

The controller tracks which brokers have failed and how many leader and follower partitions each failed broker was hosting.

Extreme case: the leader and all ISR replicas are lost at the same time.

Unclean leader election, two options:

1. wait for an ISR replica to come back: no data loss, at the cost of availability and time, or

2. accept some data loss to restore availability and promote an out-of-sync follower.

Configuration recommendations
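The notes do not spell the recommendations out; a commonly used, conservative combination (my assumption, not from the course) is:

# never elect an out-of-sync follower as leader (no data loss, at the cost of availability)
unclean.leader.election.enable=false
# with acks=all, require at least 2 in-sync replicas before a write is acknowledged
min.insync.replicas=2
# create topics with enough replicas for the above to be meaningful
default.replication.factor=3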

Linux commands

Uploading files with a helper tool

lrzsz makes it easy to copy files from the local machine to the server:

it pops up a file-selection dialog.

yum install -y lrzsz

rz

Unzipping .zip files
// 下载unzip解压工具
[root@hadoop01 plugins]# yum install -y unzip
// 解压
unzip 文件名

Extracting tar archives

-C specifies the target directory
tar -zxf debezium-connector-mysql-0.9.5.Final-plugin.tar.gz -C ../plugins/
tar -zxf debezium-connector-mongodb-0.9.5.Final-plugin.tar.gz -C ../plugins/

Interview talking points

Producer key collisions were handled with a custom partitioner for load balancing.

Use a singleton producer per service, to avoid creating producers repeatedly and the context switching that comes with it (see the sketch below).
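A minimal sketch of that singleton (the class name and config are illustrative; KafkaProducer is thread-safe, so one shared instance per service is enough):

package com.bennyrhys.kafka_study1.producer;   // hypothetical helper class

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class ProducerHolder {
    // One producer for the whole service: KafkaProducer is thread-safe, and reusing
    // it avoids repeatedly creating connections, buffers and sender threads.
    private static final Producer<String, String> INSTANCE = create();

    private ProducerHolder() {}

    private static Producer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    public static Producer<String, String> get() {
        return INSTANCE;
    }
}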
