前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 客户端开发

Kafka 客户端开发

作者头像
IT技术小咖
发布2019-06-26 15:51:24
1.2K0
发布2019-06-26 15:51:24
举报
文章被收录于专栏:码上修行码上修行

前两篇文章讲述了 Kafka 的 工作机制 和 服务器集群部署。至此,Kafka 服务器已就绪,本文分别以官方API、Spring、SpringBoot三种构建方式,讲述了 Kafka 消费生产者和消费者的开发。

1 开发概述

Kafka 中,客户端与服务端是通过 TCP 协议进行的; Kafka 公布了所有功能协议(与特定语言无关),并把 Java 客户端作为 kafka 项目的一部分进行维护。 其他非 Java 语言的客户端则作为独立的开源项目提供,非 Java 客户端的名单可在 这里。

Kafka 提供了五类 API:

  • Producer API: 向主题(一个或多个)发布消息;
  • Consumer API: 订阅主题(一个或多个),拉取这些主题上发布的消息;
  • Stream API: 作为流处理器,从主题消费消息,向主题发布消息,把输出流转换为输入流;可参考 例子;
  • Connect API: 作为下游或上游,把主题连接到应用程序或数据系统(比如关系数据库),通常不需要直接使用这些API,而是使用 现成的连接器;
  • AdminClient API: 管理(或巡查) topic, brokers, 或其他 kafka 对象;

2 基于官方 API 开发

2.1 Maven 依赖

代码语言:javascript
复制
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

2.2 logback.xml(日志配置,可选)

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="false">
    <contextName>logback</contextName>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{HH:mm:ss.SSS}  %p  [%F:%L] - %m%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="CONSOLE" />
    </root>
</configuration>

2.3 演示类 KafkaClientDemo.java

生产者:相应函数为 KafkaClientDemo.producerDemo(),其中 props 完整参数配置项见 Producer Configs 消费者:相应函数为 KafkaClientDemo.consumerDemo(),其中 props 完整参数配置项见 New Consumer Configs 和 Old Consumer Configs

代码语言:javascript
复制
public class KafkaClientDemo {
    private static final Logger logger = LoggerFactory.getLogger(KafkaClientDemo.class);
    private static final String BROKER_SERVERS = "centos:9091,centos:9092,centos:9093";
    private static final String TOPIC_NAME = "topicName";
    public static void producerDemo() {
        // 配置选项
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_SERVERS); // [必填] Kafka Broker 地址列表
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); // [必填] KEY 的序列化类
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // [必填] VALUE 的序列化类
        // props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); // [默认值] 结合主题的分区个数和KEY,使得消息平均地分配给分区
        Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props); // 建立连接
        for (int id = 1; id <= 8; ++id) {
            final int key = id;
            final String value = String.format("msg#%d", key);
            ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(TOPIC_NAME, key, value);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata meta, Exception e) {
                    logger.info("KafkaProducer.push(\"{}\", {}, {}, {}, \"{}\") OK.",
                            meta.topic(), meta.partition(), meta.offset(), key, value);
                }
            }); // 推送消息
        }
        producer.flush(); // 提交
        producer.close(); // 关闭连接
    }
    public static void consumerDemo() {
        // 配置选项
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_SERVERS); // [必填] Kafka Broker 地址列表
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); // [必填] KEY 的反序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // [必填] VALUE 的反序列化类
        props.put("group.id", "groupName"); // [必填] 本消费者所属分组
        // 开始消费
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props); // 建立连接
        consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 订阅主题
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(1000); // 拉取消息
            if (records.isEmpty()) {
                break;
            }
            logger.info("KafkaConsumer.poll({}) OK.", records.count());
            for (ConsumerRecord<Integer, String> record : records) {
                logger.info("KafkaConsumer.poll(\"{}\", {}, {}, {}, \"{}\") OK.",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
            }
        }
        consumer.close(); // 关闭连接
    }
    public static void main(String[] args) {
        KafkaClientDemo.producerDemo();
        KafkaClientDemo.consumerDemo();
    }
}

2.4 运行结果

