爬虫架构|利用Kafka处理数据推送问题(2)

在前一篇文章爬虫架构|利用Kafka处理数据推送问题(1)中对Kafka做了一个介绍,以及环境搭建,最后是选择使用阿里云的Kafka,这一篇文章继续说使用阿里云的Kafka的一些知识。

一、发布者最佳实践

发布的完整代码(根据自己的业务做相应处理):

package com.yimian.controller.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.alibaba.fastjson.JSON;
import com.yimian.model.SpiderData;

/**
 * 生产者
 * 
 * @author huangtao
 *
 */
@Controller
@RequestMapping(value = "kafka/producer")
public class KafkaProducerController {

    private static Producer<String, String> producer;
    private static Properties kafkaProperties;

    static {
        // 设置sasl文件的路径
        JavaKafkaConfigurer.configureSasl();
        // 加载kafka.properties
        kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // 设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // 设置SSL根证书的路径,请记得将XXX修改为自己的路径
        // 与sasl路径类似,该文件也不能被打包到jar中
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        // 根证书store的密码,保持不变
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // 接入协议,目前支持使用SASL_SSL协议接入
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // SASL鉴权方式,保持不变
        props.put(SaslConfigs.SASL_MECHANISM, "ONS");
        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);

        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        producer = new KafkaProducer<String, String>(props);
    }
    
    /**
     * 发送消息给kafka
     * @param topic
     * @param msg
     */
    public static void sendMsgToKafka(String topic, SpiderData msg) {
        try {
            // 发送消息,并获得一个Future对象
            Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(topic, String.valueOf(new Date().getTime()),
                    JSON.toJSONString(msg)));
            // 同步获得Future对象的结果
            RecordMetadata recordMetadata = metadataFuture.get();
            System.out.println("Produce ok:" + recordMetadata.toString());
        } catch (Exception e) {
            /**
             * 要考虑重试~
             * 在分布式环境下,由于网络等原因,偶尔的发送失败是常见的。这种失败有可能是消息已经发送成功,但是 Ack 失败,也有可能是确实没发送成功。
             * 消息队列 Kafka 是 VIP 网络架构,会主动掐掉空闲连接(一般 30 秒没活动),也就是说,不是一直活跃的客户端会经常收到”connection rest by peer”这样的错误,因此建议都考虑重试。
             */
            // 参考常见报错:
            // https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }

    @RequestMapping(value = "init", produces = "text/html;charset=UTF-8")
    @ResponseBody
    public void init() {
        // 构造一个Kafka消息
        String topic = kafkaProperties.getProperty("topic"); // 消息所属的Topic,请在控制台申请之后,填写在这里
        SpiderData data = new SpiderData();
        data.setDescUrl("www.baidu.com");
        data.setTitle("百度");

        sendMsgToKafka(topic, data);
    }
}

Kafka的发送非常简单,代码片段如下:

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
    topic,   \\ topic
    null,    \\ 分区编号,这里最好为 null,交给 producer 去分配
    System.currentTimeMillis(), \\时间戳
    String.valueOf(message.hashCode()), \\ key,可以在控制台通过这个 Key 查找消息,这个 key 最好唯一;
    message)); \\ value,消息内容

message可以是一个JSON类型的对象,如上面例子中的JSON.toJSONString(new SpiderData())

1.1、Key 和 Value

Kafka 0.10.0.0 的消息字段只有两个:Key 和 Value。为了便于追踪,重要消息最好都设置一个唯一的 Key。通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况;更重要的是,您可以在控制台可以根据 Key 查询消息的内容。

1.2、失败重试

在分布式环境下,由于网络等原因,偶尔的发送失败是常见的。这种失败有可能是消息已经发送成功,但是 Ack 失败,也有可能是确实没发送成功。 消息队列 Kafka 是 VIP 网络架构,会主动掐掉空闲连接(一般 30 秒没活动),也就是说,不是一直活跃的客户端会经常收到”connection rest by peer”这样的错误,因此建议都考虑重试。

1.3、异步发送

需要注意的是这个接口是异步发送的;如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)

1.4、线程安全

Producer 是线程安全的,且可以往任何 Topic 发送消息。一般一个应用,对应一个 Producer 就足够了。

1.5、Ack

