Eclipse Paho Java Client 是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如 Android)。
Eclipse Paho Java Client 提供了 MqttAsyncClient 和 MqttClient 异步和同步 API。
通过 Maven 安装 Paho Java
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
Paho Java 使用示例
QuickStart
package com.tencenttdmq.mqtt.demo;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeUnit;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class QuickStart {public static void main(String[] args) throws MqttException, InterruptedException {// 从MQTT控制台获取接入点:// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059String clientId = "QuickStart";// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码String username = "YOUR_USERNAME";String password = "YOUR_PASSWORD";// 确认一级主题 "home" 在MQTT控制台已经创建String pubTopic = "home/test";String[] topicFilters = new String[]{pubTopic, "home/#", "home/+"};int[] qos = new int[]{1, 1, 1};int total = 16;try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {client.setTimeToWait(3000);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);options.setCleanSession(true);options.setAutomaticReconnect(true);MqttCallback callback = new MqttCallback(client, topicFilters, qos);client.setCallback(callback);client.connect(options);for (int i = 0; i < total; i++) {String msg = "Hello MQTT " + i;MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));message.setQos(1);System.out.printf("Prepare to publish message %d%n", i);client.publish(pubTopic, message);System.out.printf("Published message %d%n", i);TimeUnit.SECONDS.sleep(3);}TimeUnit.SECONDS.sleep(3);client.disconnect();}}static class MqttCallback implements MqttCallbackExtended {private final MqttClient client;private final String[] topicFilters;private final int[] qos;public MqttCallback(MqttClient client, String[] topicFilters, int[] qos) {this.client = client;this.topicFilters = topicFilters;this.qos = qos;}public void messageArrived(String topic, MqttMessage message) {System.out.printf("Message arrived, topic=%s, QoS=%d, Dup=%s, Retained=%s, content=[%s]%n",topic, message.getQos(), message.isDuplicate(), message.isRetained(),new String(message.getPayload(), StandardCharsets.UTF_8));}public void connectionLost(Throwable cause) {System.out.println("connectionLost: " + cause.getMessage());}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);try {client.subscribe(topicFilters, qos);System.out.printf("Subscribed %d topics%n", topicFilters.length);} catch (MqttException e) {e.printStackTrace();}}public void deliveryComplete(IMqttDeliveryToken token) {System.out.printf("Delivery completed: packet-id=%d%n", token.getMessageId());}}}
package com.tencenttdmq.mqtt.demo;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import javax.net.ssl.SSLContext;import javax.net.ssl.SSLSocketFactory;import java.nio.charset.StandardCharsets;import java.security.KeyManagementException;import java.security.NoSuchAlgorithmException;import java.util.concurrent.TimeUnit;public class QuickStartTls {public static SSLSocketFactory buildSSLSocketFactory() throws NoSuchAlgorithmException, KeyManagementException {SSLContext ctx = SSLContext.getInstance("TLS");ctx.init(null, null, null);return ctx.getSocketFactory();}public static void main(String[] args) throws MqttException, InterruptedException, NoSuchAlgorithmException, KeyManagementException {// 从MQTT控制台获取接入点:// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;String serverUri = "ssl://mqtt-xxxx.mqtt.tencenttdmq.com:8883";// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059String clientId = "ClientQuickStartTls";// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码String username = "YOUR_USERNAME";String password = "YOUR_PASSWORD";// 确认一级主题 "home" 在MQTT控制台已经创建String topic = "home/test";String[] topicFilters = new String[]{"home/test"};int[] qos = new int[]{1};int total = 16;try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {client.setTimeToWait(3000);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setSocketFactory(buildSSLSocketFactory());options.setHttpsHostnameVerificationEnabled(false);options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);options.setCleanSession(true);options.setAutomaticReconnect(true);client.connect(options);client.setCallback(new MqttCallback(client, topicFilters, qos));for (int i = 0; i < total; i++) {String msg = "Hello MQTT " + i;MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(1);System.out.printf("Prepare to publish message %d%n", i);client.publish(topic, message);System.out.printf("Published message %d%n", i);TimeUnit.SECONDS.sleep(1);}TimeUnit.SECONDS.sleep(3);client.disconnect();}}static class MqttCallback implements MqttCallbackExtended {private final MqttClient client;private final String[] topicFilters;private final int[] qos;public MqttCallback(MqttClient client, String[] topicFilters, int[] qos) {this.client = client;this.topicFilters = topicFilters;this.qos = qos;}public void messageArrived(String topic, MqttMessage message) {System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),new String(message.getPayload(), StandardCharsets.UTF_8));}public void connectionLost(Throwable cause) {System.out.println("connectionLost: " + cause.getMessage());}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {System.out.println(reconnect ? "Reconnected" : "Connect" + serverURI);try {client.subscribe(topicFilters, qos);System.out.printf("Subscribed %d topics%n", topicFilters.length);} catch (MqttException e) {e.printStackTrace();}}public void deliveryComplete(IMqttDeliveryToken token) {System.out.printf("Delivery completed: packet-id=%d%n", token.getMessageId());}}}
参数 | 说明 |
pubTopic | 待发布消息的目标主题。 |
topicFilters | 一个或者多个订阅表达式。 |
qos | 服务质量数组,数组长度必须与Topic Filters数据保持一致。 |
serverUri | MQTT实例接入点,在控制台目标集群基本信息 > 接入信息模块复制。位置如下图所示。 标准接入点为: tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883 TLS接入点为: ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883 标准WebSocket接入点: ws://mqtt-xxx.mqtt.tencenttdmq.com:80/mqtt TLS WebSocket接入点: wss://mqtt-xxx.mqtt.tencenttdmq.com:443/mqtt ![]() |
clientId | 设备唯一标识符: 例如车辆车架号 VIN、产品序列号等。 |
username | 连接用户名,在控制台集群详情页认证管理页页面复制。 ![]() |
password | 连接用户名匹配的密码,在控制台集群详情页认证管理页页面复制。 |