发送与接收普通消息

最近更新时间:2024-05-28 15:05:51

我的收藏

操作场景

TDMQ RocketMQ 5.x 版支持用户通过内网或公网使用 HTTP 协议接入,并兼容社区的多语言 HTTP SDK
本文以调用 PHP SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持通过使用 HTTP 协议实现事务消息。

前提条件

安装 PHP。
更多示例可以参见开源社区的 Demo 示例

重试机制

HTTP 采用固定重试间隔的机制:
重试间隔
最大重试次数
5分钟
可通过修改消费组配置实现自定义最大重试次数,默认 16 次。
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。

操作步骤

步骤1:安装 PHP 依赖库

在 PHP 项目中引入相关依赖:
{
"require": {
"aliyunmq/mq-http-sdk": ">=1.0.4"
}
}

步骤2:获取参数

1. 登录消息队列 RocketMQ 控制台,选择集群。
2. 复制集群 ID、接入地址等参数。




步骤3:生产消息

创建消息生产者

private $client;
private $producer;
public function __construct()
{
// 获取 client
$this->client = new MQClient(
endpoint,
accessKey,
secretKey
);

$topic = topicName;
$instanceId = instanceId;

// 获取 producer
$this->producer = $this->client->getProducer($instanceId, $topic);
}
参数
说明
endpoint
接入地址,在基本信息页面获取。
accessKey
角色 AccessKey,在集群权限页面获取。
secretKey
角色 SecretKey,在集群权限页面获取。
instanceId
集群 ID。
topicName
主题名称,在 Topic 页面获取。

发送消息

public function run()
{
try
{
for ($i=0; $i<8; $i++)
{
$publishMessage = new TopicMessage(
"hello mq!"
);
// 设置属性
$publishMessage->putProperty("a", $i);
$result = $this->producer->publishMessage($publishMessage);
print "Send success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\\n";
}
} catch (\\Exception $e) {
print_r($e->getMessage() . "\\n");
}
}

步骤3:消费消息

创建消费者

private $client;
private $consumer;

public function __construct()
{
// 获取 client
$this->client = new MQClient(
endpoint,
accessKey,
secretKey
);

$topic = topicName;
$groupId = groupName;
$instanceId = instanceId;

// 获取consumer
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
参数
说明
endpoint
接入地址,在基本信息页面获取。
accessKey
角色 AccessKey,在集群权限页面获取。
secretKey
角色 SecretKey,在集群权限页面获取。
instanceId
集群 ID。
topicName
主题名称,在 Topic 页面获取。
groupName
消费组名称,在 Group 页面获取。

订阅消息

public function run()
{
while (True) {
try {
// 长轮询消费消息
// 长轮询表示如果 topic 没有消息则请求会在服务端等待,如果有消息可以消费则立即返回
// 如果对消费延迟比较敏感,强烈建议并发拉取消息
$messages = $this->consumer->consumeMessage(
batchSize,
waitSeconds
);
} catch (\\Exception $e) {
if ($e instanceof MQ\\Exception\\MessageNotExistException) {
printf("No message, contine long polling!RequestId:%s\\n", $e->getRequestId());
continue;
}

print_r($e->getMessage() . "\\n");

sleep(1);
continue;
}

print "consume finish, messages:\\n";

$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \\nPublishTime:%d, FirstConsumeTime:%d, \\nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
// 处理业务
}

print_r($receiptHandles);
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\\Exception $e) {
if ($e instanceof MQ\\Exception\\AckMessageException) {
printf("Ack Error, RequestId:%s\\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
print "ack finish\\n";
}
}
参数
说明
batchSize
一次拉取的消息条数,支持最多16条。
waitSeconds
一次拉取的轮询等待时间,支持最长30秒。