消息队列 Kafka 没有考虑这个参数,都认为是“all”,即所有消息同步到 Slave 节点后才会返回成功的确认消息给客户端。

1.6、Batch

Batch 的基本思路是:把消息缓存在内存中,并进行打包发送。Kafka 通过 Batch 来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。 在构建 Producer 时,需要考虑以下两个参数:

  • batch.size : 发往每个 Partition 的消息个数缓存量达到这个数值时,就会触发一次网络请求,把消息真正发往服务器;
  • linger.ms : 每个消息待在缓存中的最大时间,超过这个时间,就会忽略 batch.size 的限制,立即把消息发往服务器。

由此可见,Kafka 什么时候把消息真正发往服务器,是通过上面两个参数共同决定的; batch.size 有助于提高吞吐,linger.ms 有助于控制延迟。您可以根据具体业务进行调整。

1.7、OOM

结合 Kafka Batch 的设计思路,Kafka 会缓存消息并打包发送,如果缓存太多,则有可能造成 OOM。

  • buffer.memory : 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略 batch.sizelinger.ms 的限制。
  • buffer.memory 的默认数值是 32M,对于单个 Producer 来说,可以保证足够的性能。需要注意的是,如果你在同一个 JVM 中启动多个 Producer,那么每个 Producer 都有可能占用32M 缓存空间,此时便有可能触发 OOM。
  • 在生产时,一般没有必要启动多个 Producer;如果特殊情况需要,则需要考虑buffer.memory的大小,避免触发 OOM。

1.8、分区顺序

单个分区内,消息是按照发送顺序储存的,是基本有序的。 但消息队列 Kafka 并不保证单个分区内绝对有序,所以在某些情况下,会发生少量消息乱序。比如:消息队列 Kafka 为了提高可用性,某个分区挂掉后把消息 Failover 到其它分区。

二、订阅者最佳实践

消费的完整代码(根据自己的业务做相应处理):

package com.yimian.controller.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yimian.model.SpiderData;

/**
 * 消费者
 * 
 * @author huangtao
 *
 */
@Controller
@RequestMapping(value = "kafka/consumer")
public class KafkaConsumerController {

    private static Consumer<String, String> consumer;

    static {
        // 设置sasl文件的路径
        JavaKafkaConfigurer.configureSasl();

        // 加载kafka.properties
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // 设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // 设置SSL根证书的路径,请记得将XXX修改为自己的路径
        // 与sasl路径类似,该文件也不能被打包到jar中
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        // 根证书store的密码,保持不变
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // 接入协议,目前支持使用SASL_SSL协议接入
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // SASL鉴权方式,保持不变
        props.put(SaslConfigs.SASL_MECHANISM, "ONS");
        // 两次poll之间的最大允许间隔
        // 请不要改得太大,服务器会掐掉空闲连接,不要超过30000
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);
        // 每次poll的最大数量
        // 注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // 消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // 当前消费实例所属的消费组,请在控制台申请之后填写
        // 属于同一个组的消费实例,会负载消费消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        // 构造消息对象,也即生成一个消费实例
        consumer = new KafkaConsumer<String, String>(props);

        // 设置消费组订阅的Topic,可以订阅多个
        // 如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样
        List<String> subscribedTopics = new ArrayList<String>();
        // 如果需要订阅多个Topic,则在这里add进去即可
        // 每个Topic需要先在控制台进行创建
        subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);
    }
    
    @RequestMapping(value = "init", produces = "text/html;charset=UTF-8")
    @ResponseBody
    public void init() {
        // 循环消费消息
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // 必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG
                // 建议开一个单独的线程池来消费消息,然后异步返回结果
                for (ConsumerRecord<String, String> record : records) {
                    JSONObject jsonMsg = JSON.parseObject(record.value());  
                    SpiderData spiderData = JSONObject.toJavaObject(jsonMsg, SpiderData.class);  
                    
                    System.out.println(spiderData.toString());
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
                // 参考常见报错:
                // https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
                e.printStackTrace();
            }
        }
    }
}

消费时把JSON数据反序列化:

for (ConsumerRecord<String, String> record : records) {
    JSONObject jsonMsg = JSON.parseObject(record.value());  
    SpiderData spiderData = JSONObject.toJavaObject(jsonMsg, SpiderData.class);  
}

