Hands-on: WeChat mini-program questionnaire, microservice integration
Java fundamentals + Spring Boot
Host
Connection tool
Domain name
Recommended virtualization software
VMware
https://www.cnblogs.com/PrayzzZ/p/11330937.html
VirtualBox
Linux-CentOS
http://mirror.bit.edu.cn/centos/7.8.2003/isos/x86_64/CentOS-7-x86_64-Minimal-2003.iso
Reference
https://blog.csdn.net/wujiele/article/details/92803655
First edit the IPv4 settings of the VMnet8 adapter, then share the host LAN connection to VMnet8; in VMware's virtual network editor choose NAT mode and set the gateway and subnet range, then put CentOS on the matching subnet.
Verify by pinging in both directions.
Tick the corresponding option.
Distributed streaming platform
Kafka is written in Scala, so set up a JDK first.
ZooKeeper is also written in Java and needs a JDK as well.
(Create two directories: software and install.)
Extract into the target directory:
tar -zxf ./software/apache-zookeeper-3.5.8-bin.tar.gz -C ./install/
[root@hadoop01 jdk1.8.0_261]# vi /etc/profile
[root@hadoop01 jdk1.8.0_261]# source /etc/profile
[root@hadoop01 jdk1.8.0_261]# java -version
java version "1.8.0_261"
# PATH
export JAVA_HOME=/opt/install/jdk1.8.0_261
export PATH=$PATH:$JAVA_HOME/bin
1. Edit the configuration file
In the conf directory, copy the sample file with cp, then adjust where ZooKeeper stores its data (this VM has no separate data disk, so no extra partitioning) and the client port.
The defaults are fine as-is, but note these parameters:
/opt/install/apache-zookeeper-3.5.8-bin/conf
[root@hadoop01 conf]# cp zoo_sample.cfg zoo.cfg
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
2. Start
[root@hadoop01 bin]# ./zkServer.sh start
// Method 1: verify the connection with the CLI client
[root@hadoop01 bin]# ./zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0]
// Method 2: grep for the process
[root@hadoop01 bin]# ps -ef | grep zookeeper
// Method 3: log files are generated automatically under the zookeeper directory
1. Edit the files under Kafka's config directory.
In server.properties change listeners
and advertised.listeners.
Kafka log dirs: keep the default for now (no separate disk yet).
ZooKeeper address: a single machine for now, no change needed.
[root@hadoop01 config]# pwd
/opt/install/kafka_2.13-2.6.0/config
[root@hadoop01 config]# vi server.properties
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.137.121:9092
advertised.listeners=PLAINTEXT://192.168.137.121:9092
// log location, default for now
log.dirs=/tmp/kafka-logs
// ZooKeeper connection settings; may point at a cluster
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
Basic start/stop commands
Start: run from the kafka directory, in the background; logs are generated
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-server-start.sh config/server.properties &
[1] 9134
// Check 1: log output
Connecting to zookeeper on localhost:2181
// Check 2: grep the process list
[root@hadoop01 kafka_2.13-2.6.0]# ps -ef|grep kafka
// Check 3: read the log files directly
[root@hadoop01 kafka_2.13-2.6.0]# cd logs/
[root@hadoop01 logs]# ll
Stop
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-server-stop.sh
// the process is gone
[root@hadoop01 kafka_2.13-2.6.0]# ps -ef|grep kafka
root 9558 1287 0 17:06 pts/0 00:00:00 grep --color=auto kafka
Create a topic
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-topics.sh --create --zookeeper 192.168.137.121:2181 --replication-factor 1 --partitions 1 --topic rxguo-topic
Created topic rxguo-topic.
List the created topics
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-topics.sh --list --zookeeper 192.168.137.121:2181
rxguo-topic
rxguo-topic is a user-chosen topic name
Produce messages to the topic
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-console-producer.sh --broker-list 192.168.137.121:9092 --topic rxguo-topic
>hello world
>rxguo
Consume messages from the topic
[root@hadoop01 kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.121:9092 --topic rxguo-topic --from-beginning
hello world
rxguo
welcome on
Admin-level operations are management tasks and live in a separate API (AdminClient).
1. Create a Spring Boot project
Remove the test package, the test-related Maven dependencies, and the distracting auto-generated folders.
Add the Maven dependency for kafka-clients:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
Or find the dependency coordinates on the Kafka website.
2. Collect the reference documentation
Start with kafka-clients 2.4 and the official API docs.
On the Kafka site, navigate Admin API -> JavaDoc:
http://kafka.apache.org/26/documentation.html#api
The JavaDoc covers the Admin Client API.
Worth assembling your own configuration notes as well.
3. Write the Java code
package com.bennyrhys.kafka_study1.admin;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;
public class AdminSample {
private final static String TOPIC_NAME = "rxguo-topic";
private final static AdminClient adminClient = AdminSample.adminClient();
public static void main(String[] args)throws Exception {
// create the connection
System.out.println("adminClient:==================================" + adminClient);
// create a topic
// createTopic();
// delete a topic
// delTopic();
// list topics
// topicList();
// describe a topic
// describeTopic();
// alter config: old API -> new API (awkward on a single broker, fine on a cluster)
// alterConfig();
// alterConfig2();
// query config
// describeConfig();
// increase the partition count
incrPartitions(2);
while(true) {
}
}
/**
* Increase the partition count.
* "incr" because Kafka partitions can only be increased, never decreased.
*
* Verification:
* Option 1: call the topic-describe function
* name:rxguo-topic, desc:(name=rxguo-topic, internal=false, partitions=(
* partition=0, leader=192.168.137.121:9092 (id: 0 rack: null), replicas=192.168.137.121:9092 (id: 0 rack: null), isr=192.168.137.121:9092 (id: 0 rack: null)),(
* partition=1, leader=192.168.137.121:9092 (id: 0 rack: null), replicas=192.168.137.121:9092 (id: 0 rack: null), isr=192.168.137.121:9092 (id: 0 rack: null)), authorizedOperations=null)
* Option 2: check the log dirs on the command line
* drwxr-xr-x. 2 root root 178 Oct 20 09:19 rxguo-topic-0
* drwxr-xr-x. 2 root root 141 Oct 20 10:23 rxguo-topic-1
* [root@hadoop01 kafka-logs]# pwd
* /tmp/kafka-logs
*/
public static void incrPartitions(int partition_num) throws Exception{
Map<String, NewPartitions> partitionsMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(partition_num);
partitionsMap.put(TOPIC_NAME, newPartitions);
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
createPartitionsResult.all().get();
}
/**
* Alter topic config (v2, the newer incrementalAlterConfigs() API)
* ConfigEntry(name=preallocate, value=false
* @throws Exception
*/
public static void alterConfig2() throws Exception {
// build the two arguments
// alterConfigs is deprecated but for now easier to use than incrementalAlterConfigs(); usage is the same
// adminClient.incrementalAlterConfigs()
HashMap<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
// preallocate: SET it explicitly here (back to false) to verify the update took effect
AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET);
configMaps.put(configResource, Arrays.asList(alterConfigOp));
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
alterConfigsResult.all().get();
}
/**
* Alter config (v1; the alterConfigs API is deprecated)
* ConfigEntry(name=preallocate, value=true, source=DYNAMIC_TOPIC_CONFIG
* @throws Exception
*/
public static void alterConfig() throws Exception {
// build the two arguments
// alterConfigs is deprecated but for now easier to use than incrementalAlterConfigs(); usage is the same
// adminClient.incrementalAlterConfigs()
HashMap<ConfigResource, Config> configMaps = new HashMap<>();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
// preallocate defaults to false; change it to true here to verify the update
Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
configMaps.put(configResource, config);
AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMaps);
alterConfigsResult.all().get();
}
/**
* Query config
* configResource:ConfigResource(type=TOPIC, name='rxguo-topic'),
* Config:Config(entries=[ConfigEntry(name=compression.type,
* value=producer,
* source=DEFAULT_CONFIG,
* isSensitive=false,
* isReadOnly=false, synonyms=[]),
* ConfigEntry(name=leader.replication.throttled.replicas,
* value=, source=DEFAULT_CONFIG,
* isSensitive=false, isReadOnly=false, synonyms=[]),
* ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
* ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
* ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
* ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
* ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.format.version, value=2.6-IV0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
* ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
*/
public static void describeConfig() throws Exception {
// TODO placeholder; cluster handling comes later
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
configResourceConfigMap.entrySet().stream().forEach((entry)->{
System.out.println("configResource:" + entry.getKey() + ",Config:" + entry.getValue());
});
}
/**
* Describe a topic
* name:rxguo-topic,
* desc:(name=rxguo-topic,
* internal=false,
* partitions=(partition=0,
* leader=192.168.137.121:9092 (id: 0 rack: null),
* replicas=192.168.137.121:9092 (id: 0 rack: null),
* isr=192.168.137.121:9092 (id: 0 rack: null)),
* authorizedOperations=null)
*/
public static void describeTopic() throws Exception {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
entries.stream().forEach((entry)->{
System.out.println("name:" + entry.getKey() + ", desc:" + entry.getValue());
});
}
/**
* Delete a topic
*/
public static void delTopic() throws Exception {
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
deleteTopicsResult.all().get();
}
/**
* List topics
* @throws Exception
*/
public static void topicList() throws Exception {
// with listInternal(true), internal topics such as __consumer_offsets are also listed
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
// ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> names = listTopicsResult.names().get();
Collection<TopicListing> topicListings = listTopicsResult.listings().get();
Map<String, TopicListing> stringTopicListingMap = listTopicsResult.namesToListings().get();
// print names
names.stream().forEach(System.out::println);
// print topicListings
topicListings.stream().forEach((listing)->{
System.out.println(listing);
});
}
/**
* Create a topic
*/
public static void createTopic() {
// replication factor; a single test node for now
Short rs = 1;
NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, rs);
CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
System.out.println("==CreateTopicsResult==:" + topics);
}
/**
* Configure the AdminClient
* note: use the AdminClient from the org.apache.kafka.clients.admin package
*/
public static AdminClient adminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
return AdminClient.create(properties);
}
}
Output
- [AdminClient clientId=adminclient-4219] Kafka admin client initialized
Fix: [kafka] Java connection fails with "Connection refused: no further information"
Kafka's server config must specify the machine's concrete IP; localhost does not work.
1. Edit the config:
localhost -> 192.168.137.121
Replace localhost with the concrete IP, e.g. zookeeper.connect=192.168.137.121:2181
2. Restart the Kafka service.
Note: when testing with Kafka's bundled console producer and consumer, do not use localhost either; name the host explicitly.
Connection configuration
Load balancing; synchronous vs. asynchronous sending.
Working backwards: how does Kafka achieve its high throughput?
Send modes:
Synchronous (strictly speaking Kafka has no native synchronous send; it is an asynchronous send blocked on its Future)
Asynchronous
Asynchronous with a callback
Note: an offset is unique within a single partition, not across the whole topic.
package com.bennyrhys.kafka_study1.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class ProducerSample {
private final static String TOPIC_NAME = "rxguo-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// producer asynchronous send
// producerSend();
// producer async blocking send ("synchronous" send)
// producerSyncSend();
// producer async send with a callback
producerSendWithCallBack();
// producer async send with a callback and a custom partitioner for load balancing
// producerSendWithCallBackAndPartition();
}
/**
* Producer asynchronous send
*
* Verify: console output
* key-0values-0
*/
public static void producerSend() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0"); // retries, as in the send flow
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");// size-based batching of writes
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); // time-based batching: wait up to 1 ms
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");// total buffer memory cap
// serialize k and v for transport; serializers live under org.apache.kafka.common.serialization
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// the main Producer object
Producer<String, String> producer = new KafkaProducer<>(properties);
// message records
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i + "values-" + i);
producer.send(record);
}
// every opened channel must be closed
producer.close();
}
/**
* Producer async blocking send ("synchronous" send)
* Verify: console output
* partition:0, offset:27
* partition:1, offset:15
* partitions 0 and 1 receive roughly equal counts; offsets differ
*/
public static void producerSyncSend() throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// serialize k and v for transport; serializers live under org.apache.kafka.common.serialization
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// the main Producer object
Producer<String, String> producer = new KafkaProducer<>(properties);
// message records
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i + "values-" + i);
Future<RecordMetadata> send = producer.send(record);
// the Future alone is fire-and-forget; calling get() blocks here on every send
RecordMetadata recordMetadata = send.get();
System.out.println("partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());
}
// every opened channel must be closed
producer.close();
}
/**
* Producer async send with a callback
* pass a Callback as the second argument to send()
*
*/
public static void producerSendWithCallBack() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// serialize k and v for transport; serializers live under org.apache.kafka.common.serialization
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// the main Producer object
Producer<String, String> producer = new KafkaProducer<>(properties);
// message records
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i , "values-" + i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());
}
});
}
// every opened channel must be closed
producer.close();
}
/**
* Producer async send with a callback and a custom partitioner for load balancing
* register a custom Partitioner class; its key parsing must match the key format
* partition:0, offset:52
* partition:0, offset:53
* partition:0, offset:54
* partition:0, offset:55
* partition:1, offset:60
* partition:1, offset:61
* partition:1, offset:62
* partition:1, offset:63
* partition:1, offset:64
* partition:1, offset:65
*/
public static void producerSendWithCallBackAndPartition() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// serialize k and v for transport; serializers live under org.apache.kafka.common.serialization
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// register the custom partitioner class
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.bennyrhys.kafka_study1.producer.SamplePartition");
// the main Producer object
Producer<String, String> producer = new KafkaProducer<>(properties);
// message records
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i ,"values-" + i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());
}
});
}
// every opened channel must be closed
producer.close();
}
}
TODO
Where we can intervene: the partitioner.
ProducerSample.java
/**
* Producer async send with a callback and a custom partitioner for load balancing
* register a custom Partitioner class; its key parsing must match the key format
*
* Problem: with load balancing, the custom partitioner was not receiving the key data
*/
public static void producerSendWithCallBackAndPartition() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.121:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// serialize k and v for transport; serializers live under org.apache.kafka.common.serialization
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// register the custom partitioner class
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.bennyrhys.kafka_study1.producer.SamplePartition");
// the main Producer object
Producer<String, String> producer = new KafkaProducer<>(properties);
// message records
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-"+ i ,"values-" + i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());
}
});
}
// every opened channel must be closed
producer.close();
}
SamplePartition.java
package com.bennyrhys.kafka_study1.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* Custom Partitioner for the Producer
*/
public class SamplePartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
/**
* key-1
* key-2
*/
System.out.println(key);
System.out.println((String) key);
// keys look like "key-<n>"; take the numeric suffix (a null or differently formatted key would throw here)
String keyStr = key + "";
String keyInt = keyStr.substring(4);
System.out.println("keyStr:" + keyStr + "keyInt:" + keyInt);
int i = Integer.parseInt(keyInt);
return i%2;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
Implemented jointly by the producer and the consumer,
but mainly by the producer.
A transactional id identifies the producer; the consumer uses the id to de-duplicate.
acks=all by itself is at-least-once; combined with an idempotent (transactional) producer it yields exactly-once.
At-most-once is the fastest, because once a message is sent nothing is checked.
https://cloud.tencent.com/developer/article/1757007
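The delivery-semantics trade-off above can be sketched as producer properties. This is only a sketch: the class name is made up, only plain string config keys are used (no broker is contacted), and `enable.idempotence` is the standard Kafka producer flag for broker-side de-duplication of retried batches:

```java
import java.util.Properties;

public class DeliverySemanticsConfig {
    // Aiming at exactly-once on the producer side: acks=all alone is
    // at-least-once; idempotence lets the broker drop retry duplicates.
    public static Properties exactlyOnce() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.137.121:9092");
        props.put("acks", "all");                 // wait for all in-sync replicas
        props.put("enable.idempotence", "true");  // de-duplicate retried batches
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    // At-most-once: fire and forget, fastest, may lose messages.
    public static Properties atMostOnce() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.137.121:9092");
        props.put("acks", "0");      // do not wait for any acknowledgement
        props.put("retries", "0");   // never retry, so never duplicate
        return props;
    }
}
```

Either Properties object would be passed to a KafkaProducer constructor exactly as in the ProducerSample code above.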
Official javadoc for the consumer:
http://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Auto-commit of offsets; a classic configuration that can be used as-is.
package com.bennyrhys.kafka_study1.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class ConsumerSample {
private final static String TOPIC_NAME = "rxguo-topic";
public static void main(String[] args) {
helloworld();
}
private static void helloworld() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.137.121:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// subscribe to one or more topics
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records)
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
The offset is the consumer's recorded position in Kafka: where consumption last stopped and where it resumes after an interruption.
The consumer auto-commits in timed batches; in unlucky cases (the offset is committed before the data is fully processed) messages are lost.
So switch from automatic to manual commit.
/**
* Manual commit
*/
private static void commitedOffset() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.137.121:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// subscribe to one or more topics
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
// e.g. persist the record to a database: commit if that succeeds, otherwise roll back to the current position
// TODO record 2 db
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
// on failure, roll back: do not commit the offset
}
// manual commit
consumer.commitAsync();
}
}
A consumer group is a set of independent consumers.
The consumer-to-partition mapping can be one-to-one or one-to-many:
One-to-one
One-to-many
Within one group, a partition cannot be consumed by more than one consumer.
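That rule can be sketched without a broker. The class below is illustrative only (a toy round-robin, not Kafka's actual assignor): every partition lands on exactly one consumer of the group, while one consumer may own several partitions.

```java
import java.util.*;

public class GroupAssignmentSketch {
    // Round-robin partitions over the group's consumers: each partition is
    // owned by exactly one consumer; a consumer may own many partitions.
    public static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String c : consumers) owned.put(c, new ArrayList<>());
        for (int p = 0; p < partitionCount; p++) {
            owned.get(consumers.get(p % consumers.size())).add(p);
        }
        return owned;
    }
}
```

With 2 consumers and 3 partitions this yields c1 -> [0, 2] and c2 -> [1]; with more consumers than partitions some consumers sit idle, which is why adding consumers beyond the partition count buys nothing.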
Note:
/**
* Manual commit (with manual per-partition handling)
* Scenario: multithreaded processing of a topic's partitions; a failed partition can be redone on its own instead of reprocessing everything
* Drawback: all partitions are still fetched together; only the downstream handling is split. Fetching single partitions would be better
*/
private static void commitedOffsetWithPartition() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.137.121:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// subscribe to one or more topics
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
// the offset within this single partition, to be committed
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
// the committed offset is where reading resumes, so it must be lastOffset + 1
offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
// commit the offset
consumer.commitSync(offset);
}
}
}
Create multiple consumers
// commitedOffsetWithPartition2();
/**
* Manual offset commit (with manually assigned partitions)
*
*/
private static void commitedOffsetWithPartition2() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.137.121:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// rxguo-topic has two partitions: 0 and 1
TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
// subscribe to one or more topics
// consumer.subscribe(Arrays.asList(TOPIC_NAME));
// instead, assign a specific partition of the topic
consumer.assign(Arrays.asList(p0));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
// the offset within this single partition, to be committed
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
// the committed offset is where reading resumes, so it must be lastOffset + 1
offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
// commit the offset
consumer.commitSync(offset);
}
}
}
Scenario: non-business, streaming systems: whatever is pushed is taken as-is, which reduces the consumer's resource cost.
Examples: GPS data, monitoring of business peak metrics.
Unlike Kafka's producer, the consumer is NOT thread-safe.
The official advice: if you need concurrency, solve thread safety yourself.
Two approaches.
Classic threading: one consumer per thread.
package com.bennyrhys.kafka_study1.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class ConsumerThreadSample {
private final static String TOPIC_NAME="rxguo-topic";
/**
* Classic pattern: create one consumer per thread.
* Each consumer consumes its assigned partition(s).
*/
public static void main(String[] args) throws InterruptedException {
KafkaConsumerRunner r1 = new KafkaConsumerRunner();
Thread t1 = new Thread(r1);
t1.start();
Thread.sleep(15000);
r1.shutdown();
}
public static class KafkaConsumerRunner implements Runnable{
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public KafkaConsumerRunner() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.137.121:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
consumer.assign(Arrays.asList(p0,p1));
}
public void run() {
try {
while(!closed.get()) {
// process messages
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
// handle each partition's records
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("partition = %d , offset = %d, key = %s, value = %s%n",
record.partition(),record.offset(), record.key(), record.value());
}
// report the new offset back to Kafka
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
// note the +1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}catch(WakeupException e) {
if(!closed.get()) {
throw e;
}
}finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}
Create only one consumer
package com.bennyrhys.kafka_study1.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Closer to production (only one consumer is created):
* a thread pool is fed by a single Kafka consumer,
* and each record is handled by a separate worker thread
*/
public class ConsumerRecordThreadSample {
private final static String TOPIC_NAME = "rxguo-topic";
public static void main(String[] args) throws InterruptedException {
String brokerList = "192.168.137.121:9092";
String groupId = "test";
int workerNum = 5;
ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
consumers.execute(workerNum);
Thread.sleep(1000000);
consumers.shutdown();
}
// consumer handling
public static class ConsumerExecutor{
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
public ConsumerExecutor(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
public void execute(int workerNum) {
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(200));
for (final ConsumerRecord<String, String> record : records) {
executors.submit(new ConsumerRecordWorker(record));
}
}
}
public void shutdown() {
if (consumer != null) {
consumer.close();
}
if (executors != null) {
executors.shutdown();
}
try {
if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("Timeout.... Ignore for this case");
}
} catch (InterruptedException ignored) {
System.out.println("Other thread interrupted this shutdown, ignore for this case.");
Thread.currentThread().interrupt();
}
}
}
// per-record processing
public static class ConsumerRecordWorker implements Runnable {
private final ConsumerRecord<String, String> record;
public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
this.record = record;
}
@Override
public void run() {
System.out.println("Thread - "+ Thread.currentThread().getName());
System.err.printf("partition = %d , offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}
}
Why commit manually when auto-commit already exists?
To control exactly where consumption restarts when reprocessing is needed.
With the single-consumer multi-threaded streaming model above, auto-commit cannot track what the worker threads have actually finished.
Old Kafka versions stored each client's consumed offsets in ZooKeeper.
Newer versions moved offset storage into Kafka itself: consumption progress lives in an internal topic, __consumer_offsets.
Remaining problems:
a consumption failure can go unnoticed, leaving no way to re-consume;
storage capacity is limited to Kafka itself, so the offsets can be extracted into an external store such as Redis to manage and update the start positions.
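The external-store idea above can be sketched in plain Java. This is a minimal illustration only: a `HashMap` stands in for a real store such as Redis, and the class and method names are hypothetical. It preserves the "+1" semantics used in the manual-commit code below, where the stored offset is the next position to read.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of tracking per-partition start offsets in an
// external store (a HashMap stands in for Redis; names illustrative).
public class ExternalOffsetStore {
    // key: "topic-partition", value: next offset to read
    private final Map<String, Long> store = new HashMap<>();

    // record that we processed up to lastOffset; the next read
    // position is lastOffset + 1, matching Kafka's commit semantics
    public void save(String topic, int partition, long lastOffset) {
        store.put(topic + "-" + partition, lastOffset + 1);
    }

    // where to seek() on restart; 0 if nothing is stored yet
    public long nextOffset(String topic, int partition) {
        return store.getOrDefault(topic + "-" + partition, 0L);
    }

    public static void main(String[] args) {
        ExternalOffsetStore s = new ExternalOffsetStore();
        s.save("rxguo-topic", 0, 499);                      // processed up to offset 499
        System.out.println(s.nextOffset("rxguo-topic", 0)); // 500
        System.out.println(s.nextOffset("rxguo-topic", 1)); // 0
    }
}
```

On restart, the value returned by `nextOffset` would be passed to `consumer.seek(partition, offset)` in place of the hard-coded 500 used in the demo below.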
/**
 * Manually specify the starting offset and commit offsets manually
 */
private static void controlOffset() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.137.121:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// rxguo-topic has two partitions: 0 and 1
TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
// subscribe to one specific partition of the topic
consumer.assign(Arrays.asList(p0));
// manually set the starting offset once, before the poll loop
// (seeking inside the loop would restart from 500 on every poll)
consumer.seek(p0, 500);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
// offsets of this single partition, to be committed
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
// the committed offset is the next read position, so +1 is required
offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
// commit the offset
consumer.commitSync(offset);
}
}
}
Scenario: token bucket.
Rate limiting here is applied to a single topic's partitions.
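The token-bucket idea can be sketched in plain Java before looking at Kafka's `pause`/`resume` demo below. This is a simplified, illustrative model (not a Kafka API): each poll cycle asks the bucket for permits; when the bucket is empty the consumer would call `pause()`, and `resume()` once enough tokens have refilled.

```java
// Minimal token-bucket sketch (illustrative, not a Kafka API).
public class TokenBucket {
    private final long capacity;      // max tokens the bucket holds
    private final long refillPerSec;  // tokens added per second
    private double tokens;
    private long lastRefillNanos;

    public TokenBucket(long capacity, long refillPerSec, long nowNanos) {
        this.capacity = capacity;
        this.refillPerSec = refillPerSec;
        this.tokens = capacity;       // bucket starts full
        this.lastRefillNanos = nowNanos;
    }

    // try to consume n tokens at time nowNanos; false => throttle (pause)
    public boolean tryAcquire(long n, long nowNanos) {
        double elapsedSec = (nowNanos - lastRefillNanos) / 1e9;
        tokens = Math.min(capacity, tokens + elapsedSec * refillPerSec);
        lastRefillNanos = nowNanos;
        if (tokens >= n) { tokens -= n; return true; }
        return false;
    }

    public static void main(String[] args) {
        TokenBucket tb = new TokenBucket(40, 10, 0);
        System.out.println(tb.tryAcquire(40, 0));             // true: bucket starts full
        System.out.println(tb.tryAcquire(1, 0));              // false: empty -> would pause()
        System.out.println(tb.tryAcquire(1, 1_000_000_000L)); // true: refilled -> would resume()
    }
}
```

The fixed threshold of 40 records in the demo below is the degenerate case of this: a bucket that never refills within one poll cycle.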
/**
 * Flow control: rate limiting via pause/resume
 */
private static void controlPause() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.137.121:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// rxguo-topic has two partitions: 0 and 1
TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
// subscribe to both partitions (p1 must be assigned too,
// otherwise the resume branch below can never trigger)
consumer.assign(Arrays.asList(p0, p1));
long totalNum = 40; // threshold
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
long num = 0;
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
num++;
if (record.partition() == 0 && num >= totalNum) {
consumer.pause(Arrays.asList(p0)); // stop fetching from p0
}
if (record.partition() == 1 && num == 40) {
consumer.resume(Arrays.asList(p0)); // resume fetching from p0
}
}
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
// offsets of this single partition, to be committed
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
// the committed offset is the next read position, so +1 is required
offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
// commit the offset
consumer.commitSync(offset);
}
}
}
A generation id is issued and carried with requests, which works like an optimistic lock.
Members send heartbeat requests to the coordinator.
A new join-group request triggers a rebalance.
A crash is handled much like actively leaving the group: passive versus active departure.
During a rebalance consumption stops, similar to a stop-the-world pause in the JVM: new members join and existing members get their partitions reassigned.
Offset commits should complete between rebalances; a commit carrying a stale generation is rejected.
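The "generation as an optimistic lock" idea above can be modeled in a few lines of plain Java. This is a deliberately simplified model of the coordinator's fencing check, not the real group protocol; the class and method names are illustrative.

```java
// Simplified model of generation-based fencing in a consumer group:
// each rebalance bumps the generation, and a commit carrying a stale
// generation is rejected, like an optimistic-lock version check.
public class GenerationFencing {
    private int generation = 0;

    // a rebalance bumps the generation; members must rejoin to learn it
    public int rebalance() {
        return ++generation;
    }

    // a commit is accepted only if the member's generation is current
    public boolean commit(int memberGeneration) {
        return memberGeneration == generation;
    }

    public static void main(String[] args) {
        GenerationFencing group = new GenerationFencing();
        int g = group.rebalance();           // member joins at generation 1
        System.out.println(group.commit(g)); // true: generation matches
        group.rebalance();                   // new member joins -> generation 2
        System.out.println(group.commit(g)); // false: stale generation rejected
    }
}
```

This is why a commit that races with a rebalance fails: the old member still holds the previous generation id.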
Background, architecture, usage.
Kafka Streams only processes data that is already inside Kafka.
Kafka Connect loads data into Kafka, or moves Kafka's data out.
The benefit of Connect is easy integration with third-party data sources, though connectors may need customization.
Download third-party connectors from https://www.confluent.io
How to connect and how to read the data:
https://www.cnblogs.com/cnbp/p/12620825.html
Other migration options:
Logstash from the ELK stack;
database to ES, ES to other targets.
create table users_bak(
`uuid` INT PRIMARY KEY AUTO_INCREMENT,
`name` VARCHAR(20),
`age` INT
)
Create the connector.
Connector dependency download: https://www.confluent.io/connector/kafka-connect-jdbc/
Configure the server.
// upload the archives (rz opens a local file picker)
[root@hadoop01 plugins]# pwd
/opt/plugins
[root@hadoop01 plugins]# rz
[root@hadoop01 plugins]# ll
total 11592
-rw-r--r--. 1 root root 8531741 Jun  2 23:09 confluentinc-kafka-connect-jdbc-5.4.1.zip
-rw-r--r--. 1 root root 1006956 Jun  2 23:09 mysql-connector-java-5.1.48.jar
-rw-r--r--. 1 root root 2330539 Jun  2 23:09 mysql-connector-java-8.0.18.jar
// install the unzip tool
[root@hadoop01 plugins]# yum install -y unzip
// unpack
unzip <filename>
// copy the MySQL drivers into the unpacked connector's lib directory
[root@hadoop01 lib]# pwd
/opt/plugins/confluentinc-kafka-connect-jdbc-5.4.1/lib
[root@hadoop01 lib]# cp /opt/plugins/mysql-connector-java-*.jar ./
[root@hadoop01 lib]# ll
total 11668
-rw-r--r--. 1 root root   17561 Mar  5  2020 common-utils-5.4.1.jar
-rw-r--r--. 1 root root  317816 Mar  5  2020 jtds-1.3.1.jar
-rw-r--r--. 1 root root  228877 Mar  5  2020 kafka-connect-jdbc-5.4.1.jar
-rw-r--r--. 1 root root 1006956 Oct 23 03:00 mysql-connector-java-5.1.48.jar
-rw-r--r--. 1 root root 2330539 Oct 23 03:00 mysql-connector-java-8.0.18.jar
-rw-r--r--. 1 root root  927447 Mar  5  2020 postgresql-42.2.10.jar
-rw-r--r--. 1 root root   41139 Mar  5  2020 slf4j-api-1.7.26.jar
-rw-r--r--. 1 root root 7064881 Mar  5  2020 sqlite-jdbc-3.25.2.jar
Modify the Kafka Connect configuration:
a source uploads data into Kafka; a sink outputs data from it.
Use the standalone config for a single node, distributed for a cluster.
JSON conversion for keys and values.
The REST port is opened on 8083.
Point the plugin path at the external directory holding the connector files.
## Connect start commands
## background start, run from the kafka directory
bin/connect-distributed.sh -daemon config/connect-distributed.properties
## foreground start
bin/connect-distributed.sh config/connect-distributed.properties
## verify startup
http://192.168.137.121:8083/connector-plugins
http://192.168.137.121:8083/connectors
## push data from the source (run in Git Bash on Windows)
curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.137.121:8083/connectors' \
--data \
'{"name":"imooc-upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://10.7.107.11:3306/kafka_study?user=root&password=",
"table.whitelist":"users",
"incrementing.column.name": "uuid",
"mode":"incrementing",
"topic.prefix": "imooc-mysql-"}}'
## print the ingested data
bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.121:9092 --topic imooc-mysql-users --from-beginning
## move data out of Kafka
curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.137.121:8083/connectors' \
--data \
'{"name":"imooc-download-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://10.7.107.11:3306/kafka_study?user=root&password=",
"topics":"imooc-mysql-users",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"uuid",
"table.name.format": "users_bak"}}'
bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic test-mysql-jdbc-users --from-beginning
curl -X DELETE -i 'http://192.168.220.128:8083/connectors/load-mysql-data'
Verification: the request returns 404 afterwards, but the delete has already succeeded.
First run ifconfig locally, then use the host IP:
10.7.107.11
Run in Git Bash:
curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.137.121:8083/connectors' \
--data \
'{"name":"imooc-upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://192.168.152.1:3306/kafka_study?user=root&password=",
"table.whitelist":"users",
"incrementing.column.name": "uuid",
"mode":"incrementing",
"topic.prefix": "imooc-mysql-"}}'
https://blog.csdn.net/m0_37460012/article/details/90290148
// install telnet in the VM to test network connectivity
yum install telnet
Database configuration:
https://www.cnblogs.com/ziroro/p/9479869.html
Next, check my.cnf; it really does contain this parameter:
bind-address = 127.0.0.1
After commenting it out and restarting MySQL, the connection works.
KafkaConsumer
View the tasks
Verify that the database data is synchronized
Recommendation: always use the distributed mode
Connector: manages the job
Task: executes the work
Task rebalancing
Data migration
Alternatively, ELK's Logstash can handle some of the data pulling
Debezium documentation
Docker official site
Images: see the runoob Docker tutorial
https://www.runoob.com/docker/centos-docker-install.html
Download the MySQL plugin:
tar -zxf debezium-connector-mysql-0.9.5.Final-plugin.tar.gz -C ../plugins/
tar -zxf debezium-connector-mongodb-0.9.5.Final-plugin.tar.gz -C ../plugins/
Configure and start Connect, same as above.
## Connect start commands
## background start, run from the kafka directory
bin/connect-distributed.sh -daemon config/connect-distributed.properties
## foreground start
bin/connect-distributed.sh config/connect-distributed.properties
## verify startup
http://192.168.137.121:8083/connector-plugins
http://192.168.137.121:8083/connectors
POST the connector JSON.
Note the whitelist naming rule:
the table whitelist uses database.table format.
{
"name": "rxguo-upload-mysql4",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "10.7.107.11",
"database.port": "3306",
"database.user": "root",
"database.password": "",
"database.server.id": "1008611",
"database.server.name": "mysql-connect",
"table.whitelist": "kafka_study.users",
"database.history.kafka.bootstrap.servers": "192.168.137.121:9092",
"database.history.kafka.topic": "dbhistory.mysql-connect",
"database.serverTimezone":"Asia/Shanghai" ,
"include.schema.changes": "true"
}
}
Verify with a Postman GET (config / status):
url: http://192.168.137.121:8083/connectors/rxguo-upload-mysql3
Output appears in Kafka on the CentOS console.
Note the topic naming rule based on the server name:
fulfillment.inventory.products == serverName.databaseName.tableName
// list the created topics
bin/kafka-topics.sh --list --zookeeper 192.168.137.121:2181
// consume the connector's output topic
bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.121:9092 --topic mysql-connect.kafka_study.users --from-beginning
Sync the Kafka data back to the local database.
curl -X DELETE -i 'http://192.168.220.128:8083/connectors/load-mysql-data'
Debezium's Mongo connector requires a cluster setup.
[MongoDB] installation and configuration (pseudo-distributed cluster): Replica Set
Deploy the cluster on a single machine.
Copy the Kafka directory three times:
cp -r ka/ ka1/
Edit the configuration files:
change the port number, the log directory, and the ZooKeeper connection info.
Start:
finish editing all three copies first, then start the three separate Kafka directories.
Verify:
check zookeeper, then kafka_1, kafka_2, kafka_3 in turn
ps -ef | grep kafka_1
Each Kafka process is called a broker; here we simulate broker 1, 2, and 3.
Partitions of the same topic are normally spread across different nodes, and each partition's replicas also live on other machines.
Code: first create the topic, then print its description.
Three partitions, three replicas.
The first describe call may fail because of strong consistency (metadata not yet ready); read the description again afterwards.
/**
 * Create a topic
 */
public static void createTopic() {
// replication factor: 3 replicas across the three brokers
Short rs = 3;
NewTopic newTopic = new NewTopic(TOPIC_NAME, 3, rs);
CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
System.out.println("==CreateTopicsResult==:" + topics);
}
name:rxguo-topic3,
desc:(name=rxguo-topic3, internal=false,
partitions=
(partition=0, leader=192.168.137.121:9292
(id: 2 rack: null), replicas=192.168.137.121:9292
(id: 2 rack: null), 192.168.137.121:9192
(id: 1 rack: null), 192.168.137.121:9392
(id: 3 rack: null), isr=192.168.137.121:9292
(id: 2 rack: null), 192.168.137.121:9192
(id: 1 rack: null), 192.168.137.121:9392
(id: 3 rack: null)),
(partition=1, leader=192.168.137.121:9392
(id: 3 rack: null), replicas=192.168.137.121:9392
(id: 3 rack: null), 192.168.137.121:9292
(id: 2 rack: null), 192.168.137.121:9192
(id: 1 rack: null), isr=192.168.137.121:9392
(id: 3 rack: null), 192.168.137.121:9292
(id: 2 rack: null), 192.168.137.121:9192
(id: 1 rack: null)),
(partition=2, leader=192.168.137.121:9192
(id: 1 rack: null), replicas=192.168.137.121:9192
(id: 1 rack: null), 192.168.137.121:9392
(id: 3 rack: null), 192.168.137.121:9292
(id: 2 rack: null), isr=192.168.137.121:9192
(id: 1 rack: null), 192.168.137.121:9392
(id: 3 rack: null), 192.168.137.121:9292
(id: 2 rack: null)), authorizedOperations=null)
Keywords
Understanding replicas
1. replica sets
2. semantic guarantees: the ISR keeps its data in sync with the leader
Kafka does not use majority voting to elect a leader.
A new leader is chosen quickly by the controller, based on the ISR.
The controller is fast because brokers race to register the controller node in ZooKeeper; whichever broker registers first becomes the controller.
It monitors whether brokers have failed, and which leaders and followers a failed broker was holding.
Extreme case: the leader and the ISR replicas are lost at the same time.
Unclean leader election:
1. wait for an ISR replica to return: no data loss, at the cost of time;
2. allow partial data loss to stay timely, by electing from out-of-sync followers.
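The two strategies correspond to a real broker/topic setting. A sketch of the relevant server.properties entry (in modern Kafka the default is `false`, i.e. wait for the ISR):

```properties
# Option 1 (default): only ISR members may become leader.
# No data loss, but the partition stays unavailable until an
# ISR replica returns.
unclean.leader.election.enable=false

# Option 2: allow an out-of-sync follower to become leader.
# Restores availability quickly but may lose recent messages.
# unclean.leader.election.enable=true
```

The setting can also be overridden per topic, so latency-critical topics can trade durability for availability independently.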
Plugin: lrzsz, a convenient way to copy local files to the server.
Running rz pops up a file-selection dialog.
yum install -y lrzsz
rz
// install the unzip tool
[root@hadoop01 plugins]# yum install -y unzip
// unpack
unzip <filename>
# -C specifies the target directory
tar -zxf debezium-connector-mysql-0.9.5.Final-plugin.tar.gz -C ../plugins/
tar -zxf debezium-connector-mongodb-0.9.5.Final-plugin.tar.gz -C ../plugins/
Producer key conflicts: solve them with a custom partitioner for load balancing.
Use a singleton producer to avoid creating producers repeatedly in the service layer and the resulting context switching.
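The custom load-balancing idea can be sketched without the Kafka API. In a real `org.apache.kafka.clients.producer.Partitioner` implementation, the `partition(...)` method would contain logic like this hypothetical helper; the spreading strategy shown is an illustrative assumption (note it sacrifices per-key ordering), not Kafka's built-in behaviour beyond the simplified default.

```java
import java.util.Set;
import java.util.TreeSet;

// Sketch of partition-selection logic for hot-key load balancing.
// Class and method names are illustrative.
public class KeyPartitioning {
    // Kafka's default behaviour (simplified): same key -> same partition,
    // so a hot key piles all its records onto one partition
    static int defaultPartition(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    // hypothetical custom strategy: add a rotating counter so a hot key
    // is spread across partitions (per-key ordering is lost!)
    static int spreadPartition(String key, int counter, int numPartitions) {
        return Math.abs((key.hashCode() + counter) % numPartitions);
    }

    public static void main(String[] args) {
        int parts = 3;
        // default: the hot key always lands on the same partition
        System.out.println(defaultPartition("hot-key", parts) == defaultPartition("hot-key", parts));
        // spread: three consecutive sends rotate over all three partitions
        Set<Integer> used = new TreeSet<>();
        for (int i = 0; i < 3; i++) used.add(spreadPartition("hot-key", i, parts));
        System.out.println(used.size());
    }
}
```

A singleton producer pairs naturally with this: one long-lived, thread-safe `KafkaProducer` shared by the service layer, with the partitioner class set once in its properties.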