前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot第69集:字节跳动后端二面经,一文让你走出微服务迷雾架构周刊

springboot第69集:字节跳动后端二面经,一文让你走出微服务迷雾架构周刊

作者头像
达达前端
发布2024-04-10 08:56:57
730
发布2024-04-10 08:56:57
举报
文章被收录于专栏:达达前端达达前端

1.  简介

1.1  消息队列简介
1.1.1  什么是消息队列

消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:

// 1. 创建一个保存字符串的队列 QueuestringQueue = new LinkedList();

// 2. 往消息队列中放入消息 stringQueue.offer( "hello" );

// 3. 从消息队列中取出消息并打印 System.out.println(stringQueue.poll());

上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。

我们可以简单理解消息队列就是将需要传输的数据存放在队列中

1.1.1  消息队列中间件

消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。

目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。

消息队列的应用场景

电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。

22709c50c4ce5015790565fe686a86c7.png
22709c50c4ce5015790565fe686a86c7.png

image.png

fc3974dc9b6f9aa35624948d6f2fa5bd.png
fc3974dc9b6f9aa35624948d6f2fa5bd.png

image.png

af742d61a9749f97c586fb4ad3c36309.png
af742d61a9749f97c586fb4ad3c36309.png

1.1.1.1  日志处理(大数据领域常见)

大型电商网站(淘宝、京东、国美、苏宁...)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。

179666fd61444ea384815061b174ca13.png
179666fd61444ea384815061b174ca13.png

image.png

b1eb5c6c229b2e30a2fe09a27e3a3af1.png
b1eb5c6c229b2e30a2fe09a27e3a3af1.png

image.png

08d33d615ac628339d602103aaa9498c.png
08d33d615ac628339d602103aaa9498c.png

image.png

3be94c2e34d3d7c6cadad915fafc3fc7.png
3be94c2e34d3d7c6cadad915fafc3fc7.png

image.png

d4fa5a6a4a32db81f29b598551e629af.png
d4fa5a6a4a32db81f29b598551e629af.png

image.png

点对点模式特点:

代码语言:javascript
复制
每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)

发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;

接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
d735510816ec75638946616d768c1dd2.png
d735510816ec75638946616d768c1dd2.png

image.png

7590897799fc4a08b097c6b045c5cf98.png
7590897799fc4a08b097c6b045c5cf98.png

image.png

Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。

Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:

1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统

2. 以容错的持久化方式存储数据流

处理数据流

1. Publish and subscribe:发布与订阅

2. Store:存储

3. Process:处理

我们通常将Apache Kafka用在两类程序:

1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据

2. 构建实时流应用程序,以转换或响应数据流

d98869cb89133c7db0ad34cc5acc1950.png
d98869cb89133c7db0ad34cc5acc1950.png

image.png

上图,我们可以看到:

1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。

2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。

3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到

数据库中。

4. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。

Kafka比ActiveMQ牛逼得多

特性

ActiveMQ

RabbitMQ

Kafka

RocketMQ

所属社区/公司

Apache

Mozilla Public License

Apache

Apache/Ali

成熟度

成熟

成熟

成熟

比较成熟

生产者-消费者模式

支持

支持

支持

支持

发布-订阅

支持

支持

支持

支持

REQUEST-REPLY

支持

支持

-

支持

API完备性

低(静态配置)

多语言支持

支持JAVA优先

语言无关

支持,JAVA优先

支持

单机呑吐量

万级(最差)

万级

十万级

十万级(最高)

消息延迟

-

微秒级

毫秒级

-

可用性

高(主从)

高(主从)

非常高(分布式)

消息丢失

-

理论上不会丢失

-

消息重复

-

可控制

理论上会有重复

-

事务

支持

不支持

支持

支持

文档的完备性

提供快速入门

首次部署难度

-

可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。http://kafka.apache.org/downloads可以查看到每个版本的发布时间。

7ceddcdaf89b0c0732cfaaa61be069ab.png
7ceddcdaf89b0c0732cfaaa61be069ab.png

image.png

ab9f733bc9bab4cc775df4641189b859.png
ab9f733bc9bab4cc775df4641189b859.png

image.png

662489c8f06234ea418fcee92030b60d.png
662489c8f06234ea418fcee92030b60d.png

image.png

47bad20b06ac38bb2405808915891b75.png
47bad20b06ac38bb2405808915891b75.png

image.png

381fc1f613df02573a2c712ad959085a.png
381fc1f613df02573a2c712ad959085a.png

image.png

