前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot基础(五、整合Kafka及原生api使用)

SpringBoot基础(五、整合Kafka及原生api使用)

作者头像
营琪
发布2019-11-12 21:02:50
7120
发布2019-11-12 21:02:50
举报
文章被收录于专栏:营琪的小记录营琪的小记录

Kafka原生API使用

引入maven坐标

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.3.0</version>
</dependency>

这里要注意引入的jar包与服务器安装的版本要对应,或者去官网查一下。官网连接

创建生产者

代码语言:javascript
复制
private static KafkaProducer<String, String> producer;

    static {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "VM_0_16_centos:9092");    //注意 用户名或者IP都可
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
/*        //自定义分区分配器  ,是一种发送到哪个分区的自定义做法,后面会介绍,这里注释
        properties.put("partitioner.class",
                "kafkastudy.CustomPartitioner");*/

        producer = new KafkaProducer<>(properties);
    }

利用生产者发送消息 :无脑发模式

代码语言:javascript
复制
    private static void sendMessageForgetResult() {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "csdn_test", "name", "ForgetResult"); //"name"也可以对value的补充说明
        producer.send(record);
        producer.close(); }

只管发送, 不管结果: 只调用接口发送消息到 Kafka 服务器, 但不管成功写入与否。 由于 Kafka 是高可用的, 因此大部分情 况下消息都会写入, 但在异常情况下会丢消息。

利用生产者发送消息 :同步发送

代码语言:javascript
复制
private static void sendMessageSync() throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "csdn_test", "name", "sync");
        RecordMetadata result = producer.send(record).get();
        System.out.println(result.topic());   //打印返回消息,队列名
        System.out.println(result.partition());//发送到第几个分区位置
        System.out.println(result.offset());   //这是第几个发送到topic中的消息

        producer.close();}

同 步发送: 调用 send() 方法返回一个 Future 对象, 我们可以使用它的 get() 方法来判断消息发送成功与否。

利用生产者发送消息 :异步发送

代码语言:javascript
复制
    private static void sendMessageCallbacktwo() {

        ProducerRecord<String, String> record = new ProducerRecord<>(
                "csdn_test", "name", "callback"
        );
        producer.send(record, (recordMetadata,e)->{
            if (e != null) {e.printStackTrace();return; }
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
        }); //详细自己点进去看看
        producer.close(); }

异 步发送: 调用 send() 时提供一个回调方法, 当接收到 broker 结果后回调此方法。

利用生产者发送消息 :异步发送,并使用自定义分区分配器

1.Kafka创建topic时,要设置多个分区

2.实现partitioner接口的partition方法

代码语言:javascript
复制
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic,Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfos.size();      
        //判断判断kafka是否有多个分区,是否有key,我们要根据key进行分区分配
        if (null == keyBytes || !(key instanceof String)) throw new InvalidRecordException("kafka message must have key");
        if (numPartitions == 1) return 0;     
        if (key.equals("name")) return numPartitions - 1;    //key为"name"的分配最后一个
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);} //其他随机分配

3.创建KafkaProducer对象时,添加自定义分区分配器

代码语言:javascript
复制
      //自定义分区分配器
        properties.put("partitioner.class",
                "kafkastudy.CustomPartitioner");

4.异步发送

代码语言:javascript
复制
    private static void sendMessageCallback() {
        ProducerRecord<String, String> record = new ProducerRecord<>(
             "csdn_test", "name", "callback");
        producer.send(record, new MyProducerCallback());
        record = new ProducerRecord<>("csdn_test", "name-x", "callback");
        producer.send(record, new MyProducerCallback());
        record = new ProducerRecord<>("csdn_test", "name-y", "callback");
        producer.send(record, new MyProducerCallback());
        producer.close(); }
    //配置异步消息
    private static class MyProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) { e.printStackTrace();return;    }
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());    }   }

创建消费者 配置信息

代码语言:javascript
复制
    private static KafkaConsumer consumer;
    private static Properties properties;
    static {
        properties = new Properties();
        properties.put("bootstrap.servers", "VM_0_16_centos:9092");
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "KafkaStudy");  //组id,标识这是一个消费者,Kafka就会向它投递  }

因为消费消息 要提交位移信息,这个位移信息是需要写入配置文件的,所以不能静态方法生成了。(主要为了方便(懒))。

利用消费者消费信息:自动提交位移

