Java SDK

最近更新时间:2025-06-17 14:49:01

我的收藏

功能概述

Eclipse Paho Java Client 是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如 Android)。
Eclipse Paho Java Client 提供了 MqttAsyncClient 和 MqttClient 异步和同步 API。

云资源准备

请您先参见 创建资源 操作步骤完成云资源准备。

环境准备

通过 Maven 安装 Paho Java

MQTT 5 Paho SDK
MQTT 3.1.1 Paho SDK
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>

示例代码

MQTT 5
MQTT 5 TLS
MQTT 3.1.1
MQTT 3.1.1 TLS
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import java.nio.ByteBuffer;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class QuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
// 从MQTT控制台获取接入点:
// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;
// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;
String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
String clientId = "QuickStart";

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
String username = "YOUR_USERNAME";
String password = "YOUR_PASSWORD";

// MQTT topic
String pubTopic = "home/test";
String[] topicFilters = new String[]{pubTopic, "home/#", "home/+"};
int[] qos = new int[]{1, 1, 1};

int total = 16;

MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence());
client.setTimeToWait(3000);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(username);
options.setPassword(password.getBytes(StandardCharsets.UTF_8));
options.setCleanStart(true);
options.setAutomaticReconnect(true);

client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {
System.out.println("Disconnected: " + response.getReasonString());
}

@Override
public void mqttErrorOccurred(MqttException e) {
e.printStackTrace();
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
byte[] payload = message.getPayload();
String content;
if (4 == payload.length) {
ByteBuffer buf = ByteBuffer.wrap(payload);
content = String.valueOf(buf.getInt());
} else {
content = new String(payload, StandardCharsets.UTF_8);
}
System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n",
topic, message.getQos(), content);
}

@Override
public void deliveryComplete(IMqttToken token) {

}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);
try {
// Subscribe
client.subscribe(topicFilters, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}

@Override
public void authPacketArrived(int i, MqttProperties properties) {
}
});

client.connect(options);

for (int i = 0; i < total; i++) {
String msg = "Hello MQTT " + i;
MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
message.setQos(1);
System.out.printf("Prepare to publish message %d%n", i);
client.publish(pubTopic, message);
System.out.printf("Published message %d%n", i);
TimeUnit.SECONDS.sleep(3);
}
TimeUnit.SECONDS.sleep(3);

client.disconnect();
}
}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

public class QuickStartTls {

public static SSLSocketFactory buildSSLSocketFactory() throws NoSuchAlgorithmException, KeyManagementException {
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(null, null, null);
return ctx.getSocketFactory();
}

public static void main(String[] args) throws Exception {
// 从MQTT控制台获取接入点:
// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;
// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;
String serverUri = "ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883";

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
String clientId = "QuickStartTls";

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
String username = "YOUR_USERNAME";
String password = "YOUR_PASSWORD";

// MQTT topic
String topicName = "home/test";
String[] topicFilters = new String[]{topicName, "home/#", "home/+"};
int[] qos = new int[]{1, 1, 1};

int total = 16;

MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence());
client.setTimeToWait(3000);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setSocketFactory(buildSSLSocketFactory());
options.setHttpsHostnameVerificationEnabled(false);
options.setUserName(username);
options.setPassword(password.getBytes(StandardCharsets.UTF_8));
options.setCleanStart(true);
options.setAutomaticReconnect(true);

client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {
System.out.println("Disconnected: " + response.getReasonString());
}

@Override
public void mqttErrorOccurred(MqttException e) {
e.printStackTrace();
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
byte[] payload = message.getPayload();
String content;
if (4 == payload.length) {
ByteBuffer buf = ByteBuffer.wrap(payload);
content = String.valueOf(buf.getInt());
} else {
content = new String(payload, StandardCharsets.UTF_8);
}
System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n",
topic, message.getQos(), content);
}

@Override
public void deliveryComplete(IMqttToken token) {

}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);
try {
// Subscribe
IMqttToken token = client.subscribe(topicFilters, qos);
int[] reasonCodes = token.getReasonCodes();
for (int i = 0; i < reasonCodes.length; i++) {
System.out.printf("Subscribed to topic %s with QoS=%d, Granted-QoS: %d%n",
topicFilters[i], qos[i], reasonCodes[i]);
}

if (token.isComplete()) {
List<UserProperty> userProperties = token.getResponseProperties().getUserProperties();
printUserProperties(userProperties);
}
} catch (MqttException e) {
e.printStackTrace();
}
}

@Override
public void authPacketArrived(int i, MqttProperties properties) {
}
});

client.connect(options);

for (int i = 0; i < total; i++) {
String msg = "Hello MQTT " + i;
MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
message.setQos(1);
System.out.printf("Prepare to publish message %d%n", i);
client.publish(topicName, message);
System.out.printf("Published message %d%n", i);
TimeUnit.SECONDS.sleep(3);
}
TimeUnit.SECONDS.sleep(3);

client.disconnect();
}

static void printUserProperties(List<UserProperty> userProperties) {
if (null != userProperties) {
for (UserProperty userProperty : userProperties) {
System.out.printf("User property: %s = %s%n", userProperty.getKey(), userProperty.getValue());
}
}
}
}


import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class QuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
// 从MQTT控制台获取接入点:
// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;
// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;
String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
String clientId = "QuickStart";

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
String username = "YOUR_USERNAME";
String password = "YOUR_PASSWORD";