目录名称

说明

bin

Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等

config

Kafka的所有配置文件

libs

运行Kafka所需要的所有JAR包

logs

Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息

site-docs

Kafka的网站帮助文件

9d2bf1f2e4a37d342f2b17dc4aa43926.png
9d2bf1f2e4a37d342f2b17dc4aa43926.png

image.png

073c75ce65bc7313fa39a485b43e17d1.png
073c75ce65bc7313fa39a485b43e17d1.png

image.png

9ec975649e379c3b93f918553caedd6d.png
9ec975649e379c3b93f918553caedd6d.png

image.png

1645b38a338e56aa3a1b1f35e9b8c6d3.png
1645b38a338e56aa3a1b1f35e9b8c6d3.png

image.png

8ea2187641a8b707907e8bf6b38520d5.png
8ea2187641a8b707907e8bf6b38520d5.png

image.png

f58608acf89dae0e23873508be6902ab.png
f58608acf89dae0e23873508be6902ab.png

image.png

2cc3f9b00567344ffbaff14d98948012.png
2cc3f9b00567344ffbaff14d98948012.png

image.png

c2b006a8f6ede522c3ad9f66ac20465b.png
c2b006a8f6ede522c3ad9f66ac20465b.png

image.png

1fe665119c0d7987e9382655f12c328b.png
1fe665119c0d7987e9382655f12c328b.png

image.png

7f3d757e900fada83945f5e0577ba123.png
7f3d757e900fada83945f5e0577ba123.png

image.png

基于1个分区1个副本的基准测试

测试步骤:

1. 启动Kafka集群

2. 创建一个1个分区1个副本的topic: benchmark

3. 同时运行生产者、消费者基准测试程序

4. 观察结果

f133c0a1d234aaf79cafbd2a51206bdd.png
f133c0a1d234aaf79cafbd2a51206bdd.png

image.png

f17c374d2a730489bdbf3ca6b2197727.png
f17c374d2a730489bdbf3ca6b2197727.png

image.png

557f2c6f9ccb609835d6d2a5927d827c.png
557f2c6f9ccb609835d6d2a5927d827c.png

image.png

37c71d1966b5f5eff328d6be5bc01a90.png
37c71d1966b5f5eff328d6be5bc01a90.png

image.png

9eb241bacac223171dda609a7bd70db2.png
9eb241bacac223171dda609a7bd70db2.png

image.png

6d2b1de3019442469fd1412291f46c5b.png
6d2b1de3019442469fd1412291f46c5b.png

image.png

ecb0c48d8aa6dbf94e9f584f3bdc86ba.png
ecb0c48d8aa6dbf94e9f584f3bdc86ba.png

image.png

a59735c092ecfb3f79958e566bcbf034.png
a59735c092ecfb3f79958e566bcbf034.png

image.png

6f401e9a8c7189154db04b6ec605b3e2.png
6f401e9a8c7189154db04b6ec605b3e2.png

image.png

5b44c82671860078469bea8ddd1ed257.png
5b44c82671860078469bea8ddd1ed257.png

image.png

4902a2fd949ebca8f5689c0816eacec7.png
4902a2fd949ebca8f5689c0816eacec7.png

image.png

734593dbaa3fab104f6749e8bee25748.png
734593dbaa3fab104f6749e8bee25748.png

image.png

047222f5c7d69b05ba62024daae6c8c5.png
047222f5c7d69b05ba62024daae6c8c5.png

image.png

3edf75d0264c4ce4aee9aa7a21e104eb.png
3edf75d0264c4ce4aee9aa7a21e104eb.png

image.png

4461c5a23df00284e589828bb4b5e347.png
4461c5a23df00284e589828bb4b5e347.png

image.png

1b937ad980d7c6632ce13ade7e5c61b8.png
1b937ad980d7c6632ce13ade7e5c61b8.png

image.png

e5286332a0baca1bd968108c84caab7e.png
e5286332a0baca1bd968108c84caab7e.png

image.png

ef0a68245579a8f8c2bfa7bacd8ef648.png
ef0a68245579a8f8c2bfa7bacd8ef648.png

image.png