代码语言:javascript
复制
## 生产者
11:36:56.345  INFO  [KafkaClientDemo.java:36] - KafkaProducer.push("topicName", 0, 3, 1, "msg#1") OK.
11:36:56.346  INFO  [KafkaClientDemo.java:36] - KafkaProducer.push("topicName", 0, 4, 7, "msg#7") OK.
11:36:56.348  INFO  [KafkaClientDemo.java:36] - KafkaProducer.push("topicName", 0, 5, 8, "msg#8") OK.
11:36:56.349  INFO  [KafkaClientDemo.java:36] - KafkaProducer.push("topicName", 1, 2, 3, "msg#3") OK.
11:36:56.349  INFO  [KafkaClientDemo.java:36] - KafkaProducer.push("topicName", 1, 3, 4, "msg#4") OK.
11:36:56.370  INFO  [KafkaClientDemo.java:36] - KafkaProducer.push("topicName", 2, 3, 2, "msg#2") OK.
11:36:56.371  INFO  [KafkaClientDemo.java:36] - KafkaProducer.push("topicName", 2, 4, 5, "msg#5") OK.
11:36:56.371  INFO  [KafkaClientDemo.java:36] - KafkaProducer.push("topicName", 2, 5, 6, "msg#6") OK.
## 消费者
11:36:56.764  INFO  [KafkaClientDemo.java:62] - KafkaConsumer.poll(3) OK.
11:36:56.766  INFO  [KafkaClientDemo.java:64] - KafkaConsumer.poll("topicName", 0, 3, 1, "msg#1") OK.
11:36:56.766  INFO  [KafkaClientDemo.java:64] - KafkaConsumer.poll("topicName", 0, 4, 7, "msg#7") OK.
11:36:56.766  INFO  [KafkaClientDemo.java:64] - KafkaConsumer.poll("topicName", 0, 5, 8, "msg#8") OK.
11:36:56.771  INFO  [KafkaClientDemo.java:62] - KafkaConsumer.poll(5) OK.
11:36:56.771  INFO  [KafkaClientDemo.java:64] - KafkaConsumer.poll("topicName", 1, 2, 3, "msg#3") OK.
11:36:56.771  INFO  [KafkaClientDemo.java:64] - KafkaConsumer.poll("topicName", 1, 3, 4, "msg#4") OK.
11:36:56.773  INFO  [KafkaClientDemo.java:64] - KafkaConsumer.poll("topicName", 2, 3, 2, "msg#2") OK.
11:36:56.773  INFO  [KafkaClientDemo.java:64] - KafkaConsumer.poll("topicName", 2, 4, 5, "msg#5") OK.
11:36:56.773  INFO  [KafkaClientDemo.java:64] - KafkaConsumer.poll("topicName", 2, 5, 6, "msg#6") OK.

3 基于 Spring 开发

官网: http://projects.spring.io/spring-kafka/ 介绍: https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/ API: http://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/api/

3.1 Maven 依赖

环境要求: Apache Kafka 1.0.0, Java 8+

代码语言:javascript
复制
<dependency><!-- logback 日志 -->
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency><!-- spring-kafka -->
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>

3.2 spring.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- Bean 类的扫描路径(package name) -->
    <context:component-scan base-package="wang.kefeng"/>
    <!-- 生产者工厂(KafkaProducerFactory): 用于创建 KafkaProducer 对象(KafkaTemplate) -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="centos:9091,centos:9092,centos:9093"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- 生产者模板(KafkaTemplate): 用于发布消息 -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="topicName"/>
        <property name="producerListener"><!-- 消息发送成功的回调 -->
            <bean class="wang.kefeng.KafkaProducerListener"/>
        </property>
    </bean>
    <!-- 消费者工厂(KafkaConsumerFactory): 用于创建 KafkaConsumer 对象 -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="centos:9091,centos:9092,centos:9093"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="group.id" value="groupName"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- 消费者容器 -->
    <bean class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg><!-- 配置信息 -->
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg value="topicName"/>
                <property name="messageListener"><!-- 消息接收成功的回调 -->
                    <bean class="wang.kefeng.KafkaConsumerListener"/>
                </property>
            </bean>
        </constructor-arg>
    </bean>
