前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka 结合springboot实战--第三节

kafka 结合springboot实战--第三节

作者头像
六个核弹
发布2022-12-23 20:44:38
3570
发布2022-12-23 20:44:38
举报

消息转发

kafka 消费者可以将消费到的消息转发到指定的主题中去,比如一条消息需要经过多次流转加工才能走完整个业务流程,需要多个consumer来配合完成。转发代码示例如下:

代码语言:javascript
复制

    @KafkaListener(topics = "send-a")
    @SendTo("send-b")
    public String sendTest0(ConsumerRecord<?, String> record){
        System.out.println(record.value());
        return "转发消息"+record.value();
    }

    @KafkaListener(topics = "send-b")
    public void sendTest1(ConsumerRecord<?, String> record){
        System.out.println(record.value());
    }

    @Scheduled(cron = "*/15 * * * * ?")
    @Transactional
    public void producerTest(){
        kafkaTemplate.send("send-a","xxxxxxxxxxxxxx");
    }

生产者获取消费者响应

结合 @sendTo注解ReplyingKafkaTemplate 类 生产者可以获取消费者消费消息的结果; 因为 ReplyingKafkaTemplate 是kafkaTemplate 的一个子类,当你往spring 容器注册 这个bean, kafkaTemplate 的自动装配就会关闭,但是kafkaTemplate 是必须的,因此你需要把这两个bean 都手动注册上。配置示例:

代码语言:javascript
复制

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }



    @Bean
    public AdminClient init( KafkaProperties kafkaProperties){
        return KafkaAdminClient.create(kafkaProperties.buildAdminProperties());
    }

    /**
     * 同步的kafka需要ReplyingKafkaTemplate,指定repliesContainer
     *
     * @param producerFactory
     * @param repliesContainer
     * @return
     */
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> producerFactory,
        ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
        //同步相应超时时间:10s
        template.setReplyTimeout(10000);
        return template;
    }

    @Bean
    public ProducerFactory<String,String> producerFactory(KafkaProperties properties) {
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(properties.buildProducerProperties());
        producerFactory.setTransactionIdPrefix(properties.getProducer().getTransactionIdPrefix());
        return  producerFactory;
//        return new DefaultKafkaProducerFactory<>(properties.producerConfigs(properties));
    }

    public Map<String, Object> producerConfigs(KafkaProperties properties) {
        Map<String, Object> props = new HashMap<>();
        //用于建立与kafka集群的连接,这个list仅仅影响用于初始化的hosts,来发现全部的servers。 格式:host1:port1,host2:port2,…,
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,String.join(",",properties.getBootstrapServers()));
        // 重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Producer可以将发往同一个Partition的数据做成一个Produce Request发送请求以减少请求次数,该值即为每次批处理的大小,若将该值设为0,则不会进行批处理
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Producer可以用来缓存数据的内存大小。该值实际为RecordAccumulator类中的BufferPool,即Producer所管理的最大内存。
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //发送一次message最大大小,默认是1M
        //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 20971520);
        // 序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * 指定consumer返回数据到指定的topic
     * @return
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String>
    repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer =
            containerFactory.createContainer("topic-return");
        repliesContainer.setAutoStartup(true);
        return repliesContainer;
    }

    @Bean
//    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, String> kafkaProducerFactory,
                                             ObjectProvider<RecordMessageConverter> messageConverter,KafkaProperties properties) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener( new LoggingProducerListener<>());
        kafkaTemplate.setDefaultTopic(properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


}

生产者接收消费者返回值(这俩最好不要开到一个应用中,否则会很容易生产者超时,观察不到返回的结果):

代码语言:javascript
复制

    @Scheduled(cron = "*/1 * * * * ?")
    @Transactional
    public void returnTestProducer(){
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-return", "test-return");
        RequestReplyFuture<String, String, String> replyFuture = replyingTemplate.sendAndReceive(record);
        try {
            String value = replyFuture.get().value();
            System.out.println(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    @KafkaListener(topics = "topic-return")
    @SendTo
    public String listen(String message) {
        return "consumer return:".concat(message);
    }

结语

kafka 的相关知识更新完了,这是最后一节。内容比较粗糙,没有涉及到一些业务场景的设计使用,但是作为入门教程还是很不错的,感谢阅读。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 六个核弹 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息转发
  • 生产者获取消费者响应
  • 结语
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档