MQTT 是一种基于客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、 简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)
关键词解释:
subscribe:订阅某个主题,未来这个主题收到的消息都会发给你
publish:发送消息给某个主题,只要是订阅了这个主题的客户端,都会收到这条消息
Payload:发送的数据,一般数据格式为string、hex(16进制字符串)、json,这个由前后端商定
QoS: 分为三个等级,0:最多交付一次, 1:至少交付一次, 2:只交付一次
其中,使用 QoS 0 可能丢失消息,使用 QoS 1 可以保证收到消息,但消息可能重复,使用 QoS 2 可以保证消息既不丢失也不重复。QoS 等级从低到高,不仅意味着消息可靠性的提升,也意味着传输复杂程度的提升。
Retain:遗嘱消息,客户端订阅某个主题后,会将这个主题之前最近一次的消息发送过来。
适用场景:比如某个温度传感器,设置的每隔1个小时发送一次温度数据,刚好某个app要展示这个温度传感器数据,打开app的时候,刚好错过温度传感器上次一发送数据,正常情况下需要再等待1个小时,但是如果设置消息为Retain,app打开之后,依旧可以收到温度传感器上一次发送的数据
import mqtt from "../../utils/mqtt4.1.0.js"
/**mqtt连接,不支持发送16进制buffer数据,暂弃用*/
connectmqtt() {
var that = this
const options = {
keepalive: 30,
//protocolVersion: 4, //MQTT V3.1.1
connectTimeout: 4000,
clientId: 'sdfkskr43',//这个地方最好用一个随机字符串方法生成
//port: 8084,
username: '',
password: '',
}
//此处需要用wxs,请注意!!!
client = mqtt.connect(`wxs://xx.com/mqtt`, options)
client.on('connect', (e) => {
console.log('服务器连接成功', e)
client.subscribe('abc', { qos: 2 }, function (err) {
if (!err) {
console.log('订阅成功', err)
}
})
})
//信息监听
client.on('message', function (topic, massage) {
console.log('收到' + this.ab2hex(massage))
})
client.on('reconnect', (error) => {
console.log('正在重连', error)
})
client.on('error', (error) => {
console.log('连接失败', error)
})
// 连接断开后触发的回调
client.on("close", function () {
console.log("已断开连接")
});
// 客户端脱机下线触发回调
client.on("offline", function () {
console.log("您已断开连接,请检查网络")
});
//当客户端发送任何数据包时发出。这包括publish()以及MQTT用于管理订阅和连接的包
client.on("packetsend", (packet) => {
//console.log("客户端已发出报文", packet);
});
},
//mqtt发送数据,只支持string,不支持16进制buffer
mqttSend(msg) {
client.publish('abc', msg, { qos: 2 }, function (err) {
console.log('send', err)
})
},
/**将ArrayBuffer转换成字符串*/
ab2hex(buffer){
var hexArr = Array.prototype.map.call(
new Uint8Array(buffer),
function (bit) {
return ('00' + bit.toString(16)).slice(-2)
}
)
return hexArr.join('');
}
var paho = require("../../utils/paho-mqtt-min.js");
var client
/**paho连接,支持发送16进制buffer数据*/
connectPaho() {
client = new paho.Client('wss://xx.com/mqtt', 'dfsdwfsdfsd'); //后面这个字符串可以用一个随机字符串方法随机生成
var connectOptions = {
//invocationContext: { host: "broker.emqx.io", port: 8084, clientId: 'xcx_wdc_' + util.randomWord(false, 43) },
timeout: 10,
//useSSL: true,
cleanSession: false, //实现QoS>0必须设置为false,代表不清除,是持久会话
keepAliveInterval: 5,
reconnect: false, //意外断开之后会重连,第一次1秒后重连,往后每次翻倍直到2分钟,后续保持2分钟
mqttVersion: 4,
userName: '',
password: '',
onSuccess: function () {
console.log('连接成功');
client.onMessageArrived = function (msg) {
//所有接收的消息都在这里,相关消息的业务处理都写在这里
var message = this.ab2hex(msg.payloadBytes)
console.log(message)
}
client.subscribe('abc', { qos: 2 });
},
onFailure: function (option) {
console.log('fail', option)
}
}
client.connect(connectOptions)
},
//paho发送数据,支持发送16进制buffer
pahoSend(modbus) {
var message = new paho.Message(this.string2buffer(modbus))
message.destinationName = 'abc'
message.qos = 2
message.retained = false
client.send(message)
},
/**将ArrayBuffer转换成字符串*/
ab2hex(buffer){
var hexArr = Array.prototype.map.call(
new Uint8Array(buffer),
function (bit) {
return ('00' + bit.toString(16)).slice(-2)
}
)
return hexArr.join('');
},
/**将16进制转化为ArrayBuffer*/
string2buffer(str){
return new Uint8Array(str.match(/[\da-f]{2}/gi).map(function (h) {
return parseInt(h, 16)
})).buffer
},
//切换后台断开
onHide() {
client.disconnect()
},
//切换前台重连
onShow() {
this.connectPaho()
},
server {
listen 443 ssl;
server_name localhost;
ssl on;
ssl_certificate /var/lib/nginx/ssl/xx.pem;
ssl_certificate_key /var/lib/nginx/ssl/xx.key;
ssl_session_timeout 5m;
ssl_session_cache shared:SSL:10m;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2 SSLv2 SSLv3;
ssl_ciphers ALL:!ADH:!EXPORT56:RC4+RSA:+HIGH:+MEDIUM:+LOW:+SSLv2:+EXP;
ssl_prefer_server_ciphers on;
ssl_verify_client off;
location /mqtt {
proxy_pass https://localhost:8084;
proxy_http_version 1.1;
# 最后两行的set_header表示将http协议头升级为websocket协议
proxy_set_header Sec-WebSocket-Protocol mqtt;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
nginx这样配置之后,小程序mqtt连接的端口就不需要写了,自动会访问443端口下的/mqtt,然后转到8084端口,这样就正常了,还有问题可以后面留言
因为大部分前端一般不做后端工作,所以会出现小程序连接mqtt失败、发送消息失败等问题。但是无法确认是前端问题还是后端问题,这里推荐EMQX官方提供的测试工具和公共测试服务器,当使用后端配置的mqtt服务,调试出现问题的时候,可以用来测试,如果是正常的,大概率是你们后端配置的mqtt有问题了。
#mqtt测试工具
https://www.emqx.com/zh/products/mqttx
#mqtt公共测试服务器
https://www.emqx.com/zh/mqtt/public-mqtt5-broker