代码语言:javascript
复制
<**repositories**><!-- 代码库 -->  
    <**repository**>  
        <**id**>central</**id**>  
        <**url**>http://maven.aliyun.com/nexus/content/groups/public//</**url**>  
        <**releases**>  
            <**enabled**>true</**enabled**>  
        </**releases**>  
        <**snapshots**>  
            <**enabled**>true</**enabled**>  
            <**updatePolicy**>always</**updatePolicy**>  
            <**checksumPolicy**>fail</**checksumPolicy**>  
        </**snapshots**>  
    </**repository**>  
</**repositories**>  
  
<**dependencies**>  
    <!-- kafka客户端工具 -->  
    <**dependency**>  
        <**groupId**>org.apache.kafka</**groupId**>  
        <**artifactId**>kafka-clients</**artifactId**>  
        <**version**>2.4.1</**version**>  
    </**dependency**>  
  
    <!-- 工具类 -->  
    <**dependency**>  
        <**groupId**>org.apache.commons</**groupId**>  
        <**artifactId**>commons-io</**artifactId**>  
        <**version**>1.3.2</**version**>  
    </**dependency**>  
  
    <!-- SLF桥接LOG4J日志 -->  
    <**dependency**>  
        <**groupId**>org.slf4j</**groupId**>  
        <**artifactId**>slf4j-log4j12</**artifactId**>  
        <**version**>1.7.6</**version**>  
    </**dependency**>  
  
    <!-- SLOG4J日志 -->  
    <**dependency**>  
        <**groupId**>log4j</**groupId**>  
        <**artifactId**>log4j</**artifactId**>  
        <**version**>1.2.16</**version**>  
    </**dependency**>  
</**dependencies**>  
  
<**build**>  
    <**plugins**>  
        <**plugin**>  
            <**groupId**>org.apache.maven.plugins</**groupId**>  
            <**artifactId**>maven-compiler-plugin</**artifactId**>  
            <**version**>3.7.0</**version**>  
            <**configuration**>  
                <**source**>1.8</**source**>  
                <**target**>1.8</**target**>  
            </**configuration**>  
        </**plugin**>  
    </**plugins**>  
</**build**>

将log4j.properties配置文件放入到resources文件夹中

代码语言:javascript
复制
**log4j.rootLogger**=**INFO,stdout****  
****log4j.appender.stdout**=**org.apache.log4j.ConsoleAppender****  
****log4j.appender.stdout.layout**=**org.apache.log4j.PatternLayout****  
****log4j.appender.stdout.layout.ConversionPattern**= **%5p - %m%n**
f716a16a30e86e5da0d8ce6004d4623e.png
f716a16a30e86e5da0d8ce6004d4623e.png

image.png

代码语言:javascript
复制
**public class** KafkaProducerTest {  
    **public static void** main(String[] args) {  
        // 1. 创建用于连接Kafka的Properties配置  
        Properties props = **new** Properties();  
        props.put( **"bootstrap.servers"** , **"192.168.88.100:9092"** );  
        props.put( **"acks"** , **"all"** );  
        props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
        props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
  
        // 2. 创建一个生产者对象KafkaProducer  
        KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);  
  
        // 3. 调用send发送1-100消息到指定Topic test  
        **for**(**int** i = 0; i < 100; ++i) {  
            **try** {  
                // 获取返回值Future,该对象封装了返回值  
                Future<RecordMetadata> future = producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ));  
                // 调用一个Future.get()方法等待响应  
                future.get();  
            } **catch** (InterruptedException e) {  
                e.printStackTrace();  
            } **catch** (ExecutionException e) {  
                e.printStackTrace();  
            }  
        }  
  
        // 5. 关闭生产者  
        producer.close();  
    }  
}
c7f8e24a2b827f774598e2de57a39567.png
c7f8e24a2b827f774598e2de57a39567.png

image.png

a6b635388fba60206bc7ab0c112a3bfb.png
a6b635388fba60206bc7ab0c112a3bfb.png

image.png

代码语言:javascript
复制
**public class** KafkaProducerTest {  
    **public static void** main(String[] args) {  
        // 1. 创建用于连接Kafka的Properties配置  
        Properties props = **new** Properties();  
        props.put( **"bootstrap.servers"** , **"node1.itcast.cn:9092"** );  
        props.put( **"acks"** , **"all"** );  
        props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
        props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
  
        // 2. 创建一个生产者对象KafkaProducer  
        KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);  
  
        // 3. 调用send发送1-100消息到指定Topic test  
        **for**(**int** i = 0; i < 100; ++i) {  
            **try** {  
                // 获取返回值Future,该对象封装了返回值  
                Future<RecordMetadata> future = producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ));  
                // 调用一个Future.get()方法等待响应  
                future.get();  
            } **catch** (InterruptedException e) {  
                e.printStackTrace();  
            } **catch** (ExecutionException e) {  
                e.printStackTrace();  
            }  
        }  
  
        // 5. 关闭生产者  
        producer.close();  
    }  
}
代码语言:javascript
复制
**public class** KafkaProducerTest {  
    **public static void** main(String[] args) {  
        // 1. 创建用于连接Kafka的Properties配置  
        Properties props = **new** Properties();  
        props.put( **"bootstrap.servers"** , **"node1.itcast.cn:9092"** );  
        props.put( **"acks"** , **"all"** );  
        props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
        props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
  
        // 2. 创建一个生产者对象KafkaProducer  
        KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);  
  
        // 3. 调用send发送1-100消息到指定Topic test  
        **for**(**int** i = 0; i < 100; ++i) {  
            // 一、同步方式  
            // 获取返回值Future,该对象封装了返回值  
            // Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));  
            // 调用一个Future.get()方法等待响应  
            // future.get();  
  
            // 二、带回调函数异步方式  
            producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ), **new** Callback() {  
                @Override  
                **public void** onCompletion(RecordMetadata metadata, Exception exception) {  
                    **if**(exception != **null**) {  
                        System.***out***.println( **"** **发送消息出现异常** **"** );  
                    }  
                    **else** {  
                        String topic = metadata.topic();  
                        **int** partition = metadata.partition();  
                        **long** offset = metadata.offset();  
  
                        System.***out***.println( **"** **发送消息到** **Kafka** **中的名字为** **"** + topic + **"** **的主题,第** **"** + partition + **"** **分区,第** **"** + offset + **"** **条数据成功** **!"** );  
                    }  
                }  
            });  
        }  
  
        // 5. 关闭生产者  
        producer.close();  
    }  
}
64a629825d3bd26dd1b1af0b041775da.png
64a629825d3bd26dd1b1af0b041775da.png

image.png

63445e5fa415b3804a9f1798b1421f5f.png
63445e5fa415b3804a9f1798b1421f5f.png

image.png

e6fcf4c3da5bd92939ecf0b11a496799.png
e6fcf4c3da5bd92939ecf0b11a496799.png

image.png

ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)

ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。

Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据

1.1.1  producer(生产者)

生产者负责将数据推送给broker的topic

1.1.2  consumer(消费者)

消费者负责从broker的topic中拉取数据,并自己进行处理

1.1.3  consumer group(消费者组)
ee762be7cc9b33cee5acebacaf1c507b.png
ee762be7cc9b33cee5acebacaf1c507b.png

image.png

5d5afc9956894c806eedd82ac60230e9.png
5d5afc9956894c806eedd82ac60230e9.png

image.png

b6d7966b5099fc4d7863adf0f3a90fdf.png
b6d7966b5099fc4d7863adf0f3a90fdf.png

image.png

13245a957d808283d0ce9dcee156d0ce.png
13245a957d808283d0ce9dcee156d0ce.png

image.png

6c0c63756a1b6a222eb33ed3269631b0.png
6c0c63756a1b6a222eb33ed3269631b0.png

image.png

c3b3bd38ba44dca5556ab96cf6f5fc46.png
c3b3bd38ba44dca5556ab96cf6f5fc46.png

image.png

代码语言:javascript
复制
// 3. 发送1-100数字到Kafka的test主题中  
**while**(**true**) {  
    **for** (**int** i = 1; i <= 100; ++i) {  
        // 注意:send方法是一个异步方法,它会将要发送的数据放入到一个buffer中,然后立即返回  
        // 这样可以让消息发送变得更高效  
        producer.send(**new** ProducerRecord<>( **"test"** , i + **""** ));  
    }  
    Thread.*sleep*(3000);  
}
9e6acda9d94bfdc6fe9d41f126d93fc2.png
9e6acda9d94bfdc6fe9d41f126d93fc2.png

image.png

8a8e70c271044643062c7d1bf3ca5786.png
8a8e70c271044643062c7d1bf3ca5786.png

image.png

ab98a004e2fb14a5ade977f45db6c5d3.png
ab98a004e2fb14a5ade977f45db6c5d3.png

image.png

1445a88281d41317fc3772ff38f8d2b0.png
1445a88281d41317fc3772ff38f8d2b0.png

image.png

e762798a0ca0cd7b5721e24f14c52c2a.png
e762798a0ca0cd7b5721e24f14c52c2a.png

image.png

a57413f3c609a4f7012c6124c9af353e.png
a57413f3c609a4f7012c6124c9af353e.png

image.png

a300f3f8a06b62c0554888b4ad9ebaea.png
a300f3f8a06b62c0554888b4ad9ebaea.png

image.png

c032281f4cb86d04b182a7abf7d6a6ea.png
c032281f4cb86d04b182a7abf7d6a6ea.png

image.png

bf315ef2a5486905769a02b4b4fbe55a.png
bf315ef2a5486905769a02b4b4fbe55a.png

image.png

34b6448f590af06940a95b98d9599c07.png
34b6448f590af06940a95b98d9599c07.png

image.png

代码语言:javascript
复制
*// 1. 创建消费者*

    **public** **static** Consumer **<** String, String **>**  createConsumer() {

         *// 1. 创建Kafka消费者配置*

        Properties **props** **=** **new** Properties();

        **props**.setProperty( **"** **bootstrap.servers** **"** ,  **"** **node1.itcast.cn:9092** **"** );

        **props**.setProperty( **"** **group.id** **"** ,  **"** **ods_user** **"** );

        **props**.put( **"** **isolation.level** **"** , **"** **read_committed** **"** );

        **props**.setProperty( **"** **enable.auto.commit** **"** ,  **"** **false** **"** );

        **props**.setProperty( **"** **key.deserializer** **"** ,  **"** **org.apache.kafka.common.serialization.StringDeserializer** **"** );

        **props**.setProperty( **"** **value.deserializer** **"** ,  **"** **org.apache.kafka.common.serialization.StringDeserializer** **"** );

 

         *// 2. 创建Kafka消费者*

        KafkaConsumer<String, String> **consumer** **=** **new** KafkaConsumer<>(props);

 

         *// 3. 订阅要消费的主题*

        **consumer**.subscribe(**Arrays**.asList( **"** **ods_user** **"** ));

        

        **return** consumer;

}