</beans>

3.3 KafkaProducerListener.java

代码语言:javascript
复制
public class KafkaProducerListener extends ProducerListenerAdapter<Integer, String> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerListener.class);
    @Override
    public void onSuccess(String topic, Integer partition, Integer key, String value, RecordMetadata recordMetadata) {
        logger.info("KafkaProducerListener.onSuccess(\"{}\", {}, \"{}\") OK.", topic, key, value);
    }
    @Override
    public void onError(String topic, Integer partition, Integer key, String value, Exception exception) {
        logger.warn("KafkaProducerListener.onError(\"{}\", {}, \"{}\") OK.", topic, key, value);
    }
    @Override
    public boolean isInterestedInSuccess() {
        return true;
    }
}

3.4 KafkaProducerListener.java

代码语言:javascript
复制
public class KafkaConsumerListener implements MessageListener<Integer, String> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
    public void onMessage(ConsumerRecord<Integer, String> record) {
        logger.info("KafkaConsumerListener.onMessage(\"{}\", {}, \"{}\") OK.",
                record.topic(), record.key(), record.value());
    }
}

3.5 KafkaClientDemo.java

代码语言:javascript
复制
@Component
public class KafkaClientDemo {
    @Resource
    private KafkaTemplate<Integer, String> kafkaTemplate;
    public void producerDemo() {
        for (int key = 1; key <= 8; ++key) {
            String value = String.format("msg#%d", key);
            kafkaTemplate.sendDefault(key, value);
        }
    }
    public static void main(String[] args) {
        try (ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring.xml")) {
            context.start(); // 启动消费者线程,收到消息后自动回调 KafkaConsumerListener.onMessage()
            context.getBean(KafkaClientDemo.class).producerDemo(); // 生产消费
            try {
                Thread.sleep(600000); // 等待消费者线程(10min)
            } catch (InterruptedException e) {
            }
        }
    }
}

3.6 运行结果

代码语言:javascript
复制
## 生产者
13:05:13.734  INFO  [KafkaProducerListener.java:16] - KafkaProducerListener.onSuccess("topicName", 1, "msg#1") OK.
13:05:13.740  INFO  [KafkaProducerListener.java:16] - KafkaProducerListener.onSuccess("topicName", 2, "msg#2") OK.
13:05:13.759  INFO  [KafkaProducerListener.java:16] - KafkaProducerListener.onSuccess("topicName", 3, "msg#3") OK.
13:05:13.765  INFO  [KafkaProducerListener.java:16] - KafkaProducerListener.onSuccess("topicName", 4, "msg#4") OK.
13:05:13.770  INFO  [KafkaProducerListener.java:16] - KafkaProducerListener.onSuccess("topicName", 5, "msg#5") OK.
13:05:13.778  INFO  [KafkaProducerListener.java:16] - KafkaProducerListener.onSuccess("topicName", 6, "msg#6") OK.
13:05:13.783  INFO  [KafkaProducerListener.java:16] - KafkaProducerListener.onSuccess("topicName", 7, "msg#7") OK.
13:05:13.788  INFO  [KafkaProducerListener.java:16] - KafkaProducerListener.onSuccess("topicName", 8, "msg#8") OK.
## 消费者
13:05:16.920  INFO  [KafkaConsumerListener.java:15] - KafkaConsumerListener.onMessage("topicName", 3, "msg#3") OK.
13:05:16.920  INFO  [KafkaConsumerListener.java:15] - KafkaConsumerListener.onMessage("topicName", 4, "msg#4") OK.
13:05:16.922  INFO  [KafkaConsumerListener.java:15] - KafkaConsumerListener.onMessage("topicName", 1, "msg#1") OK.
13:05:16.922  INFO  [KafkaConsumerListener.java:15] - KafkaConsumerListener.onMessage("topicName", 7, "msg#7") OK.
13:05:16.922  INFO  [KafkaConsumerListener.java:15] - KafkaConsumerListener.onMessage("topicName", 8, "msg#8") OK.
13:05:16.922  INFO  [KafkaConsumerListener.java:15] - KafkaConsumerListener.onMessage("topicName", 2, "msg#2") OK.
13:05:16.922  INFO  [KafkaConsumerListener.java:15] - KafkaConsumerListener.onMessage("topicName", 5, "msg#5") OK.
13:05:16.922  INFO  [KafkaConsumerListener.java:15] - KafkaConsumerListener.onMessage("topicName", 6, "msg#6") OK.