2.1、消费消息基本流程

Kafka 订阅者在订阅消息时的基本流程是:

  1. Poll 数据
  2. 执行消费逻辑
  3. 再次 poll 数据

2.2、负载消费

每个 Consumer Group 可以包含多个消费实例,也即可以启动多个 Kafka Consumer,并把参数 group.id 设置成相同的值。属于同一个 Consumer Group 的消费实例会负载消费订阅的 topic。

示例1:Consumer Group A 订阅了 Topic A,并开启三个消费实例 C1、C2、C3,则发送到 Topic A 的每条消息最终只会传给 C1、C2、C3 的某一个。Kafka 默认会均匀地把消息传给各个消息实例,以做到消费负载均衡。

Kafka 负载消费的内部原理是,把订阅的 Topic 的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量,否则会有实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡。

消息队列 Kafka 分区的数量至少是 16 个,已经足够满足大部分用户的需求,且云上服务会根据容量调整分区数。

2.3、多个订阅

一个 Consumer Group 可以订阅多个 Topic。一个 Topic 也可以被多个 Consumer Group 订阅,且各个 Consumer Group 独立消费 Topic 下的所有消息。

示例1:Consumer Group A 订阅了 Topic A,Consumer Group B 也订阅了 Topic A,则发送到 Topic A 的每条消息,不仅会传一份给 Consumer Group A 的消费实例,也会传一份给 Consumer Group B 的消费实例,且这两个过程相互独立,相互没有任何影响。

2.4、消费位点

每个 Topic 会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点 MaxOffset。Kafka Consumer 会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为ConsumerOffset。

剩余的未消费的条数(也称为消息堆积量) = MaxOffset - ConsumerOffset

2.5、位点提交

Kafka 消费者有两个相关参数:

  • enable.auto.commit:默认值为 true。
  • auto.commit.interval.ms: 默认值为 1000,也即 1s。

这两个参数组合的结果就是,每次 poll 时,再拉取数据前会预先做下面这件事:

  • 检查上次提交位点的时间,如果距离当前时间已经超过 auto.commit.interval.ms,则启动位点提交动作;

因此,如果 enable.auto.commit 设置为 true,需要在每次 poll 时,确保前一次 poll 出来的数据已经消费完毕,否则可能导致位点跳跃;

如果想自己控制位点提交,则把 enable.auto.commit 设为 false,并调用 commit(offsets)函数自行控制位点提交。

2.6、消息重复以及消费幂等

Kafka 消费的语义是 “At Lease Once”, 也就是至少投递一次,保证消息不丢,但是不会保证消息不重复。在出现网络问题、客户端重启时均有可能出现少量重复消息,此时应用消费端,如果对消息重复比较敏感(比如说订单交易类),则应该做到消息幂等。

以数据库类应用为例,常用做法是:

  • 发送消息时,传入 key 作为唯一流水号ID;
  • 消费消息时,判断 key 是否已经消费过,如果已经消费过了,则忽略,如果没消费过,则消费一次;

当然,如果应用本身对少量消息重复不敏感,则不需要做此类幂等检查。

2.7、消费失败

Kafka 是按分区一条一条消息顺序向前消费推进的,如果消费端拿到某条消息后消费逻辑失败,比如应用服务器出现了脏数据,导致某条消息处理失败,等待人工干预,该怎么办呢?

  • 如果失败后一直尝试再次执行消费逻辑,则有可能造成消费线程阻塞在当前消息,无法向前推进,造成消息堆积;
  • 由于 Kafka 自身没有处理失败消息的设计,实践中通常会打印失败的消息、或者存储到某个服务(比如创建一个 Topic 专门用来放失败的消息),然后定时 check 失败消息的情况,分析失败原因,根据情况处理。

2.8、消费阻塞以及堆积

消费端最常见的问题就是消费堆积,最常造成堆积的原因是:

  • 消费速度跟不上生产速度,此时应该提高消费速度,详情见本文下一节<提高消费速度>;
  • 消费端产生了阻塞;

消费端拿到消息后,执行消费逻辑,通常会执行一些远程调用,如果这个时候同步等待结果,则有可能造成一直等待,消费进程无法向前推进。

