【物聯(lián)網(wǎng)】mqtt初體驗(yàn)
文章目錄
- 安裝EMQX
- java集成
- 添加依賴
- mqtt配置參數(shù)
- 發(fā)布組件
- 訂閱組件
- 測試接口
- 接口測試
最近在了解物聯(lián)網(wǎng)云平臺(tái)方面的知識(shí),接觸了mqtt協(xié)議,只看書籍難免有些枯燥,所以直接試驗(yàn)一下,便于鞏固理論知識(shí)。
broker服務(wù)器操作系統(tǒng):centos7
broker服務(wù)程序:EMQX
虛擬機(jī)IP地址:192.168.89.82
安裝EMQX
在自己的虛擬機(jī)環(huán)境下進(jìn)行安裝,按照EMQX官方資料操作即可,步驟如下:
[root@centos7-82 ~]# cd /usr/src
[root@centos7-82 src]# wget https://www.emqx.com/zh/downloads/broker/5.0.17/emqx-5.0.17-el7-amd64.tar.gz
[root@centos7-82 src]# mkdir -p emqx
[root@centos7-82 src]# tar -zxvf emqx-5.0.17-el7-amd64.tar.gz -C emqx
[root@centos7-82 src]# ./emqx/bin/emqx start
啟動(dòng)后,控制臺(tái)日志如下:

可以看下emqx端口:

