操作场景
本文以使用 Paho Java Client 为例介绍通过开源终端 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
下载 Demo
操作步骤
步骤1:安装 Java 依赖库
在 Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcpkix-jdk15on</artifactId> <version>1.70</version> </dependency>
步骤2:参数和配置说明
参数 | 说明 |
BROKER_ADDR | ![]() |
USERNAME | 连接用户名,请从控制台集群认证管理页处复制。 |
PASSWORD | 连接用户名匹配的密码,请从控制台集群认证管理页处复制。 ![]() |
FIRST_TOPIC | MQTT 第一级 Topic,请从控制台集群 Topic 页处复制。 |
步骤3:消息发布
示例代码如下:
String server = "tcp://" + BROKER_ADDR;// 请使用全局唯一的 clientIdString clientId = MqttClient.generateClientId();// 如果启用持久session,请使用文件等非易失性存储介质 client = new MqttClient(server, clientId, new MemoryPersistence());client.setCallback(new MqttCallback() {@Override public void connectionLost(Throwable cause) { System.out.println(MessageFormat.format("connection lost, cause: {0}", cause)); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 处理业务逻辑 System.out.println(MessageFormat.format("received message from topic: {0}, payload: {1}", topic, message.toString())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { try { System.out.println(MessageFormat.format("delivered message to topics: {0}", Arrays.asList(token.getTopics()))); } catch (Exception e) { e.printStackTrace(); } } });// 设置请求超时时间,避免无限等待 client.setTimeToWait(3000);MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(USERNAME); options.setPassword(PASSWORD.toCharArray()); options.setCleanSession(false); options.setConnectionTimeout(3000);// 连接 broker client.connect(options); if (!client.isConnected()) { System.out.println("Failed to connect to broker: " + server); return; } else { System.out.println("Connected to broker: " + server); }// 向 topic 发布消息 for (int i = 0; i < 10; i++) {MqttMessage msg = new MqttMessage("payload".getBytes(StandardCharsets.UTF_8)); msg.setQos(1); msg.setRetained(false); client.publish(FIRST_TOPIC + "/second/first", msg); }// 断开与 broker 的连接 client.disconnect();// 关闭 client client.close();
步骤4:消息订阅
示例代码如下:
String server = "tcp://" + BROKER_ADDR;// 请使用全局唯一的 clientIdString clientId = MqttClient.generateClientId();// 如果启用持久session,请使用文件等非易失性存储介质 client = new MqttClient(server, clientId, new MemoryPersistence());client.setCallback(new MqttCallback() {@Override public void connectionLost(Throwable cause) { System.out.println(MessageFormat.format("connection lost, cause: {0}", cause)); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 处理业务逻辑 System.out.println(MessageFormat.format("received message from topic: {0}, payload: {1}", topic, message.toString())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { try { System.out.println(MessageFormat.format("delivered message to topics: {0}", Arrays.asList(token.getTopics()))); } catch (Exception e) { e.printStackTrace(); } } });// 设置请求超时时间,避免无限等待 client.setTimeToWait(3000);MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(USERNAME); options.setPassword(PASSWORD.toCharArray()); options.setCleanSession(false); options.setConnectionTimeout(3000);// 连接 broker client.connect(options); if (!client.isConnected()) { System.out.println("Failed to connect to broker: " + server); return; } else { System.out.println("Connected to broker: " + server); }// 订阅 topicFilter String topicFilter = FIRST_TOPIC + "/second/first"; client.subscribe(topicFilter, 1); System.out.println("Subscribed to: " + topicFilter);TimeUnit.MINUTES.sleep(10);// 取消订阅client.unsubscribe();// 断开与 broker 的连接 client.disconnect();// 关闭 client client.close();