消费端应该竭力避免堵塞消费线程,如果存在等待调用结果的情况,设置等待的超时时间,超过时间后,作消费失败处理。

2.9、提高消费速度

提高消费速度有两个办法:

  • 增加 Consumer 实例个数;
  • 增加消费线程;

增加 Consumer 实例,可以在进程内直接增加(需要保证每个实例一个线程,否则没有太大意义),也可以部署多个消费实例进程;需要注意的是,实例个数超过分区数量后就不再能提高速度,将会有消费实例不工作;

增加 Consumer 实例本质上也是增加线程的方式来提升速度,因此更加重要的性能提升方式是增加消费线程,最基本的方式如下:

  1. 定义一个线程池;
  2. poll 数据;
  3. 把数据提交到线程池进行并发处理;
  4. 等并发结果返回成功再次poll数据执行。

2.10消息过滤

Kafka 自身没有消息过滤的语义。实践中可以采取以下两个办法:

  • 如果过滤的种类不多,可以采取多个 Topic 的方式达到过滤的目的;
  • 如果过滤的种类多,则最好在客户端业务层面自行过滤。

实践中根据业务具体情况进行选择,可以综合运用上面两种办法。

2.11、消息广播

Kafka 自身没有消息广播的语义,可以通过创建不同的 Consumer Group来模拟实现。

2.12、订阅关系

同一个 Consumer Group 内,各个消费实例订阅的 Topic 最好保持一致,避免给排查问题带来干扰。


参考资料:阿里云消息队列Kafka的帮助文档

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Web项目聚集地

三周学会小程序第五讲:登录的原理和实现

前面我们耗费在环境搭建上面已经很多时间,这一讲开始真正的和小程序功能对接。 登录便是小程序的开始,小程序可以方便的使用微信登录,获取用户的个人信息,这样我们就能...

1302
来自专栏黄Java的地盘

提高代码质量——使用Jest和Sinon给已有的代码添加单元测试

在日常的功能开发中,我们的代码测试都依赖于自己或者QA进行测试。这些操作不仅费时费力,而且还依赖开发者自身的驱动。在开发一些第三方依赖的库时,我们也没有办法给第...

2030
来自专栏转载gongluck的CSDN博客

Brpc学习:简单回显服务器/客户端

sudo apt-get install git g++ make libssl-dev sudo apt-get install realpath libgf...

2.3K6
来自专栏北京马哥教育

深入浅出:Linux设备驱动之中断与定时器

“我叮咛你的 你说 不会遗忘 你告诉我的 我也全部珍藏 对于我们来说 记忆是飘不落的日子 永远不会发黄 相聚的时候 总是很短 期待的时候 总是很长 岁月的溪水边...

4279
来自专栏陈树义

Linux学习总结(十一)—— Linux常用命令:版本信息查看(RedHat、CentOS、Debian、Ubuntu、Fedora、Oracle)

这篇文章收集了CentOS、Oracle、RedHat等系统查看发行版本、内核版本、位数的方法,欢迎补充。 系统 发行版本 -- 内核版本、位数 ...

3705
来自专栏Java帮帮-微信公众号-技术文章全总结

day07.HDFS学习【大数据教程】

分而治之:将大文件、大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析;

2224
来自专栏匠心独运的博客

消息中间件—RabbitMQ(集群监控篇1)

摘要:任何没有监控的系统上线,一旦在生产环境发生故障,那么排查和修复问题的及时性将无法得到保证

2913
来自专栏Spark学习技巧

Kafka源码系列之通过源码分析Producer性能瓶颈

Kafka源码系列之通过源码分析Producer性能瓶颈 本文,kafka源码是以0.8.2.2,原因是浪尖一直没对kafka系统进行升级。主要是java的ka...

3056
来自专栏MessageQueue

消息中间件核心实体(0)

从之前讨论的架构来说,消息中间件也是有主从复制这个模块的,像Rocket就支持主从模式。

1054
来自专栏IT技术精选文摘

JVM致命错误日志(hs_err_pid.log)分析

当jvm出现致命错误时,会生成一个错误文件 hs_err_pid<pid>.log,其中包括了导致jvm crash的重要信息,可以通过分析该文件定位到导致cr...

6635

扫码关注云+社区

领取腾讯云代金券