// 确认一级主题 "home" 在MQTT控制台已经创建
String pubTopic = "home/test";
String[] topicFilters = new String[]{pubTopic, "home/#", "home/+"};
int[] qos = new int[]{1, 1, 1};

int total = 16;

try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {
client.setTimeToWait(3000);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
options.setCleanSession(true);
options.setAutomaticReconnect(true);

MqttCallback callback = new MqttCallback(client, topicFilters, qos);
client.setCallback(callback);

client.connect(options);

for (int i = 0; i < total; i++) {
String msg = "Hello MQTT " + i;
MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
message.setQos(1);
System.out.printf("Prepare to publish message %d%n", i);
client.publish(pubTopic, message);
System.out.printf("Published message %d%n", i);
TimeUnit.SECONDS.sleep(3);
}
TimeUnit.SECONDS.sleep(3);

client.disconnect();
}
}

static class MqttCallback implements MqttCallbackExtended {

private final MqttClient client;

private final String[] topicFilters;
private final int[] qos;

public MqttCallback(MqttClient client, String[] topicFilters, int[] qos) {
this.client = client;
this.topicFilters = topicFilters;
this.qos = qos;
}

public void messageArrived(String topic, MqttMessage message) {
System.out.printf("Message arrived, topic=%s, QoS=%d, Dup=%s, Retained=%s, content=[%s]%n",
topic, message.getQos(), message.isDuplicate(), message.isRetained(),
new String(message.getPayload(), StandardCharsets.UTF_8));
}

public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);
try {
client.subscribe(topicFilters, qos);
System.out.printf("Subscribed %d topics%n", topicFilters.length);
} catch (MqttException e) {
e.printStackTrace();
}
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.printf("Delivery completed: packet-id=%d%n", token.getMessageId());
}
}
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;

public class QuickStartTls {

public static SSLSocketFactory buildSSLSocketFactory() throws NoSuchAlgorithmException, KeyManagementException {
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(null, null, null);
return ctx.getSocketFactory();
}

public static void main(String[] args) throws MqttException, InterruptedException, NoSuchAlgorithmException, KeyManagementException {

// 从MQTT控制台获取接入点:
// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;
// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;
String serverUri = "ssl://mqtt-xxxx.mqtt.tencenttdmq.com:8883";
// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
String clientId = "ClientQuickStartTls";

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
String username = "YOUR_USERNAME";
String password = "YOUR_PASSWORD";

// 确认一级主题 "home" 在MQTT控制台已经创建
String topic = "home/test";

String[] topicFilters = new String[]{"home/test"};
int[] qos = new int[]{1};

int total = 16;

try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {
client.setTimeToWait(3000);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setSocketFactory(buildSSLSocketFactory());
options.setHttpsHostnameVerificationEnabled(false);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
options.setCleanSession(true);
options.setAutomaticReconnect(true);
client.connect(options);

client.setCallback(new MqttCallback(client, topicFilters, qos));

for (int i = 0; i < total; i++) {
String msg = "Hello MQTT " + i;
MqttMessage message = new MqttMessage(msg.getBytes());
message.setQos(1);
System.out.printf("Prepare to publish message %d%n", i);
client.publish(topic, message);
System.out.printf("Published message %d%n", i);
TimeUnit.SECONDS.sleep(1);
}

TimeUnit.SECONDS.sleep(3);

client.disconnect();
}
}

static class MqttCallback implements MqttCallbackExtended {

private final MqttClient client;

private final String[] topicFilters;
private final int[] qos;

public MqttCallback(MqttClient client, String[] topicFilters, int[] qos) {
this.client = client;
this.topicFilters = topicFilters;
this.qos = qos;
}

public void messageArrived(String topic, MqttMessage message) {
System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),
new String(message.getPayload(), StandardCharsets.UTF_8));
}

public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(reconnect ? "Reconnected" : "Connect" + serverURI);
try {
client.subscribe(topicFilters, qos);
System.out.printf("Subscribed %d topics%n", topicFilters.length);
} catch (MqttException e) {
e.printStackTrace();
}
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.printf("Delivery completed: packet-id=%d%n", token.getMessageId());
}
}
}
参数
说明
pubTopic
待发布消息的目标主题。
消息主题可以包含多级(level), 通过'/'隔开。第一级主题(Topic)需要在控制台 Topic Tab 页创建。详情参见 Topic Names and Topic Filters
topicFilters
一个或者多个订阅表达式。
订阅表达式可以包含通配符(Wildcards),详情参见 Topic Names and Topic Filters
qos
服务质量数组,数组长度必须与 Topic Filters 数据保持一致。
最常用的 QoS 为1, 即至少投递一次(At-Least-Once),详情参见 Quality of Service levels and protocol flows
serverUri
MQTT 实例接入点,在控制台目标集群基本信息 > 接入信息模块复制。位置如下图所示。
标准接入点为: tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883
TLS 接入点为: ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883
标准 WebSocket 接入点: ws://mqtt-xxx.mqtt.tencenttdmq.com:80/mqtt
TLS WebSocket 接入点: wss://mqtt-xxx.mqtt.tencenttdmq.com:443/mqtt



clientId
设备唯一标识符: 例如车辆车架号 VIN、产品序列号等。
合法的 Client Identifier 包含: 数字0-9, 小写英文字母 a-z, 大写英文字母 A-Z; 总长度为1-23个字符。详情参见 Client Identifier
username
连接用户名,在控制台集群详情页认证管理页面复制。



password
连接用户名匹配的密码,在控制台集群详情页认证管理页面复制。