4 基于 SpringBoot 开发

创建 SpringBoot 工程。

4.1 Maven 依赖

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

4.2 application.properties

代码语言:javascript
复制
# Brokers 地址列表
spring.kafka.bootstrap-servers=centos:9091,centos:9092,centos:9093
# 默认消费者组
spring.kafka.consumer.group-id=groupName
# 序列化/反序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

4.3 KafkaClientDemo.java

代码语言:javascript
复制
@Component
@EnableScheduling
public class KafkaClientDemo {
    private static final String TOPIC_NAME = "topicName";
    private static final Logger logger = LoggerFactory.getLogger(KafkaClientDemo.class);
    private int key = 0;
    @Autowired
    private KafkaTemplate kafkaTemplate;
    // 生产者: 每秒推送一个消息
    @Scheduled(fixedRate = 1000)
    public void producerDemo() {
        String message = String.format("msg#%d", ++key);
        logger.info("KafkaClientDemo.producerDemo(\"{}\") ...", message);
        kafkaTemplate.send(TOPIC_NAME, message);
    }
    // 消费者1: 订阅主题(topicName)并接收消息
    @KafkaListener(topics = {TOPIC_NAME})
    public void consumerDemo1(String message) {
        logger.info("KafkaClientDemo.consumerDemo1(\"{}\") OK.", message);
    }
    // 消费者2: 订阅主题(topicName)并接收消息
    @KafkaListener(topics = {TOPIC_NAME})
    public void consumerDemo2(String message) {
        logger.info("KafkaClientDemo.consumerDemo2(\"{}\") OK.", message);
    }
}

4.4 运行结果

运行 SpringBoot 的 Application 类(无需任何调整),结果如下:

代码语言:javascript
复制
## 可见:一个生产者定时投递消息;两个消费者(属于同一消费者组 groupName)交替收取消息。
14:36:52.586  INFO  [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#1") ...
14:36:52.889  INFO  [KafkaClientDemo.java:37] - KafkaClientDemo.consumerDemo1("msg#1") OK.
14:36:53.583  INFO  [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#2") ...
14:36:53.603  INFO  [KafkaClientDemo.java:43] - KafkaClientDemo.consumerDemo2("msg#2") OK.
14:36:54.583  INFO  [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#3") ...
14:36:54.613  INFO  [KafkaClientDemo.java:37] - KafkaClientDemo.consumerDemo1("msg#3") OK.
14:36:55.583  INFO  [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#4") ...
14:36:55.600  INFO  [KafkaClientDemo.java:37] - KafkaClientDemo.consumerDemo1("msg#4") OK.
14:36:56.583  INFO  [KafkaClientDemo.java:30] - KafkaClientDemo.producerDemo("msg#5") ...
14:36:56.598  INFO  [KafkaClientDemo.java:43] - KafkaClientDemo.consumerDemo2("msg#5") OK.

作者:王克锋

出处:https://kefeng.wang/2017/11/18/kafka-development/

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码上修行 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 开发概述
  • 2 基于官方 API 开发
    • 2.1 Maven 依赖
      • 2.2 logback.xml(日志配置,可选)
        • 2.3 演示类 KafkaClientDemo.java
          • 2.4 运行结果
          • 3 基于 Spring 开发
            • 3.1 Maven 依赖
              • 3.2 spring.xml
                • 3.3 KafkaProducerListener.java
                  • 3.4 KafkaProducerListener.java
                    • 3.5 KafkaClientDemo.java
                      • 3.6 运行结果
                      • 4 基于 SpringBoot 开发
                        • 4.1 Maven 依赖
                          • 4.2 application.properties
                            • 4.3 KafkaClientDemo.java
                              • 4.4 运行结果
                              相关产品与服务
                              容器服务
                              腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档