前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式消息中间件TDMQ架构及使用案例最佳实践

分布式消息中间件TDMQ架构及使用案例最佳实践

原创
作者头像
邓愉悦
修改2020-10-26 16:24:34
1.7K0
修改2020-10-26 16:24:34
举报
文章被收录于专栏:腾讯云中间件专家服务

背景

TDMQ是基于pulsar的金融级分布式消息中间件,是一个具备跨域、高可用、高并发的MQ。拥有原生的java、C++,Python,Go API,同时支持多种协议的接入(kafka、AMQP等)。同时支持 Kafka 协议以及 HTTP Proxy 方式接入,可为分布式应用系统提供异步解耦和削峰填谷的能力,具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。目前TDMQ已逐步成为新一代分布式云上消息中间件。能够很好的兼容和满足客户丰富的业务场景。

TDMQ的功能全景图
TDMQ的功能全景图

TDMQ的整体架构如下:

TDMQ整体架构
TDMQ整体架构

从上面TDMQ的功能全景和架构图中,可以看出,整个TDMQ是采用服务与存储相分离的架构。同时服务都是无状态的,这样的话,可以方便随时进行扩展,同时提升了整体的容灾能力。

TDMQ各组件说明

topic 介绍

topic是所有消息的集合,所有生产者的消息,都会归属到指定的topic之中, 所有在 topic 里的消息,会按照一定的规则,被切分成不同的分区(Partition)。一个分区会落靠在某一个服务器上,原理类似于 Kafka Topic Partition。

topic的架构如下所示:

topic逻辑架构图
topic逻辑架构图

topic的命名规则如下:Topic完整名称由:租户名 + 命名空间 + Topic。其中租户名为APPID,命名空间为环境变量。

topic完整命名组成
topic完整命名组成

broker 介绍

broker端架构
broker端架构

Broker负责消息的收发,数据不会真正存储在 broker,但会分配topic的控制权。

客户端实践

1、客户端初始化,创建Client(java语言为例)

(1)、使用域名的访问模式:

Map<String, String> authParams = new HashMap<>();

authParams.put("secretId", "***");

authParams.put("secretKey", "***");

authParams.put("region", "ap-guangzhou");//地域信息

