前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实战笔记:来一起探究下Kafka是如何实现万亿级海量数据的高并发写入的?

实战笔记:来一起探究下Kafka是如何实现万亿级海量数据的高并发写入的?

作者头像
艾编程
发布2020-06-12 10:37:59
4410
发布2020-06-12 10:37:59
举报
文章被收录于专栏:艾编程艾编程

前两天为大家分享了一篇关于kafka和RocketMQ选型的内容,那么今天就为大家分享,kafkaKafka海量数据解决方案之测试方案和监控集群应用详解,今天的内容和前两天的内容是关联的,推荐一下,可以关注我的账号看前面的内容哦,同时还有视频教程,废话不多说,开始为大家分享实战笔记干货!

测试方案

1、添加model

代码语言:javascript
复制
public class UserDataSource {

    public static void main(String args[]) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.90.131:9092");
        props.put("acks", "all");

        props.put("delivery.timeout.ms", 30000);
        props.put("request.timeout.ms", 20000);

        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<String, byte[]> producer = new KafkaProducer<>(props);


        while (true){
            User car = next();
            byte[] carBinary = ObjectBinaryUtil.toBinary(car);

            ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(
                    "user",
                    car.getId(),
                    carBinary);
            producer.send(record);

            Thread.sleep(200);
            System.out.println("published...");
        }

        //producer.close();
    }

    private static User next (){
        Random random =  new Random();
        User u = new User(
                random.nextInt(10) + "",
                true,
                "",
                1);
        return u;
    }
}

2、生成数据

代码语言:javascript
复制
Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.90.131:9092");
        props.put("acks", "all");

        props.put("delivery.timeout.ms", 30000);
        props.put("request.timeout.ms", 20000);

        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<String, byte[]> producer = new KafkaProducer<>(props);


        while (true){
            User car = next();
            byte[] carBinary = ObjectBinaryUtil.toBinary(car);

            ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(
                    "user",
                    car.getId(),
                    carBinary);
            producer.send(record);

            Thread.sleep(200);
            System.out.println("published...");
        }

        //producer.close();
    }

    private static User next (){
        Random random =  new Random();
        User u = new User(
                random.nextInt(10) + "",
                true,
                "",
                1);
        return u;
    }

3、创建topic

代码语言:javascript
复制
bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.90.131:9092 \
  --replication-factor 1 \
  --partitions 3 \
  --topic user

4、添加CarConsume

代码语言:javascript
复制
public static void main(String args[]){
        //要消费的topic名称
        String topic = "user";

        List<TopicPartition> partitions = new ArrayList<>();
        for (int i=0; i<3; i++){
            //构建partition 对象
            TopicPartition p = new TopicPartition(topic, i);
            partitions.add(p);
        }

        //目标表
        String targetTable = "user";

        //实例化exact once consumer
        ExactOnceConsumer<Electrocar> exactConsumer = 
        new ExactOnceConsumer(topic, partitions, targetTable);

        //从指定offset开始消费
        exactConsumer.seek();

        //开始消费
        exactConsumer.subscribe();
    }

5、添加 kafka.user 表

1 2 3 4 5 6 7 8 9 10

