前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于腾讯云tdmq消息队列封装SpringBootStarter(一)

基于腾讯云tdmq消息队列封装SpringBootStarter(一)

原创
作者头像
JulyWhj
修改2022-01-14 15:31:35
2.7K0
修改2022-01-14 15:31:35
举报

基于腾讯云tdmq消息队列封装SpringBootStarter(一)

一、环境准备

1.1 注册腾讯云TDMQ

创建tdmq集群
创建tdmq集群

创建tdmq集群

创建完成后记录下集群ID(clusterId);

1.2 创建命名空间

创建好集群后,在命名空间中新建命名空间,命名空间名称可以根据实际业务场景进行区分,比如这里创建可以根据测试环境、预发布环境、生产环境等进行区分创建。

新建命名空间
新建命名空间

新建命名空间

1.3、创建好命名空间后,新建个`topic`主题。

创建topic
创建topic

创建topic

以上信息创建好后,我们在集群中可以看到集群的访问地址,如下:

查看接入地址
查看接入地址

查看接入地址

在创建tdmq集群时我们需要申请外网访问,这个需要找腾讯的客服开通。

至此,我们开发的基础环境已经准备完成。

二、编写生产者、消费者代码

2.1、创建工程

在idea中新建个工程,工程名称为spring-boot-starter-tdmq

创建工程
创建工程

创建工程

工程名称和包路径可以根据实际情况进行自定义。

2.2、引入相关依赖包

代码语言:javascript
复制
<!-- spring-boot-autoconfigure -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<!--tdmq 核心依赖 2.9.1 -->
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>${pulsar.version}</version>
</dependency>

这里使用的是腾讯云tdmq-pulsar版,这里需要引入pulsar-client.

2.3、创建生产者

首先我们在项目中创建一个config的包路径,新建一个tdmq的配置类TdmqProperties

代码语言:javascript
复制
@Data
@ConfigurationProperties(prefix = "tdmq")
public class TdmqProperties {
    /**
     * 启用
     */
    private boolean enable = false;
    /**
     * 服务地址
     */
    private String serviceUrl;
    /**
     * token
     */
    private String token;
    /**
     * 集群ID
     */
    private String clusterId;
    /**
     * 命名空间ID
     */
    private String environmentId;
}

这里主要配置下tdmq集群相关的信息。我们先将配置信息写在配置类中,后续优化我们在使用外部配置进行初始化操作。

创建PulsarClient对象,生产者消费者都是基于PulsarClient对象进行创建。

这里我们通过SpringBoot的自动装配功能来装配PulsarClient

这里我们创建一个自动化配置类TdmqAutoConfiguration。内容如下:

代码语言:javascript
复制
@Data
@EnableConfigurationProperties({TdmqProperties.class})
public class TdmqAutoConfiguration {
    /**
     * Pulsar 客户端
     * 推荐一个进程一个实例
     *
     * @return {@link TdmqAutoConfiguration}
     */
    @Bean
    @ConditionalOnMissingBean(PulsarClient.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public PulsarClient pulsarClient(TdmqProperties mqProperties) throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(mqProperties.getServiceUrl())
                .authentication(AuthenticationFactory.token(mqProperties.getToken()))
                .build();
    }
}

这里增加了两个条件注解@ConditionalOnProperty@ConditionalOnMissingBean(PulsarClient.class)这里分别表示如果没有找到PulsarClient对象和配置文件中启用了tdmq功能,我们才实例化PulsarClient对象。

有了PulsarClient对象,我们可以继续编写生产者代码啦。

在工程中创建producer包路径,并在该路径下创建TdmqProucer.class,内容如下:

代码语言:javascript
复制
@Service
public class TdmqProucer {
    @Autowired
    private PulsarClient pulsarClient;

    /**
     * 发送消息
     *
     * @param message 消息内容
     * @return 消息ID
     * @throws PulsarClientException
     */
    public MessageId sendMsg(String message) throws PulsarClientException {
        Producer<byte[]> producer = pulsarClient.newProducer().topic("clusterId/environmentId/test").create();
        MessageId messageId = producer.newMessage().value(message.getBytes()).send();
        producer.close();
        return messageId;
    }
}

2.4、创建消息消费者

在工程中创建consumer包路径,并在该路径下创建TdmqConsumer类,内容如下:

代码语言:javascript
复制
/**
 * @Author julyWhj
 * @Description 消费者$
 * @Date 2022/1/2 10:13 上午
 **/
@Slf4j
@Service
public class TdmqConsumer {
    @Autowired
    private PulsarClient pulsarClient;
    private Consumer<byte[]> consumer;

    @PostConstruct
    public void initConsumer() throws PulsarClientException {
        log.info("MessageLoggingListener is start");
        consumer = pulsarClient.newConsumer().topic("clusterId/environmentId/自己的topic").subscriptionName("自己的subscriptionName").subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        Executors.newSingleThreadExecutor().submit(this::consumer);
    }

    /**
     * 销毁容器
     *
     * @throws PulsarClientException
     */
    @PreDestroy
    public void destroy() throws PulsarClientException {
        consumer.close();
        pulsarClient.close();
    }

    public void consumer() {
        while (true) {
            Message<byte[]> message = null;
            try {
                //使用while(true)无问题,该处会发生阻塞释放CPU资源
                message = consumer.receive();
                String json = new String(message.getData());
                if (StringUtils.isNotEmpty(json)) {
                    log.info("获取消息数据内容:{}", json);
                }
                consumer.acknowledge(message);
            } catch (PulsarClientException e) {
                log.error("数据消费失败", e);
            }
        }
    }
}

2.5、测试生产者和消费者

编写单元测试类

代码语言:javascript
复制
@Slf4j
@SpringBootTest
class SpringBootStarterTdmqApplicationTests {
    @Autowired
    private TdmqProucer proucer;

    @Test
    public void producer() throws PulsarClientException {
        MessageId messageId = proucer.sendMsg("/clusterId/environmentId/自己的topic", "发送消息测试");
        log.info("send msg is success Id = {}", messageId);
    }
}

我们看下测试结果:

测试结果
测试结果

测试结果

最简单的生产消费已经完成了。接下来我们需要对代码进一步封装,这里我们参考amqp的方式,分别封装proucerTemplate和TdmqConsumer注解。实现消费者功能达到一下情况:

代码语言:javascript
复制
 @TdmqConsumer(topic = TopicConstant.MESSAGE_LOGGING_TOPIC, clazz = CreateMsgBean.class, subscriptionName = "subscriptionName")
    void consume(JddMessage<String> msg) {
        log.info("------------{}", JSONUtil.toJsonStr(msg));
    }

OK,基本使用先到这里,我们开始后续内容的封装优化。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基于腾讯云tdmq消息队列封装SpringBootStarter(一)
    • 一、环境准备
      • 1.1 注册腾讯云TDMQ
      • 1.2 创建命名空间
      • 1.3、创建好命名空间后,新建个`topic`主题。
    • 二、编写生产者、消费者代码
      • 2.1、创建工程
      • 2.2、引入相关依赖包
      • 2.3、创建生产者
      • 2.4、创建消息消费者
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档