编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。

fea6da42b91d5be6c0f48eaa0cde3cd5.png
fea6da42b91d5be6c0f48eaa0cde3cd5.png

image.png

ee49b8ea56c65bd4c83c97775f7b5c19.png
ee49b8ea56c65bd4c83c97775f7b5c19.png

image.png

1.1.1.1  编写代码消费并生产数据

实现步骤:

1. 调用之前实现的方法,创建消费者、生产者对象

2. 生产者调用initTransactions初始化事务

3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic

(1) 生产者开启事务

(2) 消费者拉取消息

(3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)

(4) 生产消息到dwd_user topic中

(5) 提交偏移量到事务中

(6) 提交事务

(7) 捕获异常,如果出现异常,则取消事务

代码语言:javascript
复制
**public** **static** void main(String[] args) {

        Consumer<String, String> **consumer** **=** createConsumer();

        Producer<String, String> **producer** **=** createProducer();

         *// 初始化事务*

        **producer**.initTransactions();

 

        **while**(true) {

            **try** {

                 *// 1. 开启事务*

                **producer**.beginTransaction();

                 *// 2. 定义Map结构,用于保存分区对应的offset*

                Map<TopicPartition, OffsetAndMetadata> **offsetCommits** **=** **new** HashMap<>();

                 *// 2. 拉取消息*

                ConsumerRecords<String, String> **records** **=** **consumer**.poll(**Duration**.ofSeconds(2));

                **for** (ConsumerRecord<String, String> **record**  **:**  records) {

                     *// 3. 保存偏移量*

                    **offsetCommits**.put(**new** TopicPartition(**record**.topic(), **record**.partition()),

                            **new** OffsetAndMetadata(**record**.offset() + 1));

                     *// 4. 进行转换处理*

                    String[] **fields** **=** **record**.value().split( **"** **,** **"** );

                    fields[1] **=** fields[1].equalsIgnoreCase( **"** **1** **"** )  **?**   **"** **男** **"** **:** **"** **女** **"** ;

                    String **message** **=** fields[0]  **+**   **"** **,** **"**   **+**  fields[1]  **+**   **"** **,** **"**   **+**  fields[2];

                     *// 5. 生产消息到dwd_user*

                    **producer**.send(**new** ProducerRecord<>( **"** **dwd_user** **"** , message));

                }

                 *// 6. 提交偏移量到事务*

                **producer**.sendOffsetsToTransaction(offsetCommits,  **"** **ods_user** **"** );

                 *// 7. 提交事务*

                **producer**.commitTransaction();

            } **catch** (Exception **e**) {

                 *// 8. 放弃事务*

                **producer**.abortTransaction();

            }

        }

    }
