使用 SDK 收发消息

最近更新时间:2024-12-27 14:25:13

我的收藏

操作场景

本文以使用 Paho Java Client 为例介绍通过开源终端 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

步骤1:安装 Java 依赖库

在 Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcpkix-jdk15on</artifactId> <version>1.70</version> </dependency>

步骤2:参数和配置说明

参数
说明
BROKER_ADDR
broker 连接地址,可以从 控制台 目标集群基本信息 > 接入信息处复制。位置如下图所示。格式:mqtt-xxx-gz.mqtt.qcloud.tencenttdmq.com:1883



USERNAME
连接用户名,请从控制台集群认证管理页处复制。
PASSWORD
连接用户名匹配的密码,请从控制台集群认证管理页处复制。



FIRST_TOPIC
MQTT 第一级 Topic,请从控制台集群 Topic 页处复制。

步骤3:消息发布

示例代码如下:
String server = "tcp://" + BROKER_ADDR;

// 请使用全局唯一的 clientId
String clientId = MqttClient.generateClientId();
// 如果启用持久session,请使用文件等非易失性存储介质 client = new MqttClient(server, clientId, new MemoryPersistence());
client.setCallback(new MqttCallback() {
@Override public void connectionLost(Throwable cause) { System.out.println(MessageFormat.format("connection lost, cause: {0}", cause)); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 处理业务逻辑 System.out.println(MessageFormat.format("received message from topic: {0}, payload: {1}", topic, message.toString())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { try { System.out.println(MessageFormat.format("delivered message to topics: {0}", Arrays.asList(token.getTopics()))); } catch (Exception e) { e.printStackTrace(); } } });
// 设置请求超时时间,避免无限等待 client.setTimeToWait(3000);

MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(USERNAME); options.setPassword(PASSWORD.toCharArray()); options.setCleanSession(false); options.setConnectionTimeout(3000);
// 连接 broker client.connect(options); if (!client.isConnected()) { System.out.println("Failed to connect to broker: " + server); return; } else { System.out.println("Connected to broker: " + server); }
// 向 topic 发布消息 for (int i = 0; i < 10; i++) {
MqttMessage msg = new MqttMessage("payload".getBytes(StandardCharsets.UTF_8)); msg.setQos(1); msg.setRetained(false); client.publish(FIRST_TOPIC + "/second/first", msg); }
// 断开与 broker 的连接 client.disconnect();

// 关闭 client client.close();

步骤4:消息订阅

示例代码如下:
String server = "tcp://" + BROKER_ADDR;

// 请使用全局唯一的 clientId
String clientId = MqttClient.generateClientId();
// 如果启用持久session,请使用文件等非易失性存储介质 client = new MqttClient(server, clientId, new MemoryPersistence());
client.setCallback(new MqttCallback() {
@Override public void connectionLost(Throwable cause) { System.out.println(MessageFormat.format("connection lost, cause: {0}", cause)); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 处理业务逻辑 System.out.println(MessageFormat.format("received message from topic: {0}, payload: {1}", topic, message.toString())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { try { System.out.println(MessageFormat.format("delivered message to topics: {0}", Arrays.asList(token.getTopics()))); } catch (Exception e) { e.printStackTrace(); } } });
// 设置请求超时时间,避免无限等待 client.setTimeToWait(3000);

MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(USERNAME); options.setPassword(PASSWORD.toCharArray()); options.setCleanSession(false); options.setConnectionTimeout(3000);

// 连接 broker client.connect(options); if (!client.isConnected()) { System.out.println("Failed to connect to broker: " + server); return; } else { System.out.println("Connected to broker: " + server); }
// 订阅 topicFilter String topicFilter = FIRST_TOPIC + "/second/first"; client.subscribe(topicFilter, 1); System.out.println("Subscribed to: " + topicFilter);
TimeUnit.MINUTES.sleep(10);

// 取消订阅
client.unsubscribe();

// 断开与 broker 的连接 client.disconnect();

// 关闭 client client.close();