柚子快報激活碼778899分享:物聯(lián)網(wǎng) MQTT-Java
柚子快報激活碼778899分享:物聯(lián)網(wǎng) MQTT-Java
背景:最近接觸到MQTT協(xié)議用來和設(shè)備進行數(shù)據(jù)對接。先將設(shè)備的信息接入網(wǎng)關(guān),網(wǎng)關(guān)配置采集的信息后發(fā)送到MQTT服務(wù)器中,我們在啟動java程序來拉取MQTT topic中的數(shù)據(jù)
MQTT服務(wù)器使用 EMQX(Docker 部署指南 | EMQX 5.1 文檔) 文檔很全,我這里使用開源版本。
MQTT客戶端工具使用 MQTTX(MQTTX: Your All-in-one MQTT Client Toolbox)
使用Java語言,完成MQTT的學(xué)習(xí)和測試,
1.引入pom依賴
2.編寫測試代碼
@Test
void testSub() throws MqttException, InterruptedException {
String clientId = "client-testSub2";
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{"tcp://192.168.**.**:1883"});
options.setUserName("");
options.setPassword("".toCharArray());
options.setConnectionTimeout(10);
options.setAutomaticReconnect(true);
options.setCleanSession(false);
MqttClient mqttClient = new MqttClient(options.getServerURIs()[0], clientId, new MemoryPersistence());
mqttClient.connect(options);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(topic + "收到messageId=" + message.getId() + ",payload=" + new String(message.getPayload()));
String msg = new String(message.getPayload());
if ("1".equals(msg)) {
throw new RuntimeException("模擬消息處理失敗");
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
mqttClient.subscribe(new String[]{"topic/testPublish"}, new int[] {1});
while(true) {
Thread.sleep(30 * 1000);
}
}
3.假設(shè)MQTT服務(wù)端已安裝完畢,MQTTX已安裝完畢,使用MQTTX連接MQTT服務(wù)端
4.測試retain消息
mqtt協(xié)議設(shè)計retain消息是用來,當(dāng)一個訂閱者訂閱了一個topic后,這個topic的retain消息就可以接收到,如果訂閱者id不變且保留session,當(dāng)不重新訂閱topic時不會收到retain消息,如果重新訂閱則會再次收到retain消息。注意:一個topic只能有一條retain消息,新發(fā)的會替代舊的;當(dāng)不需要保留retain消息時,發(fā)送一條payload為空的消息就可以了;這里無論qos是什么都一樣
retain消息:用來針對數(shù)據(jù)采集間隔時間長的情景,這樣新上線的訂閱者就可以得到上一次設(shè)備的數(shù)據(jù)。
4.1 發(fā)送retain消息
4.2 啟動Java程序
當(dāng)關(guān)閉Java程序,再次啟動時,它會再次消費
發(fā)送一個內(nèi)容為空的Retain消息
消費者會再次接收一個內(nèi)容為空的Retain消息
再次啟動就不會再收到Retain消息了
5.訂閱消息及指定等級
QOS=0? 至多一次,消息發(fā)送接收依賴TCP
QOS=1?承諾至少會有一次發(fā)送給接收者
QOS=2?保證消息僅僅傳送到目的地一次
結(jié)論:訂閱消息的處理邏輯要看發(fā)送消息的qos及訂閱消息的qos,取最下的那個執(zhí)行。
當(dāng)相同的clientId訂閱同一個主題時,消息將會以輪詢的方式被每個消費者消費
5.1 發(fā)送qos=0 接收qos=0 1 2
5.1.1 發(fā)送消費會出現(xiàn)異常的消息,也只會處理一次
5.2 發(fā)送qos=1 2 接收qos=0
效果等同 發(fā)送qos=0 接收qos=0 1 2
5.2.1 發(fā)送消費會出現(xiàn)異常的消息,也只會處理一次
5.3 發(fā)送qos=1 接收qos=1 2
發(fā)送一次會導(dǎo)致消費失敗的消息
5.3.1 消費失敗后會再次收到失敗的消息,重復(fù)消費
5.4 發(fā)送qos=2 接收 2
效果和發(fā)送qos=1 接收qos=1 2的一致
5.4.1 消費失敗后會再次收到失敗的消息,重復(fù)消費
6.cleanSession
結(jié)論:無論qos為多少,cleanSession為true時,消費端下線時的消息都不能被消費。cleanSession為false時,mqtt broker沒有自動清除session時,消費端下線時的消息可以被消費。
7. 異常
7.1 消費能觸發(fā)消費異常的消息
當(dāng)發(fā)送qos=0 接收qos=0,先發(fā)送一條會觸發(fā)消費異常的消息,程序啟動后會觸發(fā)異常,程序會自動退出。如果程序先啟動起來,再收到能觸發(fā)異常的消息,程序會正常執(zhí)行
柚子快報激活碼778899分享:物聯(lián)網(wǎng) MQTT-Java
相關(guān)鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。