8e8e231d3e89b9af6f3817770ddf1043.png
8e8e231d3e89b9af6f3817770ddf1043.png

image.png

93886463917e48d4bd2d1088962c4c70.png
93886463917e48d4bd2d1088962c4c70.png

image.png

1.  分区和副本机制

1.1  生产者分区写入策略

生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中

1. 轮询分区策略

2. 随机分区策略

3. 按key分区分配策略

4. 自定义分区策略

1.1.1  轮询策略
27ea5d2457c8b82ce6e3ed279653eae0.png
27ea5d2457c8b82ce6e3ed279653eae0.png

image.png

f9c1512d4b642b5ca8a29a5c74d4fbe9.png
f9c1512d4b642b5ca8a29a5c74d4fbe9.png

image.png

324239113b130e2b5c2340c1cf5572a1.png
324239113b130e2b5c2340c1cf5572a1.png

image.png

73865c725ae9c84b934903ab7e6395d1.png
73865c725ae9c84b934903ab7e6395d1.png

image.png

926f428f13082eebc09d127d0dfc9e13.png
926f428f13082eebc09d127d0dfc9e13.png

image.png

4e30e11c6993d564e8df2d5113c6d797.png
4e30e11c6993d564e8df2d5113c6d797.png

image.png

0cbea632daf6d9739c88e1790ba2adcb.png
0cbea632daf6d9739c88e1790ba2adcb.png