代码语言:javascript
复制
private static void generalConsmerMessageAutoCommit() {
    properties.put("enable.auto.commit", true);
    consumer = new KafkaConsumer(properties);
    consumer.subscribe(Collections.singleton("csdn_test"));
    while (true) {
        AtomicBoolean flag = new AtomicBoolean(true);
        ConsumerRecords<String, String> records = consumer.poll(5);
        records.forEach(x -> {
            System.out.println(String.format("topic = %s,partition = %s, key = %s, value = %s ",
                    x.topic(), x.partition(), x.key(), x.value()));
//                if (x.value().equals("done")) {
            if ("done".equals(x.value())) {
                flag.set(false);
            }
        });
        if (!flag.get()) {
            break;
        }
    }
}

利用消费者消费信息:手动提交位移

代码语言:javascript
复制
private static void generalConsumeMessageSyncCommit() {

    properties.put("auto.commit.offset", false);
    consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList("csdn_test"));

    while (true) {
        boolean flag = true;

        ConsumerRecords<String, String> records = consumer.poll(5);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format(
                    "topic = %s, partition = %s, key = %s, value = %s",
                    record.topic(), record.partition(),
                    record.key(), record.value()
            ));
            if ("done".equals(record.value())) {
                flag = false;
            }
        }

        try {
            consumer.commitSync();
        } catch (CommitFailedException ex) {
            System.out.println("commit failed error: "
                    + ex.getMessage());
        }

        if (!flag) {
            break;
        }
    }
}

利用消费者消费信息:手动异步提交当前位移

代码语言:javascript
复制
private static void generalConsumeMessageAsyncCommit() {

        properties.put("auto.commit.offset", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("csdn_test"));

        while (true) {
            boolean flag = true;

            ConsumerRecords<String, String> records = consumer.poll(5);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "topic = %s, partition = %s, key = %s, value = %s",
                        record.topic(), record.partition(),
                        record.key(), record.value()
                ));
                if ("done".equals(record.value())) {
                    flag = false;
                }
            }

            consumer.commitAsync();

            if (!flag) {
                break;
            }
        }
    }

利用消费者消费信息:手动异步提交位移带回测

代码语言:javascript
复制
private static void generalConsumeMessageAsyncCommitWithCallback() {

    properties.put("auto.commit.offset", false);
    consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList("csdn_test"));

    while (true) {
        boolean flag = true;

        ConsumerRecords<String, String> records = consumer.poll(5);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format(
                    "topic = %s, partition = %s, key = %s, value = %s",
                    record.topic(), record.partition(),
                    record.key(), record.value()
            ));
            if ("done".equals(record.value())) {
                flag = false;
            }

            consumer.commitAsync((map, e) -> {
                if (e != null) {
                    System.out.println("commit failed for offsets: " +
                            e.getMessage());
                }
            });

            if (!flag) {
                break;
            }
        }
    }
}

利用消费者消费信息:混合同步与异步提交位移

代码语言:javascript
复制
private static void mixSyncAndAsyncCommit() {

        properties.put("auto.commit.offset", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("csdn_test"));
        try {
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format(
                            "topic = %s, partition = %s, key = %s, " + "value = %s",
                            record.topic(), record.partition(),
                            record.key(), record.value()
                    ));
                }
                consumer.commitAsync();
            }
        } catch (Exception ex) {
            System.out.println("commit async error: " + ex.getMessage());
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }

spring boot 整合 Kafka

引入maven

代码语言:javascript
复制
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>

jar包引入要注意版本!!!

配置spring kafka

代码语言:javascript
复制
spring:
  kafka:
    bootstrap-servers: VM_0_16_centos:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

启动类

代码语言:javascript
复制
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args); }}

利用生成者发送消息

代码语言:javascript
复制
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {Application.class},webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class SpringKafkaTest {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Test
    public void ProducerTest() {
        kafkaTemplate.send("csdn_test", "name", "Springkafka");
    }
}

spring boot 整合Kafka只介绍一个例子 ,不像原生api 要写很多代码,在spring boot的配置文件中添加一些配置就搞定了,这个更多去查看文档。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-11-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka原生API使用
    • 创建生产者
      • 利用生产者发送消息 :无脑发模式
        • 利用生产者发送消息 :同步发送
          • 利用生产者发送消息 :异步发送
            • 利用生产者发送消息 :异步发送,并使用自定义分区分配器
              • 创建消费者 配置信息
                • 利用消费者消费信息:自动提交位移
                  • 利用消费者消费信息:手动提交位移
                    • 利用消费者消费信息:手动异步提交当前位移
                      • 利用消费者消费信息:手动异步提交位移带回测
                        • 利用消费者消费信息:混合同步与异步提交位移
                        • spring boot 整合 Kafka
                          • 配置spring kafka
                            • 启动类
                              • 利用生成者发送消息
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档