瀏覽器訪問地址如下:http://192.168.89.82:18083/ 默認(rèn)的用戶名密碼為admin、public,第一次登錄后會(huì)首先要求修改密碼。
java集成
為了快速體驗(yàn),直接一個(gè)springboot工程里既有發(fā)布客戶端也有訂閱客戶端。
添加依賴
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.5
mqtt配置參數(shù)
mqtt.broker=tcp://192.168.89.82:1883
mqtt.username=admin
mqtt.password=admin
發(fā)布組件
@Component
public class PublishSample {
private static final Logger log = LoggerFactory.getLogger(PublishSample.class);
@Autowired
public MqttPropertiesConfig mqttPropertiesConfig;
public void sentMsg(String content, String clientId, String topic, int qos){
try {
MqttClient mqttClient = new MqttClient(mqttPropertiesConfig.getBroker(), clientId, new MemoryPersistence());
// 連接參數(shù)
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 設(shè)置用戶名和密碼
mqttConnectOptions.setUserName(mqttPropertiesConfig.getUsername());
mqttConnectOptions.setPassword(mqttPropertiesConfig.getPassword().toCharArray());
mqttConnectOptions.setConnectionTimeout(60);
mqttConnectOptions.setKeepAliveInterval(60);
// 連接
mqttClient.connect(mqttConnectOptions);
// 創(chuàng)建消息并設(shè)置 QoS
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// 發(fā)布消息
mqttClient.publish(topic, message);
log.info("Message published");
log.info("topic: {}", topic);
log.info("message content: {}", content);
// 關(guān)閉連接
mqttClient.disconnect();
// 關(guān)閉客戶端
mqttClient.close();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
訂閱組件
@Component
public class SubscribeSample {
private static final Logger log = LoggerFactory.getLogger(SubscribeSample.class);
@Autowired
public MqttPropertiesConfig mqttPropertiesConfig;
public void subTest(String clientId, String topic, int qos){
try {
MqttClient client = new MqttClient(mqttPropertiesConfig.getBroker(), clientId, new MemoryPersistence());
// 連接參數(shù)
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttPropertiesConfig.getUsername());
options.setPassword(mqttPropertiesConfig.getPassword().toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
// 設(shè)置回調(diào)
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
log.info("connectionLost: {}", cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
log.info("topic: {}", topic);
log.info("Qos: {}", message.getQos());
log.info("message content: {}", new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("deliveryComplete---------{}", token.isComplete());
}
});
client.connect(options);
client.subscribe(topic, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
}
測試接口
@RestController
public class TestController {
private static final Logger log = LoggerFactory.getLogger(TestController.class);
@Autowired
public PublishSample publishSample;
@Autowired
public SubscribeSample subscribeSample;
/**
* 發(fā)布接口
* @param content
* @return
*/
@RequestMapping("sent")
public String sent(String content){
String clientId = "lizx_pub_client1";
int qos = 0;
String topic = "mqtt/test";
log.info("sent");
publishSample.sentMsg(content, clientId, topic, qos);
return "success";
}
/**
* 模擬訂閱客戶端1
* @return
*/
@RequestMapping("sub1")
public String sub1(){
String clientId = "lizx_sub_client1";
int qos = 0;
String topic = "mqtt/test";
log.info("sub1");
subscribeSample.subTest(clientId, topic, qos);
return "success";
}
/**
* 模擬訂閱客戶端2
* @return
*/
@RequestMapping("sub2")
public String sub2(){
String clientId = "lizx_sub_client2";
int qos = 0;
String topic = "mqtt/test";
log.info("sub2");
subscribeSample.subTest(clientId, topic, qos);
return "success";
}
}
接口測試
直接簡單瀏覽器兩個(gè)標(biāo)簽頁分別輸入:
http://127.0.0.1:8080/sub1 http://127.0.0.1:8080/sub2 然后再打開一個(gè)標(biāo)簽頁輸入:
http://127.0.0.1:8080/sent?content=Hello%20MQTT
后臺(tái)日志如下:
2023-02-14 16:19:39.970 INFO 5708 --- [nio-8080-exec-1] com.lizx.emqx.client.web.TestController : sub1
2023-02-14 16:19:46.651 INFO 5708 --- [nio-8080-exec-2] com.lizx.emqx.client.web.TestController : sub2
2023-02-14 16:19:50.384 INFO 5708 --- [nio-8080-exec-3] com.lizx.emqx.client.web.TestController : sent
2023-02-14 16:19:50.697 INFO 5708 --- [nio-8080-exec-3] c.lizx.emqx.client.sample.PublishSample : Message published
2023-02-14 16:19:50.698 INFO 5708 --- [nio-8080-exec-3] c.lizx.emqx.client.sample.PublishSample : topic: mqtt/test
2023-02-14 16:19:50.703 INFO 5708 --- [izx_sub_client2] c.l.emqx.client.sample.SubscribeSample : topic: mqtt/test
2023-02-14 16:19:50.703 INFO 5708 --- [izx_sub_client1] c.l.emqx.client.sample.SubscribeSample : topic: mqtt/test
2023-02-14 16:19:50.705 INFO 5708 --- [nio-8080-exec-3] c.lizx.emqx.client.sample.PublishSample : message content: Hello MQTT
2023-02-14 16:19:50.705 INFO 5708 --- [izx_sub_client2] c.l.emqx.client.sample.SubscribeSample : Qos: 0
2023-02-14 16:19:50.705 INFO 5708 --- [izx_sub_client1] c.l.emqx.client.sample.SubscribeSample : Qos: 0
2023-02-14 16:19:50.706 INFO 5708 --- [izx_sub_client2] c.l.emqx.client.sample.SubscribeSample : message content: Hello MQTT
2023-02-14 16:19:50.706 INFO 5708 --- [izx_sub_client1] c.l.emqx.client.sample.SubscribeSample : message content: Hello MQTT
本文僅代表作者觀點(diǎn),版權(quán)歸原創(chuàng)者所有,如需轉(zhuǎn)載請(qǐng)?jiān)谖闹凶⒚鱽碓醇白髡呙帧?/p>
免責(zé)聲明:本文系轉(zhuǎn)載編輯文章,僅作分享之用。如分享內(nèi)容、圖片侵犯到您的版權(quán)或非授權(quán)發(fā)布,請(qǐng)及時(shí)與我們聯(lián)系進(jìn)行審核處理或刪除,您可以發(fā)送材料至郵箱:service@tojoy.com