drop table user; CREATE TABLE `user` ( `topic` varchar(20) DEFAULT NULL, `pid` int(11) DEFAULT NULL, `offset` mediumtext, `id` int(11) DEFAULT NULL, `gender` tinyint(1) DEFAULT NULL, `name` varchar(20) DEFAULT NULL, `age` int DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

6、添加 UserConsume

#直接 拷贝CarConsume

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25

public class UserConsume { public static void main(String args[]){ //要消费的topic名称 String topic = "user"; List<TopicPartition> partitions = new ArrayList<>(); for (int i=0; i<3; i++){ //构建partition 对象 TopicPartition p = new TopicPartition(topic, i); partitions.add(p); } //目标表 String targetTable = "user"; //实例化exact once consumer ExactOnceConsumer<Electrocar> exactConsumer = new ExactOnceConsumer(topic, partitions, targetTable); //从指定offset开始消费 exactConsumer.seek(); //开始消费 exactConsumer.subscribe(); } }

7、 完善seek

seek 中offset还是写死的,应该从MySQL获取最新的offset

1

SQL: select max(offset+0) from kafka.electrocar where pid=1;

代码语言:javascript
复制
public long offsetByPartition(TopicPartition p){
        String sql = String.format("select max(offset+0) from %s where pid=%d", this.targetTable, p.partition());

        Statement stat = null;
        try {
            stat = jdbcConn.createStatement();
            ResultSet rs = stat.executeQuery(sql);

            if (rs.next()){
                return rs.getInt(1);
            }
        } catch (SQLException e) {
            if (stat !=null){
                try {
                    stat.close();
                } catch (SQLException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return 0;
    }

8、测试offset边界

代码语言:javascript
复制
#清理数据
delete from kafka.electrocar;

执行carConsume
停止carConsume

#查看是否有重复数据
select pid,offset,count(*) ct 
from kafka.electrocar 
group by pid,offset 
having ct>1;

监控 集群/应用

1 、安装 KafkaOffsetMonitor

特点:权限小、侵入性小,快速实现必要的功能

在GitHub中 搜KafkaOffsetMonitor

注意:KafkaOffsetMonitor中引入了一些外网的js\css 文件,导致你的web异常

代码语言:javascript
复制
java -Xms512M -Xmx512M -Xss1024K -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8088 \
--zk 192.168.90.131:2181 \
--refresh 5.minutes \
--retain 1.da

KafkaOffsetMonitor 不仅可以监控集群状态,还可以帮我们监控消费进度

只要把进度写到 ZK 的

/consumers/{group_id}/offsets/{Topic}/

2 、获取最新消费进度

哪里可以获取消费进度呢?MySQL中不太好使用

代码语言:javascript
复制
Demo03:

consumer.commitAsync();   
要提交监督,说明consumer一定是有这个进度在内存

这段代码获取offset
this.subscriptions.allConsumed()

private subscriptions 无法使用,用反射获取
            Field f = KafkaConsumer.class.getDeclaredField("subscriptions");
            f.setAccessible(true);
            SubscriptionState subState = (SubscriptionState) f.get(consumer);

#执行allConsumed();

遍历
for (TopicPartition p : latestOffsets.keySet()){
    if (latestOffsets.containsKey(p)){
        long offset = latestOffsets.get(p).offset();
        System.out.println(String.format("pid:%d,offset:%d", 
                p.partition(), 
                offset));
    }
}

封装

代码语言:javascript
复制
/添加字段
private SubscriptionState subState;

private void setSubState(){
        try {
            Field f = KafkaConsumer.class.getDeclaredField("subscriptions");
            f.setAccessible(true);
            this.subState = (SubscriptionState) f.get(this.kafkaConsumer);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }

//在init 调用
setSubState();
System.out.println("Sub state inited...");

3、减小ZK的压力

(1)、实时更新ZK好吗? 不好,ZK的读、写都是事务

要添加一个线程每3min更新一次,添加

1 2 3

public class ZkUptThread extends Thread{ }

实战笔记:Kafka是如何实现十几万的海量数据的高并发写入的?
实战笔记:Kafka是如何实现十几万的海量数据的高并发写入的?
代码语言:javascript
复制
//内存中试试更新的Offset
    public Map<TopicPartition, Long> imOffsets = new ConcurrentHashMap<>();
    
    //记录ZooKeeper中的Offset
    public Map<TopicPartition, Long> zkOffsets = new HashMap<>();

4、更新 InMemoryOffset

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

// 在 ZkUptThread 中 public void uptIMOffset(SubscriptionState subs){ //执行allConsumed Map<TopicPartition, OffsetAndMetadata> latestOffsets = subs.allConsumed(); for (TopicPartition p : latestOffsets.keySet()){ if (latestOffsets.containsKey(p)){ long offset = latestOffsets.get(p).offset(); this.imOffsets.put(p, offset); } } } // exactOnceConsumer.subscribe 中调用uptIMOffset

5 run方法逻辑

offset未更新时,就不需要更新ZK

代码语言:javascript
复制
@Override
    public void run() {
        // 写成 一个循环
        while (true){
            try {
                for (Map.Entry<TopicPartition, Long> entry : imOffsets.entrySet()) {
                    long imOffset = entry.getValue();   //内存中offset

                    //若zkOffset 和 imOffset 相等,不作操作
                    if (zkOffsets.containsKey(entry.getKey())&&
                            zkOffsets.get(entry.getKey()) == imOffset){
                        continue;
                    }else{
                        //否则,更新 zk 中的offset
                        uptZk(entry.getKey(), imOffset);
                        zkOffsets.put(entry.getKey(), imOffset);
                    }
                }
                Thread.sleep(1000*10);
                System.out.println("ZkUpdThread loop once ...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

6、更新ZooKeeper

代码语言:javascript
复制
依赖:
<dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.0.0</version>
    </dependency>
    
   <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.13</version>
      </dependency>
    </dependencies>
  </dependencyManagement>
代码语言:javascript
复制
依赖:
<dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.0.0</version>
    </dependency>
    
   <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.13</version>
      </dependency>
    </dependencies>
  </dependencyManagement>
代码语言:javascript
复制
//添加字段 zkClient
private CuratorFramework zkClient;


//在狗仔函数中实例化 curator
public ZkUptThread(){
    //retry10次,每次等5s
    RetryPolicy retry =  new RetryNTimes(10,5000);
    //创建curator 实例
    zkClient = CuratorFrameworkFactory.newClient("192.168.90.131:2181",retry);
}
代码语言:javascript
复制
private void uptZk(TopicPartition partition, long offset){
        //拼接要更新的路径
        String path = String.format("/consumers/%s/offsets/%s/%d",groupid, topic, partition.partition());
        try {
            byte[] offsetBytes = String.format("%d",offset).getBytes();

            if (zkClient.checkExists().forPath(path) != null){
                //upd
                zkClient.setData().forPath(path,offsetBytes);
                System.out.println("update offset Znode...");
            }else{
                //insert
                zkClient.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path,offsetBytes);
                System.out.println("add offset Znode...");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

7、在ExactOnceConsumer 中创建线程

代码语言:javascript
复制
#添加字段
private ZkUptThread zkUptThread;

#写set方法
private void setZkUptThread(){
    zkUptThread = new ZkUptThread(topic,groupid);
    zkUptThread.start();
}

#在init犯法中调用 setZkUptThread
setZkUptThread();
System.out.println("uptZK Thread started...");

#在subscribe方法中,每次循环后都要调用
this.zkUptThread.uptIMOffset(subState);

线上经验分享:流量激增、多网卡方案

实战笔记:Kafka是如何实现十几万的海量数据的高并发写入的?
实战笔记:Kafka是如何实现十几万的海量数据的高并发写入的?
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 测试方案
  • 1、添加model
    • 2、生成数据
      • 3、创建topic
        • 4、添加CarConsume
          • 5、添加 kafka.user 表
            • 6、添加 UserConsume
              • 7、 完善seek
                • 8、测试offset边界
                • 监控 集群/应用
                  • 1 、安装 KafkaOffsetMonitor
                    • 2 、获取最新消费进度
                      • 3、减小ZK的压力
                        • 4、更新 InMemoryOffset
                          • 5 run方法逻辑
                            • 6、更新ZooKeeper
                              • 7、在ExactOnceConsumer 中创建线程
                              • 线上经验分享:流量激增、多网卡方案
                              相关产品与服务
                              云数据库 MySQL
                              腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档