PulsarClient client = PulsarClient.builder().authenticationCloud(

"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)

.serviceUrl("pulsar://tdmq.åtencentcloud.example.com:6650").build();

(2)、用多个broker ip地址方式

Map<String, String> authParams = new HashMap<>();

authParams.put("secretId", "***********************************************");

authParams.put("secretKey", "***********************************************");

authParams.put("region", "ap-guangzhou");//地域信息

authParams.put("apiUrl", "");//腾讯云CAM地址

PulsarClient client = PulsarClient.builder().authenticationCloud(

"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)

.serviceUrl("pulsar://host1:6650,host2:6650").build();

(3)、根据不同的网络环境,使用不同的netModel模式方法。

Map<String, String> authParams = new HashMap<>();

authParams.put("secretId", "***********************************************");

authParams.put("secretKey", "***********************************************");

authParams.put("region", "ap-guangzhou");//地域信息

PulsarClient client = PulsarClient.builder().authenticationCloud(

"com.tencent.tdmq.client.impl.auth.AuthenticationCloudCam", authParams)

.netModelKey("customNetModelKey")

.serviceUrl("pulsar://host1:6650,host2:6650").build();

(4)、生产消费的例子

//创建生产者对象

Producer<byte[]> producer = client.newProducer()

.batchingMaxBytes(1024*32)

.batchingMaxMessages(1000)

.topic(topic)

.create();

for (int i = 0; i < 5; i++) {

String value = "my-sync-message-" + i;

MessageId msgId = producer.newMessage().value(value.getBytes()).send();

System.out.println("produce sync msg id:" + msgId + ", value:" + value);

}

//创建消费者对象

Consumer<byte[]> consumer = client.newConsumer()

.topic(topic)

.subscriptionName(groupName)

.subscribe();

for (int i = 0; i < 5; i++) {

Message<byte[]> msg = consumer.receive();

String msgId = ((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId().toString();

String value = new String(msg.getValue());

System.out.println("receive msg " + msgId + ",value:" + value );

consumer.acknowledge(msg);// 确认消息

}

TDMQ消息的订阅模式

TDMQ支持3中订阅模式:独占模式、共享模式、故障转移定位模式。三种区别如图所示:

TDMQ3种订阅模式
TDMQ3种订阅模式

1、独占模式

对于一个topic来说,不管有多少个Consumer 同时存在,只会有一个Consumer是活跃的,也就是说只有一个Consumer能够收到这个topic下面的所有消息,这种模式就是Pulsar订阅模式中的独占订阅(Exclusive)。

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

如果多个consumer去订阅这个topic,就好出现报错。

大于一个实例独占订阅topic
大于一个实例独占订阅topic

2、故障转移订阅

Failover(故障转移订阅)则是多个 consumer 可以附加到同一订阅。但是,对于给定的主题分区,将选择一个 consumer 作为该主题分区的主使用者,其他 consumer 将被指定为故障转移消费者,当主消费者断开连接时,分区将被重新分配给其中一个故障转移消费者,而新分配的消费者将成为新的主消费者。发生这种情况时,所有未确认的消息都将传递给新的主消费者,这类似于 Apache Kafka 中的使用者分区重新平衡。

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Failover)

.subscribe();

3、共享订阅

是可以将所需数量的 consumer 附加到同一订阅。消息以多个 consumer 的循环尝试分发形式传递,并且任何给定的消息仅传递给一个 consumer。当消费者断开连接时,所有传递给它并且未被确认的消息将被重新安排,以便发送给该订阅上剩余的 consumer。需要指出的是,TDMQ对consumer数量没有明确的限制。

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Shared)

.subscribe();

TDMQ高级特性

1、顺序消息

(1)、全局有序必须为非分区Topic

//只能允许一个消费者

Consumer<byte[]> consumer = client.newConsumer()

.subscriptionType(SubscriptionType.Exclusive)

.topic(topic)

.subscriptionName(groupName)

.subscribe();

(2)、局部有序通过设置key 来达到局部顺序的目的

//设置key,保证相同可以的消息发送到同一个分区里,只能允许一个消费者

MessageId msgId = producer.newMessage().key(key).value(value.getBytes()).send();

2、执行tag消息过滤

//单个tag生产

MessageId msgId = producer.newMessage().value(value.getBytes()) .tags("TagA").send();

//多个tag生产

producer.newMessage()

.value("my-sync-message".getBytes())

.tags("TagA", "TagB","TagC")//支持设置多个标签

.send();

//指定tag进行消费

Consumer consumer = client.newConsumer()

.topicByTag(topic, "TagA || TagB")

//.topicByTag(topic, "TagA ") 单个

//.topic(topic, "*") 订阅所有

//.topicByTagsPattern(topic, "Tag.*") 正则

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Shared)

.subscribe();

3、延时消息

//延迟

MessageId msgId = producer.newMessage().value(value.getBytes())

.deliverAfter(delay, TimeUnit.SECONDS)

.send();

//定时

MessageId msgId = producer.newMessage().value(value.getBytes())

.deliverAt(timestamp)

.send();

4、消息重试

Consumer<byte[]> consumer = client.newConsumer()

.subscriptionType(SubscriptionType.Shared)

.enableRetry(true)

.deadLetterPolicy(DeadLetterPolicy.builder()

.maxRedeliverCount(maxRedeliverCount)

.build())

.topic(topic)

.subscriptionName(groupName)

.subscribe();

//指定延迟时间

consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);

//指定延迟等级

consumer.reconsumeLater(msg, 1);

//等级递增

consumer.reconsumeLater(msg);

5、消息压缩,//生产者设置压缩类型,支持ZLIB、ZSTD、SNAPPY、LZ4

//生产者设置压缩类型,支持ZLIB、ZSTD、SNAPPY、LZ4

Producer<byte[]> producer = client.newProducer()

.enableBatching(false)

.topic(topic)

.compressionType(CompressionType.LZ4)

.create();

6、批量发生消息

//可以设置批量的大小和消息条数

Producer<byte[]> producer = client.newProducer()

.enableBatching(true)

.batchingMaxBytes(1024*32)

.batchingMaxMessages(1000)

.topic(topic)

.create();

以上就是整个TDMQ架构,产品特性,以及客户端初始化以及消费、生产主要核心流程代码,希望对大家有帮助。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • TDMQ各组件说明
    • topic 介绍
      • broker 介绍
        • 客户端实践
          • 1、客户端初始化,创建Client(java语言为例)
          • TDMQ消息的订阅模式
      • TDMQ高级特性
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档