image.png

4b29fa8ced7162f4dbec6afb73e2e7dd.png
4b29fa8ced7162f4dbec6afb73e2e7dd.png

image.png

指标

意义

Brokers Spread

broker使用率

Brokers Skew

分区是否倾斜

Brokers Leader Skew

leader partition是否存在倾斜

4964434ead544e1b54b95e54bf445b18.png
4964434ead544e1b54b95e54bf445b18.png

image.png

55a434be2ead8241ffc173367d9f96d8.png
55a434be2ead8241ffc173367d9f96d8.png

image.png

1cff013de5aec0b957d66b44e3d3d532.png
1cff013de5aec0b957d66b44e3d3d532.png

image.png

752c1d7b061479ac9660415266540917.png
752c1d7b061479ac9660415266540917.png

image.png

883d4c0e951572ef1a5141d29b1f7a71.png
883d4c0e951572ef1a5141d29b1f7a71.png

image.png

f72a9123ac4493a5222aa5649a428855.png
f72a9123ac4493a5222aa5649a428855.png

image.png

960f61e540abc94f72561df5ae11ccd1.png
960f61e540abc94f72561df5ae11ccd1.png

image.png

b7dbf40490763eeccbea7f15c150a5c5.png
b7dbf40490763eeccbea7f15c150a5c5.png

image.png

指标****

单分区单副本(ack=0)****

单分区单副本(ack=1)****

单分区单副本(ack=-1/all)****

吞吐量

165875.991109/s每秒16.5W条记录

93092.533979/s每秒9.3W条记录

73586.766156 /s每秒7.3W调记录

吞吐速率

158.19 MB/sec

88.78 MB/sec

70.18 MB

平均延迟时间

192.43 ms

346.62 ms

438.77 ms

最大延迟时间

670.00 ms

1003.00 ms

1884.00 ms

2ccb043c3b53770a6645428d3602aa45.png
2ccb043c3b53770a6645428d3602aa45.png

image.png

928f56c1f61b3b02b5e92ee841f407e9.png
928f56c1f61b3b02b5e92ee841f407e9.png

image.png

指标****

单分区单副本(ack=0)****

单分区单副本(ack=1)****

吞吐量

165875.991109 records/sec每秒16.5W条记录

93092.533979 records/sec每秒9.3W条记录

吞吐速率

158.19 MB/sec每秒约160MB数据

88.78 MB/sec每秒约89MB数据

平均延迟时间

192.43 ms avg latency

346.62 ms avg latency

最大延迟时间

670.00 ms max latency

1003.00 ms max latency

fd6b52ae748953d7b557bc30323d3560.png
fd6b52ae748953d7b557bc30323d3560.png
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-04-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.  简介
    • 1.1  消息队列简介
      • 1.1.1  什么是消息队列
      • 1.1.1  消息队列中间件
      • 1.1.1  producer(生产者)
      • 1.1.2  consumer(消费者)
      • 1.1.3  consumer group(消费者组)
  • 1.  分区和副本机制
    • 1.1  生产者分区写入策略
      • 1.1.1  轮询策略
相关产品与服务
消息队列
腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档